-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnect.go
126 lines (118 loc) · 3.61 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"bufio"
"context"
"fmt"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"log"
"os"
"strings"
)
// TODO more informative/debug-friendly logs: step-by-step, node ID
// processArgs parses every argument with processArg and collects the results
// into two parallel slices: the peer IDs and, at the same index, the
// multiaddresses that belong to each peer.
//
// Parsing stops at the first argument that processArg rejects; in that case
// both slices are nil and the returned error describes the failure.
func processArgs(args []string) ([]peer.ID, [][]multiaddr.Multiaddr, error) {
	n := len(args)
	peerIDs := make([]peer.ID, 0, n)
	peerAddrs := make([][]multiaddr.Multiaddr, 0, n)

	for _, raw := range args {
		pid, mas, err := processArg(raw)
		if err != nil {
			return nil, nil, xerrors.Errorf("could not process arg: %v", err)
		}
		peerIDs = append(peerIDs, pid)
		peerAddrs = append(peerAddrs, mas)
	}

	return peerIDs, peerAddrs, nil
}
// processArg parses a single argument of the form
//
//	<peer ID>[<multiaddr> <multiaddr> ...]
//
// The text before the first "[" is decoded as a peer ID; the text between
// that "[" and the first "]" is split on whitespace and every piece is parsed
// as a multiaddress.
//
// An error (never a panic) is returned when the brackets are missing or out
// of order, when the address list is empty, when the peer ID cannot be
// decoded, or when any address cannot be parsed.
func processArg(arg string) (peer.ID, []multiaddr.Multiaddr, error) {
	// locate the bracketed address list; strings.Index yields -1 when the
	// bracket is absent, which must be rejected before slicing
	start, end := strings.Index(arg, "["), strings.Index(arg, "]")
	if start < 0 {
		return "", nil, xerrors.Errorf("malformed arg %q: missing '['", arg)
	}
	if end < 0 {
		return "", nil, xerrors.Errorf("malformed arg %q: missing ']'", arg)
	}
	if end < start {
		return "", nil, xerrors.Errorf("malformed arg %q: ']' precedes '['", arg)
	}

	// parse peer ID
	id, err := peer.Decode(arg[:start])
	if err != nil {
		return "", nil, xerrors.Errorf("could not decode Peer ID: %v", err)
	}

	// parse multi-addresses; Fields tolerates repeated or surrounding
	// whitespace that a plain split on " " would turn into empty addresses
	addrs := strings.Fields(arg[start+1 : end])
	if len(addrs) == 0 {
		return "", nil, xerrors.Errorf("malformed arg %q: no addresses given", arg)
	}
	multiAddrs := make([]multiaddr.Multiaddr, len(addrs))
	for i, addr := range addrs {
		mAddr, err := multiaddr.NewMultiaddr(addr)
		if err != nil {
			return "", nil, xerrors.Errorf("could not parse address %s: %v", addr, err)
		}
		multiAddrs[i] = mAddr
	}
	return id, multiAddrs, nil
}
// startHost creates a libp2p host that listens on addrs.
//
// The host's private key is read from the file "<id>.txt" (a relative path)
// and deserialized with crypto.UnmarshalPrivateKey; that key is passed to
// libp2p as the host identity. The id argument is used for the key file name
// and for log/error messages.
//
// It returns the started host, or an error if the key file cannot be read,
// the key cannot be deserialized, or libp2p fails to create the host.
func startHost(id peer.ID, addrs ...multiaddr.Multiaddr) (host.Host, error) {
	log.Printf("starting host %s at %v...\n", id, addrs)

	// the key file is named after the peer ID
	keyFile := fmt.Sprintf("%v.txt", id)
	raw, err := os.ReadFile(keyFile)
	if err != nil {
		return nil, xerrors.Errorf("could not read %s: %v", keyFile, err)
	}

	key, err := crypto.UnmarshalPrivateKey(raw)
	if err != nil {
		return nil, xerrors.Errorf("could not deserialize the key: %v", err)
	}

	// build the node from its listen addresses and identity
	listenOpt := libp2p.ListenAddrs(addrs...)
	identityOpt := libp2p.Identity(key)
	node, err := libp2p.New(listenOpt, identityOpt)
	if err != nil {
		return nil, xerrors.Errorf("could not create host %s: %v", id, err)
	}

	log.Printf("started host %s at %v\n", node.ID(), node.Addrs())
	return node, nil
}
// addPeers registers every peer in peers with the host's peer store, using
// the address list found at the same index in addrs and a permanent TTL.
//
// peers and addrs are parallel slices. If addrs has fewer entries than peers
// an error is returned before anything is added, instead of panicking halfway
// through with an out-of-range index. Extra entries in addrs are ignored.
func addPeers(h host.Host, peers []peer.ID, addrs [][]multiaddr.Multiaddr) error {
	if len(addrs) < len(peers) {
		return xerrors.Errorf("could not add peers: %d peers but only %d address lists", len(peers), len(addrs))
	}

	log.Printf("adding peers to peer store...\n")
	for i, id := range peers {
		h.Peerstore().AddAddrs(id, addrs[i], peerstore.PermanentAddrTTL)
	}
	log.Printf("added peers\n")
	return nil
}
// connectPeers wires the host up for protocol p in both directions.
//
// First it installs handler for incoming streams of protocol p. Then it walks
// every peer in the host's peer store (skipping the host itself) and tries to
// open an outgoing stream of protocol p to it. Each stream that opens is
// handed to handler in a new goroutine. A peer that cannot be reached is
// logged and skipped.
func connectPeers(h host.Host, p protocol.ID, handler network.StreamHandler) {
	// inbound: let remote peers open streams to us
	h.SetStreamHandler(p, handler)
	log.Printf("listening for streams...\n")

	// outbound: try to reach every known peer ourselves
	log.Printf("opening streams...\n")
	for _, remote := range h.Peerstore().Peers() {
		if remote == h.ID() {
			// never dial ourselves
			continue
		}

		stream, dialErr := h.NewStream(context.Background(), remote, p)
		if dialErr != nil {
			// not fatal: the other node is expected to connect to us later
			log.Printf("could not open stream to %s: %v\n", remote, dialErr)
			continue
		}

		log.Printf("opened stream to %s\n", remote)
		go handler(stream)
	}
}
// pingOnce is a stream handler that exchanges a single ping over s: it writes
// one ping message built by createPing for the local and remote peer of the
// stream, then reads from the stream up to and including pingDelimiter and
// prints what it received.
//
// A write or read failure is logged and the stream is reset; the process
// keeps running, so one failing stream does not take the whole node down.
// On success the stream is closed before returning.
func pingOnce(s network.Stream) {
	log.Printf("sending/receiving ping...\n")
	src, dst := s.Conn().LocalPeer(), s.Conn().RemotePeer()

	// send ping
	msg := createPing(src, dst, 1)
	if _, err := s.Write([]byte(msg)); err != nil {
		log.Printf("could not write to stream: %v", err)
		// best effort: the stream is already broken, so a failed reset
		// leaves nothing further to do
		s.Reset()
		return
	}
	fmt.Printf("sent ping: %s", msg)

	// listen for ping
	reader := bufio.NewReader(s)
	reply, err := reader.ReadString(pingDelimiter)
	if err != nil {
		log.Printf("could not read from stream: %v", err)
		// best effort, see above
		s.Reset()
		return
	}
	fmt.Printf("received ping: %s", reply)

	// the exchange is complete; release the stream
	if err := s.Close(); err != nil {
		log.Printf("could not close stream: %v", err)
	}
}