diff --git a/Makefile b/Makefile index 45ac5f2..4a3228b 100644 --- a/Makefile +++ b/Makefile @@ -57,6 +57,7 @@ CPP += -Ibitshuffle INCFILES=ch_frb_io.hpp \ ch_frb_io_internals.hpp \ assembled_chunk_msgpack.hpp \ + msgpack_binary_vector.hpp \ bitshuffle/bitshuffle.h bitshuffle/bitshuffle_core.h \ bitshuffle/bitshuffle_internals.h bitshuffle/iochain.h \ chlog.hpp diff --git a/assembled_chunk_ringbuf.cpp b/assembled_chunk_ringbuf.cpp index f8fbb3e..c4bef97 100644 --- a/assembled_chunk_ringbuf.cpp +++ b/assembled_chunk_ringbuf.cpp @@ -9,11 +9,16 @@ namespace ch_frb_io { }; // pacify emacs c-mode! #endif +// This is a lightweight scoped lock +typedef std::lock_guard guard_t; +// This is also a scoped lock that supports use of a condition variable. +typedef std::unique_lock ulock_t; assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params_, int beam_id_, int stream_id_) : max_fpga_flushed(0), max_fpga_retrieved(0), first_fpgacount(0), + first_packet_received(false), ini_params(ini_params_), beam_id(beam_id_), stream_id(stream_id_), @@ -39,9 +44,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream: throw runtime_error("ch_frb_io: the 'force_fast_kernels' flag was set, but this machine does not have the AVX2 instruction set"); #endif - pthread_mutex_init(&this->lock, NULL); - pthread_cond_init(&this->cond_assembled_chunks_added, NULL); - this->num_downsampling_levels = max(ini_params.telescoping_ringbuf_capacity.size(), 1UL); this->ringbuf_pos.resize(num_downsampling_levels, 0); this->ringbuf_size.resize(num_downsampling_levels, 0); @@ -64,21 +66,13 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream: this->_check_invariants(); } - -assembled_chunk_ringbuf::~assembled_chunk_ringbuf() -{ - pthread_cond_destroy(&this->cond_assembled_chunks_added); - pthread_mutex_destroy(&this->lock); -} - - void assembled_chunk_ringbuf::set_frame0(uint64_t f0) { frame0_nano = f0; } void assembled_chunk_ringbuf::print_state() { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); cout << "Beam " << beam_id << "\n"; @@ -96,35 +90,28 @@ void assembled_chunk_ringbuf::print_state() cout << " " << this->ringbuf_entry(ids,ipos)->ichunk; cout << " ]\n"; } - - pthread_mutex_unlock(&this->lock); } shared_ptr assembled_chunk_ringbuf::find_assembled_chunk(uint64_t fpga_counts, bool top_level_only) { - pthread_mutex_lock(&this->lock); + ulock_t lock(mutx); // Return an empty pointer iff stream has ended, and chunk is requested past end-of-stream. // (If anything else goes wrong, an exception will be thrown.) - if (this->doneflag && (fpga_counts >= this->final_fpga)) { - pthread_mutex_unlock(&this->lock); + if (this->doneflag && (fpga_counts >= this->final_fpga)) return shared_ptr (); - } // Scan telescoping ring buffer int start_level = (top_level_only ? 0 : num_downsampling_levels-1); for (int lev = start_level; lev >= 0; lev--) { for (int ipos = ringbuf_pos[lev]; ipos < ringbuf_pos[lev] + ringbuf_size[lev]; ipos++) { auto ch = this->ringbuf_entry(lev, ipos); - if (ch->fpga_begin == fpga_counts) { - pthread_mutex_unlock(&this->lock); + if (ch->fpga_begin == fpga_counts) return ch; - } } } - pthread_mutex_unlock(&this->lock); throw runtime_error("ch_frb_io::assembled_chunk::find_assembled_chunk(): couldn't find chunk, maybe your ring buffer is too small?"); } @@ -135,7 +122,7 @@ assembled_chunk_ringbuf::get_ringbuf_snapshot(uint64_t min_fpga_counts, uint64_t vector, uint64_t>> ret; ret.reserve(sum(ringbuf_capacity)); - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); // Scan telescoping ring buffer, in a time-ordered way. for (int ids = num_downsampling_levels-1; ids >= 0; ids--) { @@ -154,8 +141,6 @@ assembled_chunk_ringbuf::get_ringbuf_snapshot(uint64_t min_fpga_counts, uint64_t ret.push_back({ chunk, where }); } } - - pthread_mutex_unlock(&this->lock); return ret; } @@ -169,7 +154,7 @@ void assembled_chunk_ringbuf::get_ringbuf_size(uint64_t *ringbuf_fpga_next, uint64_t *ringbuf_fpga_max, int level) { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); if (ringbuf_fpga_next && (level == 0)) { *ringbuf_fpga_next = 0; @@ -246,20 +231,17 @@ void assembled_chunk_ringbuf::get_ringbuf_size(uint64_t *ringbuf_fpga_next, } } } - - pthread_mutex_unlock(&this->lock); } void assembled_chunk_ringbuf::stream_to_files(const string &filename_pattern, int priority, bool need_rfi) { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); this->stream_pattern = filename_pattern; this->stream_priority = priority; this->stream_rfi_mask = need_rfi; this->stream_chunks_written = 0; this->stream_bytes_written = 0; - pthread_mutex_unlock(&this->lock); } @@ -365,17 +347,15 @@ void assembled_chunk_ringbuf::chunk_streamed(const std::string &filename) { } size_t len = st.st_size; - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); this->stream_chunks_written ++; this->stream_bytes_written += len; - pthread_mutex_unlock(&this->lock); } void assembled_chunk_ringbuf::get_streamed_chunks(int &achunks, size_t &abytes) { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); achunks = stream_chunks_written; abytes = stream_bytes_written; - pthread_mutex_unlock(&this->lock); } // Helper function called assembler thread, to add a new assembled_chunk to the ring buffer. @@ -388,6 +368,8 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr & if (chunk->has_rfi_mask) throw runtime_error("ch_frb_io: internal error: chunk passed to assembled_chunk_ringbuf::_put_unassembled_packet() has rfi_mask flag set"); + chlog("Assembled chunk " << chunk->ichunk << " beam " << beam_id << ": received " << chunk->packets_received << " packets"); + // Step 1: prepare all data needed to modify the ring buffer. In this step, we do all of our // buffer allocation and downsampling, without the lock held. In step 2, we will acquire the // lock and modify the ring buffer (without expensive operations like allocation/downsampling). @@ -458,12 +440,10 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr & // be added/removed at each level (pushlist/poplist), so we don't malloc/free/downsample with // the lock held. - pthread_mutex_lock(&this->lock); - - if (this->doneflag) { - pthread_mutex_unlock(&this->lock); + ulock_t lock(mutx); + + if (this->doneflag) throw runtime_error("ch_frb_io: internal error: assembled_chunk_ringbuf::put_unassembled_packet() called after end_stream()"); - } for (int ids = 0; ids < nds; ids++) { // Number of chunks to be removed from level 'ids' of the telescoping ring buffer. @@ -505,8 +485,8 @@ bool assembled_chunk_ringbuf::_put_assembled_chunk(unique_ptr & int loc_stream_priority = this->stream_priority; bool loc_stream_rfi_mask = this->stream_rfi_mask; - pthread_cond_broadcast(&this->cond_assembled_chunks_added); - pthread_mutex_unlock(&this->lock); + this->cond_assembled_chunks_added.notify_all(); + lock.unlock(); // Stream new chunk to disk (if 'stream_pattern' is a nonempty string). // It's better to do this processing without the lock held, we just need to use @@ -620,9 +600,11 @@ void assembled_chunk_ringbuf::_check_invariants() // We do need to acquire the lock to access 'downstream_pos', since it's modified // by the downstream thread. - pthread_mutex_lock(&lock); - int dpos = this->downstream_pos; - pthread_mutex_unlock(&lock); + int dpos; + { + guard_t lock(mutx); + dpos = this->downstream_pos; + } ch_assert(downstream_bufsize > 0); ch_assert(downstream_bufsize <= ringbuf_capacity[0]); @@ -650,7 +632,7 @@ bool assembled_chunk_ringbuf::inject_assembled_chunk(assembled_chunk* chunk) shared_ptr assembled_chunk_ringbuf::get_assembled_chunk(bool wait) { shared_ptr chunk; - pthread_mutex_lock(&this->lock); + ulock_t lock(mutx); for (;;) { if (downstream_pos < ringbuf_pos[0] + ringbuf_size[0]) { @@ -666,16 +648,13 @@ shared_ptr assembled_chunk_ringbuf::get_assembled_chunk(bool wa break; // Ring buffer is empty and end_stream() has been called // Wait for chunks to be added to the ring buffer. - pthread_cond_wait(&this->cond_assembled_chunks_added, &this->lock); + this->cond_assembled_chunks_added.wait(lock); } - pthread_mutex_unlock(&this->lock); - if (chunk) { assert(chunk->fpga_end > this->max_fpga_retrieved); this->max_fpga_retrieved = chunk->fpga_end; } - return chunk; } @@ -693,21 +672,17 @@ void assembled_chunk_ringbuf::end_stream(int64_t *event_counts) this->_put_assembled_chunk(active_chunk0, event_counts); this->_put_assembled_chunk(active_chunk1, event_counts); - pthread_mutex_lock(&this->lock); + { + ulock_t lock(mutx); + if (doneflag) + throw runtime_error("ch_frb_io: internal error: doneflag already set in assembled_chunk_ringbuf::end_stream()"); - if (doneflag) { - pthread_mutex_unlock(&this->lock); - throw runtime_error("ch_frb_io: internal error: doneflag already set in assembled_chunk_ringbuf::end_stream()"); + // With lock held + this->doneflag = true; + this->final_fpga = loc_final_fpga; + // Wake up processing thread, if it is waiting for data + this->cond_assembled_chunks_added.notify_all(); } - - // Wake up processing thread, if it is waiting for data - pthread_cond_broadcast(&this->cond_assembled_chunks_added); - - // With lock held - this->doneflag = true; - this->final_fpga = loc_final_fpga; - - pthread_mutex_unlock(&this->lock); } diff --git a/avx2_kernels.cpp b/avx2_kernels.cpp index 13d9581..96822a3 100644 --- a/avx2_kernels.cpp +++ b/avx2_kernels.cpp @@ -936,6 +936,7 @@ void fast_assembled_chunk::add_packet(const intensity_packet &packet) _add_packet_kernel(dst, src, nupfreq); } + this->packets_received++; } diff --git a/ch_frb_io.hpp b/ch_frb_io.hpp index b2402af..f41b391 100644 --- a/ch_frb_io.hpp +++ b/ch_frb_io.hpp @@ -468,6 +468,7 @@ class intensity_network_stream : noncopyable { // Returns the first fpgacount of the first chunk sent downstream by // the given beam id. + // Raises runtime_error if the first packet has not been received yet. uint64_t get_first_fpga_count(int beam); // Returns the last FPGA count processed by each of the assembler, @@ -496,6 +497,11 @@ class intensity_network_stream : noncopyable { // For debugging/testing: pretend a packet has just arrived. void fake_packet_from(const struct sockaddr_in& sender, int nbytes); + void start_forking_packets(int beam, int destbeam, const struct sockaddr_in& dest); + void stop_forking_packets (int beam, int destbeam, const struct sockaddr_in& dest); + void pause_forking_packets(); + void resume_forking_packets(); + // stream_to_files(): for streaming incoming data to disk. // // 'filename_pattern': see assembled_chunk::format_filename below (empty string means "streaming disabled") @@ -549,8 +555,8 @@ class intensity_network_stream : noncopyable { std::atomic assembler_thread_waiting_usec; std::atomic assembler_thread_working_usec; - // Initialized by assembler thread when first packet is received, constant thereafter. - uint64_t frame0_nano = 0; // nanosecond time() value for fgpacount zero. + // Initialized to zero by constructor, set to nonzero value by assembler thread when first packet is received. + std::atomic frame0_nano; // nanosecond time() value for fgpacount zero. char _pad1b[constants::cache_line_size]; @@ -581,24 +587,24 @@ class intensity_network_stream : noncopyable { // but doesn't mean that it has actually shut down yet, it may still be reading packets. // So far it hasn't been necessary to include a 'stream_ended' flag in the state model. - pthread_mutex_t state_lock; - pthread_cond_t cond_state_changed; // threads wait here for state to change - + std::mutex state_mutex; + std::condition_variable cond_state_changed; + bool stream_started = false; // set asynchonously by calling start_stream() bool stream_end_requested = false; // can be set asynchronously by calling end_stream(), or by network/assembler threads on exit bool join_called = false; // set by calling join_threads() bool threads_joined = false; // set when both threads (network + assembler) are joined char _pad4[constants::cache_line_size]; - pthread_mutex_t event_lock; + std::mutex event_mutex; std::vector cumulative_event_counts; std::shared_ptr perhost_packets; - pthread_mutex_t packet_history_lock; + std::mutex packet_history_mutex; std::map > packet_history; // Streaming-related data (arguments to stream_to_files()). - std::mutex stream_lock; // FIXME need to convert pthread_mutex to std::mutex everywhere + std::mutex stream_lock; std::string stream_filename_pattern; std::vector stream_beam_ids; int stream_priority; @@ -606,6 +612,17 @@ class intensity_network_stream : noncopyable { int stream_chunks_written; size_t stream_bytes_written; + struct packetfork { + int beam; + int destbeam; + struct sockaddr_in dest; + }; + + std::mutex forking_mutex; + std::vector forking_packets; + int forking_socket = 0; + std::atomic forking_paused; + // The actual constructor is protected, so it can be a helper function // for intensity_network_stream::make(), but can't be called otherwise. intensity_network_stream(const initializer &x); @@ -1148,15 +1165,15 @@ class intensity_network_ostream : noncopyable { std::string hostname; uint16_t udp_port = constants::default_udp_port; - pthread_mutex_t statistics_lock; + std::mutex statistics_lock; int64_t curr_timestamp = 0; // microseconds between first packet and most recent packet int64_t npackets_sent = 0; int64_t nbytes_sent = 0; // State model. - pthread_t network_thread; - pthread_mutex_t state_lock; - pthread_cond_t cond_state_changed; + std::thread network_thread; + std::mutex state_lock; + std::condition_variable cond_state_changed; bool network_thread_started = false; bool network_thread_joined = false; @@ -1170,8 +1187,8 @@ class intensity_network_ostream : noncopyable { // for intensity_network_ostream::make(), but can't be called otherwise. intensity_network_ostream(const initializer &ini_params); - static void *network_pthread_main(void *opaque_args); - + void _network_thread_main(); + void _network_thread_body(); // For testing purposes (eg, can create a subclass that randomly drops packets), a wrapper on the underlying packet send() function. diff --git a/ch_frb_io_internals.hpp b/ch_frb_io_internals.hpp index de4c630..d20e00f 100644 --- a/ch_frb_io_internals.hpp +++ b/ch_frb_io_internals.hpp @@ -140,6 +140,11 @@ struct intensity_packet { const float *weights, int beam_wstride, int freq_wstride, float wt_cutoff); + // sets up my pointers to point into the given data array (which + // must be large enough to hold the data); returns the number of + // bytes used. This allows the "dest" array to be send as the + // data packet. + int set_pointers(uint8_t *dest); // Currently used only for debugging int find_coarse_freq_id(int id) const; @@ -203,9 +208,9 @@ struct udp_packet_ringbuf : noncopyable { const int max_npackets_per_list; const int max_nbytes_per_list; - pthread_mutex_t lock; - pthread_cond_t cond_packets_added; - pthread_cond_t cond_packets_removed; + std::mutex mutx; + std::condition_variable cond_packets_added; + std::condition_variable cond_packets_removed; bool stream_ended = false; int ringbuf_size = 0; @@ -213,7 +218,6 @@ struct udp_packet_ringbuf : noncopyable { std::vector > ringbuf; udp_packet_ringbuf(int ringbuf_capacity, int max_npackets_per_list, int max_nbytes_per_list); - ~udp_packet_ringbuf(); // Note! The pointer 'p' is _swapped_ with an empty udp_packet_list from the ring buffer. // In other words, when put_packet_list() returns, the argument 'p' points to an empty udp_packet_list. @@ -254,10 +258,11 @@ class assembled_chunk_ringbuf : noncopyable, std::atomic max_fpga_retrieved; // The fpgacount of the first chunk produced by this stream std::atomic first_fpgacount; - - assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params, int beam_id, int stream_id); - ~assembled_chunk_ringbuf(); + // Set to 'true' in the first call to put_unassembled_packet(). + std::atomic first_packet_received; + + assembled_chunk_ringbuf(const intensity_network_stream::initializer &ini_params, int beam_id, int stream_id); // Called by assembler thread, to "assemble" an intensity_packet into the appropriate assembled_chunk. // The length-(intensity_network_stream::event_type::num_types) event_counts array is incremented @@ -347,9 +352,6 @@ class assembled_chunk_ringbuf : noncopyable, output_device_pool output_devices; - // Set to 'true' in the first call to put_unassembled_packet(). - bool first_packet_received = false; - // Helper function called in assembler thread, to add a new assembled_chunk to the ring buffer. // Resets 'chunk' to a null pointer. // Warning: only safe to call from assembler thread. @@ -376,10 +378,10 @@ class assembled_chunk_ringbuf : noncopyable, char pad[constants::cache_line_size]; // All fields below are protected by the lock - pthread_mutex_t lock; + std::mutex mutx; // Processing thread waits here if the ring buffer is empty. - pthread_cond_t cond_assembled_chunks_added; + std::condition_variable cond_assembled_chunks_added; // Telescoping ring buffer. // All ringbuf* vectors have length num_downsampling_levels. diff --git a/intensity_network_ostream.cpp b/intensity_network_ostream.cpp index fd2ab69..0598651 100644 --- a/intensity_network_ostream.cpp +++ b/intensity_network_ostream.cpp @@ -21,6 +21,10 @@ namespace ch_frb_io { #endif +// This is a lightweight scoped lock +typedef std::lock_guard guard_t; +// This is also a scoped lock that supports use of a condition variable. +typedef std::unique_lock ulock_t; // ------------------------------------------------------------------------------------------------- // @@ -30,24 +34,15 @@ namespace ch_frb_io { // static member function shared_ptr intensity_network_ostream::make(const initializer &ini_params_) { - intensity_network_ostream *retp = new intensity_network_ostream(ini_params_); - shared_ptr ret(retp); + shared_ptr ret(new intensity_network_ostream(ini_params_)); ret->_open_socket(); - // Spawn network thread. Note that we pass a bare pointer to an object ('ret') on our stack - // and this pointer will be invalid after make() returns. Therefore, the network thread only - // dereferences the pointer before setting the network_thread_started flag, and make() waits for this - // flag to be set before it returns. + ret->network_thread = thread(std::bind(&intensity_network_ostream::_network_thread_main, ret)); - int err = pthread_create(&ret->network_thread, NULL, intensity_network_ostream::network_pthread_main, (void *) &ret); - if (err < 0) - throw runtime_error(string("ch_frb_io: pthread_create() failed in intensity_network_ostream constructor: ") + strerror(errno)); - - pthread_mutex_lock(&ret->state_lock); + ulock_t lock(ret->state_lock); while (!ret->network_thread_started) - pthread_cond_wait(&ret->cond_state_changed, &ret->state_lock); - pthread_mutex_unlock(&ret->state_lock); + ret->cond_state_changed.wait(lock); return ret; } @@ -159,10 +154,6 @@ intensity_network_ostream::intensity_network_ostream(const initializer &ini_para for (int i = 0; i < nfreq_coarse_per_chunk; i++) coarse_freq_ids_16bit[i] = uint16_t(ini_params.coarse_freq_ids[i]); - pthread_mutex_init(&this->state_lock, NULL); - pthread_mutex_init(&this->statistics_lock, NULL); - pthread_cond_init(&this->cond_state_changed, NULL); - int capacity = constants::output_ringbuf_capacity; this->ringbuf = make_unique (capacity, npackets_per_chunk, nbytes_per_chunk); this->tmp_packet_list = make_unique (npackets_per_chunk, nbytes_per_chunk); @@ -175,10 +166,6 @@ intensity_network_ostream::~intensity_network_ostream() close(sockfd); sockfd = -1; } - - pthread_cond_destroy(&cond_state_changed); - pthread_mutex_destroy(&state_lock); - pthread_mutex_destroy(&statistics_lock); } @@ -324,17 +311,14 @@ void intensity_network_ostream::end_stream(bool join_network_thread) if (!join_network_thread) return; - pthread_mutex_lock(&this->state_lock); + ulock_t lock(this->state_lock); - if (network_thread_joined) { - pthread_mutex_unlock(&this->state_lock); + if (network_thread_joined) throw runtime_error("ch_frb_io: attempt to join ostream output thread twice"); - } - - pthread_mutex_unlock(&this->state_lock); - if (pthread_join(network_thread, NULL)) - throw runtime_error("ch_frb_io: couldn't join network thread [output]"); + lock.unlock(); + + network_thread.join(); } @@ -343,38 +327,26 @@ void intensity_network_ostream::end_stream(bool join_network_thread) // Network write thread -// static member function -void *intensity_network_ostream::network_pthread_main(void *opaque_arg) +void intensity_network_ostream::_network_thread_main() { - if (!opaque_arg) - throw runtime_error("ch_frb_io: internal error: NULL opaque pointer passed to network_pthread_main()"); - - // Note that the arg/opaque_arg pointer is only dereferenced here, for reasons explained in a comment in make() above. - shared_ptr *arg = (shared_ptr *) opaque_arg; - shared_ptr stream = *arg; - - if (!stream) - throw runtime_error("ch_frb_io: internal error: empty shared_ptr passed to network_pthread_main()"); - try { - stream->_network_thread_body(); + _network_thread_body(); } catch (exception &e) { cout << e.what() << endl; - stream->end_stream(false); // "false" means "don't join threads" (would deadlock otherwise!) + end_stream(false); // "false" means "don't join threads" (would deadlock otherwise!) throw; } - - stream->end_stream(false); // "false" has same meaning as above - return NULL; + end_stream(false); // "false" has same meaning as above } void intensity_network_ostream::_network_thread_body() { - pthread_mutex_lock(&state_lock); - network_thread_started = true; - pthread_cond_broadcast(&cond_state_changed); - pthread_mutex_unlock(&state_lock); + { + ulock_t lock(state_lock); + network_thread_started = true; + cond_state_changed.notify_all(); + } auto packet_list = make_unique (npackets_per_chunk, nbytes_per_chunk); @@ -449,11 +421,12 @@ void intensity_network_ostream::_network_thread_body() npackets_tot++; // Keep thread-shared variables in sync with thread-locals, as described above. - pthread_mutex_lock(&statistics_lock); - this->curr_timestamp = tstamp; - this->nbytes_sent = nbytes_tot; - this->npackets_sent = npackets_tot; - pthread_mutex_unlock(&statistics_lock); + { + guard_t lock(statistics_lock); + this->curr_timestamp = tstamp; + this->nbytes_sent = nbytes_tot; + this->npackets_sent = npackets_tot; + } } } @@ -471,11 +444,10 @@ ssize_t intensity_network_ostream::_send(int socket, const uint8_t* packet, int void intensity_network_ostream::get_statistics(int64_t& curr_timestamp, int64_t& npackets_sent, int64_t& nbytes_sent) { - pthread_mutex_lock(&statistics_lock); + guard_t lock(statistics_lock); curr_timestamp = this->curr_timestamp; npackets_sent = this->npackets_sent; nbytes_sent = this->nbytes_sent; - pthread_mutex_unlock(&statistics_lock); } diff --git a/intensity_network_stream.cpp b/intensity_network_stream.cpp index bed6d0c..14e7290 100644 --- a/intensity_network_stream.cpp +++ b/intensity_network_stream.cpp @@ -4,6 +4,11 @@ #include #include +#if defined(__linux__) +// for forking stats (SIOCOUTQ) +#include +#endif + #include #include #include @@ -26,6 +31,11 @@ namespace ch_frb_io { // // class intensity_network_stream +// This is a lightweight scoped lock +typedef std::lock_guard guard_t; +// This is also a scoped lock that supports use of a condition variable. +typedef std::unique_lock ulock_t; + // Static member function (de facto constructor) shared_ptr intensity_network_stream::make(const initializer &x) @@ -56,7 +66,8 @@ intensity_network_stream::intensity_network_stream(const initializer &ini_params frame0_nano(0), stream_priority(0), stream_chunks_written(0), - stream_bytes_written(0) + stream_bytes_written(0), + forking_paused(false) { // Argument checking @@ -136,21 +147,11 @@ intensity_network_stream::intensity_network_stream(const initializer &ini_params network_thread_perhost_packets = make_shared(); perhost_packets = make_shared(); - - pthread_mutex_init(&state_lock, NULL); - pthread_mutex_init(&event_lock, NULL); - pthread_mutex_init(&packet_history_lock, NULL); - pthread_cond_init(&cond_state_changed, NULL); } intensity_network_stream::~intensity_network_stream() { - pthread_cond_destroy(&cond_state_changed); - pthread_mutex_destroy(&state_lock); - pthread_mutex_destroy(&packet_history_lock); - pthread_mutex_destroy(&event_lock); - if (sockfd >= 0) { close(sockfd); sockfd = -1; @@ -196,10 +197,11 @@ void intensity_network_stream::_add_event_counts(vector &event_subcount if (cumulative_event_counts.size() != event_subcounts.size()) throw runtime_error("ch_frb_io: internal error: vector length mismatch in intensity_network_stream::_add_event_counts()"); - pthread_mutex_lock(&this->event_lock); - for (unsigned int i = 0; i < cumulative_event_counts.size(); i++) - this->cumulative_event_counts[i] += event_subcounts[i]; - pthread_mutex_unlock(&this->event_lock); + { + guard_t lock(this->event_mutex); + for (unsigned int i = 0; i < cumulative_event_counts.size(); i++) + this->cumulative_event_counts[i] += event_subcounts[i]; + } memset(&event_subcounts[0], 0, event_subcounts.size() * sizeof(event_subcounts[0])); } @@ -207,17 +209,14 @@ void intensity_network_stream::_add_event_counts(vector &event_subcount void intensity_network_stream::start_stream() { - pthread_mutex_lock(&this->state_lock); + ulock_t lock(this->state_mutex); - if (stream_end_requested || join_called) { - pthread_mutex_unlock(&this->state_lock); + if (stream_end_requested || join_called) throw runtime_error("ch_frb_io: intensity_network_stream::start_stream() called on completed or cancelled stream"); - } // If stream has already been started, this is not treated as an error. this->stream_started = true; - pthread_cond_broadcast(&this->cond_state_changed); - pthread_mutex_unlock(&this->state_lock); + this->cond_state_changed.notify_all(); } @@ -230,41 +229,36 @@ void intensity_network_stream::start_stream() void intensity_network_stream::end_stream() { - pthread_mutex_lock(&this->state_lock); + ulock_t lock(this->state_mutex); this->stream_started = true; this->stream_end_requested = true; - pthread_cond_broadcast(&this->cond_state_changed); - pthread_mutex_unlock(&this->state_lock); + this->cond_state_changed.notify_all(); } void intensity_network_stream::join_threads() { - pthread_mutex_lock(&this->state_lock); + ulock_t lock(this->state_mutex); - if (!stream_started) { - pthread_mutex_unlock(&this->state_lock); + if (!stream_started) throw runtime_error("ch_frb_io: intensity_network_stream::join_threads() was called with no prior call to start_stream()"); - } if (join_called) { while (!threads_joined) - pthread_cond_wait(&this->cond_state_changed, &this->state_lock); - pthread_mutex_unlock(&this->state_lock); + this->cond_state_changed.wait(lock); return; } this->join_called = true; - pthread_cond_broadcast(&this->cond_state_changed); - pthread_mutex_unlock(&this->state_lock); + this->cond_state_changed.notify_all(); + lock.unlock(); network_thread.join(); assembler_thread.join(); - pthread_mutex_lock(&this->state_lock); + lock.lock(); this->threads_joined = true; - pthread_cond_broadcast(&this->cond_state_changed); - pthread_mutex_unlock(&this->state_lock); + this->cond_state_changed.notify_all(); } @@ -350,9 +344,8 @@ vector intensity_network_stream::get_event_counts() { vector ret(event_type::num_types, 0); - pthread_mutex_lock(&this->event_lock); + guard_t lock(this->event_mutex); memcpy(&ret[0], &this->cumulative_event_counts[0], ret.size() * sizeof(ret[0])); - pthread_mutex_unlock(&this->event_lock); return ret; } @@ -360,10 +353,9 @@ vector intensity_network_stream::get_event_counts() unordered_map intensity_network_stream::get_perhost_packets() { // Quickly grab a copy of perhost_packets - pthread_mutex_lock(&this->event_lock); + ulock_t lock(this->event_mutex); packet_counts pc(*perhost_packets); - pthread_mutex_unlock(&this->event_lock); - + lock.unlock(); return pc.to_string(); } @@ -456,9 +448,10 @@ shared_ptr intensity_network_stream::get_packet_rates(double start, double period) { // This returns a single history entry. vector > counts; - pthread_mutex_lock(&this->packet_history_lock); - _get_history(start, start+period, packet_history, counts); - pthread_mutex_unlock(&this->packet_history_lock); + { + guard_t lock(this->packet_history_mutex); + _get_history(start, start+period, packet_history, counts); + } // FIXME -- if *period* is specified, we could sum over the requested period... if (counts.size() == 0) return shared_ptr(); @@ -480,9 +473,8 @@ intensity_network_stream::get_packet_rates(double start, double period) { vector > intensity_network_stream::get_packet_rate_history(double start, double end, double period) { vector > counts; - pthread_mutex_lock(&this->packet_history_lock); + guard_t lock(this->packet_history_mutex); _get_history(start, end, packet_history, counts); - pthread_mutex_unlock(&this->packet_history_lock); return counts; } @@ -491,15 +483,13 @@ void intensity_network_stream::fake_packet_from(const struct sockaddr_in& sender // network thread so is not lock-protected, but when updating the // history this is the lock used before reading the // network_thread_perhost_packets, so it should work... - pthread_mutex_lock(&this->event_lock); + guard_t lock(this->event_mutex); network_thread_perhost_packets->increment(sender, nbytes); network_thread_perhost_packets->tv = xgettimeofday(); cumulative_event_counts[event_type::packet_received] ++; cumulative_event_counts[event_type::packet_good] ++; cumulative_event_counts[event_type::byte_received] += nbytes; - - pthread_mutex_unlock(&this->event_lock); } vector > @@ -514,6 +504,7 @@ intensity_network_stream::get_statistics() { m["nupfreq"] = ini_params.nupfreq; m["nt_per_packet"] = ini_params.nt_per_packet; m["fpga_counts_per_sample"] = ini_params.fpga_counts_per_sample; + m["frame0_nano"] = frame0_nano; m["fpga_count"] = 0; // XXX FIXME XXX m["network_thread_waiting_usec"] = network_thread_waiting_usec; m["network_thread_working_usec"] = network_thread_working_usec; @@ -652,9 +643,12 @@ uint64_t intensity_network_stream::get_first_fpga_count(int beam) { // Which of my assemblers (if any) is handling the requested beam? int nbeams = this->ini_params.beam_ids.size(); for (int i=0; iini_params.beam_ids[i] == beam) + if (this->ini_params.beam_ids[i] == beam) { + if (!this->assemblers[i]->first_packet_received) + throw runtime_error("ch_frb_io: get_first_fpga_count called, but first packet has not been received yet."); return this->assemblers[i]->first_fpgacount; - return 0; + } + throw runtime_error("ch_frb_io internal error: beam_id not found in intensity_network_stream::get_first_fpga_count()"); } void intensity_network_stream::get_max_fpga_count_seen(vector &flushed, @@ -716,6 +710,8 @@ void intensity_network_stream::network_thread_main() // We use try..catch to ensure that _network_thread_exit() always gets called, even if an exception is thrown. // We also print the exception so that it doesn't get "swallowed". + chime_log_set_thread_name("Network-" + std::to_string(ini_params.stream_id)); + try { _network_thread_body(); // calls pin_thread_to_cores() } catch (exception &e) { @@ -730,21 +726,20 @@ void intensity_network_stream::network_thread_main() void intensity_network_stream::_network_thread_body() { pin_thread_to_cores(ini_params.network_thread_cores); - pthread_mutex_lock(&this->state_lock); + ulock_t lock(this->state_mutex); // Wait for "stream_started" for (;;) { - if (this->stream_end_requested) { + if (this->stream_end_requested) // This case can arise if end_stream() is called early - pthread_mutex_unlock(&this->state_lock); return; - } if (this->stream_started) { - pthread_mutex_unlock(&this->state_lock); + lock.unlock(); break; } - pthread_cond_wait(&this->cond_state_changed, &this->state_lock); + this->cond_state_changed.wait(lock); } + // unlocked at this point. // Sleep hack (a temporary kludge that will go away soon) @@ -810,20 +805,23 @@ void intensity_network_stream::_network_thread_body() // the previous counts added to the history list last_packet_counts->tv = tv_ini; + //uint64_t rate_logging_timestamp = 0; + //uint64_t rate_nbytes = 0; + //uint64_t rate_npackets = 0; + for (;;) { uint64_t timestamp; // Periodically check whether stream has been cancelled by end_stream(). if (curr_timestamp > cancellation_check_timestamp + ini_params.stream_cancellation_latency_usec) { - pthread_mutex_lock(&this->state_lock); + lock.lock(); if (this->stream_end_requested) { - pthread_mutex_unlock(&this->state_lock); + lock.unlock(); _network_flush_packets(); return; } - - pthread_mutex_unlock(&this->state_lock); + lock.unlock(); // We call _add_event_counts() in a few different places in this routine, to ensure that // the network thread's event counts are always regularly accumulated. @@ -843,7 +841,7 @@ void intensity_network_stream::_network_thread_body() _update_packet_rates(last_packet_counts); packet_history_timestamp = curr_timestamp; } - + timestamp = usec_between(tv_ini, xgettimeofday()); network_thread_working_usec += (timestamp - curr_timestamp); @@ -898,8 +896,23 @@ void intensity_network_stream::_network_thread_body() incoming_packet_list->add_packet(packet_nbytes); - if (incoming_packet_list->is_full) + //rate_nbytes += packet_nbytes; + //rate_npackets++; + + if (incoming_packet_list->is_full) { _network_flush_packets(); + + /* + // Log our packet receive rate each flush! + float dt = (float)(curr_timestamp - rate_logging_timestamp) / 1e6; + chlogf("Packet receive rate: t %.3f : %.3f packets/sec, %.0f bits/sec", + curr_timestamp * 1e-6, + (float)rate_npackets / dt, (float)rate_nbytes * 8 / dt); + rate_logging_timestamp = curr_timestamp; + rate_nbytes = 0; + rate_npackets = 0; + */ + } } } @@ -910,23 +923,22 @@ void intensity_network_stream::_network_flush_packets() this->_add_event_counts(network_thread_event_subcounts); // Update the "perhost_packets" counter from "network_thread_perhost_packets" - pthread_mutex_lock(&this->event_lock); + guard_t lock(this->event_mutex); perhost_packets->update(*network_thread_perhost_packets); perhost_packets->tv = network_thread_perhost_packets->tv; - pthread_mutex_unlock(&this->event_lock); } // This gets called from the network thread to update the "perhost_packets" counter from "network_thread_perhost_packets". void intensity_network_stream::_update_packet_rates(std::shared_ptr last_packet_counts) { std::shared_ptr this_packet_counts; - pthread_mutex_lock(&this->event_lock); - perhost_packets->update(*network_thread_perhost_packets); - perhost_packets->tv = network_thread_perhost_packets->tv; - // deep copy - this_packet_counts = make_shared(*perhost_packets); - pthread_mutex_unlock(&this->event_lock); - + { + guard_t lock(this->event_mutex); + perhost_packets->update(*network_thread_perhost_packets); + perhost_packets->tv = network_thread_perhost_packets->tv; + // deep copy + this_packet_counts = make_shared(*perhost_packets); + } // Build new packet_counts structure with differences vs last_ shared_ptr count_diff = make_shared(); count_diff->tv = this_packet_counts->tv; @@ -939,16 +951,16 @@ void intensity_network_stream::_update_packet_rates(std::shared_ptrcounts[it.first] = it.second - it2->second; } - // last_packet_count = this_packet_counts last_packet_counts.reset(); last_packet_counts.swap(this_packet_counts); // Add *count_diff* to the packet history - pthread_mutex_lock(&this->packet_history_lock); - if (packet_history.size() >= (size_t)ini_params.max_packet_history_size) - packet_history.erase(--packet_history.end()); - packet_history.insert(make_pair(count_diff->start_time(), count_diff)); - pthread_mutex_unlock(&this->packet_history_lock); + { + guard_t lock(this->packet_history_mutex); + if (packet_history.size() >= (size_t)ini_params.max_packet_history_size) + packet_history.erase(--packet_history.end()); + packet_history.insert(make_pair(count_diff->start_time(), count_diff)); + } } // This gets called when the network thread exits (on all exit paths). @@ -1004,6 +1016,8 @@ void intensity_network_stream::assembler_thread_main() { // We use try..catch to ensure that _assembler_thread_exit() always gets called, even if an exception is thrown. // We also print the exception so that it doesn't get "swallowed". + chime_log_set_thread_name("Assembler-" + std::to_string(ini_params.stream_id)); + try { _assembler_thread_body(); // calls pin_thread_to_cores() } catch (exception &e) { @@ -1014,6 +1028,70 @@ void intensity_network_stream::assembler_thread_main() { _assembler_thread_exit(); } +void intensity_network_stream::start_forking_packets(int beam, int destbeam, const struct sockaddr_in& dest) { + unique_lock ulock(forking_mutex); + //if (forking_packets.size() == 0) { + if (forking_socket == 0) { + forking_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (forking_socket < 0) + throw runtime_error(string("ch_frb_io: couldn't create udp socket for forking: ") + strerror(errno)); + int flags = fcntl(forking_socket, F_GETFD); + flags |= FD_CLOEXEC; + if (fcntl(forking_socket, F_SETFD, flags) < 0) + throw runtime_error(string("ch_frb_io: couldn't set close-on-exec flag on packet-forking socket file descriptor") + strerror(errno)); + + // bufsize + //if (setsockopt(forking_socket, SOL_SOCKET, SO_SNDBUF, (void *) &ini_params.socket_bufsize, sizeof(ini_params.socket_bufsize)) < 0) + int bufsize = 1024*1024; + if (setsockopt(forking_socket, SOL_SOCKET, SO_SNDBUF, (void *) &bufsize, sizeof(bufsize)) < 0) + throw runtime_error(string("ch_frb_io: setsockopt(SO_SNDBUF) failed for forking socket: ") + strerror(errno)); + // timeout + const struct timeval tv_timeout = { 0, ini_params.socket_timeout_usec }; + if (setsockopt(forking_socket, SOL_SOCKET, SO_SNDTIMEO, &tv_timeout, sizeof(tv_timeout)) < 0) + throw runtime_error(string("ch_frb_io: setsockopt(SO_SNDTIMEO) failed for forking socket: ") + strerror(errno)); + } + intensity_network_stream::packetfork pf; + // FIXME -- check that we actually hold the requested beam!! + // FIXME -- check that the dest beam is valid! + pf.beam = beam; + pf.destbeam = destbeam; + pf.dest = dest; + forking_packets.push_back(pf); + + cout << "Forking packet list:" << endl; + for (auto it = forking_packets.begin(); it != forking_packets.end(); it++) + cout << " beam " << it->beam << " -> " << it->destbeam << " to " << + inet_ntoa(it->dest.sin_addr) << " port " << ntohs(it->dest.sin_port) << endl; +} + +void intensity_network_stream::stop_forking_packets(int beam, int destbeam, const struct sockaddr_in& dest) { + unique_lock ulock(forking_mutex); + if ((beam == -1) && (destbeam == -1)) + // Stop all! + forking_packets.clear(); + else + for (auto it = forking_packets.begin(); it != forking_packets.end(); it++) + if ((it->beam == beam) && (it->destbeam == destbeam) && + (it->dest.sin_port == dest.sin_port) && + (it->dest.sin_addr.s_addr == dest.sin_addr.s_addr)) { + forking_packets.erase(it); + it--; + } + + cout << "Forking packet list:" << endl; + for (auto it = forking_packets.begin(); it != forking_packets.end(); it++) + cout << " beam " << it->beam << " -> " << it->destbeam << " to " << + inet_ntoa(it->dest.sin_addr) << " port " << ntohs(it->dest.sin_port) << endl; +} + +void intensity_network_stream::pause_forking_packets() { + forking_paused = true; +} + +void intensity_network_stream::resume_forking_packets() { + forking_paused = false; +} + void intensity_network_stream::_assembler_thread_body() { pin_thread_to_cores(ini_params.assembler_thread_cores); @@ -1028,9 +1106,12 @@ void intensity_network_stream::_assembler_thread_body() int64_t *event_subcounts = &this->assembler_thread_event_subcounts[0]; + uint8_t* forked_packet_data = reinterpret_cast(malloc(16384)); + struct timeval tva, tvb; tva = xgettimeofday(); - + + struct timeval tvf = tva; // Main packet loop while (1) { @@ -1055,105 +1136,301 @@ void intensity_network_stream::_assembler_thread_body() tva = xgettimeofday(); assembler_thread_waiting_usec += usec_between(tvb, tva); - for (int ipacket = 0; ipacket < packet_list->curr_npackets; ipacket++) { - uint8_t *packet_data = packet_list->get_packet_data(ipacket); - int packet_nbytes = packet_list->get_packet_nbytes(ipacket); - intensity_packet packet; - - if (!packet.decode(packet_data, packet_nbytes)) { - event_subcounts[event_type::packet_bad]++; - continue; - } - - bool mismatch = ((packet.nbeams != nbeams) || - (packet.nupfreq != nupfreq) || - (packet.ntsamp != nt_per_packet) || - (packet.fpga_counts_per_sample != fpga_counts_per_sample)); - - if (_unlikely(mismatch)) { - if (ini_params.throw_exception_on_packet_mismatch) { - stringstream ss; - ss << "ch_frb_io: fatal: packet (nbeams, nupfreq, nt_per_packet, fpga_counts_per_sample) = (" - << packet.nbeams << "," << packet.nupfreq << "," << packet.ntsamp << "," << packet.fpga_counts_per_sample - << "), expected (" - << nbeams << "," << nupfreq << "," << nt_per_packet << "," << fpga_counts_per_sample << ")"; - - throw runtime_error(ss.str()); - } + { + // We hold this lock while processing the whole packet + // list (even if we're not doing any packet forking); the + // only contention for the lock is from rare RPCs. + unique_lock ulock(forking_mutex); + + int fork_sendqueue_start = 0; + int fork_sendqueue_end = 0; + int fork_sendspace_start = 0; + int fork_sendspace_end = 0; + int fork_packets_sent = 0; + int fork_bytes_sent = 0; + + bool forking_active = !forking_paused && (this->forking_packets.size() > 0); + + if (forking_active) { +#if defined(FIONWRITE) + if (ioctl(forking_socket, FIONWRITE, &fork_sendqueue_start) == -1) { + chlog("Failed to call ioctl(FIONWRITE): " << strerror(errno)); + } +#elif defined(SIOCOUTQ) + // linux + if (ioctl(forking_socket, SIOCOUTQ, &fork_sendqueue_start) == -1) { + chlog("Failed to call ioctl(SIOCOUTQ): " << strerror(errno)); + } +#elif defined(SO_NWRITE) + int sz = sizeof(int); + if (getsockopt(forking_socket, SOL_SOCKET, SO_NWRITE, + &fork_sendqueue_start, reinterpret_cast(&sz))) { + chlog("Failed to call getsockopt(SO_NWRITE): " << strerror(errno)); + } +#else + fork_sendqueue_start = -1; +#endif +#if defined(FIONSPACE) + if (ioctl(forking_socket, FIONSPACE, &fork_sendspace_start) == -1) { + chlog("Failed to call ioctl(FIONSPACE): " << strerror(errno)); + } +#else + fork_sendspace_start = -1; +#endif + } - event_subcounts[event_type::stream_mismatch]++; - continue; - } + for (int ipacket = 0; ipacket < packet_list->curr_npackets; ipacket++) { + uint8_t *packet_data = packet_list->get_packet_data(ipacket); + int packet_nbytes = packet_list->get_packet_nbytes(ipacket); + intensity_packet packet; - // All checks passed. Packet is declared "good" here. - // - // The following checks have been performed, either in this routine or in intensity_packet::read(). - // - dimensions (nbeams, nfreq_coarse, nupfreq, ntsamp) are positive, - // and not large enough to lead to integer overflows - // - packet and data byte counts are correct - // - coarse_freq_ids are valid (didn't check for duplicates but that's ok) - // - ntsamp is a power of two - // - fpga_counts_per_sample is > 0 - // - fpga_count is a multiple of (fpga_counts_per_sample * ntsamp) - // - // These checks are assumed by assembled_chunk::add_packet(), and mostly aren't rechecked, - // so it's important that they're done here! - - event_subcounts[event_type::packet_good]++; - - this->packet_max_fpga_seen = std::max(this->packet_max_fpga_seen.load(), packet.fpga_count + (uint64_t)packet.ntsamp * (uint64_t)packet.fpga_counts_per_sample); - - int nfreq_coarse = packet.nfreq_coarse; - int new_data_nbytes = nfreq_coarse * packet.nupfreq * packet.ntsamp; - const int *assembler_beam_ids = &ini_params.beam_ids[0]; // bare pointer for speed - - // Danger zone: we modify the packet by leaving its pointers in place, but shortening its - // length fields. The new packet corresponds to a subset of the original packet containing - // only beam index zero. This scheme avoids the overhead of copying the packet. - - packet.data_nbytes = new_data_nbytes; - packet.nbeams = 1; + if (!packet.decode(packet_data, packet_nbytes)) { + event_subcounts[event_type::packet_bad]++; + continue; + } + + bool mismatch = ((packet.nbeams != nbeams) || + (packet.nupfreq != nupfreq) || + (packet.ntsamp != nt_per_packet) || + (packet.fpga_counts_per_sample != fpga_counts_per_sample)); + + if (_unlikely(mismatch)) { + if (ini_params.throw_exception_on_packet_mismatch) { + stringstream ss; + ss << "ch_frb_io: fatal: packet (nbeams, nupfreq, nt_per_packet, fpga_counts_per_sample) = (" + << packet.nbeams << "," << packet.nupfreq << "," << packet.ntsamp << "," << packet.fpga_counts_per_sample + << "), expected (" + << nbeams << "," << nupfreq << "," << nt_per_packet << "," << fpga_counts_per_sample << ")"; + throw runtime_error(ss.str()); + } + + event_subcounts[event_type::stream_mismatch]++; + continue; + } + + // All checks passed. Packet is declared "good" here. + // + // The following checks have been performed, either in this routine or in intensity_packet::read(). + // - dimensions (nbeams, nfreq_coarse, nupfreq, ntsamp) are positive, + // and not large enough to lead to integer overflows + // - packet and data byte counts are correct + // - coarse_freq_ids are valid (didn't check for duplicates but that's ok) + // - ntsamp is a power of two + // - fpga_counts_per_sample is > 0 + // - fpga_count is a multiple of (fpga_counts_per_sample * ntsamp) + // + // These checks are assumed by assembled_chunk::add_packet(), and mostly aren't rechecked, + // so it's important that they're done here! + + event_subcounts[event_type::packet_good]++; + + this->packet_max_fpga_seen = std::max(this->packet_max_fpga_seen.load(), + packet.fpga_count + (uint64_t)packet.ntsamp * (uint64_t)packet.fpga_counts_per_sample); + + int nfreq_coarse = packet.nfreq_coarse; + int new_data_nbytes = nfreq_coarse * packet.nupfreq * packet.ntsamp; + const int *assembler_beam_ids = &ini_params.beam_ids[0]; // bare pointer for speed + + // Danger zone: we modify the packet by leaving its pointers in place, but shortening its + // length fields. The new packet corresponds to a subset of the original packet containing + // only beam index zero. This scheme avoids the overhead of copying the packet. + + // If we're re-sending packets, make a copy, because pointers get + // modified in this loop! + intensity_packet packetcopy; + if (forking_active) + memcpy(&packetcopy, &packet, sizeof(intensity_packet)); + + packet.data_nbytes = new_data_nbytes; + packet.nbeams = 1; - for (int ibeam = 0; ibeam < nbeams; ibeam++) { - // Loop invariant: at the top of this loop, 'packet' corresponds to a subset of the - // original packet containing only beam index 'ibeam'. - - // Loop over assembler ids, to find a match for the packet_id. - int packet_id = packet.beam_ids[0]; - int assembler_ix = 0; - - for (;;) { - if (assembler_ix >= nbeams) { - // No match found - event_subcounts[event_type::beam_id_mismatch]++; - if (ini_params.throw_exception_on_beam_id_mismatch) - throw runtime_error("ch_frb_io: beam_id mismatch occurred and stream was constructed with 'throw_exception_on_beam_id_mismatch' flag. packet's beam_id: " + std::to_string(packet_id)); - break; - } - - if (assembler_beam_ids[assembler_ix] != packet_id) { - assembler_ix++; - continue; - } - - // Match found - assemblers[assembler_ix]->put_unassembled_packet(packet, event_subcounts); - break; - } + for (int ibeam = 0; ibeam < nbeams; ibeam++) { + // Loop invariant: at the top of this loop, 'packet' corresponds to a subset of the + // original packet containing only beam index 'ibeam'. - // Danger zone: we do some pointer arithmetic, to modify the packet so that it now - // corresponds to a new subset of the original packet, corresponding to beam index (ibeam+1). - - packet.beam_ids += 1; - packet.scales += nfreq_coarse; - packet.offsets += nfreq_coarse; - packet.data += new_data_nbytes; - } - } + // Loop over assembler ids, to find a match for the packet_id. + int packet_id = packet.beam_ids[0]; + int assembler_ix = 0; + + for (;;) { + if (assembler_ix >= nbeams) { + // No match found + event_subcounts[event_type::beam_id_mismatch]++; + if (ini_params.throw_exception_on_beam_id_mismatch) + throw runtime_error("ch_frb_io: beam_id mismatch occurred and stream was constructed with 'throw_exception_on_beam_id_mismatch' flag. packet's beam_id: " + std::to_string(packet_id)); + break; + } + + if (assembler_beam_ids[assembler_ix] != packet_id) { + assembler_ix++; + continue; + } + + // Match found + assemblers[assembler_ix]->put_unassembled_packet(packet, event_subcounts); + break; + } + + // Danger zone: we do some pointer arithmetic, to modify the packet so that it now + // corresponds to a new subset of the original packet, corresponding to beam index (ibeam+1). + + packet.beam_ids += 1; + packet.scales += nfreq_coarse; + packet.offsets += nfreq_coarse; + packet.data += new_data_nbytes; + } + + if (forking_active) { + // Revert the packet header to its original state! + memcpy(&packet, &packetcopy, sizeof(intensity_packet)); + + /* + // In case there are any single-beam forks, create a header + // for 1 beam. + intensity_packet subpacket; + memcpy(&subpacket, &packetcopy, sizeof(intensity_packet)); + int nc = subpacket.nfreq_coarse; + subpacket.nbeams = 1; + subpacket.data_nbytes = nc * subpacket.nupfreq * subpacket.ntsamp; + int nsub = subpacket.set_pointers(forked_packet_data); + */ + + int nc = packet.nfreq_coarse; + // Create headers for 1, 2, or 3-beam sub-packets. + const int NS = 3; + intensity_packet subpackets[NS]; + int nsubs[NS]; + for (int s=0; sbeam == 0) { + // send all (four) beams! + for (int i=0; idestbeam; + int nsent = sendto(forking_socket, packet_data, packet_nbytes, 0, + reinterpret_cast(&(it->dest)), sizeof(it->dest)); + fork_packets_sent++; + fork_bytes_sent += nsent; + for (int i=0; idestbeam; + if (nsent == -1) + chlog("Failed to send forked packet data: " << strerror(errno)); + if (nsent < packet_nbytes) + chlog("Sent " << nsent << " < " << packet_nbytes << " bytes of forked packet data!"); + continue; + } + // send 1-3 beams + // start beam index + int ibeam = 0; + if (it->beam == 0) { + // find beam index + ibeam = -1; + for (int i=0; ibeam) { + ibeam = i; + break; + } + if (ibeam == -1) { + chlog("Forking: beam " << it->beam << " not found"); + continue; + } + } + int nbeams = 1; + if (it->beam == -2) { + nbeams = 2; + } else if (it->beam == -3) { + nbeams = 3; + } + intensity_packet* subpacket = &(subpackets[nbeams-1]); + int nsub = nsubs[nbeams-1]; + memcpy(subpacket->coarse_freq_ids, + packet.coarse_freq_ids, + nc * sizeof(uint16_t)); + for (int i=0; ibeam_ids[i] = packet.beam_ids[ibeam + i] + it->destbeam; + memcpy(subpacket->scales, + packet.scales + ibeam * nc, + nbeams * nc * sizeof(float)); + memcpy(subpacket->offsets, + packet.offsets + ibeam * nc, + nbeams * nc * sizeof(float)); + memcpy(subpacket->data, + packet.data + ibeam * subpacket->data_nbytes/nbeams, + subpacket->data_nbytes); + + int nsent = sendto(forking_socket, forked_packet_data, nsub, 0, + reinterpret_cast(&(it->dest)), sizeof(it->dest)); + fork_packets_sent++; + fork_bytes_sent += nsent; + if (nsent == -1) + chlog("Failed to send forked packet data: " << strerror(errno)); + if (nsent < nsub) + chlog("Sent " << nsent << " < " << nsub << " bytes of forked packet data!"); + /* check what we sent + intensity_packet dec; + bool ok = dec.decode(forked_packet_data, nsub); + cout << "Send sub-packet: parsed " << (ok?"ok":"failed") << ", nbeams " << dec.nbeams << ", first beam " << dec.beam_ids[0] << ", data bytes " << dec.data_nbytes << ", nc " << nc << ", nu " << subpacket.nupfreq << ", nt " << subpacket.ntsamp << endl; + */ + } + } // end forking_active + } // end of loop over packet list + if (forking_active) { +#if defined(FIONWRITE) + if (ioctl(forking_socket, FIONWRITE, &fork_sendqueue_end) == -1) { + chlog("Failed to call ioctl(FIONWRITE): " << strerror(errno)); + } +#elif defined(SIOCOUTQ) + if (ioctl(forking_socket, SIOCOUTQ, &fork_sendqueue_end) == -1) { + chlog("Failed to call ioctl(SIOCOUTQ): " << strerror(errno)); + } +#elif defined(SO_NWRITE) + int sz = sizeof(int); + if (getsockopt(forking_socket, SOL_SOCKET, SO_NWRITE, + &fork_sendqueue_end, reinterpret_cast(&sz))) { + chlog("Failed to call getsockopt(SO_NWRITE): " << strerror(errno)); + } +#else + fork_sendqueue_end = -1; +#endif +#if defined(FIONSPACE) + if (ioctl(forking_socket, FIONSPACE, &fork_sendspace_end) == -1) { + chlog("Failed to call ioctl(FIONSPACE): " << strerror(errno)); + } +#else + int isz = sizeof(int); + int bufsize = 0; + if (getsockopt(forking_socket, SOL_SOCKET, SO_SNDBUF, + &bufsize, reinterpret_cast(&isz))) { + chlog("Failed to call getsockopt(SO_SNDBUF): " << strerror(errno)); + } + fork_sendspace_end = bufsize; + //fork_sendspace_end = -1; +#endif + int esz = sizeof(int); + int forking_error = 0; + if (getsockopt(forking_socket, SOL_SOCKET, SO_ERROR, + &forking_error, reinterpret_cast(&esz))) { + chlog("Failed to call getsockopt(SO_ERROR): " << strerror(errno)); + } + + struct timeval tvnow = xgettimeofday(); + uint64_t worktime = usec_between(tva, tvnow); + uint64_t tperiod = usec_between(tvf, tvnow); + tvf = tvnow; + + //chlog("Packet list: " << packet_list->curr_npackets << ", forwarded " << fork_packets_sent << " packets, " << fork_bytes_sent << " bytes in " << worktime/1000 << " ms / " << tperiod/1000 << " ms -> " << ((fork_packets_sent * 1e6) / tperiod) << " packets/sec, " << ((fork_bytes_sent * 1e6 * 8) / tperiod) << " bits/sec. Send queue: " << fork_sendqueue_start << " | " << fork_sendspace_start << " at start, " << fork_sendqueue_end << " | " << fork_sendspace_end << " at end. Socket error: " << forking_error); + } + } // end of forking_mutex lock // We accumulate event counts once per udp_packet_list. this->_add_event_counts(assembler_thread_event_subcounts); } + free(forked_packet_data); } bool intensity_network_stream::inject_assembled_chunk(assembled_chunk* chunk) @@ -1246,13 +1523,13 @@ void intensity_network_stream::_fetch_frame0() { curl_easy_cleanup(curl_handle); string frame0_txt = holder.thestring; - chlog("Received frame0 text: " << frame0_txt); + //chlog("Received frame0 text: " << frame0_txt); Json::Reader frame0_reader; Json::Value frame0_json; if (!frame0_reader.parse(frame0_txt, frame0_json)) throw runtime_error("ch_frb_io: failed to parse 'frame0' string: '" + frame0_txt + "'"); - chlog("Parsed: " << frame0_json); + //chlog("Parsed: " << frame0_json); if (!frame0_json.isObject()) throw runtime_error("ch_frb_io: 'frame0' was not a JSON 'Object' as expected"); diff --git a/intensity_packet.cpp b/intensity_packet.cpp index 175ba5b..26f1ad2 100644 --- a/intensity_packet.cpp +++ b/intensity_packet.cpp @@ -75,6 +75,22 @@ bool intensity_packet::decode(const uint8_t *src, int src_nbytes) return true; } +int intensity_packet::set_pointers(uint8_t *dst) { + int nb = this->nbeams; + int nf = this->nfreq_coarse; + int nu = this->nupfreq; + int nt = this->ntsamp; + + memcpy(dst, this, 24); + + this->beam_ids = (uint16_t *)(dst + 24); + this->coarse_freq_ids = (uint16_t *)(dst + 24 + 2*nb); + this->scales = (float *) (dst + 24 + 2*nb + 2*nf); + this->offsets = (float *) (dst + 24 + 2*nb + 2*nf + 4*nb*nf); + this->data = dst + 24 + 2*nb + 2*nf + 8*nb*nf; + + return 24 + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; +} // Encodes a floating-point array of intensities into raw packet data, before sending packet. // The precise semantics aren't very intuitive, see extended comment in ch_frb_io_internals.hpp for details! @@ -86,19 +102,22 @@ int intensity_packet::encode(uint8_t *dst, const float *intensity, int beam_istr int nu = this->nupfreq; int nt = this->ntsamp; + // similar but not exactly the same as set_pointers()... memcpy(dst, this, 24); memcpy(dst + 24, this->beam_ids, 2*nb); memcpy(dst + 24 + 2*nb, this->coarse_freq_ids, 2*nf); - this->scales = (float *) (dst + 24 + 2*nb + 2*nf); + this->scales = (float *) (dst + 24 + 2*nb + 2*nf); this->offsets = (float *) (dst + 24 + 2*nb + 2*nf + 4*nb*nf); this->data = dst + 24 + 2*nb + 2*nf + 8*nb*nf; + int nbytes = 24 + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; + for (int b = 0; b < nb; b++) { for (int f = 0; f < nf; f++) { uint8_t *sub_data = data + (b*nf+f) * (nu*nt); const float *sub_int = intensity + b*beam_istride + f*nu*freq_istride; - const float *sub_wt = weights + b*beam_wstride + f*nu*freq_wstride; + const float *sub_wt = weights + b*beam_wstride + f*nu*freq_wstride; float acc0 = 0.0; float acc1 = 0.0; @@ -116,7 +135,7 @@ int intensity_packet::encode(uint8_t *dst, const float *intensity, int beam_istr } if (acc0 <= 0.0) { - this->scales[b*nf+f] = 1.0; + this->scales [b*nf+f] = 1.0; this->offsets[b*nf+f] = 0.0; memset(sub_data, 0, nu*nt); continue; @@ -135,7 +154,7 @@ int intensity_packet::encode(uint8_t *dst, const float *intensity, int beam_istr float scale = sqrt(var) / 25.; float offset = -128.*scale + mean; // 0x80 -> mean - this->scales[b*nf+f] = scale; + this->scales [b*nf+f] = scale; this->offsets[b*nf+f] = offset; for (int u = 0; u < nu; u++) { @@ -151,8 +170,7 @@ int intensity_packet::encode(uint8_t *dst, const float *intensity, int beam_istr } } } - - return 24 + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; + return nbytes; } diff --git a/msgpack_binary_vector.hpp b/msgpack_binary_vector.hpp new file mode 100644 index 0000000..7043aa1 --- /dev/null +++ b/msgpack_binary_vector.hpp @@ -0,0 +1,73 @@ +#ifndef _MSGPACK_BINARY_VECTOR_HPP +#define _MSGPACK_BINARY_VECTOR_HPP + +#include +#include + +#include + +namespace ch_frb_io { + +template +class msgpack_binary_vector : public std::vector +{}; + +} + +namespace msgpack { +MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) { +namespace adaptor { + +template +struct convert > { + msgpack::object const& operator()(msgpack::object const& o, + ch_frb_io::msgpack_binary_vector& v) const { + //std::cout << "msgpack_binary_vector: type " << o.type << std::endl; + if (o.type != msgpack::type::ARRAY) + throw std::runtime_error("msgpack_binary_vector: expected type ARRAY"); + // Make sure array is big enough to check version + //std::cout << "msgpack_binary_vector: array size " << o.via.array.size << std::endl; + if (o.via.array.size != 3) + throw std::runtime_error("msgpack_binary_vector: expected array size 3"); + msgpack::object* arr = o.via.array.ptr; + uint8_t version = arr[0].as(); + //std::cout << "version " << version << std::endl; + if (version != 1) + throw std::runtime_error("msgpack_binary_vector: expected version=1"); + size_t n = arr[1].as(); + //std::cout << "msgpack_binary_vector: vector size " << n << std::endl; //", type " << arr[2].type << std::endl; + v.resize(n); + if (arr[2].type != msgpack::type::BIN) + throw msgpack::type_error(); + //std::cout << "binary size " << arr[2].via.bin.size << " vs " << n << " x " << + //sizeof(T) << " = " << (n * sizeof(T)) << std::endl; + if (arr[2].via.bin.size != n * sizeof(T)) + throw msgpack::type_error(); + memcpy(reinterpret_cast(v.data()), arr[2].via.bin.ptr, n * sizeof(T)); + //std::cout << "msgpack_binary_vector: returned vector size " << v.size() << std::endl; + return o; + } +}; + +template +struct pack > { + template + packer& operator()(msgpack::packer& o, ch_frb_io::msgpack_binary_vector const& v) const { + uint8_t version = 1; + o.pack_array(3); + o.pack(version); + o.pack(v.size()); + o.pack_bin(v.size() * sizeof(T)); + o.pack_bin_body(reinterpret_cast(v.data())); + return o; + } +}; + +} // namespace adaptor +} // MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) +} // namespace msgpack + +#endif + + + diff --git a/udp_packet_ringbuf.cpp b/udp_packet_ringbuf.cpp index faee243..ed54840 100644 --- a/udp_packet_ringbuf.cpp +++ b/udp_packet_ringbuf.cpp @@ -8,6 +8,10 @@ namespace ch_frb_io { }; // pacify emacs c-mode! #endif +// This is a lightweight scoped lock +typedef std::lock_guard guard_t; +// This is also a scoped lock that supports use of a condition variable. +typedef std::unique_lock ulock_t; udp_packet_ringbuf::udp_packet_ringbuf(int ringbuf_capacity_, int max_npackets_per_list_, int max_nbytes_per_list_) : ringbuf_capacity(ringbuf_capacity_), @@ -20,27 +24,15 @@ udp_packet_ringbuf::udp_packet_ringbuf(int ringbuf_capacity_, int max_npackets_p this->ringbuf.resize(ringbuf_capacity); for (int i = 0; i < ringbuf_capacity; i++) ringbuf[i] = make_unique (this->max_npackets_per_list, this->max_nbytes_per_list); - - pthread_mutex_init(&this->lock, NULL); - pthread_cond_init(&this->cond_packets_added, NULL); - pthread_cond_init(&this->cond_packets_removed, NULL); } -udp_packet_ringbuf::~udp_packet_ringbuf() -{ - pthread_mutex_destroy(&lock); - pthread_cond_destroy(&cond_packets_added); - pthread_cond_destroy(&cond_packets_removed); -} - void udp_packet_ringbuf::get_size(int* currsize, int* maxsize) { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); if (currsize) *currsize = ringbuf_size; if (maxsize) *maxsize = ringbuf_capacity; - pthread_mutex_unlock(&this->lock); } bool udp_packet_ringbuf::put_packet_list(unique_ptr &p, bool is_blocking) @@ -48,32 +40,28 @@ bool udp_packet_ringbuf::put_packet_list(unique_ptr &p, bool is if (!p) throw runtime_error("ch_frb_io: udp_packet_ringbuf::put_packet_list() was called with empty pointer"); - pthread_mutex_lock(&this->lock); + ulock_t lock(mutx); for (;;) { - if (stream_ended) { - pthread_mutex_unlock(&this->lock); + if (stream_ended) throw runtime_error("ch_frb_io: internal error: udp_packet_ringbuf::put_packet_list() called after end of stream"); - } if (ringbuf_size < ringbuf_capacity) { int i = (ringbuf_pos + ringbuf_size) % ringbuf_capacity; std::swap(this->ringbuf[i], p); this->ringbuf_size++; - - pthread_cond_broadcast(&this->cond_packets_added); - pthread_mutex_unlock(&this->lock); + + cond_packets_added.notify_all(); p->reset(); return true; } if (!is_blocking) { - pthread_mutex_unlock(&this->lock); p->reset(); return false; } - pthread_cond_wait(&this->cond_packets_removed, &this->lock); + cond_packets_removed.wait(lock); } } @@ -84,7 +72,8 @@ bool udp_packet_ringbuf::get_packet_list(unique_ptr &p) throw runtime_error("ch_frb_io: udp_packet_ringbuf::get_packet_list() was called with empty pointer"); p->reset(); - pthread_mutex_lock(&this->lock); + + ulock_t lock(mutx); for (;;) { if (ringbuf_size > 0) { @@ -93,36 +82,31 @@ bool udp_packet_ringbuf::get_packet_list(unique_ptr &p) this->ringbuf_pos++; this->ringbuf_size--; - pthread_cond_broadcast(&this->cond_packets_removed); - pthread_mutex_unlock(&this->lock); + cond_packets_removed.notify_all(); return true; } - if (stream_ended) { - pthread_mutex_unlock(&this->lock); + if (stream_ended) return false; - } - pthread_cond_wait(&this->cond_packets_added, &this->lock); + cond_packets_added.wait(lock); } } void udp_packet_ringbuf::end_stream() { - pthread_mutex_lock(&this->lock); + ulock_t lock(mutx); this->stream_ended = true; - pthread_cond_broadcast(&this->cond_packets_added); - pthread_cond_broadcast(&this->cond_packets_removed); - pthread_mutex_unlock(&this->lock); + cond_packets_added.notify_all(); + cond_packets_removed.notify_all(); } bool udp_packet_ringbuf::is_alive() { - pthread_mutex_lock(&this->lock); + guard_t lock(mutx); bool ret = !this->stream_ended; - pthread_mutex_unlock(&this->lock); return ret; }