depager.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package depager
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/url"
  6. )
  7. // The `Page` interface must wrap server responses. This
  8. // allows pagers to calculate page sizes and iterate over
  9. // response aggregates.
  10. //
  11. // The underlying implementation must be a pointer to a
  12. // struct containing the desired response fields, all tagged
  13. // appropriately. Any fields corresponding to
  14. // platform-specific error responses should also be
  15. // included.
  16. //
  17. // `Count()` must return the total number of items to be
  18. // paged. `Elems()` must return the items from the current
  19. // page.
  20. type Page[T any] interface {
  21. Count() uint64
  22. Elems() []T
  23. }
  24. type Client interface {
  25. Get(uri url.URL) ([]byte, error)
  26. }
  27. type PagedURI interface {
  28. PageURI(limit, offset uint64) url.URL
  29. }
  30. type Pager[T any] interface {
  31. Iter() <-chan T
  32. LastErr() error
  33. }
  34. func NewPager[T any](
  35. u PagedURI,
  36. c Client,
  37. pageSize uint64,
  38. newPage func() Page[T],
  39. ) Pager[T] {
  40. return &pager[T]{
  41. client: c,
  42. uri: u,
  43. m: 0,
  44. n: pageSize,
  45. p: 2,
  46. newPage: newPage,
  47. }
  48. }
  49. // Retrieve n items in the range [m*n, m*n + n - 1],
  50. // inclusive. Keep p pages buffered.
  51. type pager[T any] struct {
  52. client Client
  53. uri PagedURI
  54. m uint64
  55. n uint64
  56. err error
  57. p int
  58. newPage func() Page[T]
  59. }
  60. func (p *pager[T]) iteratePages() <-chan Page[T] {
  61. ch := make(chan Page[T], p.p)
  62. go func() {
  63. defer close(ch)
  64. for {
  65. uri := p.uri.PageURI(p.n, p.m*p.n)
  66. body, err := p.client.Get(uri)
  67. if err != nil {
  68. // the client knows better than we do
  69. p.err = err
  70. return
  71. }
  72. page := p.newPage()
  73. // TODO: eventually allow other codecs
  74. err = json.Unmarshal(body, page)
  75. if err != nil {
  76. p.err = fmt.Errorf("pager: iterate pages: unmarshal response: %v", err)
  77. return
  78. }
  79. ch <- page
  80. if (p.m*p.n + p.n) >= page.Count() {
  81. return
  82. }
  83. p.m++
  84. }
  85. }()
  86. return ch
  87. }
  88. func (p *pager[T]) Iter() <-chan T {
  89. ch := make(chan T, p.n)
  90. go func() {
  91. defer close(ch)
  92. for page := range p.iteratePages() {
  93. for _, i := range page.Elems() {
  94. ch <- i
  95. }
  96. if p.err != nil {
  97. p.err = fmt.Errorf("pager: iterate items: %s", p.err)
  98. return
  99. }
  100. }
  101. }()
  102. return ch
  103. }
  104. func (p *pager[T]) LastErr() error {
  105. return p.err
  106. }