Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

emit messages to specific address #1014

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
13 changes: 6 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"bugs": "https://github.com/decentraland/js-sdk-toolchain/issues",
"dependencies": {
"@actions/core": "^1.10.0",
"@dcl/protocol": "1.0.0-10955038136.commit-7fe9554",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-11115539131.commit-ebab575.tgz",
"@dcl/quickjs-emscripten": "^0.21.0-3680274614.commit-1808aa1",
"@dcl/ts-proto": "1.153.0",
"@types/fs-extra": "^9.0.12",
Expand Down
6 changes: 3 additions & 3 deletions packages/@dcl/sdk/src/network/binary-message-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ export enum CommsMessage {
RES_CRDT_STATE = 3
}

export function BinaryMessageBus<T extends CommsMessage>(send: (message: Uint8Array) => void) {
export function BinaryMessageBus<T extends CommsMessage>(send: (message: Uint8Array, toPeerAddress?: string) => void) {
const mapping: Map<T, (value: Uint8Array, sender: string) => void> = new Map()
return {
on: <K extends T>(message: K, callback: (value: Uint8Array, sender: string) => void) => {
mapping.set(message, callback)
},
emit: <K extends T>(message: K, value: Uint8Array) => {
send(craftCommsMessage<T>(message, value))
emit: <K extends T>(message: K, value: Uint8Array, toPeerAddress?: string) => {
send(craftCommsMessage<T>(message, value), toPeerAddress)
},
__processMessages: (messages: Uint8Array[]) => {
for (const message of messages) {
Expand Down
30 changes: 24 additions & 6 deletions packages/@dcl/sdk/src/network/message-bus-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,30 @@ export function addSyncTransport(
const entityDefinitions = entityUtils(engine, myProfile)

// List of MessageBuss messsages to be sent on every frame to comms
const pendingMessageBusMessagesToSend: Uint8Array[] = []
const binaryMessageBus = BinaryMessageBus((message) => pendingMessageBusMessagesToSend.push(message))
const pendingMessageBusMessagesToSend: { data: Uint8Array[]; address?: string }[] = []
const binaryMessageBus = BinaryMessageBus((data, address) => {
const pendingAddressMessage = address && pendingMessageBusMessagesToSend.find(($) => $.address === address)
if (pendingAddressMessage) {
pendingAddressMessage.data.push(data)
} else {
pendingMessageBusMessagesToSend.push({ data: [data], address })
}
})

function getMessagesToSend() {
function getMessagesToSend(): [Uint8Array[], typeof pendingMessageBusMessagesToSend] {
const messages = [...pendingMessageBusMessagesToSend]
pendingMessageBusMessagesToSend.length = 0
return messages
const broadcastMessages: Uint8Array[] = []
const messagesToAddress: typeof pendingMessageBusMessagesToSend = []

for (const message of messages) {
if (!message.address) {
broadcastMessages.push(...message.data)
} else {
messagesToAddress.push(message)
}
}
return [broadcastMessages, messagesToAddress]
}

let transportInitialzed = false
Expand All @@ -45,8 +62,9 @@ export function addSyncTransport(
console.log(...Array.from(serializeCrdtMessages('[NetworkMessage sent]:', message, engine)))
binaryMessageBus.emit(CommsMessage.CRDT, message)
}
const messages = getMessagesToSend()
const response = await sendBinary({ data: messages })
const [broadcastMessages, messagesToAddress] = getMessagesToSend()

const response = await sendBinary({ data: broadcastMessages, messagesToAddress })
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe peerMessages its a better name ?

binaryMessageBus.__processMessages(response.data)
transportInitialzed = true
},
Expand Down
Loading