Skip to content

Commit

Permalink
Fix update for Dataset and Project table tailers
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmoud Ismail committed Jul 6, 2016
1 parent ae20f31 commit 5b6359a
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 141 deletions.
4 changes: 4 additions & 0 deletions include/DatasetTableTailer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class DatasetTableTailer : public TableTailer{
private:
static const WatchTable TABLE;
virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]);
void handleDelete(int datasetId, int projectId);
void handleAdd(int datasetId, int projectId, NdbRecAttr* value[]);
void handleUpdate(NdbRecAttr* value[]);
string createJSONUpSert(int projectId, NdbRecAttr* value[]);

ElasticSearch* mElasticSearch;
ProjectDatasetINodeCache* mPDICache;
Expand Down
4 changes: 4 additions & 0 deletions include/ProjectTableTailer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class ProjectTableTailer : public TableTailer{
static const WatchTable TABLE;

virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]);
void handleDelete(int projectId);
void handleAdd(int projectId, NdbRecAttr* value[]);
void handleUpdate(int projectId, NdbRecAttr* value[]);
string createJSONUpSert(NdbRecAttr* value[]);

ElasticSearch* mElasticSearch;
ProjectDatasetINodeCache* mPDICache;
Expand Down
1 change: 0 additions & 1 deletion include/TableTailer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class TableTailer {
void removeListenerEvent();
void waitForEvents();
void run();
bool correctResult(NdbDictionary::Event::TableEvent event, NdbRecAttr* values[]);
const char* getEventName(NdbDictionary::Event::TableEvent event);

bool mStarted;
Expand Down
208 changes: 138 additions & 70 deletions src/DatasetTableTailer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
using namespace Utils::NdbC;

const string _dataset_table= "dataset";
const int _dataset_noCols= 8;
const int _dataset_noCols= 9;
const string _dataset_cols[_dataset_noCols]=
{"inode_id",
"inode_pid",
Expand All @@ -36,7 +36,8 @@ const string _dataset_cols[_dataset_noCols]=
"description",
"public_ds",
"searchable",
"shared"
"shared",
"id"
};

const int _dataset_noEvents = 3;
Expand All @@ -48,100 +49,167 @@ const NdbDictionary::Event::TableEvent _dataset_events[_dataset_noEvents] =

const WatchTable DatasetTableTailer::TABLE = {_dataset_table, _dataset_cols, _dataset_noCols , _dataset_events, _dataset_noEvents, "inode_id", "inode_id"};

const int DS_INODE_ID = 0;
const int DS_INODE_PID = 1;
const int DS_INODE_NAME = 2;
const int DS_PROJ_ID = 3;
const int DS_DESC = 4;
const int DS_PUBLIC = 5;
const int DS_SEARCH = 6;
const int DS_SHARED = 7;
const int DS_ID_PK = 8;

DatasetTableTailer::DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait,
ElasticSearch* elastic,ProjectDatasetINodeCache* cache)
: TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticSearch(elastic), mPDICache(cache){
}

void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]) {
int id = value[0]->int32_value();
int projectId = value[3]->int32_value();
bool originalDS = value[7]->int8_value() == 0;
if(!originalDS){
LOG_DEBUG("Ignore shared Dataset [" << id << "] in Project [" << projectId << "]");
int datasetId = value[DS_INODE_ID]->int32_value();
int projectId = value[DS_PROJ_ID]->int32_value();
bool originalDS = value[DS_SHARED]->int8_value() == 0;

if (!originalDS && value[DS_SHARED]->isNULL() != -1) {
LOG_DEBUG("Ignore shared Dataset [" << datasetId << "] in Project [" << projectId << "]");
return;
}

switch (eventType) {
case NdbDictionary::Event::TE_INSERT:
handleAdd(datasetId, projectId, value);
break;
case NdbDictionary::Event::TE_DELETE:
handleDelete(datasetId, projectId);
break;
case NdbDictionary::Event::TE_UPDATE:
handleUpdate(value);
break;
}
}

void DatasetTableTailer::handleAdd(int datasetId, int projectId, NdbRecAttr* value[]) {
mPDICache->addDatasetToProject(datasetId, projectId);

if(eventType == NdbDictionary::Event::TE_DELETE){

rapidjson::StringBuffer sbDoc;
rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);
docWriter.StartObject();
docWriter.String("query");
string data = createJSONUpSert(projectId, value);
if (mElasticSearch->addDataset(projectId, datasetId, data)) {
LOG_INFO("Add Dataset[" << datasetId << "]: Succeeded");
}

const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
NdbTransaction* transaction = startNdbTransaction(mNdbConnection);
Recovery::checkpointDataset(database, transaction, datasetId);
transaction->close();
}

void DatasetTableTailer::handleDelete(int datasetId, int projectId) {
rapidjson::StringBuffer sbDoc;
rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);
docWriter.StartObject();
docWriter.String("query");

docWriter.StartObject();

docWriter.StartObject();
docWriter.String("match");
docWriter.StartObject();
docWriter.String("dataset_id");
docWriter.Int(datasetId);
docWriter.EndObject();

docWriter.String("match");
docWriter.StartObject();
docWriter.String("dataset_id");
docWriter.Int(id);
docWriter.EndObject();
docWriter.EndObject();

docWriter.EndObject();
docWriter.EndObject();

//TODO: handle failures in elastic search
if (mElasticSearch->deleteDatasetChildren(projectId, datasetId, string(sbDoc.GetString()))) {
LOG_INFO("Delete Dataset[" << datasetId << "] children inodes: Succeeded");
}

if (mElasticSearch->deleteDataset(projectId, datasetId)) {
LOG_INFO("Delete Dataset[" << datasetId << "]: Succeeded");
}

docWriter.EndObject();
mPDICache->removeDataset(datasetId);
}

