Skip to content

Commit

Permalink
Monitor visitors' codec lists, and aggregate them into a conference p…
Browse files Browse the repository at this point in the history
…roperty. (#1137)
  • Loading branch information
JonathanLennox authored Feb 27, 2024
1 parent be8a115 commit 49cc46e
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ interface ChatRoomMember {
/** The statistics id if any. */
val statsId: String?

/** The supported video codecs if any */
val videoCodecs: List<String>?

/**
* The list of features advertised as XMPP capabilities. Note that although the features are cached (XEP-0115),
* the first time [features] is accessed it may block waiting for a disco#info response!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.xmpp.extensions.jitsimeet.AudioMutedExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StartMutedPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StatsId
Expand All @@ -36,6 +37,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.VideoMutedExtension
import org.jivesoftware.smack.packet.Presence
import org.jivesoftware.smack.packet.StandardExtensionElement
import org.jivesoftware.smackx.caps.packet.CapsExtension
import org.json.simple.JSONArray
import org.jxmpp.jid.EntityFullJid
import org.jxmpp.jid.Jid

Expand Down Expand Up @@ -68,6 +70,8 @@ class ChatRoomMemberImpl(
private set
override var statsId: String? = null
private set
override var videoCodecs: List<String>? = null
private set
override var isAudioMuted = true
private set
override var isVideoMuted = true
Expand Down Expand Up @@ -150,6 +154,7 @@ class ChatRoomMemberImpl(
fun processPresence(presence: Presence) {
require(presence.from == occupantJid) { "Ignoring presence for a different member: ${presence.from}" }

val firstPresence = (this.presence == null)
this.presence = presence
presence.getExtension(UserInfoPacketExt::class.java)?.let {
val newStatus = it.isRobot
Expand Down Expand Up @@ -185,12 +190,14 @@ class ChatRoomMemberImpl(
isJibri = false
}

val oldRole = role
chatRoom.getOccupant(this)?.let { role = fromSmack(it.role, it.affiliation) }
if ((role == MemberRole.VISITOR) != (oldRole == MemberRole.VISITOR)) {
var newRole: MemberRole = MemberRole.VISITOR
chatRoom.getOccupant(this)?.let { newRole = fromSmack(it.role, it.affiliation) }
if (!firstPresence && (role == MemberRole.VISITOR) != (newRole == MemberRole.VISITOR)) {
// This will mess up various member counts
// TODO: Should we try to update them, instead?
logger.warn("Member role changed from $oldRole to $role - not supported!")
logger.warn("Member role changed from $role to $newRole - not supported!")
} else {
role = newRole
}

isTranscriber = isJigasi && presence.getExtension(TranscriptionStatusExtension::class.java) != null
Expand All @@ -209,6 +216,36 @@ class ChatRoomMemberImpl(
presence.getExtension(StatsId::class.java)?.let {
statsId = it.statsId
}

presence.getExtension(JitsiParticipantCodecList::class.java)?.let {
if (!firstPresence && it.codecs != videoCodecs) {
logger.warn("Video codec list changed from $videoCodecs to ${it.codecs} - not supported!")
} else {
if (!it.codecs.contains("vp8")) {
if (firstPresence) {
logger.warn("Video codec list {${it.codecs}} does not contain vp8! Adding manually.")
}
videoCodecs = it.codecs + "vp8"
} else {
videoCodecs = it.codecs
}
}
} ?: // Older clients sent a single codec in codecType rather than all supported ones in codecList
presence.getExtensionElement("jitsi_participant_codecType", "jabber:client")?.let {
if (it is StandardExtensionElement) {
val codec = it.text.lowercase()
val codecList = if (codec == "vp8") {
listOf(codec)
} else {
listOf(codec, "vp8")
}
if (!firstPresence && codecList != videoCodecs) {
logger.warn("Video codec list changed from $videoCodecs to $codecList - not supported!")
} else {
videoCodecs = codecList
}
}
}
}

/**
Expand Down Expand Up @@ -240,6 +277,7 @@ class ChatRoomMemberImpl(
this["is_jibri"] = isJibri
this["is_jigasi"] = isJigasi
this["role"] = role.toString()
this["video_codecs"] = JSONArray().apply { videoCodecs?.let { addAll(it) } }
this["stats_id"] = statsId.toString()
this["is_audio_muted"] = isAudioMuted
this["is_video_muted"] = isVideoMuted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public class JitsiMeetConferenceImpl
return null;
});

/**
* The aggregated count of visitors' supported codecs
*/
private final PreferenceAggregator visitorCodecs;

/**
* The {@link JibriRecorder} instance used to provide live streaming through
* Jibri.
Expand Down Expand Up @@ -290,6 +295,16 @@ public JitsiMeetConferenceImpl(
TimeUnit.MILLISECONDS);


visitorCodecs = new PreferenceAggregator(
logger,
(codecs) -> {
setConferenceProperty(
ConferenceProperties.KEY_VISITOR_CODECS,
String.join(",", codecs)
);
return null;
});

logger.info("Created new conference.");
}

Expand Down Expand Up @@ -816,7 +831,7 @@ private void inviteChatMember(ChatRoomMember chatRoomMember, boolean justJoined)
}
else if (participant.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorAdded();
visitorAdded(participant.getChatMember().getVideoCodecs());
}
}

Expand Down Expand Up @@ -1042,7 +1057,7 @@ private void terminateParticipant(
}
else if (removed.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorRemoved();
visitorRemoved(removed.getChatMember().getVideoCodecs());
}
}
}
Expand Down Expand Up @@ -1579,6 +1594,7 @@ else if (member.isJigasi())
}
}
o.put("visitor_count", visitorCount);
o.put("visitor_codecs", visitorCodecs.debugState());
o.put("participant_count", participantCount);
o.put("jibri_count", jibriCount);
o.put("jigasi_count", jigasiCount);
Expand Down Expand Up @@ -2013,15 +2029,23 @@ private void rescheduleSingleParticipantTimeout()
}

/** Called when a new visitor has been added to the conference. */
private void visitorAdded()
private void visitorAdded(List<String> codecs)
{
visitorCount.adjustValue(+1);
if (codecs != null)
{
visitorCodecs.addPreference(codecs);
}
}

/** Called when a new visitor has been added to the conference. */
private void visitorRemoved()
private void visitorRemoved(List<String> codecs)
{
visitorCount.adjustValue(-1);
if (codecs != null)
{
visitorCodecs.removePreference(codecs);
}
}

/**
Expand Down
150 changes: 150 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Jicofo, the Jitsi Conference Focus.
*
* Copyright @ 2015-Present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.jicofo.util

import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.json.simple.JSONArray

/** Aggregate lists of preferences coming from a large group of people, such that the resulting aggregated
* list consists of preference items supported by everyone, and in a rough consensus of preference order.
*
* The intended use case is maintaining the list of supported codecs for conference visitors.
*
* Preference orders are aggregated using the Borda count; this isn't theoretically optimal, but it should be
* good enough, and it's computationally cheap.
*/
class PreferenceAggregator(
parentLogger: Logger,
private val onChanged: (List<String>) -> Unit
) {
private val logger = createChildLogger(parentLogger)
private val lock = Any()

var aggregate: List<String> = emptyList()
private set

var count = 0
private set

private val values = mutableMapOf<String, ValueInfo>()

/**
* Add a preference to the aggregator.
*/
fun addPreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count++
distinctPrefs.forEachIndexed { index, element ->
val info = values.computeIfAbsent(element) { ValueInfo() }
info.count++
info.rankAggregate += index
}
updateAggregate()
}
}

/**
* Remove a preference from the aggregator.
*/
fun removePreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count--
check(count >= 0) {
"Preference count $count should not be negative"
}
distinctPrefs.forEachIndexed { index, element ->
val info = values[element]
check(info != null) {
"Preference info for $element should exist when preferences are being removed"
}
info.count--
check(info.count >= 0) {
"Preference count for $element ${info.count} should not be negative"
}
info.rankAggregate -= index
check(info.rankAggregate >= 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should not be negative"
}
if (info.count == 0) {
check(info.rankAggregate == 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should be zero " +
"when preference count is 0"
}
values.remove(element)
}
}
updateAggregate()
}
}

