depager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. /*
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  5. */
  6. package depager
  7. import (
  8. "encoding/json"
  9. "fmt"
  10. "net/url"
  11. )
  // Page wraps a single server response so that a pager can work out how
  // many items exist in total and walk the items carried by that response.
  //
  // The underlying implementation must be a pointer to a struct that holds
  // the desired response fields, all tagged appropriately. Fields for
  // platform-specific error responses should be included as well.
  type Page[T any] interface {
  	// Elems must return the items from the current page.
  	Elems() []T

  	// Count must return the total number of items to be paged.
  	Count() uint64
  }
  // Client is the narrow view of an API client that depager understands.
  type Client interface {
  	// Get must request uri and return the response body and/or an error.
  	Get(uri url.URL) ([]byte, error)
  }
  // PagedURI translates depager's paging parameters into the request
  // parameters of the target API.
  type PagedURI interface {
  	// PageURI generates a URI that requests limit items starting at
  	// item offset.
  	PageURI(limit uint64, offset uint64) url.URL
  }
  // Pager streams the items of a paged collection and reports any error
  // met along the way.
  type Pager[T any] interface {
  	// LastErr must return the first error encountered, if any.
  	LastErr() error

  	// Iter is intended to be used in a for-range loop.
  	Iter() <-chan T
  }
  43. func NewPager[T any](
  44. u PagedURI,
  45. c Client,
  46. pageSize uint64,
  47. newPage func() Page[T],
  48. ) Pager[T] {
  49. return &pager[T]{
  50. client: c,
  51. uri: u,
  52. m: 0,
  53. n: pageSize,
  54. p: 4,
  55. newPage: newPage,
  56. }
  57. }
  58. /*
  59. Retrieve n items in the range [m*n, m*n + n - 1], inclusive.
  60. Keep p pages buffered.
  61. */
  62. type pager[T any] struct {
  63. client Client
  64. uri PagedURI
  65. m uint64
  66. n uint64
  67. err error
  68. p int
  69. newPage func() Page[T]
  70. }
  71. func (p *pager[T]) iteratePages() <-chan Page[T] {
  72. ch := make(chan Page[T], p.p)
  73. go func() {
  74. defer close(ch)
  75. for {
  76. uri := p.uri.PageURI(p.n, p.m*p.n)
  77. body, err := p.client.Get(uri)
  78. if err != nil {
  79. // assume the client knows better than we do
  80. p.err = err
  81. return
  82. }
  83. page := p.newPage()
  84. // TODO: eventually allow other codecs
  85. err = json.Unmarshal(body, page)
  86. if err != nil {
  87. p.err = fmt.Errorf("pager: iterate pages: unmarshal response: %v", err)
  88. return
  89. }
  90. ch <- page
  91. if (p.m*p.n + p.n) >= page.Count() {
  92. return
  93. }
  94. p.m++
  95. }
  96. }()
  97. return ch
  98. }
  99. func (p *pager[T]) Iter() <-chan T {
  100. ch := make(chan T, p.n)
  101. go func() {
  102. defer close(ch)
  103. for page := range p.iteratePages() {
  104. for _, i := range page.Elems() {
  105. ch <- i
  106. }
  107. if p.err != nil {
  108. p.err = fmt.Errorf("pager: iterate items: %s", p.err)
  109. return
  110. }
  111. }
  112. }()
  113. return ch
  114. }
  115. func (p *pager[T]) LastErr() error {
  116. return p.err
  117. }