remove the log entries only if insertion in elastic succeeded
Mahmoud Ismail committed Feb 13, 2017
1 parent aae152c commit 298c220
Showing 9 changed files with 87 additions and 59 deletions.
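The theme running through all of the files below: a log entry may be deleted from NDB only after the document built from it has been acknowledged by Elasticsearch. If the bulk request fails, the entries stay in the log table and are picked up again on the next pass, giving at-least-once delivery into the index. A minimal sketch of the pattern (sendToElastic and deleteLogRows are hypothetical stand-ins for the project's HTTP and NDB calls):

    #include <iostream>
    #include <string>
    #include <unordered_set>

    // Hypothetical stand-ins for the real bulk HTTP request and batched NDB delete.
    static bool sendToElastic(const std::string& batch) { return !batch.empty(); }
    static void deleteLogRows(const std::unordered_set<int>& pks) {
        std::cout << "deleting " << pks.size() << " log rows\n";
    }

    int main() {
        std::string batch = "{\"index\":{}}\n{\"doc\":1}\n";
        std::unordered_set<int> pks = {1, 2, 3}; // primary keys of the consumed log rows

        // The commit's core rule: cleanup is gated on success, so a failed bulk
        // request leaves the log rows in place to be retried on the next batch.
        if (sendToElastic(batch)) {
            deleteLogRows(pks);
        }
        return 0;
    }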
4 changes: 3 additions & 1 deletion include/ElasticSearch.h
@@ -51,6 +51,7 @@ struct Bulk{
     vector<ptime> mArrivalTimes;
     ptime mStartProcessing;
     ptime mEndProcessing;
+    UISet mPKs;
 };

 struct ESResponse{
@@ -62,7 +63,7 @@ class ElasticSearch : public Batcher{
 public:
     ElasticSearch(string elastic_addr, string index, string proj_type, string ds_type,
             string inode_type, int time_to_wait_before_inserting, int bulk_size,
-            const bool stats);
+            const bool stats, Ndb* metaConn);

     void addBulk(Bulk data);

@@ -92,6 +93,7 @@ class ElasticSearch : public Batcher{
     string mElasticAddr;
     string mElasticBulkAddr;

+    Ndb* mMetaBulkConn;
     ConcurrentQueue<Bulk> mQueue;

     vector<Bulk>* mToProcess;
16 changes: 8 additions & 8 deletions include/HopsworksOpsLogTailer.h
@@ -40,18 +40,18 @@ class HopsworksOpsLogTailer : public TableTailer{
     static const WatchTable TABLE;
     virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]);

-    void handleDataset(int opId, OperationType opType, int datasetId, int projectId);
-    void handleUpsertDataset(int opId, OperationType opType, int datasetId, int projectId);
-    void handleDeleteDataset(int datasetId, int projectId);
+    bool handleDataset(int opId, OperationType opType, int datasetId, int projectId);
+    bool handleUpsertDataset(int opId, OperationType opType, int datasetId, int projectId);
+    bool handleDeleteDataset(int datasetId, int projectId);
     string createDatasetJSONUpSert(int porjectId, NdbRecAttr* value[]);

-    void handleProject(int projectId, OperationType opType);
-    void handleDeleteProject(int projectId);
-    void handleUpsertProject(int projectId, OperationType opType);
+    bool handleProject(int projectId, OperationType opType);
+    bool handleDeleteProject(int projectId);
+    bool handleUpsertProject(int projectId, OperationType opType);
     string createProjectJSONUpSert(NdbRecAttr* value[]);

-    void handleSchema(int schemaId, OperationType opType, int datasetId, int projectId, int inodeId);
-    void handleSchemaDelete(int schemaId, int datasetId, int projectId, int inodeId);
+    bool handleSchema(int schemaId, OperationType opType, int datasetId, int projectId, int inodeId);
+    bool handleSchemaDelete(int schemaId, int datasetId, int projectId, int inodeId);
     string createSchemaDeleteJSON(string schema);

     void removeLog(int pk);
7 changes: 3 additions & 4 deletions include/MetadataLogTailer.h
@@ -169,14 +169,13 @@ class MetadataLogTailer : public RCTableTailer<MetadataLogEntry> {
     MetadataLogEntry consumeMultiQueue(int queue_id);
     MetadataLogEntry consume();

-    static void removeLogs(const NdbDictionary::Dictionary* database,
-            NdbTransaction* transaction, MetaQ* batch);
+    static void removeLogs(Ndb* conn, UISet& pks);

     static SchemabasedMq* readSchemaBasedMetadataRows(const NdbDictionary::Dictionary* database,
-            NdbTransaction* transaction, MetaQ* batch);
+            NdbTransaction* transaction, MetaQ* batch, UISet& primaryKeys);

     static SchemalessMq* readSchemalessMetadataRows(const NdbDictionary::Dictionary* database,
-            NdbTransaction* transaction, MetaQ* batch);
+            NdbTransaction* transaction, MetaQ* batch, UISet& primaryKeys);

     virtual ~MetadataLogTailer();
 private:
11 changes: 8 additions & 3 deletions src/ElasticSearch.cpp
@@ -23,6 +23,7 @@
  */

 #include "ElasticSearch.h"
+#include "MetadataLogTailer.h"

 using namespace Utils;

@@ -51,9 +52,9 @@ static string getAccString(Accumulator acc){

 ElasticSearch::ElasticSearch(string elastic_addr, string index, string proj_type,
         string ds_type, string inode_type, int time_to_wait_before_inserting,
-        int bulk_size, const bool stats) : Batcher(time_to_wait_before_inserting, bulk_size),
+        int bulk_size, const bool stats, Ndb* metaConn) : Batcher(time_to_wait_before_inserting, bulk_size),
         mIndex(index), mProjectType(proj_type), mDatasetType(ds_type), mInodeType(inode_type),
-        mStats(stats), mToProcessLength(0), mTotalNumOfEventsProcessed(0),
+        mStats(stats), mMetaBulkConn(metaConn), mToProcessLength(0), mTotalNumOfEventsProcessed(0),
         mTotalNumOfBulksProcessed(0), mIsFirstEventArrived(false){

     mElasticAddr = "http://" + elastic_addr;
@@ -94,14 +95,18 @@ void ElasticSearch::processBatch() {
     mToProcessLength = 0;
     mLock.unlock();

+    UISet pks;
     string batch;
     for (vector<Bulk>::iterator it = bulks->begin(); it != bulks->end(); ++it) {
         Bulk bulk = *it;
         batch.append(bulk.mJSON);
+        pks.insert(bulk.mPKs.begin(), bulk.mPKs.end());
     }

     //TODO: handle failures
-    elasticSearchHttpRequest(HTTP_POST, mElasticBulkAddr, batch);
+    if(elasticSearchHttpRequest(HTTP_POST, mElasticBulkAddr, batch)){
+        MetadataLogTailer::removeLogs(mMetaBulkConn, pks);
+    }

     if (mStats) {
         stats(bulks);
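processBatch now unions the primary keys carried by each Bulk and hands them to MetadataLogTailer::removeLogs on the dedicated mMetaBulkConn connection, but only when the HTTP bulk request reports success. A sketch of that accumulate-then-gate step, assuming UISet is an unordered set of ints (the insert() usage above suggests as much, but the typedef itself is not shown in this diff):

    #include <string>
    #include <unordered_set>
    #include <vector>

    typedef std::unordered_set<int> UISet; // assumed shape of the project's UISet

    struct Bulk {
        std::string mJSON; // pre-rendered bulk-API lines
        UISet mPKs;        // log-row primary keys that produced this JSON
    };

    // Hypothetical stand-ins for elasticSearchHttpRequest and removeLogs.
    static bool postBulk(const std::string& body) { return !body.empty(); }
    static void removeLogs(const UISet&) {}

    static void processBatch(const std::vector<Bulk>& bulks) {
        UISet pks;
        std::string batch;
        for (std::vector<Bulk>::const_iterator it = bulks.begin(); it != bulks.end(); ++it) {
            batch.append(it->mJSON);
            pks.insert(it->mPKs.begin(), it->mPKs.end()); // union keys across bulks
        }
        if (postBulk(batch)) {  // delete log rows only after Elastic accepted the batch
            removeLogs(pks);
        }
    }

    int main() {
        std::vector<Bulk> bulks(2);
        bulks[0].mJSON = "{\"a\":1}\n"; bulks[0].mPKs = {1, 2};
        bulks[1].mJSON = "{\"b\":2}\n"; bulks[1].mPKs = {2, 3};
        processBatch(bulks); // rows {1, 2, 3} are each deleted exactly once
        return 0;
    }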
69 changes: 43 additions & 26 deletions src/HopsworksOpsLogTailer.cpp
@@ -76,31 +76,34 @@ void HopsworksOpsLogTailer::handleEvent(NdbDictionary::Event::TableEvent eventTy
             << "DatasetID = " << project_id << endl
             << "INodeID = " << inode_id);

+    bool success = false;
     switch(op_on){
         case Dataset:
-            handleDataset(op_id, op_type, inode_id, project_id);
+            success = handleDataset(op_id, op_type, inode_id, project_id);
             break;
         case Project:
-            handleProject(op_id, op_type);
+            success = handleProject(op_id, op_type);
             break;
         case Schema:
-            handleSchema(op_id, op_type, dataset_id, project_id, inode_id);
+            success = handleSchema(op_id, op_type, dataset_id, project_id, inode_id);
             break;
     }

-    removeLog(id);
+    if(success){
+        removeLog(id);
+    }
 }

-void HopsworksOpsLogTailer::handleDataset(int opId, OperationType opType,
+bool HopsworksOpsLogTailer::handleDataset(int opId, OperationType opType,
         int datasetId, int projectId) {
     if(opType == Delete){
-        handleDeleteDataset(datasetId, projectId);
+        return handleDeleteDataset(datasetId, projectId);
     }else{
-        handleUpsertDataset(opId, opType, datasetId, projectId);
+        return handleUpsertDataset(opId, opType, datasetId, projectId);
     }
 }

-void HopsworksOpsLogTailer::handleUpsertDataset(int opId, OperationType opType, int datasetId, int projectId){
+bool HopsworksOpsLogTailer::handleUpsertDataset(int opId, OperationType opType, int datasetId, int projectId){
     mPDICache->addDatasetToProject(datasetId, projectId);

     const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
@@ -120,7 +123,8 @@ void HopsworksOpsLogTailer::handleUpsertDataset(int opId, OperationType opType,
     executeTransaction(transaction, NdbTransaction::Commit);

     string data = createDatasetJSONUpSert(projectId, row);
-    if (mElasticSearch->addDataset(projectId, datasetId, data)) {
+    bool success = mElasticSearch->addDataset(projectId, datasetId, data);
+    if (success) {
         switch(opType){
             case Add:
                 LOG_INFO("Add Dataset[" << datasetId << "]: Succeeded");
@@ -132,6 +136,7 @@ void HopsworksOpsLogTailer::handleUpsertDataset(int opId, OperationType opType,
     }

     transaction->close();
+    return success;
 }

 string HopsworksOpsLogTailer::createDatasetJSONUpSert(int porjectId, NdbRecAttr* value[]) {
@@ -173,7 +178,7 @@ string HopsworksOpsLogTailer::createDatasetJSONUpSert(int porjectId, NdbRecAttr*
     return string(sbDoc.GetString());
 }

-void HopsworksOpsLogTailer::handleDeleteDataset(int datasetId, int projectId) {
+bool HopsworksOpsLogTailer::handleDeleteDataset(int datasetId, int projectId) {
     rapidjson::StringBuffer sbDoc;
     rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);
     docWriter.StartObject();
@@ -192,26 +197,29 @@ void HopsworksOpsLogTailer::handleDeleteDataset(int datasetId, int projectId) {
     docWriter.EndObject();

     //TODO: handle failures in elastic search
-    if (mElasticSearch->deleteDatasetChildren(projectId, datasetId, string(sbDoc.GetString()))) {
+    bool success1 = mElasticSearch->deleteDatasetChildren(projectId, datasetId, string(sbDoc.GetString()));
+    if (success1) {
         LOG_INFO("Delete Dataset[" << datasetId << "] children inodes: Succeeded");
     }
-
-    if (mElasticSearch->deleteDataset(projectId, datasetId)) {
+
+    bool success2 = mElasticSearch->deleteDataset(projectId, datasetId);
+    if (success2) {
         LOG_INFO("Delete Dataset[" << datasetId << "]: Succeeded");
     }

     mPDICache->removeDataset(datasetId);
+    return (success1 && success2);
 }

-void HopsworksOpsLogTailer::handleProject(int projectId, OperationType opType) {
+bool HopsworksOpsLogTailer::handleProject(int projectId, OperationType opType) {
     if (opType == Delete) {
-        handleDeleteProject(projectId);
+        return handleDeleteProject(projectId);
     } else {
-        handleUpsertProject(projectId, opType);
+        return handleUpsertProject(projectId, opType);
     }
 }

-void HopsworksOpsLogTailer::handleDeleteProject(int projectId) {
+bool HopsworksOpsLogTailer::handleDeleteProject(int projectId) {
     rapidjson::StringBuffer sbDoc;
     rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);
     docWriter.StartObject();
@@ -230,18 +238,21 @@ void HopsworksOpsLogTailer::handleDeleteProject(int projectId) {
     docWriter.EndObject();

     //TODO: handle failures in elastic search
-    if (mElasticSearch->deleteProjectChildren(projectId, string(sbDoc.GetString()))) {
+    bool success1 = mElasticSearch->deleteProjectChildren(projectId, string(sbDoc.GetString()));
+    if (success1) {
         LOG_INFO("Delete Project[" << projectId << "] children inodes and datasets : Succeeded");
     }
-
-    if (mElasticSearch->deleteProject(projectId)) {
+
+    bool success2 = mElasticSearch->deleteProject(projectId);
+    if (success2) {
         LOG_INFO("Delete Project[" << projectId << "]: Succeeded");
     }

     mPDICache->removeProject(projectId);
+    return (success1 && success2);
 }

-void HopsworksOpsLogTailer::handleUpsertProject(int projectId, OperationType opType){
+bool HopsworksOpsLogTailer::handleUpsertProject(int projectId, OperationType opType){

     const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
     const NdbDictionary::Table* table = getTable(database, PR);
@@ -260,7 +271,8 @@ void HopsworksOpsLogTailer::handleUpsertProject(int projectId, OperationType opT
     executeTransaction(transaction, NdbTransaction::Commit);

     string data = createProjectJSONUpSert(row);
-    if (mElasticSearch->addProject(projectId, data)) {
+    bool success = mElasticSearch->addProject(projectId, data);
+    if (success) {
         switch(opType){
             case Add:
                 LOG_INFO("Add Project[" << projectId << "]: Succeeded");
@@ -272,6 +284,7 @@ void HopsworksOpsLogTailer::handleUpsertProject(int projectId, OperationType opT
     }

     transaction->close();
+    return success;

 }

@@ -323,27 +336,31 @@ void HopsworksOpsLogTailer::removeLog(int pk){
     mNdbConnection->closeTransaction(transaction);
 }

-void HopsworksOpsLogTailer::handleSchema(int schemaId, OperationType opType, int datasetId, int projectId, int inodeId) {
+bool HopsworksOpsLogTailer::handleSchema(int schemaId, OperationType opType, int datasetId, int projectId, int inodeId) {
     if(opType == Delete){
-        handleSchemaDelete(schemaId, datasetId, projectId, inodeId);
+        return handleSchemaDelete(schemaId, datasetId, projectId, inodeId);
     }else{
         LOG_ERROR("Unsupported Schema Operation [" << Utils::OperationTypeToStr(opType) << "]. Only Delete is supported.");
+        return true;
     }
 }

-void HopsworksOpsLogTailer::handleSchemaDelete(int schemaId, int datasetId, int projectId, int inodeId) {
+bool HopsworksOpsLogTailer::handleSchemaDelete(int schemaId, int datasetId, int projectId, int inodeId) {
     const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
     NdbTransaction* transaction = startNdbTransaction(mNdbConnection);
     mSchemaCache->refresTemplate(database, transaction, schemaId);
     boost::optional<string> schema_ptr = mSchemaCache->getTemplate(schemaId);
     if(schema_ptr){
         string schema = *schema_ptr;
         string json = createSchemaDeleteJSON(schema);
-        if (mElasticSearch->deleteSchemaForINode(projectId, datasetId, inodeId, json)) {
+        bool success = mElasticSearch->deleteSchemaForINode(projectId, datasetId, inodeId, json);
+        if (success) {
             LOG_INFO("Delete Schema/Template [" << schemaId << ", " << schema << "] for INode [" << inodeId<< "] : Succeeded");
         }
+        return success;
     }else{
         LOG_WARN("Schema/Template ["<< schemaId << "] does not exist");
+        return true;
     }
 }
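Every handler here changes from void to bool so that handleEvent can decide whether the ops_log row may be removed. Two conventions are worth noting: composite deletes report success1 && success2, so a partial failure keeps the log entry for retry, while cases where a retry could never help (an unsupported schema operation, a template that no longer exists) return true so the entry is still cleaned up. A compact sketch of the aggregation (deleteChildren/deleteDoc are hypothetical stand-ins for the Elasticsearch calls):

    #include <iostream>

    // Hypothetical stand-ins for mElasticSearch->deleteDatasetChildren(...) and
    // mElasticSearch->deleteDataset(...).
    static bool deleteChildren(int id) { return id > 0; }
    static bool deleteDoc(int id)      { return id > 0; }

    // Mirrors handleDeleteDataset: both steps always run, and the caller may
    // remove the log entry only when every step succeeded.
    static bool handleDelete(int id) {
        bool success1 = deleteChildren(id);
        bool success2 = deleteDoc(id);
        return success1 && success2;
    }

    int main() {
        int logRowId = 42; // hypothetical ops_log primary key
        if (handleDelete(7)) {
            std::cout << "removeLog(" << logRowId << ")\n"; // safe to clean up
        }
        return 0;
    }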
20 changes: 13 additions & 7 deletions src/MetadataLogTailer.cpp
@@ -121,21 +121,25 @@ MetadataLogEntry MetadataLogTailer::consume() {
     return res;
 }

-void MetadataLogTailer::removeLogs(const NdbDictionary::Dictionary* database, NdbTransaction* transaction, MetaQ* batch) {
+void MetadataLogTailer::removeLogs(Ndb* conn, UISet& pks) {
+    const NdbDictionary::Dictionary* database = getDatabase(conn);
+    NdbTransaction* transaction = startNdbTransaction(conn);
     const NdbDictionary::Table* log_table = getTable(database, TABLE.mTableName);
-    for(MetaQ::iterator it=batch->begin(); it != batch->end() ; ++it){
-        MetadataLogEntry row = *it;
+    for(UISet::iterator it=pks.begin(); it != pks.end() ; ++it){
+        int id = *it;
         NdbOperation* op = getNdbOperation(transaction, log_table);

         op->deleteTuple();
-        op->equal(_metalog_cols[0].c_str(), row.mId);
+        op->equal(_metalog_cols[0].c_str(), id);

-        LOG_TRACE("Delete log row: " << row.mId << " -- " << row.mMetaPK.to_string());
+        LOG_TRACE("Delete log row: " << id);
     }
+    executeTransaction(transaction, NdbTransaction::Commit);
+    conn->closeTransaction(transaction);
 }

 SchemabasedMq* MetadataLogTailer::readSchemaBasedMetadataRows(const NdbDictionary::Dictionary* database,
-        NdbTransaction* transaction, MetaQ* batch) {
+        NdbTransaction* transaction, MetaQ* batch, UISet& primaryKeys) {
     UMetadataKeyRowMap rows = readMetadataRows(database, transaction, SB_METADATA, batch,
             SB_METADATA_COLS, METADATA_NO_COLS, METADATA_PK1, METADATA_PK2, METADATA_PK3);

@@ -144,6 +148,7 @@ SchemabasedMq* MetadataLogTailer::readSchemaBasedMetadataRows(const NdbDictionar
     SchemabasedMq* res = new SchemabasedMq();
     for (MetaQ::iterator it = batch->begin(); it != batch->end(); ++it) {
         MetadataLogEntry ml = *it;
+        primaryKeys.insert(ml.mId);
         if(ml.mMetaOpType == Delete){
             res->push_back(SchemabasedMetadataEntry(ml));
             continue;
@@ -164,7 +169,7 @@ SchemabasedMq* MetadataLogTailer::readSchemaBasedMetadataRows(const NdbDictionar
 }

 SchemalessMq* MetadataLogTailer::readSchemalessMetadataRows(const NdbDictionary::Dictionary* database,
-        NdbTransaction* transaction, MetaQ* batch) {
+        NdbTransaction* transaction, MetaQ* batch, UISet& primaryKeys) {
     UMetadataKeyRowMap rows = readMetadataRows(database, transaction, NS_METADATA, batch,
             NS_METADATA_COLS, METADATA_NO_COLS, METADATA_PK1, METADATA_PK2, METADATA_PK3);

@@ -173,6 +178,7 @@ SchemalessMq* MetadataLogTailer::readSchemalessMetadataRows(const NdbDictionary:
     SchemalessMq* res = new SchemalessMq();
     for (MetaQ::iterator it = batch->begin(); it != batch->end(); ++it) {
         MetadataLogEntry ml = *it;
+        primaryKeys.insert(ml.mId);
         if(ml.mMetaOpType == Delete){
             res->push_back(SchemalessMetadataEntry(ml));
             continue;
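removeLogs now owns its NDB transaction end to end: it opens one on the connection it is given, queues a deleteTuple per primary key, and commits once, so an arbitrarily large cleanup is a single round trip. The same shape against the raw NDB API might look as follows; the table name "meta_log", the column name "id", and the omission of error handling are simplifying assumptions, not the project's actual helpers:

    #include <NdbApi.hpp>
    #include <unordered_set>

    // Sketch of a batched primary-key delete, in the style of removeLogs but
    // without the project's getTable/getNdbOperation wrappers.
    void removeLogRows(Ndb* conn, const std::unordered_set<int>& pks) {
        const NdbDictionary::Table* table = conn->getDictionary()->getTable("meta_log");
        NdbTransaction* trans = conn->startTransaction();
        for (std::unordered_set<int>::const_iterator it = pks.begin(); it != pks.end(); ++it) {
            NdbOperation* op = trans->getNdbOperation(table);
            op->deleteTuple();     // queue a delete...
            op->equal("id", *it);  // ...keyed on this log row's primary key
        }
        trans->execute(NdbTransaction::Commit); // one round trip for the whole batch
        conn->closeTransaction(trans);
    }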
5 changes: 4 additions & 1 deletion src/Notifier.cpp
@@ -80,8 +80,11 @@ void Notifier::setup() {
     mPDICache = new ProjectDatasetINodeCache(mLRUCap);
     mSchemaCache = new SchemaCache(mLRUCap);

+    Ndb* metdata_connection = create_ndb_connection(mMetaDatabaseName);
+
     mElasticSearch = new ElasticSearch(mElasticAddr, mElasticIndex, mElastticProjectType,
-            mElasticDatasetType, mElasticInodeType, mElasticIssueTime, mElasticBatchsize, mStats);
+            mElasticDatasetType, mElasticInodeType, mElasticIssueTime, mElasticBatchsize,
+            mStats, metdata_connection);

     Ndb* mutations_tailer_connection = create_ndb_connection(mDatabaseName);
     mFsMutationsTableTailer = new FsMutationsTableTailer(mutations_tailer_connection, mPollMaxTimeToWait, mPDICache);
6 changes: 2 additions & 4 deletions src/SchemabasedMetadataReader.cpp
@@ -39,7 +39,8 @@ void SchemabasedMetadataReader::processAddedandDeleted(MConn connection, MetaQ*
     const NdbDictionary::Dictionary* metaDatabase = getDatabase(metaConn);
     NdbTransaction* metaTransaction = startNdbTransaction(metaConn);

-    SchemabasedMq* data_queue = MetadataLogTailer::readSchemaBasedMetadataRows(metaDatabase, metaTransaction, data_batch);
+    SchemabasedMq* data_queue = MetadataLogTailer::readSchemaBasedMetadataRows(metaDatabase,
+            metaTransaction, data_batch, bulk.mPKs);

     UIRowMap tuples = readMetadataColumns(metaDatabase, metaTransaction, data_queue);

@@ -52,9 +53,6 @@ void SchemabasedMetadataReader::processAddedandDeleted(MConn connection, MetaQ*

     createJSON(tuples, data_queue, bulk);

-    MetadataLogTailer::removeLogs(metaDatabase, metaTransaction, data_batch);
-    executeTransaction(metaTransaction, NdbTransaction::Commit);
-
     metaConn->closeTransaction(metaTransaction);
 }
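The reader no longer deletes log rows in the same transaction that reads them; readSchemaBasedMetadataRows simply records each consumed key into bulk.mPKs, and the ElasticSearch batcher deletes them later, after the bulk insert succeeds. A reduced sketch of the collect-while-reading step (this trimmed-down MetadataLogEntry is illustrative, not the real struct):

    #include <unordered_set>
    #include <vector>

    typedef std::unordered_set<int> UISet;  // assumed shape of the project's UISet

    struct MetadataLogEntry { int mId; };   // reduced stand-in for the real entry

    // Mirrors the loop in readSchemaBasedMetadataRows: every consumed entry
    // contributes its primary key to the caller-owned set; nothing is deleted here.
    void collectPrimaryKeys(const std::vector<MetadataLogEntry>& batch, UISet& primaryKeys) {
        for (std::vector<MetadataLogEntry>::const_iterator it = batch.begin(); it != batch.end(); ++it) {
            primaryKeys.insert(it->mId);
        }
    }

    int main() {
        std::vector<MetadataLogEntry> batch = {{1}, {2}, {2}};
        UISet pks;
        collectPrimaryKeys(batch, pks); // pks now holds {1, 2}
        return 0;
    }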