fun reset() {
synchronized(lock) {
aggregate = emptyList()
count = 0
values.clear()
}
}

fun debugState() = OrderedJsonObject().apply {
synchronized(lock) {
put("count", count)
put(
"ranks",
OrderedJsonObject().apply {
this@PreferenceAggregator.values.asSequence()
.sortedBy { it.value.rankAggregate }
.forEach { put(it.key, it.value.debugState()) }
}
)
put("aggregate", JSONArray().apply { addAll(aggregate) })
}
}

private fun updateAggregate() {
val newAggregate = values.asSequence()
.filter { it.value.count == count }
.sortedBy { it.value.rankAggregate }
.map { it.key }
.toList()
if (aggregate != newAggregate) {
aggregate = newAggregate
/* ?? Do we need to drop the lock before calling this? */
onChanged(aggregate)
}
}

private class ValueInfo {
var count = 0
var rankAggregate = 0

fun debugState() = OrderedJsonObject().apply {
put("count", count)
put("rank_aggregate", rankAggregate)
}
}
}
6 changes: 6 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.ConferenceIqProvider
import org.jitsi.xmpp.extensions.jitsimeet.FeatureExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.IceStatePacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JsonMessageExtension
import org.jitsi.xmpp.extensions.jitsimeet.LoginUrlIqProvider
Expand Down Expand Up @@ -119,6 +120,11 @@ fun registerXmppExtensions() {
StatsId.NAMESPACE,
StatsId.Provider()
)
ProviderManager.addExtensionProvider(
JitsiParticipantCodecList.ELEMENT,
JitsiParticipantCodecList.NAMESPACE,
DefaultPacketExtensionProvider(JitsiParticipantCodecList::class.java)
)

// Add the extensions used for handling the inviting of transcriber
ProviderManager.addExtensionProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class JibriChatRoomMember(
override val isVideoMuted: Boolean get() = TODO("Not yet implemented")
override val region: String? get() = TODO("Not yet implemented")
override val statsId: String? get() = TODO("Not yet implemented")
override val videoCodecs: List<String>? get() = TODO("Not yet implemented")
override val features: Set<Features> get() = TODO("Not yet implemented")
override val debugState: OrderedJsonObject get() = TODO("Not yet implemented")

Expand Down
Loading

0 comments on commit 49cc46e

Please sign in to comment.