From 5b6359a842eee7e619716628c8cb94900b5ba62d Mon Sep 17 00:00:00 2001 From: Mahmoud Ismail Date: Wed, 6 Jul 2016 15:16:51 +0200 Subject: [PATCH] Fix update for Dataset and Project table tailers --- include/DatasetTableTailer.h | 4 + include/ProjectTableTailer.h | 4 + include/TableTailer.h | 1 - src/DatasetTableTailer.cpp | 208 +++++++++++++++++++++++------------ src/ProjectTableTailer.cpp | 150 +++++++++++++++---------- src/TableTailer.cpp | 14 +-- 6 files changed, 240 insertions(+), 141 deletions(-) diff --git a/include/DatasetTableTailer.h b/include/DatasetTableTailer.h index 90181183..93780a9a 100644 --- a/include/DatasetTableTailer.h +++ b/include/DatasetTableTailer.h @@ -40,6 +40,10 @@ class DatasetTableTailer : public TableTailer{ private: static const WatchTable TABLE; virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]); + void handleDelete(int datasetId, int projectId); + void handleAdd(int datasetId, int projectId, NdbRecAttr* value[]); + void handleUpdate(NdbRecAttr* value[]); + string createJSONUpSert(int projectId, NdbRecAttr* value[]); ElasticSearch* mElasticSearch; ProjectDatasetINodeCache* mPDICache; diff --git a/include/ProjectTableTailer.h b/include/ProjectTableTailer.h index b979bb81..c093640f 100644 --- a/include/ProjectTableTailer.h +++ b/include/ProjectTableTailer.h @@ -39,6 +39,10 @@ class ProjectTableTailer : public TableTailer{ static const WatchTable TABLE; virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]); + void handleDelete(int projectId); + void handleAdd(int projectId, NdbRecAttr* value[]); + void handleUpdate(int projectId, NdbRecAttr* value[]); + string createJSONUpSert(NdbRecAttr* value[]); ElasticSearch* mElasticSearch; ProjectDatasetINodeCache* mPDICache; diff --git a/include/TableTailer.h b/include/TableTailer.h index db6fa5cd..c7b0f6c9 100644 --- a/include/TableTailer.h +++ b/include/TableTailer.h @@ -62,7 +62,6 @@ class TableTailer { void removeListenerEvent(); void waitForEvents(); void run(); - bool correctResult(NdbDictionary::Event::TableEvent event, NdbRecAttr* values[]); const char* getEventName(NdbDictionary::Event::TableEvent event); bool mStarted; diff --git a/src/DatasetTableTailer.cpp b/src/DatasetTableTailer.cpp index e02a702d..a6dcb561 100644 --- a/src/DatasetTableTailer.cpp +++ b/src/DatasetTableTailer.cpp @@ -27,7 +27,7 @@ using namespace Utils::NdbC; const string _dataset_table= "dataset"; -const int _dataset_noCols= 8; +const int _dataset_noCols= 9; const string _dataset_cols[_dataset_noCols]= {"inode_id", "inode_pid", @@ -36,7 +36,8 @@ const string _dataset_cols[_dataset_noCols]= "description", "public_ds", "searchable", - "shared" + "shared", + "id" }; const int _dataset_noEvents = 3; @@ -48,6 +49,15 @@ const NdbDictionary::Event::TableEvent _dataset_events[_dataset_noEvents] = const WatchTable DatasetTableTailer::TABLE = {_dataset_table, _dataset_cols, _dataset_noCols , _dataset_events, _dataset_noEvents, "inode_id", "inode_id"}; +const int DS_INODE_ID = 0; +const int DS_INODE_PID = 1; +const int DS_INODE_NAME = 2; +const int DS_PROJ_ID = 3; +const int DS_DESC = 4; +const int DS_PUBLIC = 5; +const int DS_SEARCH = 6; +const int DS_SHARED = 7; +const int DS_ID_PK = 8; DatasetTableTailer::DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic,ProjectDatasetINodeCache* cache) @@ -55,93 +65,151 @@ DatasetTableTailer::DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, } void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]) { - int id = value[0]->int32_value(); - int projectId = value[3]->int32_value(); - bool originalDS = value[7]->int8_value() == 0; - - if(!originalDS){ - LOG_DEBUG("Ignore shared Dataset [" << id << "] in Project [" << projectId << "]"); + int datasetId = value[DS_INODE_ID]->int32_value(); + int projectId = value[DS_PROJ_ID]->int32_value(); + bool originalDS = value[DS_SHARED]->int8_value() == 0; + + if (!originalDS && value[DS_SHARED]->isNULL() != -1) { + LOG_DEBUG("Ignore shared Dataset [" << datasetId << "] in Project [" << projectId << "]"); return; } + + switch (eventType) { + case NdbDictionary::Event::TE_INSERT: + handleAdd(datasetId, projectId, value); + break; + case NdbDictionary::Event::TE_DELETE: + handleDelete(datasetId, projectId); + break; + case NdbDictionary::Event::TE_UPDATE: + handleUpdate(value); + break; + } +} + +void DatasetTableTailer::handleAdd(int datasetId, int projectId, NdbRecAttr* value[]) { + mPDICache->addDatasetToProject(datasetId, projectId); - if(eventType == NdbDictionary::Event::TE_DELETE){ - - rapidjson::StringBuffer sbDoc; - rapidjson::Writer docWriter(sbDoc); - docWriter.StartObject(); - docWriter.String("query"); + string data = createJSONUpSert(projectId, value); + if (mElasticSearch->addDataset(projectId, datasetId, data)) { + LOG_INFO("Add Dataset[" << datasetId << "]: Succeeded"); + } + + const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection); + NdbTransaction* transaction = startNdbTransaction(mNdbConnection); + Recovery::checkpointDataset(database, transaction, datasetId); + transaction->close(); +} + +void DatasetTableTailer::handleDelete(int datasetId, int projectId) { + rapidjson::StringBuffer sbDoc; + rapidjson::Writer docWriter(sbDoc); + docWriter.StartObject(); + docWriter.String("query"); + + docWriter.StartObject(); - docWriter.StartObject(); + docWriter.String("match"); + docWriter.StartObject(); + docWriter.String("dataset_id"); + docWriter.Int(datasetId); + docWriter.EndObject(); - docWriter.String("match"); - docWriter.StartObject(); - docWriter.String("dataset_id"); - docWriter.Int(id); - docWriter.EndObject(); + docWriter.EndObject(); - docWriter.EndObject(); + docWriter.EndObject(); + + //TODO: handle failures in elastic search + if (mElasticSearch->deleteDatasetChildren(projectId, datasetId, string(sbDoc.GetString()))) { + LOG_INFO("Delete Dataset[" << datasetId << "] children inodes: Succeeded"); + } + + if (mElasticSearch->deleteDataset(projectId, datasetId)) { + LOG_INFO("Delete Dataset[" << datasetId << "]: Succeeded"); + } - docWriter.EndObject(); + mPDICache->removeDataset(datasetId); +} + +void DatasetTableTailer::handleUpdate(NdbRecAttr* value[]) { + int datasetPK = value[DS_ID_PK]->int32_value(); + int datasetId = -1; + int projectId = -1; + if(value[DS_INODE_ID]->isNULL() == -1){ + const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection); + const NdbDictionary::Table* table = getTable(database, TABLE.mTableName); + NdbTransaction* transaction = startNdbTransaction(mNdbConnection); + NdbOperation* op = getNdbOperation(transaction, table); - //TODO: handle failures in elastic search - if(mElasticSearch->deleteDatasetChildren(projectId, id, string(sbDoc.GetString()))){ - LOG_INFO("Delete Dataset[" << id << "] children inodes: Succeeded"); - } + op->readTuple(NdbOperation::LM_CommittedRead); - if(mElasticSearch->deleteDataset(projectId, id)){ - LOG_INFO("Delete Dataset[" << id << "]: Succeeded"); - } - - mPDICache->removeDataset(id); + op->equal(_dataset_cols[8].c_str(), datasetPK); - return; + NdbRecAttr* datasetIdCol = getNdbOperationValue(op, _dataset_cols[DS_INODE_ID]); + NdbRecAttr* projectIdCol = getNdbOperationValue(op, _dataset_cols[DS_PROJ_ID]); + + executeTransaction(transaction, NdbTransaction::Commit); + datasetId = datasetIdCol->int32_value(); + projectId = projectIdCol->int32_value(); + + transaction->close(); } - mPDICache->addDatasetToProject(id, projectId); + if(datasetId == -1 || projectId == -1){ + LOG_ERROR("Couldn't resolve projectId[" << projectId << "] or datasetId[" << datasetId << "]"); + return; + } + + string data = createJSONUpSert(projectId, value); + if (mElasticSearch->addDataset(projectId, datasetId, data)) { + LOG_INFO("Update Dataset[" << datasetId << "]: Succeeded"); + } +} + +string DatasetTableTailer::createJSONUpSert(int porjectId, NdbRecAttr* value[]) { rapidjson::StringBuffer sbDoc; rapidjson::Writer docWriter(sbDoc); docWriter.StartObject(); - + docWriter.String("doc"); docWriter.StartObject(); - - docWriter.String("parent_id"); - docWriter.Int(value[1]->int32_value()); - - docWriter.String("name"); - docWriter.String(get_string(value[2]).c_str()); - + + if (value[DS_INODE_PID]->isNULL() != -1) { + docWriter.String("parent_id"); + docWriter.Int(value[DS_INODE_PID]->int32_value()); + } + + if (value[DS_INODE_NAME]->isNULL() != -1) { + docWriter.String("name"); + docWriter.String(get_string(value[DS_INODE_NAME]).c_str()); + } + docWriter.String("project_id"); - docWriter.Int(projectId); - - docWriter.String("description"); - docWriter.String(get_string(value[4]).c_str()); - - bool public_ds = value[5]->int8_value() == 1; - - docWriter.String("public_ds"); - docWriter.Bool(public_ds); - - bool searchable = value[6]->int8_value() == 1; - - docWriter.String("searchable"); - docWriter.Bool(searchable); - + docWriter.Int(porjectId); + + if (value[DS_DESC]->isNULL() != -1) { + docWriter.String("description"); + docWriter.String(get_string(value[DS_DESC]).c_str()); + } + + if (value[DS_PUBLIC]->isNULL() != -1) { + bool public_ds = value[DS_PUBLIC]->int8_value() == 1; + docWriter.String("public_ds"); + docWriter.Bool(public_ds); + } + + if (value[DS_SEARCH]->isNULL() != -1) { + bool searchable = value[DS_SEARCH]->int8_value() == 1; + docWriter.String("searchable"); + docWriter.Bool(searchable); + } + docWriter.EndObject(); docWriter.String("doc_as_upsert"); docWriter.Bool(true); docWriter.EndObject(); - - string data = string(sbDoc.GetString()); - if(mElasticSearch->addDataset(projectId, id, data)){ - LOG_INFO("Add Dataset[" << id << "]: Succeeded"); - } - - const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection); - NdbTransaction* transaction = startNdbTransaction(mNdbConnection); - Recovery::checkpointDataset(database, transaction, id); - transaction->close(); + return string(sbDoc.GetString()); } void DatasetTableTailer::updateProjectIds(const NdbDictionary::Dictionary* database, @@ -155,11 +223,11 @@ void DatasetTableTailer::updateProjectIds(const NdbDictionary::Dictionary* datab NdbIndexScanOperation* op = getNdbIndexScanOperation(transaction, index); op->readTuples(NdbOperation::LM_CommittedRead); - op->equal(_dataset_cols[0].c_str(), *it); + op->equal(_dataset_cols[DS_INODE_ID].c_str(), *it); - NdbRecAttr* id_col = getNdbOperationValue(op, _dataset_cols[0]); - NdbRecAttr* proj_id_col = getNdbOperationValue(op, _dataset_cols[3]); - NdbRecAttr* shared_col = getNdbOperationValue(op, _dataset_cols[7]); + NdbRecAttr* id_col = getNdbOperationValue(op, _dataset_cols[DS_INODE_ID]); + NdbRecAttr* proj_id_col = getNdbOperationValue(op, _dataset_cols[DS_PROJ_ID]); + NdbRecAttr* shared_col = getNdbOperationValue(op, _dataset_cols[DS_SHARED]); rows[*it].push_back(id_col); rows[*it].push_back(proj_id_col); rows[*it].push_back(shared_col); diff --git a/src/ProjectTableTailer.cpp b/src/ProjectTableTailer.cpp index d2ced97f..a1424791 100644 --- a/src/ProjectTableTailer.cpp +++ b/src/ProjectTableTailer.cpp @@ -45,82 +45,118 @@ const NdbDictionary::Event::TableEvent _project_events[_project_noEvents] = const WatchTable ProjectTableTailer::TABLE = {_project_table, _project_cols, _project_noCols , _project_events, _project_noEvents, "PRIMARY", _project_cols[0]}; +const int PR_ID = 0; +const int PR_INODE_PID = 1; +const int PR_INODE_NAME = 2; +const int PR_USER = 3; +const int PR_DESC = 4; + ProjectTableTailer::ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic, ProjectDatasetINodeCache* cache) : TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticSearch(elastic), mPDICache(cache){ } void ProjectTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]) { - int id = value[0]->int32_value(); - - if(eventType == NdbDictionary::Event::TE_DELETE){ - - rapidjson::StringBuffer sbDoc; - rapidjson::Writer docWriter(sbDoc); - docWriter.StartObject(); - docWriter.String("query"); - - docWriter.StartObject(); - - docWriter.String("match"); - docWriter.StartObject(); - docWriter.String("project_id"); - docWriter.Int(id); - docWriter.EndObject(); - - docWriter.EndObject(); - - docWriter.EndObject(); - - //TODO: handle failures in elastic search - if(mElasticSearch->deleteProjectChildren(id, string(sbDoc.GetString()))){ - LOG_INFO("Delete Project[" << id << "] children inodes and datasets : Succeeded"); - } - - if (mElasticSearch->deleteProject(id)) { - LOG_INFO("Delete Project[" << id << "]: Succeeded"); - } - - mPDICache->removeProject(id); - - return; + int projectId = value[PR_ID]->int32_value(); + + switch (eventType) { + case NdbDictionary::Event::TE_INSERT: + handleAdd(projectId, value); + break; + case NdbDictionary::Event::TE_DELETE: + handleDelete(projectId); + break; + case NdbDictionary::Event::TE_UPDATE: + handleUpdate(projectId, value); + break; } - +} + +void ProjectTableTailer::handleDelete(int projectId) { rapidjson::StringBuffer sbDoc; rapidjson::Writer docWriter(sbDoc); docWriter.StartObject(); - - docWriter.String("doc"); + docWriter.String("query"); + + docWriter.StartObject(); + + docWriter.String("match"); docWriter.StartObject(); - - docWriter.String("parent_id"); - docWriter.Int(value[1]->int32_value()); - - docWriter.String("name"); - docWriter.String(get_string(value[2]).c_str()); - - docWriter.String("user"); - docWriter.String(get_string(value[3]).c_str()); - - docWriter.String("description"); - docWriter.String(get_string(value[4]).c_str()); - + docWriter.String("project_id"); + docWriter.Int(projectId); docWriter.EndObject(); - docWriter.String("doc_as_upsert"); - docWriter.Bool(true); + + docWriter.EndObject(); + docWriter.EndObject(); - - string data = string(sbDoc.GetString()); - if(mElasticSearch->addProject(id, data)){ - LOG_INFO("Add Project[" << id << "]: Succeeded"); + + //TODO: handle failures in elastic search + if (mElasticSearch->deleteProjectChildren(projectId, string(sbDoc.GetString()))) { + LOG_INFO("Delete Project[" << projectId << "] children inodes and datasets : Succeeded"); + } + + if (mElasticSearch->deleteProject(projectId)) { + LOG_INFO("Delete Project[" << projectId << "]: Succeeded"); } - + + mPDICache->removeProject(projectId); +} + +void ProjectTableTailer::handleAdd(int projectId, NdbRecAttr* value[]) { + string data = createJSONUpSert(value); + if (mElasticSearch->addProject(projectId, data)) { + LOG_INFO("Add Project[" << projectId << "]: Succeeded"); + } + const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection); NdbTransaction* transaction = startNdbTransaction(mNdbConnection); - Recovery::checkpointProject(database, transaction, id); + Recovery::checkpointProject(database, transaction, projectId); transaction->close(); } +void ProjectTableTailer::handleUpdate(int projectId, NdbRecAttr* value[]) { + string data = createJSONUpSert(value); + if (mElasticSearch->addProject(projectId, data)) { + LOG_INFO("Update Project[" << projectId << "]: Succeeded"); + } +} + +string ProjectTableTailer::createJSONUpSert(NdbRecAttr* value[]) { + rapidjson::StringBuffer sbDoc; + rapidjson::Writer docWriter(sbDoc); + docWriter.StartObject(); + + docWriter.String("doc"); + docWriter.StartObject(); + + if (value[PR_INODE_PID]->isNULL() != -1) { + docWriter.String("parent_id"); + docWriter.Int(value[PR_INODE_PID]->int32_value()); + } + + if (value[PR_INODE_NAME]->isNULL() != -1) { + docWriter.String("name"); + docWriter.String(get_string(value[PR_INODE_NAME]).c_str()); + } + + if (value[PR_USER]->isNULL() != -1) { + docWriter.String("user"); + docWriter.String(get_string(value[PR_USER]).c_str()); + } + + if (value[PR_DESC]->isNULL() != -1) { + docWriter.String("description"); + docWriter.String(get_string(value[PR_DESC]).c_str()); + } + + docWriter.EndObject(); + docWriter.String("doc_as_upsert"); + docWriter.Bool(true); + docWriter.EndObject(); + + return string(sbDoc.GetString()); +} + ProjectTableTailer::~ProjectTableTailer() { } diff --git a/src/TableTailer.cpp b/src/TableTailer.cpp index 027ff5f7..e6830e98 100644 --- a/src/TableTailer.cpp +++ b/src/TableTailer.cpp @@ -157,9 +157,7 @@ void TableTailer::waitForEvents() { case NdbDictionary::Event::TE_INSERT: case NdbDictionary::Event::TE_DELETE: case NdbDictionary::Event::TE_UPDATE: - if(correctResult(event, recAttr)){ - handleEvent(event, recAttrPre, recAttr); - } + handleEvent(event, recAttrPre, recAttr); break; default: break; @@ -172,16 +170,6 @@ void TableTailer::waitForEvents() { } -bool TableTailer::correctResult(NdbDictionary::Event::TableEvent event, NdbRecAttr* values[]){ - for(int col=0; colisNULL() != 0 && event != NdbDictionary::Event::TE_DELETE ){ - LOG_ERROR("Error at column " << mTable.mColumnNames[col] << " " << values[col]->isNULL() << getEventName(event)); - return false; - } - } - return true; -} - const char* TableTailer::getEventName(NdbDictionary::Event::TableEvent event) { switch (event) { case NdbDictionary::Event::TE_INSERT: