Commit f32b99b
Cherry pick changes for rel v0.18.1 (#508)
* Added support to run etcd client calls only in the multi-node case (#504)

* Added support to run etcd client calls only in the multi-node case

* Addressed review comments

* Used values from restoreOpts rather than another func call

* Temp fix: skip single-member restoration in case of an invalid data-dir. (#501)

* Temp fix: skip single-member restoration in case of an invalid data-dir.

* Small fixes.

* Improved error log.

* Fixed a bug in the scale-up feature present in func IsMemberInCluster.

* Fix the unit tests.

* Address the review comments.

* Handle the case when storageProvider is not configured.

* Address the review comments.

* Address the review comments.

* Assigned the correct peer address to Etcd after it restores from the backup bucket. (#505)

* Assigned the correct peer address to Etcd after restoration.

* Fix the unit tests and some refactoring.

* Improve some logs.

* Address the review comments.

* Update the peer address of an existing cluster.

* Don't update peer URL when promoting member (#506)

* Don't update peer URL when promoting member

* Update interface name in member package

Co-authored-by: Ishan Tyagi <42602577+ishan16696@users.noreply.github.com>
aaronfern and ishan16696 authored Jul 21, 2022
1 parent b1179ae commit f32b99b
Showing 14 changed files with 457 additions and 150 deletions.
cmd/restore.go (2 changes: 1 addition & 1 deletion)

@@ -43,7 +43,7 @@ func NewRestoreCommand(ctx context.Context) *cobra.Command {
 			}

 			rs := restorer.NewRestorer(store, logrus.NewEntry(logger))
-			if err := rs.RestoreAndStopEtcd(*options); err != nil {
+			if err := rs.RestoreAndStopEtcd(*options, nil); err != nil {
 				logger.Fatalf("Failed to restore snapshot: %v", err)
 				return
 			}
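Both restorer entry points now take a member-control handle as a second parameter. A minimal sketch of how callers choose that argument, with signatures inferred from the call sites in this commit (function and variable names here are illustrative, not the repository's):

// Sketch only: how callers pick the second argument of the updated restore
// API. Single-node paths pass nil; the multi-node initializer passes a real
// member.Control so the restored member's peer URL can be fixed up afterwards.
func restoreSingleNode(rs *restorer.Restorer, opts brtypes.RestoreOptions) error {
	// nil member control: no cluster membership to reconcile after restore.
	return rs.RestoreAndStopEtcd(opts, nil)
}

func restoreClusterMember(rs *restorer.Restorer, opts brtypes.RestoreOptions, conn *brtypes.EtcdConnectionConfig) error {
	// A member controller lets the peer address be updated once etcd is back
	// up (see the pkg/member changes below).
	m := member.NewMemberControl(conn)
	return rs.RestoreAndStopEtcd(opts, m)
}
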
pkg/compactor/compactor.go (2 changes: 1 addition & 1 deletion)

@@ -80,7 +80,7 @@ func (cp *Compactor) Compact(ctx context.Context, opts *brtypes.CompactOptions)

 	// Then restore from the snapshots
 	r := restorer.NewRestorer(cp.store, cp.logger)
-	embeddedEtcd, err := r.Restore(*cmpctOptions)
+	embeddedEtcd, err := r.Restore(*cmpctOptions, nil)
 	if err != nil {
 		return nil, fmt.Errorf("unable to restore snapshots during compaction: %v", err)
 	}
pkg/compactor/compactor_test.go (4 changes: 2 additions & 2 deletions)

@@ -161,7 +161,7 @@ var _ = Describe("Running Compactor", func() {

 			rstr := restorer.NewRestorer(store, logger)

-			err = rstr.RestoreAndStopEtcd(*restoreOpts)
+			err = rstr.RestoreAndStopEtcd(*restoreOpts, nil)

 			Expect(err).ShouldNot(HaveOccurred())
 			err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger)
@@ -237,7 +237,7 @@ var _ = Describe("Running Compactor", func() {

 			rstr := restorer.NewRestorer(store, logger)

-			err = rstr.RestoreAndStopEtcd(*restoreOpts)
+			err = rstr.RestoreAndStopEtcd(*restoreOpts, nil)

 			Expect(err).ShouldNot(HaveOccurred())
 			err = utils.CheckDataConsistency(testCtx, restoreOpts.Config.RestoreDataDir, keyTo, logger)
pkg/initializer/initializer.go (32 changes: 20 additions & 12 deletions)

@@ -46,17 +46,19 @@ import (
 func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
 	start := time.Now()
 	//Etcd cluster scale-up case
-	m := member.NewMemberControl(e.Config.EtcdConnectionConfig)
-	ctx := context.Background()
-	clusterMember, err := m.IsMemberInCluster(ctx)
-	if !clusterMember && err == nil {
-		retry.OnError(retry.DefaultBackoff, func(err error) bool {
-			return err != nil
-		}, func() error {
-			return m.AddMemberAsLearner(ctx)
-		})
-		// return here after adding member as no restoration or validation needed
-		return nil
+	if miscellaneous.IsMultiNode(e.Logger.WithField("actor", "initializer")) {
+		m := member.NewMemberControl(e.Config.EtcdConnectionConfig)
+		ctx := context.Background()
+		clusterMember, err := m.IsMemberInCluster(ctx)
+		if !clusterMember && err == nil {
+			retry.OnError(retry.DefaultBackoff, func(err error) bool {
+				return err != nil
+			}, func() error {
+				return m.AddMemberAsLearner(ctx)
+			})
+			// return here after adding member as no restoration or validation needed
+			return nil
+		}
 	}

 	dataDirStatus, err := e.Validator.Validate(mode, failBelowRevision)
@@ -75,6 +77,11 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
 		return fmt.Errorf("error while initializing: %v", err)
 	}

+	if dataDirStatus == validator.DataDirStatusUnknownInMultiNode {
+		metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Since(start).Seconds())
+		return fmt.Errorf("failed to initialize data dir of cluster member in multi-node cluster")
+	}
+
 	if dataDirStatus == validator.FailBelowRevisionConsistencyError {
 		metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Since(start).Seconds())
 		return fmt.Errorf("failed to initialize since fail below revision check failed")
@@ -160,7 +167,8 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) {
 	}

 	rs := restorer.NewRestorer(store, logrus.NewEntry(logger))
-	if err := rs.RestoreAndStopEtcd(tempRestoreOptions); err != nil {
+	m := member.NewMemberControl(e.Config.EtcdConnectionConfig)
+	if err := rs.RestoreAndStopEtcd(tempRestoreOptions, m); err != nil {
 		err = fmt.Errorf("failed to restore snapshot: %v", err)
 		return false, err
 	}
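The new gate depends on miscellaneous.IsMultiNode, whose body is not part of this diff. A hedged sketch of one plausible implementation, assuming the helper reads the same etcd config file used elsewhere in this commit and counts the initial-cluster entries; the parsing details are assumptions:

// Sketch only: a plausible IsMultiNode. The real helper lives in the
// miscellaneous package and may differ; GetConfigFilePath is the helper
// introduced in this commit, the rest is assumed.
func IsMultiNode(logger *logrus.Entry) bool {
	configYML, err := os.ReadFile(GetConfigFilePath())
	if err != nil {
		logger.Errorf("unable to read etcd config file: %v", err)
		return false
	}

	config := map[string]interface{}{}
	if err := yaml.Unmarshal(configYML, &config); err != nil {
		logger.Errorf("unable to unmarshal etcd config yaml file: %v", err)
		return false
	}

	initialCluster, ok := config["initial-cluster"].(string)
	if !ok {
		return false
	}

	// initial-cluster holds comma-separated name=peerURL pairs; more than one
	// pair implies a multi-node cluster.
	return len(strings.Split(initialCluster, ",")) > 1
}
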
pkg/initializer/validator/datavalidator.go (4 changes: 4 additions & 0 deletions)

@@ -68,6 +68,10 @@ func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") }
 func (d *DataValidator) Validate(mode Mode, failBelowRevision int64) (DataDirStatus, error) {
 	status, err := d.sanityCheck(failBelowRevision)
 	if status != DataDirectoryValid {
+		// TODO: To be removed when backup-restore supports restoration of single member in multi-node etcd cluster.
+		if d.OriginalClusterSize > 1 && !miscellaneous.IsBackupBucketEmpty(d.Config.SnapstoreConfig, d.Logger) {
+			return DataDirStatusUnknownInMultiNode, nil
+		}
 		return status, err
 	}

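miscellaneous.IsBackupBucketEmpty is likewise referenced but not shown. A hedged sketch of its apparent contract, assuming a snapstore is built from the config and its snapshot listing is inspected; note the nil-config branch, which matches the commit message's "storageProvider is not configured" case:

// Sketch only: plausible shape of IsBackupBucketEmpty; the snapstore calls
// below are assumptions about the real helper's internals.
func IsBackupBucketEmpty(snapstoreConfig *brtypes.SnapstoreConfig, logger *logrus.Logger) bool {
	if snapstoreConfig == nil {
		// No storage provider configured: treat the bucket as empty so the
		// validator keeps its plain single-node behavior.
		return true
	}

	store, err := snapstore.GetSnapstore(snapstoreConfig)
	if err != nil {
		logger.Errorf("unable to create snapstore: %v", err)
		return true
	}

	snapList, err := store.List()
	if err != nil {
		logger.Errorf("unable to list snapshots: %v", err)
		return true
	}
	return len(snapList) == 0
}
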
pkg/initializer/validator/types.go (3 changes: 3 additions & 0 deletions)

@@ -38,6 +38,9 @@ const (
 	DataDirectoryCorrupt
 	// DataDirectoryStatusUnknown indicates validator failed to check the data directory status.
 	DataDirectoryStatusUnknown
+	// DataDirStatusUnknownInMultiNode indicates validator failed to check the data directory status in multi-node etcd cluster.
+	// TODO: To be removed when backup-restore supports restoration of single member in multi-node etcd cluster.
+	DataDirStatusUnknownInMultiNode
 	// RevisionConsistencyError indicates current etcd revision is inconsistent with latest snapshot revision.
 	RevisionConsistencyError
 	// FailBelowRevisionConsistencyError indicates the current etcd revision is inconsistent with failBelowRevision.
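Taken together with the initializer change above, the new status is terminal rather than a trigger for restoration. A short illustrative branch over the validator statuses (the restoreFromLatestSnapshots helper is hypothetical):

// Sketch only: consuming DataDirStatus on the initializer side.
func handleDataDirStatus(status validator.DataDirStatus) error {
	switch status {
	case validator.DataDirectoryValid:
		return nil // nothing to do
	case validator.DataDirStatusUnknownInMultiNode:
		// Single-member restoration in a multi-node cluster is unsupported for
		// now, so initialization fails instead of attempting a restore.
		return fmt.Errorf("failed to initialize data dir of cluster member in multi-node cluster")
	case validator.DataDirectoryCorrupt, validator.RevisionConsistencyError:
		return restoreFromLatestSnapshots() // hypothetical helper
	default:
		return fmt.Errorf("unexpected data dir status: %v", status)
	}
}
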
pkg/member/member_control.go (99 changes: 54 additions & 45 deletions)

@@ -21,20 +21,20 @@ import (
 )

 const (
-	// retryPeriod is the peroid after which an operation is retried
-	retryPeriod = 5 * time.Second
+	// RetryPeriod is the period after which an operation is retried
+	RetryPeriod = 2 * time.Second

-	// etcdTimeout is timeout for etcd operations
-	etcdTimeout = 5 * time.Second
+	// EtcdTimeout is the timeout for etcd operations
+	EtcdTimeout = 5 * time.Second
 )

 var (
 	// ErrMissingMember is a sentinel error to describe a case of a member being missing from the member list
 	ErrMissingMember = errors.New("member missing from member list")
 )

-// ControlMember interface defines the functionalities needed to manipulate a member in the etcd cluster
-type ControlMember interface {
+// Control interface defines the functionalities needed to manipulate a member in the etcd cluster
+type Control interface {
 	// AddMemberAsLearner adds a new member as a learner to the etcd cluster
 	AddMemberAsLearner(context.Context) error

@@ -43,6 +43,9 @@ type ControlMember interface {

 	// PromoteMember promotes an etcd member from a learner to a voting member of the cluster. This will succeed if and only if the learner is in a healthy state and in sync with the leader
 	PromoteMember(context.Context) error
+
+	// UpdateMember updates the peer address of a specified etcd cluster member.
+	UpdateMember(context.Context, client.ClusterCloser) error
 }

 // memberControl holds the configuration for the mechanism of adding a new member to the cluster.
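Renaming ControlMember to Control and adding UpdateMember changes the method set that any test double must satisfy. An illustrative no-op fake; IsMemberInCluster is hidden between hunks here and inferred from its use in the initializer, so treat the exact method set as an assumption:

// Sketch only: a no-op test double for member.Control.
type fakeControl struct{}

func (f *fakeControl) AddMemberAsLearner(ctx context.Context) error        { return nil }
func (f *fakeControl) IsMemberInCluster(ctx context.Context) (bool, error) { return true, nil }
func (f *fakeControl) PromoteMember(ctx context.Context) error             { return nil }
func (f *fakeControl) UpdateMember(ctx context.Context, cli client.ClusterCloser) error {
	return nil
}
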
@@ -54,7 +57,7 @@ type memberControl struct {
 }

 // NewMemberControl returns a new Control for manipulating cluster members.
-func NewMemberControl(etcdConnConfig *brtypes.EtcdConnectionConfig) ControlMember {
+func NewMemberControl(etcdConnConfig *brtypes.EtcdConnectionConfig) Control {
 	var configFile string
 	logger := logrus.New().WithField("actor", "member-add")
 	etcdConn := *etcdConnConfig
@@ -71,12 +74,8 @@
 		logger.Fatalf("Error reading POD_NAME env var : %v", err)
 	}
 	//TODO: Refactor needed
-	etcdConfigForTest := os.Getenv("ETCD_CONF")
-	if etcdConfigForTest != "" {
-		configFile = etcdConfigForTest
-	} else {
-		configFile = "/var/etcd/config/etcd.conf.yaml"
-	}
+	configFile = miscellaneous.GetConfigFilePath()
+
 	return &memberControl{
 		clientFactory: clientFactory,
 		logger:        *logger,
@@ -88,7 +87,7 @@
 // AddMemberAsLearner adds a member as a learner to the etcd cluster
 func (m *memberControl) AddMemberAsLearner(ctx context.Context) error {
 	//Add member as learner to cluster
-	memberURL, err := m.getMemberURL()
+	memberURL, err := getMemberURL(m.configFile, m.podName)
 	if err != nil {
 		m.logger.Fatalf("Error fetching etcd member URL : %v", err)
 	}
@@ -99,7 +98,7 @@
 	}
 	defer cli.Close()

-	memAddCtx, cancel := context.WithTimeout(ctx, etcdTimeout)
+	memAddCtx, cancel := context.WithTimeout(ctx, EtcdTimeout)
 	defer cancel()
 	_, err = cli.MemberAddAsLearner(memAddCtx, []string{memberURL})

@@ -124,7 +123,7 @@ func (m *memberControl) IsMemberInCluster(ctx context.Context) (bool, error) {
 	err := retry.OnError(backoff, func(err error) bool {
 		return err != nil
 	}, func() error {
-		etcdProbeCtx, cancel := context.WithTimeout(context.TODO(), etcdTimeout)
+		etcdProbeCtx, cancel := context.WithTimeout(context.TODO(), EtcdTimeout)
 		defer cancel()
 		return miscellaneous.ProbeEtcd(etcdProbeCtx, m.clientFactory, &m.logger)
 	})
@@ -144,13 +143,13 @@
 	err = retry.OnError(retry.DefaultBackoff, func(err error) bool {
 		return err != nil
 	}, func() error {
-		memListCtx, cancel := context.WithTimeout(context.TODO(), etcdTimeout)
+		memListCtx, cancel := context.WithTimeout(context.TODO(), EtcdTimeout)
 		defer cancel()
 		etcdMemberList, err = cli.MemberList(memListCtx)
 		return err
 	})
 	if err != nil {
-		return false, fmt.Errorf("Could not list any etcd members %w", err)
+		return false, fmt.Errorf("could not list any etcd members %w", err)
 	}

 	for _, y := range etcdMemberList.Members {
@@ -160,56 +159,58 @@ func (m *memberControl) IsMemberInCluster(ctx context.Context) (bool, error) {
 			return true, nil
 		}
 	}

-	m.logger.Infof("Member %s not part of any running cluster", m.podName)
-	return false, fmt.Errorf("Could not find member %s in the list", m.podName)
+	m.logger.Infof("Member %v not part of any running cluster", m.podName)
+	m.logger.Infof("Could not find member %v in the list", m.podName)
+	return false, nil
 }

-func (m *memberControl) getMemberURL() (string, error) {
-	configYML, err := os.ReadFile(m.configFile)
+func getMemberURL(configFile string, podName string) (string, error) {
+	configYML, err := os.ReadFile(configFile)
 	if err != nil {
-		return "", fmt.Errorf("Unable to read etcd config file: %v", err)
+		return "", fmt.Errorf("unable to read etcd config file: %v", err)
 	}

 	config := map[string]interface{}{}
 	if err := yaml.Unmarshal([]byte(configYML), &config); err != nil {
-		return "", fmt.Errorf("Unable to unmarshal etcd config yaml file: %v", err)
+		return "", fmt.Errorf("unable to unmarshal etcd config yaml file: %v", err)
 	}

 	initAdPeerURL := config["initial-advertise-peer-urls"]
-	peerURL, err := parsePeerURL(initAdPeerURL.(string), m.podName)
+	peerURL, err := parsePeerURL(initAdPeerURL.(string), podName)
 	if err != nil {
-		return "", fmt.Errorf("Could not parse peer URL from the config file : %v", err)
+		return "", fmt.Errorf("could not parse peer URL from the config file : %v", err)
 	}
 	return peerURL, nil
 }

 func parsePeerURL(peerURL, podName string) (string, error) {
 	tokens := strings.Split(peerURL, "@")
 	if len(tokens) < 4 {
-		return "", fmt.Errorf("Invalid peer URL : %s", peerURL)
+		return "", fmt.Errorf("invalid peer URL : %s", peerURL)
 	}
 	domaiName := fmt.Sprintf("%s.%s.%s", tokens[1], tokens[2], "svc")

 	return fmt.Sprintf("%s://%s.%s:%s", tokens[0], podName, domaiName, tokens[3]), nil
 }

-// UpdateMemberPeerAddress updates the peer address of a specified etcd member
-func (m *memberControl) updateMemberPeerAddress(ctx context.Context, id uint64) error {
+// updateMemberPeerAddress updates the peer address of a specified etcd member
+func (m *memberControl) updateMemberPeerAddress(ctx context.Context, cli client.ClusterCloser, id uint64) error {
 	// Already existing clusters have `http://localhost:2380` as the peer address. This needs to be explicitly updated to the new address
 	// TODO: Remove this peer address updation logic in etcd-br v0.20.0
 	m.logger.Infof("Updating member peer URL for %s", m.podName)
-	cli, err := m.clientFactory.NewCluster()
-	if err != nil {
-		return fmt.Errorf("failed to build etcd cluster client : %v", err)
-	}

-	memberURL, err := m.getMemberURL()
+	memberURL, err := getMemberURL(m.configFile, m.podName)
 	if err != nil {
-		return fmt.Errorf("Could not fetch member URL : %v", err)
+		return fmt.Errorf("could not fetch member URL : %v", err)
 	}

-	memberUpdateCtx, cancel := context.WithTimeout(ctx, etcdTimeout)
+	memberUpdateCtx, cancel := context.WithTimeout(ctx, EtcdTimeout)
 	defer cancel()

-	_, err = cli.MemberUpdate(memberUpdateCtx, id, []string{memberURL})
+	if _, err = cli.MemberUpdate(memberUpdateCtx, id, []string{memberURL}); err == nil {
+		m.logger.Info("Successfully updated the member peer URL")
+		return nil
+	}
 	return err
 }

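parsePeerURL treats initial-advertise-peer-urls as an @-separated template rather than a literal URL. A worked example with hypothetical values, inferred from the tokens[0..3] usage above (scheme@service@namespace@port):

// Sketch only: hypothetical input and output for parsePeerURL.
func ExampleParsePeerURL() {
	url, err := parsePeerURL("http@etcd-main-peer@default@2380", "etcd-main-0")
	if err != nil {
		panic(err)
	}
	fmt.Println(url)
	// Output: http://etcd-main-0.etcd-main-peer.default.svc:2380
}
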
@@ -243,27 +244,35 @@ func (m *memberControl) PromoteMember(ctx context.Context) error {

 func findMember(existingMembers []*etcdserverpb.Member, memberName string) *etcdserverpb.Member {
 	for _, member := range existingMembers {
-		if member.Name == memberName {
+		if member.GetName() == memberName {
 			return member
 		}
 	}
 	return nil
 }

+// UpdateMember updates the peer address of a specified etcd cluster member.
+func (m *memberControl) UpdateMember(ctx context.Context, cli client.ClusterCloser) error {
+	m.logger.Infof("Attempting to update the member info: %v", m.podName)
+	ctx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdConnectionTimeout)
+	defer cancel()
+
+	membersInfo, err := cli.MemberList(ctx)
+	if err != nil {
+		return fmt.Errorf("error listing members: %v", err)
+	}
+
+	return m.updateMemberPeerAddress(ctx, cli, membersInfo.Header.GetMemberId())
+}
+
 func (m *memberControl) doPromoteMember(ctx context.Context, member *etcdserverpb.Member, cli client.ClusterCloser) error {
 	memPromoteCtx, cancel := context.WithTimeout(ctx, brtypes.DefaultEtcdConnectionTimeout)
 	defer cancel()
 	_, err := cli.MemberPromote(memPromoteCtx, member.ID) //Member promote call will succeed only if member is in sync with leader, and will error out otherwise
 	if err == nil { //Member successfully promoted
-		m.logger.Info("Member promoted ", member.Name, " : ", member.ID)
+		m.logger.Infof("Member %v with ID: %v has been promoted", member.GetName(), member.GetID())
 		return nil
 	} else if errors.Is(err, rpctypes.Error(rpctypes.ErrGRPCMemberNotLearner)) { //Member is not a learner
-		if member.PeerURLs[0] == "http://localhost:2380" {
-			// Already existing clusters have `http://localhost:2380` as the peer address. This needs to be explicitly updated to the new address
-			// TODO: Remove this peer address updation logic in etcd-br v0.20.0
-			err = m.updateMemberPeerAddress(ctx, member.ID)
-			m.logger.Errorf("Could not update member peer URL : %v", err)
-		}
 		m.logger.Info("Member ", member.Name, " : ", member.ID, " already part of etcd cluster")
 		return nil
 	}
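A hedged sketch of how a caller (for example, the restorer after bringing a restored member back up) might drive the new UpdateMember entry point; the factory-based client construction mirrors the pattern removed from updateMemberPeerAddress above, and the names here are illustrative:

// Sketch only: caller-side wiring for UpdateMember.
func updatePeerURL(ctx context.Context, clientFactory client.Factory, conn *brtypes.EtcdConnectionConfig) error {
	cli, err := clientFactory.NewCluster()
	if err != nil {
		return fmt.Errorf("failed to build etcd cluster client: %v", err)
	}
	defer cli.Close()

	m := member.NewMemberControl(conn)
	return m.UpdateMember(ctx, cli)
}
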
pkg/member/member_control_test.go (8 changes: 4 additions & 4 deletions)

@@ -74,12 +74,12 @@ var _ = Describe("Membercontrol", func() {

 	Describe("While attempting to check if etcd is part of a cluster", func() {
 		Context("When cluster is up and member is not part of the list", func() {
-			It("Should return an error", func() {
+			It("Should return false and no error", func() {
 				mem := member.NewMemberControl(etcdConnectionConfig)
-				_, err := mem.IsMemberInCluster(context.TODO())
-				Expect(err).ToNot(BeNil())
+				isMember, err := mem.IsMemberInCluster(context.TODO())
+				Expect(isMember).To(BeFalse())
+				Expect(err).To(BeNil())
 			})
 		})
 	})

 })