Skip to content

Commit

Permalink
Add comment for legacy
Browse files Browse the repository at this point in the history
  • Loading branch information
PROFeNoM committed Dec 20, 2024
1 parent 12bff4c commit 983f529
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 3 deletions.
13 changes: 11 additions & 2 deletions src/DDTrace/Integrations/Kafka/KafkaIntegration.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,22 @@ public static function addProducerSpanMetadata($span, $conf, $args)

private function injectHeadersIntoArgs(array $args, array $headers): array
{
// public RdKafka\ProducerTopic::producev (
// integer $partition ,
// integer $msgflags ,
// string $payload [,
// string $key = NULL [,
// array $headers = NULL [,
// integer $timestamp_ms = NULL [,
// string $opaque = NULL
// ]]]] ) : void
$argsCount = count($args);
if ($argsCount >= 5) {
$args[4] = array_merge($args[4] ?? [], $headers);
} elseif ($argsCount === 4) {
$args[] = $headers;
} else {
$args[] = null;
} else { // $argsCount === 3
$args[] = null; // $key
$args[] = $headers;
}
return $args;
Expand Down
2 changes: 1 addition & 1 deletion tests/Integrations/Kafka/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public function testSpanLinksHighLevel()
self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42');

list($producerTraces, $output) = $this->inCli(
__DIR__ . '/scripts/producer.php',
__DIR__ . '/scripts/producerv.php',
[
'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true',
'DD_TRACE_GENERATE_ROOT_SPAN' => 'false',
Expand Down
77 changes: 77 additions & 0 deletions tests/Integrations/Kafka/scripts/producerv.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

use RdKafka\Conf;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\TopicConf;

$conf = new Conf();
$conf->set('bootstrap.servers', 'kafka_integration:9092');
$conf->set('socket.timeout.ms', (string) 50);
$conf->set('queue.buffering.max.messages', (string) 1000);
$conf->set('max.in.flight.requests.per.connection', (string) 1);
$conf->setDrMsgCb(
function (Producer $producer, Message $message): void {
if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
//var_dump($message->errstr());
}
//var_dump($message);
}
);
//$conf->set('log_level', (string) LOG_DEBUG);
//$conf->set('debug', 'all');
$conf->setLogCb(
function (Producer $producer, int $level, string $facility, string $message): void {
echo sprintf('log: %d %s %s', $level, $facility, $message) . PHP_EOL;
}
);
$conf->set('statistics.interval.ms', (string) 1000);
$conf->setStatsCb(
function (Producer $producer, string $json, int $json_len, $opaque = null): void {
echo "stats: ${json}" . PHP_EOL;
}
);
$conf->setDrMsgCb(function (Producer $kafka, Message $message) {
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
$errorStr = rd_kafka_err2str($message->err);

echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL;
} else {
// message successfully delivered
echo sprintf('Message sent SUCCESSFULLY with payload => %s', $message->payload) . PHP_EOL;
}
});
//var_dump($conf->dump());

$topicConf = new TopicConf();
$topicConf->set('message.timeout.ms', (string) 30000);
$topicConf->set('request.required.acks', (string) -1);
$topicConf->set('request.timeout.ms', (string) 5000);
//var_dump($topicConf->dump());

$producer = new Producer($conf);

$topicName = $argv[1] ?? 'test';
$topic = $producer->newTopic($topicName, $topicConf);
//var_dump($topic);

$metadata = $producer->getMetadata(false, $topic, 1000);
//var_dump($metadata->getOrigBrokerName());
//var_dump($metadata->getOrigBrokerId());
//var_dump($metadata->getBrokers());
//var_dump($metadata->getTopics());


for ($i = 0; $i < 1; $i++) {
$key = $i % 10;
$payload = sprintf('payload-%d-%s', $i, $key);
echo sprintf('produce msg: %s', $payload) . PHP_EOL;
$topic->producev(RD_KAFKA_PARTITION_UA, 0, $payload, (string) $key);
// triggers log output
$events = $producer->poll(1);
echo sprintf('polling triggered %d events', $events) . PHP_EOL;
}

$producer->flush(5000);

0 comments on commit 983f529

Please sign in to comment.