Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LB removes invalid Target neighbor entries #549

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
run:
deadline: 5m
timeout: 5m

linters:
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ output-dir:

.PHONY: golangci-lint
golangci-lint:
$(call go-get-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.2)
$(call go-get-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/cmd/golangci-lint@v1.63.4)

.PHONY: proto-compiler
proto-compiler: protoc protoc-gen-go protoc-gen-go-grpc
Expand Down Expand Up @@ -335,6 +335,7 @@ define go-get-tool
set -e ;\
TMP_DIR=$$(mktemp -d) ;\
cd $$TMP_DIR ;\
go version ;\
go mod init tmp ;\
echo "Downloading $(2)" ;\
GOBIN=$(PROJECT_DIR)/bin go install $(2) ;\
Expand Down
2 changes: 1 addition & 1 deletion api/v1/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var _ = BeforeSuite(func() {
if err != nil {
return err
}
conn.Close()
_ = conn.Close()
return nil
}).Should(Succeed())

Expand Down
2 changes: 1 addition & 1 deletion build/frontend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10001
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build

ENV GO111MODULE=on
ENV CGO_ENABLED=0
Expand Down
2 changes: 1 addition & 1 deletion build/ipam/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10004
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build

ENV GO111MODULE=on
ARG meridio_version=0.0.0-unknown
Expand Down
2 changes: 1 addition & 1 deletion build/nsp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10003
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build

ARG meridio_version=0.0.0-unknown
ENV GO111MODULE=on
Expand Down
2 changes: 1 addition & 1 deletion build/operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10005
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build
ARG LDFLAGS

WORKDIR /workspace
Expand Down
2 changes: 1 addition & 1 deletion build/proxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10005
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build

ARG meridio_version=0.0.0-unknown
ENV GO111MODULE=on
Expand Down
2 changes: 1 addition & 1 deletion build/stateless-lb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=meridio
ARG UID=10002
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build
ARG meridio_version=0.0.0-unknown
ENV GO111MODULE=on

Expand Down
2 changes: 1 addition & 1 deletion build/tapa/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG USER=tapa
ARG UID=10005
ARG HOME=/home/${USER}

FROM golang:1.22 as build
FROM golang:1.23 as build

