Skip to content

Commit

Permalink
recovery bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmoud Ismail committed Oct 31, 2017
1 parent 298c220 commit afa2c76
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 9 deletions.
3 changes: 1 addition & 2 deletions include/TableTailer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ struct WatchTable{
const NdbDictionary::Event::TableEvent* mWatchEvents;
const int mNoEvents;
const string mRecoveryIndex;
const string mRecoveryColumn;
};

class TableTailer {
Expand All @@ -50,7 +49,7 @@ class TableTailer {
Ndb* mNdbConnection;

private:
void recover(int recoverFromId);
void recover();
void createListenerEvent();
void removeListenerEvent();
void waitForEvents();
Expand Down
2 changes: 1 addition & 1 deletion src/FsMutationsTableTailer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const string _mutation_cols[_mutation_noCols]=
const int _mutation_noEvents = 1;
const NdbDictionary::Event::TableEvent _mutation_events[_mutation_noEvents] = { NdbDictionary::Event::TE_INSERT };

const WatchTable FsMutationsTableTailer::TABLE = {_mutation_table, _mutation_cols, _mutation_noCols , _mutation_events, _mutation_noEvents, _mutation_cols[2], _mutation_cols[2]};
const WatchTable FsMutationsTableTailer::TABLE = {_mutation_table, _mutation_cols, _mutation_noCols , _mutation_events, _mutation_noEvents, _mutation_cols[2]};

//const static ptime EPOCH_TIME(boost::gregorian::date(1970,1,1));

Expand Down
2 changes: 1 addition & 1 deletion src/HopsworksOpsLogTailer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const string _opslog_cols[_opslog_noCols]=
const int _opslog_noEvents = 1;
const NdbDictionary::Event::TableEvent _opslog_events[_opslog_noEvents] = { NdbDictionary::Event::TE_INSERT};

const WatchTable HopsworksOpsLogTailer::TABLE = {_opslog_table, _opslog_cols, _opslog_noCols , _opslog_events, _opslog_noEvents, "PRIMARY", "id"};
const WatchTable HopsworksOpsLogTailer::TABLE = {_opslog_table, _opslog_cols, _opslog_noCols , _opslog_events, _opslog_noEvents, "PRIMARY"};

const int OPS_ID_PK = 0;
const int OPS_OP_ID = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/MetadataLogTailer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const string _metalog_cols[_metalog_noCols]=
const int _metalog_noEvents = 1;
const NdbDictionary::Event::TableEvent _metalog_events[_metalog_noEvents] = { NdbDictionary::Event::TE_INSERT };

const WatchTable MetadataLogTailer::TABLE = {_metalog_table, _metalog_cols, _metalog_noCols , _metalog_events, _metalog_noEvents, "PRIMARY", _metalog_cols[0]};
const WatchTable MetadataLogTailer::TABLE = {_metalog_table, _metalog_cols, _metalog_noCols , _metalog_events, _metalog_noEvents, "PRIMARY"};


//Common
Expand Down
4 changes: 3 additions & 1 deletion src/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ void Notifier::start() {
mFsMutationsBatcher->start();
mFsMutationsTableTailer->start(mRecovery);

mMetadataLogTailer->start(mRecovery);
if(mMetadataType == Schemabased || mMetadataType == Schemaless || mMetadataType == Both){
mMetadataLogTailer->start(mRecovery);
}

if (mMetadataType == Schemabased || mMetadataType == Both) {
mSchemabasedMetadataBatcher->start();
Expand Down
5 changes: 2 additions & 3 deletions src/TableTailer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,22 @@ void TableTailer::start(bool recovery) {

if(recovery){
LOG_INFO("start with recovery for " << mTable.mTableName);
recover(0);
recover();
}

createListenerEvent();
mThread = boost::thread(&TableTailer::run, this);
mStarted = true;
}

void TableTailer::recover(int recoverFromId) {
void TableTailer::recover() {
const NdbDictionary::Dictionary* database = getDatabase(mNdbConnection);
const NdbDictionary::Index* index = getIndex(database, mTable.mTableName, mTable.mRecoveryIndex);

NdbTransaction* transaction = startNdbTransaction(mNdbConnection);
NdbIndexScanOperation* scanOp = getNdbIndexScanOperation(transaction, index);

scanOp->readTuples(NdbOperation::LM_CommittedRead, NdbScanOperation::SF_OrderBy);
scanOp->setBound(mTable.mRecoveryColumn.c_str(), NdbIndexScanOperation::BoundLT, (char*) & recoverFromId);

NdbRecAttr * row[mTable.mNoColumns];

Expand Down

0 comments on commit afa2c76

Please sign in to comment.