diff --git a/kvrocks.conf b/kvrocks.conf index defd99854bc..17fe485426e 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -43,8 +43,11 @@ daemonize no # If you enable cluster, kvrocks will encode key with its slot id calculated by # CRC16 and modulo 16384, encoding key with its slot id makes it efficient to # migrate keys based on the slot. So if you enabled at first time, cluster mode must -# not be disabled after restarting, and vice versa. That is to say, data is not -# compatible between standalone mode with cluster mode, you must migrate data +# not be disabled after restarting, and vice versa. Currently, kvrocks will keep +# using the cluster-enabled status that is persisted at first time, regardless of +# what cluster-enabled status is provided afterwards. +# Note that even if kvrocks has such protection, you should also be aware that data +# is not compatible between standalone mode with cluster mode, you must migrate data # if you want to change mode, otherwise, kvrocks will make data corrupt. # # Default: no diff --git a/src/server/server.cc b/src/server/server.cc index bc4c29391f8..21d910bf6df 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -152,6 +152,11 @@ Status Server::Start() { } } + s = checkClusterMode(); + if (!s.IsOK()) { + return s; + } + if (config_->cluster_enabled) { if (config_->persist_cluster_nodes_enabled) { auto s = cluster->LoadClusterNodes(config_->NodesFilePath()); @@ -1839,6 +1844,27 @@ void Server::cleanupExitedWorkerThreads(bool force) { } } +Status Server::checkClusterMode() { + std::string value; + auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName); + rocksdb::Status check_cluster_enabled = + storage->Get(rocksdb::ReadOptions(), cf, rocksdb::Slice(engine::kClusterEnabledKey), &value); + + if (check_cluster_enabled.IsNotFound()) { + return storage->WriteToPropagateCF(engine::kClusterEnabledKey, std::to_string(config_->cluster_enabled)); + } + if (check_cluster_enabled.ok()) { + auto is_enabled = GET_OR_RET(ParseInt(value, 10)); + if (config_->cluster_enabled != is_enabled) { + LOG(ERROR) << "cluster_enabled status from config file is inconsistent with the persisted one"; + return {Status::NotOK, "inconsistent cluster_enabled status"}; + } + return Status::OK(); + } else { + return {Status::NotOK, "failed to load cluster_enabled from storage: " + check_cluster_enabled.ToString()}; + } +} + std::string ServerLogData::Encode() const { if (type_ == kReplIdLog) { return std::string(1, kReplIdTag) + " " + content_; diff --git a/src/server/server.h b/src/server/server.h index a86eedf1cd8..42a020162c5 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -311,6 +311,7 @@ class Server { void increaseWorkerThreads(size_t delta); void decreaseWorkerThreads(size_t delta); void cleanupExitedWorkerThreads(bool force); + Status checkClusterMode(); std::atomic stop_ = false; std::atomic is_loading_ = false; diff --git a/src/storage/storage.h b/src/storage/storage.h index 6114c7fc437..dfa60659494 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -80,6 +80,8 @@ constexpr const char *kLuaFuncSHAPrefix = "lua_f_"; constexpr const char *kLuaFuncLibPrefix = "lua_func_lib_"; constexpr const char *kLuaLibCodePrefix = "lua_lib_code_"; +const std::string kClusterEnabledKey = "config_cluster_enabled"; + struct CompressionOption { rocksdb::CompressionType type; const std::string name; diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 8bc42fdafdb..d213fe505e3 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -93,6 +93,17 @@ func TestClusterNodes(t *testing.T) { require.EqualValues(t, []redis.ClusterNode{{ID: nodeID, Addr: srv.HostPort()}}, slots[0].Nodes) }) + t.Run("enable/disable cluster-enabled option", func(t *testing.T) { + // force change cluster-enabled status in kvrocks.conf file + srv.ForceChangeClusterMode(false) + defer func() { + srv.ForceChangeClusterMode(true) + srv.Restart(util.RestartOpt{Nowait: false, Noclose: true}) + }() + srv.Restart(util.RestartOpt{Nowait: true, Noclose: false}) + require.ErrorContains(t, rdb.Do(ctx, "clusterx", "version").Err(), "connection refused") + }) + t.Run("enable/disable the persist cluster nodes", func(t *testing.T) { require.NoError(t, rdb.ConfigSet(ctx, "persist-cluster-nodes-enabled", "yes").Err()) srv.Restart() diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go index a3f4314e342..7c3e55cc5e2 100644 --- a/tests/gocase/util/server.go +++ b/tests/gocase/util/server.go @@ -28,6 +28,7 @@ import ( "os/exec" "path/filepath" "regexp" + "strings" "sync" "syscall" "testing" @@ -39,6 +40,11 @@ import ( "golang.org/x/exp/slices" ) +type RestartOpt struct { + Nowait bool + Noclose bool +} + type KvrocksServer struct { t testing.TB cmd *exec.Cmd @@ -134,8 +140,38 @@ func (s *KvrocksServer) close(keepDir bool) { s.clean(keepDir) } -func (s *KvrocksServer) Restart() { - s.close(true) +func (s *KvrocksServer) ForceChangeClusterMode(enable bool) { + dir := s.configs["dir"] + f, err := os.OpenFile(filepath.Join(dir, "kvrocks.conf"), os.O_RDWR, 0666) + require.NoError(s.t, err) + defer func() { require.NoError(s.t, f.Close()) }() + + // change the line containing cluster-enabled to no + data, err := os.ReadFile(filepath.Join(dir, "kvrocks.conf")) + require.NoError(s.t, err) + + content := string(data) + var newContent string + if !enable { + newContent = strings.ReplaceAll(content, "cluster-enabled yes", "cluster-enabled no") + } else { + newContent = strings.ReplaceAll(content, "cluster-enabled no", "cluster-enabled yes") + } + err = os.WriteFile(filepath.Join(dir, "kvrocks.conf"), []byte(newContent), 0666) + require.NoError(s.t, err) +} + +func (s *KvrocksServer) Restart(opt ...RestartOpt) { + nowait := false + noclose := false + if len(opt) >= 1 { + nowait = opt[0].Nowait + noclose = opt[0].Noclose + } + + if !noclose { + s.close(true) + } b := *binPath require.NotEmpty(s.t, b, "please set the binary path by `-binPath`") @@ -157,12 +193,14 @@ func (s *KvrocksServer) Restart() { require.NoError(s.t, cmd.Start()) - c := redis.NewClient(&redis.Options{Addr: s.addr.String()}) - defer func() { require.NoError(s.t, c.Close()) }() - require.Eventually(s.t, func() bool { - err := c.Ping(context.Background()).Err() - return err == nil || err.Error() == "NOAUTH Authentication required." - }, time.Minute, time.Second) + if !nowait { + c := redis.NewClient(&redis.Options{Addr: s.addr.String()}) + defer func() { require.NoError(s.t, c.Close()) }() + require.Eventually(s.t, func() bool { + err := c.Ping(context.Background()).Err() + return err == nil || err.Error() == "NOAUTH Authentication required." + }, time.Minute, time.Second) + } s.cmd = cmd s.clean = func(keepDir bool) {