-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathreceive_queue.go
74 lines (59 loc) · 1.5 KB
/
receive_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Copyright 2022 Kirill Scherba <kirill@scherba.ru>. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TRU Receive Queue module
package tru
import "sync"
type receiveQueue struct {
ma map[uint32]*Packet // Receive queue map
sync.RWMutex // Receive queue mutex
}
// init receive queue
func (r *receiveQueue) init(ch *Channel) {
r.ma = make(map[uint32]*Packet)
}
// add packet to receive queue
func (r *receiveQueue) add(pac *Packet) {
r.Lock()
defer r.Unlock()
id := uint32(pac.ID())
r.ma[id] = pac
}
// delete packet from receive queue
func (r *receiveQueue) delete(id int) (pac *Packet, ok bool) {
r.Lock()
defer r.Unlock()
pac, ok = r.get(id, false)
if ok {
delete(r.ma, uint32(id))
}
return
}
// get packet from receive queue
func (r *receiveQueue) get(id int, lock ...bool) (pac *Packet, ok bool) {
if len(lock) == 0 || lock[0] {
r.RLock()
defer r.RUnlock()
}
pac, ok = r.ma[uint32(id)]
return
}
// len return receive queue len
func (r *receiveQueue) len() int {
r.RLock()
defer r.RUnlock()
return len(r.ma)
}
// process find packets in received queue, send packets to user level and remove
// it from receive queue
func (r *receiveQueue) process(ch *Channel, send func(ch *Channel, pac *Packet)) (err error) {
id := int(ch.expectedID)
pac, ok := ch.recvQueue.get(id)
if !ok {
return
}
send(ch, pac)
// ch.newExpectedID()
ch.recvQueue.delete(id)
return r.process(ch, send)
}