diff --git a/src/DDTrace/Integrations/Kafka/KafkaIntegration.php b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php index ebd4939c47..f0625b05db 100644 --- a/src/DDTrace/Integrations/Kafka/KafkaIntegration.php +++ b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php @@ -69,13 +69,22 @@ public static function addProducerSpanMetadata($span, $conf, $args) private function injectHeadersIntoArgs(array $args, array $headers): array { + // public RdKafka\ProducerTopic::producev ( + // integer $partition , + // integer $msgflags , + // string $payload [, + // string $key = NULL [, + // array $headers = NULL [, + // integer $timestamp_ms = NULL [, + // string $opaque = NULL + // ]]]] ) : void $argsCount = count($args); if ($argsCount >= 5) { $args[4] = array_merge($args[4] ?? [], $headers); } elseif ($argsCount === 4) { $args[] = $headers; - } else { - $args[] = null; + } else { // $argsCount === 3 + $args[] = null; // $key $args[] = $headers; } return $args; diff --git a/tests/Integrations/Kafka/KafkaTest.php b/tests/Integrations/Kafka/KafkaTest.php index a959ef3037..100e104261 100644 --- a/tests/Integrations/Kafka/KafkaTest.php +++ b/tests/Integrations/Kafka/KafkaTest.php @@ -152,7 +152,7 @@ public function testSpanLinksHighLevel() self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42'); list($producerTraces, $output) = $this->inCli( - __DIR__ . '/scripts/producer.php', + __DIR__ . '/scripts/producerv.php', [ 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', diff --git a/tests/Integrations/Kafka/scripts/producerv.php b/tests/Integrations/Kafka/scripts/producerv.php new file mode 100644 index 0000000000..b1b14920f9 --- /dev/null +++ b/tests/Integrations/Kafka/scripts/producerv.php @@ -0,0 +1,77 @@ +set('bootstrap.servers', 'kafka_integration:9092'); +$conf->set('socket.timeout.ms', (string) 50); +$conf->set('queue.buffering.max.messages', (string) 1000); +$conf->set('max.in.flight.requests.per.connection', (string) 1); +$conf->setDrMsgCb( + function (Producer $producer, Message $message): void { + if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { + //var_dump($message->errstr()); + } + //var_dump($message); + } +); +//$conf->set('log_level', (string) LOG_DEBUG); +//$conf->set('debug', 'all'); +$conf->setLogCb( + function (Producer $producer, int $level, string $facility, string $message): void { + echo sprintf('log: %d %s %s', $level, $facility, $message) . PHP_EOL; + } +); +$conf->set('statistics.interval.ms', (string) 1000); +$conf->setStatsCb( + function (Producer $producer, string $json, int $json_len, $opaque = null): void { + echo "stats: ${json}" . PHP_EOL; + } +); +$conf->setDrMsgCb(function (Producer $kafka, Message $message) { + if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) { + $errorStr = rd_kafka_err2str($message->err); + + echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL; + } else { + // message successfully delivered + echo sprintf('Message sent SUCCESSFULLY with payload => %s', $message->payload) . PHP_EOL; + } +}); +//var_dump($conf->dump()); + +$topicConf = new TopicConf(); +$topicConf->set('message.timeout.ms', (string) 30000); +$topicConf->set('request.required.acks', (string) -1); +$topicConf->set('request.timeout.ms', (string) 5000); +//var_dump($topicConf->dump()); + +$producer = new Producer($conf); + +$topicName = $argv[1] ?? 'test'; +$topic = $producer->newTopic($topicName, $topicConf); +//var_dump($topic); + +$metadata = $producer->getMetadata(false, $topic, 1000); +//var_dump($metadata->getOrigBrokerName()); +//var_dump($metadata->getOrigBrokerId()); +//var_dump($metadata->getBrokers()); +//var_dump($metadata->getTopics()); + + +for ($i = 0; $i < 1; $i++) { + $key = $i % 10; + $payload = sprintf('payload-%d-%s', $i, $key); + echo sprintf('produce msg: %s', $payload) . PHP_EOL; + $topic->producev(RD_KAFKA_PARTITION_UA, 0, $payload, (string) $key); + // triggers log output + $events = $producer->poll(1); + echo sprintf('polling triggered %d events', $events) . PHP_EOL; +} + +$producer->flush(5000);