Skip to content

Commit

Permalink
Backup-restore now restarts the etcd member in case of etcd's advertise…
Browse files Browse the repository at this point in the history
… peerURL found to be updated. (#788) (#794)

Fix: compare the member's peer URLs before updating them and invoking the etcd-wrapper stop endpoint.
Added a CLI flag `--use-etcd-wrapper` to the server sub-command.

Co-authored-by: madhav bhargava <madhav.bhargava@sap.com>
Co-authored-by: Shreyas Rao <42259948+shreyas-s-rao@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent aa43839 commit 14fc3cf
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 37 deletions.
1 change: 1 addition & 0 deletions example/00-backup-restore-server-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ restorationConfig:
autoCompactionRetention: "30m"

defragmentationSchedule: "0 0 */3 * *"
useEtcdWrapper: false

compressionConfig:
enabled: true
Expand Down
50 changes: 32 additions & 18 deletions pkg/member/member_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type Control interface {

// IsLearnerPresent checks for the learner(non-voting) member in a cluster.
IsLearnerPresent(context.Context) (bool, error)

// GetPeerURLs returns the list of current peer URLs of the etcd cluster member.
GetPeerURLs(context.Context, etcdClient.ClusterCloser) ([]string, error)
}

// memberControl holds the configuration for the mechanism of adding a new member to the cluster.
Expand Down Expand Up @@ -113,7 +116,7 @@ func NewMemberControl(etcdConnConfig *brtypes.EtcdConnectionConfig) Control {
// AddMemberAsLearner add a member as a learner to the etcd cluster
func (m *memberControl) AddMemberAsLearner(ctx context.Context) error {
//Add member as learner to cluster
memberURL, err := getMemberPeerURL(m.configFile, m.podName)
memberURL, err := miscellaneous.GetMemberPeerURL(m.configFile, m.podName)
if err != nil {
m.logger.Fatalf("Error fetching etcd member URL : %v", err)
}
Expand Down Expand Up @@ -198,28 +201,12 @@ func (m *memberControl) IsMemberInCluster(ctx context.Context) (bool, error) {
return false, nil
}

// getMemberPeerURL reads the etcd config file at configFile and returns the peer URL
// obtained by passing its `initial-advertise-peer-urls` entry and podName to
// miscellaneous.ParsePeerURL.
// An error is returned when the file cannot be read, the entry is missing or is not a
// string, or the entry cannot be parsed.
func getMemberPeerURL(configFile string, podName string) (string, error) {
	config, err := miscellaneous.ReadConfigFileAsMap(configFile)
	if err != nil {
		return "", err
	}
	initAdPeerURL := config["initial-advertise-peer-urls"]
	if initAdPeerURL == nil {
		return "", errors.New("initial-advertise-peer-urls must be set in etcd config")
	}
	// Checked assertion: a non-string value (e.g. a YAML list) must surface as an
	// error instead of panicking.
	rawPeerURL, ok := initAdPeerURL.(string)
	if !ok {
		return "", fmt.Errorf("initial-advertise-peer-urls must be a string in etcd config, found %T", initAdPeerURL)
	}
	peerURL, err := miscellaneous.ParsePeerURL(rawPeerURL, podName)
	if err != nil {
		return "", fmt.Errorf("could not parse peer URL from the config file : %w", err)
	}
	return peerURL, nil
}

// doUpdateMemberPeerAddress updated the peer address of a specified etcd member
func (m *memberControl) doUpdateMemberPeerAddress(ctx context.Context, cli etcdClient.ClusterCloser, id uint64) error {
// Already existing clusters or cluster after restoration have `http://localhost:2380` as the peer address. This needs to explicitly updated to the correct peer address.
m.logger.Infof("Updating member peer URL for %s", m.podName)

memberPeerURL, err := getMemberPeerURL(m.configFile, m.podName)
memberPeerURL, err := miscellaneous.GetMemberPeerURL(m.configFile, m.podName)
if err != nil {
return fmt.Errorf("could not fetch member URL : %v", err)
}
Expand Down Expand Up @@ -352,6 +339,33 @@ func (m *memberControl) IsClusterScaledUp(ctx context.Context, clientSet client.
return false, nil
}

// GetPeerURLs lists the members of the etcd cluster through the given client and
// returns the peer URLs of the member whose name equals m.podName.
// Listing is retried with a backoff built from RetryPeriod and RetrySteps; every
// error is treated as retriable. An empty, non-nil slice is returned when no member
// carries that name.
func (m *memberControl) GetPeerURLs(ctx context.Context, closer etcdClient.ClusterCloser) ([]string, error) {
	var members *clientv3.MemberListResponse

	retryOnAnyError := func(err error) bool { return err != nil }
	// Each attempt gets its own timeout derived from the caller's context.
	listMembers := func() error {
		listCtx, cancel := context.WithTimeout(ctx, EtcdTimeout)
		defer cancel()
		resp, listErr := closer.MemberList(listCtx)
		members = resp
		return listErr
	}

	retryBackoff := miscellaneous.CreateBackoff(RetryPeriod, RetrySteps)
	if err := retry.OnError(retryBackoff, retryOnAnyError, listMembers); err != nil {
		return nil, fmt.Errorf("could not list any etcd members %w", err)
	}

	for _, etcdMember := range members.Members {
		if etcdMember.GetName() != m.podName {
			continue
		}
		return etcdMember.GetPeerURLs(), nil
	}
	return []string{}, nil
}

// WasMemberInCluster checks the whether etcd member was part of etcd cluster.
func (m *memberControl) WasMemberInCluster(ctx context.Context, clientSet client.Client) bool {

Expand Down
75 changes: 75 additions & 0 deletions pkg/miscellaneous/miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package miscellaneous

import (
"context"
"crypto/tls"
"crypto/x509"
errored "errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -54,6 +57,9 @@ const (
ScaledToMultiNodeAnnotationKey = "gardener.cloud/scaled-to-multi-node"

https = "https"

// etcdWrapperPort defines the port no. used by etcd-wrapper.
etcdWrapperPort = "9095"
)

// GetLatestFullSnapshotAndDeltaSnapList returns the latest snapshot
Expand Down Expand Up @@ -604,3 +610,72 @@ func RemoveDir(dir string) error {
}
return nil
}

// GetMemberPeerURL returns the peer URL from the given configuration file provided to the etcd member.
// It reads the `initial-advertise-peer-urls` entry of the config file and passes it,
// together with podName, to ParsePeerURL.
// An error is returned when the file cannot be read, the entry is missing or is not a
// string, or the entry cannot be parsed.
func GetMemberPeerURL(configFile string, podName string) (string, error) {
	config, err := ReadConfigFileAsMap(configFile)
	if err != nil {
		return "", err
	}
	initAdPeerURL := config["initial-advertise-peer-urls"]
	if initAdPeerURL == nil {
		return "", fmt.Errorf("initial-advertise-peer-urls must be set in etcd config")
	}
	// Checked assertion: a non-string value (e.g. a YAML list) must surface as an
	// error instead of panicking.
	rawPeerURL, ok := initAdPeerURL.(string)
	if !ok {
		return "", fmt.Errorf("initial-advertise-peer-urls must be a string in etcd config, found %T", initAdPeerURL)
	}
	peerURL, err := ParsePeerURL(rawPeerURL, podName)
	if err != nil {
		return "", fmt.Errorf("could not parse peer URL from the config file : %w", err)
	}
	return peerURL, nil
}

// RestartEtcdWrapper calls the "/stop" endpoint of etcd-wrapper to restart the etcd-wrapper container.
//
// The endpoint is derived from the first entry of etcdConnectionConfig.Endpoints.
// When tlsEnabled is true, the certificates in etcdConnectionConfig.CaFile are used as
// the root CAs for the request. The request is bound to ctx, so its deadline (if any)
// is whatever the caller set on ctx.
//
// An error is returned when the config is nil, the endpoint cannot be derived, the CA
// file cannot be read or holds no usable certificate, the request fails, or the
// endpoint answers with a non-2xx status.
func RestartEtcdWrapper(ctx context.Context, tlsEnabled bool, etcdConnectionConfig *brtypes.EtcdConnectionConfig) error {
	if etcdConnectionConfig == nil {
		return fmt.Errorf("etcd connection config must not be nil")
	}

	etcdWrapperURL, err := getEtcdWrapperEndpoint(etcdConnectionConfig.Endpoints)
	if err != nil {
		return err
	}

	client := &http.Client{}
	if tlsEnabled {
		caCert, err := os.ReadFile(etcdConnectionConfig.CaFile)
		if err != nil {
			return fmt.Errorf("could not read CA file %q: %w", etcdConnectionConfig.CaFile, err)
		}
		caCertPool := x509.NewCertPool()
		// AppendCertsFromPEM reports false when nothing could be parsed; failing here
		// gives a clearer error than a later certificate verification failure.
		if !caCertPool.AppendCertsFromPEM(caCert) {
			return fmt.Errorf("no valid PEM certificates found in CA file %q", etcdConnectionConfig.CaFile)
		}
		client.Transport = &http.Transport{
			TLSClientConfig: &tls.Config{
				RootCAs: caCertPool,
			},
		}
	}

	// Cancelling on return releases anything still tied to the request.
	httpCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	req, err := http.NewRequestWithContext(httpCtx, http.MethodPost, fmt.Sprintf("%s/%s", etcdWrapperURL, "stop"), nil)
	if err != nil {
		return err
	}
	response, err := client.Do(req)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	// A reply outside the 2xx range means the stop request was not accepted.
	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("etcd-wrapper stop endpoint returned unexpected status %q", response.Status)
	}
	return nil
}

// getEtcdWrapperEndpoint derives the etcd-wrapper base URL from the first entry of
// etcdEndpoints: it keeps that endpoint's scheme and host and replaces the port with
// etcdWrapperPort.
// An error is returned when no endpoint is given, the endpoint cannot be parsed, or it
// lacks a scheme or a host (e.g. "localhost:2379").
func getEtcdWrapperEndpoint(etcdEndpoints []string) (string, error) {
	if len(etcdEndpoints) == 0 {
		return "", fmt.Errorf("etcd endpoints are not passed correctly")
	}

	etcdURL, err := url.Parse(etcdEndpoints[0])
	if err != nil {
		return "", err
	}
	host := etcdURL.Hostname()
	// Without both parts the result would not be a usable URL, so fail early.
	if etcdURL.Scheme == "" || host == "" {
		return "", fmt.Errorf("etcd endpoint %q must be of the form scheme://host[:port]", etcdEndpoints[0])
	}

	// Hostname() strips the brackets of IPv6 literals; net.JoinHostPort adds them back.
	return fmt.Sprintf("%s://%s", etcdURL.Scheme, net.JoinHostPort(host, etcdWrapperPort)), nil
}
85 changes: 68 additions & 17 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"sync/atomic"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil/client"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/gardener/etcd-backup-restore/pkg/backoff"
"github.com/gardener/etcd-backup-restore/pkg/defragmentor"
"github.com/gardener/etcd-backup-restore/pkg/errors"
Expand All @@ -28,7 +31,7 @@ import (
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

"github.com/prometheus/client_golang/prometheus"
cron "github.com/robfig/cron/v3"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/pkg/types"
"k8s.io/client-go/util/retry"
Expand All @@ -44,8 +47,7 @@ type BackupRestoreServer struct {

var (
// runServerWithSnapshotter indicates whether to start server with or without snapshotter.
runServerWithSnapshotter bool = true
retryTimeout = 5 * time.Second
runServerWithSnapshotter = true
)

// NewBackupRestoreServer return new backup restore server.
Expand Down Expand Up @@ -154,7 +156,7 @@ func waitUntilEtcdRunning(ctx context.Context, etcdConnectionConfig *brtypes.Etc
case <-ticker.C:
}
}
logger.Info("Etcd is now running. Continuing br startup")
logger.Info("Etcd is now running. Continuing backup-restore startup.")
return nil
}

Expand Down Expand Up @@ -217,19 +219,7 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
return err
}

m := member.NewMemberControl(b.config.EtcdConnectionConfig)
if err := retry.OnError(retry.DefaultBackoff, errors.IsErrNotNil, func() error {
cli, err := etcdutil.NewFactory(*b.config.EtcdConnectionConfig).NewCluster()
if err != nil {
return err
}
defer cli.Close()

if err := m.UpdateMemberPeerURL(ctx, cli); err != nil {
return err
}
return nil
}); err != nil {
if err := b.updatePeerURLIfChanged(ctx, handler.EnableTLS, b.logger.Logger); err != nil {
b.logger.Errorf("failed to update member peer url: %v", err)
}

Expand Down Expand Up @@ -328,6 +318,50 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
return le.Run(ctx)
}

// updatePeerURLIfChanged updates the peer URLs of the etcd member when
// hasPeerURLChanged reports a change, and then asks etcd-wrapper to restart if
// b.config.UseEtcdWrapper is set.
//
// When no change is detected nothing is updated. When etcd-wrapper usage is disabled,
// only a warning asking for a manual restart of the etcd member is logged.
// A failure to create the etcd client, to detect the change, or to update the peer
// URLs (after retries) is returned as an error; a failure to restart etcd-wrapper is
// logged with Fatalf.
func (b *BackupRestoreServer) updatePeerURLIfChanged(ctx context.Context, tlsEnabled bool, logger *logrus.Logger) error {
	logger.Info("Checking if peerURL has changed or not.")

	m := member.NewMemberControl(b.config.EtcdConnectionConfig)

	cli, err := etcdutil.NewFactory(*b.config.EtcdConnectionConfig).NewCluster()
	if err != nil {
		return err
	}
	defer func() {
		if err := cli.Close(); err != nil {
			b.logger.Errorf("failed to close etcd client: %v", err)
		}
	}()

	changed, err := hasPeerURLChanged(ctx, m, cli)
	if err != nil {
		return err
	}
	if !changed {
		b.logger.Info("No change in peerURLs found. Skipping update of member peer URLs.")
		return nil
	}

	b.logger.Info("Etcd member peerURLs found to be changed.")
	if err := retry.OnError(retry.DefaultBackoff, errors.IsErrNotNil, func() error {
		return m.UpdateMemberPeerURL(ctx, cli)
	}); err != nil {
		return err
	}
	b.logger.Info("Successfully updated the peerURLs for etcd member.")

	if !b.config.UseEtcdWrapper {
		b.logger.Info("Usage of etcd-wrapper found to be disabled")
		b.logger.Warn("To correctly reflect peerURLs in etcd cluster, please restart the etcd member. More info: https://etcd.io/docs/v3.5/op-guide/runtime-configuration/#update-advertise-peer-urls")
		return nil
	}

	// The updated peer URLs only take effect after the etcd member restarts.
	if err := miscellaneous.RestartEtcdWrapper(ctx, tlsEnabled, b.config.EtcdConnectionConfig); err != nil {
		b.logger.Fatalf("failed to restart the etcd-wrapper: %v", err)
	}
	return nil
}

// runEtcdProbeLoopWithSnapshotter runs the etcd probe loop
// for the case when backup-restore becomes leading sidecar.
func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ss brtypes.SnapStore, ssrStopCh <-chan struct{}, ackCh chan<- struct{}) {
Expand Down Expand Up @@ -535,3 +569,20 @@ func handleSsrStopRequest(ctx context.Context, handler *HTTPHandler, _ *snapshot
}
}
}

