Skip to content

Commit

Permalink
Merge branch 'unstable' into persist_cluster-enabled_option
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Jan 15, 2024
2 parents f3b8266 + d1acbcb commit f3151b9
Show file tree
Hide file tree
Showing 36 changed files with 742 additions and 241 deletions.
16 changes: 8 additions & 8 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ void ReplicationThread::run() {
}

ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"AUTH", srv_->GetConfig()->masterauth}));
SendString(bev, redis::ArrayOfBulkStrings({"AUTH", srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
Expand All @@ -418,7 +418,7 @@ ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { //
}

ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_db_name"}));
SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -456,7 +456,7 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev)
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::MultiBulkString(data_to_send));
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
Expand Down Expand Up @@ -513,11 +513,11 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::MultiBulkString({"PSYNC", std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
SendString(bev, redis::MultiBulkString({"PSYNC", replid, std::to_string(next_seq)}));
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: " << replid << ":"
<< cur_seq;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}

ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) {
SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
Expand Down Expand Up @@ -835,7 +835,7 @@ Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
const auto auth_command = redis::MultiBulkString({"AUTH", auth});
const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
Expand Down Expand Up @@ -921,7 +921,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
}
files_str.pop_back();

const auto fetch_command = redis::MultiBulkString({"_fetch_file", files_str});
const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");

Expand Down
25 changes: 12 additions & 13 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void SlotMigrator::clean() {
}

Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
std::string cmd = redis::MultiBulkString({"auth", password}, false);
std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
Expand All @@ -456,7 +456,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};

std::string cmd =
redis::MultiBulkString({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
redis::ArrayOfBulkStrings({"cluster", "import", std::to_string(migrating_slot_), std::to_string(status)});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command to the destination node");
Expand Down Expand Up @@ -666,7 +666,7 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += redis::MultiBulkString(command, false);
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;

// Check whether pipeline needs to be sent
Expand Down Expand Up @@ -747,7 +747,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
if (metadata.Type() != kRedisBitmap) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
item_count = 0;
// Have to clear saved items
Expand All @@ -764,13 +764,13 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;
}

// Add TTL for complex key
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -809,7 +809,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
if (!s.IsOK()) {
return s;
}
*restore_cmds += redis::MultiBulkString(user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
current_pipeline_size_++;

user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());
Expand All @@ -822,15 +822,14 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::MultiBulkString(
{"XSETID", key.ToString(), metadata.last_generated_id.ToString(), "ENTRIESADDED",
std::to_string(metadata.entries_added), "MAXDELETEDID", metadata.max_deleted_entry_id.ToString()},
false);
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
"ENTRIESADDED", std::to_string(metadata.entries_added), "MAXDELETEDID",
metadata.max_deleted_entry_id.ToString()});
current_pipeline_size_++;

// Add TTL
if (metadata.expire > 0) {
*restore_cmds += redis::MultiBulkString({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}, false);
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}

