Skip to content

Commit

Permalink
Use audio pipe line for mixed meetings (#331)
Browse files Browse the repository at this point in the history
Use of audio pipe line in engine mixer for mixed users
  • Loading branch information
olofkallander authored Oct 27, 2023
1 parent c20f274 commit f7c277f
Show file tree
Hide file tree
Showing 28 changed files with 283 additions and 602 deletions.
37 changes: 10 additions & 27 deletions bridge/Mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,23 +365,6 @@ bool Mixer::addAudioStream(std::string& outId,
return streamItr.first->second->transport->isInitialized();
}

void Mixer::allocateAudioBuffer(uint32_t ssrc)
{
std::lock_guard<std::mutex> locker(_configurationLock);
auto findResult = _audioBuffers.find(ssrc);
if (findResult != _audioBuffers.cend())
{
return;
}

logger::info("Allocating audio buffer for ssrc %u", getLoggableId().c_str(), ssrc);

auto audioBuffer = std::make_unique<EngineMixer::AudioBuffer>();
auto* rawAudioBuffer = audioBuffer.get();
_audioBuffers.emplace(ssrc, std::move(audioBuffer));
_engineMixer->asyncAddAudioBuffer(ssrc, rawAudioBuffer);
}

bool Mixer::addVideoStream(std::string& outId,
const std::string& endpointId,
const utils::Optional<ice::IceRole>& iceRole,
Expand Down Expand Up @@ -719,11 +702,6 @@ void Mixer::engineAudioStreamRemoved(const EngineAudioStream& engineStream)
return;
}

if (engineStream.remoteSsrc.isSet() && _audioBuffers.find(engineStream.remoteSsrc.get()) != _audioBuffers.end())
{
_audioBuffers.erase(engineStream.remoteSsrc.get());
}

const auto transport = streamItr->second->transport;
logger::info("AudioStream id %s, endpointId %s deleted.",
_loggableId.c_str(),
Expand Down Expand Up @@ -842,12 +820,17 @@ bridge::Stats::MixerBarbellStats Mixer::gatherBarbellStats(const uint64_t iterat
bridge::Stats::MixerBarbellStats stats;
uint64_t idleTimestamp = iterationStartTime - utils::Time::sec * 2;

for (const auto& itBb : _engineBarbells) {
for (const auto& itBb : _engineBarbells)
{
bridge::Stats::BarbellStats barbellStats;
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::AUDIO_SEND] = fromPacketCounter(itBb.second->transport.getAudioSendCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::AUDIO_RECV] = fromPacketCounter(itBb.second->transport.getAudioReceiveCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::VIDEO_SEND] = fromPacketCounter(itBb.second->transport.getVideoSendCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::VIDEO_RECV] = fromPacketCounter(itBb.second->transport.getVideoReceiveCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::AUDIO_SEND] =
fromPacketCounter(itBb.second->transport.getAudioSendCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::AUDIO_RECV] =
fromPacketCounter(itBb.second->transport.getAudioReceiveCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::VIDEO_SEND] =
fromPacketCounter(itBb.second->transport.getVideoSendCounters(idleTimestamp));
barbellStats._stats[bridge::Stats::BarbellStats::PayloadType::VIDEO_RECV] =
fromPacketCounter(itBb.second->transport.getVideoReceiveCounters(idleTimestamp));
stats._stats.emplace(itBb.second->id, barbellStats);
}

Expand Down
4 changes: 1 addition & 3 deletions bridge/Mixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

#include "bridge/Barbell.h"
#include "bridge/RecordingStream.h"
#include "bridge/Stats.h"
#include "bridge/engine/ActiveTalker.h"
#include "bridge/engine/EngineMixer.h"
#include "bridge/engine/SimulcastLevel.h"
#include "bridge/engine/SsrcRewrite.h"
#include "bridge/Stats.h"
#include "logger/Logger.h"
#include "transport/Endpoint.h"
#include "transport/dtls/SrtpClient.h"
Expand Down Expand Up @@ -114,7 +114,6 @@ class Mixer
const utils::Optional<ice::IceRole>& iceRole,
bool rewriteSsrcs,
utils::Optional<uint32_t> idleTimeoutSeconds = utils::Optional<uint32_t>());
void allocateAudioBuffer(uint32_t ssrc);

bool addBundledAudioStream(std::string& outId,
const std::string& endpointId,
Expand Down Expand Up @@ -320,7 +319,6 @@ class Mixer
utils::IdGenerator& _idGenerator;
utils::SsrcGenerator& _ssrcGenerator;

std::unordered_map<uint32_t, std::unique_ptr<EngineMixer::AudioBuffer>> _audioBuffers;
std::unordered_map<std::string, std::unique_ptr<AudioStream>> _audioStreams;
std::unordered_map<std::string, std::unique_ptr<EngineAudioStream>> _audioEngineStreams;
std::unordered_map<std::string, std::unique_ptr<VideoStream>> _videoStreams;
Expand Down
14 changes: 1 addition & 13 deletions bridge/MixerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Mixer* MixerManager::create(uint32_t lastN, bool useGlobalPort)
_config,
_sendAllocator,
_audioAllocator,
_mainAllocator,
audioSsrcs,
videoSsrcs,
lastN);
Expand Down Expand Up @@ -354,19 +355,6 @@ void MixerManager::finalizeEngineMixerRemoval(const std::string& mixerId)
logger::info("Mixer %s has been finalized", "MixerManager", mixerId.c_str());
}

void MixerManager::allocateAudioBuffer(EngineMixer& mixer, uint32_t ssrc)
{
std::lock_guard<std::mutex> locker(_configurationLock);

auto it = _mixers.find(mixer.getId());
if (it == _mixers.end())
{
return;
}

it->second->allocateAudioBuffer(ssrc);
}

void MixerManager::audioStreamRemoved(EngineMixer& mixer, const EngineAudioStream& audioStream)
{
std::lock_guard<std::mutex> locker(_configurationLock);
Expand Down
1 change: 0 additions & 1 deletion bridge/MixerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class MixerManager final : public MixerManagerAsync
// Async interface
bool post(utils::Function&& task) override { return _backgroundJobQueue.post(std::move(task)); }

void allocateAudioBuffer(EngineMixer& mixer, uint32_t ssrc) override;
void audioStreamRemoved(EngineMixer& mixer, const EngineAudioStream& audioStream) override;
void engineMixerRemoved(EngineMixer& mixer) override;
void freeVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) override;
Expand Down
6 changes: 0 additions & 6 deletions bridge/MixerManagerAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class MixerManagerAsync
virtual bool post(utils::Function&& task) = 0;

protected:
virtual void allocateAudioBuffer(EngineMixer& mixer, uint32_t ssrc) = 0;
virtual void audioStreamRemoved(EngineMixer& mixer, const EngineAudioStream& audioStream) = 0;
virtual void engineMixerRemoved(EngineMixer& mixer) = 0;
virtual void freeVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) = 0;
Expand All @@ -56,11 +55,6 @@ class MixerManagerAsync
virtual void engineRecordingStopped(EngineMixer& mixer, const RecordingDescription& recordingDesc) = 0;

