123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- package toil
- import (
- "bytes"
- "context"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "sync"
- "time"
- dp "idio.link/go/depager/v2"
- )
- const MaxAuthTokenMinutes = 60
- const MaxAuthTokenRefreshes = 3
- const MinutesBeforeRefreshDeadline = 3
- func NewAuthTokenExpiry(time.Time) time.Time {
- return time.Now().Add(MaxAuthTokenMinutes * time.Minute)
- }
- func withinAuthTokenRefreshWindow(expiry time.Time) bool {
- minUntilExpiry := time.Until(expiry).Minutes()
- return 0 < minUntilExpiry &&
- minUntilExpiry <= MinutesBeforeRefreshDeadline
- }
- func authTokenRefreshDeadlineHasPassed(
- expiry time.Time,
- ) bool {
- return time.Until(expiry) <= time.Duration(0)
- }
- func maxAuthTokenRefreshesHasBeenReached(cnt int) bool {
- return cnt >= MaxAuthTokenRefreshes
- }
- func GenericHTTPClient() *http.Client {
- t := &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
- TLSHandshakeTimeout: 15 * time.Second,
- IdleConnTimeout: 30 * time.Minute,
- ResponseHeaderTimeout: 180 * time.Second,
- ExpectContinueTimeout: 10 * time.Second,
- }
- return &http.Client{
- Transport: t,
- Timeout: 30 * time.Minute,
- }
- }
- // TODO Various headers may need to be sent, contingent on
- // the server API. Accommodate these.
- func NewClient(
- ctx context.Context,
- baseURI string,
- client *http.Client,
- ) (*Client, error) {
- base, err := url.Parse(baseURI)
- if err != nil {
- return nil, fmt.Errorf("new client: parse base uri: %v", err)
- }
- return &Client{
- baseURI: base,
- ctx: ctx,
- httpClient: client,
- header: make(http.Header),
- retries: 3,
- retryWait: 10 * time.Second,
- // See https://developer.webex.com/docs/basics#rate-limiting
- remaining: 300,
- }, nil
- }
- // TODO Implement various credential models, i.e. access
- // tokens, logins, refresh tokens, OAuth2, etc.
- type Client struct {
- ctx context.Context
- httpClient *http.Client
- baseURI *url.URL
- header http.Header
- retries int
- retryWait time.Duration
- remaining int
- armed bool
- armedLock *sync.RWMutex
- }
- // Arm the client for writing. Arm panics if the client is
- // already armed.
- func (c *Client) Arm() (armed bool) {
- if c.armedLock.TryLock() {
- c.armed = true
- armed = true
- c.armedLock.Unlock()
- }
- return armed
- }
- func (c *Client) Disarm() bool {
- return c.armedLock.TryLock()
- }
- func (c *Client) WithHeader(key, value string) {
- c.header.Add(key, value)
- }
- func (c *Client) WithRetries(retries int) {
- if retries < 0 {
- retries = 0
- }
- c.retries = retries
- }
- func (c *Client) WithRetryWait(wait time.Duration) {
- c.retryWait = wait
- }
- func (c *Client) expandResource(
- resource *url.URL,
- ) *url.URL {
- ex, err :=
- url.JoinPath(c.baseURI.String(), resource.EscapedPath())
- if err != nil {
- panic(fmt.Sprintf("BUG: webex client: expand resource: failed to join path: '%s' + '%s': %v", c.baseURI, resource.RequestURI(), err))
- }
- next, err := url.Parse(ex)
- if err != nil {
- panic(fmt.Sprintf("BUG: webex client: expand resource: failed to parse expanded resource '%s': %v", ex, err))
- }
- return next
- }
- // Retry when server rate limits are exceeded.
- // See https://developer.webex.com/docs/basics#rate-limiting
- func (c *Client) sendRequestWithRetry(
- req *http.Request,
- ) (resp *http.Response, err error) {
- for i := 0; i < c.retries+1; i++ {
- resp, err = c.httpClient.Do(req)
- if err != nil {
- err = fmt.Errorf("send request with retry: %w", err)
- return
- }
- if resp.StatusCode != http.StatusTooManyRequests {
- break
- }
- // TODO Extract the following. It is a matter of policy
- // how rate-limiting detection is handled.
- // TODO Offer an async option in future using
- // blockReqsBefore, but this is acceptable, for now.
- log.Printf("info: request was rate limited by server: %s %#v", req.Method, req.RequestURI)
- var wait time.Duration
- wait, err =
- time.ParseDuration(resp.Header.Get("Retry-After") + "s")
- if err != nil {
- log.Printf("Warn: request was rate limited by server, but parsing the expected Retry-After header failed: %v", err)
- time.Sleep(c.retryWait)
- return
- }
- time.Sleep(wait)
- }
- if resp == nil {
- err = fmt.Errorf("send request with retry: unknown failure")
- return
- }
- return
- }
- func (c *Client) request(
- method string,
- resource *url.URL,
- body io.Reader,
- ) (respHead http.Header, respBody []byte, err error) {
- req, err := http.NewRequestWithContext(
- c.ctx,
- method,
- resource.String(),
- body,
- )
- if err != nil {
- err = fmt.Errorf("request %v: %w", resource, err)
- return
- }
- req.Header = c.header
- var reqBody []byte
- if req.Body != nil {
- reqBody, err = io.ReadAll(req.Body)
- if err != nil {
- err = fmt.Errorf("request %v: failed to read request body: %v", uri, err)
- return
- }
- }
- // TODO How to deal with this mess?
- /* From https://www.logicmonitor.com/support/rest-api-authentication:
- > When LogicMonitor servers receive an API request, they
- > ensure the specified timestamp is within 30 minutes of
- > the current time.
- In the worst case, the below logic will fail long before
- the timestamp disparity reaches 30 minutes.
- */
- timestamp := strconv.FormatInt(time.Now().UnixMilli(), 10)
- auth := generateAuthorization(
- timestamp,
- req.Method,
- resource,
- reqBody,
- c.credential,
- )
- req.Header.Set("Authorization", auth)
- resp, err := c.sendRequestWithRetry(req)
- if err != nil {
- err = fmt.Errorf("request '%s': %+v: %w", req.URL, req, err)
- return
- }
- defer resp.Body.Close()
- respHead = resp.Header
- respBody, err = io.ReadAll(resp.Body)
- if err != nil {
- err = fmt.Errorf("request %v: failed to read response body: %w", resource, err)
- return
- }
- // TODO Make this work as needed.
- err = c.ratelimitRequests(resp)
- if err != nil {
- err = fmt.Errorf("request %v: %w", resource, err)
- return
- }
- // Success response
- if 200 <= resp.StatusCode && resp.StatusCode <= 299 {
- return
- }
- err = fmt.Errorf("request %v: %s", resource, http.StatusText(resp.StatusCode))
- return
- }
- // TODO Implement the other verbs, as needed.
- func (c *Client) get(
- resource *url.URL,
- ) (http.Header, []byte, error) {
- return c.request(http.MethodGet, resource, nil)
- }
- func (c *Client) post(
- resource *url.URL,
- body io.Reader,
- ) (http.Header, []byte, error) {
- return c.request(http.MethodPost, resource, body)
- }
- func (f *FMC) postGoThing(
- url string,
- goThing any,
- ) (respBody []byte, err error) {
- reqBody, err := json.Marshal(goThing)
- if err != nil {
- err = fmt.Errorf("failed to marshal json: %v: '%#v'", err, goThing)
- return
- }
- resp, err := f.post(url, bytes.NewReader(reqBody))
- if err != nil {
- err = fmt.Errorf("failed to post: %v: '%s'", err, reqBody)
- return
- }
- defer resp.Body.Close()
- respBody, err = io.ReadAll(resp.Body)
- if err != nil {
- err = fmt.Errorf("failed to read response body: %v", err)
- return
- }
- if err = checkForCiscoError(resp, respBody); err != nil {
- log.Printf("DEBUG: req body: %s", string(reqBody))
- return
- }
- return respBody, nil
- }
- func (f *FMC) putGoThing(
- url string,
- goThing any,
- ) (respBody []byte, err error) {
- reqBody, err := json.Marshal(goThing)
- if err != nil {
- err = fmt.Errorf("failed to marshal json: %v: '%#v'", err, goThing)
- return
- }
- resp, err := f.put(url, bytes.NewReader(reqBody))
- if err != nil {
- err = fmt.Errorf("failed to put: %v: '%s'", err, reqBody)
- return
- }
- defer resp.Body.Close()
- respBody, err = io.ReadAll(resp.Body)
- if err != nil {
- err = fmt.Errorf("failed to read response body: %v", err)
- return
- }
- if err = checkForCiscoError(resp, respBody); err != nil {
- log.Printf("DEBUG: req body: %s", string(reqBody))
- return
- }
- return respBody, nil
- }
- func newSubclient[T any](
- c *Client,
- resource *url.URL,
- ) *Subclient[T] {
- expanded := c.expandResource(resource)
- return &Subclient[T]{
- Client: *c,
- uri: expanded,
- }
- }
- type Subclient[T any] struct {
- Client
- uri *url.URL
- blockReqsBefore time.Time
- }
- /*
- Throttle requests. See
- * https://developer.webex.com/docs/basics#rate-limiting
- We postpone subsequent requests by at least win/rem
- */
- func (c *Subclient[T]) ratelimitRequests(
- _ http.Header,
- _ []byte,
- ) error {
- /*
- What I want is to cache remaining request counts for each
- endpoint, regardless of parameters. Trouble is, we inject
- parameters prior to calls to request(), which makes the
- resources passed to request() inequivalent.
- To push parameter injection further down the call stack,
- we need a reliable way to associate parameters with
- their insertion points. ~~Injection can then be deferred
- until resource expansion, which actually makes some
- sense.~~
- Nope, not far down enough. We have to push all the way
- down through the subclient. Yuck.
- */
- window := 60 * time.Second // window is not sent in headers
- remaining := c.Client.remaining
- // TODO Also handle this case.
- // Throttle requests
- // See https://www.logicmonitor.com/support/rest-api-developers-guide/overview/rate-limiting
- remStr := resp.Header.Get("X-Rate-Limit-Remaining")
- winStr := resp.Header.Get("X-Rate-Limit-Window")
- remaining, err := strconv.ParseInt(remStr, 10, 64)
- if remStr != "" && err != nil {
- err = fmt.Errorf("request %v: failed to parse header X-Rate-Limit-Remaining: %v", uri, err)
- return
- }
- window, err := time.ParseDuration(winStr + "s")
- if winStr != "" && err != nil {
- err = fmt.Errorf("request %v: failed to parse header X-Rate-Limit-Window: %v", uri, err)
- return
- }
- if remaining != 0 && window != 0 {
- // We postpone subsequent requests by at least win/rem
- msDelay := int64(window/time.Millisecond) / remaining
- delay := time.Duration(msDelay) * time.Millisecond
- c.blockReqsBefore = time.Now().Add(delay)
- }
- return nil
- }
- func (c *Subclient[T]) NextPage(
- offset uint64,
- ) (page dp.Page[T], cnt uint64, err error) {
- // TODO Is it possible to pool these for each T? Is it
- // worth the memory?
- aggr := make([]T, 0, 32)
- head, body, err := c.get(c.uri)
- if err != nil {
- err = fmt.Errorf("webex client: next page: %w", err)
- return
- }
- err = json.Unmarshal(body, &aggr)
- if err != nil {
- err = fmt.Errorf("webex client: next page: unmarshal response: %w", err)
- return
- }
- // Proactively rate-limit our own requests to avoid
- // triggering server rate limiting.
- err = c.ratelimitRequests(head, body)
- if err != nil {
- err = fmt.Errorf("request %v: %w", resource, err)
- return
- }
- if next := getLinkNext(head.Get("link")); next != "" {
- c.uri, err = url.Parse(next)
- if err != nil {
- err = fmt.Errorf("webex client: next page: unable to parse next link '%s': %w", next, err)
- return
- }
- }
- page = Aggregate[T](aggr)
- return
- }
- func getLinkNext(link string) (next string) {
- // This could be made faster but doesn't seem necessary.
- // See https://www.w3.org/wiki/LinkHeader for details.
- // basic format: `<meta.rdf>; rel=meta, ...`
- before, _, found := strings.Cut(link, `rel="next"`)
- if !found {
- return
- }
- idx := strings.LastIndex(before, "<")
- if idx == -1 {
- return
- }
- next = before[idx+1:]
- parts := strings.Split(next, ">")
- if len(parts) != 2 {
- return
- }
- next = parts[0]
- return
- }
- // User side
- type Aggregate[T any] []T
- func (a Aggregate[T]) Elems() []T {
- return a
- }
- type HuntGroupPrimaryAvailablePhoneNumber struct {
- PathWithNamespace string `json:"path_with_namespace"`
- RepoURL string `json:"http_url_to_repo"`
- WebURL string `json:"web_url"`
- DefBranch string `json:"default_branch"`
- }
- func (r *HuntGroupPrimaryAvailablePhoneNumber) Path() string {
- return r.PathWithNamespace
- }
- func (r *HuntGroupPrimaryAvailablePhoneNumber) GitURI() string {
- return r.RepoURL
- }
- func (r *HuntGroupPrimaryAvailablePhoneNumber) WebURI() string {
- return r.WebURL
- }
- func (r *HuntGroupPrimaryAvailablePhoneNumber) DefaultBranch() string {
- return r.DefBranch
- }
- func GetHuntGroupPrimaryAvailablePhoneNumbers[T *HuntGroupPrimaryAvailablePhoneNumber](
- c *Client,
- locationId,
- orgId string,
- ) dp.Pager[T] {
- resStrFmt := "/telephony/config/locations/%s/huntGroups/availableNumbers?orgId=%s"
- resStr := fmt.Sprintf(
- resStrFmt,
- url.PathEscape(locationId),
- url.QueryEscape(orgId),
- )
- /*
- See
- * https://developer.webex.com/docs/basics#pagination
- * https://developer.webex.com/docs/api/v1/features-hunt-group/get-hunt-group-primary-available-phone-numbers
- */
- maxPageSize := 8000
- resource, err := url.Parse(resStr)
- if err != nil {
- panic(fmt.Sprintf("BUG: error while parsing resource path: %v", err))
- }
- return dp.NewPager[T](
- newSubclient[T](c, resource),
- uint64(maxPageSize),
- )
- }
- type DetailedCallRecord struct {
- }
- func GetDetailedCallHistory[T *DetailedCallRecord](
- c *Client,
- ) dp.Pager[T] {
- // TODO Need to move expandResource out of subclient into
- // API functions, as having a different base URI per API
- // call invalidates our assumptions.
- resource, err := url.Parse("https://analytics.webexapis.com/v1/cdr_feed")
- }
|