void DatasetTableTailer::handleUpdate(NdbRecAttr* value[]) {
int datasetPK = value[DS_ID_PK]->int32_value();
int datasetId = -1;
int projectId = -1;
if(value[DS_INODE_ID]->isNULL() == -1){
const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
const NdbDictionary::Table* table = getTable(database, TABLE.mTableName);
NdbTransaction* transaction = startNdbTransaction(mNdbConnection);
NdbOperation* op = getNdbOperation(transaction, table);

//TODO: handle failures in elastic search
if(mElasticSearch->deleteDatasetChildren(projectId, id, string(sbDoc.GetString()))){
LOG_INFO("Delete Dataset[" << id << "] children inodes: Succeeded");
}
op->readTuple(NdbOperation::LM_CommittedRead);

if(mElasticSearch->deleteDataset(projectId, id)){
LOG_INFO("Delete Dataset[" << id << "]: Succeeded");
}

mPDICache->removeDataset(id);
op->equal(_dataset_cols[8].c_str(), datasetPK);

return;
NdbRecAttr* datasetIdCol = getNdbOperationValue(op, _dataset_cols[DS_INODE_ID]);
NdbRecAttr* projectIdCol = getNdbOperationValue(op, _dataset_cols[DS_PROJ_ID]);

executeTransaction(transaction, NdbTransaction::Commit);
datasetId = datasetIdCol->int32_value();
projectId = projectIdCol->int32_value();

transaction->close();
}

mPDICache->addDatasetToProject(id, projectId);
if(datasetId == -1 || projectId == -1){
LOG_ERROR("Couldn't resolve projectId[" << projectId << "] or datasetId[" << datasetId << "]");
return;
}


string data = createJSONUpSert(projectId, value);
if (mElasticSearch->addDataset(projectId, datasetId, data)) {
LOG_INFO("Update Dataset[" << datasetId << "]: Succeeded");
}
}

string DatasetTableTailer::createJSONUpSert(int porjectId, NdbRecAttr* value[]) {
rapidjson::StringBuffer sbDoc;
rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);
docWriter.StartObject();

docWriter.String("doc");
docWriter.StartObject();

docWriter.String("parent_id");
docWriter.Int(value[1]->int32_value());

docWriter.String("name");
docWriter.String(get_string(value[2]).c_str());


if (value[DS_INODE_PID]->isNULL() != -1) {
docWriter.String("parent_id");
docWriter.Int(value[DS_INODE_PID]->int32_value());
}

if (value[DS_INODE_NAME]->isNULL() != -1) {
docWriter.String("name");
docWriter.String(get_string(value[DS_INODE_NAME]).c_str());
}

docWriter.String("project_id");
docWriter.Int(projectId);

docWriter.String("description");
docWriter.String(get_string(value[4]).c_str());

bool public_ds = value[5]->int8_value() == 1;

docWriter.String("public_ds");
docWriter.Bool(public_ds);

bool searchable = value[6]->int8_value() == 1;

docWriter.String("searchable");
docWriter.Bool(searchable);

docWriter.Int(porjectId);

if (value[DS_DESC]->isNULL() != -1) {
docWriter.String("description");
docWriter.String(get_string(value[DS_DESC]).c_str());
}

if (value[DS_PUBLIC]->isNULL() != -1) {
bool public_ds = value[DS_PUBLIC]->int8_value() == 1;
docWriter.String("public_ds");
docWriter.Bool(public_ds);
}

if (value[DS_SEARCH]->isNULL() != -1) {
bool searchable = value[DS_SEARCH]->int8_value() == 1;
docWriter.String("searchable");
docWriter.Bool(searchable);
}

docWriter.EndObject();
docWriter.String("doc_as_upsert");
docWriter.Bool(true);
docWriter.EndObject();

string data = string(sbDoc.GetString());
if(mElasticSearch->addDataset(projectId, id, data)){
LOG_INFO("Add Dataset[" << id << "]: Succeeded");
}

const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
NdbTransaction* transaction = startNdbTransaction(mNdbConnection);
Recovery::checkpointDataset(database, transaction, id);
transaction->close();
return string(sbDoc.GetString());
}

void DatasetTableTailer::updateProjectIds(const NdbDictionary::Dictionary* database,
Expand All @@ -155,11 +223,11 @@ void DatasetTableTailer::updateProjectIds(const NdbDictionary::Dictionary* datab
NdbIndexScanOperation* op = getNdbIndexScanOperation(transaction, index);
op->readTuples(NdbOperation::LM_CommittedRead);

op->equal(_dataset_cols[0].c_str(), *it);
op->equal(_dataset_cols[DS_INODE_ID].c_str(), *it);

NdbRecAttr* id_col = getNdbOperationValue(op, _dataset_cols[0]);
NdbRecAttr* proj_id_col = getNdbOperationValue(op, _dataset_cols[3]);
NdbRecAttr* shared_col = getNdbOperationValue(op, _dataset_cols[7]);
NdbRecAttr* id_col = getNdbOperationValue(op, _dataset_cols[DS_INODE_ID]);
NdbRecAttr* proj_id_col = getNdbOperationValue(op, _dataset_cols[DS_PROJ_ID]);
NdbRecAttr* shared_col = getNdbOperationValue(op, _dataset_cols[DS_SHARED]);
rows[*it].push_back(id_col);
rows[*it].push_back(proj_id_col);
rows[*it].push_back(shared_col);
Expand Down
Loading

0 comments on commit 5b6359a

Please sign in to comment.