public:
bool asyncAllocateAudioBuffer(EngineMixer& mixer, uint32_t ssrc)
{
return post(utils::bind(&MixerManagerAsync::allocateAudioBuffer, this, std::ref(mixer), ssrc));
}

bool asyncAudioStreamRemoved(EngineMixer& mixer, const EngineAudioStream& audioStream);
bool asyncEngineMixerRemoved(EngineMixer& mixer);
bool asyncFreeVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash);
Expand Down
200 changes: 35 additions & 165 deletions bridge/engine/AudioForwarderReceiveJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,81 +14,6 @@
namespace bridge
{

void AudioForwarderReceiveJob::onPacketDecoded(const int32_t decodedFrames, const uint8_t* decodedData)
{
if (decodedFrames > 0)
{
auto pcmPacket = memory::makeUniquePacket(_engineMixer.getAudioAllocator(), *_packet);
if (!pcmPacket)
{
return;
}
auto rtpHeader = rtp::RtpHeader::fromPacket(*pcmPacket);
const auto decodedPayloadLength = decodedFrames * codec::Opus::channelsPerFrame * codec::Opus::bytesPerSample;
memcpy(rtpHeader->getPayload(), decodedData, decodedPayloadLength);
pcmPacket->setLength(rtpHeader->headerLength() + decodedPayloadLength);

_engineMixer.onMixerAudioRtpPacketDecoded(_ssrcContext, std::move(pcmPacket));
return;
}

logger::error("Unable to decode opus packet, error code %d, ssrc %u, %s",
"AudioForwarderReceiveJob",
decodedFrames,
_ssrcContext.ssrc,
_sender->getLoggableId().c_str());
}

memory::UniqueAudioPacket AudioForwarderReceiveJob::makePcmPacket(const memory::Packet& opusPacket,
uint32_t sequenceNumber)
{
const auto opusRtpHeader = rtp::RtpHeader::fromPacket(opusPacket);
auto pcmPacket =
memory::makeUniquePacket(_engineMixer.getAudioAllocator(), opusPacket.get(), opusRtpHeader->headerLength());
if (!pcmPacket)
{
return nullptr;
}
auto pcmRtpHeader = rtp::RtpHeader::fromPacket(*pcmPacket);
pcmRtpHeader->sequenceNumber = sequenceNumber & 0xFFFFu;
pcmRtpHeader->payloadType = 10;
return pcmPacket;
}

void AudioForwarderReceiveJob::conceal(memory::AudioPacket& pcmPacket)
{
codec::OpusDecoder& decoder = *_ssrcContext.opusDecoder;
auto pcmHeader = rtp::RtpHeader::fromPacket(pcmPacket);
const auto decodedFrames = decoder.conceal(pcmHeader->getPayload());
if (decodedFrames > 0)
{
const auto decodedPayloadLength = decodedFrames * codec::Opus::channelsPerFrame * codec::Opus::bytesPerSample;
pcmPacket.setLength(pcmHeader->headerLength() + decodedPayloadLength);
}
else
{
pcmPacket.setLength(0);
}
}

void AudioForwarderReceiveJob::conceal(const memory::Packet& opusPacket, memory::AudioPacket& pcmPacket)
{
codec::OpusDecoder& decoder = *_ssrcContext.opusDecoder;
auto pcmHeader = rtp::RtpHeader::fromPacket(pcmPacket);
const auto opusHeader = rtp::RtpHeader::fromPacket(opusPacket);
const auto opusPayloadLength = opusPacket.getLength() - opusHeader->headerLength();
const auto decodedFrames = decoder.conceal(opusHeader->getPayload(), opusPayloadLength, pcmHeader->getPayload());
if (decodedFrames > 0)
{
const auto decodedPayloadLength = decodedFrames * codec::Opus::channelsPerFrame * codec::Opus::bytesPerSample;
pcmPacket.setLength(pcmHeader->headerLength() + decodedPayloadLength);
}
else
{
pcmPacket.setLength(0);
}
}

void AudioForwarderReceiveJob::decode(const memory::Packet& opusPacket, memory::AudioPacket& pcmPacket)
{
const auto framesInPacketBuffer =
Expand Down Expand Up @@ -156,94 +81,6 @@ bool AudioForwarderReceiveJob::unprotect(memory::Packet& opusPacket)
return true;
}

// @return -1 on error, otherwise audio level if requested.
int AudioForwarderReceiveJob::decodeOpus(const memory::Packet& opusPacket, bool needAudioLevel)
{
if (!_ssrcContext.opusDecoder)
{
logger::debug("Creating new opus decoder for ssrc %u in mixer %s, %s",
"AudioForwarderReceiveJob",
_ssrcContext.ssrc,
_engineMixer.getLoggableId().c_str(),
_sender->getLoggableId().c_str());
_ssrcContext.opusDecoder.reset(new codec::OpusDecoder());
_ssrcContext.opusPacketRate.reset(new utils::AvgRateTracker(0.1));
}

codec::OpusDecoder& decoder = *_ssrcContext.opusDecoder;

if (!decoder.isInitialized())
{
return -1;
}

auto rtpPacket = rtp::RtpHeader::fromPacket(*_packet);
if (!rtpPacket)
{
return -1;
}

if (decoder.hasDecoded() && _extendedSequenceNumber != decoder.getExpectedSequenceNumber())
{
const int32_t lossCount = static_cast<int32_t>(_extendedSequenceNumber - decoder.getExpectedSequenceNumber());
if (lossCount <= 0)
{
logger::debug("Old opus packet sequence %u expected %u, discarding",
"AudioForwarderReceiveJob",
_extendedSequenceNumber,
decoder.getExpectedSequenceNumber());
return -1;
}

logger::debug("Lost opus packet sequence %u expected %u, fec",
"AudioForwarderReceiveJob",
_extendedSequenceNumber,
decoder.getExpectedSequenceNumber());

const auto concealCount = std::min(5u, _extendedSequenceNumber - decoder.getExpectedSequenceNumber() - 1);
for (uint32_t i = 0; concealCount > 1 && i < concealCount - 1; ++i)
{
const uint32_t sequenceNumber = _extendedSequenceNumber - concealCount - 1 + i;
auto pcmPacket = makePcmPacket(*_packet, sequenceNumber);
if (!pcmPacket)
{
return -1;
}
conceal(*pcmPacket);
if (pcmPacket->getLength() > 0)
{
_engineMixer.onMixerAudioRtpPacketDecoded(_ssrcContext, std::move(pcmPacket));
}
}

auto pcmPacket = makePcmPacket(*_packet, _extendedSequenceNumber - 1);
if (!pcmPacket)
{
return -1;
}
conceal(*_packet, *pcmPacket);
if (pcmPacket->getLength() > 0)
{
_engineMixer.onMixerAudioRtpPacketDecoded(_ssrcContext, std::move(pcmPacket));
}
}

auto pcmPacket = makePcmPacket(*_packet, _extendedSequenceNumber);
if (!pcmPacket)
{
return -1;
}
decode(*_packet, *pcmPacket);
if (pcmPacket->getLength() == 0)
{
return -1;
}
_ssrcContext.opusPacketRate->update(1, utils::Time::getAbsoluteTime());
const int audioLevel = needAudioLevel ? codec::computeAudioLevel(*pcmPacket) : 0;
_engineMixer.onMixerAudioRtpPacketDecoded(_ssrcContext, std::move(pcmPacket));
return audioLevel;
}

int AudioForwarderReceiveJob::computeOpusAudioLevel(const memory::Packet& opusPacket)
{
if (!_ssrcContext.opusDecoder)
Expand Down Expand Up @@ -303,6 +140,8 @@ void AudioForwarderReceiveJob::run()
return;
}

const bool isSsrcUsed = _ssrcContext.isSsrcUsed.load();

bool silence = false;
utils::Optional<uint8_t> audioLevel;
utils::Optional<bool> isPtt;
Expand Down Expand Up @@ -342,6 +181,14 @@ void AudioForwarderReceiveJob::run()
{
if (_ssrcContext.markNextPacket)
{
if (_hasMixedAudioStreams && _ssrcContext.audioReceivePipe)
{
_ssrcContext.audioReceivePipe->onSilencedRtpPacket(_extendedSequenceNumber,
memory::makeUniquePacket(_engineMixer.getMainAllocator(),
_packet->get(),
rtpHeader->headerLength()),
utils::Time::getAbsoluteTime());
}
return;
}
// Let first silent packet through to clients and barbells
Expand Down Expand Up @@ -372,9 +219,32 @@ void AudioForwarderReceiveJob::run()
{
if (_hasMixedAudioStreams)
{
calculatedAudioLevel = decodeOpus(*_packet, !audioLevel.isSet());
if (!_ssrcContext.audioReceivePipe)
{
_ssrcContext.audioReceivePipe =
std::make_unique<codec::AudioReceivePipeline>(_ssrcContext.rtpMap.sampleRate,
20,
100,
_ssrcContext.rtpMap.audioLevelExtId.valueOr(255));
_ssrcContext.hasAudioReceivePipe = true;
}
if (isSsrcUsed)
{
_ssrcContext.audioReceivePipe->onRtpPacket(_extendedSequenceNumber,
memory::makeUniquePacket(_engineMixer.getMainAllocator(), *_packet),
utils::Time::getAbsoluteTime());
}
else
{
_ssrcContext.audioReceivePipe->onSilencedRtpPacket(_extendedSequenceNumber,
memory::makeUniquePacket(_engineMixer.getMainAllocator(),
_packet->get(),
rtpHeader->headerLength()),
utils::Time::getAbsoluteTime());
}
}
else if (_needAudioLevel && !audioLevel.isSet())

if (_needAudioLevel && !audioLevel.isSet())
{
calculatedAudioLevel = computeOpusAudioLevel(*_packet);
}
Expand Down
6 changes: 0 additions & 6 deletions bridge/engine/AudioForwarderReceiveJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ class AudioForwarderReceiveJob : public jobmanager::CountedJob
void run() override;

private:
int decodeOpus(const memory::Packet& opusPacket, bool needAudioLevel);
memory::UniqueAudioPacket decodeOpusPacket(const memory::Packet& opusPacket);
void onPacketDecoded(const int32_t decodedFrames, const uint8_t* decodedData);
memory::UniqueAudioPacket makePcmPacket(const memory::Packet& opusPacket, uint32_t sequenceNumber);
void conceal(memory::AudioPacket& pcmPacket);
void conceal(const memory::Packet& opusPacket, memory::AudioPacket& pcmPacket);
void decode(const memory::Packet& opusPacket, memory::AudioPacket& pcmPacket);
bool unprotect(memory::Packet& opusPacket);
int computeOpusAudioLevel(const memory::Packet& opusPacket);
Expand Down
Loading

0 comments on commit f7c277f

Please sign in to comment.