Skip to content

Commit

Permalink
Upsert edge by pair unique index (TuGraph-family#636)
Browse files Browse the repository at this point in the history
* update

* update

* upsert with pair unique index

* fix cpplint

* fix test case error

* fix lint
  • Loading branch information
ljcui authored Aug 16, 2024
1 parent 1152332 commit 4a0d59f
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 19 deletions.
8 changes: 7 additions & 1 deletion include/lgraph/lgraph_txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ class Transaction {
int UpsertEdge(int64_t src, int64_t dst, size_t label_id,
const std::vector<size_t>& unique_pos,
const std::vector<size_t>& field_ids,
const std::vector<FieldData>& field_values);
const std::vector<FieldData>& field_values,
std::optional<size_t> pair_unique_pos);

/**
* @brief List indexes
Expand Down Expand Up @@ -482,6 +483,11 @@ class Transaction {
EdgeIndexIterator GetEdgeIndexIterator(size_t label_id, size_t field_id,
const FieldData& key_start, const FieldData& key_end);

/**
 * @brief Gets an edge index iterator backed by the pair unique index on the given
 *        edge label and field.
 *
 * @param label_id   Id of the edge label.
 * @param field_id   Id of the indexed field.
 * @param src_vid    Source vertex id forwarded to the index lookup.
 * @param dst_vid    Destination vertex id forwarded to the index lookup.
 * @param key_start  Lower key bound.
 * @param key_end    Upper key bound.
 *
 * @returns The edge index iterator. Callers should check IsValid() before use.
 */
EdgeIndexIterator GetEdgePairUniqueIndexIterator(size_t label_id, size_t field_id,
int64_t src_vid, int64_t dst_vid,
const FieldData& key_start,
const FieldData& key_end);

/**
* @brief Gets vertex index iterator. The iterator has field value [key_start, key_end]. So
* key_start=key_end=v returns an iterator pointing to all vertexes that has field
Expand Down
17 changes: 17 additions & 0 deletions src/core/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,23 @@ EdgeIndexIterator Transaction::GetEdgeIndexIterator(size_t label_id, size_t fiel
return index->GetIterator(this, std::move(ks), std::move(ke), 0);
}

/**
 * Opens an iterator on the pair unique edge index of (label_id, field_id).
 *
 * Throws InputError when the field has no index, the index is not ready, or the
 * index is not of type PairUniqueIndex. A null key_start / key_end is passed on as
 * a default-constructed Value; a non-null bound is converted to the index key type.
 * src_vid and dst_vid are forwarded unchanged to the index's GetIterator().
 */
EdgeIndexIterator Transaction::GetEdgePairUniqueIndexIterator(
    size_t label_id, size_t field_id, VertexId src_vid, VertexId dst_vid,
    const FieldData& key_start, const FieldData& key_end) {
    EdgeIndex* pair_index = GetEdgeIndex(label_id, field_id);
    // Short-circuit order matters: the pointer is tested before it is dereferenced.
    const bool usable = pair_index && pair_index->IsReady() &&
                        !(pair_index->GetType() != IndexType::PairUniqueIndex);
    if (!usable) {
        THROW_CODE(InputError, "Edge pair unique index is not created for this field");
    }

    // Bounds stay default-constructed unless the caller supplied a non-null key.
    Value lower_key;
    Value upper_key;
    if (!key_start.IsNull()) {
        lower_key =
            field_data_helper::FieldDataToValueOfFieldType(key_start, pair_index->KeyType());
    }
    if (!key_end.IsNull()) {
        upper_key =
            field_data_helper::FieldDataToValueOfFieldType(key_end, pair_index->KeyType());
    }

    return pair_index->GetIterator(this, std::move(lower_key), std::move(upper_key),
                                   src_vid, dst_vid);
}

EdgeIndexIterator Transaction::GetEdgeIndexIterator(const std::string& label,
const std::string& field,
const std::string& key_start = "",
Expand Down
4 changes: 4 additions & 0 deletions src/core/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,10 @@ class Transaction {
const FieldData& key_start = FieldData(),
const FieldData& key_end = FieldData());

// Returns an iterator on the pair unique edge index of (label_id, field_id);
// src_vid / dst_vid and the key bounds are handed to the index lookup.
// Unlike the GetEdgeIndexIterator overload above, the key bounds have no defaults.
EdgeIndexIterator GetEdgePairUniqueIndexIterator(
size_t label_id, size_t field_id, VertexId src_vid, VertexId dst_vid,
const FieldData& key_start, const FieldData& key_end);

EdgeIndexIterator GetEdgeIndexIterator(const std::string& label, const std::string& field,
const std::string& key_start, const std::string& key_end);

Expand Down
86 changes: 73 additions & 13 deletions src/cypher/procedure/procedure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,13 +679,17 @@ void BuiltinProcedure::DbUpsertVertexByJson(RTContext *ctx, const Record *record
void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
const VEC_EXPR &args, const VEC_STR &yield_items,
std::vector<Record> *records) {
CYPHER_ARG_CHECK(args.size() == 4,
"need 4 parameters, "
"e.g. db.upsertEdge(label_name, start_spec, end_spec, list_data)")
CYPHER_ARG_CHECK(args.size() == 4 || args.size() == 5,
"need 4 or 5 parameters, "
"e.g. db.upsertEdge(label_name, start_spec, end_spec, list_data) or "
"db.upsertEdge(label_name, start_spec, end_spec, list_data, pair_unique_field)")
CYPHER_ARG_CHECK(args[0].IsString(), "label_name type should be string")
CYPHER_ARG_CHECK(args[1].IsMap(), "start_spec type should be map")
CYPHER_ARG_CHECK(args[2].IsMap(), "end_spec type should be map")
CYPHER_ARG_CHECK(args[3].IsArray(), "list_data type should be list")
if (args.size() == 5) {
CYPHER_ARG_CHECK(args[4].IsString(), "pair_unique_field type should be string")
}
CYPHER_DB_PROCEDURE_GRAPH_CHECK();
if (ctx->txn_) ctx->txn_->Abort();
const auto& start = *args[1].constant.map;
Expand All @@ -696,6 +700,10 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
if (!end.count("type") || !end.count("key")) {
THROW_CODE(InputError, "end_spec missing 'type' or 'key'");
}
std::string pair_unique_field;
if (args.size() == 5) {
pair_unique_field = args[4].constant.AsString();
}
std::string start_type = start.at("type").AsString();
std::string start_json_key = start.at("key").AsString();
std::string end_type = end.at("type").AsString();
Expand Down Expand Up @@ -736,11 +744,20 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,

auto index_fds = txn.GetTxn()->ListEdgeIndexByLabel(args[0].constant.AsString());
std::unordered_map<size_t, bool> unique_indexs;
bool pair_unique_configured = false;
for (auto& index : index_fds) {
if (index.type == lgraph_api::IndexType::GlobalUniqueIndex) {
unique_indexs[txn.GetEdgeFieldId(label_id, index.field)] = true;
} else if (index.type == lgraph_api::IndexType::PairUniqueIndex) {
if (!pair_unique_field.empty() && index.field == pair_unique_field) {
pair_unique_configured = true;
}
}
}
if (!pair_unique_field.empty() && !pair_unique_configured) {
THROW_CODE(InputError, "No edge pair unique index is configured for this field: {}",
pair_unique_field);
}

const auto& list = *args[3].constant.array;
int64_t json_total = list.size();
Expand All @@ -749,17 +766,24 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
int64_t insert = 0;
int64_t update = 0;
std::vector<std::tuple<int64_t , int64_t, std::vector<size_t>, std::vector<size_t>,
std::vector<lgraph_api::FieldData>>> lines;
std::vector<lgraph_api::FieldData>, std::optional<size_t> >> lines;
for (auto& line : list) {
int64_t start_vid = -1;
int64_t end_vid = -1;
std::vector<size_t> unique_pos;
std::optional<size_t> pair_unique_pos;
std::vector<size_t> field_ids;
std::vector<lgraph_api::FieldData> fds;
bool success = true;
if (!line.IsMap()) {
THROW_CODE(InputError, "The type of the elements in the list must be map");
}
if (!pair_unique_field.empty()) {
if (!line.map->count(pair_unique_field)) {
json_error++;
continue;
}
}
for (auto& item : *line.map) {
if (item.first == start_json_key) {
auto fd = item.second.scalar;
Expand Down Expand Up @@ -801,13 +825,17 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
field_ids.push_back(iter->second.first);
if (unique_indexs.count(iter->second.first)) {
unique_pos.push_back(field_ids.size() - 1);
} else {
if (!pair_unique_field.empty() && pair_unique_field == item.first) {
pair_unique_pos = field_ids.size() - 1;
}
}
}
}
}
if (success && start_vid >= 0 && end_vid >= 0) {
lines.emplace_back(start_vid, end_vid, std::move(unique_pos),
std::move(field_ids), std::move(fds));
std::move(field_ids), std::move(fds), pair_unique_pos);
} else {
json_error++;
}
Expand All @@ -816,7 +844,8 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
txn = db.CreateWriteTxn();
for (auto& l : lines) {
int ret = txn.UpsertEdge(std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l), std::get<3>(l), std::get<4>(l));
label_id, std::get<2>(l), std::get<3>(l),
std::get<4>(l), std::get<5>(l));
if (ret == 0) {
index_conflict++;
} else if (ret == 1) {
Expand All @@ -838,13 +867,17 @@ void BuiltinProcedure::DbUpsertEdge(RTContext *ctx, const Record *record,
void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
const VEC_EXPR &args, const VEC_STR &yield_items,
std::vector<Record> *records) {
CYPHER_ARG_CHECK(args.size() == 4,
"need 4 parameters, "
"e.g. db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data)")
CYPHER_ARG_CHECK(args.size() == 4 || args.size() == 5,
"need 4 or 5 parameters, "
"e.g. db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data) or "
"db.upsertEdgeByJson(label_name, start_spec, end_spec, list_data, pair_unique_field)")
CYPHER_ARG_CHECK(args[0].IsString(), "label_name type should be string")
CYPHER_ARG_CHECK(args[1].IsString(), "start_spec type should be json string")
CYPHER_ARG_CHECK(args[2].IsString(), "end_spec type should be json string")
CYPHER_ARG_CHECK(args[3].IsString(), "list_data type should be json string")
if (args.size() == 5) {
    // pair_unique_field is used verbatim as a field name (it is never parsed as JSON),
    // so the message says "string", consistent with db.upsertEdge.
    CYPHER_ARG_CHECK(args[4].IsString(), "pair_unique_field type should be string")
}
CYPHER_DB_PROCEDURE_GRAPH_CHECK();
if (ctx->txn_) ctx->txn_->Abort();
nlohmann::json json_data = nlohmann::json::parse(args[3].constant.AsString());
Expand All @@ -860,6 +893,11 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
THROW_CODE(InputError, "end_spec missing 'type' or 'key'");
}

std::string pair_unique_field;
if (args.size() == 5) {
pair_unique_field = args[4].constant.AsString();
}

std::string start_type = start["type"].get<std::string>();
std::string start_json_key = start["key"].get<std::string>();
std::string end_type = end["type"].get<std::string>();
Expand Down Expand Up @@ -900,26 +938,42 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,

auto index_fds = txn.GetTxn()->ListEdgeIndexByLabel(args[0].constant.AsString());
std::unordered_map<size_t, bool> unique_indexs;
bool pair_unique_configured = false;
for (auto& index : index_fds) {
if (index.type == lgraph_api::IndexType::GlobalUniqueIndex) {
unique_indexs[txn.GetEdgeFieldId(label_id, index.field)] = true;
} else if (index.type == lgraph_api::IndexType::PairUniqueIndex) {
if (!pair_unique_field.empty() && index.field == pair_unique_field) {
pair_unique_configured = true;
}
}
}
if (!pair_unique_field.empty() && !pair_unique_configured) {
THROW_CODE(InputError, "No edge pair unique index is configured for this field: {}",
pair_unique_field);
}

int64_t json_total = json_data.size();
int64_t json_error = 0;
int64_t index_conflict = 0;
int64_t insert = 0;
int64_t update = 0;
std::vector<std::tuple<int64_t , int64_t, std::vector<size_t>, std::vector<size_t>,
std::vector<lgraph_api::FieldData>>> lines;
std::vector<lgraph_api::FieldData>, std::optional<size_t>>> lines;
for (auto& line : json_data) {
int64_t start_vid = -1;
int64_t end_vid = -1;
std::optional<size_t> pair_unique_pos;
std::vector<size_t> unique_pos;
std::vector<size_t> field_ids;
std::vector<lgraph_api::FieldData> fds;
bool success = true;
if (!pair_unique_field.empty()) {
if (!line.count(pair_unique_field)) {
json_error++;
continue;
}
}
for (auto& item : line.items()) {
if (item.key() == start_json_key) {
auto fd = JsonToFieldData(item.value(), start_pf_fs);
Expand Down Expand Up @@ -961,22 +1015,28 @@ void BuiltinProcedure::DbUpsertEdgeByJson(RTContext *ctx, const Record *record,
field_ids.push_back(iter->second.first);
if (unique_indexs.count(iter->second.first)) {
unique_pos.push_back(field_ids.size() - 1);
} else {
if (!pair_unique_field.empty() && pair_unique_field == item.key()) {
pair_unique_pos = field_ids.size() - 1;
}
}
}
}
}
if (success && start_vid >= 0 && end_vid >= 0) {
lines.emplace_back(start_vid, end_vid, std::move(unique_pos),
std::move(field_ids), std::move(fds));
std::move(field_ids), std::move(fds), pair_unique_pos);
} else {
json_error++;
}
}
txn.Abort();
txn = db.CreateWriteTxn();
for (auto& l : lines) {
int ret = txn.UpsertEdge(std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l), std::get<3>(l), std::get<4>(l));
int ret = txn.UpsertEdge(
std::get<0>(l), std::get<1>(l),
label_id, std::get<2>(l),
std::get<3>(l), std::get<4>(l), std::get<5>(l));
if (ret == 0) {
index_conflict++;
} else if (ret == 1) {
Expand Down
43 changes: 38 additions & 5 deletions src/lgraph_api/lgraph_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,23 +269,45 @@ bool Transaction::UpsertEdge(int64_t src, int64_t dst, size_t label_id,
int Transaction::UpsertEdge(int64_t src, int64_t dst, size_t label_id,
const std::vector<size_t>& unique_pos,
const std::vector<size_t>& field_ids,
const std::vector<FieldData>& field_values) {
const std::vector<FieldData>& field_values,
std::optional<size_t> pair_unique_pos) {
ThrowIfInvalid();
for (auto pos : unique_pos) {
if (pos >= field_ids.size()) {
THROW_CODE(InputError, "unique_pos is out of the field_ids's range");
}
}
auto iter = txn_->GetOutEdgeIterator(EdgeUid(src, dst, label_id, 0, 0), false);
if (iter.IsValid()) {
std::optional<EdgeUid> euid;
if (pair_unique_pos.has_value()) {
// pair_unique_pos is used below as an index into field_ids and field_values, so the
// largest valid position is size() - 1. A position equal to size() must be rejected
// too, otherwise the lookups that follow read one element past the end.
if (pair_unique_pos.value() >= field_ids.size()) {
    THROW_CODE(InputError, "pair_unique_pos is out of the field_ids's range");
}
auto iter = txn_->GetEdgePairUniqueIndexIterator(
label_id, field_ids[pair_unique_pos.value()],
src, dst,
field_values[pair_unique_pos.value()],
field_values[pair_unique_pos.value()]);
if (iter.IsValid()) {
auto uid = iter.GetUid();
if (uid.src == src && uid.dst == dst && uid.lid == label_id) {
euid = uid;
}
}
} else {
auto iter = txn_->GetOutEdgeIterator(EdgeUid(src, dst, label_id, 0, 0), false);
if (iter.IsValid()) {
euid = iter.GetUid();
}
}
if (euid.has_value()) {
for (auto pos : unique_pos) {
auto tmp = txn_->GetEdgeIndexIterator(
label_id, field_ids[pos], field_values[pos], field_values[pos]);
if (tmp.IsValid() && (tmp.GetUid() != iter.GetUid())) {
if (tmp.IsValid() && (tmp.GetUid() != euid.value())) {
return 0;
}
}
txn_->SetEdgeProperty(iter, field_ids, field_values);
txn_->SetEdgeProperty(euid.value(), field_ids, field_values);
return 2;
} else {
for (auto pos : unique_pos) {
Expand Down Expand Up @@ -339,6 +361,16 @@ EdgeIndexIterator Transaction::GetEdgeIndexIterator(size_t label_id, size_t fiel
txn_);
}

// Public-API entry point for pair unique edge index lookups.
// Calls ThrowIfInvalid() first, then asks the underlying transaction (txn_) for its
// pair unique index iterator with the same arguments and wraps the result, together
// with txn_, in the API-level EdgeIndexIterator.
EdgeIndexIterator Transaction::GetEdgePairUniqueIndexIterator(size_t label_id, size_t field_id,
int64_t src_vid, int64_t dst_vid,
const FieldData& key_start,
const FieldData& key_end) {
ThrowIfInvalid();
return EdgeIndexIterator(txn_->GetEdgePairUniqueIndexIterator(
label_id, field_id, src_vid, dst_vid, key_start, key_end),
txn_);
}

VertexIndexIterator Transaction::GetVertexIndexIterator(const std::string& label,
const std::string& field,
const FieldData& key_start,
Expand Down Expand Up @@ -501,6 +533,7 @@ OutEdgeIterator Transaction::GetEdgeByUniqueIndex(size_t label_id, size_t field_
euid = eit.GetUid();
return GetOutEdgeIterator(euid, false);
}

size_t Transaction::GetNumVertices() {
ThrowIfInvalid();
return txn_->graph_->GetLooseNumVertex(txn_->GetTxn());
Expand Down
Loading

0 comments on commit 4a0d59f

Please sign in to comment.