From 5bc874025efe1fb2c2c8e390d8d9cc3ae01848bd Mon Sep 17 00:00:00 2001 From: Mike Benner <36419818+mikebenner@users.noreply.github.com> Date: Wed, 23 Oct 2024 09:57:01 -0700 Subject: [PATCH] Revert "Update index.ts" This reverts commit 54662543be3f4b9d60878184e5640d354b577ee3. --- index.ts | 505 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 478 insertions(+), 27 deletions(-) diff --git a/index.ts b/index.ts index 55b64e6..fc1f942 100644 --- a/index.ts +++ b/index.ts @@ -13,7 +13,7 @@ import { nodeProfilingIntegration } from "@sentry/profiling-node"; import { createClient } from "redis"; import { env } from "process"; -// Generate a pseudo UUID to use as an instance ID +// generate a pseduo uuid kinda thing to use as an instance id const INSTANCE_ID = (() => { return crypto.randomBytes(4).toString("hex"); })(); @@ -36,12 +36,14 @@ const logger = { }, }; -// Initialize Sentry Sentry.init({ environment: process.env.ENVIRONMENT || "development", integrations: [nodeProfilingIntegration()], - tracesSampleRate: 1.0, // Capture 100% of transactions - profilesSampleRate: 1.0, // Set profiling sample rate + // Performance Monitoring + tracesSampleRate: 1.0, // Capture 100% of the transactions + + // Set sampling rate for profiling - this is relative to tracesSampleRate + profilesSampleRate: 1.0, }); Sentry.setTag("instance_id", INSTANCE_ID); @@ -62,26 +64,21 @@ const mqttBrokerUrl = "mqtt://mqtt.bayme.sh"; const mqttUsername = "meshdev"; const mqttPassword = "large4cats"; -// Create Redis Client with Azure Cache for Redis URL and SSL enabled const redisClient = createClient({ url: process.env.REDIS_URL, - socket: { - tls: true, // Enable SSL/TLS - }, }); (async () => { if (process.env.REDIS_ENABLED === "true") { - // Connect to the Redis server + // Connect to redis server await redisClient.connect(); - logger.info(`Connected to Redis instance at ${process.env.REDIS_URL}`); logger.info(`Setting active instance id to ${INSTANCE_ID}`); - await redisClient.set(`baymesh:active`, INSTANCE_ID); + redisClient.set(`baymesh:active`, INSTANCE_ID); } })(); const decryptionKeys = [ - "1PG7OiApB1nwvP+rz05pAQ==", // Add default "AQ==" decryption key + "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key ]; const nodeDB = JSON.parse(fs.readFileSync("./nodeDB.json").toString()); @@ -107,20 +104,35 @@ const updateNodeDB = ( nodeInfoGenericObj.updatedAt = new Date().getTime(); redisClient.json .set(`baymesh:nodeinfo:${node}`, "$", nodeInfoGenericObj) - .catch(async (err) => { - // Handle Redis key type mismatch - const result = await redisClient.type(`baymesh:nodeinfo:${node}`); - logger.info(result); - if (result === "string") { - await redisClient.del(`baymesh:nodeinfo:${node}`); - await redisClient.json.set( - `baymesh:nodeinfo:${node}`, - "$", - nodeInfoGenericObj, - ); - logger.info("Deleted and re-added node info for: " + node); - } - logger.error(`Redis key: baymesh:nodeinfo:${node} ${err}`); + .then(() => { + // redisClient.json + // .get(`baymesh:nodeinfo:${node}`) // , { path: "$.hwModel" } + // .then((data) => { + // if (data) { + // logger.info(JSON.stringify(data)); + // } + // }); + }) + .catch((err) => { + // console.log(nodeInfoGenericObj); + // if (err === "Error: Existing key has wrong Redis type") { + redisClient.type(`baymesh:nodeinfo:${node}`).then((result) => { + logger.info(result); + if (result === "string") { + redisClient.del(`baymesh:nodeinfo:${node}`).then(() => { + redisClient.json + .set(`baymesh:nodeinfo:${node}`, "$", nodeInfoGenericObj) + .then(() => { + logger.info("deleted and re-added node info for: " + node); + }) + .catch((err) => { + logger.error(err); + }); + }); + } + }); + // } + logger.error(`redis key: baymesh:nodeinfo:${node} ${err}`); }); } fs.writeFileSync( @@ -128,6 +140,7 @@ const updateNodeDB = ( JSON.stringify(nodeDB, null, 2), ); } catch (err) { + // logger.error(err.message); Sentry.captureException(err); } }; @@ -138,6 +151,7 @@ const isInIgnoreDB = (node: string) => { const getNodeInfos = async (nodeIds: string[], debug: boolean) => { try { + // const foo = nodeIds.slice(0, nodeIds.length - 1); nodeIds = Array.from(new Set(nodeIds)); const nodeInfos = await redisClient.json.mGet( nodeIds.map((nodeId) => `baymesh:nodeinfo:${nodeId2hex(nodeId)}`), @@ -146,26 +160,47 @@ const getNodeInfos = async (nodeIds: string[], debug: boolean) => { if (debug) { logger.debug(JSON.stringify(nodeInfos)); } + const formattedNodeInfos = nodeInfos.flat().reduce((acc, item) => { if (item && item.id) { acc[item.id] = item; } return acc; }, {}); + + // const formattedNodeInfos = nodeInfos.reduce((acc, [info]) => { + // if (info && info.id) { + // acc[info.id] = info; + // } + // return acc; + // }, {}); if (Object.keys(formattedNodeInfos).length !== nodeIds.length) { + // figure out which nodes are missing from nodeInfo and print them + // console.log( + // "ABC", + // nodeInfos[0].map((nodeInfo) => nodeInfo.id), + // ); + // console.log(Object.keys(formattedNodeInfos).length, nodeIds.length); const missingNodes = nodeIds.filter((nodeId) => { return formattedNodeInfos[nodeId] === undefined; }); logger.info("Missing nodeInfo for nodes: " + missingNodes.join(",")); } + // console.log("Feep", nodeInfos); return formattedNodeInfos; } catch (err) { + // logger.error(err.message); Sentry.captureException(err); } return {}; }; const getNodeName = (nodeId: string | number) => { + // redisClient.json.get(`baymesh:nodeinfo:${nodeId}`).then((nodeInfo) => { + // if (nodeInfo) { + // logger.info(nodeInfo); + // } + // }); return nodeDB[nodeId2hex(nodeId)] || "Unknown"; }; @@ -188,7 +223,7 @@ const prettyNodeName = (nodeId: string | number) => { const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); -// Load protobufs +// load protobufs const root = new protobufjs.Root(); root.resolvePath = (origin, target) => path.join(__dirname, "src/protobufs", target); @@ -196,6 +231,7 @@ root.loadSync("meshtastic/mqtt.proto"); const Data = root.lookupType("Data"); const ServiceEnvelope = root.lookupType("ServiceEnvelope"); const User = root.lookupType("User"); +const Position = root.lookupType("Position"); if (!process.env.DISCORD_WEBHOOK_URL) { logger.error("DISCORD_WEBHOOK_URL not set"); @@ -212,6 +248,9 @@ function sendDiscordMessage(webhookUrl: string, payload: any) { return axios .post(webhookUrl, data) + .then(() => { + // console.log("Message sent successfully"); + }) .catch((error) => { logger.error( `[error] Could not send discord message: ${error.response.status}`, @@ -233,6 +272,7 @@ const createDiscordMessage = async (packetGroup, text) => { const from = nodeId2hex(packet.from); const nodeIdHex = nodeId2hex(from); + // discard text messages in the form of "seq 6034" "seq 6025" if (text.match(/^seq \d+$/)) { return; } @@ -246,6 +286,7 @@ const createDiscordMessage = async (packetGroup, text) => { return; } + // ignore packets older than 5 minutes if (new Date(packet.rxTime * 1000) < new Date(Date.now() - 5 * 60 * 1000)) { logger.info( `MessageId: ${packetGroup.id} Ignoring old message from ${prettyNodeName( @@ -253,3 +294,413 @@ const createDiscordMessage = async (packetGroup, text) => { )} to ${prettyNodeName(to)} : ${text}`, ); } + + if (process.env.ENVIRONMENT === "production" && to !== "ffffffff") { + logger.info( + `MessageId: ${packetGroup.id} Not to public channel: ${packetGroup.serviceEnvelopes.map((envelope) => envelope.topic)}`, + ); + return; + } + + if ( + packetGroup.serviceEnvelopes.filter((envelope) => + home_topics.some((home_topic) => envelope.topic.startsWith(home_topic)), + ).length === 0 + ) { + logger.info( + `MessageId: ${packetGroup.id} No packets found in topic: ${packetGroup.serviceEnvelopes.map((envelope) => envelope.topic)}`, + ); + return; + } + + let nodeInfos = await getNodeInfos( + packetGroup.serviceEnvelopes + .map((se) => se.gatewayId.replace("!", "")) + .concat(from), + false, + ); + + let avatarUrl = pfpDb["default"]; + if (Object.hasOwn(pfpDb, nodeIdHex)) { + avatarUrl = pfpDb[nodeIdHex]; + } + + const maxHopStart = packetGroup.serviceEnvelopes.reduce((acc, se) => { + const hopStart = se.packet.hopStart; + return hopStart > acc ? hopStart : acc; + }, 0); + + // console.log("maxHopStart", maxHopStart); + + const content = { + username: "SoCalMesh Parrot", + avatar_url: + "https://cdn.discordapp.com/app-icons/1240017058046152845/295e77bec5f9a44f7311cf8723e9c332.png", + embeds: [ + { + url: `https://meshtastic.liamcottle.net/?node_id=${packet.from}`, + color: 6810260, + timestamp: new Date(packet.rxTime * 1000).toISOString(), + + author: { + name: `${nodeInfos[nodeIdHex] ? nodeInfos[nodeIdHex].longName : "Unknown"}`, + url: `https://meshtastic.liamcottle.net/?node_id=${packet.from}`, + icon_url: avatarUrl, + }, + title: `${nodeInfos[nodeIdHex] ? nodeInfos[nodeIdHex].shortName : "UNK"}`, + description: text, + fields: [ + // { + // name: `${nodeInfos[nodeIdHex] ? nodeInfos[nodeIdHex].shortName : "UNK"}`, + // value: text, + // }, + // { + // name: "Node ID", + // value: `${nodeIdHex}`, + // inline: true, + // }, + { + name: "Packet", + value: `[${packetGroup.id.toString(16)}](https://meshview.armooo.net/packet/${packetGroup.id})`, + inline: true, + }, + { + name: "Channel", + value: `${packetGroup.serviceEnvelopes[0].channelId}`, + inline: true, + }, + ...packetGroup.serviceEnvelopes + .filter( + (value, index, self) => + self.findIndex((t) => t.gatewayId === value.gatewayId) === + index, + ) + .map((envelope) => { + const gatewayDelay = + envelope.mqttTime.getTime() - packetGroup.time.getTime(); + + if ( + envelope.gatewayId === "!75f1804c" || + envelope.gatewayId === "!3b46b95c" + ) { + // console.log(envelope); + } + + let gatewayDisplaName = envelope.gatewayId.replace("!", ""); + if (nodeInfos[envelope.gatewayId.replace("!", "")]) { + gatewayDisplaName = + // nodeInfos[envelope.gatewayId.replace("!", "")].shortName + + // " - " + + nodeInfos[envelope.gatewayId.replace("!", "")].shortName; //+ + // " " + + // envelope.gatewayId.replace("!", ""); + } + + let hopText = `${envelope.packet.hopStart - envelope.packet.hopLimit}/${envelope.packet.hopStart} hops`; + + if ( + envelope.packet.hopStart === 0 && + envelope.packet.hopLimit === 0 + ) { + hopText = `${envelope.packet.rxSnr} / ${envelope.packet.rxRssi} dBm`; + } else if ( + envelope.packet.hopStart - envelope.packet.hopLimit === + 0 + ) { + hopText = `${envelope.packet.rxSnr} / ${envelope.packet.rxRssi} dBm ${envelope.packet.hopStart - envelope.packet.hopLimit}/${envelope.packet.hopStart} hops`; + } + + if (envelope.gatewayId.replace("!", "") === nodeIdHex) { + hopText = `Self Gated ${envelope.packet.hopStart} hopper`; + } + + if (maxHopStart !== envelope.packet.hopStart) { + hopText = `:older_man: ${envelope.packet.hopStart - envelope.packet.hopLimit}/${envelope.packet.hopStart} hops`; + } + + if (envelope.mqttServer === "public") { + hopText = `:poop: ${envelope.packet.hopStart - envelope.packet.hopLimit}/${envelope.packet.hopStart} hops`; + } + + return { + name: `Gateway`, + value: `[${gatewayDisplaName} (${hopText})](https://meshview.armooo.net/packet_list/${nodeHex2id(envelope.gatewayId.replace("!", ""))})${gatewayDelay > 0 ? " (" + gatewayDelay + "ms)" : ""}`, + inline: true, + }; + }), + ], + }, + ], + }; + + //console.log(packetGroup, packetGroup.serviceEnvelopes); + + logger.info( + `MessageId: ${packetGroup.id} Received message from ${prettyNodeName(from)} to ${prettyNodeName(to)} : ${text}`, + ); + + if ( + packetGroup.serviceEnvelopes.filter((envelope) => + ba_home_topics.some((home_topic) => + envelope.topic.startsWith(home_topic), + ), + ).length > 0 + ) { + sendDiscordMessage(baWebhookUrl, content); + } + + if ( + packetGroup.serviceEnvelopes.filter((envelope) => + sv_home_topics.some((home_topic) => + envelope.topic.startsWith(home_topic), + ), + ).length > 0 + ) { + if (svWebhookUrl) { + sendDiscordMessage(svWebhookUrl, content); + } + } + } catch (err) { + logger.error("Error: " + String(err)); + Sentry.captureException(err); + } +}; + +// const client = mqtt.connect(mqttBrokerUrl, { +// username: mqttUsername, +// password: mqttPassword, +// }); + +const baymesh_client = mqtt.connect(mqttBrokerUrl, { + username: mqttUsername, + password: mqttPassword, +}); + +const ba_home_topics = [ + "msh/US/socalmesh", + "msh/US/SoCalMesh", + "msh/US/CA/socalmesh", + "msh/US/CA/SoCalMesh", +]; + +const sv_home_topics = [ + "msh/US/sacvalley", + "msh/US/SacValley", + "msh/US/CA/sacvalley", + "msh/US/CA/SacValley", +]; + +// home_topics is both ba and sv +const home_topics = ba_home_topics.concat(sv_home_topics); + +const nodes_to_log_all_positions = [ + "fa6dc348", // me + "3b46b95c", // ohr + "33686ed8", // balloon +]; + +const subbed_topics = ["msh/US"]; + +// run every 5 seconds and pop off from the queue +const processing_timer = setInterval(() => { + if (process.env.REDIS_ENABLED === "true") { + redisClient.get(`baymesh:active`).then((active_instance) => { + if (active_instance && active_instance !== INSTANCE_ID) { + logger.error( + `Stopping RATM instance; active_instance: ${active_instance} this instance: ${INSTANCE_ID}`, + ); + clearInterval(processing_timer); // do we want to kill it so fast? what about things in the queue? + // subbed_topics.forEach((topic) => client.unsubscribe(topic)); + subbed_topics.forEach((topic) => baymesh_client.unsubscribe(topic)); + } + }); + } + const packetGroups = meshPacketQueue.popPacketGroupsOlderThan( + Date.now() - grouping_duration, + ); + packetGroups.forEach((packetGroup) => { + processPacketGroup(packetGroup); + }); +}, 5000); + +function sub(the_client: mqtt.MqttClient, topic: string) { + the_client.subscribe(`${topic}/#`, (err) => { + if (!err) { + logger.info(`Subscribed to ${topic}/#`); + } else { + logger.error(`Subscription error: ${err.message}`); + } + }); +} + +// subscribe to everything when connected +baymesh_client.on("connect", () => { + logger.info(`Connected to Private MQTT broker`); + subbed_topics.forEach((topic) => sub(baymesh_client, topic)); +}); + +// handle message received +baymesh_client.on("message", async (topic: string, message: any) => { + try { + if (topic.includes("msh")) { + if (!topic.includes("/json")) { + if (topic.includes("/stat/")) { + return; + } + // decode service envelope + let envelope; + try { + envelope = ServiceEnvelope.decode(message); + } catch (envDecodeErr) { + if ( + String(envDecodeErr).indexOf("invalid wire type 7 at offset 1") === + -1 + ) { + logger.error( + `MessageId: Error decoding service envelope: ${envDecodeErr}`, + ); + } + return; + } + if (!envelope || !envelope.packet) { + return; + } + + if ( + home_topics.some((home_topic) => topic.startsWith(home_topic)) || + nodes_to_log_all_positions.includes( + nodeId2hex(envelope.packet.from), + ) || + meshPacketQueue.exists(envelope.packet.id) + ) { + // return; + } else { + // logger.info("Message received on topic: " + topic); + return; + } + + // attempt to decrypt encrypted packets + const isEncrypted = envelope.packet.encrypted?.length > 0; + if (isEncrypted) { + const decoded = decrypt(envelope.packet); + if (decoded) { + envelope.packet.decoded = decoded; + } + } + + if (process.env.REDIS_ENABLED === "true") { + const redisKey = `baymesh:envelope:${nodeId2hex(envelope.packet.id)}:${nodeId2hex(envelope.gatewayId.replace("!", ""))}:${nodeId2hex(envelope.packet.from)}`; + const seenBefore = await redisClient.exists(redisKey); + if (seenBefore) { + // logger.debug( + // `RedisCache: Already received envelope with baymesh:envelope:${nodeId2hex(envelope.packet.id)}:${nodeId2hex(envelope.gatewayId.replace("!", ""))}:${nodeId2hex(envelope.packet.from)}`, + // ); + return; + } + + //logger.debug(`setting ${redisKey}`); + + redisClient.set(redisKey, 1); + } else { + if (cache.exists(shaHash(envelope))) { + // logger.debug( + // `FifoCache: Already received envelope with hash ${shaHash(envelope)} MessageId: ${envelope.packet.id} Gateway: ${envelope.gatewayId}`, + // ); + return; + } + + if (cache.add(shaHash(envelope))) { + // periodically print the nodeDB to the console + //console.log(JSON.stringify(nodeDB)); + } + } + + meshPacketQueue.add(envelope, topic, "baymesh"); + } + } + } catch (err) { + logger.error("Error: " + String(err)); + Sentry.captureException(err); + } +}); + +function shaHash(serviceEnvelope: ServiceEnvelope) { + const hash = crypto.createHash("sha256"); + hash.update(JSON.stringify(serviceEnvelope)); + return hash.digest("hex"); +} + +function processPacketGroup(packetGroup: PacketGroup) { + const packet = packetGroup.serviceEnvelopes[0].packet; + const portnum = packet?.decoded?.portnum; + + if (portnum === 1) { + processTextMessage(packetGroup); + } else if (portnum === 3) { + // we used to insert positions in to the postresdb, but no more this is a just a logger + } else if (portnum === 4) { + const user = User.decode(packet.decoded.payload); + const from = nodeId2hex(packet.from); + updateNodeDB(from, user.longName, user, packet.hopStart); + } else { + // logger.debug( + // `MessageId: ${packetGroup.id} Unknown portnum ${portnum} from ${prettyNodeName( + // packet.from, + // )}`, + // ); + } +} + +function createNonce(packetId, fromNode) { + // Expand packetId to 64 bits + const packetId64 = BigInt(packetId); + + // Initialize block counter (32-bit, starts at zero) + const blockCounter = 0; + + // Create a buffer for the nonce + const buf = Buffer.alloc(16); + + // Write packetId, fromNode, and block counter to the buffer + buf.writeBigUInt64LE(packetId64, 0); + buf.writeUInt32LE(fromNode, 8); + buf.writeUInt32LE(blockCounter, 12); + + return buf; +} + +/** + * References: + * https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42 + * https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381 + */ +function decrypt(packet) { + // attempt to decrypt with all available decryption keys + for (const decryptionKey of decryptionKeys) { + try { + // console.log(`using decryption key: ${decryptionKey}`); + // convert encryption key to buffer + const key = Buffer.from(decryptionKey, "base64"); + + // create decryption iv/nonce for this packet + const nonceBuffer = createNonce(packet.id, packet.from); + + // create aes-128-ctr decipher + const decipher = crypto.createDecipheriv("aes-128-ctr", key, nonceBuffer); + + // decrypt encrypted packet + const decryptedBuffer = Buffer.concat([ + decipher.update(packet.encrypted), + decipher.final(), + ]); + + // parse as data message + return Data.decode(decryptedBuffer); + } catch (e) { + // console.log(e); + } + } + + // couldn't decrypt + return null; +}