-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathentry_iterator.go
132 lines (103 loc) · 2.46 KB
/
entry_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package streedb
import (
"cmp"
)
type EntryIterator[O cmp.Ordered] interface {
Next() (Entry[O], bool, error)
}
type IteratorFilter[O cmp.Ordered] func(EntriesMap[O]) bool
func newIteratorWithFilters[O cmp.Ordered](data []*Fileblock[O], filters []EntryFilter) *btreeWrapperIterator[O] {
sFilters := make([]EntryFilter, 0)
for _, filter := range filters {
if filter.Kind() == SecondaryIndexFilterKind {
sFilters = append(sFilters, filter)
}
}
tree := &btreeWrapperIterator[O]{
ch: make(chan Entry[O]),
}
tree.startFilters(data, sFilters)
return tree
}
type btreeWrapperIterator[O cmp.Ordered] struct {
ch chan Entry[O]
btree *BtreeIndex[O, O]
}
func (b *btreeWrapperIterator[O]) startFilters(data []*Fileblock[O], filters []EntryFilter) {
go func() {
defer close(b.ch)
for _, e := range data {
entriesMap, err := e.Load()
if err != nil {
return
}
entriesMap.Range(func(key string, entry Entry[O]) bool {
valid := true
for _, filter := range filters {
if !filter.Filter(entry) {
valid = false
break
}
}
if valid {
b.ch <- entry
}
return true
})
}
}()
}
func (b *btreeWrapperIterator[O]) Next() (Entry[O], bool, error) {
entry := <-b.ch
if entry == nil {
return nil, false, nil
}
return entry, true, nil
}
func NewSingleItemIterator[O cmp.Ordered](data Entry[O]) EntryIterator[O] {
return &singleItemIterator[O]{data: data}
}
type singleItemIterator[O cmp.Ordered] struct {
data Entry[O]
}
func (l *singleItemIterator[O]) Next() (Entry[O], bool, error) {
if l.data == nil {
return nil, false, nil
}
data := l.data
l.data = nil
return data, true, nil
}
func NewListIterator[O cmp.Ordered](data []Entry[O]) *listIterator[O] {
return &listIterator[O]{data: data}
}
type listIterator[O cmp.Ordered] struct {
data []Entry[O]
index int
}
func (l *listIterator[O]) Next() (Entry[O], bool, error) {
if l.index >= len(l.data) {
return nil, false, nil
}
data := l.data[l.index]
l.index++
return data, true, nil
}
func NewIteratorMerger[O cmp.Ordered](iterators ...EntryIterator[O]) *iteratorMerger[O] {
return &iteratorMerger[O]{iterators: iterators}
}
type iteratorMerger[O cmp.Ordered] struct {
iterators []EntryIterator[O]
}
func (m *iteratorMerger[O]) Next() (Entry[O], bool, error) {
for _, it := range m.iterators {
entry, found, err := it.Next()
if err != nil {
return nil, false, err
}
if found {
return entry, true, nil
}
}
return nil, false, nil
}