diff --git a/include/MetadataReader.h b/include/MetadataReader.h index 50a313e9..2c4d7efc 100644 --- a/include/MetadataReader.h +++ b/include/MetadataReader.h @@ -53,7 +53,7 @@ class MetadataReader : public NdbDataReader{ public: MetadataReader(MConn* connections, const int num_readers, string elastic_ip, const bool hopsworks, const string elastic_index, const string elastic_inode_type, - ProjectDatasetINodeCache* cache, const int lru_cap); + const string elastic_ds_type, ProjectDatasetINodeCache* cache, const int lru_cap); virtual ~MetadataReader(); private: virtual ptime getEventCreationTime(MetadataEntry entry); @@ -74,7 +74,8 @@ class MetadataReader : public NdbDataReader{ NdbTransaction* inodesTransaction, UISet inodes_ids, UISet& datasets_to_read); string createJSON(UIRowMap tuples, Mq* data_batch); - + + const string mElasticDatasetType; Cache mFieldsCache; Cache mTablesCache; Cache mTemplatesCache; diff --git a/src/MetadataReader.cpp b/src/MetadataReader.cpp index c4a2d171..075749b9 100644 --- a/src/MetadataReader.cpp +++ b/src/MetadataReader.cpp @@ -64,9 +64,9 @@ const int DONT_EXIST_INT = -1; const char* DONT_EXIST_STR = "-1"; MetadataReader::MetadataReader(MConn* connections, const int num_readers, string elastic_ip, - const bool hopsworks, const string elastic_index, const string elastic_inode_type, + const bool hopsworks, const string elastic_index, const string elastic_inode_type, const string elastic_ds_type, ProjectDatasetINodeCache* cache, const int lru_cap) : NdbDataReader(connections, - num_readers, elastic_ip, hopsworks, elastic_index, elastic_inode_type, cache), + num_readers, elastic_ip, hopsworks, elastic_index, elastic_inode_type, cache), mElasticDatasetType(elastic_ds_type), mFieldsCache(lru_cap, "Field"), mTablesCache(lru_cap, "Table"), mTemplatesCache(lru_cap, "Template"){ } @@ -369,9 +369,6 @@ string MetadataReader::createJSON(UIRowMap tuples, Mq* data_batch) { opWriter.String("_index"); opWriter.String(mElasticIndex.c_str()); - opWriter.String("_type"); - opWriter.String(mElasticInodeType.c_str()); - if(mHopsworksEnalbed){ int datasetId = mPDICache->getDatasetId(inodeId); // set project (rounting) and dataset (parent) ids @@ -380,6 +377,14 @@ string MetadataReader::createJSON(UIRowMap tuples, Mq* data_batch) { opWriter.String("_routing"); opWriter.Int(mPDICache->getProjectId(datasetId)); + + const char* type = (datasetId == inodeId) ? mElasticDatasetType.c_str() : mElasticInodeType.c_str(); + opWriter.String("_type"); + opWriter.String(type); + + }else{ + opWriter.String("_type"); + opWriter.String(mElasticInodeType.c_str()); } opWriter.String("_id"); diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 0c4c7511..888135d3 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -83,7 +83,7 @@ void Notifier::setup() { } mMetadataReader = new MetadataReader(metadata_connections, mNumNdbReaders, mElasticAddr, - mHopsworksEnabled, mElasticIndex, mElasticInodeType, mPDICache, mLRUCap); + mHopsworksEnabled, mElasticIndex, mElasticInodeType, mElasticDatasetType, mPDICache, mLRUCap); mMetadataBatcher = new MetadataBatcher(mMetadataTableTailer, mMetadataReader, mTimeBeforeIssuingNDBReqs, mBatchSize); if (mHopsworksEnabled) { diff --git a/src/ProjectDatasetINodeCache.cpp b/src/ProjectDatasetINodeCache.cpp index 1208751c..96548387 100644 --- a/src/ProjectDatasetINodeCache.cpp +++ b/src/ProjectDatasetINodeCache.cpp @@ -31,6 +31,7 @@ ProjectDatasetINodeCache::ProjectDatasetINodeCache(const int lru_cap) void ProjectDatasetINodeCache::addINodeToDataset(int inodeId, int datasetId) { mINodeToDataset.put(inodeId, datasetId); + mINodeToDataset.put(datasetId, datasetId); if(!mDatasetToINodes.contains(datasetId)){ mDatasetToINodes.put(datasetId, new UISet()); @@ -41,6 +42,7 @@ void ProjectDatasetINodeCache::addINodeToDataset(int inodeId, int datasetId) { void ProjectDatasetINodeCache::addDatasetToProject(int datasetId, int projectId) { mDatasetToProject.put(datasetId, projectId); + mINodeToDataset.put(datasetId, datasetId); if(!mProjectToDataset.contains(projectId)){ mProjectToDataset.put(projectId, new UISet());