From 6385f052a8a8b6f69a6e7d77c4103057fa71e433 Mon Sep 17 00:00:00 2001 From: Mahmoud Ismail Date: Tue, 7 May 2019 09:52:56 +0200 Subject: [PATCH] [HOPSWORKS-1053] [ePipe] Add support for the extended attributes feature added in HOPS-1312 (#15) --- include/FsMutationsDataReader.h | 6 +- include/tables/DBTable.h | 8 +- include/tables/DBWatchTable.h | 2 + include/tables/FsMutationsLogTable.h | 98 ++++++++-- include/tables/HopsworksOpsLogTable.h | 2 +- include/tables/INodeTable.h | 23 ++- include/tables/MetadataLogTable.h | 2 +- include/tables/XAttrTable.h | 246 ++++++++++++++++++++++++++ src/FsMutationsDataReader.cpp | 97 +++++++--- src/Reindexer.cpp | 33 ++++ 10 files changed, 451 insertions(+), 66 deletions(-) create mode 100644 include/tables/XAttrTable.h diff --git a/include/FsMutationsDataReader.h b/include/FsMutationsDataReader.h index e37de538..27a92b37 100644 --- a/include/FsMutationsDataReader.h +++ b/include/FsMutationsDataReader.h @@ -30,7 +30,7 @@ #include "tables/INodeTable.h" #include "tables/DatasetTable.h" #include "NdbDataReaders.h" - +#include "tables/XAttrTable.h" class FsMutationsDataReader : public NdbDataReader { public: @@ -39,10 +39,12 @@ class FsMutationsDataReader : public NdbDataReader private: INodeTable mInodesTable; DatasetTable mDatasetTable; + XAttrTable mXAttrTable; virtual void processAddedandDeleted(Fmq* data_batch, FSBulk& bulk); - void createJSON(Fmq* pending, INodeMap& inodes, FSBulk& bulk); + void createJSON(Fmq* pending, INodeMap& inodes, XAttrMap& xattrs, FSBulk& + bulk); }; class FsMutationsDataReaders : public NdbDataReaders{ diff --git a/include/tables/DBTable.h b/include/tables/DBTable.h index f381f76f..b6885740 100644 --- a/include/tables/DBTable.h +++ b/include/tables/DBTable.h @@ -74,7 +74,7 @@ class DBTable : public DBTableBase { boost::unordered_map doRead(Ndb* connection, ULSet& ids); vector doRead(Ndb* connection, string index, AnyMap& anys); - + void doDelete(Any any); void doDelete(AnyMap& any); @@ -336,7 +336,11 @@ void DBTable::applyConditionOnOperation(NdbOperation* operation, AnyMa Int64 pk = boost::any_cast(a); log << colName << " = " << pk << endl; operation->equal(colName.c_str(), pk); - }else if (a.type() == typeid (string)) { + } else if(a.type() == typeid(Int8)){ + Int8 pk = boost::any_cast(a); + log << colName << " = " << (int) pk << endl; + operation->equal(colName.c_str(), pk); + } else if (a.type() == typeid (string)) { string pk = boost::any_cast(a); log << colName << " = " << pk << endl; operation->equal(colName.c_str(), get_ndb_varchar(pk, diff --git a/include/tables/DBWatchTable.h b/include/tables/DBWatchTable.h index 86f0c35a..c78a0c7d 100644 --- a/include/tables/DBWatchTable.h +++ b/include/tables/DBWatchTable.h @@ -25,6 +25,8 @@ #define DBWATCHTABLE_H #include "DBTable.h" +#define PRIMARY_INDEX "PRIMARY" + typedef vector TEventVec; template diff --git a/include/tables/FsMutationsLogTable.h b/include/tables/FsMutationsLogTable.h index 191b3345..95ec917a 100644 --- a/include/tables/FsMutationsLogTable.h +++ b/include/tables/FsMutationsLogTable.h @@ -33,10 +33,15 @@ enum FsOpType { FsDelete = 1, FsUpdate = 2, FsRename = 3, - FsChangeDataset = 4 + FsChangeDataset = 4, + + XAttrAdd = 10, + XAttrAddAll = 11, + XAttrUpdate = 12, + XAttrDelete = 13 }; -inline static const char* FsOpTypeToStr(FsOpType optype) { +inline static const char *FsOpTypeToStr(FsOpType optype) { switch (optype) { case FsAdd: return "Add"; @@ -48,6 +53,14 @@ inline static const char* FsOpTypeToStr(FsOpType optype) { return "Rename"; case FsChangeDataset: return "ChangeDataset"; + case XAttrAdd: + return "XAttrAdd"; + case XAttrAddAll: + return "XAttrAddAll"; + case XAttrUpdate: + return "XAttrUpdate"; + case XAttrDelete: + return "XAttrDelete"; default: return "Unkown"; } @@ -68,10 +81,11 @@ struct FsMutationPK { struct FsMutationRow { Int64 mDatasetINodeId; Int64 mInodeId; - Int64 mPartitionId; - Int64 mParentId; - string mInodeName; int mLogicalTime; + + Int64 mPk1; + Int64 mPk2; + string mPk3; FsOpType mOperation; ptime mEventCreationTime; @@ -80,26 +94,71 @@ struct FsMutationRow { return FsMutationPK(mDatasetINodeId, mInodeId, mLogicalTime); } + string getPKStr(){ + stringstream stream; + stream << mDatasetINodeId << "-" << mInodeId << "-" << mLogicalTime; + return stream.str(); + } + string to_string() { stringstream stream; stream << "-------------------------" << endl; stream << "DatasetId = " << mDatasetINodeId << endl; stream << "InodeId = " << mInodeId << endl; - stream << "PartitionId = " << mPartitionId << endl; - stream << "ParentId = " << mParentId << endl; - stream << "InodeName = " << mInodeName << endl; + stream << "Pk1 = " << mPk1 << endl; + stream << "Pk2 = " << mPk2 << endl; + stream << "Pk3 = " << mPk3 << endl; stream << "LogicalTime = " << mLogicalTime << endl; stream << "Operation = " << FsOpTypeToStr(mOperation) << endl; stream << "-------------------------" << endl; return stream.str(); } + + bool isINodeOperation(){ + return mOperation == FsAdd || mOperation == FsDelete || mOperation == + FsUpdate || mOperation == FsRename || mOperation == FsChangeDataset; + } + + bool requiresReadingINode(){ + return mOperation == FsAdd || mOperation == FsUpdate; + } + + Int64 getPartitionId(){ + return mPk1; + } + + Int64 getParentId(){ + return mPk2; + } + + string getINodeName(){ + return mPk3; + } + + bool isXAttrOperation(){ + return mOperation == XAttrAdd || mOperation == XAttrAddAll || mOperation + == XAttrUpdate || mOperation == XAttrDelete; + } + + bool requiresReadingXAttr(){ + return mOperation == XAttrAdd || mOperation == XAttrUpdate || mOperation + == XAttrAddAll ; + } + + Int8 getNamespace(){ + return static_cast(mPk2); + } + + string getXAttrName(){ + return mPk3; + } }; struct FsMutationRowEqual { bool operator()(const FsMutationRow &lhs, const FsMutationRow &rhs) const { - return lhs.mDatasetINodeId == rhs.mDatasetINodeId && lhs.mParentId == rhs.mParentId - && lhs.mInodeName == rhs.mInodeName && lhs.mInodeId == rhs.mInodeId; + return lhs.mDatasetINodeId == rhs.mDatasetINodeId && lhs.mInodeId == rhs + .mInodeId && lhs.mLogicalTime == rhs.mLogicalTime; } }; @@ -108,10 +167,11 @@ struct FsMutationRowHash { std::size_t operator()(const FsMutationRow &a) const { std::size_t seed = 0; boost::hash_combine(seed, a.mDatasetINodeId); - boost::hash_combine(seed, a.mPartitionId); - boost::hash_combine(seed, a.mParentId); - boost::hash_combine(seed, a.mInodeName); boost::hash_combine(seed, a.mInodeId); + boost::hash_combine(seed, a.mLogicalTime); + boost::hash_combine(seed, a.mPk1); + boost::hash_combine(seed, a.mPk2); + boost::hash_combine(seed, a.mPk3); return seed; } @@ -145,9 +205,9 @@ class FsMutationsLogTable : public DBWatchTable { addColumn("dataset_id"); addColumn("inode_id"); addColumn("logical_time"); - addColumn("inode_partition_id"); - addColumn("inode_parent_id"); - addColumn("inode_name"); + addColumn("pk1"); + addColumn("pk2"); + addColumn("pk3"); addColumn("operation"); addRecoveryIndex("logical_time"); addWatchEvent(NdbDictionary::Event::TE_INSERT); @@ -159,9 +219,9 @@ class FsMutationsLogTable : public DBWatchTable { row.mDatasetINodeId = value[0]->int64_value(); row.mInodeId = value[1]->int64_value(); row.mLogicalTime = value[2]->int32_value(); - row.mPartitionId = value[3]->int64_value(); - row.mParentId = value[4]->int64_value(); - row.mInodeName = get_string(value[5]); + row.mPk1 = value[3]->int64_value(); + row.mPk2 = value[4]->int64_value(); + row.mPk3 = get_string(value[5]); row.mOperation = static_cast (value[6]->int8_value()); return row; } diff --git a/include/tables/HopsworksOpsLogTable.h b/include/tables/HopsworksOpsLogTable.h index 90b69182..45ba70cb 100644 --- a/include/tables/HopsworksOpsLogTable.h +++ b/include/tables/HopsworksOpsLogTable.h @@ -98,7 +98,7 @@ class HopsworksOpsLogTable : public DBWatchTable { addColumn("project_id"); addColumn("dataset_id"); addColumn("inode_id"); - addRecoveryIndex("PRIMARY"); + addRecoveryIndex(PRIMARY_INDEX); addWatchEvent(NdbDictionary::Event::TE_INSERT); } diff --git a/include/tables/INodeTable.h b/include/tables/INodeTable.h index 21563040..526db587 100644 --- a/include/tables/INodeTable.h +++ b/include/tables/INodeTable.h @@ -32,10 +32,6 @@ #define DOC_TYPE_INODE "inode" -inline static bool requiresINode(FsMutationRow row){ - return row.mOperation == FsAdd || row.mOperation == FsUpdate; -} - struct INodeRow { Int64 mParentId; string mName; @@ -50,7 +46,8 @@ struct INodeRow { int mLogicalTime; FsOpType mOperation; bool mIsDir; - + Int8 mNumXAttrs; + bool is_equal(ProjectRow proj){ return proj.mInodeName == mName && proj.mInodeParentId == mParentId && proj.mInodePartitionId == mPartitionId; @@ -174,13 +171,13 @@ struct INodeRow { docWriter.StartObject(); docWriter.String("parent_id"); - docWriter.Int64(row.mParentId); + docWriter.Int64(row.getParentId()); docWriter.String("partition_id"); - docWriter.Int64(row.mPartitionId); + docWriter.Int64(row.getPartitionId()); docWriter.String("name"); - docWriter.String(row.mInodeName.c_str()); + docWriter.String(row.getINodeName().c_str()); docWriter.String("operation"); docWriter.Int(row.mOperation); @@ -271,6 +268,7 @@ class INodeTable : public DBTable { addColumn("group_id"); addColumn("logical_time"); addColumn("is_dir"); + addColumn("num_xattrs"); } INodeRow getRow(NdbRecAttr* values[]) { @@ -284,6 +282,7 @@ class INodeTable : public DBTable { row.mGroupId = values[6]->int32_value(); row.mLogicalTime = values[7]->int32_value(); row.mIsDir = values[8]->int8_value() == 1; + row.mNumXAttrs = values[9]->int8_value(); return row; } @@ -334,15 +333,15 @@ class INodeTable : public DBTable { boost::unordered_map mutationsByInode; for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) { FsMutationRow row = *it; - if (!requiresINode(row)) { + if (!row.requiresReadingINode() || !row.isINodeOperation()) { continue; } mutationsByInode[row.mInodeId] = row; AnyMap pk; - pk[0] = row.mParentId; - pk[1] = row.mInodeName; - pk[2] = row.mPartitionId; + pk[0] = row.getParentId(); + pk[1] = row.getINodeName(); + pk[2] = row.getPartitionId(); anyVec.push_back(pk); } diff --git a/include/tables/MetadataLogTable.h b/include/tables/MetadataLogTable.h index 426ba65c..bc1e1a76 100644 --- a/include/tables/MetadataLogTable.h +++ b/include/tables/MetadataLogTable.h @@ -121,7 +121,7 @@ class MetadataLogTable : public DBWatchTable { addColumn("meta_pk3"); addColumn("meta_type"); addColumn("meta_op_type"); - addRecoveryIndex("PRIMARY"); + addRecoveryIndex(PRIMARY_INDEX); addWatchEvent(NdbDictionary::Event::TE_INSERT); } diff --git a/include/tables/XAttrTable.h b/include/tables/XAttrTable.h new file mode 100644 index 00000000..016988af --- /dev/null +++ b/include/tables/XAttrTable.h @@ -0,0 +1,246 @@ +/* + * Copyright (C) 2018 Hops.io + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ +/* + * File: XAttrTable.h + * Author: Mahmoud Ismail + * + */ + +#ifndef EPIPE_XATTRTABLE_H +#define EPIPE_XATTRTABLE_H +#include "DBTable.h" +#include "FsMutationsLogTable.h" +#include "MetadataLogTable.h" + + +struct XAttrRow { + Int64 mInodeId; + Int8 mNamespace; + string mName; + string mValue; + + + string to_upsert_json(){ + stringstream out; + out << getDocUpdatePrefix(mInodeId) << endl; + out << upsert() << endl; + return out.str(); + } + + static string to_delete_json(FsMutationRow row){ + stringstream out; + out << getDocUpdatePrefix(row.mInodeId) << endl; + out << removeXAttrScript(row.getXAttrName()) << endl; + return out.str(); + } + + string to_string(){ + stringstream stream; + stream << "-------------------------" << endl; + stream << "InodeId = " << mInodeId << endl; + stream << "Namespace = " << (int)mNamespace << endl; + stream << "Name = " << mName << endl; + stream << "Value = " << mValue << endl; + stream << "-------------------------" << endl; + return stream.str(); + } +private: + + string upsert() { + rapidjson::Document doc; + doc.Parse(getXAttrDoc(true).c_str()); + rapidjson::Document xattr(&doc.GetAllocator()); + if (!xattr.Parse(mValue.c_str()).HasParseError()) { + mergeDoc(doc, xattr); + rapidjson::StringBuffer sbDoc; + rapidjson::Writer docWriter(sbDoc); + doc.Accept(docWriter); + return string(sbDoc.GetString()); + } else { + LOG_DEBUG("XAttr is non json " << mName << "=" << mValue); + return getXAttrDoc(false); + } + + } + + void mergeDoc(rapidjson::Document& target, rapidjson::Document& source) { + for (rapidjson::Document::MemberIterator itr = source.MemberBegin(); itr != source.MemberEnd(); ++itr) { + target["doc"][XATTR_FIELD_NAME][mName.c_str()].AddMember(itr->name, + itr->value, target.GetAllocator()); + } + } + + static string getDocUpdatePrefix(Int64 inodeId){ + rapidjson::StringBuffer sbOp; + rapidjson::Writer opWriter(sbOp); + + opWriter.StartObject(); + + opWriter.String("update"); + opWriter.StartObject(); + + opWriter.String("_id"); + opWriter.Int64(inodeId); + + opWriter.EndObject(); + opWriter.EndObject(); + + return string(sbOp.GetString()); + } + + static string removeXAttrScript(string xattrname){ + + rapidjson::StringBuffer sbOp; + rapidjson::Writer opWriter(sbOp); + + opWriter.StartObject(); + + opWriter.String("script"); + + stringstream rmout; + rmout << "ctx._source." << XATTR_FIELD_NAME << ".remove(\"" << xattrname << "\")"; + opWriter.String(rmout.str().c_str()); + + opWriter.EndObject(); + + return string(sbOp.GetString()); + + } + + string getXAttrDoc(bool isJSONVal){ + + rapidjson::StringBuffer sbOp; + rapidjson::Writer opWriter(sbOp); + + opWriter.StartObject(); + + opWriter.String("doc"); + opWriter.StartObject(); + + opWriter.String(XATTR_FIELD_NAME); + opWriter.StartObject(); + + opWriter.String(mName.c_str()); + if(isJSONVal) { + opWriter.StartObject(); + opWriter.EndObject(); + }else{ + opWriter.String(mValue.c_str()); + } + opWriter.EndObject(); + + + opWriter.EndObject(); + opWriter.String("doc_as_upsert"); + opWriter.Bool(true); + + opWriter.EndObject(); + + return string(sbOp.GetString()); + } + +}; + +typedef vector XAttrVec; +typedef boost::unordered_map XAttrMap; + +class XAttrTable : public DBTable { + +public: + + XAttrTable() : DBTable("hdfs_xattrs"){ + addColumn("inode_id"); + addColumn("namespace"); + addColumn("name"); + addColumn("value"); + } + + XAttrRow getRow(NdbRecAttr* values[]) { + XAttrRow row; + row.mInodeId = values[0]->int64_value(); + row.mNamespace = values[1]->int8_value(); + row.mName = get_string(values[2]); + row.mValue = get_string(values[3]); + return row; + } + + XAttrRow get(Ndb* connection, Int64 inodeId, Int8 ns, string name) { + AnyMap a; + a[0] = inodeId; + a[1] = ns; + a[2] = name; + return DBTable::doRead(connection, a); + } + + XAttrMap get(Ndb* connection, Fmq* data_batch) { + AnyVec anyVec; + Fmq batchedMutations; + + Fmq addAllXattrs; + + for (Fmq::iterator it = data_batch->begin(); + it != data_batch->end(); ++it) { + FsMutationRow row = *it; + if (!row.requiresReadingXAttr() || !row.isXAttrOperation()) { + continue; + } + + if(row.mOperation == XAttrAddAll){ + addAllXattrs.push_back(row); + continue; + } + + AnyMap pk; + pk[0] = row.mInodeId; + pk[1] = row.getNamespace(); + pk[2] = row.getXAttrName(); + anyVec.push_back(pk); + batchedMutations.push_back(row); + } + + XAttrVec xattrs = doRead(connection, anyVec); + + XAttrMap results; + + int i=0; + for(XAttrVec::iterator it = xattrs.begin(); it != xattrs.end(); ++it, i++){ + XAttrRow xattr = *it; + FsMutationRow mr = batchedMutations[i]; + + XAttrVec xvec; + xvec.push_back(xattr); + results[mr.getPKStr()] = xvec; + } + + for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end(); + ++it){ + FsMutationRow mr = *it; + results[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId); + } + + return results; + } + + XAttrVec getByInodeId(Ndb* connection, Int64 inodeId){ + AnyMap args; + args[0] = inodeId; + return doRead(connection, PRIMARY_INDEX, args); + } + +}; +#endif //EPIPE_XATTRTABLE_H diff --git a/src/FsMutationsDataReader.cpp b/src/FsMutationsDataReader.cpp index 1156bc93..c7a8c4ea 100644 --- a/src/FsMutationsDataReader.cpp +++ b/src/FsMutationsDataReader.cpp @@ -34,7 +34,7 @@ FsMutationsDataReader::FsMutationsDataReader(MConn connection, const bool hopswo void FsMutationsDataReader::processAddedandDeleted(Fmq* data_batch, FSBulk& bulk) { INodeMap inodes = mInodesTable.get(mNdbConnection.inodeConnection, data_batch); - + XAttrMap xattrs = mXAttrTable.get(mNdbConnection.inodeConnection, data_batch); if (mHopsworksEnalbed) { ULSet dataset_inode_ids; for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) { @@ -44,10 +44,11 @@ void FsMutationsDataReader::processAddedandDeleted(Fmq* data_batch, FSBulk& bulk mDatasetTable.loadProjectIds(mNdbConnection.metadataConnection, dataset_inode_ids); } - createJSON(data_batch, inodes, bulk); + createJSON(data_batch, inodes, xattrs, bulk); } -void FsMutationsDataReader::createJSON(Fmq* pending, INodeMap& inodes, FSBulk& bulk) { +void FsMutationsDataReader::createJSON(Fmq* pending, INodeMap& inodes, + XAttrMap& xattrs, FSBulk& bulk) { vector arrivalTimes(pending->size()); stringstream out; @@ -57,33 +58,71 @@ void FsMutationsDataReader::createJSON(Fmq* pending, INodeMap& inodes, FSBulk& b arrivalTimes[i] = row.mEventCreationTime; bulk.mPKs.mFSPKs.push_back(row.getPK()); - if (!requiresINode(row)) { - //Handle the delete, rename, and change dataset - out << INodeRow::to_json(row); - out << endl; - continue; - } - - if (inodes.find(row.mInodeId) == inodes.end()) { - LOG_DEBUG(" Data for " << row.mParentId << ", " << row.mInodeName - << ", " << row.mInodeId << " was not found"); - out << INodeRow::to_delete_json(row.mInodeId); - out << endl; - continue; - } - - INodeRow inode = inodes[row.mInodeId]; - - Int64 datasetINodeId = DONT_EXIST_INT(); - int projectId = DONT_EXIST_INT(); - if (mHopsworksEnalbed) { - datasetINodeId = row.mDatasetINodeId; - projectId = mDatasetTable.getProjectIdFromCache(row.mDatasetINodeId); + if(row.isINodeOperation()) { + if (!row.requiresReadingINode()) { + //Handle the delete, rename, and change dataset + out << INodeRow::to_json(row); + out << endl; + continue; + } + + if (inodes.find(row.mInodeId) == inodes.end()) { + LOG_DEBUG( + " Data for inode: " << row.getParentId() << ", " << row + .getINodeName() << ", " << row.mInodeId << " was not found"); + out << INodeRow::to_delete_json(row.mInodeId); + out << endl; + continue; + } + + INodeRow inode = inodes[row.mInodeId]; + + Int64 datasetINodeId = DONT_EXIST_INT(); + int projectId = DONT_EXIST_INT(); + if (mHopsworksEnalbed) { + datasetINodeId = row.mDatasetINodeId; + projectId = mDatasetTable.getProjectIdFromCache(row.mDatasetINodeId); + } + + string inodeJSON = inode.to_create_json(datasetINodeId, projectId); + + out << inodeJSON << endl; + } else if(row.isXAttrOperation()){ + if(!row.requiresReadingXAttr()){ + //handle delete xattr + out << XAttrRow::to_delete_json(row); + out << endl; + continue; + } + + string mutationpk = row.getPKStr(); + + if(xattrs.find(mutationpk) == xattrs.end()){ + LOG_DEBUG(" Data for xattr: " << row.getXAttrName() << ", " + << row.getNamespace() << " for inode " << row.mInodeId + << " was not ""found"); + out << XAttrRow::to_delete_json(row); + out << endl; + continue; + } + + XAttrVec xattr = xattrs[mutationpk]; + for(XAttrVec::iterator it = xattr.begin(); it != xattr.end() ; ++it){ + XAttrRow xAttrRow = *it; + if(xAttrRow.mInodeId == row.mInodeId){ + out << xAttrRow.to_upsert_json(); + out << endl; + }else{ + LOG_DEBUG(" Data for xattr: " << row.getXAttrName() << ", " + << row.getNamespace() << " for inode " << row.mInodeId + << " was not ""found"); + out << XAttrRow::to_delete_json(row); + out << endl; + } + } + }else{ + LOG_ERROR("Unknown fs operation " << row.to_string()); } - - string inodeJSON = inode.to_create_json(datasetINodeId, projectId); - - out << inodeJSON << endl; } bulk.mArrivalTimes = arrivalTimes; diff --git a/src/Reindexer.cpp b/src/Reindexer.cpp index fbad72db..c1255fef 100644 --- a/src/Reindexer.cpp +++ b/src/Reindexer.cpp @@ -28,6 +28,7 @@ #include "tables/DatasetTable.h" #include "tables/SchemabasedMetadataTable.h" #include "tables/SchemalessMetadataTable.h" +#include "tables/XAttrTable.h" struct DatasetInodes { Int64 mDatasetId; @@ -80,6 +81,7 @@ void Reindexer::run() { DatasetTable datasetsTable(mLRUCap); SchemabasedMetadataTable schemaBasedTable(mLRUCap); SchemalessMetadataTable schemalessTable; + XAttrTable xAttrTable; int projects = 0; int nonExistentProject = 0; @@ -119,6 +121,8 @@ void Reindexer::run() { DatasetInodesVec datasetStats; + ULSet inodesWithXAttrs; + for (DatasetInfoMap::iterator mapIt = dsInfoMap.begin(); mapIt != dsInfoMap.end(); ++mapIt) { Int64 datasetId = mapIt->first; int projectId = mapIt->second.mProjectId; @@ -140,6 +144,11 @@ void Reindexer::run() { if (inode.mIsDir) { dirs.push(inode.mId); } + + if(inode.mNumXAttrs > 0){ + inodesWithXAttrs.insert(inode.mId); + } + out << inode.to_create_json(datasetId, projectId) << endl; totalInodes++; datasetInodes++; @@ -163,6 +172,30 @@ void Reindexer::run() { LOG_INFO("Dataset [ " << dsi.mDatasetId << ", " << dsInfoMap[dsi.mDatasetId].mName << " ] has " << dsi.mTotalNumberOfInodes << " files/dirs."); } + int numXAttrs = 0; + int nonExistentXAttrs = 0; + for(ULSet::iterator it = inodesWithXAttrs.begin(); it != inodesWithXAttrs + .end(); ++it){ + Int64 inodeId = *it; + FSBulk bulk; + XAttrVec xattrs = xAttrTable.getByInodeId(conn, inodeId); + for(XAttrVec::iterator xit = xattrs.begin(); xit != xattrs.end(); ++xit){ + XAttrRow xAttrRow = *xit; + if(xAttrRow.mInodeId == inodeId){ + bulk.mJSON += xAttrRow.to_upsert_json(); + }else{ + LOG_WARN("XAttrs doesn't exists for [" + << inodeId << "] - " << xAttrRow.to_string()); + nonExistentXAttrs++; + } + mElasticSearch->addData(bulk); + numXAttrs++; + } + } + + LOG_INFO((numXAttrs - nonExistentXAttrs) << " XAttrs added, " + << nonExistentXAttrs << " doesn't exists"); + int extMetadata = 0; int nonExistentMetadata = 0; schemaBasedTable.getAll(metaConn);