浏览代码

Initial commit

Jonathan D. Storm 4 周之前
当前提交
1aa191a9ce
共有 5 个文件被更改,包括 544 次插入0 次删除
  1. 255 0
      cmd/agent/main.go
  2. 276 0
      cmd/server/main.go
  3. 5 0
      go.mod
  4. 2 0
      go.sum
  5. 6 0
      notes.md

+ 255 - 0
cmd/agent/main.go

@@ -0,0 +1,255 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+	"net"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+)
+
+// main is the agent entry point. It sends debug-level text logs to stderr,
+// requires exactly one command-line argument (the target address handed to
+// connect), and then repeatedly opens a connection and monitors it until a
+// termination signal cancels the context.
+func main() {
+	logLevel := new(slog.LevelVar)
+	logHandler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
+		Level: logLevel,
+	})
+	slog.SetDefault(slog.New(logHandler))
+	logLevel.Set(slog.LevelDebug)
+
+	if len(os.Args) != 2 {
+		fmt.Fprintf(os.Stderr, "usage: %s <target>\n", os.Args[0])
+		os.Exit(1)
+	}
+	remote := os.Args[1]
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	slog.Info("Starting signal watchdog...")
+	// Register the handler before starting the goroutine so that a signal
+	// delivered right after startup is not lost. SIGKILL (os.Kill) can never
+	// be trapped, so SIGTERM is watched instead.
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		defer cancel()
+
+		<-sig
+		slog.Info("Caught signal; terminating...")
+	}()
+
+	for ctx.Err() == nil {
+		slog.Info("Opening connection", "remote", remote)
+		c, err := connect(remote)
+		if err != nil {
+			slog.Error("While opening connection", "error", err)
+			// Back off before retrying a failed dial.
+			sleepNice(ctx, 10*time.Second)
+			continue
+		}
+		mc := MonConn{
+			conn:    c,
+			timeout: time.Second,
+		}
+		// monitor returns once a ping fails or ctx is cancelled, and closes
+		// the connection on its way out.
+		monitor(ctx, &mc)
+		sleepNice(ctx, 100*time.Millisecond)
+	}
+}
+
+func monitor(ctx context.Context, conn *MonConn) {
+	defer func() {
+		if er := conn.Close(); er != nil {
+			slog.Warn("Error while closing connection",
+				"local", conn.Local(),
+				"remote", conn.Remote(),
+				"error", er,
+			)
+		}
+	}()
+
+	slog.Debug("Starting loop...",
+		"local", conn.Local(),
+		"remote", conn.Remote(),
+	)
+	pingPeriod := time.Second
+	lastPing := time.Now().Add(-pingPeriod)
+
+	for {
+		if ctx.Err() != nil {
+			break
+		}
+		if time.Since(lastPing) >= pingPeriod {
+			lastPing = time.Now()
+			if err := conn.Ping(); err != nil {
+				slog.Debug("During ping",
+					"remote", conn.Remote(),
+					"error", err,
+				)
+				return
+			}
+		}
+		sleepNice(ctx, 100*time.Millisecond)
+	}
+}
+
+// MonConn couples a network connection with the I/O timeout used for each
+// ping and the round-trip-time statistics that Ping maintains.
+//
+// TODO Multiplex pings and data collection
+type MonConn struct {
+	conn    net.Conn      // underlying connection
+	timeout time.Duration // deadline applied to each send and each receive
+	minrtt  time.Duration // smallest RTT retained so far; zero means no sample yet
+	srtt    time.Duration // smoothed RTT
+	rttstd  time.Duration // smoothed RTT deviation
+}
+
+// Local returns the local address of the underlying connection as a string.
+func (c *MonConn) Local() string {
+	addr := c.conn.LocalAddr()
+	return addr.String()
+}
+
+// Remote returns the remote address of the underlying connection as a string.
+func (c *MonConn) Remote() string {
+	addr := c.conn.RemoteAddr()
+	return addr.String()
+}
+
+// Close closes the underlying connection and reports any error from doing so.
+func (c *MonConn) Close() error {
+	err := c.conn.Close()
+	return err
+}
+
+// TODO Calculate and retain statistics
+func (c *MonConn) Ping() error {
+	buf := [4]byte{}
+	buf[0] = 1
+	start := time.Now()
+	if err := send(c.conn, buf[:], c.timeout); err != nil {
+		return fmt.Errorf("send: %w", err)
+	}
+	if err := recv(c.conn, buf[:], c.timeout); err != nil {
+		return fmt.Errorf("receive: %w", err)
+	}
+	rtt := time.Now().Sub(start)
+	if c.minrtt == 0 {
+		c.minrtt = rtt
+		c.srtt = rtt
+		c.rttstd = rtt >> 1
+		return nil
+	}
+	absdiff := c.srtt - rtt
+	if absdiff < 0 {
+		absdiff *= -1
+	}
+	c.rttstd += (absdiff - c.rttstd) >> 2
+	c.srtt += (rtt - c.srtt) >> 3
+	c.minrtt = min(c.minrtt, rtt)
+	//if c.srtt > c.minrtt+(c.rttstd<<2) {
+	minus8Sigma := c.srtt - (c.rttstd << 3)
+	if c.minrtt < minus8Sigma {
+		c.minrtt = minus8Sigma
+	}
+	slog.Debug("Ping",
+		"remote", c.conn.RemoteAddr(),
+		"minrtt", fmt.Sprintf("%.3fms", float64(c.minrtt.Microseconds())/1000.0),
+		"rtt", fmt.Sprintf("%.3fms", float64(rtt.Microseconds())/1000.0),
+		"srtt", fmt.Sprintf("%.3fms", float64(c.srtt.Microseconds())/1000.0),
+		"rttstd", fmt.Sprintf("%.3fms", float64(c.rttstd.Microseconds())/1000.0),
+	)
+	return nil
+}
+
+// connect resolves remote as an IPv4 TCP address, dials it, and configures
+// the socket: linger 0, TCP_NODELAY on, and keepalives (30s idle, 15s
+// interval, 5 probes).
+//
+// On any failure the returned connection is nil. If the dial succeeded but a
+// socket option could not be applied, the connection is closed before the
+// wrapped error is returned so that the descriptor is not leaked.
+func connect(remote string) (conn *net.TCPConn, err error) {
+	tcpAddr, err := net.ResolveTCPAddr("tcp4", remote)
+	if err != nil {
+		return nil, err
+	}
+	conn, err = net.DialTCP("tcp4", nil, tcpAddr)
+	if err != nil {
+		return nil, err
+	}
+
+	// abort releases the half-configured connection and reports why.
+	abort := func(what string, cause error) (*net.TCPConn, error) {
+		// The close error is deliberately dropped: the configuration
+		// failure is the more useful error for the caller.
+		_ = conn.Close()
+		return nil, fmt.Errorf("%s: %w", what, cause)
+	}
+
+	if err := conn.SetLinger(0); err != nil {
+		return abort("set TCP linger time", err)
+	}
+	if err := conn.SetNoDelay(true); err != nil {
+		return abort("set TCP NODELAY", err)
+	}
+	keepAlive := net.KeepAliveConfig{
+		Enable:   true,
+		Idle:     30 * time.Second,
+		Interval: 15 * time.Second,
+		Count:    5,
+	}
+	if err := conn.SetKeepAliveConfig(keepAlive); err != nil {
+		return abort("set TCP keepalives", err)
+	}
+	return conn, nil
+}
+
+// recv fills buf completely from c, giving up once timeout has elapsed.
+//
+// A single Read on a stream socket may deliver fewer bytes than requested, so
+// io.ReadFull is used to make sure the whole fixed-size message has arrived.
+//
+// Errors:
+//   - failure to arm the deadline is wrapped with "set deadline";
+//   - the peer closing the stream (before or in the middle of a message) is
+//     logged at info level and wrapped with "read";
+//   - a deadline expiry is returned as-is and not logged;
+//   - any other read error is logged at debug level and wrapped with "read".
+func recv(c net.Conn, buf []byte, timeout time.Duration) error {
+	if err := c.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+		return fmt.Errorf("set deadline: %w", err)
+	}
+	_, err := io.ReadFull(c, buf)
+
+	switch {
+	case err == nil:
+		return nil
+	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
+		slog.Info("Lost connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+		)
+		return fmt.Errorf("read: %w", err)
+	case errors.Is(err, os.ErrDeadlineExceeded):
+		return err
+	default:
+		slog.Debug("While reading from connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+			"error", err,
+		)
+		return fmt.Errorf("read: %w", err)
+	}
+}
+
+// send writes buf to c, giving up once timeout has elapsed.
+//
+// Errors:
+//   - failure to arm the deadline is wrapped with "set deadline";
+//   - io.EOF is logged at info level as a lost connection and wrapped with
+//     "write";
+//   - a deadline expiry is returned as-is and not logged;
+//   - any other write error is logged at debug level and wrapped with "write".
+func send(c net.Conn, buf []byte, timeout time.Duration) error {
+	if dlErr := c.SetWriteDeadline(time.Now().Add(timeout)); dlErr != nil {
+		return fmt.Errorf("set deadline: %w", dlErr)
+	}
+
+	_, writeErr := c.Write(buf)
+	switch {
+	case writeErr == nil:
+		return nil
+	case errors.Is(writeErr, io.EOF):
+		slog.Info("Lost connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+		)
+		return fmt.Errorf("write: %w", writeErr)
+	case errors.Is(writeErr, os.ErrDeadlineExceeded):
+		return writeErr
+	default:
+		slog.Debug("While writing to connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+			"error", writeErr,
+		)
+		return fmt.Errorf("write: %w", writeErr)
+	}
+}
+
+// sleepNice blocks for duration t, returning early as soon as ctx is
+// cancelled. The timer is stopped on return so it does not linger after an
+// early exit.
+func sleepNice(ctx context.Context, t time.Duration) {
+	timer := time.NewTimer(t)
+	defer timer.Stop()
+
+	// Waiting on ctx.Done() directly reacts to cancellation immediately
+	// instead of noticing it on the next polling tick.
+	select {
+	case <-ctx.Done():
+	case <-timer.C:
+	}
+}

+ 276 - 0
cmd/server/main.go

@@ -0,0 +1,276 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+	"net"
+	"net/netip"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+	"unsafe"
+)
+
+// main is the server entry point. It sends info-level text logs to stderr,
+// starts the TCP listener on port 54321 in the background, and then blocks
+// until a termination signal cancels the context.
+func main() {
+	logLevel := new(slog.LevelVar)
+	logHandler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
+		Level: logLevel,
+	})
+	slog.SetDefault(slog.New(logHandler))
+	logLevel.Set(slog.LevelInfo)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	slog.Info("Starting signal watchdog...")
+	// Register the handler before starting the goroutine so that a signal
+	// delivered right after startup is not lost. SIGKILL (os.Kill) can never
+	// be trapped, so SIGTERM is watched instead.
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-sig
+		slog.Info("Caught signal; terminating...")
+		cancel()
+	}()
+
+	slog.Info("Starting connection listener on port 54321")
+	go listen(ctx, 54321)
+
+	// Nothing else to do on this goroutine: wait for shutdown.
+	<-ctx.Done()
+}
+
+// listen accepts IPv4 TCP connections on the given port on all interfaces
+// and serves each one on its own goroutine via handleConnection. It returns
+// when ctx is cancelled or the listener is closed.
+//
+// listen panics if the listening socket cannot be opened.
+func listen(ctx context.Context, port uint16) {
+	tcpAddr := net.TCPAddrFromAddrPort(
+		netip.AddrPortFrom(
+			netip.IPv4Unspecified(),
+			port,
+		),
+	)
+	ln, err := net.ListenTCP("tcp4", tcpAddr)
+	if err != nil {
+		panic(err)
+	}
+	// The listener may already have been closed by the cancellation hook
+	// below, so the error from this second close is ignored.
+	defer func() { _ = ln.Close() }()
+
+	// AcceptTCP blocks without regard to ctx; closing the listener on
+	// cancellation is what unblocks it.
+	stop := context.AfterFunc(ctx, func() { _ = ln.Close() })
+	defer stop()
+
+	for ctx.Err() == nil {
+		slog.Info("Awaiting connection...")
+		conn, err := ln.AcceptTCP()
+		if err != nil {
+			if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
+				return
+			}
+			slog.Error("Unable to accept connection",
+				"error", err,
+			)
+			// conn is nil here, so it must not be used; pause briefly so a
+			// persistent accept failure does not spin the CPU.
+			sleepNice(ctx, 100*time.Millisecond)
+			continue
+		}
+		slog.Info("Received connection",
+			"local", conn.LocalAddr(),
+			"remote", conn.RemoteAddr(),
+		)
+		go handleConnection(ctx, conn)
+	}
+}
+
+func handleConnection(
+	ctx context.Context,
+	conn *net.TCPConn,
+) {
+	defer func() {
+		if er := conn.Close(); er != nil {
+			slog.Warn("Error while closing connection",
+				"local", conn.LocalAddr(),
+				"remote", conn.RemoteAddr(),
+				"error", er,
+			)
+		}
+	}()
+
+	if err := conn.SetLinger(0); err != nil {
+		slog.Error("Could not set TCP linger time", "error", err)
+		return
+	}
+	if err := conn.SetNoDelay(true); err != nil {
+		slog.Error("Failed to enable NoDelay", "error", err)
+		return
+	}
+	err := conn.SetKeepAliveConfig(net.KeepAliveConfig{
+		Enable:   true,
+		Idle:     15 * time.Second,
+		Interval: 15 * time.Second,
+		Count:    5,
+	})
+	if err != nil {
+		slog.Error("Cannot configure TCP keeplives", "error", err)
+		return
+	}
+	/*sysConn, err := conn.SyscallConn()
+	if err != nil {
+		slog.Error("Could not obtain system-level connection", "error", err)
+		return
+	}*/
+
+	timeout := 10 * time.Second
+	//outputPeriod := 10 * time.Second
+	//lastOutput := time.Now().Add(-outputPeriod)
+	//tcpInfo := syscall.TCPInfo{}
+	buf := make([]byte, 4)
+
+	slog.Debug("Starting loop...",
+		"local", conn.LocalAddr(),
+		"remote", conn.RemoteAddr(),
+	)
+	for {
+		if ctx.Err() != nil {
+			break
+		}
+		/*slog.Debug("Retrieving TCP info",
+			"local", conn.LocalAddr(),
+			"remote", conn.RemoteAddr(),
+		)
+		err = retrieveTCPInfo(sysConn, &tcpInfo)
+		if err != nil {
+			slog.Error("Could not retrieve TCP information",
+				"local", conn.LocalAddr(),
+				"remote", conn.RemoteAddr(),
+				"error", err,
+			)
+		}
+		if tcpInfo.State == unix.BPF_TCP_CLOSE {
+			slog.Info("Lost connection",
+				"local", conn.LocalAddr(),
+				"remote", conn.RemoteAddr(),
+			)
+			return
+		}*/
+
+		// Send and receive some data.
+		if err := recv(conn, buf, timeout); err != nil {
+			slog.Debug("While reading from connection",
+				"remote", conn.RemoteAddr(),
+				"error", err,
+			)
+			return
+		}
+		if err := send(conn, buf, timeout); err != nil {
+			slog.Debug("While writing to connection",
+				"remote", conn.RemoteAddr(),
+				"error", err,
+			)
+			return
+		}
+
+		// Periodically print TCPInfo.
+		/*select {
+		case <-timer.C:
+			fmt.Printf("RTT: %0.3f ms; StdDev: %0.3f ms\n",
+				float32(tcpInfo.Rtt)/1000.0,
+				float32(tcpInfo.Rttvar)/1000.0,
+			)
+			timer.Stop()
+			timer.Reset(outputPeriod)
+		default:
+		}*/
+		sleepNice(ctx, 10*time.Millisecond)
+	}
+}
+
+// recv fills buf completely from c, giving up once timeout has elapsed.
+//
+// A single Read on a stream socket may deliver fewer bytes than requested, so
+// io.ReadFull is used to make sure the whole fixed-size message has arrived.
+//
+// Errors:
+//   - failure to arm the deadline is wrapped with "set deadline";
+//   - the peer closing the stream (before or in the middle of a message) is
+//     logged at info level and wrapped with "read";
+//   - a deadline expiry is returned as-is and not logged;
+//   - any other read error is logged at warn level and wrapped with "read".
+func recv(c net.Conn, buf []byte, timeout time.Duration) error {
+	if err := c.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+		return fmt.Errorf("set deadline: %w", err)
+	}
+	_, err := io.ReadFull(c, buf)
+
+	switch {
+	case err == nil:
+		return nil
+	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
+		slog.Info("Lost connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+		)
+		return fmt.Errorf("read: %w", err)
+	case errors.Is(err, os.ErrDeadlineExceeded):
+		return err
+	default:
+		slog.Warn("While reading from connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+			"error", err,
+		)
+		return fmt.Errorf("read: %w", err)
+	}
+}
+
+// send writes buf to c, giving up once timeout has elapsed.
+//
+// Errors:
+//   - failure to arm the deadline is wrapped with "set deadline";
+//   - io.EOF is logged at info level as a lost connection and wrapped with
+//     "write";
+//   - a deadline expiry is returned as-is and not logged;
+//   - any other write error is logged at debug level and wrapped with "write".
+func send(c net.Conn, buf []byte, timeout time.Duration) error {
+	deadline := time.Now().Add(timeout)
+	if err := c.SetWriteDeadline(deadline); err != nil {
+		return fmt.Errorf("set deadline: %w", err)
+	}
+	_, err := c.Write(buf)
+
+	if errors.Is(err, io.EOF) {
+		slog.Info("Lost connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+		)
+		return fmt.Errorf("write: %w", err)
+	}
+	if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
+		// These messages used to say "read", which misattributed write
+		// failures to the receive path.
+		slog.Debug("While writing to connection",
+			"local", c.LocalAddr(),
+			"remote", c.RemoteAddr(),
+			"error", err,
+		)
+		return fmt.Errorf("write: %w", err)
+	}
+	return err
+}
+
+// sleepNice blocks for duration t, returning early as soon as ctx is
+// cancelled. The timer is stopped on return so it does not linger after an
+// early exit.
+func sleepNice(ctx context.Context, t time.Duration) {
+	timer := time.NewTimer(t)
+	defer timer.Stop()
+
+	// Waiting on ctx.Done() directly reacts to cancellation immediately
+	// instead of noticing it on the next polling tick.
+	select {
+	case <-ctx.Done():
+	case <-timer.C:
+	}
+}
+
+// retrieveTCPInfo fills info with the kernel's TCP_INFO data for the socket
+// behind sysConn by issuing getsockopt(SOL_TCP, TCP_INFO) on its descriptor.
+//
+// It returns the error from sysConn.Control if the descriptor cannot be
+// accessed, or the system call's errno wrapped with "getsockopt TCP_INFO" so
+// that callers can match it with errors.Is.
+func retrieveTCPInfo(
+	sysConn syscall.RawConn,
+	info *syscall.TCPInfo,
+) error {
+	// getsockopt takes the option length as a 32-bit socklen_t, so the size
+	// must be held in a uint32 rather than a pointer-sized integer.
+	size := uint32(unsafe.Sizeof(*info))
+
+	var errno syscall.Errno
+	err := sysConn.Control(func(fd uintptr) {
+		_, _, errno = syscall.Syscall6(
+			syscall.SYS_GETSOCKOPT,
+			fd,
+			syscall.SOL_TCP,
+			syscall.TCP_INFO,
+			uintptr(unsafe.Pointer(info)),
+			uintptr(unsafe.Pointer(&size)),
+			0,
+		)
+	})
+	if err != nil {
+		return err
+	}
+	if errno != 0 {
+		return fmt.Errorf("getsockopt TCP_INFO: %w", errno)
+	}
+	return nil
+}

+ 5 - 0
go.mod

@@ -0,0 +1,5 @@
+module network_monitor
+
+go 1.23.4
+
+require golang.org/x/sys v0.28.0

+ 2 - 0
go.sum

@@ -0,0 +1,2 @@
+golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
+golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

+ 6 - 0
notes.md

@@ -0,0 +1,6 @@
+* Write rrdcached client, using `unix:/tmp/rrdcached.sock`.
+* Store hi-res data, then quantile aggregates.
+* A probe library.
+* Fast probe generation, accurate probe measurement.
+* Interesting RRD visualizations (Gio, WebGL).
+