Skip to content

Commit

Permalink
fix distributed tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
PROFeNoM committed Dec 18, 2024
1 parent dc1e08b commit a318f36
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 313 deletions.
27 changes: 27 additions & 0 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ aliases:
image: kong/httpbin
name: httpbin_integration

- &IMAGE_DOCKER_KAFKA
image: confluentinc/cp-kafka:7.7.1
name: kafka_integration
environment:
- KAFKA_BROKER_ID=111
- KAFKA_CREATE_TOPICS=test-lowlevel:1:1,test-highlevel:1:1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka_integration:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
depends:
- zookeeper


- &IMAGE_DOCKER_MEMCHACED
image: memcached:1.5-alpine
name: memcached_integration
Expand Down Expand Up @@ -100,6 +117,14 @@ aliases:
image: gcr.io/cloud-spanner-emulator/emulator:1.5.25
name: googlespanner_integration

- &IMAGE_DOCKER_ZOOKEEPER
image: confluentinc/cp-zookeeper:7.7.1
name: zookeeper
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000


- &STEP_ATTACH_WORKSPACE
attach_workspace:
at: ~/datadog
Expand Down Expand Up @@ -732,6 +757,7 @@ executors:
- <<: *IMAGE_DOCKER_ELASTICSEARCH7
- <<: *IMAGE_DOCKER_HTTPBIN
- <<: *IMAGE_DOCKER_REDIS
- <<: *IMAGE_DOCKER_KAFKA
- <<: *IMAGE_DOCKER_MEMCHACED
- <<: *IMAGE_DOCKER_MYSQL
- <<: *IMAGE_DOCKER_RABBITMQ
Expand All @@ -740,6 +766,7 @@ executors:
- <<: *IMAGE_DOCKER_DD_TEST_AGENT
- <<: *IMAGE_DOCKER_SQLSRV
- <<: *IMAGE_DOCKER_GOOGLESPANNER
- <<: *IMAGE_DOCKER_ZOOKEEPER
with_redis:
environment:
DDAGENT_HOSTNAME: 127.0.0.1
Expand Down
9 changes: 1 addition & 8 deletions ext/handlers_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,14 @@ ZEND_FUNCTION(ddtrace_kafka_produce) {
Z_PARAM_STR_OR_NULL(opaque)
ZEND_PARSE_PARAMETERS_END();

// Create distributed tracing headers if not passed
LOG(DEBUG, "Creating distributed tracing headers");
zval headers;
array_init(&headers);
ddtrace_inject_distributed_headers(Z_ARR(headers), true);

// Prepare arguments for calling the producev method
zval args[6 + opaque_param]; // We have 7 arguments for producev at max

ZVAL_LONG(&args[0], partition); // Partition
ZVAL_LONG(&args[1], msgflags); // Msgflags
ZVAL_STRINGL(&args[2], payload ? payload : "", payload_len); // Payload (optional)
ZVAL_STRINGL(&args[3], key ? key : "", key_len); // Key (optional)
ZVAL_ZVAL(&args[4], &headers, 0, 0); // Headers (distributed tracing)
ZVAL_NULL(&args[4]); // Headers (distributed tracing)
ZVAL_NULL(&args[5]); // Timestamp (optional) - NULL for now
if (opaque_param) {
ZVAL_STR(&args[6], opaque ? opaque : ZSTR_EMPTY_ALLOC()); // Opaque (optional)
Expand All @@ -87,7 +81,6 @@ ZEND_FUNCTION(ddtrace_kafka_produce) {
LOG(DEBUG, "Called 'producev' method");
zval_dtor(&function_name);

zval_ptr_dtor(&headers);
if (payload) {
zend_string_release(Z_STR(args[2]));
}
Expand Down
30 changes: 25 additions & 5 deletions src/DDTrace/Integrations/Kafka/KafkaIntegration.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use DDTrace\HookData;
use DDTrace\Integrations\Integration;
use DDTrace\Log\Logger;
use DDTrace\SpanLink;
use DDTrace\Tag;
use DDTrace\Type;
Expand Down Expand Up @@ -59,6 +60,20 @@ public function setupKafkaProduceSpan(HookData $hook, \RdKafka\ProducerTopic $pr

$conf = ObjectKVStore::get($producerTopic, 'conf');
KafkaIntegration::addProducerSpanMetadata($span, $conf, $hook->args);

// Inject distributed tracing headers
$headers = \DDTrace\generate_distributed_tracing_headers();
$nArgs = count($hook->args);
if ($nArgs >= 5) { // Add to passed headers
$hook->args[4] = array_merge($hook->args[4] ?? [], $headers);
} elseif ($nArgs == 4) { // Add the headers to the args
$hook->args[] = $headers;
} else { // Add the message key and headers to the args
$hook->args[] = null; // $key
$hook->args[] = $headers; // $headers
}
Logger::get()->debug('Added Headers: ' . json_encode($headers, JSON_PRETTY_PRINT));
$hook->overrideArguments($hook->args);
}

public static function addProducerSpanMetadata($span, $conf, $args)
Expand Down Expand Up @@ -136,11 +151,16 @@ public function processConsumedMessage(HookData $hook)
$span->metrics[Tag::KAFKA_MESSAGE_OFFSET] = $message->offset;
$span->metrics[Tag::MQ_MESSAGE_PAYLOAD_SIZE] = strlen($message->payload);

$headers = KafkaIntegration::extractMessageHeaders($message->headers ?? []);
if (\dd_trace_env_config('DD_TRACE_KAFKA_DISTRIBUTED_TRACING')) {
\DDTrace\consume_distributed_tracing_headers($headers);
} else {
$span->links[] = SpanLink::fromHeaders($headers);
$headers = array_filter(KafkaIntegration::extractMessageHeaders($message->headers ?? []), function($value) {
return $value !== null;
});
Logger::get()->debug('Read Headers: ' . json_encode($headers, JSON_PRETTY_PRINT));
if (!empty($headers)) {
if (\dd_trace_env_config('DD_TRACE_KAFKA_DISTRIBUTED_TRACING')) {
\DDTrace\consume_distributed_tracing_headers($headers);
} else {
$span->links[] = SpanLink::fromHeaders($headers);
}
}
}

Expand Down
59 changes: 55 additions & 4 deletions tests/Integrations/Kafka/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function testDistributedTracingLowLevel()
__DIR__ . '/scripts/producer.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
Expand All @@ -61,7 +61,7 @@ public function testDistributedTracingLowLevel()
__DIR__ . '/scripts/consumer-lowlevel.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
Expand All @@ -88,7 +88,7 @@ public function testDistributedTracingHighLevel()
__DIR__ . '/scripts/producer.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
Expand All @@ -111,7 +111,7 @@ public function testDistributedTracingHighLevel()
__DIR__ . '/scripts/consumer-highlevel.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
Expand All @@ -129,4 +129,55 @@ public function testDistributedTracingHighLevel()
'tests.integrations.kafka_test.test_distributed_tracing_high_level_consumer'
);
}

public function testSpanLinks()
{
self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42');

list($producerTraces, $output) = $this->inCli(
__DIR__ . '/scripts/producer.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
'DD_TRACE_EXEC_ENABLED' => 'false',
],
[],
'test-highlevel',
true
);

echo $output;

$this->snapshotFromTraces(
$producerTraces,
self::FIELDS_TO_IGNORE,
'tests.integrations.kafka_test.test_span_links_producer'
);

list($consumerTraces, $output) = $this->inCli(
__DIR__ . '/scripts/consumer-highlevel.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
'DD_TRACE_KAFKA_DISTRIBUTED_TRACING' => 'false',
'DD_TRACE_CLI_ENABLED' => 'true',
'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false',
'DD_SERVICE' => 'kafka_test',
],
[],
null,
true
);

