depager.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import (
  8. "context"
  9. "fmt"
  10. )
  11. /*
  12. The `Page` interface must wrap server responses. This
  13. allows pagers to calculate page sizes and iterate over
  14. response aggregates.
  15. If the underlying value of this interface is `nil` (e.g. a
  16. nil pointer to a struct or a nil slice), `Elems()` will
  17. panic.
  18. */
  19. type Page[T any] interface {
  20. // Elems must return the items from this page
  21. Elems() []T
  22. // URI must return the URI associated with this page
  23. URI() string
  24. // Count must return the total number of items being paged
  25. Count() uint64
  26. }
  27. // Exposes the part of the client that depager understands.
  28. type Client[T any] interface {
  29. // NextPage returns the next page or it returns an error
  30. NextPage(
  31. page Page[T],
  32. offset uint64, // item offset at which to start page
  33. ) (err error)
  34. }
  35. type Pager[T any] interface {
  36. // Iter is intended to be used in a for-range loop
  37. Iter() <-chan T
  38. // IterPages iterates over whole pages rather than items
  39. IterPages() <-chan Page[T]
  40. // LastErr must return the first error encountered, if any
  41. LastErr() error
  42. // Abort causes the pager to relinquish all pages back to
  43. // the page pool and stop all running goroutines.
  44. Abort() error
  45. }
  46. func NewPager[T any](
  47. ctx context.Context,
  48. c Client[T],
  49. pagePool chan Page[T],
  50. ) Pager[T] {
  51. if len(pagePool) == 0 {
  52. panic("new pager: provided page pool is empty")
  53. }
  54. var pageSize uint64
  55. pg := <-pagePool
  56. pageSize = uint64(cap(pg.Elems()))
  57. pagePool <- pg
  58. ctx2, cancel := context.WithCancel(ctx)
  59. done := make(chan struct{})
  60. return &pager[T]{
  61. pctx: ctx,
  62. ctx: ctx2,
  63. cancel: cancel,
  64. done: done,
  65. client: c,
  66. n: pageSize,
  67. pagePool: pagePool,
  68. poolSize: len(pagePool),
  69. }
  70. }
  71. /*
  72. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  73. We keep len(pagePool) pages buffered.
  74. */
  75. type pager[T any] struct {
  76. pctx context.Context // Parent context.
  77. ctx context.Context
  78. cancel context.CancelFunc
  79. done chan struct{} // Notify Abort when finished.
  80. client Client[T]
  81. m uint64
  82. n uint64
  83. err error
  84. pagePool chan Page[T]
  85. poolSize int
  86. cnt uint64
  87. }
  88. func (p *pager[T]) iteratePages() <-chan Page[T] {
  89. ch := make(chan Page[T], len(p.pagePool))
  90. go func() {
  91. defer close(ch)
  92. var page Page[T]
  93. for {
  94. if p.pctx.Err() != nil {
  95. break
  96. }
  97. page = <-p.pagePool
  98. err := p.client.NextPage(page, p.m*p.n)
  99. if err != nil {
  100. p.err = err
  101. p.pagePool <- page
  102. return
  103. }
  104. if p.cnt == 0 {
  105. p.cnt = page.Count()
  106. }
  107. // When page.Count() is zero, we must rely on the
  108. // absence of returned results to know when to stop.
  109. if p.cnt == 0 && len(page.Elems()) == 0 {
  110. p.pagePool <- page
  111. return
  112. }
  113. ch <- page
  114. if (p.m*p.n + p.n) >= p.cnt {
  115. return
  116. }
  117. p.m++
  118. }
  119. }()
  120. return ch
  121. }
  122. func (p *pager[T]) IterPages() <-chan Page[T] {
  123. ch := make(chan Page[T], p.n)
  124. go func() {
  125. defer close(p.done)
  126. defer close(ch)
  127. for page := range p.iteratePages() {
  128. if p.pctx.Err() != nil {
  129. p.pagePool <- page
  130. break
  131. }
  132. if p.err != nil {
  133. p.err = fmt.Errorf("pager: iterate pages: %w", p.err)
  134. p.pagePool <- page
  135. return
  136. }
  137. ch <- page
  138. }
  139. }()
  140. return ch
  141. }
  142. func (p *pager[T]) Iter() <-chan T {
  143. ch := make(chan T, p.n)
  144. go func() {
  145. defer close(p.done)
  146. defer close(ch)
  147. for page := range p.iteratePages() {
  148. if p.pctx.Err() != nil {
  149. p.pagePool <- page
  150. break
  151. }
  152. for _, i := range page.Elems() {
  153. ch <- i
  154. }
  155. p.pagePool <- page
  156. if p.err != nil {
  157. p.err = fmt.Errorf("pager: iterate items: %w", p.err)
  158. return
  159. }
  160. }
  161. }()
  162. return ch
  163. }
  164. func (p *pager[T]) LastErr() error {
  165. return p.err
  166. }
  167. func (p *pager[T]) reset() error {
  168. err := p.err
  169. p.ctx, p.cancel = context.WithCancel(p.pctx)
  170. p.done = make(chan struct{})
  171. p.m, p.cnt = 0, 0
  172. p.err = nil
  173. return err
  174. }
  175. func (p *pager[T]) Abort() error {
  176. p.cancel()
  177. <-p.done
  178. return p.reset()
  179. }