From 8312264fcbe5c45d169eb639893b5e495570d419 Mon Sep 17 00:00:00 2001 From: Mahmoud Ismail Date: Mon, 5 Nov 2018 18:16:39 +0100 Subject: [PATCH] [HOPSWORKS] [ePipe] Fix a race condition while reading from ndb --- include/ConcurrentPriorityQueue.h | 5 +++-- include/NdbDataReaders.h | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/include/ConcurrentPriorityQueue.h b/include/ConcurrentPriorityQueue.h index 69c70ffa..47697d93 100644 --- a/include/ConcurrentPriorityQueue.h +++ b/include/ConcurrentPriorityQueue.h @@ -32,7 +32,7 @@ class ConcurrentPriorityQueue { public: ConcurrentPriorityQueue(); void push(Data data); - void top(Data& result); + void pop(Data& result); void pop(); void wait_and_pop(Data &result); bool empty(); @@ -68,9 +68,10 @@ void ConcurrentPriorityQueue::wait_and_pop(Data& result) { } template -void ConcurrentPriorityQueue::top(Data& result) { +void ConcurrentPriorityQueue::pop(Data& result) { boost::mutex::scoped_lock lock(mLock); result = mQueue.top(); + mQueue.pop(); } template diff --git a/include/NdbDataReaders.h b/include/NdbDataReaders.h index aee9fced..cba64042 100644 --- a/include/NdbDataReaders.h +++ b/include/NdbDataReaders.h @@ -120,13 +120,13 @@ template void NdbDataReaders::processWaiting() { while(!mWaitingOutQueue->empty()){ Bulk out; - mWaitingOutQueue->top(out); + mWaitingOutQueue->pop(out); if(out.mProcessingIndex == mLastSent + 1){ LOG_INFO("publish enriched events with index [" << out.mProcessingIndex << "] to Elastic"); mElasticSearch->addData(out); mLastSent++; - mWaitingOutQueue->pop(); }else{ + mWaitingOutQueue->push(out); break; } }