Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix search performance when creating a large number of channels #39

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace pvxs {
namespace client {

constexpr timeval bucketInterval{1,0};
constexpr timeval initialSearchDelay{0, 10000}; // 10 ms
constexpr size_t nBuckets = 30u;

// try not to fragment with usual MTU==1500
Expand Down Expand Up @@ -340,9 +341,9 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& cont
context->chanByName[namekey] = chan;

if(server.empty()) {
context->searchBuckets[context->currentBucket].push_back(chan);
context->initialSearchBucket.push_back(chan);

context->poke(true);
context->scheduleInitialSearch();

} else { // bypass search and connect so a specific server
chan->forcedServer = forceServer;
Expand Down Expand Up @@ -498,6 +499,8 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
event_new(tcp_loop.base, searchTx6.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
,searchTimer(__FILE__, __LINE__,
event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this))
,initialSearcher(__FILE__, __LINE__,
event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::initialSearchS, this))
,manager(UDPManager::instance(effective.shareUDP()))
,beaconCleaner(__FILE__, __LINE__,
event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this))
Expand Down Expand Up @@ -694,6 +697,18 @@ void ContextImpl::poke(bool force)
throw std::runtime_error("Unable to schedule searchTimer");
}

void ContextImpl::scheduleInitialSearch()
{
if (!initialSearchScheduled)
{
log_debug_printf(setup, "%s()\n", __func__);

initialSearchScheduled = true;
if (event_add(initialSearcher.get(), &initialSearchDelay))
throw std::runtime_error("Unable to schedule initialSearcher");
}
}

void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
{
epicsTimeStamp now;
Expand Down Expand Up @@ -958,27 +973,38 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
}
}

void ContextImpl::tickSearch(bool discover)
void ContextImpl::tickSearch(SearchKind kind)
{
// If !discover, then this is a discovery ping.
// If kind == SearchKind::discover, then this is a discovery ping.
// these are really empty searches with must-reply set.
// So if !discover, then we should not be modifying any internal state
{
//
// If kind == SearchKind::initial we are sending the first search request
// for the channels in initalSearchBucket, and not resending requests for
// channels in the searchBuckets.
//
// If kind == SearchKind::check then we may have been poked.
if (kind == SearchKind::check) {
Guard G(pokeLock);
poked = false;
} else if (kind == SearchKind::initial) {
initialSearchScheduled = false;
}

auto idx = currentBucket;
if(!discover)
if(kind == SearchKind::check)
currentBucket = (currentBucket+1u)%searchBuckets.size();

log_debug_printf(io, "Search tick %zu\n", idx);

decltype (searchBuckets)::value_type bucket;
if(!discover)
if (kind == SearchKind::initial) {
initialSearchBucket.swap(bucket);
} else if(kind == SearchKind::check) {
searchBuckets[idx].swap(bucket);
}

while(!bucket.empty() || discover) {
while(!bucket.empty() || kind == SearchKind::discover) {
// when 'discover' we only loop once

searchMsg.resize(0x10000);
Expand All @@ -991,7 +1017,8 @@ void ContextImpl::tickSearch(bool discover)
// flags and reserved.
// initially flags[7] is cleared (bcast)
auto pflags = M.save();
to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(kind == SearchKind::discover ?
pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(0u));
to_wire(M, uint16_t(0u));

Expand All @@ -1004,7 +1031,7 @@ void ContextImpl::tickSearch(bool discover)
auto pport = M.save();
to_wire(M, uint16_t(searchRxPort));

if(discover) {
if(kind == SearchKind::discover) {
to_wire(M, uint8_t(0u));

} else {
Expand All @@ -1019,7 +1046,7 @@ void ContextImpl::tickSearch(bool discover)

bool payload = false;
while(!bucket.empty()) {
assert(!discover);
assert(kind != SearchKind::discover);

auto chan = bucket.front().lock();
if(!chan || chan->state!=Channel::Searching) {
Expand Down Expand Up @@ -1076,7 +1103,7 @@ void ContextImpl::tickSearch(bool discover)
}
assert(M.good());

if(!payload && !discover)
if(!payload && kind != SearchKind::discover)
break;

{
Expand Down Expand Up @@ -1144,23 +1171,32 @@ void ContextImpl::tickSearch(bool discover)
// fail silently, will retry
}

if(discover)
if(kind == SearchKind::discover)
break;
}

if(event_add(searchTimer.get(), &bucketInterval))
if(kind != SearchKind::initial && event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
}

void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->tickSearch(false);
static_cast<ContextImpl*>(raw)->tickSearch(SearchKind::check);
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
}

void ContextImpl::initialSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->tickSearch(SearchKind::initial);
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in initial search callback: %s\n", e.what());
}
}

void ContextImpl::tickBeaconClean()
{
epicsTimeStamp now;
Expand Down
2 changes: 1 addition & 1 deletion src/clientdiscover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ std::shared_ptr<Operation> DiscoverBuilder::exec()
if(first && ping) {
log_debug_printf(setup, "Starting Discover%s", "\n");

context->tickSearch(true);
context->tickSearch(ContextImpl::SearchKind::discover);
}
});

Expand Down
14 changes: 13 additions & 1 deletion src/clientimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
epicsTimeStamp lastPoke{};
bool poked = false;

// unlike `poke`, `scheduleInitialSearch` is only ever called from the
// tcp_loop so this does not need to be guarded by a mutex
bool initialSearchScheduled = false;

// map: endpoint+proto -> Beaconer
typedef std::pair<SockAddr, std::string> BeaconServer;
struct BeaconInfo {
Expand All @@ -287,6 +291,9 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
std::vector<std::pair<SockEndpoint, bool>> searchDest;

size_t currentBucket = 0u;
// Channels where we have yet to send out an initial search request
std::list<std::weak_ptr<Channel>> initialSearchBucket;
// Channels where we are waiting for a search response
std::vector<std::list<std::weak_ptr<Channel>>> searchBuckets;

std::list<std::unique_ptr<UDPListener> > beaconRx;
Expand All @@ -304,6 +311,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
evbase tcp_loop;
const evevent searchRx4, searchRx6;
const evevent searchTimer;
const evevent initialSearcher;

// beacon handling done on UDP worker.
// we keep a ref here as long as beaconCleaner is in use
Expand All @@ -330,10 +338,14 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>

void onBeacon(const UDPManager::Beacon& msg);

void scheduleInitialSearch();

bool onSearch(evutil_socket_t fd);
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
void tickSearch(bool discover);
enum class SearchKind { discover, initial, check };
void tickSearch(SearchKind kind);
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
static void initialSearchS(evutil_socket_t fd, short evt, void *raw);
void tickBeaconClean();
static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);
void cacheClean(const std::string &name, Context::cacheAction force);
Expand Down