diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index b57f53c52d..200aad8546 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -43,6 +43,23 @@ aliases: image: kong/httpbin name: httpbin_integration + - &IMAGE_DOCKER_KAFKA + image: confluentinc/cp-kafka:7.7.1 + name: kafka_integration + environment: + - KAFKA_BROKER_ID=111 + - KAFKA_CREATE_TOPICS=test-lowlevel:1:1,test-highlevel:1:1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka_integration:9092 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + depends: + - zookeeper + + - &IMAGE_DOCKER_MEMCHACED image: memcached:1.5-alpine name: memcached_integration @@ -100,6 +117,14 @@ aliases: image: gcr.io/cloud-spanner-emulator/emulator:1.5.25 name: googlespanner_integration + - &IMAGE_DOCKER_ZOOKEEPER + image: confluentinc/cp-zookeeper:7.7.1 + name: zookeeper + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + + - &STEP_ATTACH_WORKSPACE attach_workspace: at: ~/datadog @@ -732,6 +757,7 @@ executors: - <<: *IMAGE_DOCKER_ELASTICSEARCH7 - <<: *IMAGE_DOCKER_HTTPBIN - <<: *IMAGE_DOCKER_REDIS + - <<: *IMAGE_DOCKER_KAFKA - <<: *IMAGE_DOCKER_MEMCHACED - <<: *IMAGE_DOCKER_MYSQL - <<: *IMAGE_DOCKER_RABBITMQ @@ -740,6 +766,7 @@ executors: - <<: *IMAGE_DOCKER_DD_TEST_AGENT - <<: *IMAGE_DOCKER_SQLSRV - <<: *IMAGE_DOCKER_GOOGLESPANNER + - <<: *IMAGE_DOCKER_ZOOKEEPER with_redis: environment: DDAGENT_HOSTNAME: 127.0.0.1 diff --git a/ext/handlers_kafka.c b/ext/handlers_kafka.c index 4ec3658c02..20d690fa22 100644 --- a/ext/handlers_kafka.c +++ b/ext/handlers_kafka.c @@ -61,12 +61,6 @@ ZEND_FUNCTION(ddtrace_kafka_produce) { Z_PARAM_STR_OR_NULL(opaque) ZEND_PARSE_PARAMETERS_END(); - // Create distributed tracing headers if 
not passed - LOG(DEBUG, "Creating distributed tracing headers"); - zval headers; - array_init(&headers); - ddtrace_inject_distributed_headers(Z_ARR(headers), true); - // Prepare arguments for calling the producev method zval args[6 + opaque_param]; // We have 7 arguments for producev at max @@ -74,7 +68,7 @@ ZEND_FUNCTION(ddtrace_kafka_produce) { ZVAL_LONG(&args[1], msgflags); // Msgflags ZVAL_STRINGL(&args[2], payload ? payload : "", payload_len); // Payload (optional) ZVAL_STRINGL(&args[3], key ? key : "", key_len); // Key (optional) - ZVAL_ZVAL(&args[4], &headers, 0, 0); // Headers (distributed tracing) + ZVAL_NULL(&args[4]); // Headers (distributed tracing) ZVAL_NULL(&args[5]); // Timestamp (optional) - NULL for now if (opaque_param) { ZVAL_STR(&args[6], opaque ? opaque : ZSTR_EMPTY_ALLOC()); // Opaque (optional) @@ -87,7 +81,6 @@ ZEND_FUNCTION(ddtrace_kafka_produce) { LOG(DEBUG, "Called 'producev' method"); zval_dtor(&function_name); - zval_ptr_dtor(&headers); if (payload) { zend_string_release(Z_STR(args[2])); } diff --git a/src/DDTrace/Integrations/Kafka/KafkaIntegration.php b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php index 19f4f8b032..f35b1f7524 100644 --- a/src/DDTrace/Integrations/Kafka/KafkaIntegration.php +++ b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php @@ -4,6 +4,7 @@ use DDTrace\HookData; use DDTrace\Integrations\Integration; +use DDTrace\Log\Logger; use DDTrace\SpanLink; use DDTrace\Tag; use DDTrace\Type; @@ -59,6 +60,15 @@ public function setupKafkaProduceSpan(HookData $hook, \RdKafka\ProducerTopic $pr $conf = ObjectKVStore::get($producerTopic, 'conf'); KafkaIntegration::addProducerSpanMetadata($span, $conf, $hook->args); + + // Inject distributed tracing headers + $headers = \DDTrace\generate_distributed_tracing_headers(); + // producev() expects the headers as its fifth argument: pad any omitted optional + // arguments (payload, key) with null so the headers never land in their slots. + $hook->args = array_pad($hook->args, 5, null); + $hook->args[4] = array_merge($hook->args[4] ?? 
[], $headers); + Logger::get()->debug('Added Headers: ' . json_encode($headers, JSON_PRETTY_PRINT)); + $hook->overrideArguments($hook->args); } public static function addProducerSpanMetadata($span, $conf, $args) @@ -136,11 +146,16 @@ public function processConsumedMessage(HookData $hook) $span->metrics[Tag::KAFKA_MESSAGE_OFFSET] = $message->offset; $span->metrics[Tag::MQ_MESSAGE_PAYLOAD_SIZE] = strlen($message->payload); - $headers = KafkaIntegration::extractMessageHeaders($message->headers ?? []); - if (\dd_trace_env_config('DD_TRACE_KAFKA_DISTRIBUTED_TRACING')) { - \DDTrace\consume_distributed_tracing_headers($headers); - } else { - $span->links[] = SpanLink::fromHeaders($headers); + $headers = array_filter(KafkaIntegration::extractMessageHeaders($message->headers ?? []), function($value) { + return $value !== null; + }); + Logger::get()->debug('Read Headers: ' . json_encode($headers, JSON_PRETTY_PRINT)); + if (!empty($headers)) { + if (\dd_trace_env_config('DD_TRACE_KAFKA_DISTRIBUTED_TRACING')) { + \DDTrace\consume_distributed_tracing_headers($headers); + } else { + $span->links[] = SpanLink::fromHeaders($headers); + } } } diff --git a/tests/Integrations/Kafka/KafkaTest.php b/tests/Integrations/Kafka/KafkaTest.php index 6d90f27771..93c7318966 100644 --- a/tests/Integrations/Kafka/KafkaTest.php +++ b/tests/Integrations/Kafka/KafkaTest.php @@ -38,7 +38,7 @@ public function testDistributedTracingLowLevel() __DIR__ . 
'/scripts/producer.php', [ 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', - 'DD_TRACE_GENERATE_ROOT_SPAN' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', 'DD_TRACE_CLI_ENABLED' => 'true', 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', 'DD_SERVICE' => 'kafka_test', @@ -61,7 +61,7 @@ public function testDistributedTracingLowLevel() __DIR__ . '/scripts/consumer-lowlevel.php', [ 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', - 'DD_TRACE_GENERATE_ROOT_SPAN' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', 'DD_TRACE_CLI_ENABLED' => 'true', 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', 'DD_SERVICE' => 'kafka_test', @@ -88,7 +88,7 @@ public function testDistributedTracingHighLevel() __DIR__ . '/scripts/producer.php', [ 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', - 'DD_TRACE_GENERATE_ROOT_SPAN' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', 'DD_TRACE_CLI_ENABLED' => 'true', 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', 'DD_SERVICE' => 'kafka_test', @@ -111,7 +111,7 @@ public function testDistributedTracingHighLevel() __DIR__ . '/scripts/consumer-highlevel.php', [ 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', - 'DD_TRACE_GENERATE_ROOT_SPAN' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', 'DD_TRACE_CLI_ENABLED' => 'true', 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', 'DD_SERVICE' => 'kafka_test', @@ -129,4 +129,55 @@ public function testDistributedTracingHighLevel() 'tests.integrations.kafka_test.test_distributed_tracing_high_level_consumer' ); } + + public function testSpanLinks() + { + self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42'); + + list($producerTraces, $output) = $this->inCli( + __DIR__ . 
'/scripts/producer.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + ], + [], + 'test-highlevel', + true + ); + + echo $output; + + $this->snapshotFromTraces( + $producerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_producer' + ); + + list($consumerTraces, $output) = $this->inCli( + __DIR__ . '/scripts/consumer-highlevel.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_KAFKA_DISTRIBUTED_TRACING' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + ], + [], + null, + true + ); + + echo $output; + + $this->snapshotFromTraces( + $consumerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_consumer' + ); + } } diff --git a/tests/Integrations/Kafka/scripts/consumer-highlevel.php b/tests/Integrations/Kafka/scripts/consumer-highlevel.php index a069a92c1f..093e54d849 100644 --- a/tests/Integrations/Kafka/scripts/consumer-highlevel.php +++ b/tests/Integrations/Kafka/scripts/consumer-highlevel.php @@ -19,46 +19,14 @@ echo "Consumer started, waiting for messages...\n"; -do { - $message = $consumer->consume(5000); +$message = $consumer->consume(5000); - switch ($message->err) { - case RD_KAFKA_RESP_ERR_NO_ERROR: - // Process the message - echo sprintf("Message consumed: %s\n", $message->payload); - // Headers - echo sprintf("Headers: %s\n", json_encode($message->headers)); +// Process the message +echo sprintf("Message consumed: %s\n", $message->payload); +// Headers +echo sprintf("Headers: %s\n", json_encode($message->headers)); +// Commit the message offset after processing it +$consumer->commit($message); - // Commit the message offset after processing it - 
$consumer->commit($message); - - break; - - case RD_KAFKA_RESP_ERR__PARTITION_EOF: - // Mark the partition as fully consumed - echo sprintf("Partition %d fully consumed\n", $message->partition); - $partitionsEof[$message->partition] = true; - break; - - case RD_KAFKA_RESP_ERR__TIMED_OUT: - // Ignore timeout errors - echo "Timed out waiting for messages...\n"; - break; - - default: - // Handle other errors - echo sprintf("Error: %s\n", $message->errstr()); - exit(1); - } - - // Get the current assignment of partitions - $assignments = $consumer->getAssignment(); - - // Check if all partitions have been fully consumed - if (count($assignments) > 0 && count($partitionsEof) === count($assignments)) { - echo "All partitions fully consumed. Exiting...\n"; - break; - } -} while (true); $consumer->close(); diff --git a/tests/Integrations/Kafka/scripts/consumer-lowlevel.php b/tests/Integrations/Kafka/scripts/consumer-lowlevel.php index 92f5b27227..a93a7e3c85 100644 --- a/tests/Integrations/Kafka/scripts/consumer-lowlevel.php +++ b/tests/Integrations/Kafka/scripts/consumer-lowlevel.php @@ -40,18 +40,9 @@ function (Consumer $consumer, string $json, int $jsonLength, $opaque = null): vo $queue = $consumer->newQueue(); $offset = $argv[1] ?? RD_KAFKA_OFFSET_BEGINNING; $topic->consumeQueueStart(0, (int) $offset, $queue); -//$topic->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue); -//$topic->consumeQueueStart(2, RD_KAFKA_OFFSET_BEGINNING, $queue); -do { - $message = $queue->consume(1000); - if ($message === null) { - break; - } - echo sprintf('consume msg: %s, timestamp: %s, topic: %s', $message->payload, $message->timestamp, $message->topic_name) . PHP_EOL; - // triggers log output - $events = $consumer->poll(1); - echo sprintf('polling triggered %d events', $events) . PHP_EOL; -} while (true); +$message = $queue->consume(5000); +echo sprintf('consume msg: %s, timestamp: %s, topic: %s', $message->payload, $message->timestamp, $message->topic_name) . 
PHP_EOL; +// triggers log output +$events = $consumer->poll(1); +echo sprintf('polling triggered %d events', $events) . PHP_EOL; $topic->consumeStop(0); -//$topic->consumeStop(1); -//$topic->consumeStop(2); diff --git a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_consumer.json b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_consumer.json deleted file mode 100644 index 73652afb36..0000000000 --- a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_consumer.json +++ /dev/null @@ -1,69 +0,0 @@ -[[ - { - "name": "consumer-highlevel.php", - "service": "kafka_test", - "resource": "consumer-highlevel.php", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "cli", - "meta": { - "_dd.p.dm": "0", - "_dd.p.tid": "675bfa3a00000000", - "runtime-id": "d3c7f5a5-ca9f-457a-bf7c-7a8ae4ea62af" - }, - "metrics": { - "_dd.agent_psr": 1, - "_sampling_priority_v1": 1 - } - }, - { - "name": "kafka.consume", - "service": "kafka_test", - "resource": "kafka.consume", - "trace_id": 0, - "span_id": 2, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-highlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.group_id": "consumer-highlevel", - "messaging.operation": "receive", - "messaging.system": "kafka", - "span.kind": "consumer" - }, - "metrics": { - "messaging.kafka.message_offset": 0, - "messaging.kafka.partition": 0, - "messaging.message_payload_size_bytes": 11 - } - }, - { - "name": "kafka.consume", - "service": "kafka_test", - "resource": "kafka.consume", - "trace_id": 0, - "span_id": 3, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-highlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": 
"kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.group_id": "consumer-highlevel", - "messaging.kafka.tombstone": "true", - "messaging.operation": "receive", - "messaging.system": "kafka", - "span.kind": "consumer" - }, - "metrics": { - "messaging.kafka.message_offset": 1, - "messaging.kafka.partition": 0 - } - }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json deleted file mode 100644 index 9920a96df1..0000000000 --- a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json +++ /dev/null @@ -1,43 +0,0 @@ -[[ - { - "name": "producer.php", - "service": "kafka_test", - "resource": "producer.php", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "cli", - "meta": { - "_dd.p.dm": "0", - "_dd.p.tid": "675bfa2800000000", - "runtime-id": "eb3f703a-979f-4020-92cd-f792063016f2" - }, - "metrics": { - "_dd.agent_psr": 1, - "_sampling_priority_v1": 1 - } - }, - { - "name": "kafka.produce", - "service": "kafka_test", - "resource": "kafka.produce", - "trace_id": 0, - "span_id": 2, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-highlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.message_key": "0", - "messaging.operation": "send", - "messaging.system": "kafka", - "span.kind": "producer" - }, - "metrics": { - "messaging.kafka.partition": -1, - "messaging.message_payload_size_bytes": 11 - } - }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_consumer.json b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_consumer.json deleted file mode 100644 index 46f2fdf9de..0000000000 --- 
a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_consumer.json +++ /dev/null @@ -1,88 +0,0 @@ -[[ - { - "name": "consumer-lowlevel.php", - "service": "kafka_test", - "resource": "consumer-lowlevel.php", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "cli", - "meta": { - "_dd.p.dm": "0", - "_dd.p.tid": "675bff5f00000000", - "runtime-id": "7a000406-18ba-40df-93e0-53025bb64d88" - }, - "metrics": { - "_dd.agent_psr": 1, - "_sampling_priority_v1": 1 - } - }, - { - "name": "kafka.consume", - "service": "kafka_test", - "resource": "kafka.consume", - "trace_id": 0, - "span_id": 2, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-lowlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.group_id": "consumer-lowlevel", - "messaging.operation": "receive", - "messaging.system": "kafka", - "span.kind": "consumer" - }, - "metrics": { - "messaging.kafka.message_offset": 5, - "messaging.kafka.partition": 0, - "messaging.message_payload_size_bytes": 11 - } - }, - { - "name": "kafka.consume", - "service": "kafka_test", - "resource": "kafka.consume", - "trace_id": 0, - "span_id": 3, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-lowlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.group_id": "consumer-lowlevel", - "messaging.kafka.tombstone": "true", - "messaging.operation": "receive", - "messaging.system": "kafka", - "span.kind": "consumer" - }, - "metrics": { - "messaging.kafka.message_offset": 6, - "messaging.kafka.partition": 0 - } - }, - { - "name": "kafka.consume", - "service": "kafka_test", - "resource": "kafka.consume", - "trace_id": 0, - "span_id": 4, - "parent_id": 1, - 
"type": "queue", - "meta": { - "component": "kafka", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.group_id": "consumer-lowlevel", - "messaging.kafka.tombstone": "true", - "messaging.operation": "receive", - "messaging.system": "kafka", - "span.kind": "consumer" - } - }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_producer.json b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_producer.json deleted file mode 100644 index 2d4d757ecd..0000000000 --- a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_low_level_producer.json +++ /dev/null @@ -1,43 +0,0 @@ -[[ - { - "name": "producer.php", - "service": "kafka_test", - "resource": "producer.php", - "trace_id": 0, - "span_id": 1, - "parent_id": 0, - "type": "cli", - "meta": { - "_dd.p.dm": "0", - "_dd.p.tid": "675bfedc00000000", - "runtime-id": "fe5746a6-b493-4f6a-87ac-aa49b995d07a" - }, - "metrics": { - "_dd.agent_psr": 1, - "_sampling_priority_v1": 1 - } - }, - { - "name": "kafka.produce", - "service": "kafka_test", - "resource": "kafka.produce", - "trace_id": 0, - "span_id": 2, - "parent_id": 1, - "type": "queue", - "meta": { - "component": "kafka", - "messaging.destination": "test-lowlevel", - "messaging.destination_kind": "queue", - "messaging.kafka.bootstrap.servers": "kafka_integration:9092", - "messaging.kafka.client_id": "rdkafka", - "messaging.kafka.message_key": "0", - "messaging.operation": "send", - "messaging.system": "kafka", - "span.kind": "producer" - }, - "metrics": { - "messaging.kafka.partition": -1, - "messaging.message_payload_size_bytes": 11 - } - }]]