Skip to content

Commit

Permalink
change threading strategy because overhead of allocating extra work b…
Browse files Browse the repository at this point in the history
…uffer for each thread seems to be larger in fewer channel process cases.
  • Loading branch information
ss3git committed Nov 14, 2023
1 parent 7c3ec94 commit 58c7d5f
Showing 1 changed file with 38 additions and 133 deletions.
171 changes: 38 additions & 133 deletions src/src_sinc.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,15 @@ sinc_get_description (int src_enum)
#define MULTI_THREADING_THRESHOLD (256)

ALWAYS_INLINE void
calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, const int th_no,
const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, double *const output)
calc_output_multi_mt_core(const int skip_fraction, const SINC_FILTER * const filter,
const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, float * const output)
{
assert(num_of_threads == 1 || num_of_threads % 2 == 0);
double left[MAX_CHANNELS] = {0};
double right[MAX_CHANNELS] = {0};

/* Convert input parameters into fixed point. */
const increment_t max_filter_index = int_to_fp(filter->coeff_half_len);

const int mt_increment_factor = (num_of_threads > 1) ? num_of_threads / 2 : 1;
const int mt_left_right_sw = th_no % 2;
const int mt_shift = th_no / 2;

if (!mt_left_right_sw || num_of_threads == 1)
{
/* First apply the left half of the filter. */
increment_t filter_index1 = start_filter_index;
Expand All @@ -262,9 +256,6 @@ calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, con
data_index1 += steps * channels;
}

filter_index1 -= increment * mt_shift;
data_index1 = data_index1 + channels * mt_shift;

// left = 0.0;
while (filter_index1 >= MAKE_INCREMENT_T(0))
{
Expand All @@ -277,12 +268,11 @@ calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, con
for (int ch = 0; ch < channels; ch++)
left[ch] += icoeff * filter->buffer[data_index1 + ch];

filter_index1 -= increment * mt_increment_factor;
data_index1 = data_index1 + channels * mt_increment_factor;
filter_index1 -= increment;
data_index1 = data_index1 + channels;
};
}

