From 408102d9b794e7cb987f1db9f1b76b20627b2505 Mon Sep 17 00:00:00 2001 From: Mahmoud Ismail Date: Thu, 9 Jun 2016 16:26:28 +0200 Subject: [PATCH] insert bulks into elasticsearch asynchronously, use libcurl instead of cppnetlib; populate dataset cache from database in case of cache miss --- CMakeLists.txt | 8 +- include/Batcher.h | 4 +- include/DatasetTableTailer.h | 10 +- include/ElasticSearch.h | 85 +++++++++ include/FsMutationsDataReader.h | 5 +- include/Logger.h | 1 + include/MetadataReader.h | 6 +- include/NdbDataReader.h | 30 +--- include/Notifier.h | 8 +- include/ProjectTableTailer.h | 10 +- include/Utils.h | 167 +----------------- include/Version.h | 2 +- src/Batcher.cpp | 10 +- src/DatasetTableTailer.cpp | 73 +++++--- src/ElasticSearch.cpp | 295 ++++++++++++++++++++++++++++++++ src/FsMutationsDataReader.cpp | 20 +-- src/Logger.cpp | 4 + src/MetadataReader.cpp | 21 +-- src/Notifier.cpp | 21 ++- src/ProjectTableTailer.cpp | 18 +- src/main.cpp | 19 +- 21 files changed, 526 insertions(+), 291 deletions(-) create mode 100644 include/ElasticSearch.h create mode 100644 src/ElasticSearch.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 587c4535..df585df0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,19 +3,19 @@ project (ePipe) cmake_minimum_required (VERSION 2.8) set(EPIPE_VERSION_MAJOR 0) -set(EPIPE_VERSION_MINOR 0) +set(EPIPE_VERSION_MINOR 1) set(EPIPE_VERSION_BUILD 1) configure_file(Version.h.in ${CMAKE_SOURCE_DIR}/include/Version.h) set(Boost_USE_STATIC_LIBS ON) find_package(Boost 1.54 REQUIRED COMPONENTS system date_time program_options thread) -find_package(cppnetlib 0.11.1 REQUIRED uri client-connections) +find_package(CURL 7.35.0) find_package(RapidJSON 1.0.2) include_directories(${Boost_INCLUDE_DIRS}) -include_directories(${CPPNETLIB_INCLUDE_DIRS}) include_directories(${RAPIDJSON_INCLUDE_DIRS}) +include_directories(${CURL_INCLUDE_DIRS}) include_directories (${CMAKE_SOURCE_DIR}/include) if(NOT NDB_DIR) @@ -32,4 +32,4 @@ file(GLOB SOURCE 
${CMAKE_SOURCE_DIR}/src/*.cpp) add_executable(ePipe ${SOURCE}) -target_link_libraries(ePipe ${Boost_LIBRARIES} ${CPPNETLIB_LIBRARIES} ndbclient pthread) +target_link_libraries(ePipe ${Boost_LIBRARIES} ${CURL_LIBRARIES} ndbclient pthread) diff --git a/include/Batcher.h b/include/Batcher.h index 97ebb634..830c8d5b 100644 --- a/include/Batcher.h +++ b/include/Batcher.h @@ -29,7 +29,7 @@ class Batcher { public: - Batcher(const int time_before_issuing_ndb_reqs, const int batch_size); + Batcher(const int time_to_wait, const int batch_size); void start(); void waitToFinish(); virtual ~Batcher(); @@ -46,7 +46,7 @@ class Batcher { bool mStarted; bool mFirstTimer; - const int mTimeBeforeIssuingNDBReqs; + const int mTimeToWait; boost::thread mTimerThread; void startTimer(); diff --git a/include/DatasetTableTailer.h b/include/DatasetTableTailer.h index 17e27d12..90181183 100644 --- a/include/DatasetTableTailer.h +++ b/include/DatasetTableTailer.h @@ -28,20 +28,20 @@ #include "TableTailer.h" #include "ProjectDatasetINodeCache.h" +#include "ElasticSearch.h" class DatasetTableTailer : public TableTailer{ public: - DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr, - const string elastic_index, const string elastic_dataset_type, ProjectDatasetINodeCache* cache); + DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic, + ProjectDatasetINodeCache* cache); virtual ~DatasetTableTailer(); static void updateProjectIds(const NdbDictionary::Dictionary* database, NdbTransaction* transaction, UISet dataset_ids, ProjectDatasetINodeCache* cache); private: static const WatchTable TABLE; virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]); - string mElasticAddr; - const string mElasticIndex; - const string mElasticDatasetType; + + ElasticSearch* mElasticSearch; ProjectDatasetINodeCache* mPDICache; }; diff --git a/include/ElasticSearch.h b/include/ElasticSearch.h new file mode 
100644 index 00000000..40f1d27d --- /dev/null +++ b/include/ElasticSearch.h @@ -0,0 +1,85 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: ElasticSearch.h + * Author: Mahmoud Ismail + * + * Created on June 9, 2016, 1:39 PM + */ + +#ifndef ELASTICSEARCH_H +#define ELASTICSEARCH_H +#include "Batcher.h" +#include "ConcurrentQueue.h" +#include +#include "rapidjson/document.h" + +enum HttpOp { + HTTP_POST, + HTTP_DELETE +}; + +class ElasticSearch : public Batcher{ +public: + ElasticSearch(string elastic_addr, string index, string proj_type, string ds_type, + string inode_type, int time_to_wait_before_inserting, int bulk_size); + + void addBulk(string json); + + bool addProject(int projectId, string json); + bool deleteProject(int projectId); + bool deleteProjectChildren(int projectId, string json); + + bool addDataset(int projectId, int datasetId, string json); + bool deleteDataset(int projectId, int datasetId); + bool deleteDatasetChildren(int projectId, int datasetId, string json); + + const char* getIndex(); + const char* getProjectType(); + const char* getDatasetType(); + const char* getINodeType(); + + virtual ~ElasticSearch(); +private: + const string mIndex; + const string mProjectType; + const string mDatasetType; + const string mInodeType; + + string mElasticAddr; + string mElasticBulkAddr; + + ConcurrentQueue mQueue; + + mutable boost::mutex mLock; + stringstream mToProcess; + int mToProcessLength; + + CURL* mHttpHandle; + + void addToProcess(string str); + string resetToProcess(); + + virtual void run(); + virtual void processBatch(); + + string getElasticSearchBulkUrl(); + string getElasticSearchUrlonIndex(string index); + string getElasticSearchUrlOnDoc(string index, string type, int doc); + string getElasticSearchUpdateDocUrl(string index, string type, int doc); + string 
getElasticSearchUpdateDocUrl(string index, string type, int doc, int parent); + string getElasticSearchDeleteDocUrl(string index, string type, int doc, int parent); + string getElasticSearchDeleteByQueryUrl(string index, int routing); + string getElasticSearchDeleteByQueryUrl(string index, int parent, int routing); + + bool elasticSearchHttpRequest(HttpOp op, string elasticUrl, string json); + bool elasticSearchHttpRequestInternal(HttpOp op, string elasticUrl, string json); + bool parseResponse(string response); +}; + +#endif /* ELASTICSEARCH_H */ + diff --git a/include/FsMutationsDataReader.h b/include/FsMutationsDataReader.h index 1d42e649..b57674dc 100644 --- a/include/FsMutationsDataReader.h +++ b/include/FsMutationsDataReader.h @@ -32,9 +32,8 @@ typedef vector Rows; class FsMutationsDataReader : public NdbDataReader{ public: - FsMutationsDataReader(MConn* connections, const int num_readers, string elastic_ip, - const bool hopsworks, const string elastic_index, const string elastic_inode_type, - ProjectDatasetINodeCache* cache, const int lru_cap); + FsMutationsDataReader(MConn* connections, const int num_readers, const bool hopsworks, + ElasticSearch* elastic, ProjectDatasetINodeCache* cache, const int lru_cap); virtual ~FsMutationsDataReader(); private: Cache mUsersCache; diff --git a/include/Logger.h b/include/Logger.h index 413bada4..fe50021f 100644 --- a/include/Logger.h +++ b/include/Logger.h @@ -69,6 +69,7 @@ class Logger{ static void error(const char* msg); static void fatal(const char* msg); + static bool isTrace(); private: static Logger* mInstance; static int mLoggerLevel; diff --git a/include/MetadataReader.h b/include/MetadataReader.h index 2c4d7efc..c7b35643 100644 --- a/include/MetadataReader.h +++ b/include/MetadataReader.h @@ -51,9 +51,8 @@ struct Table{ class MetadataReader : public NdbDataReader{ public: - MetadataReader(MConn* connections, const int num_readers, string elastic_ip, - const bool hopsworks, const string elastic_index, const 
string elastic_inode_type, - const string elastic_ds_type, ProjectDatasetINodeCache* cache, const int lru_cap); + MetadataReader(MConn* connections, const int num_readers, const bool hopsworks, + ElasticSearch* elastic, ProjectDatasetINodeCache* cache, const int lru_cap); virtual ~MetadataReader(); private: virtual ptime getEventCreationTime(MetadataEntry entry); @@ -75,7 +74,6 @@ class MetadataReader : public NdbDataReader{ string createJSON(UIRowMap tuples, Mq* data_batch); - const string mElasticDatasetType; Cache mFieldsCache; Cache mTablesCache; Cache mTemplatesCache; diff --git a/include/NdbDataReader.h b/include/NdbDataReader.h index 282f34f4..1a4bc42e 100644 --- a/include/NdbDataReader.h +++ b/include/NdbDataReader.h @@ -30,6 +30,7 @@ #include "Cache.h" #include "Utils.h" #include "ProjectDatasetINodeCache.h" +#include "ElasticSearch.h" #include #include @@ -42,7 +43,6 @@ namespace bc = boost::accumulators; typedef bc::accumulator_set > Accumulator; using namespace Utils; -using namespace Utils::ElasticSearch; using namespace Utils::NdbC; struct BatchStats{ @@ -65,17 +65,13 @@ struct MConn{ template class NdbDataReader { public: - NdbDataReader(Conn* connections, const int num_readers, string elastic_addr, - const bool hopsworks, const string elastic_index, const string elastic_inode_type, - ProjectDatasetINodeCache* cache); + NdbDataReader(Conn* connections, const int num_readers, const bool hopsworks, + ElasticSearch* elastic, ProjectDatasetINodeCache* cache); void start(); void processBatch(vector* data_batch); virtual ~NdbDataReader(); private: - const string mElasticAddr; - string mElasticBulkUrl; - bool mStarted; std::vector mThreads; @@ -87,8 +83,7 @@ class NdbDataReader { const int mNumReaders; Conn* mNdbConnections; const bool mHopsworksEnalbed; - const string mElasticIndex; - const string mElasticInodeType; + ElasticSearch* mElasticSearch; ProjectDatasetINodeCache* mPDICache; //Accumulator mQueuingAcc; //Accumulator mBatchingAcc; @@ -96,30 +91,19 
@@ class NdbDataReader { virtual ptime getEventCreationTime(Data row) = 0; virtual BatchStats readData(Conn connection, vector* data_batch) = 0; - void bulkUpdateElasticSearch(string json); string getAccString(Accumulator acc); }; template NdbDataReader::NdbDataReader(Conn* connections, const int num_readers, - string elastic_ip, const bool hopsworks, const string elastic_index, - const string elastic_inode_type, ProjectDatasetINodeCache* cache) - : mElasticAddr(elastic_ip), mNumReaders(num_readers), mNdbConnections(connections), - mHopsworksEnalbed(hopsworks), mElasticIndex(elastic_index), mElasticInodeType(elastic_inode_type), - mPDICache(cache){ + const bool hopsworks,ElasticSearch* elastic, ProjectDatasetINodeCache* cache) + : mNumReaders(num_readers), mNdbConnections(connections), + mHopsworksEnalbed(hopsworks), mElasticSearch(elastic), mPDICache(cache){ mStarted = false; - mElasticBulkUrl = getElasticSearchBulkUrl(mElasticAddr); mBatchedQueue = new ConcurrentQueue*>(); } -template -void NdbDataReader::bulkUpdateElasticSearch(string json) { - if(!elasticSearchPOST(mElasticBulkUrl, json)){ - //TODO: handle elasticsearch failures - } -} - template void NdbDataReader::start() { if (mStarted) { diff --git a/include/Notifier.h b/include/Notifier.h index aca1ad85..8575298f 100644 --- a/include/Notifier.h +++ b/include/Notifier.h @@ -29,6 +29,7 @@ #include "MetadataBatcher.h" #include "ProjectTableTailer.h" #include "DatasetTableTailer.h" +#include "ElasticSearch.h" class Notifier { public: @@ -36,7 +37,8 @@ class Notifier { const int time_before_issuing_ndb_reqs, const int batch_size, const int poll_maxTimeToWait, const int num_ndb_readers, const string elastic_addr, const bool hopsworks, const string elastic_index, const string elasttic_project_type, - const string elastic_dataset_type, const string elastic_inode_type, const int lru_cap); + const string elastic_dataset_type, const string elastic_inode_type, const int elastic_batch_size, + const int 
elastic_issue_time, const int lru_cap); void start(); virtual ~Notifier(); @@ -56,8 +58,12 @@ class Notifier { const string mElastticProjectType; const string mElasticDatasetType; const string mElasticInodeType; + const int mElasticBatchsize; + const int mElasticIssueTime; const int mLRUCap; + ElasticSearch* mElasticSearch; + FsMutationsTableTailer* mFsMutationsTableTailer; FsMutationsDataReader* mFsMutationsDataReader; FsMutationsBatcher* mFsMutationsBatcher; diff --git a/include/ProjectTableTailer.h b/include/ProjectTableTailer.h index a19a5847..8696bf3d 100644 --- a/include/ProjectTableTailer.h +++ b/include/ProjectTableTailer.h @@ -27,20 +27,20 @@ #include "TableTailer.h" #include "ProjectDatasetINodeCache.h" +#include "ElasticSearch.h" class ProjectTableTailer : public TableTailer{ public: - ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr, - const string elastic_index, const string elastic_project_type, ProjectDatasetINodeCache* cache); + ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic, + ProjectDatasetINodeCache* cache); virtual ~ProjectTableTailer(); private: static const WatchTable TABLE; virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]); - string mElasticAddr; - const string mElasticIndex; - const string mElasticProjectType; + + ElasticSearch* mElasticSearch; ProjectDatasetINodeCache* mPDICache; }; diff --git a/include/Utils.h b/include/Utils.h index fa3fe343..bb8afba6 100644 --- a/include/Utils.h +++ b/include/Utils.h @@ -28,11 +28,8 @@ #include "common.h" #include #include -#include "boost/network.hpp" -#include "rapidjson/document.h" typedef boost::posix_time::ptime ptime; -typedef boost::network::http::client httpclient; typedef vector Row; typedef boost::unordered_map UIRowMap; @@ -64,9 +61,9 @@ namespace Utils { if (!col) LOG_NDB_API_ERROR(op->getNdbError()); return col; } - - inline static NdbIndexOperation* 
getNdbIndexOperation(NdbTransaction* transaction, const NdbDictionary::Index* index) { - NdbIndexOperation* op = transaction->getNdbIndexOperation(index); + + inline static NdbIndexScanOperation* getNdbIndexScanOperation(NdbTransaction* transaction, const NdbDictionary::Index* index) { + NdbIndexScanOperation* op = transaction->getNdbIndexScanOperation(index); if (!op) LOG_NDB_API_ERROR(transaction->getNdbError()); return op; } @@ -212,164 +209,6 @@ namespace Utils { } } - - namespace ElasticSearch { - - inline static string _getElasticSearchBaseUrl(string elasticUrl) { - string url = "http://" + elasticUrl; - return url; - } - - inline static string getElasticSearchBulkUrl(string elasticUrl) { - string bulkUrl = _getElasticSearchBaseUrl(elasticUrl) + "/_bulk"; - return bulkUrl; - } - - inline static string _getElasticSearchUrlonIndex(string elasticUrl, string index) { - string str = _getElasticSearchBaseUrl(elasticUrl) + "/" + index; - return str; - } - - inline static string getElasticSearchUrlOnDoc(string elasticUrl, string index, string type, int doc) { - stringstream out; - out << _getElasticSearchUrlonIndex(elasticUrl, index) << "/" << type << "/" << doc; - return out.str(); - } - - inline static string getElasticSearchUpdateDocUrl(string elasticUrl, string index, string type, int doc) { - string str = getElasticSearchUrlOnDoc(elasticUrl, index, type, doc) + "/_update"; - return str; - } - - inline static string getElasticSearchUpdateDocUrl(string elasticUrl, string index, string type, int doc, int parent) { - stringstream out; - out << getElasticSearchUpdateDocUrl(elasticUrl, index, type, doc) << "?parent=" << parent; - return out.str(); - } - - inline static string getElasticSearchDeleteDocUrl(string elasticUrl, string index, string type, int doc, int parent) { - stringstream out; - out << getElasticSearchUrlOnDoc(elasticUrl, index, type, doc) << "?parent=" << parent; - return out.str(); - } - - inline static string getElasticSearchDeleteByQueryUrl(string 
elasticUrl, string index, int routing) { - stringstream out; - out << _getElasticSearchUrlonIndex(elasticUrl, index) << "/_query" << "?routing=" << routing; - return out.str(); - } - - inline static string getElasticSearchDeleteByQueryUrl(string elasticUrl, string index, int parent, int routing) { - stringstream out; - out << getElasticSearchDeleteByQueryUrl(elasticUrl, index, routing) << "&parent=" << parent; - return out.str(); - } - - enum HttpOp { - HTTP_POST, - HTTP_DELETE - }; - - inline static bool elasticSearchHttpRequest(HttpOp op, string elasticUrl, string json) { - //TODO: support retry if server is down - try { - httpclient::request request_(elasticUrl); - request_ << boost::network::header("Connection", "close"); - - if (!json.empty()) { - request_ << boost::network::header("Content-Type", "application/json"); - - char body_str_len[8]; - sprintf(body_str_len, "%lu", json.length()); - - request_ << boost::network::header("Content-Length", body_str_len); - request_ << boost::network::body(json); - } - - string opString; - httpclient client_; - httpclient::response response_; - switch (op) { - case HTTP_POST: - response_ = client_.post(request_); - opString = "POST"; - break; - case HTTP_DELETE: - response_ = client_.delete_(request_); - opString = "DELETE"; - break; - } - std::string body_ = boost::network::http::body(response_); - - LOG_TRACE(opString << " " << elasticUrl << endl - << json << endl << "Response::" << endl << body_); - - rapidjson::Document d; - if (!d.Parse<0>(body_.c_str()).HasParseError()) { - if (d.HasMember("errors")) { - const rapidjson::Value &bulkErrors = d["errors"]; - if (bulkErrors.IsBool() && bulkErrors.GetBool()) { - const rapidjson::Value &items = d["items"]; - stringstream errors; - for (rapidjson::SizeType i = 0; i < items.Size(); ++i) { - const rapidjson::Value &obj = items[i]; - for (rapidjson::Value::ConstMemberIterator itr = obj.MemberBegin(); itr != obj.MemberEnd(); ++itr) - { - const rapidjson::Value & opObj = 
itr->value; - if(opObj.HasMember("error")){ - const rapidjson::Value & error = opObj["error"]; - if(error.IsObject()){ - const rapidjson::Value & errorType = error["type"]; - const rapidjson::Value & errorReason = error["reason"]; - errors << errorType.GetString() << ":" << errorReason.GetString(); - }else if(error.IsString()){ - errors << error.GetString(); - } - errors << ", "; - } - } - } - string errorsStr = errors.str(); - LOG_ERROR(" ES got errors: " << errorsStr); - return false; - } - } else if (d.HasMember("error")) { - const rapidjson::Value &error = d["error"]; - if (error.IsObject()) { - const rapidjson::Value & errorType = error["type"]; - const rapidjson::Value & errorReason = error["reason"]; - LOG_ERROR(" ES got error: " << errorType.GetString() << ":" << errorReason.GetString()); - } else if (error.IsString()) { - LOG_ERROR(" ES got error: " << error.GetString()); - } - return false; - } - }else{ - LOG_ERROR(" ES got json error (" << d.GetParseError() << ") while parsing " << body_); - return false; - } - - } catch (std::exception &e) { - LOG_ERROR(e.what()); - return false; - } - return true; - } - - inline static bool elasticSearchPOST(string elasticUrl, string json) { - return elasticSearchHttpRequest(HTTP_POST, elasticUrl, json); - } - - inline static bool elasticSearchDELETE(string elasticUrl) { - return elasticSearchHttpRequest(HTTP_DELETE, elasticUrl, string()); - } - - inline static bool elasticSearchDELETE(string elasticUrl, string json) { - return elasticSearchHttpRequest(HTTP_DELETE, elasticUrl, json); - } - - } - inline static ptime getCurrentTime() { return boost::posix_time::microsec_clock::local_time(); } diff --git a/include/Version.h b/include/Version.h index b43e2e33..81e1958c 100644 --- a/include/Version.h +++ b/include/Version.h @@ -26,7 +26,7 @@ #define VERSION_H_IN #define EPIPE_VERSION_MAJOR 0 -#define EPIPE_VERSION_MINOR 0 +#define EPIPE_VERSION_MINOR 1 #define EPIPE_VERSION_BUILD 1 #endif /* VERSION_H_IN */ diff --git 
a/src/Batcher.cpp b/src/Batcher.cpp index ade397ed..61d866f6 100644 --- a/src/Batcher.cpp +++ b/src/Batcher.cpp @@ -28,9 +28,9 @@ #include #include -Batcher::Batcher(const int time_before_issuing_ndb_reqs, const int batch_size) +Batcher::Batcher(const int time_to_wait, const int batch_size) : mBatchSize(batch_size), mTimerProcessing(false), mStarted(false), - mFirstTimer(true), mTimeBeforeIssuingNDBReqs(time_before_issuing_ndb_reqs) { + mFirstTimer(true), mTimeToWait(time_to_wait) { srand(time(NULL)); } @@ -60,10 +60,10 @@ void Batcher::startTimer() { void Batcher::timerThread() { while (true) { boost::asio::io_service io; - int timeout = mTimeBeforeIssuingNDBReqs; + int timeout = mTimeToWait; if(mFirstTimer){ - int baseTime = mTimeBeforeIssuingNDBReqs / 4; - timeout = rand() % (mTimeBeforeIssuingNDBReqs - baseTime) + baseTime; + int baseTime = mTimeToWait / 4; + timeout = rand() % (mTimeToWait - baseTime) + baseTime; mFirstTimer = false; LOG_TRACE("fire the first timer after " << timeout << " msec"); } diff --git a/src/DatasetTableTailer.cpp b/src/DatasetTableTailer.cpp index 893e0e85..3b964145 100644 --- a/src/DatasetTableTailer.cpp +++ b/src/DatasetTableTailer.cpp @@ -25,10 +25,9 @@ #include "DatasetTableTailer.h" using namespace Utils::NdbC; -using namespace Utils::ElasticSearch; const char* _dataset_table= "dataset"; -const int _dataset_noCols= 7; +const int _dataset_noCols= 8; const char* _dataset_cols[_dataset_noCols]= {"inode_id", "inode_pid", @@ -36,7 +35,8 @@ const char* _dataset_cols[_dataset_noCols]= "projectId", "description", "public_ds", - "searchable" + "searchable", + "shared" }; const int _dataset_noEvents = 3; @@ -49,23 +49,26 @@ const NdbDictionary::Event::TableEvent _dataset_events[_dataset_noEvents] = const WatchTable DatasetTableTailer::TABLE = {_dataset_table, _dataset_cols, _dataset_noCols , _dataset_events, _dataset_noEvents}; -DatasetTableTailer::DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr, - const 
string elastic_index, const string elastic_dataset_type,ProjectDatasetINodeCache* cache) - : TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticAddr(elastic_addr), mElasticIndex(elastic_index), - mElasticDatasetType(elastic_dataset_type), mPDICache(cache){ +DatasetTableTailer::DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, + ElasticSearch* elastic,ProjectDatasetINodeCache* cache) + : TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticSearch(elastic), mPDICache(cache){ } void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]) { int id = value[0]->int32_value(); int projectId = value[3]->int32_value(); + bool originalDS = value[7]->int8_value() == 0; + + if(!originalDS){ + LOG_DEBUG("Ignore shared Dataset [" << id << "] in Project [" << projectId << "]"); + return; + } if(eventType == NdbDictionary::Event::TE_DELETE){ - string deleteDatasetUrl = getElasticSearchDeleteDocUrl(mElasticAddr, mElasticIndex, mElasticDatasetType, id, projectId); - if(elasticSearchDELETE(deleteDatasetUrl)){ + if(mElasticSearch->deleteDataset(projectId, id)){ LOG_INFO("Delete Dataset[" << id << "]: Succeeded"); } - string deteteDatasetChildren = getElasticSearchDeleteByQueryUrl(mElasticAddr, mElasticIndex, id, projectId); rapidjson::StringBuffer sbDoc; rapidjson::Writer docWriter(sbDoc); @@ -85,7 +88,7 @@ void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, docWriter.EndObject(); //TODO: handle failures in elastic search - if(elasticSearchDELETE(deteteDatasetChildren, string(sbDoc.GetString()))){ + if(mElasticSearch->deleteDatasetChildren(projectId, id, string(sbDoc.GetString()))){ LOG_INFO("Delete Dataset[" << id << "] children inodes: Succeeded"); } @@ -131,8 +134,7 @@ void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, docWriter.EndObject(); string data = string(sbDoc.GetString()); - string url = getElasticSearchUpdateDocUrl(mElasticAddr, 
mElasticIndex, mElasticDatasetType, id, projectId); - if(elasticSearchPOST(url, data)){ + if(mElasticSearch->addDataset(projectId, id, data)){ LOG_INFO("Add Dataset[" << id << "]: Succeeded"); } } @@ -140,30 +142,55 @@ void DatasetTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, void DatasetTableTailer::updateProjectIds(const NdbDictionary::Dictionary* database, NdbTransaction* transaction, UISet dataset_ids, ProjectDatasetINodeCache* cache) { - const NdbDictionary::Index * index= database->getIndex(Utils::concat(_dataset_cols[0], "$unique"), TABLE.mTableName); + const NdbDictionary::Index * index= database->getIndex(_dataset_cols[0], TABLE.mTableName); + vector indexScanOps; UIRowMap rows; for (UISet::iterator it = dataset_ids.begin(); it != dataset_ids.end(); ++it) { - NdbIndexOperation* op = getNdbIndexOperation(transaction, index); - op->readTuple(NdbOperation::LM_CommittedRead); + NdbIndexScanOperation* op = getNdbIndexScanOperation(transaction, index); + op->readTuples(NdbOperation::LM_CommittedRead); + op->equal(_dataset_cols[0], *it); NdbRecAttr* id_col = getNdbOperationValue(op, _dataset_cols[0]); NdbRecAttr* proj_id_col = getNdbOperationValue(op, _dataset_cols[3]); + NdbRecAttr* shared_col = getNdbOperationValue(op, _dataset_cols[7]); rows[*it].push_back(id_col); rows[*it].push_back(proj_id_col); + rows[*it].push_back(shared_col); + indexScanOps.push_back(op); } executeTransaction(transaction, NdbTransaction::NoCommit); - for (UIRowMap::iterator it = rows.begin(); it != rows.end(); ++it) { - if (it->first != it->second[0]->int32_value()) { - LOG_ERROR("Dataset [" << it->first << "] doesn't exists"); - continue; + int i = 0; + for (UIRowMap::iterator it = rows.begin(); it != rows.end(); ++it, i++) { + + stringstream projs; + int res=0; + while (indexScanOps[i]->nextResult(true) == 0) { + if (it->first != it->second[0]->int32_value()) { + LOG_ERROR("Dataset [" << it->first << "] doesn't exists"); + continue; + } + + bool originalDs = 
it->second[2]->int8_value() == 0; + + if(!originalDs){ + continue; + } + + int projectId = it->second[1]->int32_value(); + if(res == 0){ + cache->addDatasetToProject(it->first, projectId); + } + projs << projectId << ","; + res++; + } + + if(res > 1){ + LOG_FATAL("Got " << res << " rows of the original Dataset [" << it->first << "] in projects [" << projs.str() << "], only one was expected"); } - - int projectId = it->second[1]->int32_value(); - cache->addDatasetToProject(it->first, projectId); } } diff --git a/src/ElasticSearch.cpp b/src/ElasticSearch.cpp new file mode 100644 index 00000000..f1a808ce --- /dev/null +++ b/src/ElasticSearch.cpp @@ -0,0 +1,295 @@ +/* + * Copyright (C) 2016 Hops.io + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+ */ + +/* + * File: ElasticSearch.cpp + * Author: Mahmoud Ismail + * + */ + +#include "ElasticSearch.h" +#include "Utils.h" + +static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} + +static const char* getStr(HttpOp op) { + switch (op) { + case HTTP_POST: + return "POST"; + case HTTP_DELETE: + return "DELETE"; + } + return "UNKOWN"; +} + +ElasticSearch::ElasticSearch(string elastic_addr, string index, string proj_type, + string ds_type, string inode_type, int time_to_wait_before_inserting, + int bulk_size) : Batcher(time_to_wait_before_inserting, bulk_size), + mIndex(index), mProjectType(proj_type), mDatasetType(ds_type), mInodeType(inode_type), mToProcessLength(0){ + + mElasticAddr = "http://" + elastic_addr; + mElasticBulkAddr = mElasticAddr + "/" + mIndex + "/" + mInodeType + "/_bulk"; + + curl_global_init(CURL_GLOBAL_ALL); + mHttpHandle = curl_easy_init(); +} + +void ElasticSearch::addBulk(string json) { + LOG_TRACE("Add Bulk JSON:" << endl << json << endl); + mQueue.push(json); +} + +void ElasticSearch::run() { + while(true){ + string msg; + mQueue.wait_and_pop(msg); + + addToProcess(msg); + if (mToProcessLength >= mBatchSize && !mTimerProcessing) { + processBatch(); + } + } +} + +void ElasticSearch::processBatch() { + if(mToProcessLength > 0){ + LOG_TRACE("Process Bulk JSONs [" << mToProcessLength << "]"); + string batch = resetToProcess(); + //TODO: handle failures + elasticSearchHttpRequest(HTTP_POST, mElasticBulkAddr, batch); + } +} + +void ElasticSearch::addToProcess(string str) { + boost::mutex::scoped_lock lock(mLock); + mToProcess << str; + mToProcessLength += str.length(); +} + +string ElasticSearch::resetToProcess() { + boost::mutex::scoped_lock lock(mLock); + string res = mToProcess.str(); + mToProcess.clear(); + mToProcess.str(string()); + mToProcessLength = 0; + return res; +} + +const char* ElasticSearch::getIndex() { + 
return mIndex.c_str(); +} + +const char* ElasticSearch::getProjectType() { + return mProjectType.c_str(); +} + +const char* ElasticSearch::getDatasetType() { + return mDatasetType.c_str(); +} + +const char* ElasticSearch::getINodeType() { + return mInodeType.c_str(); +} + +bool ElasticSearch::addProject(int projectId, string json) { + string url = getElasticSearchUpdateDocUrl(mIndex, mProjectType, projectId); + return elasticSearchHttpRequest(HTTP_POST, url, json); +} + +bool ElasticSearch::deleteProject(int projectId) { + string deleteProjUrl = getElasticSearchUrlOnDoc(mIndex, mProjectType, projectId); + return elasticSearchHttpRequest(HTTP_DELETE, deleteProjUrl, string()); +} + +bool ElasticSearch::deleteProjectChildren(int projectId, string json) { + string deteteProjectChildren = getElasticSearchDeleteByQueryUrl(mIndex, projectId); + return elasticSearchHttpRequest(HTTP_DELETE, deteteProjectChildren, json); +} + +bool ElasticSearch::addDataset(int projectId, int datasetId, string json) { + string url = getElasticSearchUpdateDocUrl(mIndex, mDatasetType, datasetId, projectId); + return elasticSearchHttpRequest(HTTP_POST, url, json); +} + +bool ElasticSearch::deleteDataset(int projectId, int datasetId) { + string deleteDatasetUrl = getElasticSearchDeleteDocUrl(mIndex, mDatasetType, datasetId, projectId); + return elasticSearchHttpRequest(HTTP_DELETE, deleteDatasetUrl, string()); +} + +bool ElasticSearch::deleteDatasetChildren(int projectId, int datasetId, string json) { + string deteteDatasetChildren = getElasticSearchDeleteByQueryUrl(mIndex, datasetId, projectId); + return elasticSearchHttpRequest(HTTP_DELETE, deteteDatasetChildren, json); +} + +string ElasticSearch::getElasticSearchUrlonIndex(string index) { + string str = mElasticAddr + "/" + index; + return str; +} + +string ElasticSearch::getElasticSearchUrlOnDoc(string index, string type, int doc) { + stringstream out; + out << getElasticSearchUrlonIndex(index) << "/" << type << "/" << doc; + return 
out.str(); +} + +string ElasticSearch::getElasticSearchUpdateDocUrl(string index, string type, int doc) { + string str = getElasticSearchUrlOnDoc(index, type, doc) + "/_update"; + return str; +} + +string ElasticSearch::getElasticSearchUpdateDocUrl(string index, string type, int doc, int parent) { + stringstream out; + out << getElasticSearchUpdateDocUrl(index, type, doc) << "?parent=" << parent; + return out.str(); +} + +string ElasticSearch::getElasticSearchDeleteDocUrl(string index, string type, int doc, int parent) { + stringstream out; + out << getElasticSearchUrlOnDoc(index, type, doc) << "?parent=" << parent; + return out.str(); +} + +string ElasticSearch::getElasticSearchDeleteByQueryUrl(string index, int routing) { + stringstream out; + out << getElasticSearchUrlonIndex(index) << "/_query" << "?routing=" << routing; + return out.str(); +} + +string ElasticSearch::getElasticSearchDeleteByQueryUrl(string index, int parent, int routing) { + stringstream out; + out << getElasticSearchDeleteByQueryUrl(index, routing) << "&parent=" << parent; + return out.str(); +} + +bool ElasticSearch::elasticSearchHttpRequest(HttpOp op, string elasticUrl, string json) { + ptime t1 = Utils::getCurrentTime(); + bool res = elasticSearchHttpRequestInternal(op, elasticUrl, json); + ptime t2 = Utils::getCurrentTime(); + LOG_INFO(getStr(op) << " " << elasticUrl << " [" << json.length() << "] took " << Utils::getTimeDiffInMilliseconds(t1, t2) << " msec"); + return res; +} + +bool ElasticSearch::elasticSearchHttpRequestInternal(HttpOp op, string elasticUrl, string json) { + //TODO: support retry if server is down + if (!mHttpHandle) { + mHttpHandle = curl_easy_init(); + } + + string opString; + string response; + + if (Logger::isTrace()) { + curl_easy_setopt(mHttpHandle, CURLOPT_VERBOSE, 1L); + } + + curl_easy_setopt(mHttpHandle, CURLOPT_TCP_KEEPALIVE, 1L); + curl_easy_setopt(mHttpHandle, CURLOPT_TCP_KEEPIDLE, 120L); + curl_easy_setopt(mHttpHandle, CURLOPT_TCP_KEEPINTVL, 60L); + + 
curl_easy_setopt(mHttpHandle, CURLOPT_URL, elasticUrl.c_str()); + curl_easy_setopt(mHttpHandle, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(mHttpHandle, CURLOPT_WRITEDATA, &response); + + switch (op) { + case HTTP_POST: + curl_easy_setopt(mHttpHandle, CURLOPT_POST, 1); + opString = "POST"; + break; + case HTTP_DELETE: + curl_easy_setopt(mHttpHandle, CURLOPT_CUSTOMREQUEST, "DELETE"); + opString = "DELETE"; + break; + } + + if (!json.empty()) { + curl_easy_setopt(mHttpHandle, CURLOPT_POSTFIELDS, json.c_str()); + curl_easy_setopt(mHttpHandle, CURLOPT_POSTFIELDSIZE, json.length()); + } + + CURLcode res = curl_easy_perform(mHttpHandle); + if (res != CURLE_OK) { + LOG_ERROR("CURL Failed: " << curl_easy_strerror(res)); + return false; + } + + LOG_TRACE(opString << " " << elasticUrl << endl + << json << endl << "Response::" << endl << response); + + return parseResponse(response); +} + +bool ElasticSearch::parseResponse(string response) { + try { + rapidjson::Document d; + if (!d.Parse<0>(response.c_str()).HasParseError()) { + if (d.HasMember("errors")) { + const rapidjson::Value &bulkErrors = d["errors"]; + if (bulkErrors.IsBool() && bulkErrors.GetBool()) { + const rapidjson::Value &items = d["items"]; + stringstream errors; + for (rapidjson::SizeType i = 0; i < items.Size(); ++i) { + const rapidjson::Value &obj = items[i]; + for (rapidjson::Value::ConstMemberIterator itr = obj.MemberBegin(); itr != obj.MemberEnd(); ++itr) { + const rapidjson::Value & opObj = itr->value; + if (opObj.HasMember("error")) { + const rapidjson::Value & error = opObj["error"]; + if (error.IsObject()) { + const rapidjson::Value & errorType = error["type"]; + const rapidjson::Value & errorReason = error["reason"]; + errors << errorType.GetString() << ":" << errorReason.GetString(); + } else if (error.IsString()) { + errors << error.GetString(); + } + errors << ", "; + } + } + } + string errorsStr = errors.str(); + LOG_ERROR(" ES got errors: " << errorsStr); + return false; + } + } 
else if (d.HasMember("error")) { + const rapidjson::Value &error = d["error"]; + if (error.IsObject()) { + const rapidjson::Value & errorType = error["type"]; + const rapidjson::Value & errorReason = error["reason"]; + LOG_ERROR(" ES got error: " << errorType.GetString() << ":" << errorReason.GetString()); + } else if (error.IsString()) { + LOG_ERROR(" ES got error: " << error.GetString()); + } + return false; + } + } else { + LOG_ERROR(" ES got json error (" << d.GetParseError() << ") while parsing " << response); + return false; + } + + } catch (std::exception &e) { + LOG_ERROR(e.what()); + return false; + } + return true; +} + +ElasticSearch::~ElasticSearch() { +} + diff --git a/src/FsMutationsDataReader.cpp b/src/FsMutationsDataReader.cpp index c3e231ee..b9167fcb 100644 --- a/src/FsMutationsDataReader.cpp +++ b/src/FsMutationsDataReader.cpp @@ -41,9 +41,9 @@ const int UG_ID_COL = 0; const int UG_NAME_COL = 1; FsMutationsDataReader::FsMutationsDataReader(MConn* connections, const int num_readers, - string elastic_ip, const bool hopsworks, const string elastic_index, const string elastic_inode_type, - ProjectDatasetINodeCache* cache, const int lru_cap) : NdbDataReader(connections, - num_readers, elastic_ip, hopsworks, elastic_index, elastic_inode_type, cache), mUsersCache(lru_cap, "User"), + const bool hopsworks, ElasticSearch* elastic ,ProjectDatasetINodeCache* cache, + const int lru_cap) : NdbDataReader(connections, num_readers, + hopsworks, elastic, cache), mUsersCache(lru_cap, "User"), mGroupsCache(lru_cap, "Group"){ } @@ -62,7 +62,7 @@ BatchStats FsMutationsDataReader::readData(MConn connection, Fmq* data_batch) { if (!json.empty()) { ptime t1 = getCurrentTime(); - bulkUpdateElasticSearch(json); + mElasticSearch->addBulk(json); ptime t2 = getCurrentTime(); rt.mElasticSearchTime = getTimeDiffInMilliseconds(t1, t2); } @@ -238,12 +238,6 @@ string FsMutationsDataReader::createJSON(Fmq* pending, Rows& inodes) { opWriter.String("delete"); opWriter.StartObject(); - 
opWriter.String("_index"); - opWriter.String(mElasticIndex.c_str()); - - opWriter.String("_type"); - opWriter.String(mElasticInodeType.c_str()); - if (mHopsworksEnalbed) { // set project (rounting) and dataset (parent) ids opWriter.String("_parent"); @@ -283,12 +277,6 @@ string FsMutationsDataReader::createJSON(Fmq* pending, Rows& inodes) { opWriter.String("update"); opWriter.StartObject(); - - opWriter.String("_index"); - opWriter.String(mElasticIndex.c_str()); - - opWriter.String("_type"); - opWriter.String(mElasticInodeType.c_str()); int projectId = -1; if(mHopsworksEnalbed){ diff --git a/src/Logger.cpp b/src/Logger.cpp index 87fbb893..e459263c 100644 --- a/src/Logger.cpp +++ b/src/Logger.cpp @@ -60,6 +60,10 @@ void Logger::fatal(const char* msg){ } } +bool Logger::isTrace() { + return mLoggerLevel == LOG_LEVEL_TRACE; +} + void Logger::log(const char* level, const char* msg){ cout << getTimestamp() << " <" << level << "> " << msg << endl; } diff --git a/src/MetadataReader.cpp b/src/MetadataReader.cpp index 075749b9..ad39134a 100644 --- a/src/MetadataReader.cpp +++ b/src/MetadataReader.cpp @@ -63,10 +63,9 @@ const int INODE_DATASET_LOOKUO_DATASET_ID_COL = 1; const int DONT_EXIST_INT = -1; const char* DONT_EXIST_STR = "-1"; -MetadataReader::MetadataReader(MConn* connections, const int num_readers, string elastic_ip, - const bool hopsworks, const string elastic_index, const string elastic_inode_type, const string elastic_ds_type, - ProjectDatasetINodeCache* cache, const int lru_cap) : NdbDataReader(connections, - num_readers, elastic_ip, hopsworks, elastic_index, elastic_inode_type, cache), mElasticDatasetType(elastic_ds_type), +MetadataReader::MetadataReader(MConn* connections, const int num_readers, const bool hopsworks, + ElasticSearch* elastic, ProjectDatasetINodeCache* cache, const int lru_cap) + : NdbDataReader(connections, num_readers, hopsworks, elastic, cache), mFieldsCache(lru_cap, "Field"), mTablesCache(lru_cap, "Table"), mTemplatesCache(lru_cap, 
"Template"){ } @@ -85,7 +84,7 @@ BatchStats MetadataReader::readData(MConn connection, Mq* data_batch) { if (!json.empty()) { ptime t1 = getCurrentTime(); - bulkUpdateElasticSearch(json); + mElasticSearch->addBulk(json); ptime t2 = getCurrentTime(); rt.mElasticSearchTime = getTimeDiffInMilliseconds(t1, t2); } @@ -366,8 +365,6 @@ string MetadataReader::createJSON(UIRowMap tuples, Mq* data_batch) { opWriter.String("update"); opWriter.StartObject(); - opWriter.String("_index"); - opWriter.String(mElasticIndex.c_str()); if(mHopsworksEnalbed){ int datasetId = mPDICache->getDatasetId(inodeId); @@ -378,13 +375,11 @@ string MetadataReader::createJSON(UIRowMap tuples, Mq* data_batch) { opWriter.String("_routing"); opWriter.Int(mPDICache->getProjectId(datasetId)); - const char* type = (datasetId == inodeId) ? mElasticDatasetType.c_str() : mElasticInodeType.c_str(); - opWriter.String("_type"); - opWriter.String(type); + if(datasetId == inodeId){ + opWriter.String("_type"); + opWriter.String(mElasticSearch->getDatasetType()); + } - }else{ - opWriter.String("_type"); - opWriter.String(mElasticInodeType.c_str()); } opWriter.String("_id"); diff --git a/src/Notifier.cpp b/src/Notifier.cpp index 888135d3..8f5d518b 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -27,11 +27,12 @@ Notifier::Notifier(const char* connection_string, const char* database_name, const char* meta_database_name, const int time_before_issuing_ndb_reqs, const int batch_size, const int poll_maxTimeToWait, const int num_ndb_readers, const string elastic_ip, const bool hopsworks, const string elastic_index, - const string elasttic_project_type, const string elastic_dataset_type, const string elastic_inode_type, const int lru_cap) + const string elasttic_project_type, const string elastic_dataset_type, const string elastic_inode_type, + const int elastic_batch_size, const int elastic_issue_time, const int lru_cap) : mDatabaseName(database_name), mMetaDatabaseName(meta_database_name), 
mTimeBeforeIssuingNDBReqs(time_before_issuing_ndb_reqs), mBatchSize(batch_size), mPollMaxTimeToWait(poll_maxTimeToWait), mNumNdbReaders(num_ndb_readers), mElasticAddr(elastic_ip), mHopsworksEnabled(hopsworks), mElasticIndex(elastic_index), mElastticProjectType(elasttic_project_type), mElasticDatasetType(elastic_dataset_type), - mElasticInodeType(elastic_inode_type), mLRUCap(lru_cap){ + mElasticInodeType(elastic_inode_type), mElasticBatchsize(elastic_batch_size), mElasticIssueTime(elastic_issue_time), mLRUCap(lru_cap){ mClusterConnection = connect_to_cluster(connection_string); setup(); } @@ -51,13 +52,19 @@ void Notifier::start() { mDatasetTableTailer->start(); } + mElasticSearch->start(); + mFsMutationsBatcher->waitToFinish(); mMetadataBatcher->waitToFinish(); + mElasticSearch->waitToFinish(); } void Notifier::setup() { mPDICache = new ProjectDatasetINodeCache(mLRUCap); + mElasticSearch = new ElasticSearch(mElasticAddr, mElasticIndex, mElastticProjectType, + mElasticDatasetType, mElasticInodeType, mElasticIssueTime, mElasticBatchsize); + Ndb* mutations_tailer_connection = create_ndb_connection(mDatabaseName); mFsMutationsTableTailer = new FsMutationsTableTailer(mutations_tailer_connection, mPollMaxTimeToWait, mPDICache); @@ -68,7 +75,7 @@ void Notifier::setup() { } mFsMutationsDataReader = new FsMutationsDataReader(mutations_connections, mNumNdbReaders, - mElasticAddr, mHopsworksEnabled, mElasticIndex, mElasticInodeType, mPDICache, mLRUCap); + mHopsworksEnabled, mElasticSearch, mPDICache, mLRUCap); mFsMutationsBatcher = new FsMutationsBatcher(mFsMutationsTableTailer, mFsMutationsDataReader, mTimeBeforeIssuingNDBReqs, mBatchSize); @@ -82,18 +89,18 @@ void Notifier::setup() { metadata_connections[i].metadataConnection = create_ndb_connection(mMetaDatabaseName); } - mMetadataReader = new MetadataReader(metadata_connections, mNumNdbReaders, mElasticAddr, - mHopsworksEnabled, mElasticIndex, mElasticInodeType, mElasticDatasetType, mPDICache, mLRUCap); + mMetadataReader 
= new MetadataReader(metadata_connections, mNumNdbReaders, + mHopsworksEnabled, mElasticSearch, mPDICache, mLRUCap); mMetadataBatcher = new MetadataBatcher(mMetadataTableTailer, mMetadataReader, mTimeBeforeIssuingNDBReqs, mBatchSize); if (mHopsworksEnabled) { Ndb* project_tailer_connection = create_ndb_connection(mMetaDatabaseName); mProjectTableTailer = new ProjectTableTailer(project_tailer_connection, mPollMaxTimeToWait, - mElasticAddr, mElasticIndex, mElastticProjectType, mPDICache); + mElasticSearch, mPDICache); Ndb* dataset_tailer_connection = create_ndb_connection(mMetaDatabaseName); mDatasetTableTailer = new DatasetTableTailer(dataset_tailer_connection, mPollMaxTimeToWait, - mElasticAddr, mElasticIndex, mElasticDatasetType, mPDICache); + mElasticSearch, mPDICache); } } diff --git a/src/ProjectTableTailer.cpp b/src/ProjectTableTailer.cpp index ffd2eb62..cd1e4b85 100644 --- a/src/ProjectTableTailer.cpp +++ b/src/ProjectTableTailer.cpp @@ -25,7 +25,6 @@ #include "ProjectTableTailer.h" using namespace Utils::NdbC; -using namespace Utils::ElasticSearch; const char* _project_table= "project"; const int _project_noCols= 5; @@ -46,23 +45,19 @@ const NdbDictionary::Event::TableEvent _project_events[_project_noEvents] = const WatchTable ProjectTableTailer::TABLE = {_project_table, _project_cols, _project_noCols , _project_events, _project_noEvents}; -ProjectTableTailer::ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr, - const string elastic_index, const string elastic_project_type, ProjectDatasetINodeCache* cache) - : TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticAddr(elastic_addr), mElasticIndex(elastic_index), - mElasticProjectType(elastic_project_type), mPDICache(cache){ +ProjectTableTailer::ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, + ElasticSearch* elastic, ProjectDatasetINodeCache* cache) + : TableTailer(ndb, TABLE, poll_maxTimeToWait), mElasticSearch(elastic), mPDICache(cache){ } void 
ProjectTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]) { int id = value[0]->int32_value(); if(eventType == NdbDictionary::Event::TE_DELETE){ - string deleteDatasetUrl = getElasticSearchUrlOnDoc(mElasticAddr, mElasticIndex, mElasticProjectType, id); - if(elasticSearchDELETE(deleteDatasetUrl)){ + if(mElasticSearch->deleteProject(id)){ LOG_INFO("Delete Project[" << id << "]: Succeeded"); } - string deteteProjectChildren = getElasticSearchDeleteByQueryUrl(mElasticAddr, mElasticIndex, id); - rapidjson::StringBuffer sbDoc; rapidjson::Writer docWriter(sbDoc); docWriter.StartObject(); @@ -81,7 +76,7 @@ void ProjectTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, docWriter.EndObject(); //TODO: handle failures in elastic search - if(elasticSearchDELETE(deteteProjectChildren, string(sbDoc.GetString()))){ + if(mElasticSearch->deleteProjectChildren(id, string(sbDoc.GetString()))){ LOG_INFO("Delete Project[" << id << "] children inodes and datasets : Succeeded"); } @@ -115,8 +110,7 @@ void ProjectTableTailer::handleEvent(NdbDictionary::Event::TableEvent eventType, docWriter.EndObject(); string data = string(sbDoc.GetString()); - string url = getElasticSearchUpdateDocUrl(mElasticAddr, mElasticIndex, mElasticProjectType, id); - if(elasticSearchPOST(url, data)){ + if(mElasticSearch->addProject(id, data)){ LOG_INFO("Add Project[" << id << "]: Succeeded"); } } diff --git a/src/main.cpp b/src/main.cpp index bc0e1652..9c7cd210 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -34,7 +34,7 @@ int main(int argc, char** argv) { string meta_database_name = "hopsworks"; int wait_time = 1000; int ndb_batch = 5; - int poll_maxTimeToWait = 10000; + int poll_maxTimeToWait = 2000; int num_ndb_readers = 5; string elastic_addr = "localhost:9200"; int log_level = 2; @@ -44,6 +44,9 @@ int main(int argc, char** argv) { string elastic_project_type = "proj"; string elastic_dataset_type = "ds"; string elastic_inode_type = 
"inode"; + int elastic_batch_size = 5000; + int elastic_issue_time = 5000; + int lru_cap = DEFAULT_MAX_CAPACITY; po::options_description desc("Allowed options"); @@ -53,7 +56,7 @@ int main(int argc, char** argv) { ("database", po::value()->default_value(database_name), "database name.") ("meta_database", po::value()->default_value(meta_database_name), "database name for metadata") ("poll_maxTimeToWait", po::value()->default_value(wait_time), "max time to wait in miliseconds while waiting for events in pollEvents") - ("wait_time", po::value()->default_value(poll_maxTimeToWait), "time to wait in miliseconds before issuing the ndb request or the batch size reached") + ("wait_time", po::value()->default_value(poll_maxTimeToWait), "time to wait in miliseconds before issuing the ndb request if the batch size wasn't reached") ("ndb_batch", po::value()->default_value(ndb_batch), "batch size for reading from ndb") ("num_ndb_readers", po::value()->default_value(num_ndb_readers), "num of ndb reader threads") ("elastic_addr", po::value()->default_value(elastic_addr), "ip and port of the elasticsearch server") @@ -62,6 +65,8 @@ int main(int argc, char** argv) { ("project_type", po::value()->default_value(elastic_project_type), "Elastic type for projects, only used when hopsworks is enabled.") ("dataset_type", po::value()->default_value(elastic_dataset_type), "Elastic type for datasets, only used when hopsworks is enabled.") ("inode_type", po::value()->default_value(elastic_inode_type), "Elastic type for inodes.") + ("elastic_batch", po::value()->default_value(elastic_batch_size), "Elastic batch size in bytes for bulk requests") + ("ewait_time", po::value()->default_value(elastic_issue_time), "time to wait in miliseconds before issuing a bulk request to Elasticsearch if the batch size wasn't reached") ("lru_cap", po::value()->default_value(lru_cap), "LRU Cache max capacity") ("log_level", po::value()->default_value(log_level), "log level trace=0, debug=1, info=2, warn=3, 
error=4, fatal=5") ("version", "ePipe version") @@ -139,6 +144,14 @@ int main(int argc, char** argv) { elastic_inode_type = vm["inode_type"].as(); } + if (vm.count("elastic_batch")) { + elastic_batch_size = vm["elastic_batch"].as(); + } + + if (vm.count("ewait_time")) { + elastic_issue_time = vm["ewait_time"].as(); + } + if (vm.count("lru_cap")) { lru_cap = vm["lru_cap"].as(); } @@ -152,7 +165,7 @@ int main(int argc, char** argv) { Notifier *notifer = new Notifier(connection_string.c_str(), database_name.c_str(), meta_database_name.c_str(), wait_time, ndb_batch, poll_maxTimeToWait, num_ndb_readers, elastic_addr, hopsworks, elastic_index, - elastic_project_type, elastic_dataset_type, elastic_inode_type, lru_cap); + elastic_project_type, elastic_dataset_type, elastic_inode_type, elastic_batch_size, elastic_issue_time, lru_cap); notifer->start(); return EXIT_SUCCESS;