Expand Down Expand Up @@ -862,7 +861,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<
uint32_t offset = (index * 8) + (byte_idx * 8) + bit_idx;
user_cmd->emplace_back(std::to_string(offset));
user_cmd->emplace_back("1");
*restore_cmds += redis::MultiBulkString(*user_cmd, false);
*restore_cmds += redis::ArrayOfBulkStrings(*user_cmd);
current_pipeline_size_++;
user_cmd->erase(user_cmd->begin() + 2, user_cmd->end());
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
void SyncMigrateContext::TimerCB(int, int16_t events) {
auto &&slot_migrator = srv_->slot_migrator;

conn_->Reply(redis::NilString());
conn_->Reply(conn_->NilString());
timer_.reset();

slot_migrator->CancelSyncCtx();
Expand Down
6 changes: 3 additions & 3 deletions src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockingCommander : public Commander,
private EventCallbackBase<BlockingCommander> {
public:
// method to reply when no operation happens
virtual std::string NoopReply() = 0;
virtual std::string NoopReply(const Connection *conn) = 0;

// method to block keys
virtual void BlockKeys() = 0;
Expand All @@ -48,7 +48,7 @@ class BlockingCommander : public Commander,
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
if (conn_->IsInExec()) {
*output = NoopReply();
*output = NoopReply(conn_);
return Status::OK(); // no blocking in multi-exec
}

Expand Down Expand Up @@ -111,7 +111,7 @@ class BlockingCommander : public Commander,
}

void TimerCB(int, int16_t) {
conn_->Reply(NoopReply());
conn_->Reply(NoopReply(conn_));
timer_.reset();
UnblockKeys();
auto bev = conn_->GetBufferEvent();
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class CommandBitfield : public Commander {
str_rets[i] = redis::Integer(rets[i]->Value());
}
} else {
str_rets[i] = redis::NilString();
str_rets[i] = conn->NilString();
}
}
*output = redis::Array(str_rets);
Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class CommandBFInfo : public Commander {
*output += redis::SimpleString("Number of items inserted");
*output += redis::Integer(info.size);
*output += redis::SimpleString("Expansion rate");
*output += info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output += info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
case BloomInfoType::kCapacity:
*output = redis::Integer(info.capacity);
Expand All @@ -360,7 +360,7 @@ class CommandBFInfo : public Commander {
*output = redis::Integer(info.size);
break;
case BloomInfoType::kExpansion:
*output = info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
*output = info.expansion == 0 ? conn->NilString() : redis::Integer(info.expansion);
break;
}

Expand Down
28 changes: 14 additions & 14 deletions src/commands/cmd_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class CommandGeoDist : public CommandGeoBase {
}

if (s.IsNotFound()) {
*output = redis::NilString();
*output = conn->NilString();
} else {
*output = redis::BulkString(util::Float2String(GetDistanceByUnit(distance)));
}
Expand Down Expand Up @@ -177,7 +177,7 @@ class CommandGeoHash : public Commander {
hashes.resize(members_.size(), "");
}

*output = redis::MultiBulkString(hashes);
*output = conn->MultiBulkString(hashes);
return Status::OK();
}

Expand Down Expand Up @@ -206,16 +206,16 @@ class CommandGeoPos : public Commander {

if (s.IsNotFound()) {
list.resize(members_.size(), "");
*output = redis::MultiBulkString(list);
*output = conn->MultiBulkString(list);
return Status::OK();
}

for (const auto &member : members_) {
auto iter = geo_points.find(member.ToString());
if (iter == geo_points.end()) {
list.emplace_back(redis::NilString());
list.emplace_back(conn->NilString());
} else {
list.emplace_back(redis::MultiBulkString(
list.emplace_back(conn->MultiBulkString(
{util::Float2String(iter->second.longitude), util::Float2String(iter->second.latitude)}));
}
}
Expand Down Expand Up @@ -314,12 +314,12 @@ class CommandGeoRadius : public CommandGeoBase {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}
return Status::OK();
}

std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) {
std::string GenerateOutput(const Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> list;
Expand All @@ -337,8 +337,8 @@ class CommandGeoRadius : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
list.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -440,7 +440,7 @@ class CommandGeoSearch : public CommandGeoBase {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = generateOutput(geo_points);
*output = generateOutput(conn, geo_points);

return Status::OK();
}
Expand Down Expand Up @@ -496,7 +496,7 @@ class CommandGeoSearch : public CommandGeoBase {
return Status::OK();
}

std::string generateOutput(const std::vector<GeoPoint> &geo_points) {
std::string generateOutput(const Connection *conn, const std::vector<GeoPoint> &geo_points) {
int result_length = static_cast<int>(geo_points.size());
int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_;
std::vector<std::string> output;
Expand All @@ -515,8 +515,8 @@ class CommandGeoSearch : public CommandGeoBase {
one.emplace_back(redis::BulkString(util::Float2String(geo_point.score)));
}
if (with_coord_) {
one.emplace_back(redis::MultiBulkString(
{util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
one.emplace_back(
conn->MultiBulkString({util::Float2String(geo_point.longitude), util::Float2String(geo_point.latitude)}));
}
output.emplace_back(redis::Array(one));
}
Expand Down Expand Up @@ -644,7 +644,7 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
if (store_key_.size() != 0) {
*output = redis::Integer(geo_points.size());
} else {
*output = GenerateOutput(geo_points);
*output = GenerateOutput(conn, geo_points);
}

return Status::OK();
Expand Down
Loading

0 comments on commit f3151b9

Please sign in to comment.