diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index b57f53c52d..b2dc35abcc 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -43,6 +43,21 @@ aliases: image: kong/httpbin name: httpbin_integration + - &IMAGE_DOCKER_KAFKA + image: confluentinc/cp-kafka:7.7.1 + name: kafka_integration + environment: + - KAFKA_BROKER_ID=111 + - KAFKA_CREATE_TOPICS=test-lowlevel:1:1,test-highlevel:1:1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka_integration:9092 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true + - &IMAGE_DOCKER_MEMCHACED image: memcached:1.5-alpine name: memcached_integration @@ -100,6 +115,14 @@ aliases: image: gcr.io/cloud-spanner-emulator/emulator:1.5.25 name: googlespanner_integration + - &IMAGE_DOCKER_ZOOKEEPER + image: confluentinc/cp-zookeeper:7.7.1 + name: zookeeper + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + + - &STEP_ATTACH_WORKSPACE attach_workspace: at: ~/datadog @@ -537,6 +560,22 @@ commands: -e ES_JAVA_OPTS="-Xms1g -Xmx1g" \ -e discovery.type=single-node \ --name elasticsearch7_integration elasticsearch:7.17.0 + retry_docker run --detach --rm --net net \ + -e ZOOKEEPER_CLIENT_PORT=2181 \ + -e ZOOKEEPER_TICK_TIME=2000 \ + --name zookeeper confluentinc/cp-zookeeper:7.7.1 + retry_docker run --detach --rm --net net \ + -e KAFKA_BROKER_ID=111 \ + -e KAFKA_CREATE_TOPICS=test-lowlevel:1:1,test-highlevel:1:1 \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka_integration:9092 \ + -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT \ + -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ + -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ + -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ + -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ + -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \ + --name kafka_integration confluentinc/cp-kafka:7.7.1 retry_docker run --detach --rm --net net \ --name redis_integration datadog/dd-trace-ci:php-redis-5.0 retry_docker run --detach --rm --net net \ @@ -581,6 +620,8 @@ commands: docker_image: elasticsearch2_integration - docker_logs: docker_image: elasticsearch7_integration + - docker_logs: + docker_image: kafka_integration - docker_logs: docker_image: redis_integration - docker_logs: @@ -595,6 +636,8 @@ commands: docker_image: sqlsrv_integration - docker_logs: docker_image: googlespanner_integration + - docker_logs: + docker_image: zookeeper - run: name: Setup docker image << parameters.docker_image >> command: | @@ -732,6 +775,7 @@ executors: - <<: *IMAGE_DOCKER_ELASTICSEARCH7 - <<: *IMAGE_DOCKER_HTTPBIN - <<: *IMAGE_DOCKER_REDIS + - <<: *IMAGE_DOCKER_KAFKA - <<: *IMAGE_DOCKER_MEMCHACED - <<: *IMAGE_DOCKER_MYSQL - <<: *IMAGE_DOCKER_RABBITMQ @@ -740,6 +784,7 @@ executors: - <<: *IMAGE_DOCKER_DD_TEST_AGENT - <<: *IMAGE_DOCKER_SQLSRV - <<: *IMAGE_DOCKER_GOOGLESPANNER + - <<: *IMAGE_DOCKER_ZOOKEEPER with_redis: environment: DDAGENT_HOSTNAME: 127.0.0.1 diff --git a/.gitlab/ci-images.yml b/.gitlab/ci-images.yml index 8b76d3952f..368c5b1bbb 100644 --- a/.gitlab/ci-images.yml +++ b/.gitlab/ci-images.yml @@ -14,7 +14,7 @@ CentOS: - when: manual needs: [] tags: ["arch:amd64"] - timeout: 1h + timeout: 4h image: 
486234852809.dkr.ecr.us-east-1.amazonaws.com/docker:24.0.4-gbi-focal parallel: matrix: @@ -41,7 +41,7 @@ Alpine: - when: manual needs: [] tags: ["arch:amd64"] - timeout: 1h + timeout: 4h image: 486234852809.dkr.ecr.us-east-1.amazonaws.com/docker:24.0.4-gbi-focal parallel: matrix: @@ -68,7 +68,7 @@ Bookworm: - when: manual needs: [] tags: ["arch:amd64"] - timeout: 1h + timeout: 4h image: 486234852809.dkr.ecr.us-east-1.amazonaws.com/docker:24.0.4-gbi-focal parallel: matrix: @@ -97,7 +97,7 @@ Buster: - when: manual needs: [] tags: ["arch:amd64"] - timeout: 1h + timeout: 4h image: 486234852809.dkr.ecr.us-east-1.amazonaws.com/docker:24.0.4-gbi-focal parallel: matrix: diff --git a/Makefile b/Makefile index d0af6785f1..87a4a56b70 100644 --- a/Makefile +++ b/Makefile @@ -547,6 +547,7 @@ TELEMETRY_ENABLED=0 TEST_INTEGRATIONS_70 := \ test_integrations_deferred_loading \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_memcache \ test_integrations_memcached \ test_integrations_mongodb1 \ @@ -588,6 +589,7 @@ TEST_INTEGRATIONS_71 := \ test_integrations_amqp2 \ test_integrations_amqp35 \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_memcache \ test_integrations_memcached \ test_integrations_mongodb1 \ @@ -639,6 +641,7 @@ TEST_INTEGRATIONS_72 := \ test_integrations_amqp2 \ test_integrations_amqp35 \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_memcache \ test_integrations_memcached \ test_integrations_mongodb1 \ @@ -697,6 +700,7 @@ TEST_INTEGRATIONS_73 :=\ test_integrations_amqp2 \ test_integrations_amqp35 \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_memcache \ test_integrations_memcached \ test_integrations_mongodb1 \ @@ -754,6 +758,7 @@ TEST_INTEGRATIONS_74 := \ test_integrations_amqp2 \ test_integrations_amqp35 \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_memcache \ test_integrations_memcached \ test_integrations_mongodb1 \ @@ -819,6 +824,7 @@ TEST_INTEGRATIONS_80 := \ test_integrations_amqp2 \ test_integrations_amqp35 \ test_integrations_curl \ + test_integrations_kafka \ test_integrations_laminaslog2 \ test_integrations_memcache \ test_integrations_memcached \ @@ -871,6 +877,7 @@ TEST_INTEGRATIONS_81 := \ test_integrations_amqp35 \ test_integrations_curl \ test_integrations_deferred_loading \ + test_integrations_kafka \ test_integrations_laminaslog2 \ test_integrations_memcache \ test_integrations_memcached \ @@ -925,6 +932,7 @@ TEST_INTEGRATIONS_82 := \ test_integrations_amqp35 \ test_integrations_curl \ test_integrations_deferred_loading \ + test_integrations_kafka \ test_integrations_laminaslog2 \ test_integrations_memcache \ test_integrations_memcached \ @@ -987,6 +995,7 @@ TEST_INTEGRATIONS_83 := \ test_integrations_amqp35 \ test_integrations_curl \ test_integrations_deferred_loading \ + test_integrations_kafka \ test_integrations_laminaslog2 \ test_integrations_memcache \ test_integrations_memcached \ @@ -1230,6 +1239,8 @@ test_integrations_guzzle6: global_test_run_dependencies tests/Integrations/Guzz $(call run_tests_debug,tests/Integrations/Guzzle/V6) test_integrations_guzzle7: global_test_run_dependencies tests/Integrations/Guzzle/V7/composer.lock-php$(PHP_MAJOR_MINOR) $(call run_tests_debug,tests/Integrations/Guzzle/V7) +test_integrations_kafka: global_test_run_dependencies + $(call run_tests_debug,tests/Integrations/Kafka) test_integrations_laminaslog2: global_test_run_dependencies tests/Integrations/Logs/LaminasLogV2/composer.lock-php$(PHP_MAJOR_MINOR) $(call 
run_tests_debug,tests/Integrations/Logs/LaminasLogV2) test_integrations_memcached: global_test_run_dependencies diff --git a/config.m4 b/config.m4 index 8e35b07846..241f81657b 100644 --- a/config.m4 +++ b/config.m4 @@ -184,6 +184,7 @@ if test "$PHP_DDTRACE" != "no"; then ext/handlers_api.c \ ext/handlers_exception.c \ ext/handlers_internal.c \ + ext/handlers_kafka.c \ ext/handlers_pcntl.c \ ext/handlers_signal.c \ ext/integrations/exec_integration.c \ diff --git a/config.w32 b/config.w32 index fadb21e51f..02bd845fee 100644 --- a/config.w32 +++ b/config.w32 @@ -35,6 +35,7 @@ if (PHP_DDTRACE != 'no') { DDTRACE_EXT_SOURCES += " handlers_curl" + (version < 800 ? "_php7" : "") + ".c"; DDTRACE_EXT_SOURCES += " handlers_exception.c"; DDTRACE_EXT_SOURCES += " handlers_internal.c"; + DDTRACE_EXT_SOURCES += " handlers_kafka.c"; DDTRACE_EXT_SOURCES += " handlers_pcntl.c"; DDTRACE_EXT_SOURCES += " ip_extraction.c"; DDTRACE_EXT_SOURCES += " standalone_limiter.c"; diff --git a/docker-compose.yml b/docker-compose.yml index 2cd203d4be..ac2eb7cadb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,6 +30,7 @@ x-aliases: - mysql_integration - memcached_integration - googlespanner_integration + - kafka_integration environment: - REDIS_HOSTNAME=redis_integration - DDAGENT_HOSTNAME=ddagent_integration @@ -169,6 +170,36 @@ services: aliases: - spanner.local + kafka_integration: + image: confluentinc/cp-kafka:7.7.1 + ports: + - "9092:9092" + networks: + default: + aliases: + - kafka_integration + environment: + - KAFKA_BROKER_ID=111 + - KAFKA_CREATE_TOPICS=test-lowlevel:1:1,test-highlevel:1:1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka_integration:9092 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true + depends_on: + - zookeeper + + zookeeper: + image: confluentinc/cp-zookeeper:7.7.1 + ports: + - "2181:2181" + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 + mongodb_integration: image: "circleci/mongo:4.0" ports: diff --git a/dockerfiles/ci/alpine/Dockerfile b/dockerfiles/ci/alpine/Dockerfile index b16923e9a0..18a2d30349 100644 --- a/dockerfiles/ci/alpine/Dockerfile +++ b/dockerfiles/ci/alpine/Dockerfile @@ -28,6 +28,7 @@ RUN set -eux; \ libffi-dev \ libmemcached \ libmemcached-dev \ + librdkafka-dev \ libsodium-dev \ libxml2-dev \ libzip-dev \ diff --git a/dockerfiles/ci/alpine_compile_extension/base.Dockerfile b/dockerfiles/ci/alpine_compile_extension/base.Dockerfile index b67c1a88c3..38f1575495 100644 --- a/dockerfiles/ci/alpine_compile_extension/base.Dockerfile +++ b/dockerfiles/ci/alpine_compile_extension/base.Dockerfile @@ -19,6 +19,7 @@ RUN set -eux; \ libedit-dev \ libffi-dev \ libmcrypt-dev \ + librdkafka-dev \ libsodium-dev \ libxml2-dev \ gnu-libiconv-dev \ diff --git a/dockerfiles/ci/buster/Dockerfile b/dockerfiles/ci/buster/Dockerfile index dba7530c8b..31a04c6e8d 100644 --- a/dockerfiles/ci/buster/Dockerfile +++ b/dockerfiles/ci/buster/Dockerfile @@ -27,6 +27,7 @@ ENV DEVLIBS \ libpq-dev \ libpng-dev \ librabbitmq-dev \ + librdkafka-dev \ libsodium-dev \ libsqlite3-dev \ libssl-dev \ diff --git a/dockerfiles/ci/buster/build-extensions.sh b/dockerfiles/ci/buster/build-extensions.sh index d7f004fd1f..49c5d76ab4 100755 --- a/dockerfiles/ci/buster/build-extensions.sh +++ 
b/dockerfiles/ci/buster/build-extensions.sh @@ -128,6 +128,7 @@ else yes 'no' | pecl install memcached; echo "extension=memcached.so" >> ${iniDir}/memcached.ini; yes '' | pecl install memcache$MEMCACHE_VERSION; echo "extension=memcache.so" >> ${iniDir}/memcache.ini; pecl install mongodb$MONGODB_VERSION; echo "extension=mongodb.so" >> ${iniDir}/mongodb.ini; + pecl install rdkafka; echo "extension=rdkafka.so" >> ${iniDir}/rdkafka.ini; pecl install sqlsrv$SQLSRV_VERSION; # Xdebug is disabled by default for VERSION in "${XDEBUG_VERSIONS[@]}"; do diff --git a/dockerfiles/ci/centos/7/base.Dockerfile b/dockerfiles/ci/centos/7/base.Dockerfile index 7de9f359d0..8ed940a55a 100644 --- a/dockerfiles/ci/centos/7/base.Dockerfile +++ b/dockerfiles/ci/centos/7/base.Dockerfile @@ -232,6 +232,7 @@ RUN set -eux; \ bzip2-devel \ httpd-devel \ libmemcached-devel \ + librdkafka-devel \ libsodium-devel \ libsqlite3x-devel \ libxml2-devel \ diff --git a/ext/compatibility.h b/ext/compatibility.h index 351a9043d6..0e71b51619 100644 --- a/ext/compatibility.h +++ b/ext/compatibility.h @@ -622,6 +622,16 @@ static zend_always_inline zend_result zend_call_function_with_return_value(zend_ #define Z_PARAM_ZVAL_OR_NULL(dest) Z_PARAM_ZVAL_EX(dest, 1, 0) +#ifndef Z_PARAM_STRING_OR_NULL +#define Z_PARAM_STRING_OR_NULL(dest, dest_len) \ + Z_PARAM_STRING_EX(dest, dest_len, 1, 0) +#endif + +#ifndef Z_PARAM_STR_OR_NULL +#define Z_PARAM_STR_OR_NULL(dest) \ + Z_PARAM_STR_EX(dest, 1, 0) +#endif + #define ZEND_GUARD_PROPERTY_MASK 0xf // strip const diff --git a/ext/configuration.h b/ext/configuration.h index 30ac815eb7..970b97f212 100644 --- a/ext/configuration.h +++ b/ext/configuration.h @@ -137,6 +137,7 @@ enum ddtrace_sampling_rules_format { CONFIG(STRING, DD_TRACE_MEMORY_LIMIT, "") \ CONFIG(BOOL, DD_TRACE_REPORT_HOSTNAME, "false") \ CONFIG(BOOL, DD_TRACE_FLUSH_COLLECT_CYCLES, "false") \ + CONFIG(BOOL, DD_TRACE_KAFKA_DISTRIBUTED_TRACING, "true") \ CONFIG(BOOL, DD_TRACE_LARAVEL_QUEUE_DISTRIBUTED_TRACING, "true") \ CONFIG(BOOL, DD_TRACE_SYMFONY_MESSENGER_DISTRIBUTED_TRACING, "true") \ CONFIG(BOOL, DD_TRACE_SYMFONY_MESSENGER_MIDDLEWARES, "false") \ diff --git a/ext/handlers_internal.c b/ext/handlers_internal.c index 6a1d0f5f4a..ab4bcd9baa 100644 --- a/ext/handlers_internal.c +++ b/ext/handlers_internal.c @@ -30,6 +30,7 @@ void ddtrace_free_unregistered_class(zend_class_entry *ce) { void ddtrace_curl_handlers_startup(void); void ddtrace_exception_handlers_startup(void); void ddtrace_pcntl_handlers_startup(void); +void ddtrace_kafka_handlers_startup(void); #ifndef _WIN32 void ddtrace_signal_block_handlers_startup(void); #endif @@ -150,6 +151,7 @@ void ddtrace_internal_handlers_startup() { ddtrace_exception_handlers_startup(); ddtrace_exec_handlers_startup(); + ddtrace_kafka_handlers_startup(); #ifndef _WIN32 // Block remote-config signals of some functions ddtrace_signal_block_handlers_startup(); diff --git a/ext/handlers_kafka.c b/ext/handlers_kafka.c new file mode 100644 index 0000000000..fbe13618cf --- /dev/null +++ b/ext/handlers_kafka.c @@ -0,0 +1,110 @@ +#include <php.h> +#include <stdbool.h> +#include "configuration.h" +#include "handlers_api.h" + +ZEND_EXTERN_MODULE_GLOBALS(ddtrace); + +#define MAX_PRODUCEV_ARGS 7 + +// True global - only modify during MINIT/MSHUTDOWN +static bool dd_ext_kafka_loaded = false; +static uint32_t opaque_param = 0; + +static zif_handler dd_kafka_produce_handler = NULL; + +static bool rdkafka_version_supported(void) { zend_module_entry *rdkafka_me = zend_hash_str_find_ptr(&module_registry, ZEND_STRL("rdkafka")); +
return rdkafka_me && strncmp(rdkafka_me->version, "6", 1) >= 0; +} + +static bool dd_load_kafka_integration(void) { + return dd_ext_kafka_loaded && + get_DD_TRACE_ENABLED() && + get_DD_TRACE_KAFKA_ENABLED() && + get_DD_DISTRIBUTED_TRACING() && + rdkafka_version_supported(); +} + +static void dd_initialize_producev_args(zval* args, zend_long partition, zend_long msgflags, + const char* payload, size_t payload_len, + const char* key, size_t key_len, + zend_string* opaque) { + ZVAL_LONG(&args[0], partition); // Partition + ZVAL_LONG(&args[1], msgflags); // Msgflags + ZVAL_STRINGL(&args[2], payload ? payload : "", payload_len); // Payload (optional) + ZVAL_STRINGL(&args[3], key ? key : "", key_len); // Key (optional) + ZVAL_NULL(&args[4]); // Headers (distributed tracing) + ZVAL_NULL(&args[5]); // Timestamp (optional) + if (opaque_param) { + ZVAL_STR(&args[6], opaque ? opaque : ZSTR_EMPTY_ALLOC()); // Opaque (optional) + } +} + +ZEND_FUNCTION(ddtrace_kafka_produce) { + if (!dd_load_kafka_integration()) { + // Call the original handler + dd_kafka_produce_handler(INTERNAL_FUNCTION_PARAM_PASSTHRU); + return; + } + + zend_long partition, msgflags; + char* payload = NULL; + size_t payload_len = 0; + char* key = NULL; + size_t key_len = 0; + zend_string* opaque = NULL; + + ZEND_PARSE_PARAMETERS_START(2, 4 + opaque_param) + Z_PARAM_LONG(partition) + Z_PARAM_LONG(msgflags) + Z_PARAM_OPTIONAL + Z_PARAM_STRING_OR_NULL(payload, payload_len) + Z_PARAM_STRING_OR_NULL(key, key_len) + Z_PARAM_STR_OR_NULL(opaque) + ZEND_PARSE_PARAMETERS_END(); + + zval args[MAX_PRODUCEV_ARGS]; + dd_initialize_producev_args(args, partition, msgflags, payload, payload_len, key, key_len, opaque); + + zval function_name; + ZVAL_STRING(&function_name, "producev"); + call_user_function(NULL, getThis(), &function_name, return_value, 6 + opaque_param, args); + zval_dtor(&function_name); + + zend_string_release(Z_STR(args[2])); + zend_string_release(Z_STR(args[3])); +} + +/** + * Called during MINIT. + */ +void ddtrace_kafka_handlers_startup(void) { + dd_ext_kafka_loaded = zend_hash_str_exists(&module_registry, ZEND_STRL("rdkafka")); + if (!dd_ext_kafka_loaded) { + return; + } + + zend_class_entry* producer_topic_ce = zend_hash_str_find_ptr(CG(class_table), ZEND_STRL("rdkafka\\producertopic")); + if (!producer_topic_ce || !zend_hash_str_exists(&producer_topic_ce->function_table, ZEND_STRL("producev"))) { + return; // Don't install handlers if producev doesn't exist + } + + // Determine the number of arguments for producev (check if purge exists) + // See https://github.com/arnaud-lb/php-rdkafka/blob/d6f4d160422a0f8c1e3ee6a18add7cd8f805ba07/topic.c#L495-L497 + zend_class_entry* kafka_ce = zend_hash_str_find_ptr(CG(class_table), ZEND_STRL("rdkafka")); + if (kafka_ce) { + zend_function* purge_func = zend_hash_str_find_ptr(&kafka_ce->function_table, ZEND_STRL("purge")); + opaque_param = purge_func ? 
1 : 0; + } + + static const datadog_php_zim_handler handlers[] = { + {ZEND_STRL("rdkafka\\producertopic"), + {ZEND_STRL("produce"), &dd_kafka_produce_handler, ZEND_FN(ddtrace_kafka_produce)}} + }; + + size_t handlers_len = sizeof(handlers) / sizeof(handlers[0]); + for (size_t i = 0; i < handlers_len; ++i) { + datadog_php_install_method_handler(handlers[i]); + } +} diff --git a/ext/integrations/integrations.c b/ext/integrations/integrations.c index eaee9d0dd1..b90f952b32 100644 --- a/ext/integrations/integrations.c +++ b/ext/integrations/integrations.c @@ -293,6 +293,13 @@ void ddtrace_integrations_minit(void) { DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_GUZZLE, "GuzzleHttp\\Client", "__construct", "DDTrace\\Integrations\\Guzzle\\GuzzleIntegration"); + DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_KAFKA, "RdKafka\\Producer", "__construct", + "DDTrace\\Integrations\\Kafka\\KafkaIntegration"); + DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_KAFKA, "RdKafka\\Consumer", "__construct", + "DDTrace\\Integrations\\Kafka\\KafkaIntegration"); + DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_KAFKA, "RdKafka\\Conf", "__construct", + "DDTrace\\Integrations\\Kafka\\KafkaIntegration"); + DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_LAMINAS, "Laminas\\Mvc\\Application", "init", "DDTrace\\Integrations\\Laminas\\LaminasIntegration"); DD_SET_UP_DEFERRED_LOADING_BY_METHOD(DDTRACE_INTEGRATION_LAMINAS, "Laminas\\Mvc\\Application", "bootstrap", diff --git a/ext/integrations/integrations.h b/ext/integrations/integrations.h index 6d66ce22bf..e884277db8 100644 --- a/ext/integrations/integrations.h +++ b/ext/integrations/integrations.h @@ -24,6 +24,7 @@ INTEGRATION(FRANKENPHP, "frankenphp") \ INTEGRATION(GOOGLESPANNER, "googlespanner") \ INTEGRATION(GUZZLE, "guzzle") \ + INTEGRATION(KAFKA, "kafka") \ INTEGRATION(LAMINAS, "laminas") \ INTEGRATION(LARAVEL, "laravel") \ INTEGRATION(LARAVELQUEUE, "laravelqueue") \ diff --git a/src/DDTrace/Integrations/Kafka/KafkaIntegration.php b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php new file mode 100644 index 0000000000..d1fd29840f --- /dev/null +++ b/src/DDTrace/Integrations/Kafka/KafkaIntegration.php @@ -0,0 +1,214 @@ +<?php + +namespace DDTrace\Integrations\Kafka; + +use DDTrace\HookData; +use DDTrace\Integrations\Integration; +use DDTrace\SpanLink; +use DDTrace\Tag; +use DDTrace\Type; +use DDTrace\Util\ObjectKVStore; + +class KafkaIntegration extends Integration +{ + const NAME = 'kafka'; + + const METADATA_MAPPING = [ + 'bootstrap.servers' => Tag::KAFKA_HOST_LIST, + 'group.id' => Tag::KAFKA_GROUP_ID, + 'client.id' => Tag::KAFKA_CLIENT_ID + ]; + + public function init(): int + { + if (strtok(phpversion('rdkafka'), '.') < 6) { + return Integration::NOT_LOADED; + } + + $this->installProducerTopicHooks(); + $this->installConsumerHooks(); + $this->installConfigurationHooks(); + + return Integration::LOADED; + } + + private function installProducerTopicHooks() + { + $integration = $this; + \DDTrace\install_hook( + 'RdKafka\ProducerTopic::producev', + function (HookData $hook) use ($integration) { + /** @var \RdKafka\ProducerTopic $this */ + $integration->setupKafkaProduceSpan($hook, $this); + } + ); + } + + public function setupKafkaProduceSpan(HookData $hook, \RdKafka\ProducerTopic $producerTopic) + { + $span = $hook->span(); + KafkaIntegration::setupCommonSpanMetadata($span, Tag::KAFKA_PRODUCE, Tag::SPAN_KIND_VALUE_PRODUCER, Tag::MQ_OPERATION_SEND); + + $span->meta[Tag::MQ_DESTINATION] = $producerTopic->getName(); + $span->meta[Tag::MQ_DESTINATION_KIND] = Type::QUEUE; + + $conf = ObjectKVStore::get($producerTopic, 'conf'); + KafkaIntegration::addProducerSpanMetadata($span, $conf, $hook->args); + + if (\ddtrace_config_distributed_tracing_enabled()) { + $headers =
\DDTrace\generate_distributed_tracing_headers(); + $hook->args = $this->injectHeadersIntoArgs($hook->args, $headers); + $hook->overrideArguments($hook->args); + } + } + + public static function addProducerSpanMetadata($span, $conf, $args) + { + self::addMetadataToSpan($span, $conf); + $span->metrics[Tag::KAFKA_PARTITION] = $args[0]; + $span->metrics[Tag::MQ_MESSAGE_PAYLOAD_SIZE] = strlen($args[2]); + if (isset($args[3])) { + $span->meta[Tag::KAFKA_MESSAGE_KEY] = $args[3]; + } + } + + private function injectHeadersIntoArgs(array $args, array $headers): array + { + // public RdKafka\ProducerTopic::producev ( + // integer $partition , + // integer $msgflags , + // string $payload [, + // string $key = NULL [, + // array $headers = NULL [, + // integer $timestamp_ms = NULL [, + // string $opaque = NULL + // ]]]] ) : void + $argsCount = count($args); + if ($argsCount >= 5) { + $args[4] = array_merge($args[4] ?? [], $headers); + } elseif ($argsCount === 4) { + $args[] = $headers; + } elseif ($argsCount === 3) { + $args[] = null; // $key + $args[] = $headers; + } + return $args; + } + + private function installConsumerHooks() + { + $integration = $this; + + $consumerMethods = [ + 'RdKafka\KafkaConsumer::consume', + 'RdKafka\Queue::consume' + ]; + + foreach ($consumerMethods as $method) { + \DDTrace\install_hook( + $method, + function (HookData $hook) use ($integration) { + $hook->data['start'] = microtime(true); + }, + function (HookData $hook) use ($integration) { + /** @var \RdKafka\Message $message */ + $message = $hook->returned; + + if ($message) { + if ($message->headers && $link = SpanLink::fromHeaders($message->headers)) { + if (\dd_trace_env_config('DD_TRACE_KAFKA_DISTRIBUTED_TRACING')) { + $span = \DDTrace\start_trace_span($hook->data['start']); + \DDTrace\consume_distributed_tracing_headers($message->headers); + } else { + $span = \DDTrace\start_span($hook->data['start']); + $span->links[] = $link; + } + } else { + $span = \DDTrace\start_span($hook->data['start']); + } + + $span->meta[Tag::MQ_DESTINATION] = $message->topic_name; + $span->meta[Tag::MQ_DESTINATION_KIND] = Type::QUEUE; + $span->metrics[Tag::KAFKA_PARTITION] = $message->partition; + $span->metrics[Tag::KAFKA_MESSAGE_OFFSET] = $message->offset; + $span->metrics[Tag::MQ_MESSAGE_PAYLOAD_SIZE] = strlen($message->payload ?? 
''); + } else { + $span = \DDTrace\start_span($hook->data['start']); + } + + if (!$message || $message->payload === null || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { + $span->meta[Tag::KAFKA_TOMBSTONE] = true; + } + + $hook->data['span'] = $span; + $integration->setupKafkaConsumeSpan($hook, $this); + \DDTrace\close_span(); + } + ); + } + } + + public function setupKafkaConsumeSpan(HookData $hook, $consumer) + { + $span = $hook->data['span']; + KafkaIntegration::setupCommonSpanMetadata($span, Tag::KAFKA_CONSUME, Tag::SPAN_KIND_VALUE_CONSUMER, Tag::MQ_OPERATION_RECEIVE); + + $conf = ObjectKVStore::get($consumer, 'conf'); + KafkaIntegration::addMetadataToSpan($span, $conf); + } + + private static function addMetadataToSpan($span, $conf) + { + foreach (self::METADATA_MAPPING as $configKey => $tagKey) { + if (isset($conf[$configKey])) { + $span->meta[$tagKey] = $conf[$configKey]; + } + } + } + + public static function setupCommonSpanMetadata($span, string $name, string $spanKind, string $operation) + { + $span->name = $name; + $span->type = Type::QUEUE; + $span->meta[Tag::SPAN_KIND] = $spanKind; + $span->meta[Tag::COMPONENT] = self::NAME; + $span->meta[Tag::MQ_SYSTEM] = self::NAME; + $span->meta[Tag::MQ_OPERATION] = $operation; + } + + private function installConfigurationHooks() + { + $configurationHooks = [ + 'RdKafka\KafkaConsumer' => ['__construct'], + 'RdKafka\Producer' => ['__construct', 'newTopic'], + 'RdKafka\Consumer' => ['__construct', 'newQueue'] + ]; + + foreach ($configurationHooks as $class => $methods) { + foreach ($methods as $method) { + $this->installConfigurationHook($class, $method); + } + } + } + + private function installConfigurationHook(string $class, string $method) + { + \DDTrace\hook_method( + $class, + $method, + function ($This, $scope, $args) use ($method) { + if ($method === '__construct') { + $conf = $args[0]; + ObjectKVStore::put($This, 'conf', $conf->dump()); + } + }, + function ($This, $scope, $args, $returnValue) use ($method) { + if (in_array($method, ['newTopic', 'newQueue'])) { + $conf = ObjectKVStore::get($This, 'conf'); + ObjectKVStore::put($returnValue, 'conf', $conf); + } + } + ); + } +} diff --git a/src/api/Tag.php b/src/api/Tag.php index d3fb17b7af..fb2ab55e2e 100644 --- a/src/api/Tag.php +++ b/src/api/Tag.php @@ -60,6 +60,18 @@ class Tag const DB_STMT = 'db.statement'; const DB_USER = 'db.user'; + // Kafka + const KAFKA_CLIENT_ID = 'messaging.kafka.client_id'; + const KAFKA_GROUP_ID = 'messaging.kafka.group_id'; + const KAFKA_HOST_LIST = 'messaging.kafka.bootstrap.servers'; + const KAFKA_MESSAGE_KEY = 'messaging.kafka.message_key'; + const KAFKA_MESSAGE_OFFSET = 'messaging.kafka.message_offset'; + const KAFKA_PARTITION = 'messaging.kafka.partition'; + const KAFKA_TOMBSTONE = 'messaging.kafka.tombstone'; + + const KAFKA_PRODUCE = 'kafka.produce'; + const KAFKA_CONSUME = 'kafka.consume'; + // Laravel Queue const LARAVELQ_ATTEMPTS = 'messaging.laravel.attempts'; const LARAVELQ_BATCH_ID = 'messaging.laravel.batch_id'; @@ -95,6 +107,10 @@ class Tag const MQ_OPERATION = 'messaging.operation'; const MQ_CONSUMER_ID = 'messaging.consumer_id'; + const MQ_OPERATION_PROCESS = 'process'; + const MQ_OPERATION_RECEIVE = 'receive'; + const MQ_OPERATION_SEND = 'send'; + // RabbitMQ const RABBITMQ_DELIVERY_MODE = 'messaging.rabbitmq.delivery_mode'; const RABBITMQ_EXCHANGE = 'messaging.rabbitmq.exchange'; diff --git a/src/api/Type.php b/src/api/Type.php index 968215bc25..1b993442d6 100644 --- a/src/api/Type.php +++ b/src/api/Type.php @@ -9,9 +9,7 @@ class 
Type const WEB_SERVLET = 'web'; const CLI = 'cli'; const SQL = 'sql'; - - const MESSAGE_CONSUMER = 'queue'; - const MESSAGE_PRODUCER = 'queue'; + const QUEUE = 'queue'; const CASSANDRA = 'cassandra'; const ELASTICSEARCH = 'elasticsearch'; diff --git a/src/ddtrace_php_api.stubs.php b/src/ddtrace_php_api.stubs.php index 63511f1e50..138a44348b 100644 --- a/src/ddtrace_php_api.stubs.php +++ b/src/ddtrace_php_api.stubs.php @@ -2248,6 +2248,16 @@ class Tag const DB_ROW_COUNT = 'db.row_count'; const DB_STMT = 'db.statement'; const DB_USER = 'db.user'; + // Kafka + const KAFKA_CLIENT_ID = 'messaging.kafka.client_id'; + const KAFKA_GROUP_ID = 'messaging.kafka.group_id'; + const KAFKA_HOST_LIST = 'messaging.kafka.bootstrap.servers'; + const KAFKA_MESSAGE_KEY = 'messaging.kafka.message_key'; + const KAFKA_MESSAGE_OFFSET = 'messaging.kafka.message_offset'; + const KAFKA_PARTITION = 'messaging.kafka.partition'; + const KAFKA_TOMBSTONE = 'messaging.kafka.tombstone'; + const KAFKA_PRODUCE = 'kafka.produce'; + const KAFKA_CONSUME = 'kafka.consume'; // Laravel Queue const LARAVELQ_ATTEMPTS = 'messaging.laravel.attempts'; const LARAVELQ_BATCH_ID = 'messaging.laravel.batch_id'; @@ -2279,6 +2289,9 @@ class Tag const MQ_MESSAGE_PAYLOAD_SIZE = 'messaging.message_payload_size_bytes'; const MQ_OPERATION = 'messaging.operation'; const MQ_CONSUMER_ID = 'messaging.consumer_id'; + const MQ_OPERATION_PROCESS = 'process'; + const MQ_OPERATION_RECEIVE = 'receive'; + const MQ_OPERATION_SEND = 'send'; // RabbitMQ const RABBITMQ_DELIVERY_MODE = 'messaging.rabbitmq.delivery_mode'; const RABBITMQ_EXCHANGE = 'messaging.rabbitmq.exchange'; @@ -2296,8 +2309,7 @@ class Type const WEB_SERVLET = 'web'; const CLI = 'cli'; const SQL = 'sql'; - const MESSAGE_CONSUMER = 'queue'; - const MESSAGE_PRODUCER = 'queue'; + const QUEUE = 'queue'; const CASSANDRA = 'cassandra'; const ELASTICSEARCH = 'elasticsearch'; const MEMCACHED = 'memcached'; diff --git a/tests/Integrations/Kafka/KafkaTest.php b/tests/Integrations/Kafka/KafkaTest.php new file mode 100644 index 0000000000..137f1bec95 --- /dev/null +++ b/tests/Integrations/Kafka/KafkaTest.php @@ -0,0 +1,204 @@ +set('bootstrap.servers', self::$host . ':' . self::$port); + $producer = new \RdKafka\Producer($conf); + $topicConf = new \RdKafka\TopicConf(); + $topicConf->set('message.timeout.ms', (string) 30000); + $topicConf->set('request.required.acks', (string) -1); + $topicConf->set('request.timeout.ms', (string) 5000); + $topicLowLevel = $producer->newTopic('test-lowlevel', $topicConf); + $topicHighLevel = $producer->newTopic('test-highlevel', $topicConf); + $producer->getMetadata(false, $topicLowLevel, 5000); + $producer->getMetadata(false, $topicHighLevel, 5000); + } + + public function testDistributedTracingHighLevel() + { + list($producerTraces, $output) = $this->inCli( + __DIR__ . '/scripts/producer.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + ], + [], + 'test-highlevel', + true + ); + + $producerTrace = $producerTraces[0][0]; + $kafkaProduceSpanID = $producerTrace['span_id']; + $kafkaProduceTraceID = $producerTrace['trace_id']; + + list($consumerTraces, $output) = $this->inCli( + __DIR__ . 
'/scripts/consumer-highlevel.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'true', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + 'DD_TRACE_KAFKA_DISTRIBUTED_TRACING' => 'true', + ], + [], + null, + true + ); + + $distributedKafkaConsumeTraceID = $consumerTraces[0][0]['trace_id']; + $distributedKafkaConsumeParentID = $consumerTraces[0][0]['parent_id']; + + $this->assertEquals($kafkaProduceTraceID, $distributedKafkaConsumeTraceID); + $this->assertEquals($kafkaProduceSpanID, $distributedKafkaConsumeParentID); + $this->assertCount(1, $consumerTraces[0]); + } + + public function testSpanLinksLowLevel() + { + self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42'); + + // Get the latest offset of the test-lowlevel topic + $this->isolateLimitedTracer(function () use (&$low, &$high) { + $conf = new \RdKafka\Conf(); + $conf->set('bootstrap.servers', 'kafka_integration:9092'); + $conf->set('group.id', 'consumer-lowlevel'); + $conf->set('enable.partition.eof', 'true'); + + $consumer = new \RdKafka\KafkaConsumer($conf); + $consumer->queryWatermarkOffsets('test-lowlevel', 0, $low, $high, 1000); + }); + + echo "Low: $low, High: $high\n"; + + list($producerTraces, $output) = $this->inCli( + __DIR__ . '/scripts/producer.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + ], + [], + 'test-lowlevel', + true + ); + + echo $output; + + $this->snapshotFromTraces( + $producerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_low_level_producer' + ); + + list($consumerTraces, $output) = $this->inCli( + __DIR__ . '/scripts/consumer-lowlevel.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + 'DD_TRACE_KAFKA_DISTRIBUTED_TRACING' => 'false', + ], + [], + $high, + true + ); + + echo $output; + + $this->snapshotFromTraces( + $consumerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_low_level_consumer' + ); + } + + public function testSpanLinksHighLevel() + { + self::putEnv('DD_TRACE_DEBUG_PRNG_SEED=42'); + + list($producerTraces, $output) = $this->inCli( + __DIR__ . '/scripts/producerv.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + ], + [], + 'test-highlevel', + true + ); + + echo $output; + + $this->snapshotFromTraces( + $producerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_high_level_producer' + ); + + list($consumerTraces, $output) = $this->inCli( + __DIR__ . 
'/scripts/consumer-highlevel.php', + [ + 'DD_TRACE_AUTO_FLUSH_ENABLED' => 'true', + 'DD_TRACE_CLI_ENABLED' => 'true', + 'DD_INSTRUMENTATION_TELEMETRY_ENABLED' => 'false', + 'DD_SERVICE' => 'kafka_test', + 'DD_TRACE_EXEC_ENABLED' => 'false', + 'DD_TRACE_GENERATE_ROOT_SPAN' => 'false', + 'DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED' => 'false', + 'DD_TRACE_KAFKA_DISTRIBUTED_TRACING' => 'false', + ], + [], + null, + true + ); + + echo $output; + + $this->snapshotFromTraces( + $consumerTraces, + self::FIELDS_TO_IGNORE, + 'tests.integrations.kafka_test.test_span_links_high_level_consumer' + ); + } +} diff --git a/tests/Integrations/Kafka/scripts/consumer-highlevel.php b/tests/Integrations/Kafka/scripts/consumer-highlevel.php new file mode 100644 index 0000000000..a069a92c1f --- /dev/null +++ b/tests/Integrations/Kafka/scripts/consumer-highlevel.php @@ -0,0 +1,64 @@ +<?php + +use RdKafka\Conf; +use RdKafka\KafkaConsumer; + +$conf = new Conf(); +$conf->set('bootstrap.servers', 'kafka_integration:9092'); +$conf->set('group.id', 'consumer-highlevel'); +$conf->set('enable.partition.eof', 'true'); +$conf->set('auto.offset.reset', 'earliest'); + +// Track partitions that have been fully consumed +$partitionsEof = []; + +$consumer = new KafkaConsumer($conf); +$consumer->subscribe(['test-highlevel']); + +echo "Consumer started, waiting for messages...\n"; + +do { + $message = $consumer->consume(5000); + + switch ($message->err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + // Process the message + echo sprintf("Message consumed: %s\n", $message->payload); + // Headers + echo sprintf("Headers: %s\n", json_encode($message->headers)); + + // Commit the message offset after processing it + $consumer->commit($message); + + break; + + case RD_KAFKA_RESP_ERR__PARTITION_EOF: + // Mark the partition as fully consumed + echo sprintf("Partition %d fully consumed\n", $message->partition); + $partitionsEof[$message->partition] = true; + break; + + case RD_KAFKA_RESP_ERR__TIMED_OUT: + // Ignore timeout errors + echo "Timed out waiting for messages...\n"; + break; + + default: + // Handle other errors + echo sprintf("Error: %s\n", $message->errstr()); + exit(1); + } + + // Get the current assignment of partitions + $assignments = $consumer->getAssignment(); + + // Check if all partitions have been fully consumed + if (count($assignments) > 0 && count($partitionsEof) === count($assignments)) { + echo "All partitions fully consumed. Exiting...\n"; + break; + } +} while (true); + +$consumer->close(); diff --git a/tests/Integrations/Kafka/scripts/consumer-lowlevel.php b/tests/Integrations/Kafka/scripts/consumer-lowlevel.php new file mode 100644 index 0000000000..6739ab3e4a --- /dev/null +++ b/tests/Integrations/Kafka/scripts/consumer-lowlevel.php @@ -0,0 +1,57 @@ +<?php + +use RdKafka\Conf; +use RdKafka\Consumer; +use RdKafka\TopicConf; + +$conf = new Conf(); +$conf->set('bootstrap.servers', 'kafka_integration:9092'); +$conf->set('group.id', 'consumer-lowlevel'); +$conf->set('enable.partition.eof', 'true'); +//$conf->set('log_level', (string) LOG_DEBUG); +//$conf->set('debug', 'all'); +$conf->setLogCb( + function (Consumer $consumer, int $level, string $facility, string $message): void { + echo sprintf(' log: %d %s %s', $level, $facility, $message) . PHP_EOL; + } +); + +$conf->set('statistics.interval.ms', (string) 1000); +$conf->setStatsCb( + function (Consumer $consumer, string $json, int $jsonLength, $opaque = null): void { + echo "stats: {$json}" .
PHP_EOL; + } +); + +$topicConf = new TopicConf(); +$topicConf->set('enable.auto.commit', 'true'); +$topicConf->set('auto.commit.interval.ms', (string) 100); +$topicConf->set('auto.offset.reset', 'earliest'); +//var_dump($topicConf->dump()); + +$consumer = new Consumer($conf); + +$topic = $consumer->newTopic('test-lowlevel', $topicConf); +//var_dump($topic); + +$queue = $consumer->newQueue(); +$offset = $argv[1] ?? RD_KAFKA_OFFSET_BEGINNING; +$topic->consumeQueueStart(0, (int) $offset, $queue); +//$topic->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue); +//$topic->consumeQueueStart(2, RD_KAFKA_OFFSET_BEGINNING, $queue); do { + $message = $queue->consume(5000); + if ($message === null) { + break; + } + echo sprintf('consume msg: %s, timestamp: %s, topic: %s', $message->payload, $message->timestamp, $message->topic_name) . PHP_EOL; + // triggers log output + $events = $consumer->poll(1); + echo sprintf('polling triggered %d events', $events) . PHP_EOL; } while (true); $topic->consumeStop(0); +//$topic->consumeStop(1); +//$topic->consumeStop(2); diff --git a/tests/Integrations/Kafka/scripts/producer.php b/tests/Integrations/Kafka/scripts/producer.php new file mode 100644 index 0000000000..2cfd5c32e2 --- /dev/null +++ b/tests/Integrations/Kafka/scripts/producer.php @@ -0,0 +1,77 @@ +<?php + +use RdKafka\Conf; +use RdKafka\Message; +use RdKafka\Producer; +use RdKafka\TopicConf; + +$conf = new Conf(); +$conf->set('bootstrap.servers', 'kafka_integration:9092'); +$conf->set('socket.timeout.ms', (string) 50); +$conf->set('queue.buffering.max.messages', (string) 1000); +$conf->set('max.in.flight.requests.per.connection', (string) 1); +$conf->setDrMsgCb( + function (Producer $producer, Message $message): void { + if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { + //var_dump($message->errstr()); + } + //var_dump($message); + } +); +//$conf->set('log_level', (string) LOG_DEBUG); +//$conf->set('debug', 'all'); +$conf->setLogCb( + function (Producer $producer, int $level, string $facility, string $message): void { + echo sprintf('log: %d %s %s', $level, $facility, $message) . PHP_EOL; + } +); +$conf->set('statistics.interval.ms', (string) 1000); +$conf->setStatsCb( + function (Producer $producer, string $json, int $json_len, $opaque = null): void { + echo "stats: {$json}" . PHP_EOL; + } +); +$conf->setDrMsgCb(function (Producer $kafka, Message $message) { + if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) { + $errorStr = rd_kafka_err2str($message->err); + + echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL; + } else { + // message successfully delivered + echo sprintf('Message sent SUCCESSFULLY with payload => %s', $message->payload) . PHP_EOL; + } +}); +//var_dump($conf->dump()); + +$topicConf = new TopicConf(); +$topicConf->set('message.timeout.ms', (string) 30000); +$topicConf->set('request.required.acks', (string) -1); +$topicConf->set('request.timeout.ms', (string) 5000); +//var_dump($topicConf->dump()); + +$producer = new Producer($conf); + +$topicName = $argv[1] ?? 'test'; +$topic = $producer->newTopic($topicName, $topicConf); +//var_dump($topic); + +$metadata = $producer->getMetadata(false, $topic, 1000); +//var_dump($metadata->getOrigBrokerName()); +//var_dump($metadata->getOrigBrokerId()); +//var_dump($metadata->getBrokers()); +//var_dump($metadata->getTopics()); + + +for ($i = 0; $i < 1; $i++) { + $key = $i % 10; + $payload = sprintf('payload-%d-%s', $i, $key); + echo sprintf('produce msg: %s', $payload) .
PHP_EOL; + $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, (string) $key); + // triggers log output + $events = $producer->poll(1); + echo sprintf('polling triggered %d events', $events) . PHP_EOL; +} + +$producer->flush(5000); diff --git a/tests/Integrations/Kafka/scripts/producerv.php b/tests/Integrations/Kafka/scripts/producerv.php new file mode 100644 index 0000000000..b1b14920f9 --- /dev/null +++ b/tests/Integrations/Kafka/scripts/producerv.php @@ -0,0 +1,77 @@ +<?php + +use RdKafka\Conf; +use RdKafka\Message; +use RdKafka\Producer; +use RdKafka\TopicConf; + +$conf = new Conf(); +$conf->set('bootstrap.servers', 'kafka_integration:9092'); +$conf->set('socket.timeout.ms', (string) 50); +$conf->set('queue.buffering.max.messages', (string) 1000); +$conf->set('max.in.flight.requests.per.connection', (string) 1); +$conf->setDrMsgCb( + function (Producer $producer, Message $message): void { + if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { + //var_dump($message->errstr()); + } + //var_dump($message); + } +); +//$conf->set('log_level', (string) LOG_DEBUG); +//$conf->set('debug', 'all'); +$conf->setLogCb( + function (Producer $producer, int $level, string $facility, string $message): void { + echo sprintf('log: %d %s %s', $level, $facility, $message) . PHP_EOL; + } +); +$conf->set('statistics.interval.ms', (string) 1000); +$conf->setStatsCb( + function (Producer $producer, string $json, int $json_len, $opaque = null): void { + echo "stats: {$json}" . PHP_EOL; + } +); +$conf->setDrMsgCb(function (Producer $kafka, Message $message) { + if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) { + $errorStr = rd_kafka_err2str($message->err); + + echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL; + } else { + // message successfully delivered + echo sprintf('Message sent SUCCESSFULLY with payload => %s', $message->payload) . PHP_EOL; + } +}); +//var_dump($conf->dump()); + +$topicConf = new TopicConf(); +$topicConf->set('message.timeout.ms', (string) 30000); +$topicConf->set('request.required.acks', (string) -1); +$topicConf->set('request.timeout.ms', (string) 5000); +//var_dump($topicConf->dump()); + +$producer = new Producer($conf); + +$topicName = $argv[1] ?? 'test'; +$topic = $producer->newTopic($topicName, $topicConf); +//var_dump($topic); + +$metadata = $producer->getMetadata(false, $topic, 1000); +//var_dump($metadata->getOrigBrokerName()); +//var_dump($metadata->getOrigBrokerId()); +//var_dump($metadata->getBrokers()); +//var_dump($metadata->getTopics()); + + +for ($i = 0; $i < 1; $i++) { + $key = $i % 10; + $payload = sprintf('payload-%d-%s', $i, $key); + echo sprintf('produce msg: %s', $payload) . PHP_EOL; + $topic->producev(RD_KAFKA_PARTITION_UA, 0, $payload, (string) $key); + // triggers log output + $events = $producer->poll(1); + echo sprintf('polling triggered %d events', $events) .
PHP_EOL; +} + +$producer->flush(5000); diff --git a/tests/api/Unit/UserAvailableConstantsTest.php b/tests/api/Unit/UserAvailableConstantsTest.php index a80af0b0a4..aa50416356 100644 --- a/tests/api/Unit/UserAvailableConstantsTest.php +++ b/tests/api/Unit/UserAvailableConstantsTest.php @@ -24,9 +24,7 @@ public function types() [Type::WEB_SERVLET, 'web'], [Type::CLI, 'cli'], [Type::SQL, 'sql'], - - [Type::MESSAGE_CONSUMER, 'queue'], - [Type::MESSAGE_PRODUCER, 'queue'], + [Type::QUEUE, 'queue'], [Type::CASSANDRA, 'cassandra'], [Type::ELASTICSEARCH, 'elasticsearch'], @@ -138,6 +136,15 @@ public function tags() [Tag::DB_ROW_COUNT, 'db.row_count'], [Tag::DB_STMT, 'db.statement'], [Tag::DB_USER, 'db.user'], + [Tag::KAFKA_CLIENT_ID, 'messaging.kafka.client_id'], + [Tag::KAFKA_GROUP_ID, 'messaging.kafka.group_id'], + [Tag::KAFKA_HOST_LIST, 'messaging.kafka.bootstrap.servers'], + [Tag::KAFKA_MESSAGE_KEY, 'messaging.kafka.message_key'], + [Tag::KAFKA_MESSAGE_OFFSET, 'messaging.kafka.message_offset'], + [Tag::KAFKA_PARTITION, 'messaging.kafka.partition'], + [Tag::KAFKA_TOMBSTONE, 'messaging.kafka.tombstone'], + [Tag::KAFKA_PRODUCE, 'kafka.produce'], + [Tag::KAFKA_CONSUME, 'kafka.consume'], [Tag::LARAVELQ_ATTEMPTS, 'messaging.laravel.attempts'], [Tag::LARAVELQ_BATCH_ID, 'messaging.laravel.batch_id'], [Tag::LARAVELQ_CONNECTION, 'messaging.laravel.connection'], @@ -165,6 +172,9 @@ public function tags() [Tag::MQ_MESSAGE_PAYLOAD_SIZE, 'messaging.message_payload_size_bytes'], [Tag::MQ_OPERATION, 'messaging.operation'], [Tag::MQ_CONSUMER_ID, 'messaging.consumer_id'], + [Tag::MQ_OPERATION_PROCESS, 'process'], + [Tag::MQ_OPERATION_RECEIVE, 'receive'], + [Tag::MQ_OPERATION_SEND, 'send'], [Tag::RABBITMQ_DELIVERY_MODE, 'messaging.rabbitmq.delivery_mode'], [Tag::RABBITMQ_EXCHANGE, 'messaging.rabbitmq.exchange'], [Tag::RABBITMQ_ROUTING_KEY, 'messaging.rabbitmq.routing_key'], diff --git a/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json new file mode 100644 index 0000000000..2aabcf6f8d --- /dev/null +++ b/tests/snapshots/tests.integrations.kafka_test.test_distributed_tracing_high_level_producer.json @@ -0,0 +1,29 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka_test", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.destination": "test-highlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.message_key": "0", + "messaging.operation": "send", + "messaging.system": "kafka", + "runtime-id": "199c66d4-0b85-4f42-b130-c204bb4b6951", + "span.kind": "producer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.partition": -1, + "messaging.message_payload_size_bytes": 11 + } + }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_consumer.json b/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_consumer.json new file mode 100644 index 0000000000..119fd0e276 --- /dev/null +++ b/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_consumer.json @@ -0,0 +1,62 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka_test", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": 
"queue", + "meta": { + "_dd.p.dm": "0", + "_dd.span_links": "[{\"trace_id\":\"0000000000000000c151df7d6ee5e2d6\",\"span_id\":\"c151df7d6ee5e2d6\",\"trace_state\":\"dd=t.dm:-0\",\"attributes\":{\"_dd.p.dm\":\"-0\"}}]", + "component": "kafka", + "messaging.destination": "test-highlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.group_id": "consumer-highlevel", + "messaging.operation": "receive", + "messaging.system": "kafka", + "runtime-id": "bff4ee81-2002-4ffb-9ec3-13308f9f7dde", + "span.kind": "consumer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.message_offset": 43, + "messaging.kafka.partition": 0, + "messaging.message_payload_size_bytes": 11 + } + }], +[ + { + "name": "kafka.consume", + "service": "kafka_test", + "resource": "kafka.consume", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.destination": "test-highlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.group_id": "consumer-highlevel", + "messaging.kafka.tombstone": "true", + "messaging.operation": "receive", + "messaging.system": "kafka", + "runtime-id": "bff4ee81-2002-4ffb-9ec3-13308f9f7dde", + "span.kind": "consumer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.message_offset": 44, + "messaging.kafka.partition": 0, + "messaging.message_payload_size_bytes": 24 + } + }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_producer.json b/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_producer.json new file mode 100644 index 0000000000..c346351a8c --- /dev/null +++ b/tests/snapshots/tests.integrations.kafka_test.test_span_links_high_level_producer.json @@ -0,0 +1,29 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka_test", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.destination": "test-highlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.message_key": "0", + "messaging.operation": "send", + "messaging.system": "kafka", + "runtime-id": "a4b75b9c-c738-46a2-8986-f857f27de07d", + "span.kind": "producer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.partition": -1, + "messaging.message_payload_size_bytes": 11 + } + }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_consumer.json b/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_consumer.json new file mode 100644 index 0000000000..bdbec974e1 --- /dev/null +++ b/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_consumer.json @@ -0,0 +1,88 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka_test", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "_dd.span_links": "[{\"trace_id\":\"0000000000000000c151df7d6ee5e2d6\",\"span_id\":\"c151df7d6ee5e2d6\",\"trace_state\":\"dd=t.dm:-0\",\"attributes\":{\"_dd.p.dm\":\"-0\"}}]", + "component": 
"kafka", + "messaging.destination": "test-lowlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.group_id": "consumer-lowlevel", + "messaging.operation": "receive", + "messaging.system": "kafka", + "runtime-id": "cb546b0e-9a6f-450b-8bf2-359f968d021f", + "span.kind": "consumer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.message_offset": 20, + "messaging.kafka.partition": 0, + "messaging.message_payload_size_bytes": 11 + } + }], +[ + { + "name": "kafka.consume", + "service": "kafka_test", + "resource": "kafka.consume", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.destination": "test-lowlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.group_id": "consumer-lowlevel", + "messaging.kafka.tombstone": "true", + "messaging.operation": "receive", + "messaging.system": "kafka", + "runtime-id": "cb546b0e-9a6f-450b-8bf2-359f968d021f", + "span.kind": "consumer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.message_offset": 21, + "messaging.kafka.partition": 0, + "messaging.message_payload_size_bytes": 24 + } + }], +[ + { + "name": "kafka.consume", + "service": "kafka_test", + "resource": "kafka.consume", + "trace_id": 2, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.group_id": "consumer-lowlevel", + "messaging.kafka.tombstone": "true", + "messaging.operation": "receive", + "messaging.system": "kafka", + "runtime-id": "cb546b0e-9a6f-450b-8bf2-359f968d021f", + "span.kind": "consumer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1 + } + }]] diff --git a/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_producer.json b/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_producer.json new file mode 100644 index 0000000000..4076532b92 --- /dev/null +++ b/tests/snapshots/tests.integrations.kafka_test.test_span_links_low_level_producer.json @@ -0,0 +1,29 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka_test", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "queue", + "meta": { + "_dd.p.dm": "0", + "component": "kafka", + "messaging.destination": "test-lowlevel", + "messaging.destination_kind": "queue", + "messaging.kafka.bootstrap.servers": "kafka_integration:9092", + "messaging.kafka.client_id": "rdkafka", + "messaging.kafka.message_key": "0", + "messaging.operation": "send", + "messaging.system": "kafka", + "runtime-id": "b34f8a96-ec1b-4fb1-b695-c4bf718a5a48", + "span.kind": "producer" + }, + "metrics": { + "_dd.agent_psr": 1, + "_sampling_priority_v1": 1, + "messaging.kafka.partition": -1, + "messaging.message_payload_size_bytes": 11 + } + }]]