
Commit

Insert bulks into Elasticsearch asynchronously; use libcurl instead of cppnetlib

Populate the dataset cache from the database in case of a cache miss
Mahmoud Ismail committed Jun 13, 2016
1 parent bde1d89 commit 408102d
Showing 21 changed files with 526 additions and 291 deletions.
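
The commit drops cpp-netlib in favor of libcurl for talking to Elasticsearch (see the CMakeLists.txt and ElasticSearch.h changes below). The new ElasticSearch.cpp itself is not among the files rendered here, so the following is only a minimal sketch of what a libcurl POST to an Elasticsearch URL might look like; the function names, URL handling and error policy are assumptions, not the committed implementation.

// Sketch only: a minimal libcurl POST, roughly the shape of what
// elasticSearchHttpRequestInternal() might do. Names and error handling
// here are illustrative assumptions, not the committed code.
#include <curl/curl.h>
#include <iostream>
#include <string>

// libcurl write callback: append the response body into a std::string.
static size_t appendToString(char* ptr, size_t size, size_t nmemb, void* userdata) {
    static_cast<std::string*>(userdata)->append(ptr, size * nmemb);
    return size * nmemb;
}

// POST a JSON body to the given Elasticsearch URL; true means the transfer succeeded.
bool postJson(const std::string& url, const std::string& json) {
    CURL* curl = curl_easy_init();           // curl_global_init() assumed done once at startup
    if (!curl) return false;

    std::string response;
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json.c_str());
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, appendToString);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);

    CURLcode rc = curl_easy_perform(curl);
    curl_easy_cleanup(curl);

    if (rc != CURLE_OK) {
        std::cerr << "curl failed: " << curl_easy_strerror(rc) << std::endl;
        return false;
    }
    // The real code presumably inspects the JSON response (RapidJSON is now a
    // dependency) for an "errors" field; here only the transport result is checked.
    return true;
}

A DELETE (for the HTTP_DELETE case in ElasticSearch.h) would reuse the same handle with CURLOPT_CUSTOMREQUEST set to "DELETE".
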
8 changes: 4 additions & 4 deletions CMakeLists.txt
@@ -3,19 +3,19 @@ project (ePipe)
cmake_minimum_required (VERSION 2.8)

set(EPIPE_VERSION_MAJOR 0)
-set(EPIPE_VERSION_MINOR 0)
+set(EPIPE_VERSION_MINOR 1)
set(EPIPE_VERSION_BUILD 1)
configure_file(Version.h.in ${CMAKE_SOURCE_DIR}/include/Version.h)

set(Boost_USE_STATIC_LIBS ON)

find_package(Boost 1.54 REQUIRED COMPONENTS system date_time program_options thread)
-find_package(cppnetlib 0.11.1 REQUIRED uri client-connections)
+find_package(CURL 7.35.0)
find_package(RapidJSON 1.0.2)

include_directories(${Boost_INCLUDE_DIRS})
-include_directories(${CPPNETLIB_INCLUDE_DIRS})
include_directories(${RAPIDJSON_INCLUDE_DIRS})
+include_directories(${CURL_INCLUDE_DIRS})
include_directories (${CMAKE_SOURCE_DIR}/include)

if(NOT NDB_DIR)
@@ -32,4 +32,4 @@ file(GLOB SOURCE ${CMAKE_SOURCE_DIR}/src/*.cpp)

add_executable(ePipe ${SOURCE})

-target_link_libraries(ePipe ${Boost_LIBRARIES} ${CPPNETLIB_LIBRARIES} ndbclient pthread)
+target_link_libraries(ePipe ${Boost_LIBRARIES} ${CURL_LIBRARIES} ndbclient pthread)
4 changes: 2 additions & 2 deletions include/Batcher.h
@@ -29,7 +29,7 @@

class Batcher {
public:
-Batcher(const int time_before_issuing_ndb_reqs, const int batch_size);
+Batcher(const int time_to_wait, const int batch_size);
void start();
void waitToFinish();
virtual ~Batcher();
@@ -46,7 +46,7 @@ class Batcher {
bool mStarted;
bool mFirstTimer;

-const int mTimeBeforeIssuingNDBReqs;
+const int mTimeToWait;
boost::thread mTimerThread;

void startTimer();
10 changes: 5 additions & 5 deletions include/DatasetTableTailer.h
@@ -28,20 +28,20 @@

#include "TableTailer.h"
#include "ProjectDatasetINodeCache.h"
#include "ElasticSearch.h"

class DatasetTableTailer : public TableTailer{
public:
-DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr,
-const string elastic_index, const string elastic_dataset_type, ProjectDatasetINodeCache* cache);
+DatasetTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic,
+ProjectDatasetINodeCache* cache);
virtual ~DatasetTableTailer();

static void updateProjectIds(const NdbDictionary::Dictionary* database, NdbTransaction* transaction, UISet dataset_ids, ProjectDatasetINodeCache* cache);
private:
static const WatchTable TABLE;
virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]);
-string mElasticAddr;
-const string mElasticIndex;
-const string mElasticDatasetType;

+ElasticSearch* mElasticSearch;
ProjectDatasetINodeCache* mPDICache;
};

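
The cache-miss part of the commit message relates to this tailer and ProjectDatasetINodeCache: when a dataset is not found in the cache, the project/dataset mapping is re-read from the database (compare updateProjectIds() above). The actual .cpp changes are not rendered in this excerpt, so the snippet below is only a simplified, self-contained sketch of the read-through pattern; the class and member names are stand-ins, not the ePipe API.

// Simplified sketch of the read-through cache pattern described in the commit
// message; types and names here are stand-ins, not the ePipe classes.
#include <functional>
#include <map>

class DatasetProjectCache {
public:
    // Return the project id for a dataset; on a cache miss, call the supplied
    // database lookup and remember the result for subsequent events.
    int projectIdFor(int datasetId, const std::function<int(int)>& readFromDatabase) {
        std::map<int, int>::const_iterator it = mDatasetToProject.find(datasetId);
        if (it != mDatasetToProject.end()) {
            return it->second;                        // cache hit
        }
        int projectId = readFromDatabase(datasetId);  // cache miss: go back to NDB
        mDatasetToProject[datasetId] = projectId;     // repopulate the cache
        return projectId;
    }

private:
    std::map<int, int> mDatasetToProject;
};
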
85 changes: 85 additions & 0 deletions include/ElasticSearch.h
@@ -0,0 +1,85 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/

/*
* File: ElasticSearch.h
* Author: Mahmoud Ismail<[email protected]>
*
* Created on June 9, 2016, 1:39 PM
*/

#ifndef ELASTICSEARCH_H
#define ELASTICSEARCH_H
#include "Batcher.h"
#include "ConcurrentQueue.h"
#include <curl/curl.h>
#include "rapidjson/document.h"

enum HttpOp {
HTTP_POST,
HTTP_DELETE
};

class ElasticSearch : public Batcher{
public:
ElasticSearch(string elastic_addr, string index, string proj_type, string ds_type,
string inode_type, int time_to_wait_before_inserting, int bulk_size);

void addBulk(string json);

bool addProject(int projectId, string json);
bool deleteProject(int projectId);
bool deleteProjectChildren(int projectId, string json);

bool addDataset(int projectId, int datasetId, string json);
bool deleteDataset(int projectId, int datasetId);
bool deleteDatasetChildren(int projectId, int datasetId, string json);

const char* getIndex();
const char* getProjectType();
const char* getDatasetType();
const char* getINodeType();

virtual ~ElasticSearch();
private:
const string mIndex;
const string mProjectType;
const string mDatasetType;
const string mInodeType;

string mElasticAddr;
string mElasticBulkAddr;

ConcurrentQueue<string> mQueue;

mutable boost::mutex mLock;
stringstream mToProcess;
int mToProcessLength;

CURL* mHttpHandle;

void addToProcess(string str);
string resetToProcess();

virtual void run();
virtual void processBatch();

string getElasticSearchBulkUrl();
string getElasticSearchUrlonIndex(string index);
string getElasticSearchUrlOnDoc(string index, string type, int doc);
string getElasticSearchUpdateDocUrl(string index, string type, int doc);
string getElasticSearchUpdateDocUrl(string index, string type, int doc, int parent);
string getElasticSearchDeleteDocUrl(string index, string type, int doc, int parent);
string getElasticSearchDeleteByQueryUrl(string index, int routing);
string getElasticSearchDeleteByQueryUrl(string index, int parent, int routing);

bool elasticSearchHttpRequest(HttpOp op, string elasticUrl, string json);
bool elasticSearchHttpRequestInternal(HttpOp op, string elasticUrl, string json);
bool parseResponse(string response);
};

#endif /* ELASTICSEARCH_H */
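
The header above is the core of the asynchronous bulk path: producers call addBulk(), which only enqueues JSON on the ConcurrentQueue, while the Batcher-driven side accumulates queued fragments and flushes them to the _bulk endpoint once the batch size or the wait time is reached. Since ElasticSearch.cpp is not rendered here, the following is only a rough sketch of how run() and processBatch() could fit together; the queue and batch-size members used below (wait_and_pop(), mBatchSize) are assumptions about APIs that are not shown.

// Rough sketch of the consumer/flush split suggested by the header above;
// ConcurrentQueue::wait_and_pop() and mBatchSize are assumed, not shown.
void ElasticSearch::run() {
    while (true) {
        string json;
        mQueue.wait_and_pop(json);          // block until a producer calls addBulk()
        addToProcess(json);                 // append under mLock, grow mToProcessLength
        if (mToProcessLength >= mBatchSize) // size threshold reached
            processBatch();                 // the Batcher timer covers the time threshold
    }
}

void ElasticSearch::processBatch() {
    string batch = resetToProcess();        // atomically take the accumulated bulk body
    if (batch.empty()) return;
    // One HTTP POST per batch to .../_bulk instead of one request per document.
    elasticSearchHttpRequest(HTTP_POST, getElasticSearchBulkUrl(), batch);
}
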

5 changes: 2 additions & 3 deletions include/FsMutationsDataReader.h
@@ -32,9 +32,8 @@ typedef vector<Row> Rows;

class FsMutationsDataReader : public NdbDataReader<FsMutationRow, MConn>{
public:
-FsMutationsDataReader(MConn* connections, const int num_readers, string elastic_ip,
-const bool hopsworks, const string elastic_index, const string elastic_inode_type,
-ProjectDatasetINodeCache* cache, const int lru_cap);
+FsMutationsDataReader(MConn* connections, const int num_readers, const bool hopsworks,
+ElasticSearch* elastic, ProjectDatasetINodeCache* cache, const int lru_cap);
virtual ~FsMutationsDataReader();
private:
Cache<int, string> mUsersCache;
1 change: 1 addition & 0 deletions include/Logger.h
@@ -69,6 +69,7 @@ class Logger{
static void error(const char* msg);
static void fatal(const char* msg);

+static bool isTrace();
private:
static Logger* mInstance;
static int mLoggerLevel;
6 changes: 2 additions & 4 deletions include/MetadataReader.h
@@ -51,9 +51,8 @@ struct Table{

class MetadataReader : public NdbDataReader<MetadataEntry, MConn>{
public:
-MetadataReader(MConn* connections, const int num_readers, string elastic_ip,
-const bool hopsworks, const string elastic_index, const string elastic_inode_type,
-const string elastic_ds_type, ProjectDatasetINodeCache* cache, const int lru_cap);
+MetadataReader(MConn* connections, const int num_readers, const bool hopsworks,
+ElasticSearch* elastic, ProjectDatasetINodeCache* cache, const int lru_cap);
virtual ~MetadataReader();
private:
virtual ptime getEventCreationTime(MetadataEntry entry);
@@ -75,7 +74,6 @@ class MetadataReader : public NdbDataReader<MetadataEntry, MConn>{

string createJSON(UIRowMap tuples, Mq* data_batch);

-const string mElasticDatasetType;
Cache<int, Field> mFieldsCache;
Cache<int, Table> mTablesCache;
Cache<int, string> mTemplatesCache;
30 changes: 7 additions & 23 deletions include/NdbDataReader.h
@@ -30,6 +30,7 @@
#include "Cache.h"
#include "Utils.h"
#include "ProjectDatasetINodeCache.h"
#include "ElasticSearch.h"

#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
@@ -42,7 +43,6 @@ namespace bc = boost::accumulators;
typedef bc::accumulator_set<double, bc::stats<bc::tag::mean, bc::tag::min, bc::tag::max> > Accumulator;

using namespace Utils;
-using namespace Utils::ElasticSearch;
using namespace Utils::NdbC;

struct BatchStats{
@@ -65,17 +65,13 @@ struct MConn{
template<typename Data, typename Conn>
class NdbDataReader {
public:
-NdbDataReader(Conn* connections, const int num_readers, string elastic_addr,
-const bool hopsworks, const string elastic_index, const string elastic_inode_type,
-ProjectDatasetINodeCache* cache);
+NdbDataReader(Conn* connections, const int num_readers, const bool hopsworks,
+ElasticSearch* elastic, ProjectDatasetINodeCache* cache);
void start();
void processBatch(vector<Data>* data_batch);
virtual ~NdbDataReader();

private:
-const string mElasticAddr;
-string mElasticBulkUrl;

bool mStarted;
std::vector<boost::thread* > mThreads;

@@ -87,39 +83,27 @@ class NdbDataReader {
const int mNumReaders;
Conn* mNdbConnections;
const bool mHopsworksEnalbed;
-const string mElasticIndex;
-const string mElasticInodeType;
+ElasticSearch* mElasticSearch;
ProjectDatasetINodeCache* mPDICache;
//Accumulator mQueuingAcc;
//Accumulator mBatchingAcc;
//Accumulator mProcessingAcc;

virtual ptime getEventCreationTime(Data row) = 0;
virtual BatchStats readData(Conn connection, vector<Data>* data_batch) = 0;
-void bulkUpdateElasticSearch(string json);
string getAccString(Accumulator acc);
};


template<typename Data, typename Conn>
NdbDataReader<Data, Conn>::NdbDataReader(Conn* connections, const int num_readers,
-string elastic_ip, const bool hopsworks, const string elastic_index,
-const string elastic_inode_type, ProjectDatasetINodeCache* cache)
-: mElasticAddr(elastic_ip), mNumReaders(num_readers), mNdbConnections(connections),
-mHopsworksEnalbed(hopsworks), mElasticIndex(elastic_index), mElasticInodeType(elastic_inode_type),
-mPDICache(cache){
+const bool hopsworks,ElasticSearch* elastic, ProjectDatasetINodeCache* cache)
+: mNumReaders(num_readers), mNdbConnections(connections),
+mHopsworksEnalbed(hopsworks), mElasticSearch(elastic), mPDICache(cache){
mStarted = false;
-mElasticBulkUrl = getElasticSearchBulkUrl(mElasticAddr);
mBatchedQueue = new ConcurrentQueue<vector<Data>*>();
}

-template<typename Data, typename Conn>
-void NdbDataReader<Data, Conn>::bulkUpdateElasticSearch(string json) {
-if(!elasticSearchPOST(mElasticBulkUrl, json)){
-//TODO: handle elasticsearch failures
-}
-}

template<typename Data, typename Conn>
void NdbDataReader<Data, Conn>::start() {
if (mStarted) {
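
With bulkUpdateElasticSearch() and the per-reader bulk URL gone, a reader's job ends at building the bulk JSON; shipping it becomes the shared ElasticSearch object's problem. A hedged sketch of the call site (the surrounding reader code is not rendered here, so the variable names are illustrative):

// Sketch only: the reader hands the generated bulk body to the shared,
// asynchronous ElasticSearch batcher instead of POSTing it itself.
string bulkJson = createJSON(tuples, data_batch);  // reader-specific JSON builder
mElasticSearch->addBulk(bulkJson);                 // queued; flushed by the batcher thread
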
8 changes: 7 additions & 1 deletion include/Notifier.h
@@ -29,14 +29,16 @@
#include "MetadataBatcher.h"
#include "ProjectTableTailer.h"
#include "DatasetTableTailer.h"
#include "ElasticSearch.h"

class Notifier {
public:
Notifier(const char* connection_string, const char* database_name, const char* meta_database_name,
const int time_before_issuing_ndb_reqs, const int batch_size,
const int poll_maxTimeToWait, const int num_ndb_readers, const string elastic_addr,
const bool hopsworks, const string elastic_index, const string elasttic_project_type,
-const string elastic_dataset_type, const string elastic_inode_type, const int lru_cap);
+const string elastic_dataset_type, const string elastic_inode_type, const int elastic_batch_size,
+const int elastic_issue_time, const int lru_cap);
void start();
virtual ~Notifier();

@@ -56,8 +58,12 @@ class Notifier {
const string mElastticProjectType;
const string mElasticDatasetType;
const string mElasticInodeType;
+const int mElasticBatchsize;
+const int mElasticIssueTime;
const int mLRUCap;

+ElasticSearch* mElasticSearch;

FsMutationsTableTailer* mFsMutationsTableTailer;
FsMutationsDataReader* mFsMutationsDataReader;
FsMutationsBatcher* mFsMutationsBatcher;
10 changes: 5 additions & 5 deletions include/ProjectTableTailer.h
@@ -27,20 +27,20 @@

#include "TableTailer.h"
#include "ProjectDatasetINodeCache.h"
#include "ElasticSearch.h"

class ProjectTableTailer : public TableTailer{
public:
-ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, string elastic_addr,
-const string elastic_index, const string elastic_project_type, ProjectDatasetINodeCache* cache);
+ProjectTableTailer(Ndb* ndb, const int poll_maxTimeToWait, ElasticSearch* elastic,
+ProjectDatasetINodeCache* cache);
virtual ~ProjectTableTailer();

private:
static const WatchTable TABLE;

virtual void handleEvent(NdbDictionary::Event::TableEvent eventType, NdbRecAttr* preValue[], NdbRecAttr* value[]);
-string mElasticAddr;
-const string mElasticIndex;
-const string mElasticProjectType;

+ElasticSearch* mElasticSearch;
ProjectDatasetINodeCache* mPDICache;
};