ARG meridio_version=0.0.0-unknown
ENV GO111MODULE=on
Expand Down
4 changes: 3 additions & 1 deletion cmd/frontend/internal/bird/bird.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ func (b *RoutingService) Run(ctx context.Context, monitorLogs bool) error {
return
}
// when context is done, close File thus signalling EOF to bufio Scan()
defer w.Close()
defer func() {
_ = w.Close()
}()
<-ctx.Done()
b.logger.Info("Context closed, terminate log monitoring")
}()
Expand Down
4 changes: 3 additions & 1 deletion cmd/frontend/internal/bird/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (r *routingConfig) Apply() error {
if err != nil {
return fmt.Errorf("create %v, err: %v", r.path, err)
}
defer file.Close()
defer func() {
_ = file.Close()
}()

_, err = file.WriteString(r.config)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func main() {
if err != nil {
log.Fatal(logger, "Dial NSP", "error", err)
}
defer conn.Close()
defer func() {
_ = conn.Close()
}()

// monitor status of NSP connection and adjust probe status accordingly
if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil {
Expand All @@ -131,7 +133,9 @@ func main() {
if err != nil {
log.Fatal(logger, "Dial loadbalancer", "error", err)
}
defer lbConn.Close()
defer func() {
_ = lbConn.Close()
}()

// create and start frontend service
c := &feConfig.Config{
Expand Down
4 changes: 3 additions & 1 deletion cmd/ipam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func main() {
if err != nil {
log.Fatal(logger, "Dial NSP err", "error", err)
}
defer conn.Close()
defer func() {
_ = conn.Close()
}()

// monitor status of NSP connection and adjust probe status accordingly
if err := connection.Monitor(ctx, health.NSPCliSvc, conn); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func setupTLSCert(socket string) error {
certDir := "/tmp/k8s-webhook-server/serving-certs"

go func() {
defer client.Close()
defer func() {
_ = client.Close()
}()
err := client.WatchX509Context(ctx, &x509Watcher{CertDir: certDir})
if err != nil && status.Code(err) != codes.Canceled {
log.Fatal(setupLog, "error watching X.509 context", "error", err)
Expand Down Expand Up @@ -132,7 +134,7 @@ func main() {
"Enabling this will ensure there is only one active controller manager.")

if os.Getenv(common.LogLevelEnv) == "" { // trace as default value
os.Setenv(common.LogLevelEnv, "trace")
_ = os.Setenv(common.LogLevelEnv, "trace")
}

ver := flag.Bool("version", false, "Print version and quit")
Expand Down
16 changes: 12 additions & 4 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ func main() {
if err != nil {
log.Fatal(logger, "Dialing IPAM", "error", err)
}
defer conn.Close()
defer func() {
_ = conn.Close()
}()

// monitor status of IPAM connection and adjust probe status accordingly
if err := connection.Monitor(signalCtx, health.IPAMCliSvc, conn); err != nil {
Expand Down Expand Up @@ -193,7 +195,9 @@ func main() {
if err != nil {
log.Fatal(logger, "Dialing NSP", "error", err)
}
defer nspConn.Close()
defer func() {
_ = nspConn.Close()
}()

// monitor status of NSP connection and adjust probe status accordingly
if err := connection.Monitor(signalCtx, health.NSPCliSvc, nspConn); err != nil {
Expand Down Expand Up @@ -234,14 +238,18 @@ func main() {
cancelSignalCtx()
return
}
defer cc.Close()
defer func() {
_ = cc.Close()
}()
monitorClient := networkservice.NewMonitorConnectionClient(cc)
go nsmmonitor.ConnectionMonitor(ctx, config.Name, monitorClient)

// create and start NSC that connects all remote NSE belonging to the right service
interfaceMonitorClient := interfacemonitor.NewClient(interfaceMonitor, p, netUtils)
nsmClient := service.GetNSC(ctx, &config, nsmAPIClient, p, interfaceMonitorClient, monitorClient)
defer nsmClient.Close()
defer func() {
_ = nsmClient.Close()
}()
go func() {
service.StartNSC(nsmClient, config.NetworkServiceName)
cancelSignalCtx() // let others with proper clean-up gracefully terminate
Expand Down
40 changes: 21 additions & 19 deletions cmd/stateless-lb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,27 @@ import (

// Config for the proxy
type Config struct {
Name string `default:"load-balancer" desc:"Name of the pod"`
ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"`
ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"`
DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"`
RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"`
MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"`
NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"`
ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"`
TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"`
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"`
NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"`
IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"`
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"`
MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"`
Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to host Stream Availability Service" split_words:"true"`
Name string `default:"load-balancer" desc:"Name of the pod"`
ServiceName string `default:"load-balancer" desc:"Name of providing service" split_words:"true"`
ConnectTo url.URL `default:"unix:///var/lib/networkservicemesh/nsm.io.sock" desc:"url to connect to NSM" split_words:"true"`
DialTimeout time.Duration `default:"5s" desc:"timeout to dial NSMgr" split_words:"true"`
RequestTimeout time.Duration `default:"15s" desc:"timeout to request NSE" split_words:"true"`
MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"`
NSPService string `default:"nsp-service:7778" desc:"IP (or domain) and port of the NSP Service" split_words:"true"`
ConduitName string `default:"load-balancer" desc:"Name of the conduit" split_words:"true"`
TrenchName string `default:"default" desc:"Trench the pod is running on" split_words:"true"`
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
Nfqueue string `default:"0:3" desc:"netfilter queue(s) to be used by nfqlb" split_words:"true"`
NfqueueFanout bool `default:"false" desc:"enable fanout nfqueue option" split_words:"true"`
IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout" envconfig:"grpc_keepalive_time"`
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
MetricsEnabled bool `default:"false" desc:"Enable the metrics collection" split_words:"true"`
MetricsPort int `default:"2223" desc:"Specify the port used to expose the metrics" split_words:"true"`
Socket url.URL `default:"unix:///var/lib/meridio/lb.sock" desc:"Server socket to host Stream Availability Service" split_words:"true"`
Namespace string `default:"default" desc:"Namespace the pod is running on" split_words:"true"`
TargetDisconnectMonitoring bool `default:"true" desc:"Enable listening to Target disconnect events to clean-up linux neighbor cache" split_words:"true"`
}

// IsValid checks if the configuration is valid
Expand Down
90 changes: 90 additions & 0 deletions cmd/stateless-lb/internal/neighborcache/neighborcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright (c) 2024 OpenInfra Foundation Europe

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package neighborcache

import (
"context"
"net"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/nordix/meridio/pkg/log"
"github.com/vishvananda/netlink"
)

// RemoveInvalid attempts to remove potentially invalid neighbor entries for
// which NSM has reported that the connection was closed, implying that the
// interface has either disappeared or is about to disappear along with its
// IP and MAC addresses. Thus, even if the same IP address reappears shortly
// due to NSM heal successfully fixing or, more accurately, re-establishing the
// connection, communication disturbances caused by an old invalid neighbor
// cache entry can be avoided, which would have otherwise occurred due to the
// behavior of the neighbor state machine (DELAY state and unicast probes).
// Note: The LB monitors TAPA -> Proxy connections, where the SrcIpAddrs refer
// to TAPA-side IPs, including the ones used as Target IPs by the LB.
func RemoveInvalid(ctx context.Context, connectionEvent *networkservice.ConnectionEvent) {
if connectionEvent.Type != networkservice.ConnectionEventType_DELETE {
return
}
logger := log.FromContextOrGlobal(ctx).WithValues("func", "RemoveInvalid")
// Fetch neighbor cache from kernel
neighborList, err := netlink.NeighList(0, 0)
if err != nil {
logger.Info("Could not fetch neighbor list", "err", err)
return
}
// Convert neighbor list to a map
neighborMap := make(map[string][]netlink.Neigh)
for _, neigh := range neighborList {
ipStr := neigh.IP.String()
neighborMap[ipStr] = append(neighborMap[ipStr], neigh)
}

// Remove any of the NSM SrcIpAddrs from the neighbor cache if they are present
eventPrinted := false
for _, connection := range connectionEvent.Connections {
if connection.GetPath() == nil || len(connection.GetPath().GetPathSegments()) < 1 {
continue
}
if connection.GetContext() == nil || connection.GetContext().GetIpContext() == nil {
continue
}
ipContext := connection.GetContext().GetIpContext()
for _, ipStr := range ipContext.SrcIpAddrs {
if ip, _, err := net.ParseCIDR(ipStr); err == nil {
// Check if neighbor map has an entry for this IP
neighs, ok := neighborMap[ip.String()]
if !ok {
continue
}
if !eventPrinted {
eventPrinted = true
logger.Info("Connection event", "event", connectionEvent)
}
for _, neigh := range neighs {
logger.Info("Delete from neighbor cache", "neigh", neigh, "MAC", neigh.HardwareAddr.String())
err := netlink.NeighDel(&netlink.Neigh{
LinkIndex: neigh.LinkIndex,
IP: ip,
})
if err != nil {
logger.Info("Failed to delete from neighbor cache", "neigh", neigh, "MAC", neigh.HardwareAddr.String(), "err", err)
}
}
}
}
}
}
Loading