echo $output;

$this->snapshotFromTraces(
$consumerTraces,
self::FIELDS_TO_IGNORE,
'tests.integrations.kafka_test.test_span_links_consumer'
);
}
}
46 changes: 7 additions & 39 deletions tests/Integrations/Kafka/scripts/consumer-highlevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,14 @@

echo "Consumer started, waiting for messages...\n";

do {
$message = $consumer->consume(5000);
$message = $consumer->consume(5000);

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
// Process the message
echo sprintf("Message consumed: %s\n", $message->payload);
// Headers
echo sprintf("Headers: %s\n", json_encode($message->headers));
// Process the message
echo sprintf("Message consumed: %s\n", $message->payload);
// Headers
echo sprintf("Headers: %s\n", json_encode($message->headers));
// Commit the message offset after processing it
$consumer->commit($message);

// Commit the message offset after processing it
$consumer->commit($message);

break;

case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// Mark the partition as fully consumed
echo sprintf("Partition %d fully consumed\n", $message->partition);
$partitionsEof[$message->partition] = true;
break;

case RD_KAFKA_RESP_ERR__TIMED_OUT:
// Ignore timeout errors
echo "Timed out waiting for messages...\n";
break;

default:
// Handle other errors
echo sprintf("Error: %s\n", $message->errstr());
exit(1);
}

// Get the current assignment of partitions
$assignments = $consumer->getAssignment();

// Check if all partitions have been fully consumed
if (count($assignments) > 0 && count($partitionsEof) === count($assignments)) {
echo "All partitions fully consumed. Exiting...\n";
break;
}
} while (true);

$consumer->close();
19 changes: 5 additions & 14 deletions tests/Integrations/Kafka/scripts/consumer-lowlevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,9 @@ function (Consumer $consumer, string $json, int $jsonLength, $opaque = null): vo
$queue = $consumer->newQueue();
$offset = $argv[1] ?? RD_KAFKA_OFFSET_BEGINNING;
$topic->consumeQueueStart(0, (int) $offset, $queue);
//$topic->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
//$topic->consumeQueueStart(2, RD_KAFKA_OFFSET_BEGINNING, $queue);
do {
$message = $queue->consume(1000);
if ($message === null) {
break;
}
echo sprintf('consume msg: %s, timestamp: %s, topic: %s', $message->payload, $message->timestamp, $message->topic_name) . PHP_EOL;
// triggers log output
$events = $consumer->poll(1);
echo sprintf('polling triggered %d events', $events) . PHP_EOL;
} while (true);
$message = $queue->consume(5000);
echo sprintf('consume msg: %s, timestamp: %s, topic: %s', $message->payload, $message->timestamp, $message->topic_name) . PHP_EOL;
// triggers log output
$events = $consumer->poll(1);
echo sprintf('polling triggered %d events', $events) . PHP_EOL;
$topic->consumeStop(0);
//$topic->consumeStop(1);
//$topic->consumeStop(2);

This file was deleted.

Loading

0 comments on commit a318f36

Please sign in to comment.