depager.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import (
  8. "context"
  9. "fmt"
  10. "io"
  11. )
  12. /*
  13. The `Page` interface must wrap server responses. This
  14. allows pagers to calculate page sizes and iterate over
  15. response aggregates.
  16. If the underlying value of this interface is `nil` (e.g. a
  17. nil pointer to a struct or a nil slice), `Elems()` will
  18. panic.
  19. */
  20. type Page[T any] interface {
  21. // Elems must return the items from this page
  22. Elems() []T
  23. // URI must return the URI associated with this page
  24. URI() string
  25. // Count must return the total number of items being paged
  26. Count() uint64
  27. }
  28. // Exposes the part of the client that depager understands.
  29. type Client[T any] interface {
  30. // NextPage returns the next page or it returns an error
  31. NextPage(
  32. page Page[T],
  33. offset uint64, // item offset at which to start page
  34. ) (err error)
  35. }
  36. type Pager[T any] interface {
  37. // Iter is intended to be used in a for-range loop
  38. Iter() <-chan T
  39. // IterPages iterates over whole pages rather than items
  40. IterPages() <-chan Page[T]
  41. // LastErr must return the first error encountered, if any
  42. LastErr() error
  43. // Abort causes the pager to relinquish all pages back to
  44. // the page pool and stop all running goroutines.
  45. Abort() error
  46. }
  47. func NewPager[T any](
  48. ctx context.Context,
  49. c Client[T],
  50. pagePool chan Page[T],
  51. ) Pager[T] {
  52. if len(pagePool) == 0 {
  53. panic("new pager: provided page pool is empty")
  54. }
  55. var pageSize uint64
  56. pg := <-pagePool
  57. pageSize = uint64(cap(pg.Elems()))
  58. pagePool <- pg
  59. ctx2, cancel := context.WithCancel(ctx)
  60. done := make(chan struct{})
  61. return &pager[T]{
  62. pctx: ctx,
  63. ctx: ctx2,
  64. cancel: cancel,
  65. done: done,
  66. client: c,
  67. n: pageSize,
  68. pagePool: pagePool,
  69. poolSize: len(pagePool),
  70. }
  71. }
  72. /*
  73. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  74. We keep len(pagePool) pages buffered.
  75. */
  76. type pager[T any] struct {
  77. pctx context.Context // Parent context.
  78. ctx context.Context
  79. cancel context.CancelFunc
  80. done chan struct{} // Notify Abort when finished.
  81. client Client[T]
  82. m uint64
  83. n uint64
  84. err error
  85. pagePool chan Page[T]
  86. poolSize int
  87. cnt uint64
  88. }
  89. func (p *pager[T]) iteratePages() <-chan Page[T] {
  90. ch := make(chan Page[T], len(p.pagePool))
  91. go func() {
  92. defer close(ch)
  93. var page Page[T]
  94. for {
  95. if p.ctx.Err() != nil {
  96. break
  97. }
  98. select {
  99. case page = <-p.pagePool:
  100. err := p.client.NextPage(page, p.m*p.n)
  101. if err != nil {
  102. p.err = err
  103. p.pagePool <- page
  104. return
  105. }
  106. if p.cnt == 0 {
  107. p.cnt = page.Count()
  108. }
  109. // When page.Count() is zero, we must rely on the
  110. // absence of returned results to know when to stop.
  111. if p.cnt == 0 && len(page.Elems()) == 0 {
  112. p.pagePool <- page
  113. return
  114. }
  115. ch <- page
  116. if p.cnt != 0 && (p.m*p.n+p.n) >= p.cnt {
  117. return
  118. }
  119. p.m++
  120. default:
  121. }
  122. }
  123. }()
  124. return ch
  125. }
  126. func (p *pager[T]) IterPages() <-chan Page[T] {
  127. ch := make(chan Page[T], p.n)
  128. go func() {
  129. defer close(p.done)
  130. defer close(ch)
  131. for page := range p.iteratePages() {
  132. if p.ctx.Err() != nil {
  133. p.pagePool <- page
  134. break
  135. }
  136. if p.err != nil {
  137. if p.err != io.EOF {
  138. p.err = fmt.Errorf("pager: iterate pages: %w", p.err)
  139. }
  140. p.pagePool <- page
  141. return
  142. }
  143. ch <- page
  144. }
  145. }()
  146. return ch
  147. }
  148. func (p *pager[T]) Iter() <-chan T {
  149. ch := make(chan T, p.n)
  150. go func() {
  151. defer close(p.done)
  152. defer close(ch)
  153. for page := range p.iteratePages() {
  154. if p.ctx.Err() != nil {
  155. p.pagePool <- page
  156. break
  157. }
  158. for _, i := range page.Elems() {
  159. ch <- i
  160. }
  161. p.pagePool <- page
  162. if p.err != nil {
  163. if p.err != io.EOF {
  164. p.err = fmt.Errorf("pager: iterate items: %w", p.err)
  165. }
  166. return
  167. }
  168. }
  169. }()
  170. return ch
  171. }
  172. func (p *pager[T]) LastErr() error {
  173. return p.err
  174. }
  175. func (p *pager[T]) reset() error {
  176. err := p.err
  177. p.ctx, p.cancel = context.WithCancel(p.pctx)
  178. p.done = make(chan struct{})
  179. p.m, p.cnt = 0, 0
  180. p.err = nil
  181. return err
  182. }
  183. func (p *pager[T]) Abort() error {
  184. p.cancel()
  185. <-p.done
  186. return p.reset()
  187. }