Skip to content

Commit

Permalink
Added code for handling custom-ie
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaderi committed Nov 4, 2024
1 parent ab48c6e commit f3d471d
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 93 deletions.
1 change: 1 addition & 0 deletions include/ZMQParserInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ZMQParserInterface : public ParserInterface {
u_int32_t msg_id, void *data);
u_int8_t parseTLVCounter(const char *payload, int payload_size);
u_int8_t parseJSONCounter(const char *payload, int payload_size);
u_int8_t parseJSONCustomIE(const char *payload, int payload_size);
u_int8_t parseTemplate(const char *payload, int payload_size,
u_int32_t source_id, u_int32_t msg_id, void *data);
u_int8_t parseOption(const char *payload, int payload_size,
Expand Down
2 changes: 1 addition & 1 deletion src/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5647,7 +5647,7 @@ void Utils::closeSocket(int sock) {

static const char *message_topics[] = {
"flow", "event", "counter", "template", "option",
"hello", "listening-ports", "snmp-ifaces", NULL};
"hello", "listening-ports", "snmp-ifaces", "custom-ie", NULL};

const char **Utils::getMessagingTopics() {
return ((const char **)message_topics);
Expand Down
167 changes: 85 additions & 82 deletions src/ZMQCollectorInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI
if (strncmp(e, "zmq", 3) == 0) e[0] = 't', e[1] = 'c', e[2] = 'p';

if (num_subscribers == MAX_ZMQ_SUBSCRIBERS) {
ntop->getTrace()->traceEvent(
TRACE_ERROR,
"Too many endpoints defined %u: skipping those in excess",
num_subscribers);
ntop->getTrace()->traceEvent(TRACE_ERROR,
"Too many endpoints defined %u: skipping those in excess",
num_subscribers);
break;
}

Expand All @@ -83,7 +82,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI
}
#else
ntop->getTrace()->traceEvent(TRACE_ERROR,
"Unable to enable ZMQ CURVE encryption, ZMQ >= 4.1 is required");
"Unable to enable ZMQ CURVE encryption, ZMQ >= 4.1 is required");
#endif
}

Expand All @@ -105,9 +104,9 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI
zmq_close(subscriber[num_subscribers].socket);
zmq_ctx_destroy(context);
ntop->getTrace()->traceEvent(
TRACE_ERROR,
"Unable to bind to ZMQ endpoint %s [collector]: %s (%d)", e,
strerror(errno), errno);
TRACE_ERROR,
"Unable to bind to ZMQ endpoint %s [collector]: %s (%d)", e,
strerror(errno), errno);
free(tmp);
throw("Unable to bind to the specified ZMQ endpoint");
}
Expand All @@ -116,22 +115,19 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI
zmq_close(subscriber[num_subscribers].socket);
zmq_ctx_destroy(context);
ntop->getTrace()->traceEvent(
TRACE_ERROR,
"Unable to connect to ZMQ endpoint %s [probe]: %s (%d)", e,
strerror(errno), errno);
TRACE_ERROR,
"Unable to connect to ZMQ endpoint %s [probe]: %s (%d)", e,
strerror(errno), errno);
free(tmp);
throw("Unable to connect to the specified ZMQ endpoint");
}
}

