toil.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. package toil
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/tls"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. dp "idio.link/go/depager/v2"
  17. )
  // Limits governing the lifetime and renewal of an auth token.
  const (
  	// MaxAuthTokenMinutes is the token lifetime, in minutes, that
  	// NewAuthTokenExpiry adds to the current time.
  	MaxAuthTokenMinutes = 60
  	// MaxAuthTokenRefreshes is the refresh count at which
  	// maxAuthTokenRefreshesHasBeenReached starts reporting true.
  	MaxAuthTokenRefreshes = 3
  	// MinutesBeforeRefreshDeadline is the width, in minutes, of the
  	// window before expiry that withinAuthTokenRefreshWindow accepts.
  	MinutesBeforeRefreshDeadline = 3
  )
  21. func NewAuthTokenExpiry(time.Time) time.Time {
  22. return time.Now().Add(MaxAuthTokenMinutes * time.Minute)
  23. }
  24. func withinAuthTokenRefreshWindow(expiry time.Time) bool {
  25. minUntilExpiry := time.Until(expiry).Minutes()
  26. return 0 < minUntilExpiry &&
  27. minUntilExpiry <= MinutesBeforeRefreshDeadline
  28. }
  29. func authTokenRefreshDeadlineHasPassed(
  30. expiry time.Time,
  31. ) bool {
  32. return time.Until(expiry) <= time.Duration(0)
  33. }
  34. func maxAuthTokenRefreshesHasBeenReached(cnt int) bool {
  35. return cnt >= MaxAuthTokenRefreshes
  36. }
  37. func GenericHTTPClient() *http.Client {
  38. t := &http.Transport{
  39. TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
  40. TLSHandshakeTimeout: 15 * time.Second,
  41. IdleConnTimeout: 30 * time.Minute,
  42. ResponseHeaderTimeout: 180 * time.Second,
  43. ExpectContinueTimeout: 10 * time.Second,
  44. }
  45. return &http.Client{
  46. Transport: t,
  47. Timeout: 30 * time.Minute,
  48. }
  49. }
  50. // TODO Various headers may need to be sent, contingent on
  51. // the server API. Accommodate these.
  52. func NewClient(
  53. ctx context.Context,
  54. baseURI string,
  55. client *http.Client,
  56. ) (*Client, error) {
  57. base, err := url.Parse(baseURI)
  58. if err != nil {
  59. return nil, fmt.Errorf("new client: parse base uri: %v", err)
  60. }
  61. return &Client{
  62. baseURI: base,
  63. ctx: ctx,
  64. httpClient: client,
  65. header: make(http.Header),
  66. retries: 3,
  67. retryWait: 10 * time.Second,
  68. // See https://developer.webex.com/docs/basics#rate-limiting
  69. remaining: 300,
  70. }, nil
  71. }
  72. // TODO Implement various credential models, i.e. access
  73. // tokens, logins, refresh tokens, OAuth2, etc.
  74. type Client struct {
  75. ctx context.Context
  76. httpClient *http.Client
  77. baseURI *url.URL
  78. header http.Header
  79. retries int
  80. retryWait time.Duration
  81. remaining int
  82. armed bool
  83. armedLock *sync.RWMutex
  84. }
  85. // Arm the client for writing. Arm panics if the client is
  86. // already armed.
  87. func (c *Client) Arm() (armed bool) {
  88. if c.armedLock.TryLock() {
  89. c.armed = true
  90. armed = true
  91. c.armedLock.Unlock()
  92. }
  93. return armed
  94. }
  95. func (c *Client) Disarm() bool {
  96. return c.armedLock.TryLock()
  97. }
  98. func (c *Client) WithHeader(key, value string) {
  99. c.header.Add(key, value)
  100. }
  101. func (c *Client) WithRetries(retries int) {
  102. if retries < 0 {
  103. retries = 0
  104. }
  105. c.retries = retries
  106. }
  107. func (c *Client) WithRetryWait(wait time.Duration) {
  108. c.retryWait = wait
  109. }
  110. func (c *Client) expandResource(
  111. resource *url.URL,
  112. ) *url.URL {
  113. ex, err :=
  114. url.JoinPath(c.baseURI.String(), resource.EscapedPath())
  115. if err != nil {
  116. panic(fmt.Sprintf("BUG: webex client: expand resource: failed to join path: '%s' + '%s': %v", c.baseURI, resource.RequestURI(), err))
  117. }
  118. next, err := url.Parse(ex)
  119. if err != nil {
  120. panic(fmt.Sprintf("BUG: webex client: expand resource: failed to parse expanded resource '%s': %v", ex, err))
  121. }
  122. return next
  123. }
  124. // Retry when server rate limits are exceeded.
  125. // See https://developer.webex.com/docs/basics#rate-limiting
  126. func (c *Client) sendRequestWithRetry(
  127. req *http.Request,
  128. ) (resp *http.Response, err error) {
  129. for i := 0; i < c.retries+1; i++ {
  130. resp, err = c.httpClient.Do(req)
  131. if err != nil {
  132. err = fmt.Errorf("send request with retry: %w", err)
  133. return
  134. }
  135. if resp.StatusCode != http.StatusTooManyRequests {
  136. break
  137. }
  138. // TODO Extract the following. It is a matter of policy
  139. // how rate-limiting detection is handled.
  140. // TODO Offer an async option in future using
  141. // blockReqsBefore, but this is acceptable, for now.
  142. log.Printf("info: request was rate limited by server: %s %#v", req.Method, req.RequestURI)
  143. var wait time.Duration
  144. wait, err =
  145. time.ParseDuration(resp.Header.Get("Retry-After") + "s")
  146. if err != nil {
  147. log.Printf("Warn: request was rate limited by server, but parsing the expected Retry-After header failed: %v", err)
  148. time.Sleep(c.retryWait)
  149. return
  150. }
  151. time.Sleep(wait)
  152. }
  153. if resp == nil {
  154. err = fmt.Errorf("send request with retry: unknown failure")
  155. return
  156. }
  157. return
  158. }
  159. func (c *Client) request(
  160. method string,
  161. resource *url.URL,
  162. body io.Reader,
  163. ) (respHead http.Header, respBody []byte, err error) {
  164. req, err := http.NewRequestWithContext(
  165. c.ctx,
  166. method,
  167. resource.String(),
  168. body,
  169. )
  170. if err != nil {
  171. err = fmt.Errorf("request %v: %w", resource, err)
  172. return
  173. }
  174. req.Header = c.header
  175. var reqBody []byte
  176. if req.Body != nil {
  177. reqBody, err = io.ReadAll(req.Body)
  178. if err != nil {
  179. err = fmt.Errorf("request %v: failed to read request body: %v", uri, err)
  180. return
  181. }
  182. }
  183. // TODO How to deal with this mess?
  184. /* From https://www.logicmonitor.com/support/rest-api-authentication:
  185. > When LogicMonitor servers receive an API request, they
  186. > ensure the specified timestamp is within 30 minutes of
  187. > the current time.
  188. In the worst case, the below logic will fail long before
  189. the timestamp disparity reaches 30 minutes.
  190. */
  191. timestamp := strconv.FormatInt(time.Now().UnixMilli(), 10)
  192. auth := generateAuthorization(
  193. timestamp,
  194. req.Method,
  195. resource,
  196. reqBody,
  197. c.credential,
  198. )
  199. req.Header.Set("Authorization", auth)
  200. resp, err := c.sendRequestWithRetry(req)
  201. if err != nil {
  202. err = fmt.Errorf("request '%s': %+v: %w", req.URL, req, err)
  203. return
  204. }
  205. defer resp.Body.Close()
  206. respHead = resp.Header
  207. respBody, err = io.ReadAll(resp.Body)
  208. if err != nil {
  209. err = fmt.Errorf("request %v: failed to read response body: %w", resource, err)
  210. return
  211. }
  212. // TODO Make this work as needed.
  213. err = c.ratelimitRequests(resp)
  214. if err != nil {
  215. err = fmt.Errorf("request %v: %w", resource, err)
  216. return
  217. }
  218. // Success response
  219. if 200 <= resp.StatusCode && resp.StatusCode <= 299 {
  220. return
  221. }
  222. err = fmt.Errorf("request %v: %s", resource, http.StatusText(resp.StatusCode))
  223. return
  224. }
  225. // TODO Implement the other verbs, as needed.
  226. func (c *Client) get(
  227. resource *url.URL,
  228. ) (http.Header, []byte, error) {
  229. return c.request(http.MethodGet, resource, nil)
  230. }
  231. func (c *Client) post(
  232. resource *url.URL,
  233. body io.Reader,
  234. ) (http.Header, []byte, error) {
  235. return c.request(http.MethodPost, resource, body)
  236. }
  237. func (f *FMC) postGoThing(
  238. url string,
  239. goThing any,
  240. ) (respBody []byte, err error) {
  241. reqBody, err := json.Marshal(goThing)
  242. if err != nil {
  243. err = fmt.Errorf("failed to marshal json: %v: '%#v'", err, goThing)
  244. return
  245. }
  246. resp, err := f.post(url, bytes.NewReader(reqBody))
  247. if err != nil {
  248. err = fmt.Errorf("failed to post: %v: '%s'", err, reqBody)
  249. return
  250. }
  251. defer resp.Body.Close()
  252. respBody, err = io.ReadAll(resp.Body)
  253. if err != nil {
  254. err = fmt.Errorf("failed to read response body: %v", err)
  255. return
  256. }
  257. if err = checkForCiscoError(resp, respBody); err != nil {
  258. log.Printf("DEBUG: req body: %s", string(reqBody))
  259. return
  260. }
  261. return respBody, nil
  262. }
  263. func (f *FMC) putGoThing(
  264. url string,
  265. goThing any,
  266. ) (respBody []byte, err error) {
  267. reqBody, err := json.Marshal(goThing)
  268. if err != nil {
  269. err = fmt.Errorf("failed to marshal json: %v: '%#v'", err, goThing)
  270. return
  271. }
  272. resp, err := f.put(url, bytes.NewReader(reqBody))
  273. if err != nil {
  274. err = fmt.Errorf("failed to put: %v: '%s'", err, reqBody)
  275. return
  276. }
  277. defer resp.Body.Close()
  278. respBody, err = io.ReadAll(resp.Body)
  279. if err != nil {
  280. err = fmt.Errorf("failed to read response body: %v", err)
  281. return
  282. }
  283. if err = checkForCiscoError(resp, respBody); err != nil {
  284. log.Printf("DEBUG: req body: %s", string(reqBody))
  285. return
  286. }
  287. return respBody, nil
  288. }
  289. func newSubclient[T any](
  290. c *Client,
  291. resource *url.URL,
  292. ) *Subclient[T] {
  293. expanded := c.expandResource(resource)
  294. return &Subclient[T]{
  295. Client: *c,
  296. uri: expanded,
  297. }
  298. }
  299. type Subclient[T any] struct {
  300. Client
  301. uri *url.URL
  302. blockReqsBefore time.Time
  303. }
  304. /*
  305. Throttle requests. See
  306. * https://developer.webex.com/docs/basics#rate-limiting
  307. We postpone subsequent requests by at least win/rem
  308. */
  309. func (c *Subclient[T]) ratelimitRequests(
  310. _ http.Header,
  311. _ []byte,
  312. ) error {
  313. /*
  314. What I want is to cache remaining request counts for each
  315. endpoint, regardless of parameters. Trouble is, we inject
  316. parameters prior to calls to request(), which makes the
  317. resources passed to request() inequivalent.
  318. To push parameter injection further down the call stack,
  319. we need a reliable way to associate parameters with
  320. their insertion points. ~~Injection can then be deferred
  321. until resource expansion, which actually makes some
  322. sense.~~
  323. Nope, not far down enough. We have to push all the way
  324. down through the subclient. Yuck.
  325. */
  326. window := 60 * time.Second // window is not sent in headers
  327. remaining := c.Client.remaining
  328. // TODO Also handle this case.
  329. // Throttle requests
  330. // See https://www.logicmonitor.com/support/rest-api-developers-guide/overview/rate-limiting
  331. remStr := resp.Header.Get("X-Rate-Limit-Remaining")
  332. winStr := resp.Header.Get("X-Rate-Limit-Window")
  333. remaining, err := strconv.ParseInt(remStr, 10, 64)
  334. if remStr != "" && err != nil {
  335. err = fmt.Errorf("request %v: failed to parse header X-Rate-Limit-Remaining: %v", uri, err)
  336. return
  337. }
  338. window, err := time.ParseDuration(winStr + "s")
  339. if winStr != "" && err != nil {
  340. err = fmt.Errorf("request %v: failed to parse header X-Rate-Limit-Window: %v", uri, err)
  341. return
  342. }
  343. if remaining != 0 && window != 0 {
  344. // We postpone subsequent requests by at least win/rem
  345. msDelay := int64(window/time.Millisecond) / remaining
  346. delay := time.Duration(msDelay) * time.Millisecond
  347. c.blockReqsBefore = time.Now().Add(delay)
  348. }
  349. return nil
  350. }
  351. func (c *Subclient[T]) NextPage(
  352. offset uint64,
  353. ) (page dp.Page[T], cnt uint64, err error) {
  354. // TODO Is it possible to pool these for each T? Is it
  355. // worth the memory?
  356. aggr := make([]T, 0, 32)
  357. head, body, err := c.get(c.uri)
  358. if err != nil {
  359. err = fmt.Errorf("webex client: next page: %w", err)
  360. return
  361. }
  362. err = json.Unmarshal(body, &aggr)
  363. if err != nil {
  364. err = fmt.Errorf("webex client: next page: unmarshal response: %w", err)
  365. return
  366. }
  367. // Proactively rate-limit our own requests to avoid
  368. // triggering server rate limiting.
  369. err = c.ratelimitRequests(head, body)
  370. if err != nil {
  371. err = fmt.Errorf("request %v: %w", resource, err)
  372. return
  373. }
  374. if next := getLinkNext(head.Get("link")); next != "" {
  375. c.uri, err = url.Parse(next)
  376. if err != nil {
  377. err = fmt.Errorf("webex client: next page: unable to parse next link '%s': %w", next, err)
  378. return
  379. }
  380. }
  381. page = Aggregate[T](aggr)
  382. return
  383. }
  384. func getLinkNext(link string) (next string) {
  385. // This could be made faster but doesn't seem necessary.
  386. // See https://www.w3.org/wiki/LinkHeader for details.
  387. // basic format: `<meta.rdf>; rel=meta, ...`
  388. before, _, found := strings.Cut(link, `rel="next"`)
  389. if !found {
  390. return
  391. }
  392. idx := strings.LastIndex(before, "<")
  393. if idx == -1 {
  394. return
  395. }
  396. next = before[idx+1:]
  397. parts := strings.Split(next, ">")
  398. if len(parts) != 2 {
  399. return
  400. }
  401. next = parts[0]
  402. return
  403. }
  // User side

  // Aggregate is a slice of T. NextPage returns one as its
  // dp.Page[T] result.
  type Aggregate[T any] []T

  // Elems returns the aggregate's elements as a plain slice. The
  // result shares its backing array with the aggregate; nothing is
  // copied.
  func (a Aggregate[T]) Elems() []T {
  	return []T(a)
  }
  // HuntGroupPrimaryAvailablePhoneNumber is one element decoded from
  // the hunt group available-numbers listing.
  //
  // NOTE(review): the JSON keys below describe repository-style
  // fields (namespace path, repo URL, default branch), not phone
  // numbers — confirm them against the actual API response.
  type HuntGroupPrimaryAvailablePhoneNumber struct {
  	// PathWithNamespace is decoded from "path_with_namespace".
  	PathWithNamespace string `json:"path_with_namespace"`
  	// RepoURL is decoded from "http_url_to_repo".
  	RepoURL string `json:"http_url_to_repo"`
  	// WebURL is decoded from "web_url".
  	WebURL string `json:"web_url"`
  	// DefBranch is decoded from "default_branch".
  	DefBranch string `json:"default_branch"`
  }

  // Path returns the PathWithNamespace field.
  func (r *HuntGroupPrimaryAvailablePhoneNumber) Path() string {
  	path := r.PathWithNamespace
  	return path
  }

  // GitURI returns the RepoURL field.
  func (r *HuntGroupPrimaryAvailablePhoneNumber) GitURI() string {
  	uri := r.RepoURL
  	return uri
  }

  // WebURI returns the WebURL field.
  func (r *HuntGroupPrimaryAvailablePhoneNumber) WebURI() string {
  	uri := r.WebURL
  	return uri
  }

  // DefaultBranch returns the DefBranch field.
  func (r *HuntGroupPrimaryAvailablePhoneNumber) DefaultBranch() string {
  	branch := r.DefBranch
  	return branch
  }
  427. func GetHuntGroupPrimaryAvailablePhoneNumbers[T *HuntGroupPrimaryAvailablePhoneNumber](
  428. c *Client,
  429. locationId,
  430. orgId string,
  431. ) dp.Pager[T] {
  432. resStrFmt := "/telephony/config/locations/%s/huntGroups/availableNumbers?orgId=%s"
  433. resStr := fmt.Sprintf(
  434. resStrFmt,
  435. url.PathEscape(locationId),
  436. url.QueryEscape(orgId),
  437. )
  438. /*
  439. See
  440. * https://developer.webex.com/docs/basics#pagination
  441. * https://developer.webex.com/docs/api/v1/features-hunt-group/get-hunt-group-primary-available-phone-numbers
  442. */
  443. maxPageSize := 8000
  444. resource, err := url.Parse(resStr)
  445. if err != nil {
  446. panic(fmt.Sprintf("BUG: error while parsing resource path: %v", err))
  447. }
  448. return dp.NewPager[T](
  449. newSubclient[T](c, resource),
  450. uint64(maxPageSize),
  451. )
  452. }
  // DetailedCallRecord is the element type of the detailed call
  // history feed. It declares no fields yet.
  type DetailedCallRecord struct{}
  455. func GetDetailedCallHistory[T *DetailedCallRecord](
  456. c *Client,
  457. ) dp.Pager[T] {
  458. // TODO Need to move expandResource out of subclient into
  459. // API functions, as having a different base URI per API
  460. // call invalidates our assumptions.
  461. resource, err := url.Parse("https://analytics.webexapis.com/v1/cdr_feed")
  462. }