// hasPeerURLChanged reports whether the peer URL read from the etcd config file is
// missing from the peer URLs returned by m.GetPeerURLs.
// The pod name is taken from the POD_NAME environment variable; an error is returned
// when it is unset, when the config file cannot be evaluated, or when the current peer
// URLs cannot be fetched.
func hasPeerURLChanged(ctx context.Context, m member.Control, cli client.ClusterCloser) (bool, error) {
	podName, err := miscellaneous.GetEnvVarOrError("POD_NAME")
	if err != nil {
		return false, fmt.Errorf("error reading POD_NAME env var : %v", err)
	}

	configuredPeerURL, err := miscellaneous.GetMemberPeerURL(miscellaneous.GetConfigFilePath(), podName)
	if err != nil {
		return false, err
	}

	registeredPeerURLs, err := m.GetPeerURLs(ctx, cli)
	if err != nil {
		return false, err
	}

	// The configured URL counts as changed when etcd does not list it for this member.
	alreadyRegistered := sets.New[string](registeredPeerURLs...).Has(configuredPeerURL)
	return !alreadyRegistered, nil
}
2 changes: 2 additions & 0 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig {
HealthConfig: brtypes.NewHealthConfig(),
LeaderElectionConfig: brtypes.NewLeaderElectionConfig(),
ExponentialBackoffConfig: brtypes.NewExponentialBackOffConfig(),
UseEtcdWrapper: usageOfEtcdWrapperEnabled,
}
}

