forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfloodsub.go
110 lines (87 loc) · 2.35 KB
/
floodsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package pubsub
import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
// FloodSubID is the protocol ID that NewFloodSub configures its router with.
FloodSubID = protocol.ID("/floodsub/1.0.0")
// FloodSubTopicSearchSize is the peer count EnoughPeers requires for a
// topic when the caller passes a suggested value of 0.
FloodSubTopicSearchSize = 5
)
// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub object whose
// router is a FloodSubRouter configured with the protocols specified in ps.
// ctx, h and opts are passed through to NewPubSub unchanged.
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
	return NewPubSub(ctx, h, &FloodSubRouter{protocols: ps}, opts...)
}
// NewFloodSub returns a new PubSub object using the FloodSubRouter, configured
// with FloodSubID as its only protocol.
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	defaultProtocols := []protocol.ID{FloodSubID}
	return NewFloodsubWithProtocols(ctx, h, defaultProtocols, opts...)
}
// FloodSubRouter is a router whose Publish method forwards each message to
// every known peer listed for the message's topic, other than the peer it was
// received from and the peer named as its origin.
type FloodSubRouter struct {
// p is the PubSub this router is bound to; it is set by Attach.
p *PubSub
// protocols is the list of protocol IDs returned by Protocols.
protocols []protocol.ID
// tracer is copied from the attached PubSub in Attach and receives the
// router's peer, topic and RPC events.
tracer *pubsubTracer
}
// Protocols returns the protocol IDs the router is configured with.
func (fs *FloodSubRouter) Protocols() []protocol.ID {
return fs.protocols
}
// Attach binds the router to p and adopts p's tracer. The other methods read
// fs.p and fs.tracer, so Attach is expected to run before them.
func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}
// AddPeer reports the peer p and its protocol to the tracer. The router itself
// stores nothing about the peer.
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}
// RemovePeer reports the removal of peer p to the tracer.
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
// EnoughPeers reports whether the attached PubSub lists at least suggested
// peers for topic.
//
// A suggested value of 0 selects the default threshold,
// FloodSubTopicSearchSize. The result is false when the topic has no entry in
// the PubSub's topics map.
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
	// check all peers in the topic
	tmap, ok := fs.p.topics[topic]
	if !ok {
		return false
	}

	if suggested == 0 {
		suggested = FloodSubTopicSearchSize
	}

	return len(tmap) >= suggested
}
// AcceptFrom returns AcceptAll for every peer; the peer argument is ignored.
func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
return AcceptAll
}
// PreValidation is a no-op for this router.
func (fs *FloodSubRouter) PreValidation([]*Message) {}
// HandleRPC is a no-op for this router.
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
// Publish forwards msg to every peer listed for the message's topic in the
// attached PubSub, except the peer the message was received from and the peer
// named as its origin. Peers without an entry in the PubSub's peers map are
// skipped.
//
// Every recipient gets the same RPC. If pushing onto a peer's queue fails, the
// drop is logged and reported to the tracer, and that peer is skipped;
// otherwise the send is reported to the tracer.
func (fs *FloodSubRouter) Publish(msg *Message) {
	sender := msg.ReceivedFrom
	topic := msg.GetTopic()
	rpc := rpcWithMessages(msg.Message)

	for target := range fs.p.topics[topic] {
		// Never echo the message back to its forwarder or its origin.
		if target == sender || target == peer.ID(msg.GetFrom()) {
			continue
		}

		queue, known := fs.p.peers[target]
		if !known {
			continue
		}

		if err := queue.Push(rpc, false); err != nil {
			// Drop it. The peer is too slow.
			log.Infof("dropping message to peer %s: queue full", target)
			fs.tracer.DropRPC(rpc, target)
			continue
		}

		fs.tracer.SendRPC(rpc, target)
	}
}
// Join reports the topic join to the tracer.
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}
// Leave reports the topic leave to the tracer.
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Leave(topic)
}