package main import ( "context" "errors" "fmt" "io" "log/slog" "net" "os" "os/signal" "time" ) func main() { logLevel := new(slog.LevelVar) logHandler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ Level: logLevel, }) slog.SetDefault(slog.New(logHandler)) logLevel.Set(slog.LevelDebug) if len(os.Args) != 2 { fmt.Fprintf(os.Stderr, "usage: %s \n", os.Args[0]) os.Exit(1) } remote := os.Args[1] ctx, cancel := context.WithCancel(context.Background()) slog.Info("Starting signal watchdog...") go func() { defer cancel() sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, os.Kill) <-sig slog.Info("Caught signal; terminating...") }() for { if ctx.Err() != nil { break } slog.Info("Opening connection", "remote", remote) c, err := connect(remote) if err != nil { slog.Error("While opening connection", "error", err) sleepNice(ctx, 10*time.Second) continue } mc := MonConn{ conn: c, timeout: time.Second, } monitor(ctx, &mc) sleepNice(ctx, 100*time.Millisecond) } } func monitor(ctx context.Context, conn *MonConn) { defer func() { if er := conn.Close(); er != nil { slog.Warn("Error while closing connection", "local", conn.Local(), "remote", conn.Remote(), "error", er, ) } }() slog.Debug("Starting loop...", "local", conn.Local(), "remote", conn.Remote(), ) pingPeriod := time.Second lastPing := time.Now().Add(-pingPeriod) for { if ctx.Err() != nil { break } if time.Since(lastPing) >= pingPeriod { lastPing = time.Now() if err := conn.Ping(); err != nil { slog.Debug("During ping", "remote", conn.Remote(), "error", err, ) return } } sleepNice(ctx, 100*time.Millisecond) } } // TODO Multiplex pings and data collection type MonConn struct { conn net.Conn timeout, minrtt, srtt, rttstd time.Duration } func (c *MonConn) Local() string { return c.conn.LocalAddr().String() } func (c *MonConn) Remote() string { return c.conn.RemoteAddr().String() } func (c *MonConn) Close() error { return c.conn.Close() } // TODO Calculate and retain statistics func (c *MonConn) Ping() error { buf := [4]byte{} buf[0] = 1 start := time.Now() if err := send(c.conn, buf[:], c.timeout); err != nil { return fmt.Errorf("send: %w", err) } if err := recv(c.conn, buf[:], c.timeout); err != nil { return fmt.Errorf("receive: %w", err) } rtt := time.Now().Sub(start) if c.minrtt == 0 { c.minrtt = rtt c.srtt = rtt c.rttstd = rtt >> 1 return nil } absdiff := c.srtt - rtt if absdiff < 0 { absdiff *= -1 } c.rttstd += (absdiff - c.rttstd) >> 2 c.srtt += (rtt - c.srtt) >> 3 c.minrtt = min(c.minrtt, rtt) //if c.srtt > c.minrtt+(c.rttstd<<2) { minus8Sigma := c.srtt - (c.rttstd << 3) if c.minrtt < minus8Sigma { c.minrtt = minus8Sigma } slog.Debug("Ping", "remote", c.conn.RemoteAddr(), "minrtt", fmt.Sprintf("%.3fms", float64(c.minrtt.Microseconds())/1000.0), "rtt", fmt.Sprintf("%.3fms", float64(rtt.Microseconds())/1000.0), "srtt", fmt.Sprintf("%.3fms", float64(c.srtt.Microseconds())/1000.0), "rttstd", fmt.Sprintf("%.3fms", float64(c.rttstd.Microseconds())/1000.0), ) return nil } func connect(remote string) (conn *net.TCPConn, err error) { tcpAddr, err := net.ResolveTCPAddr("tcp4", remote) if err != nil { return nil, err } conn, err = net.DialTCP("tcp4", nil, tcpAddr) if err != nil { return nil, err } if err = conn.SetLinger(0); err != nil { err = fmt.Errorf("set TCP linger time: %w", err) return } if err = conn.SetNoDelay(true); err != nil { err = fmt.Errorf("set TCP NODELAY: %w", err) return } err = conn.SetKeepAliveConfig(net.KeepAliveConfig{ Enable: true, Idle: 30 * time.Second, Interval: 15 * time.Second, Count: 5, }) if err != nil { err = fmt.Errorf("set TCP keepalives: %w", err) return } return } func recv(c net.Conn, buf []byte, timeout time.Duration) error { deadline := time.Now().Add(timeout) if err := c.SetReadDeadline(deadline); err != nil { return fmt.Errorf("set deadline: %w", err) } _, err := c.Read(buf) if errors.Is(err, io.EOF) { slog.Info("Lost connection", "local", c.LocalAddr(), "remote", c.RemoteAddr(), ) return fmt.Errorf("read: %w", err) } if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) { slog.Debug("While reading from connection", "local", c.LocalAddr(), "remote", c.RemoteAddr(), "error", err, ) return fmt.Errorf("read: %w", err) } return err } func send(c net.Conn, buf []byte, timeout time.Duration) error { deadline := time.Now().Add(timeout) if err := c.SetWriteDeadline(deadline); err != nil { return fmt.Errorf("set deadline: %w", err) } _, err := c.Write(buf) if errors.Is(err, io.EOF) { slog.Info("Lost connection", "local", c.LocalAddr(), "remote", c.RemoteAddr(), ) return fmt.Errorf("write: %w", err) } if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) { slog.Debug("While writing to connection", "local", c.LocalAddr(), "remote", c.RemoteAddr(), "error", err, ) return fmt.Errorf("write: %w", err) } return err } func sleepNice(ctx context.Context, t time.Duration) { timer := time.NewTimer(t) for { if ctx.Err() != nil { return } select { case <-timer.C: return case <-time.After(100 * time.Millisecond): // Throttle CPU } } }