/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ package depager import ( "context" "fmt" ) /* The `Page` interface must wrap server responses. This allows pagers to calculate page sizes and iterate over response aggregates. If the underlying value of this interface is `nil` (e.g. a nil pointer to a struct or a nil slice), `Elems()` will panic. */ type Page[T any] interface { // Elems must return the items from this page Elems() []T // URI must return the URI associated with this page URI() string // Count must return the total number of items being paged Count() uint64 } // Exposes the part of the client that depager understands. type Client[T any] interface { // NextPage returns the next page or it returns an error NextPage( page Page[T], offset uint64, // item offset at which to start page ) (err error) } type Pager[T any] interface { // Iter is intended to be used in a for-range loop Iter() <-chan T // IterPages iterates over whole pages rather than items IterPages() <-chan Page[T] // LastErr must return the first error encountered, if any LastErr() error // Abort causes the pager to relinquish all pages back to // the page pool and stop all running goroutines. Abort() error } func NewPager[T any]( ctx context.Context, c Client[T], pagePool chan Page[T], ) Pager[T] { if len(pagePool) == 0 { panic("new pager: provided page pool is empty") } var pageSize uint64 pg := <-pagePool pageSize = uint64(cap(pg.Elems())) pagePool <- pg ctx2, cancel := context.WithCancel(ctx) done := make(chan struct{}) return &pager[T]{ pctx: ctx, ctx: ctx2, cancel: cancel, done: done, client: c, n: pageSize, pagePool: pagePool, poolSize: len(pagePool), } } /* Retrieve n items in the range [m*n, m*n + n - 1], inclusive. We keep len(pagePool) pages buffered. */ type pager[T any] struct { pctx context.Context // Parent context. ctx context.Context cancel context.CancelFunc done chan struct{} // Notify Abort when finished. client Client[T] m uint64 n uint64 err error pagePool chan Page[T] poolSize int cnt uint64 } func (p *pager[T]) iteratePages() <-chan Page[T] { ch := make(chan Page[T], len(p.pagePool)) go func() { defer close(ch) var page Page[T] for { if p.pctx.Err() != nil { break } page = <-p.pagePool err := p.client.NextPage(page, p.m*p.n) if err != nil { p.err = err p.pagePool <- page return } if p.cnt == 0 { p.cnt = page.Count() } // When page.Count() is zero, we must rely on the // absence of returned results to know when to stop. if p.cnt == 0 && len(page.Elems()) == 0 { p.pagePool <- page return } ch <- page if (p.m*p.n + p.n) >= p.cnt { return } p.m++ } }() return ch } func (p *pager[T]) IterPages() <-chan Page[T] { ch := make(chan Page[T], p.n) go func() { defer close(p.done) defer close(ch) for page := range p.iteratePages() { if p.pctx.Err() != nil { p.pagePool <- page break } if p.err != nil { p.err = fmt.Errorf("pager: iterate pages: %w", p.err) p.pagePool <- page return } ch <- page } }() return ch } func (p *pager[T]) Iter() <-chan T { ch := make(chan T, p.n) go func() { defer close(p.done) defer close(ch) for page := range p.iteratePages() { if p.pctx.Err() != nil { p.pagePool <- page break } for _, i := range page.Elems() { ch <- i } p.pagePool <- page if p.err != nil { p.err = fmt.Errorf("pager: iterate items: %w", p.err) return } } }() return ch } func (p *pager[T]) LastErr() error { return p.err } func (p *pager[T]) reset() error { err := p.err p.ctx, p.cancel = context.WithCancel(p.pctx) p.done = make(chan struct{}) p.m, p.cnt = 0, 0 p.err = nil return err } func (p *pager[T]) Abort() error { p.cancel() <-p.done return p.reset() }