depager.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import (
  8. "fmt"
  9. )
  10. /*
  11. The `Page` interface must wrap server responses. This
  12. allows pagers to calculate page sizes and iterate over
  13. response aggregates.
  14. If the underlying value of this interface is `nil` (e.g. a
  15. nil pointer to a struct or a nil slice), `Elems()` will
  16. panic.
  17. */
  18. type Page[T any] interface {
  19. // Elems must return the items from the current page
  20. Elems() []T
  21. }
  22. // Exposes the part of the client that depager understands.
  23. type Client[T any] interface {
  24. // NextPage returns the next page or it returns an error
  25. NextPage(offset uint64) (
  26. page Page[T],
  27. count uint64, // total count of all items being paged
  28. err error,
  29. )
  30. }
  31. type Pager[T any] interface {
  32. // Iter is intended to be used in a for-range loop
  33. Iter() <-chan T
  34. // LastErr must return the first error encountered, if any
  35. LastErr() error
  36. }
  37. func NewPager[T any](
  38. c Client[T],
  39. pageSize uint64,
  40. ) Pager[T] {
  41. return &pager[T]{
  42. client: c,
  43. n: pageSize,
  44. p: 4,
  45. }
  46. }
  47. /*
  48. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  49. Keep p pages buffered.
  50. */
  51. type pager[T any] struct {
  52. client Client[T]
  53. m uint64
  54. n uint64
  55. err error
  56. p int
  57. cnt uint64
  58. }
  59. func (p *pager[T]) iteratePages() <-chan Page[T] {
  60. ch := make(chan Page[T], p.p)
  61. go func() {
  62. defer close(ch)
  63. for {
  64. page, cnt, err :=
  65. p.client.NextPage(p.m * p.n)
  66. if err != nil {
  67. p.err = err
  68. return
  69. }
  70. if p.cnt == 0 {
  71. p.cnt = cnt
  72. }
  73. ch <- page
  74. if (p.m*p.n + p.n) >= p.cnt {
  75. return
  76. }
  77. p.m++
  78. }
  79. }()
  80. return ch
  81. }
  82. func (p *pager[T]) Iter() <-chan T {
  83. ch := make(chan T, p.n)
  84. go func() {
  85. defer close(ch)
  86. for page := range p.iteratePages() {
  87. for _, i := range page.Elems() {
  88. ch <- i
  89. }
  90. if p.err != nil {
  91. p.err = fmt.Errorf("pager: iterate items: %s", p.err)
  92. return
  93. }
  94. }
  95. }()
  96. return ch
  97. }
  98. func (p *pager[T]) LastErr() error {
  99. return p.err
  100. }