diff --git a/examples/chat-with-mdns/README.md b/examples/chat-with-mdns/README.md index 8e467e696a..c42fd880f2 100644 --- a/examples/chat-with-mdns/README.md +++ b/examples/chat-with-mdns/README.md @@ -19,10 +19,10 @@ Use two different terminal windows to run ./chat-with-mdns -port 6668 ``` - ## So how does it work? 1. **Configure a p2p host** + ```go ctx := context.Background() @@ -30,28 +30,27 @@ ctx := context.Background() // Other options can be added here. host, err := libp2p.New() ``` + [libp2p.New](https://godoc.org/github.com/libp2p/go-libp2p#New) is the constructor for libp2p node. It creates a host with given configuration. 2. **Set a default handler function for incoming connections.** This function is called on the local peer when a remote peer initiate a connection and starts a stream with the local peer. + ```go // Set a function as stream handler. host.SetStreamHandler("/chat/1.1.0", handleStream) ``` -```handleStream``` is executed for each new stream incoming to the local peer. ```stream``` is used to exchange data between local and remote peer. This example uses non blocking functions for reading and writing from this stream. +```handleStream``` is executed for each new stream incoming to the local peer. ```stream``` is used to exchange data between local and remote peer. This example uses non blocking functions for reading from this stream. ```go func handleStream(stream net.Stream) { - // Create a buffer stream for non blocking read and write. - rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + // Create a buffer stream for non blocking read. + r := bufio.NewReader(stream) - go readData(rw) - go writeData(rw) - - // 'stream' will stay open until you close it (or the other side closes it). + go readData(r) } ``` @@ -63,46 +62,47 @@ New [mdns discovery](https://godoc.org/github.com/libp2p/go-libp2p/p2p/discovery notifee := &discoveryNotifee{PeerChan: make(chan peer.AddrInfo)} ser, err := discovery.NewMdnsService(peerhost, rendezvous, notifee) ``` + register [Notifee interface](https://godoc.org/github.com/libp2p/go-libp2p/p2p/discovery#Notifee) with service so that we get notified about peer discovery ```go - ser.Start() + ser.Start() ``` - - 4. **Open streams to peers found.** Finally we open stream to the peers we found, as we find them ```go - peer := <-peerChan // will block until we discover a peer - // this is used to avoid call `NewStream` from both side - if peer.ID > host.ID() { - // if other end peer id greater than us, don't connect to it, just wait for it to connect us - fmt.Println("Found peer:", peer, " id is greater than us, wait for it to connect to us") - continue - } - fmt.Println("Found peer:", peer, ", connecting") - - if err := host.Connect(ctx, peer); err != nil { - fmt.Println("Connection failed:", err) - continue - } - - // open a stream, this stream will be handled by handleStream other end - stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID)) - - if err != nil { - fmt.Println("Stream open failed", err) - } else { - rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) - - go writeData(rw) - go readData(rw) - fmt.Println("Connected to:", peer) - } + peer := <-peerChan // will block until we discover a peer + fmt.Println("Found peer:", peer, ", connecting") + + if err := host.Connect(ctx, peer); err != nil { + fmt.Println("Connection failed:", err) + continue + } + + // open a stream, this stream will be handled by handleStream other end + stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID)) + if err != nil { + fmt.Println("Stream open failed", err) + + continue + } + + p := &Peer{ + id: string(peer.ID), + stream: stream, + } + p.Start() + + fmt.Println("Connected to:", peer.ID) + + peers.AddPeer(p) + peers.WriteAll(fmt.Sprintf("%s Joined", host.ID().String())) + } ``` ## Authors + 1. Bineesh Lazar diff --git a/examples/chat-with-mdns/main.go b/examples/chat-with-mdns/main.go index 55c36adf40..c3d9fc5686 100644 --- a/examples/chat-with-mdns/main.go +++ b/examples/chat-with-mdns/main.go @@ -17,58 +17,33 @@ import ( ) func handleStream(stream network.Stream) { - fmt.Println("Got a new stream!") - // Create a buffer stream for non-blocking read and write. - rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) - - go readData(rw) - go writeData(rw) + r := bufio.NewReader(stream) - // 'stream' will stay open until you close it (or the other side closes it). + go readData(r) } -func readData(rw *bufio.ReadWriter) { +// Read messages from connected peers +// In this example, we only read from the peers but use another Stream +// to broadcast the messages +func readData(r *bufio.Reader) { for { - str, err := rw.ReadString('\n') + str, err := r.ReadString('\n') if err != nil { fmt.Println("Error reading from buffer") - panic(err) + + return } if str == "" { return } + if str != "\n" { // Green console colour: \x1b[32m // Reset console colour: \x1b[0m fmt.Printf("\x1b[32m%s\x1b[0m> ", str) } - - } -} - -func writeData(rw *bufio.ReadWriter) { - stdReader := bufio.NewReader(os.Stdin) - - for { - fmt.Print("> ") - sendData, err := stdReader.ReadString('\n') - if err != nil { - fmt.Println("Error reading from stdin") - panic(err) - } - - _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData)) - if err != nil { - fmt.Println("Error writing to buffer") - panic(err) - } - err = rw.Flush() - if err != nil { - fmt.Println("Error flushing buffer") - panic(err) - } } } @@ -98,7 +73,7 @@ func main() { sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", cfg.listenHost, cfg.listenPort)) // libp2p.New constructs a new libp2p Host. - // Other options can be added here. + // Other options can be added here host, err := libp2p.New( libp2p.ListenAddrs(sourceMultiAddr), libp2p.Identity(prvKey), @@ -113,32 +88,54 @@ func main() { fmt.Printf("\n[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", cfg.listenHost, cfg.listenPort, host.ID()) + peers := &Peers{ + peers: map[string]*Peer{}, + } + + go func() { + stdReader := bufio.NewReader(os.Stdin) + + for { + fmt.Print("> ") + + sendData, err := stdReader.ReadString('\n') + if err != nil { + fmt.Println("Error reading from stdin") + panic(err) + } + + peers.SendAll(fmt.Sprintf("%s: %s", host.ID().String(), sendData)) + } + }() + peerChan := initMDNS(host, cfg.RendezvousString) for { // allows multiple peers to join peer := <-peerChan // will block until we discover a peer - if peer.ID > host.ID() { - // if other end peer id greater than us, don't connect to it, just wait for it to connect us - fmt.Println("Found peer:", peer, " id is greater than us, wait for it to connect to us") - continue - } - fmt.Println("Found peer:", peer, ", connecting") + fmt.Println("Found peer:", peer.ID, ", connecting") if err := host.Connect(ctx, peer); err != nil { fmt.Println("Connection failed:", err) + continue } // open a stream, this stream will be handled by handleStream other end stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID)) - if err != nil { fmt.Println("Stream open failed", err) - } else { - rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) - go writeData(rw) - go readData(rw) - fmt.Println("Connected to:", peer) + continue + } + + p := &Peer{ + id: string(peer.ID), + stream: stream, } + p.Start() + + fmt.Println("Connected to:", peer.ID) + + peers.AddPeer(p) + peers.SendAll(fmt.Sprintf("%s Joined", host.ID().String())) } } diff --git a/examples/chat-with-mdns/peers.go b/examples/chat-with-mdns/peers.go new file mode 100644 index 0000000000..1d11cacfde --- /dev/null +++ b/examples/chat-with-mdns/peers.go @@ -0,0 +1,80 @@ +package main + +import ( + "bufio" + "fmt" + "sync" + + "github.com/libp2p/go-libp2p/core/network" +) + +type Peers struct { + peers map[string]*Peer + peerMu sync.Mutex +} + +func (p *Peers) AddPeer(peer *Peer) { + p.peerMu.Lock() + defer p.peerMu.Unlock() + + if _, ok := p.peers[peer.id]; !ok { + p.peers[peer.id] = peer + } +} + +func (p *Peers) SendAll(msg string) { + for _, peer := range p.peers { + peer.write(msg) + } +} + +type Peer struct { + stream network.Stream + stdinCh chan string + stopCh chan any + id string +} + +func (p *Peer) Start() { + w := bufio.NewWriter(bufio.NewWriter(p.stream)) + + p.stopCh = make(chan any, 1) + p.stdinCh = make(chan string, 10) + + go p.writeData(w, p.stdinCh, p.stopCh) +} + +func (p *Peer) writeData(w *bufio.Writer, inCh chan string, stopCh chan any) { + for { + select { + case in := <-inCh: + _, err := fmt.Fprintf(w, "%s\n", in) + if err != nil { + fmt.Printf("Error writing to buffer: %s\n", err.Error()) + + return + } + + err = w.Flush() + if err != nil { + fmt.Printf("Error flushing buffer: %s\n", err.Error()) + + return + } + + case <-stopCh: + return + } + } +} + +func (p *Peer) Stop() { + defer close(p.stopCh) + defer close(p.stdinCh) + + p.stopCh <- struct{}{} +} + +func (p *Peer) write(msg string) { + p.stdinCh <- msg +}