Expand All @@ -47,6 +48,7 @@ func (c *BackupRestoreComponentConfig) AddFlags(fs *flag.FlagSet) {

// Miscellaneous
fs.StringVar(&c.DefragmentationSchedule, "defragmentation-schedule", c.DefragmentationSchedule, "schedule to defragment etcd data directory")
fs.BoolVar(&c.UseEtcdWrapper, "use-etcd-wrapper", c.UseEtcdWrapper, "to enable backup-restore to use etcd-wrapper related functionality. Note: enable this flag only if etcd-wrapper is deployed.")
}

// Validate validates the config.
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
const (
defaultServerPort = 8080
defaultDefragmentationSchedule = "0 0 */3 * *"
// usageOfEtcdWrapperEnabled is the default value of UseEtcdWrapper, i.e. whether
// backup-restore uses etcd-wrapper related functionality.
usageOfEtcdWrapperEnabled = false
)

// BackupRestoreComponentConfig holds the component configuration.
Expand All @@ -26,6 +28,7 @@ type BackupRestoreComponentConfig struct {
HealthConfig *brtypes.HealthConfig `json:"healthConfig,omitempty"`
LeaderElectionConfig *brtypes.Config `json:"leaderElectionConfig,omitempty"`
ExponentialBackoffConfig *brtypes.ExponentialBackoffConfig `json:"exponentialBackoffConfig,omitempty"`
UseEtcdWrapper bool `json:"useEtcdWrapper,omitempty"`
}

// latestSnapshotMetadata holds snapshot details of latest full and delta snapshots
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/etcdconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
defaultEtcdConnectionEndpoint string = "127.0.0.1:2379"
defaultEtcdConnectionEndpoint string = "http://127.0.0.1:2379"

// DefaultEtcdConnectionTimeout defines default timeout duration for etcd client connection.
DefaultEtcdConnectionTimeout time.Duration = 30 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/integration/cloud_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ enable-v2: false
quota-backend-bytes: 1073741824
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://0.0.0.0:2379
initial-advertise-peer-urls: http@etcd-main-peer@default@2380
initial-advertise-peer-urls: http://0.0.0.0:2380
initial-cluster: etcd1=http://0.0.0.0:2380
initial-cluster-token: new
initial-cluster-state: new
Expand Down

0 comments on commit 14fc3cf

Please sign in to comment.