Jonathan D. Storm 1 жил өмнө
commit
919460a7aa
3 өөрчлөгдсөн 272 нэмэгдсэн , 0 устгасан
  1. 149 0
      README.md
  2. 120 0
      depager.go
  3. 3 0
      go.mod

+ 149 - 0
README.md

@@ -0,0 +1,149 @@
+# depager
+
+Trades REST API paging logic and request throttling for a
+channel and a `for` loop.
+
+For the moment, *depager* only supports JSON responses.
+
+*depager* requires structs conforming to the following
+interfaces:
+
+```go
+// The `Page` interface must wrap server responses. This
+// allows pagers to calculate page sizes and iterate over
+// response aggregates.
+//
+// The underlying implementation must be a pointer to a
+// struct containing the desired response fields, all tagged
+// appropriately. Any fields corresponding to
+// platform-specific error responses should also be
+// included.
+//
+// `Count()` must return the total number of items to be
+// paged. `Elems()` must return the items from the current
+// page.
+type Page[T any] interface {
+	Count() uint64
+	Elems() []T
+}
+
+type Client interface {
+	Get(uri url.URL) ([]byte, error)
+}
+
+type PagedURI interface {
+	PageURI(limit, offset uint64) url.URL
+}
+
+```
+
+And in return, *depager* provides the following:
+
+```go
+type Pager[T any] interface {
+	Iter() <-chan T
+	LastErr() error
+}
+```
+
+## Example
+
+```go
+import (
+    dp "idio.link/go/depager"
+)
+
+type pagedURI struct {
+	uri *url.URL
+}
+
+func (u pagedURI) PageURI(limit, offset uint64) url.URL {
+	if limit > MaxPageSize {
+		limit = MaxPageSize
+	}
+	uri := *u.uri
+	q := (&uri).Query()
+	q.Add("offset", strconv.FormatUint(offset, 10))
+	q.Add("size", strconv.FormatUint(limit, 10))
+	(&uri).RawQuery = q.Encode()
+	return uri
+}
+
+func NewMyAggregate[T any]() *MyAggregate[T] {
+	return &MyAggregate[T]{Items: make([]T, 0, 64)}
+}
+
+type MyAggregate[T any] struct {
+	Total int32 `json:"total"`
+	Items []T   `json:"items"`
+}
+
+func (a *MyAggregate[_]) Count() uint64 {
+	return uint64(a.Total)
+}
+
+func (a *MyAggregate[T]) Elems() []T {
+	return a.Items
+}
+
+// The client can use this func for all its paged responses.
+func makeMyPager[T any](
+	client *MyClient,
+	resource *url.URL,
+) dp.Pager[T] {
+	return dp.NewPager(
+		pagedURI{resource},
+		client,
+		MaxPageSize,  // most days, this should be the maximum page size that the server will provide
+		func() dp.Page[T] {
+			return NewMyAggregate[T]()
+		},
+	)
+}
+
+// .
+// .
+// .
+
+type Thing struct {
+	Id   int32  `json:"id"`
+	Name string `json:"name"`
+}
+
+func main() {
+    id := 82348
+    var thingPager dp.Pager[*Thing] =
+        client.GetThings(id)
+
+    for thing := range thingPager.Iter() {
+        if ctx.Err() != nil {
+            // stay responsive!
+            break
+        }
+        // do stuff with each thing
+    }
+
+    // finally, check for errors
+    if err := thingPager.LastErr(); err != nil {
+        log.Printf("do stuff with things: %v", err)
+        return err
+    }
+}
+```
+
+## Why?
+
+In a world of plentiful memory, application-level paging,
+when implemented over a buffered, flow-controlled protocol
+like TCP, becomes a needless redundancy, at best; at worst,
+it is a mindless abnegation of system-level affordances.
+But since we twisted the web into a transport layer, we
+no longer have the option of leveraging underlying flow
+control, and workarounds like QUIC only double down on this
+bizarre state of affairs.
+
+Since I have no expectation that the scourge of the web, as
+transport, will disappear anytime soon, I may as well
+recapitulate the same basic flow control that TCP would
+provide us, if only we let it.
+

