Skip to content
This repository has been archived by the owner on Aug 18, 2020. It is now read-only.

Commit

Permalink
Validate diff item origin and target node
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Oct 13, 2017
1 parent beccbd4 commit 038e57f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 2 deletions.
18 changes: 18 additions & 0 deletions src/meta/zp_meta_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,27 @@ void MigrateCmd::Do(const google::protobuf::Message *req,
if (item.left().ip() == item.right().ip()
&& item.left().port() == item.right().port()) {
// Skip same node
LOG(WARNING) << "Skip diff item with same origin and target node. "
<< item.table() << "_" << item.partition()
<< ", from: " << item.left().ip() << "_" << item.left().port()
<< ", to: " << item.right().ip() << "_" << item.right().port();
continue;
}
if (!g_meta_server->IsCharged(item.table(), item.partition(), item.left())
|| g_meta_server->IsCharged(item.table(), item.partition(),
item.right())) {
// Skip invalid diff item
LOG(WARNING) << "Skip diff invalid diff item. "
<< item.table() << "_" << item.partition()
<< ", from: " << item.left().ip() << "_" << item.left().port()
<< ", to: " << item.right().ip() << "_" << item.right().port();
continue;
}
diffs.push_back(item);
LOG(INFO) << "Migrate diff item: "
<< item.table() << "_" << item.partition()
<< ", from: " << item.left().ip() << "_" << item.left().port()
<< ", to: " << item.right().ip() << "_" << item.right().port();
}

if (diffs.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/zp_meta_condition_cron.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ bool ZPMetaConditionCron::RecoverWhenError(const OffsetCondition& condition) {
switch (condition.type) {
case ConditionTaskType::kMigrate:
migrate_->PutN(1);
break;
// Notice: no break here
case ConditionTaskType::kSetMaster:
s = update_thread_->PendingUpdate(
UpdateTask(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/zp_meta_migrate_register.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Status ZPMetaMigrateRegister::Check(ZPMeta::MigrateStatus* status) {
if (total_size_ == 0) {
return Status::Corruption("totol size be zero");
}
status->set_complete_proportion(1 - diff_keys_.size() * 100 / total_size_);
status->set_complete_proportion(100 - diff_keys_.size() * 100 / total_size_);
return Status::OK();
}

Expand Down
7 changes: 7 additions & 0 deletions src/meta/zp_meta_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,13 @@ Status ZPMetaServer::GetMetaStatus(ZPMeta::MetaCmdResponse_MetaStatus* ms) {
return Status::OK();
}

// Check whether node is response for specified partition
bool ZPMetaServer::IsCharged(const std::string& table,
int pnum, const ZPMeta::Node& target) {
return info_store_->IsSlave(table, pnum, target)
|| info_store_->IsMaster(table, pnum, target);
}

Status ZPMetaServer::GetTableList(std::set<std::string>* table_list) {
return info_store_->GetTableList(table_list);
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/zp_meta_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class ZPMetaServer {
const ZPMeta::Node& node);
Status GetAllMetaNodes(ZPMeta::MetaCmdResponse_ListMeta *nodes);
Status GetMetaStatus(ZPMeta::MetaCmdResponse_MetaStatus* ms);
bool IsCharged(const std::string& table, int pnum, const ZPMeta::Node& node);

// Migrate related
Status Migrate(int epoch, const std::vector<ZPMeta::RelationCmdUnit>& diffs);
Expand Down

0 comments on commit 038e57f

Please sign in to comment.