Skip to content

Commit

Permalink
[HOPSWORKS-1053] [ePipe] Add support for the extended attributes feat…
Browse files Browse the repository at this point in the history
…ure added in HOPS-1312 (#15)
  • Loading branch information
Mahmoud Ismail authored May 7, 2019
1 parent c424f2b commit 6385f05
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 66 deletions.
6 changes: 4 additions & 2 deletions include/FsMutationsDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "tables/INodeTable.h"
#include "tables/DatasetTable.h"
#include "NdbDataReaders.h"

#include "tables/XAttrTable.h"

class FsMutationsDataReader : public NdbDataReader<FsMutationRow, MConn, FSKeys> {
public:
Expand All @@ -39,10 +39,12 @@ class FsMutationsDataReader : public NdbDataReader<FsMutationRow, MConn, FSKeys>
private:
INodeTable mInodesTable;
DatasetTable mDatasetTable;
XAttrTable mXAttrTable;

virtual void processAddedandDeleted(Fmq* data_batch, FSBulk& bulk);

void createJSON(Fmq* pending, INodeMap& inodes, FSBulk& bulk);
void createJSON(Fmq* pending, INodeMap& inodes, XAttrMap& xattrs, FSBulk&
bulk);
};

class FsMutationsDataReaders : public NdbDataReaders<FsMutationRow, MConn, FSKeys>{
Expand Down
8 changes: 6 additions & 2 deletions include/tables/DBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DBTable : public DBTableBase {
boost::unordered_map<Int64, TableRow> doRead(Ndb* connection, ULSet& ids);

vector<TableRow> doRead(Ndb* connection, string index, AnyMap& anys);

void doDelete(Any any);
void doDelete(AnyMap& any);

Expand Down Expand Up @@ -336,7 +336,11 @@ void DBTable<TableRow>::applyConditionOnOperation(NdbOperation* operation, AnyMa
Int64 pk = boost::any_cast<Int64>(a);
log << colName << " = " << pk << endl;
operation->equal(colName.c_str(), pk);
}else if (a.type() == typeid (string)) {
} else if(a.type() == typeid(Int8)){
Int8 pk = boost::any_cast<Int8>(a);
log << colName << " = " << (int) pk << endl;
operation->equal(colName.c_str(), pk);
} else if (a.type() == typeid (string)) {
string pk = boost::any_cast<string>(a);
log << colName << " = " << pk << endl;
operation->equal(colName.c_str(), get_ndb_varchar(pk,
Expand Down
2 changes: 2 additions & 0 deletions include/tables/DBWatchTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#define DBWATCHTABLE_H
#include "DBTable.h"

#define PRIMARY_INDEX "PRIMARY"

typedef vector<NdbDictionary::Event::TableEvent> TEventVec;

template<typename TableRow>
Expand Down
98 changes: 79 additions & 19 deletions include/tables/FsMutationsLogTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ enum FsOpType {
FsDelete = 1,
FsUpdate = 2,
FsRename = 3,
FsChangeDataset = 4
FsChangeDataset = 4,

XAttrAdd = 10,
XAttrAddAll = 11,
XAttrUpdate = 12,
XAttrDelete = 13
};

inline static const char* FsOpTypeToStr(FsOpType optype) {
inline static const char *FsOpTypeToStr(FsOpType optype) {
switch (optype) {
case FsAdd:
return "Add";
Expand All @@ -48,6 +53,14 @@ inline static const char* FsOpTypeToStr(FsOpType optype) {
return "Rename";
case FsChangeDataset:
return "ChangeDataset";
case XAttrAdd:
return "XAttrAdd";
case XAttrAddAll:
return "XAttrAddAll";
case XAttrUpdate:
return "XAttrUpdate";
case XAttrDelete:
return "XAttrDelete";
default:
return "Unkown";
}
Expand All @@ -68,10 +81,11 @@ struct FsMutationPK {
struct FsMutationRow {
Int64 mDatasetINodeId;
Int64 mInodeId;
Int64 mPartitionId;
Int64 mParentId;
string mInodeName;
int mLogicalTime;

Int64 mPk1;
Int64 mPk2;
string mPk3;
FsOpType mOperation;

ptime mEventCreationTime;
Expand All @@ -80,26 +94,71 @@ struct FsMutationRow {
return FsMutationPK(mDatasetINodeId, mInodeId, mLogicalTime);
}

string getPKStr(){
stringstream stream;
stream << mDatasetINodeId << "-" << mInodeId << "-" << mLogicalTime;
return stream.str();
}

string to_string() {
stringstream stream;
stream << "-------------------------" << endl;
stream << "DatasetId = " << mDatasetINodeId << endl;
stream << "InodeId = " << mInodeId << endl;
stream << "PartitionId = " << mPartitionId << endl;
stream << "ParentId = " << mParentId << endl;
stream << "InodeName = " << mInodeName << endl;
stream << "Pk1 = " << mPk1 << endl;
stream << "Pk2 = " << mPk2 << endl;
stream << "Pk3 = " << mPk3 << endl;
stream << "LogicalTime = " << mLogicalTime << endl;
stream << "Operation = " << FsOpTypeToStr(mOperation) << endl;
stream << "-------------------------" << endl;
return stream.str();
}

bool isINodeOperation(){
return mOperation == FsAdd || mOperation == FsDelete || mOperation ==
FsUpdate || mOperation == FsRename || mOperation == FsChangeDataset;
}

bool requiresReadingINode(){
return mOperation == FsAdd || mOperation == FsUpdate;
}

Int64 getPartitionId(){
return mPk1;
}

Int64 getParentId(){
return mPk2;
}

string getINodeName(){
return mPk3;
}

bool isXAttrOperation(){
return mOperation == XAttrAdd || mOperation == XAttrAddAll || mOperation
== XAttrUpdate || mOperation == XAttrDelete;
}

bool requiresReadingXAttr(){
return mOperation == XAttrAdd || mOperation == XAttrUpdate || mOperation
== XAttrAddAll ;
}

Int8 getNamespace(){
return static_cast<Int8>(mPk2);
}

string getXAttrName(){
return mPk3;
}
};

struct FsMutationRowEqual {

bool operator()(const FsMutationRow &lhs, const FsMutationRow &rhs) const {
return lhs.mDatasetINodeId == rhs.mDatasetINodeId && lhs.mParentId == rhs.mParentId
&& lhs.mInodeName == rhs.mInodeName && lhs.mInodeId == rhs.mInodeId;
return lhs.mDatasetINodeId == rhs.mDatasetINodeId && lhs.mInodeId == rhs
.mInodeId && lhs.mLogicalTime == rhs.mLogicalTime;
}
};

Expand All @@ -108,10 +167,11 @@ struct FsMutationRowHash {
std::size_t operator()(const FsMutationRow &a) const {
std::size_t seed = 0;
boost::hash_combine(seed, a.mDatasetINodeId);
boost::hash_combine(seed, a.mPartitionId);
boost::hash_combine(seed, a.mParentId);
boost::hash_combine(seed, a.mInodeName);
boost::hash_combine(seed, a.mInodeId);
boost::hash_combine(seed, a.mLogicalTime);
boost::hash_combine(seed, a.mPk1);
boost::hash_combine(seed, a.mPk2);
boost::hash_combine(seed, a.mPk3);

return seed;
}
Expand Down Expand Up @@ -145,9 +205,9 @@ class FsMutationsLogTable : public DBWatchTable<FsMutationRow> {
addColumn("dataset_id");
addColumn("inode_id");
addColumn("logical_time");
addColumn("inode_partition_id");
addColumn("inode_parent_id");
addColumn("inode_name");
addColumn("pk1");
addColumn("pk2");
addColumn("pk3");
addColumn("operation");
addRecoveryIndex("logical_time");
addWatchEvent(NdbDictionary::Event::TE_INSERT);
Expand All @@ -159,9 +219,9 @@ class FsMutationsLogTable : public DBWatchTable<FsMutationRow> {
row.mDatasetINodeId = value[0]->int64_value();
row.mInodeId = value[1]->int64_value();
row.mLogicalTime = value[2]->int32_value();
row.mPartitionId = value[3]->int64_value();
row.mParentId = value[4]->int64_value();
row.mInodeName = get_string(value[5]);
row.mPk1 = value[3]->int64_value();
row.mPk2 = value[4]->int64_value();
row.mPk3 = get_string(value[5]);
row.mOperation = static_cast<FsOpType> (value[6]->int8_value());
return row;
}
Expand Down
2 changes: 1 addition & 1 deletion include/tables/HopsworksOpsLogTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class HopsworksOpsLogTable : public DBWatchTable<HopsworksOpRow> {
addColumn("project_id");
addColumn("dataset_id");
addColumn("inode_id");
addRecoveryIndex("PRIMARY");
addRecoveryIndex(PRIMARY_INDEX);
addWatchEvent(NdbDictionary::Event::TE_INSERT);
}

Expand Down
23 changes: 11 additions & 12 deletions include/tables/INodeTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

#define DOC_TYPE_INODE "inode"

inline static bool requiresINode(FsMutationRow row){
return row.mOperation == FsAdd || row.mOperation == FsUpdate;
}

struct INodeRow {
Int64 mParentId;
string mName;
Expand All @@ -50,7 +46,8 @@ struct INodeRow {
int mLogicalTime;
FsOpType mOperation;
bool mIsDir;

Int8 mNumXAttrs;

bool is_equal(ProjectRow proj){
return proj.mInodeName == mName && proj.mInodeParentId == mParentId
&& proj.mInodePartitionId == mPartitionId;
Expand Down Expand Up @@ -174,13 +171,13 @@ struct INodeRow {
docWriter.StartObject();

docWriter.String("parent_id");
docWriter.Int64(row.mParentId);
docWriter.Int64(row.getParentId());

docWriter.String("partition_id");
docWriter.Int64(row.mPartitionId);
docWriter.Int64(row.getPartitionId());

docWriter.String("name");
docWriter.String(row.mInodeName.c_str());
docWriter.String(row.getINodeName().c_str());

docWriter.String("operation");
docWriter.Int(row.mOperation);
Expand Down Expand Up @@ -271,6 +268,7 @@ class INodeTable : public DBTable<INodeRow> {
addColumn("group_id");
addColumn("logical_time");
addColumn("is_dir");
addColumn("num_xattrs");
}

INodeRow getRow(NdbRecAttr* values[]) {
Expand All @@ -284,6 +282,7 @@ class INodeTable : public DBTable<INodeRow> {
row.mGroupId = values[6]->int32_value();
row.mLogicalTime = values[7]->int32_value();
row.mIsDir = values[8]->int8_value() == 1;
row.mNumXAttrs = values[9]->int8_value();
return row;
}

Expand Down Expand Up @@ -334,15 +333,15 @@ class INodeTable : public DBTable<INodeRow> {
boost::unordered_map<Int64, FsMutationRow> mutationsByInode;
for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) {
FsMutationRow row = *it;
if (!requiresINode(row)) {
if (!row.requiresReadingINode() || !row.isINodeOperation()) {
continue;
}
mutationsByInode[row.mInodeId] = row;

AnyMap pk;
pk[0] = row.mParentId;
pk[1] = row.mInodeName;
pk[2] = row.mPartitionId;
pk[0] = row.getParentId();
pk[1] = row.getINodeName();
pk[2] = row.getPartitionId();
anyVec.push_back(pk);
}

Expand Down
2 changes: 1 addition & 1 deletion include/tables/MetadataLogTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MetadataLogTable : public DBWatchTable<MetadataLogEntry> {
addColumn("meta_pk3");
addColumn("meta_type");
addColumn("meta_op_type");
addRecoveryIndex("PRIMARY");
addRecoveryIndex(PRIMARY_INDEX);
addWatchEvent(NdbDictionary::Event::TE_INSERT);
}

Expand Down
Loading

0 comments on commit 6385f05

Please sign in to comment.