Replace pthreads by std::thread #23

Open
wants to merge 48 commits into
base: master
1f5eeab
use std::mutex, locks, and condition variables in assembled_chunk_rin…
dstndstn Jan 31, 2018
8c3840d
use std::mutex, locks, and condition variables in udp_packet_ringbuf
dstndstn Jan 31, 2018
b03b4bf
use std::mutex and condition_variable for intensity_network_stream st…
dstndstn Jan 31, 2018
305a11d
use std::mutex for intensity_network_stream event_lock and packet_his…
dstndstn Jan 31, 2018
c7617b7
use std::mutex for intensity_network_ostream state and statistics locks
dstndstn Feb 1, 2018
52da5fd
switch to std::thread for intensity_network_ostream network thread
dstndstn Feb 1, 2018
d5ff573
FIXME fixed
dstndstn Feb 1, 2018
b9b168f
add msgpack_binary_vector.hpp, defining a 'wire protocol' for vectors…
dstndstn Nov 13, 2018
1a8db72
Merge branch 'stdthread' into dstn-stdthread
dstndstn Nov 18, 2018
dbfa325
more pthreads->std::thread
dstndstn Nov 18, 2018
f6899b0
Merge branch 'relcand_latency' into dstn-injection
dstndstn Nov 27, 2018
01f1f5c
make first_packet_received an atomic bool; raise error if get_first_f…
dstndstn Nov 28, 2018
f94914d
add frame0_nano to get_statistics()
dstndstn Jan 11, 2019
ff5cb5c
intensity_network_stream : get_first_fpga_count: throw exception if b…
dstndstn Jan 18, 2019
0fe1b24
Merge branch 'dstn-injection' of github.com:CHIMEFRB/ch_frb_io into d…
dstndstn Jan 18, 2019
fbd63f9
set thread names
dstndstn Jan 25, 2019
44fd842
quiet about frame0_ctime fetching
dstndstn Jan 25, 2019
9c37095
change frame0_nano from uint64 to atomic<uint64>, now that RPC thread…
kmsmith137 Feb 5, 2019
5fbea08
Merge branch 'dstn-injection' into dstn-maskmon-injection
dstndstn Feb 23, 2019
11fb70e
initial shot at packet forking
dstndstn Jun 20, 2019
0a4d29a
revert beam numbers
dstndstn Jun 21, 2019
eba9f83
implement forking a single beam
dstndstn Jun 21, 2019
7ddfd74
allowing stopping all packet forking
dstndstn Jun 21, 2019
826a203
add pause and resume functions for packet forking
dstndstn Jun 21, 2019
00bbd34
debugging: log number of packets per assembled chunk
dstndstn Jun 24, 2019
5beb0f4
also increment packets-received counter in fast_assembled_chunk path
dstndstn Jun 24, 2019
b3295de
print packet forking list after updating
dstndstn Jun 24, 2019
f18bef4
copy socket init logic from primary packet socket
dstndstn Jun 24, 2019
55bad3a
also set buffer size and timeout for sending forked beam data
dstndstn Jun 24, 2019
1a02c90
collect some stats on forking socket send
dstndstn Jul 2, 2019
30bb916
retrieve socket errors too
dstndstn Jul 2, 2019
e9431e2
chlog rather than cout
dstndstn Jul 22, 2019
71a8ade
try SIOCOUTQ ioctl
dstndstn Jul 22, 2019
738eb10
oops, typo in start/end
dstndstn Jul 22, 2019
46f0cc0
report how long we take to process data
dstndstn Jul 22, 2019
feafea1
report time since last batch of sends
dstndstn Jul 22, 2019
91badfc
report if fewer bytes sent than desired
dstndstn Jul 25, 2019
2b2d722
allow forking the first 2 or 3 beams
dstndstn Jul 26, 2019
268eaf0
excessive logging of packet rates
dstndstn Jul 26, 2019
9c6ca9a
log timestamps along with packet rates
dstndstn Jul 26, 2019
03dc486
log packet rates at each flush of the buffer
dstndstn Jul 26, 2019
e88f2c3
retrieve the SNDBUF size
dstndstn Jul 26, 2019
15e9614
set send buffer size to 1MB -- seems to improve packet losses (puts t…
Jul 26, 2019
eadeb06
packet forking: try to fix malformed packets
dstndstn Jul 26, 2019
f86621c
quiet down some excessive logging
dstndstn Jul 26, 2019
6f69516
Merge pull request #26 from CHIMEFRB/dstn-maskmon-injection
dstndstn Sep 18, 2019
6b725c5
Merge pull request #27 from CHIMEFRB/dstn-maskmon-injection-dup
dstndstn Sep 18, 2019
78255e1
Merge branch 'dstn-master' into dstn-stdthread
dstndstn Sep 18, 2019
1 change: 1 addition & 0 deletions Makefile
@@ -57,6 +57,7 @@ CPP += -Ibitshuffle
INCFILES=ch_frb_io.hpp \
ch_frb_io_internals.hpp \
assembled_chunk_msgpack.hpp \
msgpack_binary_vector.hpp \
bitshuffle/bitshuffle.h bitshuffle/bitshuffle_core.h \
bitshuffle/bitshuffle_internals.h bitshuffle/iochain.h \
chlog.hpp
99 changes: 37 additions & 62 deletions assembled_chunk_ringbuf.cpp
@@ -9,11 +9,16 @@ namespace ch_frb_io {
}; // pacify emacs c-mode!
#endif

// This is a lightweight scoped lock
typedef std::lock_guard<std::mutex> guard_t;
// This is also a scoped lock that supports use of a condition variable.
typedef std::unique_lock<std::mutex> ulock_t;

assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params_, int beam_id_, int stream_id_) :
max_fpga_flushed(0),
max_fpga_retrieved(0),
first_fpgacount(0),
first_packet_received(false),
ini_params(ini_params_),
beam_id(beam_id_),
stream_id(stream_id_),
@@ -39,9 +44,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
throw runtime_error("ch_frb_io: the 'force_fast_kernels' flag was set, but this machine does not have the AVX2 instruction set");
#endif

pthread_mutex_init(&this->lock, NULL);
pthread_cond_init(&this->cond_assembled_chunks_added, NULL);

this->num_downsampling_levels = max(ini_params.telescoping_ringbuf_capacity.size(), 1UL);
this->ringbuf_pos.resize(num_downsampling_levels, 0);
this->ringbuf_size.resize(num_downsampling_levels, 0);
@@ -64,21 +66,13 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
this->_check_invariants();
}


assembled_chunk_ringbuf::~assembled_chunk_ringbuf()
{
pthread_cond_destroy(&this->cond_assembled_chunks_added);
pthread_mutex_destroy(&this->lock);
}


void assembled_chunk_ringbuf::set_frame0(uint64_t f0) {
frame0_nano = f0;
}

void assembled_chunk_ringbuf::print_state()
{
pthread_mutex_lock(&this->lock);
guard_t lock(mutx);

cout << "Beam " << beam_id << "\n";

@@ -96,35 +90,28 @@ void assembled_chunk_ringbuf::print_state()
cout << " " << this->ringbuf_entry(ids,ipos)->ichunk;
cout << " ]\n";
}

pthread_mutex_unlock(&this->lock);
}

shared_ptr<assembled_chunk>
assembled_chunk_ringbuf::find_assembled_chunk(uint64_t fpga_counts, bool top_level_only)
{
pthread_mutex_lock(&this->lock);
ulock_t lock(mutx);

// Return an empty pointer iff stream has ended, and chunk is requested past end-of-stream.
// (If anything else goes wrong, an exception will be thrown.)
if (this->doneflag && (fpga_counts >= this->final_fpga)) {
pthread_mutex_unlock(&this->lock);
if (this->doneflag && (fpga_counts >= this->final_fpga))
return shared_ptr<assembled_chunk> ();
}

// Scan telescoping ring buffer
int start_level = (top_level_only ? 0 : num_downsampling_levels-1);
for (int lev = start_level; lev >= 0; lev--) {
for (int ipos = ringbuf_pos[lev]; ipos < ringbuf_pos[lev] + ringbuf_size[lev]; ipos++) {
auto ch = this->ringbuf_entry(lev, ipos);
if (ch->fpga_begin == fpga_counts) {
pthread_mutex_unlock(&this->lock);
if (ch->fpga_begin == fpga_counts)
return ch;
}
}
}

pthread_mutex_unlock(&this->lock);
throw runtime_error("ch_frb_io::assembled_chunk::find_assembled_chunk(): couldn't find chunk, maybe your ring buffer is too small?");
}
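
Review note: the explicit `pthread_mutex_unlock()` calls before each `return` and `throw` disappear in this hunk because a scoped lock releases the mutex in its destructor on every exit path. Below is a minimal sketch of that pattern. The `chunk_index` class and its members are hypothetical stand-ins for illustration and are not part of ch_frb_io.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for the ring buffer lookup; not part of ch_frb_io.
class chunk_index {
public:
    void add(uint64_t fpga_begin, int value) {
        std::lock_guard<std::mutex> lock(mutx);
        chunks[fpga_begin] = std::make_shared<int>(value);
    }

    void end_stream(uint64_t final_fpga_) {
        std::lock_guard<std::mutex> lock(mutx);
        doneflag = true;
        final_fpga = final_fpga_;
    }

    // Returns an empty pointer past end-of-stream, throws if the chunk is missing.
    // Both returns and the throw release the mutex, because the lock_guard
    // destructor runs whenever 'lock' goes out of scope.
    std::shared_ptr<int> find(uint64_t fpga_counts) {
        std::lock_guard<std::mutex> lock(mutx);
        if (doneflag && (fpga_counts >= final_fpga))
            return std::shared_ptr<int>();
        auto it = chunks.find(fpga_counts);
        if (it != chunks.end())
            return it->second;
        throw std::runtime_error("chunk_index: couldn't find chunk");
    }

private:
    std::mutex mutx;
    std::map<uint64_t, std::shared_ptr<int>> chunks;
    bool doneflag = false;
    uint64_t final_fpga = 0;
};
```

If the throw left the mutex locked, any later call to `find()` or `add()` on the same object would deadlock, which is the failure mode the manual unlock calls were guarding against.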

@@ -135,7 +122,7 @@ assembled_chunk_ringbuf::get_ringbuf_snapshot(uint64_t min_fpga_counts, uint64_t
vector<pair<shared_ptr<assembled_chunk>, uint64_t>> ret;
ret.reserve(sum(ringbuf_capacity));

pthread_mutex_lock(&this->lock);
guard_t lock(mutx);

// Scan telescoping ring buffer, in a time-ordered way.
for (int ids = num_downsampling_levels-1; ids >= 0; ids--) {
@@ -154,8 +141,6 @@ assembled_chunk_ringbuf::get_ringbuf_snapshot(uint64_t min_fpga_counts, uint64_t
ret.push_back({ chunk, where });
}
}

pthread_mutex_unlock(&this->lock);
return ret;
}

@@ -169,7 +154,7 @@ void assembled_chunk_ringbuf::get_ringbuf_size(uint64_t *ringbuf_fpga_next,
uint64_t *ringbuf_fpga_max,
int level)
{
pthread_mutex_lock(&this->lock);
guard_t lock(mutx);

if (ringbuf_fpga_next && (level == 0)) {
*ringbuf_fpga_next = 0;
@@ -246,20 +231,17 @@ void assembled_chunk_ringbuf::get_ringbuf_size(uint64_t *ringbuf_fpga_next,
}
}
}

pthread_mutex_unlock(&this->lock);
}


void assembled_chunk_ringbuf::stream_to_files(const string &filename_pattern, int priority, bool need_rfi)
{
pthread_mutex_lock(&this->lock);
guard_t lock(mutx);
this->stream_pattern = filename_pattern;
this->stream_priority = priority;
this->stream_rfi_mask = need_rfi;
this->stream_chunks_written = 0;
this->stream_bytes_written = 0;
pthread_mutex_unlock(&this->lock);
}


@@ -365,17 +347,15 @@ void assembled_chunk_ringbuf::chunk_streamed(const std::string &filename) {
}

size_t len = st.st_size;
pthread_mutex_lock(&this->lock);
guard_t lock(mutx);
this->stream_chunks_written ++;
this->stream_bytes_written += len;
pthread_mutex_unlock(&this->lock);
}

void assembled_chunk_ringbuf::get_streamed_chunks(int &achunks, size_t &abytes) {
pthread_mutex_lock(&this->lock);
guard_t lock(mutx);
achunks = stream_chunks_written;
abytes = stream_bytes_written;
pthread_mutex_unlock(&this->lock);
}

// Helper function called assembler thread, to add a new assembled_chunk to the ring buffer.
@@ -388,6 +368,8 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr<assembled_chunk> &
if (chunk->has_rfi_mask)
throw runtime_error("ch_frb_io: internal error: chunk passed to assembled_chunk_ringbuf::_put_unassembled_packet() has rfi_mask flag set");

chlog("Assembled chunk " << chunk->ichunk << " beam " << beam_id << ": received " << chunk->packets_received << " packets");

// Step 1: prepare all data needed to modify the ring buffer. In this step, we do all of our
// buffer allocation and downsampling, without the lock held. In step 2, we will acquire the
// lock and modify the ring buffer (without expensive operations like allocation/downsampling).
@@ -458,12 +440,10 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr<assembled_chunk> &
// be added/removed at each level (pushlist/poplist), so we don't malloc/free/downsample with
// the lock held.

pthread_mutex_lock(&this->lock);

if (this->doneflag) {
pthread_mutex_unlock(&this->lock);
ulock_t lock(mutx);

if (this->doneflag)
throw runtime_error("ch_frb_io: internal error: assembled_chunk_ringbuf::put_unassembled_packet() called after end_stream()");
}

for (int ids = 0; ids < nds; ids++) {
// Number of chunks to be removed from level 'ids' of the telescoping ring buffer.
@@ -505,8 +485,8 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr<assembled_chunk> &
int loc_stream_priority = this->stream_priority;
bool loc_stream_rfi_mask = this->stream_rfi_mask;

pthread_cond_broadcast(&this->cond_assembled_chunks_added);
pthread_mutex_unlock(&this->lock);
this->cond_assembled_chunks_added.notify_all();
lock.unlock();

// Stream new chunk to disk (if 'stream_pattern' is a nonempty string).
// It's better to do this processing without the lock held, we just need to use
@@ -620,9 +600,11 @@ void assembled_chunk_ringbuf::_check_invariants()
// We do need to acquire the lock to access 'downstream_pos', since it's modified
// by the downstream thread.

pthread_mutex_lock(&lock);
int dpos = this->downstream_pos;
pthread_mutex_unlock(&lock);
int dpos;
{
guard_t lock(mutx);
dpos = this->downstream_pos;
}

ch_assert(downstream_bufsize > 0);
ch_assert(downstream_bufsize <= ringbuf_capacity[0]);
Expand Down Expand Up @@ -650,7 +632,7 @@ bool assembled_chunk_ringbuf::inject_assembled_chunk(assembled_chunk* chunk)
shared_ptr<assembled_chunk> assembled_chunk_ringbuf::get_assembled_chunk(bool wait)
{
shared_ptr<assembled_chunk> chunk;
pthread_mutex_lock(&this->lock);
ulock_t lock(mutx);

for (;;) {
if (downstream_pos < ringbuf_pos[0] + ringbuf_size[0]) {
@@ -666,16 +648,13 @@ shared_ptr<assembled_chunk> assembled_chunk_ringbuf::get_assembled_chunk(bool wa
break; // Ring buffer is empty and end_stream() has been called

// Wait for chunks to be added to the ring buffer.
pthread_cond_wait(&this->cond_assembled_chunks_added, &this->lock);
this->cond_assembled_chunks_added.wait(lock);
}

pthread_mutex_unlock(&this->lock);

if (chunk) {
assert(chunk->fpga_end > this->max_fpga_retrieved);
this->max_fpga_retrieved = chunk->fpga_end;
}

return chunk;
}

@@ -693,21 +672,17 @@ void assembled_chunk_ringbuf::end_stream(int64_t *event_counts)
this->_put_assembled_chunk(active_chunk0, event_counts);
this->_put_assembled_chunk(active_chunk1, event_counts);

pthread_mutex_lock(&this->lock);
{
ulock_t lock(mutx);
if (doneflag)
throw runtime_error("ch_frb_io: internal error: doneflag already set in assembled_chunk_ringbuf::end_stream()");

if (doneflag) {
pthread_mutex_unlock(&this->lock);
throw runtime_error("ch_frb_io: internal error: doneflag already set in assembled_chunk_ringbuf::end_stream()");
// With lock held
this->doneflag = true;
this->final_fpga = loc_final_fpga;
// Wake up processing thread, if it is waiting for data
this->cond_assembled_chunks_added.notify_all();
}

// Wake up processing thread, if it is waiting for data
pthread_cond_broadcast(&this->cond_assembled_chunks_added);

// With lock held
this->doneflag = true;
this->final_fpga = loc_final_fpga;

pthread_mutex_unlock(&this->lock);
}
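
Review note: `get_assembled_chunk()` and `end_stream()` now form the usual `std::condition_variable` handshake. The consumer waits on a `std::unique_lock` (which `wait()` must be able to release and reacquire), and the producer changes state under the same mutex before calling `notify_all()`. Here is a simplified sketch of that handshake. The `chunk_queue` class is a hypothetical analogue, not the actual ring buffer.

```cpp
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>

// Hypothetical, simplified analogue of assembled_chunk_ringbuf.
class chunk_queue {
public:
    void put(int value) {
        std::lock_guard<std::mutex> lock(mutx);
        items.push_back(std::make_shared<int>(value));
        cond_added.notify_all();
    }

    void end_stream() {
        std::lock_guard<std::mutex> lock(mutx);
        doneflag = true;
        // Wake up any consumer that is waiting for data.
        cond_added.notify_all();
    }

    // Blocks until an item is available. Returns an empty pointer once the
    // queue is drained and end_stream() has been called.
    std::shared_ptr<int> get() {
        std::unique_lock<std::mutex> lock(mutx);
        for (;;) {
            if (!items.empty()) {
                auto ret = items.front();
                items.pop_front();
                return ret;
            }
            if (doneflag)
                return std::shared_ptr<int>();
            // wait() releases the mutex while sleeping and reacquires it
            // before returning; the loop re-checks state after every wakeup.
            cond_added.wait(lock);
        }
    }

private:
    std::mutex mutx;
    std::condition_variable cond_added;
    std::deque<std::shared_ptr<int>> items;
    bool doneflag = false;
};
```

Because the flag is set and the notification is issued while the mutex is held, their relative order inside the critical section does not matter: a waiter cannot observe either until the lock is released.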


1 change: 1 addition & 0 deletions avx2_kernels.cpp
@@ -936,6 +936,7 @@ void fast_assembled_chunk::add_packet(const intensity_packet &packet)

_add_packet_kernel(dst, src, nupfreq);
}
this->packets_received++;
}


45 changes: 31 additions & 14 deletions ch_frb_io.hpp
@@ -468,6 +468,7 @@ class intensity_network_stream : noncopyable {

// Returns the first fpgacount of the first chunk sent downstream by
// the given beam id.
// Raises runtime_error if the first packet has not been received yet.
uint64_t get_first_fpga_count(int beam);

// Returns the last FPGA count processed by each of the assembler,
@@ -496,6 +497,11 @@ class intensity_network_stream : noncopyable {
// For debugging/testing: pretend a packet has just arrived.
void fake_packet_from(const struct sockaddr_in& sender, int nbytes);

void start_forking_packets(int beam, int destbeam, const struct sockaddr_in& dest);
void stop_forking_packets (int beam, int destbeam, const struct sockaddr_in& dest);
void pause_forking_packets();
void resume_forking_packets();

// stream_to_files(): for streaming incoming data to disk.
//
// 'filename_pattern': see assembled_chunk::format_filename below (empty string means "streaming disabled")
@@ -549,8 +555,8 @@ class intensity_network_stream : noncopyable {
std::atomic<uint64_t> assembler_thread_waiting_usec;
std::atomic<uint64_t> assembler_thread_working_usec;

// Initialized by assembler thread when first packet is received, constant thereafter.
uint64_t frame0_nano = 0; // nanosecond time() value for fgpacount zero.
// Initialized to zero by constructor, set to nonzero value by assembler thread when first packet is received.
std::atomic<uint64_t> frame0_nano; // nanosecond time() value for fgpacount zero.

char _pad1b[constants::cache_line_size];
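
Review note: the new declaration drops the `= 0` initializer, so the comment's "initialized to zero by constructor" carries weight. Before C++20, a default-constructed `std::atomic<T>` holds an indeterminate value. A minimal sketch of the set-once, read-from-another-thread pattern follows. The `stream_clock` struct and the `get_frame0()` helper are hypothetical names for illustration, and treating zero as "not yet set" is an assumption taken from the comment.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical holder for the timestamp; not part of ch_frb_io.
struct stream_clock {
    // Explicitly initialized: before C++20 the default constructor of
    // std::atomic leaves the stored value indeterminate.
    std::atomic<uint64_t> frame0_nano{0};

    // Called by the assembler thread when the first packet is received.
    void set_frame0(uint64_t f0) { frame0_nano.store(f0); }

    // Safe to call from any other thread. Returns false (and leaves 'out'
    // untouched) until a nonzero value has been stored.
    bool get_frame0(uint64_t &out) const {
        uint64_t v = frame0_nano.load();
        if (v == 0)
            return false;
        out = v;
        return true;
    }
};
```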

@@ -581,31 +587,42 @@ class intensity_network_stream : noncopyable {
// but doesn't mean that it has actually shut down yet, it may still be reading packets.
// So far it hasn't been necessary to include a 'stream_ended' flag in the state model.

pthread_mutex_t state_lock;
pthread_cond_t cond_state_changed; // threads wait here for state to change

std::mutex state_mutex;
std::condition_variable cond_state_changed;
bool stream_started = false; // set asynchonously by calling start_stream()
bool stream_end_requested = false; // can be set asynchronously by calling end_stream(), or by network/assembler threads on exit
bool join_called = false; // set by calling join_threads()
bool threads_joined = false; // set when both threads (network + assembler) are joined
char _pad4[constants::cache_line_size];

pthread_mutex_t event_lock;
std::mutex event_mutex;
std::vector<int64_t> cumulative_event_counts;
std::shared_ptr<packet_counts> perhost_packets;

pthread_mutex_t packet_history_lock;
std::mutex packet_history_mutex;
std::map<double, std::shared_ptr<packet_counts> > packet_history;

// Streaming-related data (arguments to stream_to_files()).
std::mutex stream_lock; // FIXME need to convert pthread_mutex to std::mutex everywhere
std::mutex stream_lock;
std::string stream_filename_pattern;
std::vector<int> stream_beam_ids;
int stream_priority;
bool stream_rfi_mask;
int stream_chunks_written;
size_t stream_bytes_written;

struct packetfork {
int beam;
int destbeam;
struct sockaddr_in dest;
};

std::mutex forking_mutex;
std::vector<packetfork> forking_packets;
int forking_socket = 0;
std::atomic<bool> forking_paused;

// The actual constructor is protected, so it can be a helper function
// for intensity_network_stream::make(), but can't be called otherwise.
intensity_network_stream(const initializer &x);
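
Review note: the header adds `forking_mutex`, `forking_packets` and `forking_paused`, but the function bodies are not part of this diff. The sketch below shows one way such a table could be maintained. The `fork_table` class, its method names, the `targets_for()` helper and the address comparison are all assumptions made for illustration, and the real implementation may differ (for example in how "stop" matches entries).

```cpp
#include <netinet/in.h>
#include <atomic>
#include <mutex>
#include <vector>

// Mirrors the struct added in the header.
struct packetfork {
    int beam;
    int destbeam;
    struct sockaddr_in dest;
};

// Hypothetical container for the forking list; not part of ch_frb_io.
class fork_table {
public:
    void start(int beam, int destbeam, const sockaddr_in &dest) {
        std::lock_guard<std::mutex> lock(forking_mutex);
        forking_packets.push_back(packetfork{beam, destbeam, dest});
    }

    // Assumption: "stop" removes every entry matching all three fields.
    void stop(int beam, int destbeam, const sockaddr_in &dest) {
        std::lock_guard<std::mutex> lock(forking_mutex);
        for (auto it = forking_packets.begin(); it != forking_packets.end(); ) {
            if (it->beam == beam && it->destbeam == destbeam && same_addr(it->dest, dest))
                it = forking_packets.erase(it);
            else
                ++it;
        }
    }

    void pause()  { forking_paused = true; }
    void resume() { forking_paused = false; }

    // Returns a snapshot of the destinations for 'beam', so the caller can
    // send packets without holding the mutex. Empty while paused.
    std::vector<packetfork> targets_for(int beam) {
        std::vector<packetfork> ret;
        if (forking_paused)
            return ret;
        std::lock_guard<std::mutex> lock(forking_mutex);
        for (const auto &pf : forking_packets)
            if (pf.beam == beam)
                ret.push_back(pf);
        return ret;
    }

private:
    static bool same_addr(const sockaddr_in &a, const sockaddr_in &b) {
        return a.sin_family == b.sin_family
            && a.sin_port == b.sin_port
            && a.sin_addr.s_addr == b.sin_addr.s_addr;
    }

    std::mutex forking_mutex;
    std::vector<packetfork> forking_packets;
    std::atomic<bool> forking_paused{false};
};
```

Keeping the pause flag atomic lets the packet path skip the mutex entirely while forking is paused.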
@@ -1148,15 +1165,15 @@ class intensity_network_ostream : noncopyable {
std::string hostname;
uint16_t udp_port = constants::default_udp_port;

pthread_mutex_t statistics_lock;
std::mutex statistics_lock;
int64_t curr_timestamp = 0; // microseconds between first packet and most recent packet
int64_t npackets_sent = 0;
int64_t nbytes_sent = 0;

// State model.
pthread_t network_thread;
pthread_mutex_t state_lock;
pthread_cond_t cond_state_changed;
std::thread network_thread;
std::mutex state_lock;
std::condition_variable cond_state_changed;
bool network_thread_started = false;
bool network_thread_joined = false;

@@ -1170,8 +1187,8 @@ class intensity_network_ostream : noncopyable {
// for intensity_network_ostream::make(), but can't be called otherwise.
intensity_network_ostream(const initializer &ini_params);

static void *network_pthread_main(void *opaque_args);

void _network_thread_main();
void _network_thread_body();

// For testing purposes (eg, can create a subclass that randomly drops packets), a wrapper on the underlying packet send() function.
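
Review note: replacing the static `network_pthread_main(void *)` trampoline with a member function works because `std::thread` can take a pointer to member plus `this`, so no opaque-argument cast is needed. A minimal sketch is shown here. The `worker` class is hypothetical, and the main/body split with a catch-all wrapper is only one plausible reading of the two declarations in the header.

```cpp
#include <atomic>
#include <thread>

// Hypothetical class illustrating a std::thread data member; not part of ch_frb_io.
class worker {
public:
    ~worker() { join(); }

    // std::thread binds the member function to 'this' directly.
    void start() { network_thread = std::thread(&worker::_network_thread_main, this); }

    // A joinable std::thread must be joined (or detached) before destruction.
    void join() {
        if (network_thread.joinable())
            network_thread.join();
    }

    int packets_sent() const { return npackets_sent.load(); }
    bool has_failed() const { return failed.load(); }

private:
    // Wrapper: an exception escaping a thread function calls std::terminate,
    // so catch everything here and record the failure instead.
    void _network_thread_main() {
        try {
            _network_thread_body();
        } catch (...) {
            failed = true;
        }
    }

    // Placeholder for the real work loop.
    void _network_thread_body() {
        for (int i = 0; i < 10; i++)
            npackets_sent++;
    }

    std::thread network_thread;
    std::atomic<int> npackets_sent{0};
    std::atomic<bool> failed{false};
};
```

Starting the thread from `start()` rather than from the constructor's initializer list avoids running the thread body before the remaining data members have been constructed.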