forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_queue.go
147 lines (123 loc) · 2.99 KB
/
rpc_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package pubsub
import (
"context"
"errors"
"sync"
)
var (
ErrQueueCancelled = errors.New("rpc queue operation cancelled")
ErrQueueClosed = errors.New("rpc queue closed")
ErrQueueFull = errors.New("rpc queue full")
ErrQueuePushOnClosed = errors.New("push on closed rpc queue")
)
type priorityQueue struct {
normal []*RPC
priority []*RPC
}
func (q *priorityQueue) Len() int {
return len(q.normal) + len(q.priority)
}
func (q *priorityQueue) NormalPush(rpc *RPC) {
q.normal = append(q.normal, rpc)
}
func (q *priorityQueue) PriorityPush(rpc *RPC) {
q.priority = append(q.priority, rpc)
}
func (q *priorityQueue) Pop() *RPC {
var rpc *RPC
if len(q.priority) > 0 {
rpc = q.priority[0]
q.priority[0] = nil
q.priority = q.priority[1:]
} else if len(q.normal) > 0 {
rpc = q.normal[0]
q.normal[0] = nil
q.normal = q.normal[1:]
}
return rpc
}
type rpcQueue struct {
dataAvailable sync.Cond
spaceAvailable sync.Cond
// Mutex used to access queue
queueMu sync.Mutex
queue priorityQueue
closed bool
maxSize int
}
func newRpcQueue(maxSize int) *rpcQueue {
q := &rpcQueue{maxSize: maxSize}
q.dataAvailable.L = &q.queueMu
q.spaceAvailable.L = &q.queueMu
return q
}
func (q *rpcQueue) Push(rpc *RPC, block bool) error {
return q.push(rpc, false, block)
}
func (q *rpcQueue) UrgentPush(rpc *RPC, block bool) error {
return q.push(rpc, true, block)
}
func (q *rpcQueue) push(rpc *RPC, urgent bool, block bool) error {
q.queueMu.Lock()
defer q.queueMu.Unlock()
if q.closed {
panic(ErrQueuePushOnClosed)
}
for q.queue.Len() == q.maxSize {
if block {
q.spaceAvailable.Wait()
// It can receive a signal because the queue is closed.
if q.closed {
panic(ErrQueuePushOnClosed)
}
} else {
return ErrQueueFull
}
}
if urgent {
q.queue.PriorityPush(rpc)
} else {
q.queue.NormalPush(rpc)
}
q.dataAvailable.Signal()
return nil
}
// Note that, when the queue is empty and there are two blocked Pop calls, it
// doesn't mean that the first Pop will get the item from the next Push. The
// second Pop will probably get it instead.
func (q *rpcQueue) Pop(ctx context.Context) (*RPC, error) {
q.queueMu.Lock()
defer q.queueMu.Unlock()
if q.closed {
return nil, ErrQueueClosed
}
unregisterAfterFunc := context.AfterFunc(ctx, func() {
// Wake up all the waiting routines. The only routine that correponds
// to this Pop call will return from the function. Note that this can
// be expensive, if there are too many waiting routines.
q.dataAvailable.Broadcast()
})
defer unregisterAfterFunc()
for q.queue.Len() == 0 {
select {
case <-ctx.Done():
return nil, ErrQueueCancelled
default:
}
q.dataAvailable.Wait()
// It can receive a signal because the queue is closed.
if q.closed {
return nil, ErrQueueClosed
}
}
rpc := q.queue.Pop()
q.spaceAvailable.Signal()
return rpc, nil
}
func (q *rpcQueue) Close() {
q.queueMu.Lock()
defer q.queueMu.Unlock()
q.closed = true
q.dataAvailable.Broadcast()
q.spaceAvailable.Broadcast()
}