/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ package depager import "fmt" /* The `Page` interface must wrap server responses. This allows pagers to calculate page sizes and iterate over response aggregates. If the underlying value of this interface is `nil` (e.g. a nil pointer to a struct or a nil slice), `Elems()` will panic. */ type Page[T any] interface { // Elems must return the items from this page Elems() []T // URI must return the URI associated with this page URI() string } // Exposes the part of the client that depager understands. type Client[T any] interface { // NextPage returns the next page or it returns an error NextPage( page Page[T], offset uint64, // item offset at which to start page ) ( count uint64, // total count of all items being paged err error, ) } type Pager[T any] interface { // Iter is intended to be used in a for-range loop Iter() <-chan T // IterPages iterates over whole pages rather than items IterPages() <-chan Page[T] // LastErr must return the first error encountered, if any LastErr() error } func NewPager[T any]( c Client[T], pagePool chan Page[T], ) Pager[T] { if len(pagePool) == 0 { panic("new pager: provided page pool is empty") } var pageSize uint64 pg := <-pagePool pageSize = uint64(cap(pg.Elems())) pagePool <- pg return &pager[T]{ client: c, n: pageSize, pagePool: pagePool, } } /* Retrieve n items in the range [m*n, m*n + n - 1], inclusive. We keep len(pagePool) pages buffered. */ type pager[T any] struct { client Client[T] m uint64 n uint64 err error pagePool chan Page[T] cnt uint64 } func (p *pager[T]) iteratePages() <-chan Page[T] { ch := make(chan Page[T], len(p.pagePool)) go func() { defer close(ch) var page Page[T] for { page = <-p.pagePool cnt, err := p.client.NextPage(page, p.m*p.n) if err != nil { p.pagePool <- page p.err = err return } if p.cnt == 0 { p.cnt = cnt } ch <- page if (p.m*p.n + p.n) >= p.cnt { return } p.m++ } }() return ch } func (p *pager[T]) IterPages() <-chan Page[T] { ch := make(chan Page[T], p.n) go func() { defer close(ch) for page := range p.iteratePages() { if p.err != nil { p.pagePool <- page p.err = fmt.Errorf("pager: iterate pages: %s", p.err) return } ch <- page } }() return ch } func (p *pager[T]) Iter() <-chan T { ch := make(chan T, p.n) go func() { defer close(ch) for page := range p.iteratePages() { for _, i := range page.Elems() { ch <- i } p.pagePool <- page if p.err != nil { p.err = fmt.Errorf("pager: iterate items: %s", p.err) return } } }() return ch } func (p *pager[T]) LastErr() error { return p.err }