depager.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import (
  8. "context"
  9. "fmt"
  10. )
  // Page wraps a single server response so that a pager can work out the page
  // size and walk the aggregated items.
  //
  // Implementations whose underlying value is nil (for example a nil pointer to
  // a struct, or a nil slice) will panic when Elems is called.
  type Page[T any] interface {
  	// Elems must return the items held by this page.
  	Elems() []T

  	// URI must return the URI associated with this page.
  	URI() string

  	// Count must return the total number of items being paged.
  	Count() uint64
  }
  27. // Exposes the part of the client that depager understands.
  28. type Client[T any] interface {
  29. // NextPage returns the next page or it returns an error
  30. NextPage(
  31. page Page[T],
  32. offset uint64, // item offset at which to start page
  33. ) (err error)
  34. }
  35. type Pager[T any] interface {
  36. // Iter is intended to be used in a for-range loop
  37. Iter() <-chan T
  38. // IterPages iterates over whole pages rather than items
  39. IterPages() <-chan Page[T]
  40. // LastErr must return the first error encountered, if any
  41. LastErr() error
  42. }
  43. func NewPager[T any](
  44. ctx context.Context,
  45. c Client[T],
  46. pagePool chan Page[T],
  47. ) Pager[T] {
  48. if len(pagePool) == 0 {
  49. panic("new pager: provided page pool is empty")
  50. }
  51. var pageSize uint64
  52. pg := <-pagePool
  53. pageSize = uint64(cap(pg.Elems()))
  54. pagePool <- pg
  55. return &pager[T]{
  56. ctx: ctx,
  57. client: c,
  58. n: pageSize,
  59. pagePool: pagePool,
  60. }
  61. }
  62. /*
  63. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  64. We keep len(pagePool) pages buffered.
  65. */
  66. type pager[T any] struct {
  67. ctx context.Context
  68. client Client[T]
  69. m uint64
  70. n uint64
  71. err error
  72. pagePool chan Page[T]
  73. cnt uint64
  74. }
  75. func (p *pager[T]) iteratePages() <-chan Page[T] {
  76. ch := make(chan Page[T], len(p.pagePool))
  77. go func() {
  78. defer close(ch)
  79. var page Page[T]
  80. for {
  81. if p.ctx.Err() != nil {
  82. break
  83. }
  84. page = <-p.pagePool
  85. err := p.client.NextPage(page, p.m*p.n)
  86. if err != nil {
  87. p.pagePool <- page
  88. p.err = err
  89. return
  90. }
  91. if p.cnt == 0 {
  92. p.cnt = page.Count()
  93. }
  94. ch <- page
  95. if (p.m*p.n + p.n) >= p.cnt {
  96. return
  97. }
  98. p.m++
  99. }
  100. }()
  101. return ch
  102. }
  103. func (p *pager[T]) IterPages() <-chan Page[T] {
  104. ch := make(chan Page[T], p.n)
  105. go func() {
  106. defer close(ch)
  107. for page := range p.iteratePages() {
  108. if p.ctx.Err() != nil {
  109. break
  110. }
  111. if p.err != nil {
  112. p.pagePool <- page
  113. p.err = fmt.Errorf("pager: iterate pages: %s", p.err)
  114. return
  115. }
  116. ch <- page
  117. }
  118. }()
  119. return ch
  120. }
  121. func (p *pager[T]) Iter() <-chan T {
  122. ch := make(chan T, p.n)
  123. go func() {
  124. defer close(ch)
  125. for page := range p.iteratePages() {
  126. if p.ctx.Err() != nil {
  127. break
  128. }
  129. for _, i := range page.Elems() {
  130. ch <- i
  131. }
  132. p.pagePool <- page
  133. if p.err != nil {
  134. p.err = fmt.Errorf("pager: iterate items: %s", p.err)
  135. return
  136. }
  137. }
  138. }()
  139. return ch
  140. }
  141. func (p *pager[T]) LastErr() error {
  142. return p.err
  143. }