/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
 */
- package depager
- import (
- "context"
- "fmt"
- "io"
- )
// Page wraps a single server response. Pagers rely on it to work out
// page sizes and to walk the items aggregated in each response.
//
// If the value stored in the interface is itself nil (e.g. a nil
// pointer to a struct or a nil slice), Elems() will panic.
type Page[T any] interface {
	// Elems must return the items held by this page.
	Elems() []T

	// URI must return the URI associated with this page.
	URI() string

	// Count must return the total number of items being paged.
	Count() uint64
}
- // Exposes the part of the client that depager understands.
- type Client[T any] interface {
- // NextPage returns the next page or it returns an error
- NextPage(
- page Page[T],
- offset uint64, // item offset at which to start page
- ) (err error)
- }
- type Pager[T any] interface {
- // Iter is intended to be used in a for-range loop
- Iter() <-chan T
- // IterPages iterates over whole pages rather than items
- IterPages() <-chan Page[T]
- // LastErr must return the first error encountered, if any
- LastErr() error
- // Abort causes the pager to relinquish all pages back to
- // the page pool and stop all running goroutines.
- Abort() error
- }
- func NewPager[T any](
- ctx context.Context,
- c Client[T],
- pagePool chan Page[T],
- ) Pager[T] {
- if len(pagePool) == 0 {
- panic("new pager: provided page pool is empty")
- }
- var pageSize uint64
- pg := <-pagePool
- pageSize = uint64(cap(pg.Elems()))
- pagePool <- pg
- ctx2, cancel := context.WithCancel(ctx)
- done := make(chan struct{})
- return &pager[T]{
- pctx: ctx,
- ctx: ctx2,
- cancel: cancel,
- done: done,
- client: c,
- n: pageSize,
- pagePool: pagePool,
- poolSize: len(pagePool),
- }
- }
- /*
- Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
- We keep len(pagePool) pages buffered.
- */
- type pager[T any] struct {
- pctx context.Context // Parent context.
- ctx context.Context
- cancel context.CancelFunc
- done chan struct{} // Notify Abort when finished.
- client Client[T]
- m uint64
- n uint64
- err error
- pagePool chan Page[T]
- poolSize int
- cnt uint64
- }
- func (p *pager[T]) iteratePages() <-chan Page[T] {
- ch := make(chan Page[T], len(p.pagePool))
- go func() {
- defer close(ch)
- var page Page[T]
- for {
- if p.ctx.Err() != nil {
- break
- }
- select {
- case page = <-p.pagePool:
- err := p.client.NextPage(page, p.m*p.n)
- if err != nil {
- p.err = err
- p.pagePool <- page
- return
- }
- if p.cnt == 0 {
- p.cnt = page.Count()
- }
- // When page.Count() is zero, we must rely on the
- // absence of returned results to know when to stop.
- if p.cnt == 0 && len(page.Elems()) == 0 {
- p.pagePool <- page
- return
- }
- ch <- page
- if p.cnt != 0 && (p.m*p.n+p.n) >= p.cnt {
- return
- }
- p.m++
- default:
- }
- }
- }()
- return ch
- }
- func (p *pager[T]) IterPages() <-chan Page[T] {
- ch := make(chan Page[T], p.n)
- go func() {
- defer close(p.done)
- defer close(ch)
- for page := range p.iteratePages() {
- if p.ctx.Err() != nil {
- p.pagePool <- page
- break
- }
- if p.err != nil {
- if p.err != io.EOF {
- p.err = fmt.Errorf("pager: iterate pages: %w", p.err)
- }
- p.pagePool <- page
- return
- }
- ch <- page
- }
- }()
- return ch
- }
- func (p *pager[T]) Iter() <-chan T {
- ch := make(chan T, p.n)
- go func() {
- defer close(p.done)
- defer close(ch)
- for page := range p.iteratePages() {
- if p.ctx.Err() != nil {
- p.pagePool <- page
- break
- }
- for _, i := range page.Elems() {
- ch <- i
- }
- p.pagePool <- page
- if p.err != nil {
- if p.err != io.EOF {
- p.err = fmt.Errorf("pager: iterate items: %w", p.err)
- }
- return
- }
- }
- }()
- return ch
- }
- func (p *pager[T]) LastErr() error {
- return p.err
- }
- func (p *pager[T]) reset() error {
- err := p.err
- p.ctx, p.cancel = context.WithCancel(p.pctx)
- p.done = make(chan struct{})
- p.m, p.cnt = 0, 0
- p.err = nil
- return err
- }
- func (p *pager[T]) Abort() error {
- p.cancel()
- <-p.done
- return p.reset()
- }
|