depager.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import "fmt"
  8. /*
  9. The `Page` interface must wrap server responses. This
  10. allows pagers to calculate page sizes and iterate over
  11. response aggregates.
  12. If the underlying value of this interface is `nil` (e.g. a
  13. nil pointer to a struct or a nil slice), `Elems()` will
  14. panic.
  15. */
  16. type Page[T any] interface {
  17. // Elems must return the items from this page
  18. Elems() []T
  19. // URI must return the URI associated with this page
  20. URI() string
  21. }
  22. // Exposes the part of the client that depager understands.
  23. type Client[T any] interface {
  24. // NextPage returns the next page or it returns an error
  25. NextPage(
  26. page Page[T],
  27. offset uint64, // item offset at which to start page
  28. ) (
  29. count uint64, // total count of all items being paged
  30. err error,
  31. )
  32. }
  33. type Pager[T any] interface {
  34. // Iter is intended to be used in a for-range loop
  35. Iter() <-chan T
  36. // IterPages iterates over whole pages rather than items
  37. IterPages() <-chan Page[T]
  38. // LastErr must return the first error encountered, if any
  39. LastErr() error
  40. }
  41. func NewPager[T any](
  42. c Client[T],
  43. pagePool chan Page[T],
  44. ) Pager[T] {
  45. if len(pagePool) == 0 {
  46. panic("new pager: provided page pool is empty")
  47. }
  48. var pageSize uint64
  49. pg := <-pagePool
  50. pageSize = uint64(cap(pg.Elems()))
  51. pagePool <- pg
  52. return &pager[T]{
  53. client: c,
  54. n: pageSize,
  55. pagePool: pagePool,
  56. }
  57. }
  58. /*
  59. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  60. We keep len(pagePool) pages buffered.
  61. */
  62. type pager[T any] struct {
  63. client Client[T]
  64. m uint64
  65. n uint64
  66. err error
  67. pagePool chan Page[T]
  68. cnt uint64
  69. }
  70. func (p *pager[T]) iteratePages() <-chan Page[T] {
  71. ch := make(chan Page[T], len(p.pagePool))
  72. go func() {
  73. defer close(ch)
  74. var page Page[T]
  75. for {
  76. page = <-p.pagePool
  77. cnt, err := p.client.NextPage(page, p.m*p.n)
  78. if err != nil {
  79. p.pagePool <- page
  80. p.err = err
  81. return
  82. }
  83. if p.cnt == 0 {
  84. p.cnt = cnt
  85. }
  86. ch <- page
  87. if (p.m*p.n + p.n) >= p.cnt {
  88. return
  89. }
  90. p.m++
  91. }
  92. }()
  93. return ch
  94. }
  95. func (p *pager[T]) IterPages() <-chan Page[T] {
  96. ch := make(chan Page[T], p.n)
  97. go func() {
  98. defer close(ch)
  99. for page := range p.iteratePages() {
  100. if p.err != nil {
  101. p.pagePool <- page
  102. p.err = fmt.Errorf("pager: iterate pages: %s", p.err)
  103. return
  104. }
  105. ch <- page
  106. }
  107. }()
  108. return ch
  109. }
  110. func (p *pager[T]) Iter() <-chan T {
  111. ch := make(chan T, p.n)
  112. go func() {
  113. defer close(ch)
  114. for page := range p.iteratePages() {
  115. for _, i := range page.Elems() {
  116. ch <- i
  117. }
  118. p.pagePool <- page
  119. if p.err != nil {
  120. p.err = fmt.Errorf("pager: iterate items: %s", p.err)
  121. return
  122. }
  123. }
  124. }()
  125. return ch
  126. }
  127. func (p *pager[T]) LastErr() error {
  128. return p.err
  129. }