Parallel conference bridge #4241

Open: wants to merge 12 commits into base: master

LeonidGoltsblat
Contributor

The strictly sequential and single-threaded conference bridge of pjsip must service all connected ports within a single timer tick, inevitably leading to limitations on the number of serviced ports and high CPU performance requirements. The goal of this pull request is to implement parallel servicing of conference bridge ports while preserving the switch behavior as much as possible and minimizing changes to the original codebase.

Parallelism is implemented using the OpenMP C/C++ Application Program Interface (API) (hereinafter referred to as OpenMP), which allows declarative parallel execution of code segments that were not initially designed for parallel processing. OpenMP is cross-platform and supported by the vast majority of C compilers (and by compilers for other languages). This solution restricts itself to the OpenMP feature set of version 2.0, an older version of the standard, so that all modern compilers can compile and execute this code correctly.

OpenMP support is disabled by default; each development environment must be explicitly configured to use OpenMP. This keeps the proposed solution compatible with applications that do not require parallel switching. No changes are required for such applications.

The changes to the source code primarily involve two aspects:

  • Instead of using the shared buffer provided by the master port for all ports to read data, each port reserves its own buffer and reads data into it. This enables parallel reading of data by different ports. (This approach uses memory less efficiently; in practice, a buffer is needed not for each port but for each thread.)

  • Since data from different input ports may need to be mixed into the buffer of the same output port to create a conference, access to this buffer must be synchronized. For this purpose, each port creates its own lock using OpenMP tools, which is then used to synchronize the mixing of data from different sources.
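The two changes above can be illustrated with a minimal sketch. It is not the code from this pull request: every `demo_*` name, the frame size, and the layout of the port structure are assumptions made for illustration, and the receive buffers are pre-filled instead of being read from real ports. When the file is built without OpenMP, sequential stand-ins replace the lock routines and the pragma is ignored, so the loop runs serially.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#ifdef _OPENMP
#  include <omp.h>
#else
/* Sequential stand-ins so the sketch also builds without OpenMP. */
typedef int omp_lock_t;
static void omp_init_lock(omp_lock_t *l)    { *l = 0; }
static void omp_set_lock(omp_lock_t *l)     { *l = 1; }
static void omp_unset_lock(omp_lock_t *l)   { *l = 0; }
static void omp_destroy_lock(omp_lock_t *l) { (void)l; }
#endif

#define DEMO_FRAME_SAMPLES 160   /* assumed frame size */

/* Hypothetical port: private read buffer plus a lock-protected mix buffer. */
typedef struct demo_port {
    int16_t    rx_buf[DEMO_FRAME_SAMPLES];  /* data read by this port      */
    int32_t    mix_buf[DEMO_FRAME_SAMPLES]; /* sum of all of its sources   */
    omp_lock_t mix_lock;                    /* guards mix_buf              */
    int        listener;                    /* destination slot, -1 = none */
} demo_port;

static void demo_ports_init(demo_port *ports, int count)
{
    int i;
    assert(ports != NULL && count > 0);
    for (i = 0; i < count; ++i) {
        memset(ports[i].rx_buf, 0, sizeof(ports[i].rx_buf));
        memset(ports[i].mix_buf, 0, sizeof(ports[i].mix_buf));
        ports[i].listener = -1;
        omp_init_lock(&ports[i].mix_lock);
    }
}

static void demo_ports_destroy(demo_port *ports, int count)
{
    int i;
    for (i = 0; i < count; ++i)
        omp_destroy_lock(&ports[i].mix_lock);
}

/* Mix every source into its listener; sources may run on different threads. */
static void demo_mix_tick(demo_port *ports, int count)
{
    int i;
#pragma omp parallel for
    for (i = 0; i < count; ++i) {
        demo_port *dst;
        int s;

        if (ports[i].listener < 0)
            continue;                       /* this port has no listener */

        dst = &ports[ports[i].listener];
        omp_set_lock(&dst->mix_lock);       /* serialize writers of mix_buf */
        for (s = 0; s < DEMO_FRAME_SAMPLES; ++s)
            dst->mix_buf[s] += ports[i].rx_buf[s];
        omp_unset_lock(&dst->mix_lock);
    }
}
```

Because each source writes only into its own `rx_buf`, reading needs no synchronization; only the summation into a shared listener buffer takes the lock.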

The entire get_frame() function (as before) is divided into three steps:

  1. Initialization
  2. Reading and mixing data
  3. Transmitting data

Each of steps 2 and 3 represents an OpenMP parallel execution region. The steps themselves are executed sequentially, meaning the next step begins only after all tasks from the previous step have been completed by all threads.

Unlike step 2, during step 3, the ports operate completely independently; no changes to the switching scheme affect the data processing for the ports. Therefore, the tasks in the asynchronous switching queue are executed concurrently with the main tasks of step 3. To prevent premature deletion of ports with transmitters, the grplock reference count of such ports is incremented during initialization (step 1). The reference count is decremented when the port processing is completed in step 3. This ensures that even if an OP_REMOVE_PORT operation is performed concurrently with data transmission in step 3, the physical resources will only be released once they are no longer in use.
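The reference-counting idea can be sketched as follows. This is a simplified stand-in and not pjlib's grplock: the `demo_*` names and the `released` flag are hypothetical, and C11 atomics are used only to keep the example self-contained.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a port whose lifetime is reference counted. */
typedef struct demo_tx_port {
    atomic_int ref_cnt;   /* starts at 1: the bridge's own reference   */
    int        released;  /* set once the last reference is dropped    */
} demo_tx_port;

static void demo_port_init(demo_tx_port *p)
{
    assert(p != NULL);
    atomic_init(&p->ref_cnt, 1);
    p->released = 0;
}

/* Step 1 (initialization): pin the port for the duration of this tick. */
static void demo_port_add_ref(demo_tx_port *p)
{
    atomic_fetch_add(&p->ref_cnt, 1);
}

/* Called both by the remove operation and at the end of step 3. */
static void demo_port_dec_ref(demo_tx_port *p)
{
    /* atomic_fetch_sub returns the previous value: 1 means we were last. */
    if (atomic_fetch_sub(&p->ref_cnt, 1) == 1)
        p->released = 1;   /* physical resources would be freed here */
}
```

If a removal drops the bridge's reference while step 3 is still transmitting, the count stays above zero, and the release happens only when step 3 drops its own reference.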

In this version, the remaining risk of deadlocks after the introduction of asynchronous switching #3928 has also been resolved. The grplock handle, called within the OP_REMOVE_PORT operation under the protection of the conference bridge mutex, could previously initiate other locks in an unpredictable order. This was a potential source of deadlocks. However, the asynchronous switching algorithm ensures that such operations are executed by only one thread at a time. Therefore, these operations do not require additional synchronization. For this reason, the execution of asynchronous operations has been moved out from under the protection of the conference bridge mutex. This mutex is required only for the asynchronous operation queue, not for the operations themselves.

Other changes in the code are minor and not critical to the overall optimization concept.

  • For instance, the aforementioned step 1 does not initialize the output port buffers. Instead, ports store a "timestamp" of the last frame loaded into the buffer. If this timestamp differs from the timestamp of the frame being loaded, the first (and possibly only) frame is loaded into the buffer. This allows for straightforward copying without prior initialization and without unnecessary summing with zero. Importantly, the timestamp itself only increases and, therefore, generally does not require separate initialization.

  • The OP_ADD_PORT operation has been excluded from asynchronous operations. This version of the conference bridge does not use port counters or the is_new flag. Instead, lower and upper bounds for the range of active ports are maintained. An "active" port is defined as one that is connected to something, i.e., has a non-zero number of listeners or transmitters. Only such ports participate in the sound transmission process. Immediately after being added, a port is not yet active and does not affect the operation of the conference bridge. The appearance of such a port concurrently with the execution of get_frame() does not influence the execution of steps 1-3 in any way.

  • Creating new ports can also be performed "practically" in parallel. Ideally, it would be "fully parallel," but reserving a slot in the port array, though performed in O(1) time, still requires mutex locking for a short duration (by default, pj_stack is not used). Once the slot is reserved, further creation and initialization of the port within the reserved slot can be carried out concurrently with similar actions in other slots.
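The timestamp check from the first bullet above could look roughly like this. It is a sketch under assumptions: the `demo_*` names and the frame size are made up, and the first real frame is assumed to carry a timestamp that differs from the initial value of `last_ts`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_FRAME_SAMPLES 160   /* assumed frame size */

/* Hypothetical listener state: mix buffer plus the timestamp it belongs to. */
typedef struct demo_listener {
    int32_t  mix_buf[DEMO_FRAME_SAMPLES];
    uint64_t last_ts;   /* timestamp of the last frame stored in mix_buf */
} demo_listener;

/* Copy the first frame of a tick, sum every further frame of the same tick. */
static void demo_mix_frame(demo_listener *l, const int16_t *frame, uint64_t ts)
{
    int i;
    assert(l != NULL && frame != NULL);

    if (l->last_ts != ts) {
        /* Stale buffer: overwrite it, no zeroing pass is needed. */
        for (i = 0; i < DEMO_FRAME_SAMPLES; ++i)
            l->mix_buf[i] = frame[i];
        l->last_ts = ts;
    } else {
        /* Buffer already holds data for this tick: mix into it. */
        for (i = 0; i < DEMO_FRAME_SAMPLES; ++i)
            l->mix_buf[i] += frame[i];
    }
}
```

A listener with a single source therefore only ever takes the copy branch, which is where the saving over "zero, then add" comes from.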

The only expected change in behavior is not directly related to parallelism and involves special handling of the PJ_EEOF code returned by read_port(). This code is interpreted as a signal from the port that it no longer has and will not produce any new data (e.g., a fileplayer has reached the end of the file). In this case, rx_setting = PJMEDIA_PORT_DISABLE is triggered, ensuring that no further attempts are made to retrieve data from the port. This also prevents repeated triggering of the eof_cb() and eof_cb2() callbacks, sparing the application from unnecessary calls.
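The end-of-file handling described above can be sketched as follows. The status codes, the `rx_setting` values, and the port structure are stand-ins defined locally (they mirror, but are not, PJ_EEOF and PJMEDIA_PORT_DISABLE), and the counter `eof_notified` is a hypothetical replacement for the eof callbacks.

```c
#include <assert.h>
#include <stddef.h>

/* Local stand-ins for the status codes and port settings named in the text. */
enum { DEMO_SUCCESS = 0, DEMO_EEOF = 1 };
typedef enum { DEMO_PORT_ENABLE, DEMO_PORT_DISABLE } demo_port_op;

typedef struct demo_src_port {
    demo_port_op rx_setting;
    int          frames_left;   /* frames available before end of file  */
    int          read_calls;    /* how often the port was asked to read */
    int          eof_notified;  /* how often the EOF "callback" fired   */
} demo_src_port;

/* Hypothetical source: reports EOF (and notifies) once it runs dry. */
static int demo_read_port(demo_src_port *p)
{
    p->read_calls++;
    if (p->frames_left == 0) {
        p->eof_notified++;
        return DEMO_EEOF;
    }
    p->frames_left--;
    return DEMO_SUCCESS;
}

/* One timer tick for one source port. */
static void demo_tick(demo_src_port *p)
{
    assert(p != NULL);
    if (p->rx_setting == DEMO_PORT_DISABLE)
        return;                             /* never read this port again */
    if (demo_read_port(p) == DEMO_EEOF)
        p->rx_setting = DEMO_PORT_DISABLE;  /* stop further reads */
}
```

With two frames available and five ticks, the port is read three times and the notification fires exactly once.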

Incidentally, the asynchronous switching implementation in PR #3928 resolved many deadlock-related issues. Among other improvements, it eliminated the need to prohibit very convenient synchronous callbacks (see #2251).

@sauwming
Member

sauwming commented Jan 6, 2025

I like the premise of the introduction of parallelism. I'm curious whether you have any performance speedup data.

It must be noted though, that the additional compile-time switches will undoubtedly make conference much much more difficult to maintain and debug should issues arise. So I wonder whether the performance boost is worth the additional complexity.

@LeonidGoltsblat
Contributor Author

LeonidGoltsblat commented Jan 6, 2025

> I'm curious whether you have any performance speedup data.

The OpenMP conference bridge was introduced 3-4 years ago.
Previously, speech intelligibility disappeared after 30-40 ports. The OpenMP conference bridge version with OMP_NUM_THREADS=8 works fine with 240 ports without any degradation in audio quality. Obviously, this is far from the limit, but there were no tests with higher density.

No tests were performed with a lower OMP_NUM_THREADS value.

On compile-time switching. To avoid conditional compilation, we can add an "OpenMP stub" that emulates sequential semantics. See an example here:
https://learn.microsoft.com/en-us/cpp/parallel/openmp/b-stubs-for-run-time-library-functions?view=msvc-170

@sauwming
Member

sauwming commented Jan 7, 2025

I like the idea of adding an additional layer (i.e. OpenMP stub -- perhaps this can be put in pjlib) to avoid compilation-time switches. I believe this is necessary if we want to integrate this, otherwise the conference code will become such a nightmare to read.

So for me, I vote towards adopting this (with the condition of removing the compile-time switches). But before we proceed further, let's hear first what others think about this parallel feature.

Also, your note about #2251 is interesting. So should we undeprecate/reactivate the callbacks eof_cb() now that conf is async, @nanangizz?

@nanangizz
Member

> I like the idea of adding an additional layer (i.e. OpenMP stub -- perhaps this can be put in pjlib) to avoid compilation-time switches. I believe this is necessary if we want to integrate this, otherwise the conference code will become such a nightmare to read.

> So for me, I vote towards adopting this (with the condition of removing the compile-time switches). But before we proceed further, let's hear first what others think about this parallel feature.

Yes, I think it is a good idea to wrap OpenMP in PJLIB (or perhaps PJLIB-UTIL?), for readability/maintainability & platform compatibility.

Also, we already use background processing in some places (e.g. job queue, worker thread, event manager); this new framework (background & multiprocessing) could perhaps standardize them.

> Also, your note about #2251 is interesting. So should we undeprecate/reactivate the callbacks eof_cb() now that conf is async, @nanangizz?

Maybe :)
The deadlock should no longer be a problem. But such a synchronous callback also has a side effect: it blocks the conf clock (even with the parallel conf, since at some steps it may still need to sync all workers; I haven't checked the details). So IMHO it is still reasonable to encourage apps to use eof_cb2() instead of eof_cb() to avoid a possible performance blocker.

@bennylp
Member

bennylp commented Jan 7, 2025

First of all, thank you for the patch submission again. This is really interesting and exciting, especially to hear that the parallel version can achieve much higher performance than the plain one.

However, unfortunately it is implemented using OpenMP... :) OpenMP is quite a "beast" to support. It's (too) high level, too implicit, requires support from many tools (compilers, debuggers), will require changes in build commands, require another skill set to master, and last but not least, not supported by iOS and Android (at least officially). I would very much prefer it to be implemented using pj_threads, so it's automatically as portable as the rest of the code.

If there is something like "thread pool" in pjlib (similar to Python's process pool), would it help? (and more importantly, are you willing to change it to use it :)

Or if you want to submit this as is, then I think the best way is to "fork" conference.c into new implementation. e.g. conf_openmp.c, activated by something like PJMEDIA_CONF_USE_OPENMP. But then this file will be less maintained.

@LeonidGoltsblat
Contributor Author

Short answer: Let's try. Please wait for a more detailed answer in a couple of days. I'm a bit busy right now.

@LeonidGoltsblat
Contributor Author

More Detailed Answer

A quick web search shows that OpenMP is supported by the Android NDK starting with r11 (though this information should be verified!). However, iOS still does not officially support OpenMP.

> something like "thread pool" in pjlib

Yes! I’ve always wondered how pjsip works without a thread pool! ☺

As a general rule, Windows programs rarely create threads explicitly. Instead, they register callbacks of various types (e.g., IO, events, timers, etc.) with a thread pool managed by the OS kernel (Microsoft Docs on Thread Pools).

With more information about the workload, the OS can manage the thread pool more efficiently than an application, for example, by deciding whether to start a new thread if all pool threads are waiting. It would be highly beneficial to have platform-dependent thread pool support in pjlib!

The main challenge preventing the use of the Windows thread pool API right now is the need to register a pj_thread_t object for each thread, which is inconvenient for threads created and managed outside the application’s control. To avoid memory leaks, a "wrapper" is required to register the thread at the start of the callback and unregister it at the end. However, it's unclear how to make this wrapper completely transparent to the application.

> to "fork" conference.c

The current conference implementation already includes numerous enhancements unrelated to OpenMP:

  • Concurrent, synchronous creation of conference ports
  • A more stable and slightly more efficient looping mechanism based on lower and upper boundaries instead of the port_counter and is_new flags
  • More efficient get_frame() initialization based on timestamps, which doesn’t require zeroing
  • Optimizations for 1-to-1 connections
  • etc
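The boundary-based loop mentioned in the list above might be maintained along these lines. This is a sketch, not the code from the pull request: the `demo_*` names, the slot limit, and the use of a single connection counter per slot (listeners plus transmitters) are all assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DEMO_MAX_SLOTS 16   /* assumed bridge size */

/* Hypothetical bridge that tracks the inclusive range of active slots. */
typedef struct demo_bridge {
    unsigned conn_cnt[DEMO_MAX_SLOTS]; /* listeners + transmitters per slot */
    int      lower;                    /* lowest active slot                */
    int      upper;                    /* highest active slot               */
} demo_bridge;

static void demo_bridge_init(demo_bridge *b)
{
    assert(b != NULL);
    memset(b->conn_cnt, 0, sizeof(b->conn_cnt));
    b->lower = DEMO_MAX_SLOTS;         /* lower > upper means "empty range" */
    b->upper = -1;
}

/* A slot becomes active with its first connection: widen the range. */
static void demo_connect(demo_bridge *b, int slot)
{
    assert(slot >= 0 && slot < DEMO_MAX_SLOTS);
    b->conn_cnt[slot]++;
    if (slot < b->lower) b->lower = slot;
    if (slot > b->upper) b->upper = slot;
}

/* A slot becomes inactive with its last disconnection: shrink the range. */
static void demo_disconnect(demo_bridge *b, int slot)
{
    assert(slot >= 0 && slot < DEMO_MAX_SLOTS && b->conn_cnt[slot] > 0);
    if (--b->conn_cnt[slot] != 0)
        return;
    while (b->lower <= b->upper && b->conn_cnt[b->lower] == 0) b->lower++;
    while (b->upper >= b->lower && b->conn_cnt[b->upper] == 0) b->upper--;
    if (b->lower > b->upper) {         /* nothing active any more */
        b->lower = DEMO_MAX_SLOTS;
        b->upper = -1;
    }
}

/* The per-tick loop visits only slots inside [lower, upper]. */
static int demo_count_active(const demo_bridge *b)
{
    int i, n = 0;
    for (i = b->lower; i <= b->upper; ++i)
        if (b->conn_cnt[i] != 0)
            ++n;
    return n;
}
```

A port that has been added but not yet connected never touches the range, so it cannot disturb a tick that is already running.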

I propose integrating these enhancements first, then creating a "forked" version, and only after that proceeding with further work on parallelism.

"In the real world"

In practice, I use a heavily optimized IOCP queue and RTP transport. I haven’t tested the standard ioqueue implementation, so I’m unsure how it behaves in multithreaded scenarios. It’s possible that both ioqueue and transport may require optimization after parallelizing the conference bridge (in my case, the bridge was optimized last).

@bennylp
Member

bennylp commented Jan 14, 2025

Thanks for Android OpenMP info.

The initial idea for the thread pool is a high-level pool of pj threads, unlike the Windows thread pool, which is a low-level OS object, I think. Let me check if it can be abstracted using the same API. But the main objective of the pj thread pool is to execute N jobs using M threads, as a replacement for OpenMP.

Yes, I noticed there are many other changes in conf unrelated to parallelism. It would be better to submit them as separate enhancements.

Thanks for the communication. Although our high-level API is geared towards clients, the "core" was intended to be highly performant, hence this topic is very interesting to us.

@LeonidGoltsblat
Contributor Author

LeonidGoltsblat commented Jan 21, 2025

Hi!
Apologies for the delay.

Here is the updated version that avoids the use of OpenMP functions and conditional compilation where possible. Instead, only OpenMP pragmas are utilized, though some OpenMP functions are retained for debugging purposes.

Key notes:

  • This version does not require any modifications to the build system. By default, it operates without OpenMP and runs as a "normal" sequential conference bridge.
  • Users who wish to enable OpenMP will need to configure their build environment independently, relieving the PJSIP team of the need to support this framework.
  • Hopefully, this variant is stable enough to serve as a foundation for the next step ("standard PJ threading").
  • I aim to proceed with the next step in the coming days.
  • It would be great if this version could be merged into the main branch of the project to serve as a starting point for future enhancements.

Minor change:
For compatibility with #4253, the is_new flag had to be reinstated, while adding a port still operates synchronously and doesn’t rely on this flag. It would be ideal to explicitly check for the absence of a clock. If the clock is absent, all operations (not just OP_REMOVE_PORT) should be performed synchronously.

> The initial concept for the thread pool is a high-level pool of PJ threads, in contrast to the Windows thread pool, which seems to be a low-level OS object. Let me verify whether it can be abstracted using the same API. However, the primary goal of the PJ thread pool is to execute N jobs using M threads, serving as a replacement for OpenMP.

Could you clarify: does a PJ thread pool currently exist, or is it a planned API? Such an API would significantly simplify the implementation of a multi-threaded conference bridge.

Regarding the Windows thread pool API:
This is not just a low-level API—it supports operations ranging from low-level (e.g., I/O) to high-level "Work." This aligns with your description: "to execute N jobs using M threads." Relevant functions include CreateThreadpoolWork and SubmitThreadpoolWork. These functions seem particularly suited for parallel conference bridges.

The primary obstacle to leveraging this well-designed API is the current requirement in PJSIP to register threads without the option to unregister them. A similar challenge arises in the OpenMP version and, more broadly, in scenarios where the user program doesn’t control the creation and termination of threads.

Implementing the ability to unregister threads would be highly beneficial. This would enable support for platform-dependent pools like the Windows thread pool.

@LeonidGoltsblat
Contributor Author

Hi, everybody!
I am pleased to announce that my promise has been fulfilled! The parallel conference bridge is now implemented using the pj_thread API!

This commit, besides conference.c, adds:

  • conf_openmp.c
  • some OS-dependent files that introduce a barrier synchronization API into pjsip.

Briefly about these two points, then about the parallel bridge:

a) To compile the OpenMP conference bridge, the user must add

#define PJMEDIA_CONF_USE_OPENMP 1

line to config_site.h AND configure the development environment to use OpenMP. However, the main motivation for this may be testing and comparing the performance of different implementations. My tests show that the OpenMP implementation has no advantages over the "native" pj_thread threading.

b) The native multithreading used by the parallel conference bridge relies on a synchronization barrier, which required implementing this API. The API is implemented in 5 (phew!) variants:

  • 3 for different versions of Windows: Windows 8 and above uses the native barrier implementation, Vista and above uses a condition variable-based implementation, and older versions (the default for pjsip) use a semaphore-based implementation. (Isn't it time to define PJ_WIN32_WINNT as at least _WIN32_WINNT_VISTA (0x0600) or even _WIN32_WINNT_WIN8 (0x0602)?)
  • and 2 for Unix (POSIX pthread_barrier_t, supported since POSIX.1-2001, and a classic condition variable-based one).
    The Windows implementation is reasonably well debugged. The Unix implementation is based on the pj_event code and looks simple and clear, but it is NOT tested: I don't have access to a Linux machine with developer tools right now. Any assistance would be helpful, so please review this code carefully!
    Note about enum pj_barrier_flags: these flags are only supported in the Windows 8 and higher implementation, but manipulating them has resulted in a 5-10% reduction in CPU usage (from just under 40% to just over 30%). It's worth the effort!
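For reviewers, the classic condition variable-based barrier mentioned above usually follows the pattern below. This is a generic sketch and not the code from this pull request: the `demo_*` names are hypothetical, error handling is minimal, and the return convention (1 for the last thread to arrive, 0 for the others) is an assumption.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical reusable barrier built from a mutex and a condition variable. */
typedef struct demo_barrier {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    unsigned        thread_count;  /* threads required to open the barrier */
    unsigned        waiting;       /* threads currently blocked            */
    unsigned        generation;    /* incremented each time it opens       */
} demo_barrier;

static int demo_barrier_init(demo_barrier *b, unsigned thread_count)
{
    assert(b != NULL && thread_count > 0);
    b->thread_count = thread_count;
    b->waiting = 0;
    b->generation = 0;
    if (pthread_mutex_init(&b->mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init(&b->cond, NULL) != 0) {
        pthread_mutex_destroy(&b->mutex);
        return -1;
    }
    return 0;
}

/* Returns 1 for the last thread to arrive, 0 for every other thread. */
static int demo_barrier_wait(demo_barrier *b)
{
    unsigned gen;
    int is_last = 0;

    pthread_mutex_lock(&b->mutex);
    gen = b->generation;
    if (++b->waiting == b->thread_count) {
        /* Last arrival: reset for reuse and release everybody. */
        b->waiting = 0;
        b->generation++;
        pthread_cond_broadcast(&b->cond);
        is_last = 1;
    } else {
        /* The generation check guards against spurious wakeups. */
        while (gen == b->generation)
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
    return is_last;
}

static void demo_barrier_destroy(demo_barrier *b)
{
    pthread_cond_destroy(&b->cond);
    pthread_mutex_destroy(&b->mutex);
}
```

The generation counter is what makes the barrier safe to reuse on every timer tick: a thread that re-enters quickly cannot be confused with one still leaving the previous round.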

About the parallel bridge implementation

The OpenMP-based implementation uses the processor cores allocated to it at 100%. For example, if the host has 40 cores and OMP_NUM_THREADS=8, 8 cores are used at about 100% and 32 at about 0%, so the average load is 20%. If OMP_NUM_THREADS=8 and the host has 8 cores, the load is 100% (actually 90-100%).
On the same 8-core host, the "native" implementation with 240 ports (under Windows 10) runs at a load of 20-30% with peaks of up to 40%. (Do we have a 3-4 times performance reserve?...) For OpenMP, the processor load does not depend on the application load: there is no difference between 1 port and 240 ports. The "native" implementation uses practically no resources when there is no work. This does not tell us that the algorithm is extremely efficient, but it does suggest that OpenMP probably uses a spin loop to wait for work and is unaware of the conference bridge timer and other implementation details. Perhaps OpenMP is designed for a different type of workload.
The current parallel bridge algorithm is optimized for 1-to-1 switching: we mix the data into the listener's mix_buf right away and transmit it (call write_port()) as soon as all sources have delivered data for the current listener. This works fine for 1-to-1 switching, but should work less well for conference switching (m-to-n). The current algorithm acquires a lock on the listener side to protect the mix_buf of a listener that has more than 1 source, so threads that hold source data for the same listener will block on this lock. There is an obvious optimization: keep a queue on the listener side, so that sources can add a reference to their receive buffers to this queue and continue processing other listeners without contention. This optimization will not noticeably reduce CPU load, but it will make the wake time on each timer tick more stable and somewhat shorter, so it may improve audio quality. However, this optimization does require a lock-free queue... We are waiting for pj_pool_aligned_alloc().

By default, the conference bridge is serial. At compile time, the user can define the macro PJ_CONF_BRIDGE_MAX_THREADS, the number of threads that the conference bridge should use. This value also determines whether the bridge is built as a parallel bridge: if it is 1, the bridge is serial; otherwise it is parallel. The current implementation uses a static thread pool; as an optimization, it would be better to determine the required number of pool threads dynamically.

@bennylp
Member

bennylp commented Jan 29, 2025

Thanks! Sorry for the late reply, I'm reviewing it now and will get back with more detailed replies.

Member

@bennylp bennylp left a comment

I think in general this is great, thanks for the hard work! I'm okay in general. I haven't reviewed conference.c in detail, because I think we should address the points below first, as they will modify the patch significantly.

  1. I would like to inform you about our coding style (which I just wrote in more detail, previously it was too vague :), pls have a look at https://docs.pjsip.org/en/coding-style/get-started/coding-style.html

  2. If you have unit test codes, it would be good to include as well (for the barrier, and conf maybe). If not, I can help with writing it.

  3. Activating the parallel feature (proposal, others pls comment):

At compile time, the user should have the option to disable the multithreading code. This is controlled by a (new) macro PJMEDIA_CONF_HAS_THREADS, whose default value is PJ_HAS_THREADS. I don't require that we use the old code when threading is disabled, because this could result in too many variants of the code.

At run time, user should have the option to control the number of worker threads to use, from 0-N. Zero means the operations will be done only by get_frame() thread. I propose creating new API as follows:

typedef struct pjmedia_conf_param
{
	unsigned max_slots;
	unsigned sampling_rate;
	unsigned channel_count;
	unsigned samples_per_frame;
	unsigned bits_per_sample;
	unsigned options;
	unsigned worker_threads;
} pjmedia_conf_param;

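PJ_INLINE(void) pjmedia_conf_param_default(pjmedia_conf_param *param)
{
    /* Zero every field; worker_threads == 0 means get_frame() thread only. */
    pj_bzero(param, sizeof(*param));
}

PJ_DECL(pj_status_t) pjmedia_conf_create2(pj_pool_t *pool,
                                          const pjmedia_conf_param *param,
                                          pjmedia_conf **p_conf);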

*/
enum pj_barrier_flags {
/* Specifies that the thread entering the barrier should block
* immediately until the last thread enters the barrier. */
Member

Make the enum member's comment a doxygen comment (instead of just plain comment)

*
* @param pool The pool to allocate the barrier object.
* @param trip_count The number of threads that must call pj_barrier_wait() before any are allowed to proceed.
* @param p_barrier Pointer to hold the barrier object upon return.
Member

Align the parameter descriptions (to start at the same column)

*
* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier);
Member

API must be declared with PJ_DECL (and its corresponding implementation with PJ_DEF). Same comment for other APIs below.

pj_status_t pj_barrier_destroy(pj_barrier_t *barrier);

/**
* Wait for all threads to reach the barrier
Member

Missing dot after "barrier"? Without it, I think doxygen will continue the statement on the next line.

/**
* Create a barrier object.
* pj_barrier_create() creates a barrier object that can be used to synchronize threads.
* The barrier object is initialized with a trip count that specifies the number of threads
Member

We limit the length of a line in the source code to 80 chars. We are usually less strict about it in the *.c files, especially the test files where the audience is usually limited to ourselves, but in the *.h files this should be easier to follow.

* @return PJ_SUCCESS on success, or the error code.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier);

Member

I propose changing parameter name "trip_count" to "thread_count", because I find the word "trip" here a bit confusing, and "thread_count" is inline with the naming in Win32 and pthread.

P.S. I'm fine with "trip_count" naming in the implementation

* similar to the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*
* @param barrier The barrier to wait on
* @param flags Flags that control the behavior of the barrier (combination of pj_barrier_flags)
Member

Perhaps it should be clarified what the default value should be (zero?) if the user just wants the default behavior for the barrier wait.

* at the barrier and PJ_FALSE for each of the other threads.
* Otherwise, an error number shall be returned to indicate the error.
*/
pj_status_t pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags);
Member

I propose replacing the return type with "int". Because with pj_status_t, all non PJ_SUCCESS values are considered error according to this

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
Member

Reminder to use PJ_DEF. Same comment for other APIs

/**
* Barrier object.
*/
pj_status_t pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier) {
Member

This is not crucial but we'd appreciate if you follow it to make things uniform. FWIW we use K&R style hence the braces for function definition are on their own line (see https://en.wikipedia.org/wiki/Indentation_style#K&R).