if (mt_left_right_sw || num_of_threads == 1)
{
/* Now apply the right half of the filter. */
increment_t filter_index2 = increment - start_filter_index;
Expand All @@ -291,7 +281,6 @@ calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, con
int data_index2 = filter->b_current + channels * (1 + coeff_count2);
// right = 0.0;

if (!mt_shift)
{
const double fraction = fp_to_double(filter_index2);
const int indx = fp_to_int(filter_index2);
Expand All @@ -306,9 +295,6 @@ calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, con
filter_index2 -= increment;
data_index2 = data_index2 - channels;

filter_index2 -= increment * mt_shift;
data_index2 = data_index2 - channels * mt_shift;

while (filter_index2 > MAKE_INCREMENT_T(0))
{
const double fraction = fp_to_double(filter_index2);
Expand All @@ -320,38 +306,38 @@ calc_output_multi_mt_core(const int skip_fraction, const int num_of_threads, con
for (int ch = 0; ch < channels; ch++)
right[ch] += icoeff * filter->buffer[data_index2 + ch];

filter_index2 -= increment * mt_increment_factor;
data_index2 = data_index2 - channels * mt_increment_factor;
filter_index2 -= increment;
data_index2 = data_index2 - channels;
}
}

for (int ch = 0; ch < channels; ch++)
output[ch] = (scale * (left[ch] + right[ch])); // double
output[ch] = (float)(scale * (left[ch] + right[ch]));
} /* calc_output_multi_mt_core */

ALWAYS_INLINE void
calc_output_multi_mt_3(const int num_of_threads, const int th_no,
const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, double *const output)
calc_output_multi_mt_2(
const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, float *const output)
{

const int skip_fraction = increment == ((increment >> SHIFT_BITS) << SHIFT_BITS) && start_filter_index == ((start_filter_index >> SHIFT_BITS) << SHIFT_BITS) ? 1 : 0;

if (skip_fraction)
{
calc_output_multi_mt_core(1, num_of_threads, th_no, filter, increment, start_filter_index, channels, scale, output);
calc_output_multi_mt_core(1, filter, increment, start_filter_index, channels, scale, output);
}
else
{
calc_output_multi_mt_core(0, num_of_threads, th_no, filter, increment, start_filter_index, channels, scale, output);
calc_output_multi_mt_core(0, filter, increment, start_filter_index, channels, scale, output);
}
}

ALWAYS_INLINE void
calc_output_multi_mt_2(const int num_of_threads, const int th_no, const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, double *const output)
calc_output_multi_mt(const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, float *const output)
{
#define OPTIMIZE_LINE(x) \
case (x): \
calc_output_multi_mt_3(num_of_threads, th_no, filter, increment, start_filter_index, x, scale, output); \
calc_output_multi_mt_2(filter, increment, start_filter_index, x, scale, output); \
break;

switch (channels) // to kick the compile-time optimizer, channel numbers up to 16 are extracted as constants here.
Expand All @@ -373,37 +359,15 @@ calc_output_multi_mt_2(const int num_of_threads, const int th_no, const SINC_FIL
OPTIMIZE_LINE(15);
OPTIMIZE_LINE(16);
default:
calc_output_multi_mt_3(num_of_threads, th_no, filter, increment, start_filter_index, channels, scale, output);
break;
}
#undef OPTIMIZE_LINE
}

ALWAYS_INLINE void
calc_output_multi_mt(const int num_of_threads, const int th_no,
const SINC_FILTER *const filter, const increment_t increment, const increment_t start_filter_index, const int channels, const double scale, double *const output)
{
#define OPTIMIZE_LINE(x) \
case (x): \
calc_output_multi_mt_2(x, th_no, filter, increment, start_filter_index, channels, scale, output); \
break;

switch (num_of_threads) // to kick the compile-time optimizer, the number of threads is extracted as constant here.
{
OPTIMIZE_LINE(1);
OPTIMIZE_LINE(6);
OPTIMIZE_LINE(10);
OPTIMIZE_LINE(14);
default:
calc_output_multi_mt_2(num_of_threads, th_no, filter, increment, start_filter_index, channels, scale, output);
calc_output_multi_mt_2(filter, increment, start_filter_index, channels, scale, output);
break;
}
#undef OPTIMIZE_LINE
}

static SRC_ERROR
_sinc_multichan_vari_process_mt(const int NUM_OF_THREADS, const int child_no, const int interleave, double *const per_thread_data_out,
SRC_STATE *const state, SRC_DATA *const data, SRC_STATE *const main_state )
_sinc_multichan_vari_process_mt(const int num_of_threads, const int child_no,
SRC_STATE * const state, SRC_DATA * const data, SRC_STATE *const main_state )
{
if (state->private_data == NULL)
return SRC_ERR_NO_PRIVATE;
Expand Down Expand Up @@ -452,11 +416,12 @@ _sinc_multichan_vari_process_mt(const int NUM_OF_THREADS, const int child_no, co

/* Main processing loop. */
int interleave_counter = 0;
const int interleave_mask = interleave - 1;
float * const data_out = data->data_out;

while (filter->out_gen < out_count)
{
/* Need to reload buffer? */
int samples_in_hand = (filter->b_end - filter->b_current + filter->b_len) % filter->b_len;
int samples_in_hand = ( filter->b_end < filter->b_current ) ? (filter->b_end - filter->b_current + filter->b_len) : (filter->b_end - filter->b_current);

if (samples_in_hand <= half_filter_chan_len)
{
Expand All @@ -480,7 +445,7 @@ _sinc_multichan_vari_process_mt(const int NUM_OF_THREADS, const int child_no, co
if (state->error != 0)
return state->error;

samples_in_hand = (filter->b_end - filter->b_current + filter->b_len) % filter->b_len;
samples_in_hand = ( filter->b_end < filter->b_current ) ? (filter->b_end - filter->b_current + filter->b_len) : (filter->b_end - filter->b_current);
if (samples_in_hand <= half_filter_chan_len)
break;
};
Expand Down Expand Up @@ -512,17 +477,18 @@ _sinc_multichan_vari_process_mt(const int NUM_OF_THREADS, const int child_no, co

increment_t start_filter_index = double_to_fp(input_index * float_increment);

if ( (interleave_counter & interleave_mask) == (child_no & interleave_mask) ){
calc_output_multi_mt(NUM_OF_THREADS/interleave, child_no/interleave, filter, increment, start_filter_index, channels, scale, per_thread_data_out + filter->out_gen/channels/interleave * channels);
if ( child_no == interleave_counter ){
calc_output_multi_mt(filter, increment, start_filter_index, channels, scale, data_out + filter->out_gen);
}
interleave_counter = (interleave_counter+1) & interleave_mask;
if ( ++interleave_counter == num_of_threads ) interleave_counter = 0;
filter->out_gen += channels;

/* Figure out the next index. */
input_index += (is_constant_ratio) ? constant_input_index_inc : 1.0 / src_ratio;
rem = fmod_one(input_index);

filter->b_current = (filter->b_current + channels * psf_lrint(input_index - rem)) % filter->b_len;
filter->b_current = (filter->b_current + channels * psf_lrint(input_index - rem));
if ( filter->b_current >= filter->b_len ) filter->b_current -= filter->b_len;
input_index = rem;
};

Expand Down Expand Up @@ -554,55 +520,30 @@ sinc_multithread_vari_process(SRC_STATE *state, SRC_DATA *data)
const int N_OF_CORES = omp_get_num_procs();

const int should_be_single_thread = (N_OF_CORES < 2 || in_count < MULTI_THREADING_THRESHOLD);
const int NUM_OF_THREADS = should_be_single_thread ? 1 : (N_OF_CORES / 2 * 2);

int _interleave = 1;
while ( ((NUM_OF_THREADS / _interleave / 2) % 2 == 0 || (NUM_OF_THREADS / _interleave / 2) == 1)
&& _interleave * 2 <= NUM_OF_THREADS)
{
_interleave *= 2;
}

const int interleave = _interleave;

assert( NUM_OF_THREADS % interleave == 0 );
assert( (NUM_OF_THREADS / interleave) % 2 == 0 );

const int interleave_mask = interleave - 1;
const int num_of_threads = should_be_single_thread ? 1 : (N_OF_CORES / 2 * 2);

SRC_STATE *per_thread_state = (SRC_STATE *)malloc(NUM_OF_THREADS * sizeof(SRC_STATE));
SRC_DATA *per_thread_data = (SRC_DATA *)malloc(NUM_OF_THREADS * sizeof(SRC_DATA));
SINC_FILTER *per_thread_filter = (SINC_FILTER *)malloc(NUM_OF_THREADS * sizeof(SINC_FILTER));
SRC_ERROR *per_thread_retval = (SRC_ERROR *)malloc(NUM_OF_THREADS * sizeof(SRC_ERROR));

double **per_thread_data_out = (double **)calloc(NUM_OF_THREADS, sizeof(double *));
SRC_STATE *per_thread_state = (SRC_STATE *)malloc(num_of_threads * sizeof(SRC_STATE));
SRC_DATA *per_thread_data = (SRC_DATA *)malloc(num_of_threads * sizeof(SRC_DATA));
SINC_FILTER *per_thread_filter = (SINC_FILTER *)malloc(num_of_threads * sizeof(SINC_FILTER));
SRC_ERROR *per_thread_retval = (SRC_ERROR *)malloc(num_of_threads * sizeof(SRC_ERROR));

SRC_ERROR retval = SRC_ERR_MALLOC_FAILED;

if ( !per_thread_state || !per_thread_data || !per_thread_filter
|| !per_thread_data_out || !per_thread_retval )
|| !per_thread_retval )
{
goto cleanup_and_return;
}

// OpenMP
omp_set_dynamic(0);
omp_set_num_threads(NUM_OF_THREADS);
omp_set_num_threads(num_of_threads);

assert( NUM_OF_THREADS == omp_get_num_threads() );
assert( num_of_threads == omp_get_num_threads() );

if (NUM_OF_THREADS == 1) // w/o OpenMP
if (num_of_threads == 1) // w/o OpenMP
{
per_thread_data_out[0] = (double *)malloc(out_count * sizeof(double));
if (!per_thread_data_out[0])
goto cleanup_and_return;

per_thread_retval[0] = _sinc_multichan_vari_process_mt(1, 0, 1, per_thread_data_out[0], state, data, state);

for (int count = 0; count < filter->out_gen; count++)
{
data->data_out[count] = (float)per_thread_data_out[0][count];
}
per_thread_retval[0] = _sinc_multichan_vari_process_mt(1, 0, state, data, state);

retval = per_thread_retval[0];

Expand All @@ -612,18 +553,9 @@ sinc_multithread_vari_process(SRC_STATE *state, SRC_DATA *data)
int omp_child_no;

#pragma omp parallel for
for (omp_child_no = 0; omp_child_no < NUM_OF_THREADS; omp_child_no++)
for (omp_child_no = 0; omp_child_no < num_of_threads; omp_child_no++)
{
const int child_no = omp_child_no;
per_thread_data_out[child_no] = (double *)malloc((out_count/interleave + channels) * sizeof(double));

if ( !per_thread_data_out[child_no])
{

per_thread_retval[child_no] = SRC_ERR_MALLOC_FAILED;

continue;
}

memcpy(&per_thread_data[child_no], data, sizeof(SRC_DATA));
memcpy(&per_thread_filter[child_no], filter, sizeof(SINC_FILTER));
Expand All @@ -634,12 +566,12 @@ sinc_multithread_vari_process(SRC_STATE *state, SRC_DATA *data)
per_thread_filter[child_no].buffer = filter->buffer;

per_thread_retval[child_no] = _sinc_multichan_vari_process_mt(
NUM_OF_THREADS, child_no, interleave, per_thread_data_out[child_no],
num_of_threads, child_no,
&per_thread_state[child_no], &per_thread_data[child_no], state);
}

// error checking for each worker
for (int child_no = 0; child_no < NUM_OF_THREADS; child_no++)
for (int child_no = 0; child_no < num_of_threads; child_no++)
{
if (per_thread_retval[child_no] != SRC_ERR_NO_ERROR)
{
Expand All @@ -658,22 +590,6 @@ sinc_multithread_vari_process(SRC_STATE *state, SRC_DATA *data)

memcpy(data, &per_thread_data[0], sizeof(SRC_DATA));

int omp_count;

#pragma omp parallel for
for (omp_count = 0; omp_count < filter->out_gen; omp_count++) // sum up every worker's result
{
const int count = omp_count;
const int interleave_counter = count/channels;
double sum = 0.0;
for (int c_no = 0; c_no < NUM_OF_THREADS/interleave; c_no++)
{
const int child_no = (c_no * interleave) + (interleave_counter & interleave_mask);
sum += per_thread_data_out[child_no][interleave_counter/interleave * channels];
}
data->data_out[count] = (float)sum;
}

retval = SRC_ERR_NO_ERROR;

cleanup_and_return:
Expand All @@ -690,17 +606,6 @@ sinc_multithread_vari_process(SRC_STATE *state, SRC_DATA *data)
if (per_thread_retval)
free(per_thread_retval);

if (per_thread_data_out)
{
for (int child_no = 0; child_no < NUM_OF_THREADS; child_no++)
{
if (per_thread_data_out[child_no])
free(per_thread_data_out[child_no]);
}

free(per_thread_data_out);
}

return retval;
}

Expand Down

0 comments on commit 58c7d5f

Please sign in to comment.