for (int i = 0; topics[i] != NULL; i++) {
if (zmq_setsockopt(subscriber[num_subscribers].socket, ZMQ_SUBSCRIBE,
topics[i], strlen(topics[i])) != 0) {
if (zmq_setsockopt(subscriber[num_subscribers].socket, ZMQ_SUBSCRIBE, topics[i], strlen(topics[i])) != 0) {
zmq_close(subscriber[num_subscribers].socket);
zmq_ctx_destroy(context);
ntop->getTrace()->traceEvent(
TRACE_ERROR, "Unable to connect to subscribe to topic %s",
topics[i]);
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to connect to subscribe to topic %s", topics[i]);
free(tmp);
throw("Unable to subscribe to the specified ZMQ endpoint");
}
Expand Down Expand Up @@ -159,9 +155,9 @@ ZMQCollectorInterface::~ZMQCollectorInterface() {
for (u_int i = 0; i < INTERFACE_PROFILING_NUM_SECTIONS; i++) {
if (INTERFACE_PROFILING_SECTION_LABEL(i) != NULL)
ntop->getTrace()->traceEvent(
TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i,
INTERFACE_PROFILING_SECTION_LABEL(i),
INTERFACE_PROFILING_SECTION_AVG(i, n));
TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i,
INTERFACE_PROFILING_SECTION_LABEL(i),
INTERFACE_PROFILING_SECTION_AVG(i, n));
ntop->getTrace()->traceEvent(TRACE_NORMAL,
"[PROFILING] Section #%d '%s': %llu ticks",
i, INTERFACE_PROFILING_SECTION_LABEL(i),
Expand Down Expand Up @@ -214,12 +210,12 @@ char *ZMQCollectorInterface::findInterfaceEncryptionKeys(char *public_key, char
void ZMQCollectorInterface::checkPointCounters(bool drops_only) {
if (!drops_only) {
recvStatsCheckpoint.num_flows = recvStats.num_flows,
recvStatsCheckpoint.num_events = recvStats.num_events,
recvStatsCheckpoint.num_counters = recvStats.num_counters,
recvStatsCheckpoint.num_templates = recvStats.num_templates,
recvStatsCheckpoint.num_options = recvStats.num_options,
recvStatsCheckpoint.num_network_events = recvStats.num_network_events,
recvStatsCheckpoint.zmq_msg_rcvd = recvStats.zmq_msg_rcvd;
recvStatsCheckpoint.num_events = recvStats.num_events,
recvStatsCheckpoint.num_counters = recvStats.num_counters,
recvStatsCheckpoint.num_templates = recvStats.num_templates,
recvStatsCheckpoint.num_options = recvStats.num_options,
recvStatsCheckpoint.num_network_events = recvStats.num_network_events,
recvStatsCheckpoint.zmq_msg_rcvd = recvStats.zmq_msg_rcvd;
}

recvStatsCheckpoint.num_dropped_flows = recvStats.num_dropped_flows;
Expand Down Expand Up @@ -281,7 +277,7 @@ void ZMQCollectorInterface::collect_flows() {

for (int i = 0; i < num_subscribers; i++)
items[i].socket = subscriber[i].socket, items[i].fd = 0,
items[i].events = ZMQ_POLLIN, items[i].revents = 0;
items[i].events = ZMQ_POLLIN, items[i].revents = 0;

do {
rc = zmq_poll(items, num_subscribers, MAX_ZMQ_POLL_WAIT_MS);
Expand Down Expand Up @@ -327,7 +323,7 @@ void ZMQCollectorInterface::collect_flows() {
if (((size != sizeof(struct zmq_msg_hdr_v3)) &&
(size != sizeof(struct zmq_msg_hdr_v2)) &&
(size != sizeof(struct zmq_msg_hdr_v1))
) ||
) ||
(h->version > ZMQ_MSG_VERSION)) {
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Unsupported publisher version: is your nProbe sender "
Expand Down Expand Up @@ -503,7 +499,7 @@ void ZMQCollectorInterface::collect_flows() {
ntop->getTrace()->traceEvent(TRACE_ERROR,
"Unable to uncompress ZMQ traffic: "
"ntopng compiled without zlib"),
once = true;
once = true;

continue;
#endif
Expand All @@ -516,8 +512,8 @@ void ZMQCollectorInterface::collect_flows() {

if (ntop->getPrefs()->get_zmq_encryption_pwd())
Utils::xor_encdec(
(u_char *)uncompressed, uncompressed_len,
(u_char *)ntop->getPrefs()->get_zmq_encryption_pwd());
(u_char *)uncompressed, uncompressed_len,
(u_char *)ntop->getPrefs()->get_zmq_encryption_pwd());

if (false) {
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[url: %s]", h->url);
Expand All @@ -544,61 +540,68 @@ void ZMQCollectorInterface::collect_flows() {

/* Process the message */
switch (h->url[0]) {
case 'e': /* event */
recvStats.num_events++;
parseEvent(uncompressed, uncompressed_len, source_id, msg_id,
this);
break;

case 'f': /* flow */
if (tlv_encoding)
recvStats.num_flows +=
parseTLVFlow(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
else {
uncompressed[uncompressed_len] = '\0';
recvStats.num_flows += parseJSONFlow(uncompressed, uncompressed_len, subscriber_id, msg_id);
}
break;

case 'c': /* counter */
case 'e': /* event */
recvStats.num_events++;
parseEvent(uncompressed, uncompressed_len, source_id, msg_id,
this);
break;

case 'f': /* flow */
if (tlv_encoding)
recvStats.num_flows += parseTLVFlow(uncompressed, uncompressed_len,
subscriber_id, msg_id, this);
else {
uncompressed[uncompressed_len] = '\0';
recvStats.num_flows += parseJSONFlow(uncompressed, uncompressed_len, subscriber_id, msg_id);
}
break;

case 'c': /* counter / custom-ie */
if(h->url[1] == 'o') {
/* counter */
if (tlv_encoding)
parseTLVCounter(uncompressed, uncompressed_len);
else
parseJSONCounter(uncompressed, uncompressed_len);

recvStats.num_counters++;
break;

case 't': /* template */
recvStats.num_templates++;
parseTemplate(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;

case 'o': /* option */
recvStats.num_options++;
parseOption(uncompressed, uncompressed_len, subscriber_id, msg_id,
this);
break;

case 'h': /* hello */
recvStats.num_hello++;
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[HELLO] %s",
* uncompressed); */
ntop->askToRefreshIPSRules();
break;

case 'l': /* listening-ports */
recvStats.num_listening_ports++;
parseListeningPorts(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;

case 's': /* snmp-ifaces */
recvStats.num_snmp_interfaces++;
parseSNMPIntefaces(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;
} else {
/* custom-ie (JSON only) */

parseJSONCustomIE(uncompressed, uncompressed_len);
}
break;

case 't': /* template */
recvStats.num_templates++;
parseTemplate(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;

case 'o': /* option */
recvStats.num_options++;
parseOption(uncompressed, uncompressed_len, subscriber_id, msg_id,
this);
break;

case 'h': /* hello */
recvStats.num_hello++;
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[HELLO] %s",
* uncompressed); */
ntop->askToRefreshIPSRules();
break;

case 'l': /* listening-ports */
recvStats.num_listening_ports++;
parseListeningPorts(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;

case 's': /* snmp-ifaces */
recvStats.num_snmp_interfaces++;
parseSNMPIntefaces(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
break;
}

/* ntop->getTrace()->traceEvent(TRACE_INFO, "[%s] %s", h->url,
Expand Down Expand Up @@ -642,8 +645,8 @@ void ZMQCollectorInterface::startPacketPolling() {

bool ZMQCollectorInterface::set_packet_filter(char *filter) {
ntop->getTrace()->traceEvent(
TRACE_ERROR, "No filter can be set on a collector interface. Ignored %s",
filter);
TRACE_ERROR, "No filter can be set on a collector interface. Ignored %s",
filter);
return (false);
}

Expand Down
74 changes: 64 additions & 10 deletions src/ZMQParserInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1632,9 +1632,8 @@ bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key,

/* **************************************************** */

bool ZMQParserInterface::parseNProbeAgentField(
ParsedFlow *const flow, const char *key, ParsedValue *value,
json_object *const jvalue) {
bool ZMQParserInterface::parseNProbeAgentField(ParsedFlow *const flow, const char *key, ParsedValue *value,
json_object *const jvalue) {
bool ret = false;
json_object *obj;

Expand Down Expand Up @@ -1969,16 +1968,13 @@ int ZMQParserInterface::parseSingleJSONFlow(json_object *o,
!json_object_iter_equal(&additional_it, &additional_itEnd)) {
const char *additional_key =
json_object_iter_peek_name(&additional_it);
json_object *additional_v =
json_object_iter_peek_value(&additional_it);
const char *additional_value =
json_object_get_string(additional_v);
json_object *additional_v = json_object_iter_peek_value(&additional_it);
const char *additional_value = json_object_get_string(additional_v);

if ((additional_key != NULL) && (additional_value != NULL)) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
// field: %s", additional_key);
flow.addAdditionalField(
additional_key, json_object_new_string(additional_value));
flow.addAdditionalField(additional_key, json_object_new_string(additional_value));
}
json_object_iter_next(&additional_it);
}
Expand Down Expand Up @@ -2301,7 +2297,6 @@ u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload,

if (rc > 0) n++;
}

} else {
rc = parseSingleJSONFlow(f, source_id);

Expand Down Expand Up @@ -3407,4 +3402,63 @@ u_int16_t ZMQParserInterface::findVLANMapping(std::string name) {

/* **************************************************** */

u_int8_t ZMQParserInterface::parseJSONCustomIE(const char *payload,
int payload_size) {
json_object *f;
enum json_tokener_error jerr = json_tokener_success;
sFlowInterfaceStats stats;

//ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);

memset(&stats, 0, sizeof(stats));
f = json_tokener_parse_verbose(payload, &jerr);

if (f != NULL) {
if (json_object_get_type(f) == json_type_array) {
/* IE array */
int id, num_elements = json_object_array_length(f);

for (id = 0; id < num_elements; id++) {
json_object *o = json_object_array_get_idx(f, id);

if(o != NULL) {
struct json_object_iterator it = json_object_iter_begin(o);
struct json_object_iterator itEnd = json_object_iter_end(o);

while (!json_object_iter_equal(&it, &itEnd)) {
const char *key = json_object_iter_peek_name(&it);
json_object *jvalue = json_object_iter_peek_value(&it);
const char *value = json_object_get_string(jvalue);

if((key != NULL) && (value != NULL)) {
ntop->getTrace()->traceEvent(TRACE_INFO, "%s = %s", key, value);

/* TODO: handle custom IEs */
} /* if */

/* Move to the next element */
json_object_iter_next(&it);
} /* while */
}
} /* for */
}

json_object_put(f);
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data "
"encrypted or invalid JSON?");
ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}

once = true;
return -1;
}

return 0;
}

#endif

0 comments on commit f3d471d

Please sign in to comment.