-
Notifications
You must be signed in to change notification settings - Fork 489
/
Copy pathdht.go
289 lines (254 loc) · 6.82 KB
/
dht.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// Package dht implements the BitTorrent DHT protocol. For more information
// see http://www.bittorrent.org/beps/bep_0005.html.
package dht
import (
"encoding/hex"
"errors"
"math"
"net"
"time"
)
// Operating modes of a DHT node, selected through Config.Mode.
const (
	// StandardMode makes the node follow the standard protocol.
	StandardMode = 0
	// CrawlMode makes the node crawl the DHT network.
	CrawlMode = 1
)
// ErrNotReady is the error reported when the DHT is used before it is ready.
var ErrNotReady = errors.New("dht is not ready")

// ErrOnGetPeersResponseNotSet is the error reported by DHT.GetPeers when the
// Config.OnGetPeersResponse callback has not been set.
var ErrOnGetPeersResponseNotSet = errors.New("OnGetPeersResponse is not set")
// Config represents the configuration of a DHT node. NewStandardConfig and
// NewCrawlConfig return pre-populated values for the two supported modes.
type Config struct {
	// K is the protocol parameter k; in mainline DHT, k = 8.
	K int
	// KBucketSize is the size of a k-bucket. In crawling mode all nodes are
	// put in one bucket, so KBucketSize may not be equal to K.
	KBucketSize int
	// Network is the network to use; candidates are udp, udp4, udp6.
	Network string
	// Address is the local address; the format is `ip:port`.
	Address string
	// PrimeNodes are the nodes through which we can join the DHT network.
	PrimeNodes []string
	// KBucketExpiredAfter is the duration after which a k-bucket expires.
	KBucketExpiredAfter time.Duration
	// NodeExpriedAfter is the duration after which a node expires.
	// NOTE(review): the field name is misspelled ("Expried") but is part of
	// the exported API, so it is left as is.
	NodeExpriedAfter time.Duration
	// CheckKBucketPeriod is how often it checks whether a bucket is expired.
	CheckKBucketPeriod time.Duration
	// TokenExpiredAfter is the duration after which a peer token expires.
	TokenExpiredAfter time.Duration
	// MaxTransactionCursor is the max transaction id.
	MaxTransactionCursor uint64
	// MaxNodes is how many nodes the routing table can hold.
	MaxNodes int
	// OnGetPeers is the callback invoked when a get_peers request is
	// received.
	// NOTE(review): arguments are presumably (infoHash, ip, port) — confirm
	// against the caller.
	OnGetPeers func(string, string, int)
	// OnGetPeersResponse is the callback invoked when a get_peers response
	// is received. It must be set before calling DHT.GetPeers.
	OnGetPeersResponse func(string, *Peer)
	// OnAnnouncePeer is the callback invoked when an announce_peer request
	// is received.
	// NOTE(review): arguments are presumably (infoHash, ip, port) — confirm
	// against the caller.
	OnAnnouncePeer func(string, string, int)
	// BlockedIPs are the blocked IPs; New inserts them into the black list.
	BlockedIPs []string
	// BlackListMaxSize is the black list size.
	BlackListMaxSize int
	// Mode is either StandardMode or CrawlMode.
	Mode int
	// Try is the number of times it tries when a send fails.
	Try int
	// PacketJobLimit is the number of packets that may wait to be dealt
	// with; it is the buffer size of the DHT's packet channel.
	PacketJobLimit int
	// PacketWorkerLimit is the size of the packet handler pool; it is the
	// capacity of the DHT's worker token channel.
	PacketWorkerLimit int
	// RefreshNodeNum is the number of nodes to be refreshed in a k-bucket.
	RefreshNodeNum int
}
// NewStandardConfig returns a pointer to a new Config filled with the default
// values for StandardMode. Callback fields are left nil.
func NewStandardConfig() *Config {
	cfg := new(Config)

	// Mode and bucket sizing.
	cfg.Mode = StandardMode
	cfg.K = 8
	cfg.KBucketSize = 8
	cfg.MaxNodes = 5000
	cfg.RefreshNodeNum = 8

	// Local endpoint and the nodes used to join the network.
	cfg.Network = "udp4"
	cfg.Address = ":6881"
	cfg.PrimeNodes = []string{
		"router.bittorrent.com:6881",
		"router.utorrent.com:6881",
		"dht.transmissionbt.com:6881",
	}

	// Expiry and housekeeping periods.
	cfg.NodeExpriedAfter = 15 * time.Minute
	cfg.KBucketExpiredAfter = 15 * time.Minute
	cfg.CheckKBucketPeriod = 30 * time.Second
	cfg.TokenExpiredAfter = 10 * time.Minute

	// Transactions, black list and packet handling limits.
	cfg.MaxTransactionCursor = math.MaxUint32
	cfg.BlockedIPs = []string{}
	cfg.BlackListMaxSize = 65536
	cfg.Try = 2
	cfg.PacketJobLimit = 1024
	cfg.PacketWorkerLimit = 256

	return cfg
}
// NewCrawlConfig returns a Config for crawling mode. It starts from the
// values of NewStandardConfig and overrides the mode, the expiry durations,
// the bucket check period, the bucket size and the refresh count.
func NewCrawlConfig() *Config {
	cfg := NewStandardConfig()

	cfg.Mode = CrawlMode
	// Both expiry durations are set to zero for crawling.
	cfg.NodeExpriedAfter, cfg.KBucketExpiredAfter = 0, 0
	cfg.CheckKBucketPeriod = 5 * time.Second
	// Crawling keeps all nodes in one bucket, so the bucket size is not K.
	cfg.KBucketSize = math.MaxInt32
	cfg.RefreshNodeNum = 256

	return cfg
}
// DHT represents a DHT node. It embeds *Config, so every configuration field
// is accessible directly on the DHT value.
type DHT struct {
	*Config
	// node is the local node, created in New via newNode.
	node *node
	// conn is the UDP connection opened in init on Network/Address.
	conn *net.UDPConn
	// routingTable is created in init with KBucketSize.
	routingTable *routingTable
	// transactionManager is created in init with MaxTransactionCursor; its
	// run method is started in a goroutine there.
	transactionManager *transactionManager
	// peersManager is created in init.
	peersManager *peersManager
	// tokenManager is created in init with TokenExpiredAfter.
	tokenManager *tokenManager
	// blackList is created in New with BlackListMaxSize and seeded with
	// BlockedIPs and the local IPs.
	blackList *blackList
	// Ready is set to true by Run after init, listen and join have been
	// called. GetPeers returns ErrNotReady while it is false.
	Ready bool
	// packets carries received datagrams from listen to the Run loop; it is
	// buffered to PacketJobLimit entries.
	packets chan packet
	// workerTokens is a channel with capacity PacketWorkerLimit.
	// NOTE(review): presumably it bounds the number of concurrent packet
	// handlers — confirm against handle.
	workerTokens chan struct{}
}
// New returns a DHT pointer built from config. If config is nil, the default
// config from NewStandardConfig is used.
//
// New panics if newNode fails for config.Network and config.Address. Besides
// inserting config.BlockedIPs into the black list, it starts a goroutine that
// inserts the local IPs and, when getRemoteIP succeeds, the IP it reports.
func New(config *Config) *DHT {
	if config == nil {
		config = NewStandardConfig()
	}

	self, err := newNode(randomString(20), config.Network, config.Address)
	if err != nil {
		panic(err)
	}

	d := &DHT{
		Config:       config,
		node:         self,
		blackList:    newBlackList(config.BlackListMaxSize),
		packets:      make(chan packet, config.PacketJobLimit),
		workerTokens: make(chan struct{}, config.PacketWorkerLimit),
	}

	for _, ip := range config.BlockedIPs {
		d.blackList.insert(ip, -1)
	}

	// Looking up our own addresses may be slow, so it is done in the
	// background.
	go func() {
		for _, ip := range getLocalIPs() {
			d.blackList.insert(ip, -1)
		}

		// Only insert the remote IP when the lookup succeeded. The previous
		// condition was inverted: it inserted the value returned alongside
		// an error and skipped the IP on success.
		if ip, err := getRemoteIP(); err == nil {
			d.blackList.insert(ip, -1)
		}
	}()

	return d
}
// IsStandardMode reports whether the configured Mode is StandardMode.
func (dht *DHT) IsStandardMode() bool {
	standard := dht.Config.Mode == StandardMode
	return standard
}
// IsCrawlMode reports whether the configured Mode is CrawlMode.
func (dht *DHT) IsCrawlMode() bool {
	crawling := dht.Config.Mode == CrawlMode
	return crawling
}
// init opens the UDP connection on Network/Address and creates the routing
// table and the peers, token and transaction managers. It then starts the
// transaction manager's run loop and the token manager's and black list's
// clear routines in their own goroutines.
//
// It panics if the address cannot be listened on.
func (dht *DHT) init() {
	pc, err := net.ListenPacket(dht.Network, dht.Address)
	if err != nil {
		panic(err)
	}
	dht.conn = pc.(*net.UDPConn)

	// The components receive dht itself; they are created in this fixed
	// order.
	dht.routingTable = newRoutingTable(dht.KBucketSize, dht)
	dht.peersManager = newPeersManager(dht)
	dht.tokenManager = newTokenManager(dht.TokenExpiredAfter, dht)
	dht.transactionManager = newTransactionManager(dht.MaxTransactionCursor, dht)

	go dht.transactionManager.run()
	go dht.tokenManager.clear()
	go dht.blackList.clear()
}
// join makes the current node join the DHT network by sending a find_node
// query for its own id to every prime node. Prime nodes whose address cannot
// be resolved are skipped.
func (dht *DHT) join() {
	for _, prime := range dht.PrimeNodes {
		resolved, err := net.ResolveUDPAddr(dht.Network, prime)
		if err != nil {
			continue
		}

		// NOTE: the temporary node has no node id, only an address.
		bootstrap := &node{addr: resolved}
		dht.transactionManager.findNode(bootstrap, dht.node.id.RawString())
	}
}
// listen starts a goroutine that reads datagrams from the UDP connection and
// queues them on dht.packets. Read errors are skipped and the loop never
// returns.
func (dht *DHT) listen() {
	go func() {
		buff := make([]byte, 8192)
		for {
			n, raddr, err := dht.conn.ReadFromUDP(buff)
			if err != nil {
				continue
			}

			// buff is reused by the next read while earlier packets may
			// still be waiting in the buffered channel, so each packet
			// needs its own copy of the payload.
			data := make([]byte, n)
			copy(data, buff[:n])
			dht.packets <- packet{data, raddr}
		}
	}()
}
// id returns the node id to present for target. In standard mode, or when
// target is empty, it is the dht's own node id. Otherwise it is an id near
// target: the first 15 bytes of target followed by the own id from byte 15
// on.
//
// A non-empty target shorter than 15 bytes panics in crawl mode.
func (dht *DHT) id(target string) string {
	const sharedPrefix = 15

	switch {
	case dht.IsStandardMode(), target == "":
		return dht.node.id.RawString()
	default:
		return target[:sharedPrefix] + dht.node.id.RawString()[sharedPrefix:]
	}
}
// GetPeers sends a get_peers query for infoHash to the neighbors taken from
// the routing table. It returns only an error; the OnGetPeersResponse
// callback is documented as being invoked when a get_peers response arrives.
//
// A 40-character infoHash is treated as hex and decoded first; any other
// length is used as is. It returns ErrNotReady if the DHT is not ready,
// ErrOnGetPeersResponseNotSet if the callback is nil, or the hex decoding
// error.
func (dht *DHT) GetPeers(infoHash string) error {
	switch {
	case !dht.Ready:
		return ErrNotReady
	case dht.OnGetPeersResponse == nil:
		return ErrOnGetPeersResponseNotSet
	}

	key := infoHash
	if len(key) == 40 {
		raw, err := hex.DecodeString(key)
		if err != nil {
			return err
		}
		key = string(raw)
	}

	// Ask for as many neighbors as the routing table currently holds.
	target := newBitmapFromString(key)
	neighbors := dht.routingTable.GetNeighbors(target, dht.routingTable.Len())
	for _, neighbor := range neighbors {
		dht.transactionManager.getPeers(neighbor, key)
	}

	return nil
}
// Run starts the dht: it initializes the node, starts listening, joins the
// network and marks the DHT as ready. It then loops forever, handling queued
// packets and, every CheckKBucketPeriod, either re-joining (when the routing
// table is empty) or refreshing the routing table (when no transactions are
// pending). Run never returns.
func (dht *DHT) Run() {
	dht.init()
	dht.listen()
	dht.join()
	dht.Ready = true

	ticker := time.Tick(dht.CheckKBucketPeriod)
	for {
		select {
		case received := <-dht.packets:
			handle(dht, received)
		case <-ticker:
			switch {
			case dht.routingTable.Len() == 0:
				dht.join()
			case dht.transactionManager.len() == 0:
				go dht.routingTable.Fresh()
			}
		}
	}
}