+ 120 - 0
depager.go

@@ -0,0 +1,120 @@
+package depager
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/url"
+)
+
// The `Page` interface must wrap server responses. This
// allows pagers to calculate page sizes and iterate over
// response aggregates.
//
// The underlying implementation must be a pointer to a
// struct containing the desired response fields, all tagged
// appropriately. Any fields corresponding to
// platform-specific error responses should also be
// included.
//
// `Count()` must return the total number of items to be
// paged (not the number on the current page). `Elems()`
// must return the items from the current page.
type Page[T any] interface {
	Count() uint64
	Elems() []T
}
+
// Client fetches the raw response body for a URI. Any error
// it returns stops paging immediately and is surfaced, as
// is, through Pager.LastErr.
type Client interface {
	Get(uri url.URL) ([]byte, error)
}
+
// PagedURI builds the request URI for a single page. The
// pager calls it with limit = page size and offset = index
// of the first item on the page.
type PagedURI interface {
	PageURI(limit, offset uint64) url.URL
}
+
// Pager streams paged items over a channel. Consume the
// channel returned by Iter until it closes, then call
// LastErr to learn whether paging stopped early.
type Pager[T any] interface {
	Iter() <-chan T
	LastErr() error
}
+
+func NewPager[T any](
+	u PagedURI,
+	c Client,
+	pageSize uint64,
+	newPage func() Page[T],
+) Pager[T] {
+	return &pager[T]{
+		client:  c,
+		uri:     u,
+		m:       0,
+		n:       pageSize,
+		p:       2,
+		newPage: newPage,
+	}
+}
+
// Retrieve n items in the range [m*n, m*n + n - 1],
// inclusive. Keep p pages buffered.
//
// NOTE(review): err is written by the goroutine spawned in
// iteratePages and read by the goroutine in Iter and by
// LastErr, with no lock; it is only safe to read after the
// relevant channel has closed — confirm callers honor this.
type pager[T any] struct {
	client  Client         // performs the actual requests
	uri     PagedURI       // builds per-page request URIs
	m       uint64         // zero-based index of the next page to fetch
	n       uint64         // page size (items per request)
	err     error          // first error encountered; exposed via LastErr
	p       int            // number of pages to keep buffered
	newPage func() Page[T] // allocates an empty response wrapper per page
}
+
+func (p *pager[T]) iteratePages() <-chan Page[T] {
+	ch := make(chan Page[T], p.p)
+	go func() {
+		defer close(ch)
+		for {
+			uri := p.uri.PageURI(p.n, p.m*p.n)
+
+			body, err := p.client.Get(uri)
+			if err != nil {
+				// the client knows better than we do
+				p.err = err
+				return
+			}
+
+			page := p.newPage()
+
+			// TODO: eventually allow other codecs
+			err = json.Unmarshal(body, page)
+			if err != nil {
+				p.err = fmt.Errorf("pager: iterate pages: unmarshal response: %v", err)
+				return
+			}
+			ch <- page
+
+			if (p.m*p.n + p.n) >= page.Count() {
+				return
+			}
+			p.m++
+		}
+	}()
+	return ch
+}
+
+func (p *pager[T]) Iter() <-chan T {
+	ch := make(chan T, p.n)
+	go func() {
+		defer close(ch)
+		for page := range p.iteratePages() {
+			for _, i := range page.Elems() {
+				ch <- i
+			}
+			if p.err != nil {
+				p.err = fmt.Errorf("pager: iterate items: %s", p.err)
+				return
+			}
+		}
+	}()
+	return ch
+}
+
// LastErr reports the error that stopped paging, or nil.
// Call it only after the channel returned by Iter has been
// closed; an earlier call may race with the goroutines that
// are still writing the error.
func (p *pager[T]) LastErr() error {
	return p.err
}

+ 3 - 0
go.mod

@@ -0,0 +1,3 @@
+module idio.link/go/depager
+
+go 1.18