Skip to content

Commit

Permalink
fix: bunch of bugs and bad configs
Browse files Browse the repository at this point in the history
  • Loading branch information
lucas-jacques committed Oct 11, 2024
1 parent 7f46bb2 commit 6e2ae4b
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 54 deletions.
4 changes: 2 additions & 2 deletions internal/network/wg_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func (m *WGDevice) configurePeers(peers []types.Peer, replacePeers bool) error {
for _, member := range peers {
peerConfig, err := member.WGPeerConfig()
if err != nil {
slog.Error("failed to get peer config", "error", err)
return err
slog.Error("failed to get peer config", "error", err, "peer_name", member.Name, "public_key", member.PublicKey.String(), "advertise_address", member.AdvertiseAddress, "allowed_ip", member.AllowedIP, "wg_port", member.WGPort)
continue
}

peerConfigs = append(peerConfigs, peerConfig)
Expand Down
51 changes: 33 additions & 18 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type SyncedState struct {
stop chan struct{}
finish chan struct{}
config Config
peers map[string]types.Peer
mutex sync.RWMutex
Expand Down Expand Up @@ -42,6 +43,7 @@ func New(config Config) *SyncedState {

return &SyncedState{
stop: make(chan struct{}),
finish: make(chan struct{}),
peers: make(map[string]types.Peer),
config: config,
}
Expand All @@ -55,11 +57,12 @@ func (w *SyncedState) Start(ctx context.Context) error {
return fmt.Errorf("failed to watch: %w", err)
}

slog.Info("Initializing watcher", "subject", sub)
slog.Info("Started watching peers")
updates := watcher.Updates()
w.init(updates)

go func() {
slog.Info("Started continuous peer synchronization")
w.sync(updates)
}()

Expand Down Expand Up @@ -88,28 +91,37 @@ func (w *SyncedState) init(entries <-chan jetstream.KeyValueEntry) {

w.peers[entry.Key()] = peer
}
slog.Info("Initializing peers", "count", len(w.peers))
w.config.OnInitPeers(w.peers)
}

func (w *SyncedState) sync(entries <-chan jetstream.KeyValueEntry) {
for entry := range entries {
if entry == nil {
continue
}

key := entry.Key()

switch entry.Operation() {
case jetstream.KeyValuePut:
peer, err := readPeer(entry.Value())
if err != nil {
slog.Error("failed to read peer", "error", err)
for {

select {
case <-w.stop:
close(w.finish)
return
case entry := <-entries:
if entry == nil {
continue
}
w.onPeerPut(key, peer)
case jetstream.KeyValueDelete:
w.onPeerDelete(key)
case jetstream.KeyValuePurge:
w.onPeerDelete(key)

key := entry.Key()

switch entry.Operation() {
case jetstream.KeyValuePut:
peer, err := readPeer(entry.Value())
if err != nil {
slog.Error("failed to read peer", "error", err)
continue
}
w.onPeerPut(key, peer)
case jetstream.KeyValueDelete:
w.onPeerDelete(key)
case jetstream.KeyValuePurge:
w.onPeerDelete(key)
}
}
}
}
Expand All @@ -118,6 +130,7 @@ func (w *SyncedState) onPeerPut(key string, peer types.Peer) {
if peer.PublicKey == w.config.IgnorePeer {
return
}
slog.Info("Peer put", "public_key", peer.PublicKey.String(), "ip", peer.AllowedIP)
w.mutex.Lock()
w.peers[key] = peer
w.mutex.Unlock()
Expand All @@ -134,12 +147,14 @@ func (w *SyncedState) onPeerDelete(key string) {
return
}

slog.Info("Peer delete", "public_key", peer.PublicKey.String(), "ip", peer.AllowedIP)
delete(w.peers, key)
w.config.OnPeerDelete(peer)
}

func (w *SyncedState) Stop() {
close(w.stop)
<-w.finish
}

func (s *SyncedState) ListPeers() []types.Peer {
Expand Down
4 changes: 2 additions & 2 deletions internal/state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *Store) CreatePeer(ctx context.Context, peer types.Peer) (uint64, error)
return 0, err
}

revision, err := s.kv.Create(ctx, getKey(peer.PrivateCIDR), bytes)
revision, err := s.kv.Create(ctx, getKey(peer.AllowedIP), bytes)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func (s *Store) UpdatePeer(ctx context.Context, peer types.Peer, revision uint64
return 0, err
}

r, err := s.kv.Update(ctx, getKey(peer.PrivateCIDR), bytes, revision)
r, err := s.kv.Update(ctx, getKey(peer.AllowedIP), bytes, revision)
if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/commands/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func NewAgentCommand() *cobra.Command {
return err
}

defer ikto.Stop()

ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)

err = server.StartAdminServer(ctx, *ikto, socket)
if err != nil {
return err
}

ikto.Stop()

return nil
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/commands/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func printPeer(peer *proto.Peer) {
fmt.Printf("Name: %s\n", peer.Name)
fmt.Printf("Public Key: %s\n", peer.PublicKey)
fmt.Printf("Advertise Address: %s\n", peer.AdvertiseAddr)
fmt.Printf("Private CIDR: %s\n", peer.PrivateAddr)
fmt.Printf("Allowed IP: %s\n", peer.AllowedIp)
fmt.Printf("WireGuard Port: %d\n", peer.WgPort)
fmt.Println()
}
11 changes: 8 additions & 3 deletions pkg/ikto/ikto.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ func NewIkto(c *Config) (*Ikto, error) {
Name: c.Name,
PublicKey: types.PublicKey(publicKey),
AdvertiseAddress: c.AdvertiseAddress.String(),
PrivateCIDR: c.getPrivateCIDR(),
AllowedIP: c.getPrivateCIDR(),
WGPort: c.WGPort,
}

slog.Info("Starting with self config", "name", self.Name, "public_key", self.PublicKey.String(), "advertise_address", self.AdvertiseAddress, "allowed_ip", self.AllowedIP, "wg_port", self.WGPort, "wg_dev_name", c.WGDevName)

wg, err := network.New(fmt.Sprintf(c.WGDevName), c.WGPort, privateKey)
if err != nil {
return nil, fmt.Errorf("failed to create wg service: %w", err)
Expand All @@ -87,9 +89,11 @@ func NewIkto(c *Config) (*Ikto, error) {
return nil, fmt.Errorf("failed to init wireguard config: %w", err)
}

meshOnes, _ := c.MeshIPNet.Mask.Size()

err = wg.SetAddr(net.IPNet{
IP: c.PrivateAddress,
Mask: net.CIDRMask(c.HostPrefixLength, len(c.PrivateAddress)*8),
Mask: net.CIDRMask(meshOnes, len(c.PrivateAddress)*8),
})
if err != nil {
return nil, fmt.Errorf("failed to set address: %w", err)
Expand Down Expand Up @@ -154,7 +158,7 @@ func NewIkto(c *Config) (*Ikto, error) {
}

func (i *Ikto) init() error {
previous, revision, err := i.store.GetPeer(context.Background(), i.self.PrivateCIDR)
previous, revision, err := i.store.GetPeer(context.Background(), i.self.AllowedIP)
if err != nil && err != jetstream.ErrKeyNotFound {
return fmt.Errorf("failed to get self: % w", err)
}
Expand Down Expand Up @@ -192,6 +196,7 @@ func (i *Ikto) Start() error {
}

func (i *Ikto) Stop() {
slog.Info("stopping")
i.state.Stop()
i.nc.Close()
}
Expand Down
43 changes: 22 additions & 21 deletions pkg/proto/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message Peer {
string name = 1;
string public_key = 2;
string advertise_addr = 3;
string private_addr = 4;
// string private_addr = 4;
int32 wg_port = 5;
string allowed_ip = 6;
}
4 changes: 2 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *server) NodeInfo(context.Context, *emptypb.Empty) (*proto.NodeInfoRespo
Name: peer.Name,
PublicKey: peer.PublicKey.String(),
AdvertiseAddr: peer.AdvertiseAddress,
PrivateAddr: peer.PrivateCIDR,
AllowedIp: peer.AllowedIP,
WgPort: int32(peer.WGPort),
})
}
Expand All @@ -62,7 +62,7 @@ func (s *server) NodeInfo(context.Context, *emptypb.Empty) (*proto.NodeInfoRespo
Name: self.Name,
PublicKey: self.PublicKey.String(),
AdvertiseAddr: self.AdvertiseAddress,
PrivateAddr: self.PrivateCIDR,
AllowedIp: self.AllowedIP,
WgPort: int32(self.WGPort),
},
Peers: peersProto,
Expand Down
4 changes: 2 additions & 2 deletions pkg/types/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ type Peer struct {
Name string `json:"name"`
PublicKey PublicKey `json:"public_key"`
AdvertiseAddress string `json:"advertise_address"`
PrivateCIDR string `json:"private_cidr"`
AllowedIP string `json:"allowed_ip"`
WGPort int `json:"wg_port"`
}

func (p *Peer) WGPeerConfig() (wgtypes.PeerConfig, error) {
_, ipnet, err := net.ParseCIDR(p.PrivateCIDR)
_, ipnet, err := net.ParseCIDR(p.AllowedIP)
if err != nil {
return wgtypes.PeerConfig{}, err
}
Expand Down

0 comments on commit 6e2ae4b

Please sign in to comment.