diff --git a/include/ZMQParserInterface.h b/include/ZMQParserInterface.h index 590de255989d..656d5abf0fe3 100644 --- a/include/ZMQParserInterface.h +++ b/include/ZMQParserInterface.h @@ -118,6 +118,7 @@ class ZMQParserInterface : public ParserInterface { u_int32_t msg_id, void *data); u_int8_t parseTLVCounter(const char *payload, int payload_size); u_int8_t parseJSONCounter(const char *payload, int payload_size); + u_int8_t parseJSONCustomIE(const char *payload, int payload_size); u_int8_t parseTemplate(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data); u_int8_t parseOption(const char *payload, int payload_size, diff --git a/src/Utils.cpp b/src/Utils.cpp index ae1380132ceb..07e3891e796e 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -5647,7 +5647,7 @@ void Utils::closeSocket(int sock) { static const char *message_topics[] = { "flow", "event", "counter", "template", "option", - "hello", "listening-ports", "snmp-ifaces", NULL}; + "hello", "listening-ports", "snmp-ifaces", "custom-ie", NULL}; const char **Utils::getMessagingTopics() { return ((const char **)message_topics); diff --git a/src/ZMQCollectorInterface.cpp b/src/ZMQCollectorInterface.cpp index f023b9a5d757..05bc4c31af3a 100644 --- a/src/ZMQCollectorInterface.cpp +++ b/src/ZMQCollectorInterface.cpp @@ -55,10 +55,9 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI if (strncmp(e, "zmq", 3) == 0) e[0] = 't', e[1] = 'c', e[2] = 'p'; if (num_subscribers == MAX_ZMQ_SUBSCRIBERS) { - ntop->getTrace()->traceEvent( - TRACE_ERROR, - "Too many endpoints defined %u: skipping those in excess", - num_subscribers); + ntop->getTrace()->traceEvent(TRACE_ERROR, + "Too many endpoints defined %u: skipping those in excess", + num_subscribers); break; } @@ -83,7 +82,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI } #else ntop->getTrace()->traceEvent(TRACE_ERROR, - "Unable to enable ZMQ CURVE encryption, ZMQ >= 4.1 is required"); + "Unable to enable ZMQ CURVE encryption, ZMQ >= 4.1 is required"); #endif } @@ -105,9 +104,9 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI zmq_close(subscriber[num_subscribers].socket); zmq_ctx_destroy(context); ntop->getTrace()->traceEvent( - TRACE_ERROR, - "Unable to bind to ZMQ endpoint %s [collector]: %s (%d)", e, - strerror(errno), errno); + TRACE_ERROR, + "Unable to bind to ZMQ endpoint %s [collector]: %s (%d)", e, + strerror(errno), errno); free(tmp); throw("Unable to bind to the specified ZMQ endpoint"); } @@ -116,22 +115,19 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI zmq_close(subscriber[num_subscribers].socket); zmq_ctx_destroy(context); ntop->getTrace()->traceEvent( - TRACE_ERROR, - "Unable to connect to ZMQ endpoint %s [probe]: %s (%d)", e, - strerror(errno), errno); + TRACE_ERROR, + "Unable to connect to ZMQ endpoint %s [probe]: %s (%d)", e, + strerror(errno), errno); free(tmp); throw("Unable to connect to the specified ZMQ endpoint"); } } for (int i = 0; topics[i] != NULL; i++) { - if (zmq_setsockopt(subscriber[num_subscribers].socket, ZMQ_SUBSCRIBE, - topics[i], strlen(topics[i])) != 0) { + if (zmq_setsockopt(subscriber[num_subscribers].socket, ZMQ_SUBSCRIBE, topics[i], strlen(topics[i])) != 0) { zmq_close(subscriber[num_subscribers].socket); zmq_ctx_destroy(context); - ntop->getTrace()->traceEvent( - TRACE_ERROR, "Unable to connect to subscribe to topic %s", - topics[i]); + ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to connect to subscribe to topic %s", topics[i]); free(tmp); throw("Unable to subscribe to the specified ZMQ endpoint"); } @@ -159,9 +155,9 @@ ZMQCollectorInterface::~ZMQCollectorInterface() { for (u_int i = 0; i < INTERFACE_PROFILING_NUM_SECTIONS; i++) { if (INTERFACE_PROFILING_SECTION_LABEL(i) != NULL) ntop->getTrace()->traceEvent( - TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i, - INTERFACE_PROFILING_SECTION_LABEL(i), - INTERFACE_PROFILING_SECTION_AVG(i, n)); + TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i, + INTERFACE_PROFILING_SECTION_LABEL(i), + INTERFACE_PROFILING_SECTION_AVG(i, n)); ntop->getTrace()->traceEvent(TRACE_NORMAL, "[PROFILING] Section #%d '%s': %llu ticks", i, INTERFACE_PROFILING_SECTION_LABEL(i), @@ -214,12 +210,12 @@ char *ZMQCollectorInterface::findInterfaceEncryptionKeys(char *public_key, char void ZMQCollectorInterface::checkPointCounters(bool drops_only) { if (!drops_only) { recvStatsCheckpoint.num_flows = recvStats.num_flows, - recvStatsCheckpoint.num_events = recvStats.num_events, - recvStatsCheckpoint.num_counters = recvStats.num_counters, - recvStatsCheckpoint.num_templates = recvStats.num_templates, - recvStatsCheckpoint.num_options = recvStats.num_options, - recvStatsCheckpoint.num_network_events = recvStats.num_network_events, - recvStatsCheckpoint.zmq_msg_rcvd = recvStats.zmq_msg_rcvd; + recvStatsCheckpoint.num_events = recvStats.num_events, + recvStatsCheckpoint.num_counters = recvStats.num_counters, + recvStatsCheckpoint.num_templates = recvStats.num_templates, + recvStatsCheckpoint.num_options = recvStats.num_options, + recvStatsCheckpoint.num_network_events = recvStats.num_network_events, + recvStatsCheckpoint.zmq_msg_rcvd = recvStats.zmq_msg_rcvd; } recvStatsCheckpoint.num_dropped_flows = recvStats.num_dropped_flows; @@ -281,7 +277,7 @@ void ZMQCollectorInterface::collect_flows() { for (int i = 0; i < num_subscribers; i++) items[i].socket = subscriber[i].socket, items[i].fd = 0, - items[i].events = ZMQ_POLLIN, items[i].revents = 0; + items[i].events = ZMQ_POLLIN, items[i].revents = 0; do { rc = zmq_poll(items, num_subscribers, MAX_ZMQ_POLL_WAIT_MS); @@ -327,7 +323,7 @@ void ZMQCollectorInterface::collect_flows() { if (((size != sizeof(struct zmq_msg_hdr_v3)) && (size != sizeof(struct zmq_msg_hdr_v2)) && (size != sizeof(struct zmq_msg_hdr_v1)) - ) || + ) || (h->version > ZMQ_MSG_VERSION)) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported publisher version: is your nProbe sender " @@ -503,7 +499,7 @@ void ZMQCollectorInterface::collect_flows() { ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to uncompress ZMQ traffic: " "ntopng compiled without zlib"), - once = true; + once = true; continue; #endif @@ -516,8 +512,8 @@ void ZMQCollectorInterface::collect_flows() { if (ntop->getPrefs()->get_zmq_encryption_pwd()) Utils::xor_encdec( - (u_char *)uncompressed, uncompressed_len, - (u_char *)ntop->getPrefs()->get_zmq_encryption_pwd()); + (u_char *)uncompressed, uncompressed_len, + (u_char *)ntop->getPrefs()->get_zmq_encryption_pwd()); if (false) { ntop->getTrace()->traceEvent(TRACE_NORMAL, "[url: %s]", h->url); @@ -544,61 +540,68 @@ void ZMQCollectorInterface::collect_flows() { /* Process the message */ switch (h->url[0]) { - case 'e': /* event */ - recvStats.num_events++; - parseEvent(uncompressed, uncompressed_len, source_id, msg_id, - this); - break; - - case 'f': /* flow */ - if (tlv_encoding) - recvStats.num_flows += - parseTLVFlow(uncompressed, uncompressed_len, subscriber_id, - msg_id, this); - else { - uncompressed[uncompressed_len] = '\0'; - recvStats.num_flows += parseJSONFlow(uncompressed, uncompressed_len, subscriber_id, msg_id); - } - break; - - case 'c': /* counter */ + case 'e': /* event */ + recvStats.num_events++; + parseEvent(uncompressed, uncompressed_len, source_id, msg_id, + this); + break; + + case 'f': /* flow */ + if (tlv_encoding) + recvStats.num_flows += parseTLVFlow(uncompressed, uncompressed_len, + subscriber_id, msg_id, this); + else { + uncompressed[uncompressed_len] = '\0'; + recvStats.num_flows += parseJSONFlow(uncompressed, uncompressed_len, subscriber_id, msg_id); + } + break; + + case 'c': /* counter / custom-ie */ + if(h->url[1] == 'o') { + /* counter */ if (tlv_encoding) parseTLVCounter(uncompressed, uncompressed_len); else parseJSONCounter(uncompressed, uncompressed_len); + recvStats.num_counters++; - break; - - case 't': /* template */ - recvStats.num_templates++; - parseTemplate(uncompressed, uncompressed_len, subscriber_id, - msg_id, this); - break; - - case 'o': /* option */ - recvStats.num_options++; - parseOption(uncompressed, uncompressed_len, subscriber_id, msg_id, - this); - break; - - case 'h': /* hello */ - recvStats.num_hello++; - /* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[HELLO] %s", - * uncompressed); */ - ntop->askToRefreshIPSRules(); - break; - - case 'l': /* listening-ports */ - recvStats.num_listening_ports++; - parseListeningPorts(uncompressed, uncompressed_len, subscriber_id, - msg_id, this); - break; - - case 's': /* snmp-ifaces */ - recvStats.num_snmp_interfaces++; - parseSNMPIntefaces(uncompressed, uncompressed_len, subscriber_id, - msg_id, this); - break; + } else { + /* custom-ie (JSON only) */ + + parseJSONCustomIE(uncompressed, uncompressed_len); + } + break; + + case 't': /* template */ + recvStats.num_templates++; + parseTemplate(uncompressed, uncompressed_len, subscriber_id, + msg_id, this); + break; + + case 'o': /* option */ + recvStats.num_options++; + parseOption(uncompressed, uncompressed_len, subscriber_id, msg_id, + this); + break; + + case 'h': /* hello */ + recvStats.num_hello++; + /* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[HELLO] %s", + * uncompressed); */ + ntop->askToRefreshIPSRules(); + break; + + case 'l': /* listening-ports */ + recvStats.num_listening_ports++; + parseListeningPorts(uncompressed, uncompressed_len, subscriber_id, + msg_id, this); + break; + + case 's': /* snmp-ifaces */ + recvStats.num_snmp_interfaces++; + parseSNMPIntefaces(uncompressed, uncompressed_len, subscriber_id, + msg_id, this); + break; } /* ntop->getTrace()->traceEvent(TRACE_INFO, "[%s] %s", h->url, @@ -642,8 +645,8 @@ void ZMQCollectorInterface::startPacketPolling() { bool ZMQCollectorInterface::set_packet_filter(char *filter) { ntop->getTrace()->traceEvent( - TRACE_ERROR, "No filter can be set on a collector interface. Ignored %s", - filter); + TRACE_ERROR, "No filter can be set on a collector interface. Ignored %s", + filter); return (false); } diff --git a/src/ZMQParserInterface.cpp b/src/ZMQParserInterface.cpp index 034bc9333bfc..716b49b1e90b 100644 --- a/src/ZMQParserInterface.cpp +++ b/src/ZMQParserInterface.cpp @@ -1632,9 +1632,8 @@ bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key, /* **************************************************** */ -bool ZMQParserInterface::parseNProbeAgentField( - ParsedFlow *const flow, const char *key, ParsedValue *value, - json_object *const jvalue) { +bool ZMQParserInterface::parseNProbeAgentField(ParsedFlow *const flow, const char *key, ParsedValue *value, + json_object *const jvalue) { bool ret = false; json_object *obj; @@ -1969,16 +1968,13 @@ int ZMQParserInterface::parseSingleJSONFlow(json_object *o, !json_object_iter_equal(&additional_it, &additional_itEnd)) { const char *additional_key = json_object_iter_peek_name(&additional_it); - json_object *additional_v = - json_object_iter_peek_value(&additional_it); - const char *additional_value = - json_object_get_string(additional_v); + json_object *additional_v = json_object_iter_peek_value(&additional_it); + const char *additional_value = json_object_get_string(additional_v); if ((additional_key != NULL) && (additional_value != NULL)) { // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional // field: %s", additional_key); - flow.addAdditionalField( - additional_key, json_object_new_string(additional_value)); + flow.addAdditionalField(additional_key, json_object_new_string(additional_value)); } json_object_iter_next(&additional_it); } @@ -2301,7 +2297,6 @@ u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload, if (rc > 0) n++; } - } else { rc = parseSingleJSONFlow(f, source_id); @@ -3407,4 +3402,63 @@ u_int16_t ZMQParserInterface::findVLANMapping(std::string name) { /* **************************************************** */ +u_int8_t ZMQParserInterface::parseJSONCustomIE(const char *payload, + int payload_size) { + json_object *f; + enum json_tokener_error jerr = json_tokener_success; + sFlowInterfaceStats stats; + + //ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); + + memset(&stats, 0, sizeof(stats)); + f = json_tokener_parse_verbose(payload, &jerr); + + if (f != NULL) { + if (json_object_get_type(f) == json_type_array) { + /* IE array */ + int id, num_elements = json_object_array_length(f); + + for (id = 0; id < num_elements; id++) { + json_object *o = json_object_array_get_idx(f, id); + + if(o != NULL) { + struct json_object_iterator it = json_object_iter_begin(o); + struct json_object_iterator itEnd = json_object_iter_end(o); + + while (!json_object_iter_equal(&it, &itEnd)) { + const char *key = json_object_iter_peek_name(&it); + json_object *jvalue = json_object_iter_peek_value(&it); + const char *value = json_object_get_string(jvalue); + + if((key != NULL) && (value != NULL)) { + ntop->getTrace()->traceEvent(TRACE_INFO, "%s = %s", key, value); + + /* TODO: handle custom IEs */ + } /* if */ + + /* Move to the next element */ + json_object_iter_next(&it); + } /* while */ + } + } /* for */ + } + + json_object_put(f); + } else { + // if o != NULL + if (!once) { + ntop->getTrace()->traceEvent(TRACE_WARNING, + "Invalid message received: your nProbe sender is outdated, data " + "encrypted or invalid JSON?"); + ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", + json_tokener_error_desc(jerr), payload_size, payload); + } + + once = true; + return -1; + } + + return 0; +} + #endif