main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package main
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"
)
  13. func main() {
  14. logLevel := new(slog.LevelVar)
  15. logHandler :=
  16. slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
  17. Level: logLevel,
  18. })
  19. slog.SetDefault(slog.New(logHandler))
  20. logLevel.Set(slog.LevelDebug)
  21. if len(os.Args) != 2 {
  22. fmt.Fprintf(os.Stderr, "usage: %s <target>\n", os.Args[0])
  23. os.Exit(1)
  24. }
  25. remote := os.Args[1]
  26. ctx, cancel := context.WithCancel(context.Background())
  27. slog.Info("Starting signal watchdog...")
  28. go func() {
  29. defer cancel()
  30. sig := make(chan os.Signal, 1)
  31. signal.Notify(sig, os.Interrupt, os.Kill)
  32. <-sig
  33. slog.Info("Caught signal; terminating...")
  34. }()
  35. for {
  36. if ctx.Err() != nil {
  37. break
  38. }
  39. slog.Info("Opening connection", "remote", remote)
  40. c, err := connect(remote)
  41. if err != nil {
  42. slog.Error("While opening connection", "error", err)
  43. sleepNice(ctx, 10*time.Second)
  44. continue
  45. }
  46. mc := MonConn{
  47. conn: c,
  48. timeout: time.Second,
  49. }
  50. monitor(ctx, &mc)
  51. sleepNice(ctx, 100*time.Millisecond)
  52. }
  53. }
  54. func monitor(ctx context.Context, conn *MonConn) {
  55. defer func() {
  56. if er := conn.Close(); er != nil {
  57. slog.Warn("Error while closing connection",
  58. "local", conn.Local(),
  59. "remote", conn.Remote(),
  60. "error", er,
  61. )
  62. }
  63. }()
  64. slog.Debug("Starting loop...",
  65. "local", conn.Local(),
  66. "remote", conn.Remote(),
  67. )
  68. pingPeriod := time.Second
  69. lastPing := time.Now().Add(-pingPeriod)
  70. for {
  71. if ctx.Err() != nil {
  72. break
  73. }
  74. if time.Since(lastPing) >= pingPeriod {
  75. lastPing = time.Now()
  76. if err := conn.Ping(); err != nil {
  77. slog.Debug("During ping",
  78. "remote", conn.Remote(),
  79. "error", err,
  80. )
  81. return
  82. }
  83. }
  84. sleepNice(ctx, 100*time.Millisecond)
  85. }
  86. }
  87. // TODO Multiplex pings and data collection
  88. type MonConn struct {
  89. conn net.Conn
  90. timeout,
  91. minrtt,
  92. srtt,
  93. rttstd time.Duration
  94. }
  95. func (c *MonConn) Local() string {
  96. return c.conn.LocalAddr().String()
  97. }
  98. func (c *MonConn) Remote() string {
  99. return c.conn.RemoteAddr().String()
  100. }
  101. func (c *MonConn) Close() error {
  102. return c.conn.Close()
  103. }
  104. // TODO Calculate and retain statistics
  105. func (c *MonConn) Ping() error {
  106. buf := [4]byte{}
  107. buf[0] = 1
  108. start := time.Now()
  109. if err := send(c.conn, buf[:], c.timeout); err != nil {
  110. return fmt.Errorf("send: %w", err)
  111. }
  112. if err := recv(c.conn, buf[:], c.timeout); err != nil {
  113. return fmt.Errorf("receive: %w", err)
  114. }
  115. rtt := time.Now().Sub(start)
  116. if c.minrtt == 0 {
  117. c.minrtt = rtt
  118. c.srtt = rtt
  119. c.rttstd = rtt >> 1
  120. return nil
  121. }
  122. absdiff := c.srtt - rtt
  123. if absdiff < 0 {
  124. absdiff *= -1
  125. }
  126. c.rttstd += (absdiff - c.rttstd) >> 2
  127. c.srtt += (rtt - c.srtt) >> 3
  128. c.minrtt = min(c.minrtt, rtt)
  129. //if c.srtt > c.minrtt+(c.rttstd<<2) {
  130. minus8Sigma := c.srtt - (c.rttstd << 3)
  131. if c.minrtt < minus8Sigma {
  132. c.minrtt = minus8Sigma
  133. }
  134. slog.Debug("Ping",
  135. "remote", c.conn.RemoteAddr(),
  136. "minrtt", fmt.Sprintf("%.3fms", float64(c.minrtt.Microseconds())/1000.0),
  137. "rtt", fmt.Sprintf("%.3fms", float64(rtt.Microseconds())/1000.0),
  138. "srtt", fmt.Sprintf("%.3fms", float64(c.srtt.Microseconds())/1000.0),
  139. "rttstd", fmt.Sprintf("%.3fms", float64(c.rttstd.Microseconds())/1000.0),
  140. )
  141. return nil
  142. }
  143. func connect(remote string) (conn *net.TCPConn, err error) {
  144. tcpAddr, err := net.ResolveTCPAddr("tcp4", remote)
  145. if err != nil {
  146. return nil, err
  147. }
  148. conn, err = net.DialTCP("tcp4", nil, tcpAddr)
  149. if err != nil {
  150. return nil, err
  151. }
  152. if err = conn.SetLinger(0); err != nil {
  153. err = fmt.Errorf("set TCP linger time: %w", err)
  154. return
  155. }
  156. if err = conn.SetNoDelay(true); err != nil {
  157. err = fmt.Errorf("set TCP NODELAY: %w", err)
  158. return
  159. }
  160. err = conn.SetKeepAliveConfig(net.KeepAliveConfig{
  161. Enable: true,
  162. Idle: 30 * time.Second,
  163. Interval: 15 * time.Second,
  164. Count: 5,
  165. })
  166. if err != nil {
  167. err = fmt.Errorf("set TCP keepalives: %w", err)
  168. return
  169. }
  170. return
  171. }
  172. func recv(c net.Conn, buf []byte, timeout time.Duration) error {
  173. deadline := time.Now().Add(timeout)
  174. if err := c.SetReadDeadline(deadline); err != nil {
  175. return fmt.Errorf("set deadline: %w", err)
  176. }
  177. _, err := c.Read(buf)
  178. if errors.Is(err, io.EOF) {
  179. slog.Info("Lost connection",
  180. "local", c.LocalAddr(),
  181. "remote", c.RemoteAddr(),
  182. )
  183. return fmt.Errorf("read: %w", err)
  184. }
  185. if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
  186. slog.Debug("While reading from connection",
  187. "local", c.LocalAddr(),
  188. "remote", c.RemoteAddr(),
  189. "error", err,
  190. )
  191. return fmt.Errorf("read: %w", err)
  192. }
  193. return err
  194. }
  195. func send(c net.Conn, buf []byte, timeout time.Duration) error {
  196. deadline := time.Now().Add(timeout)
  197. if err := c.SetWriteDeadline(deadline); err != nil {
  198. return fmt.Errorf("set deadline: %w", err)
  199. }
  200. _, err := c.Write(buf)
  201. if errors.Is(err, io.EOF) {
  202. slog.Info("Lost connection",
  203. "local", c.LocalAddr(),
  204. "remote", c.RemoteAddr(),
  205. )
  206. return fmt.Errorf("write: %w", err)
  207. }
  208. if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
  209. slog.Debug("While writing to connection",
  210. "local", c.LocalAddr(),
  211. "remote", c.RemoteAddr(),
  212. "error", err,
  213. )
  214. return fmt.Errorf("write: %w", err)
  215. }
  216. return err
  217. }
  218. func sleepNice(ctx context.Context, t time.Duration) {
  219. timer := time.NewTimer(t)
  220. for {
  221. if ctx.Err() != nil {
  222. return
  223. }
  224. select {
  225. case <-timer.C:
  226. return
  227. case <-time.After(100 * time.Millisecond):
  228. // Throttle CPU
  229. }
  230. }
  231. }