Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added additional instrumentation #10

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _register.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use OpenTelemetry\SDK\Sdk;
use Spryker\Service\Opentelemetry\Instrumentation\ElasticaInstrumentation;
use Spryker\Service\Opentelemetry\Instrumentation\GuzzleInstrumentation;
use Spryker\Service\Opentelemetry\Instrumentation\PropelInstrumentation;
use Spryker\Service\Opentelemetry\Instrumentation\RabbitMqInstrumentation;
use Spryker\Service\Opentelemetry\Instrumentation\RedisInstrumentation;
Expand All @@ -25,4 +26,5 @@
PropelInstrumentation::register();
RabbitMqInstrumentation::register();
RedisInstrumentation::register();
GuzzleInstrumentation::register();
SprykerInstrumentationBootstrap::register();
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,62 @@ class ElasticaInstrumentation
/**
* @var string
*/
protected const METHOD_NAME = 'request';
protected const METHOD_NAME_REQUEST = 'request';

/**
* @var string
*/
protected const SPAN_NAME = 'elasticsearch-request';
protected const METHOD_NAME_UPDATE_DOCUMENTS = 'updateDocuments';

/**
* @var string
*/
protected const METHOD_NAME_UPDATE_DOCUMENT = 'updateDocument';

/**
* @var string
*/
protected const METHOD_NAME_ADD_DOCUMENTS = 'addDocuments';

/**
* @var string
*/
protected const METHOD_NAME_DELETE_DOCUMENTS = 'deleteDocuments';

/**
* @var string
*/
protected const METHOD_NAME_DELETE_IDS = 'deleteIds';

/**
* @var string
*/
protected const SPAN_NAME_REQUEST = 'elasticsearch-request';

/**
* @var string
*/
protected const SPAN_NAME_UPDATE_DOCUMENTS = 'elasticsearch-update-documents';

/**
* @var string
*/
protected const SPAN_NAME_UPDATE_DOCUMENT = 'elasticsearch-update-document';

/**
* @var string
*/
protected const SPAN_NAME_ADD_DOCUMENTS = 'elasticsearch-add-document';

/**
* @var string
*/
protected const SPAN_NAME_DELETE_DOCUMENTS = 'elasticsearch-delete-documents';

/**
* @var string
*/
protected const SPAN_NAME_DELETE_IDS = 'elasticsearch-delete-ids';

/**
* @var string
Expand All @@ -53,6 +103,21 @@ class ElasticaInstrumentation
*/
protected const ATTRIBUTE_SEARCH_INDEX = 'search.index';

/**
* @var string
*/
protected const ATTRIBUTE_SEARCH_INDEXES = 'search.indexes';

/**
* @var string
*/
protected const ATTRIBUTE_SEARCH_ID = 'search.id';

/**
* @var string
*/
protected const ATTRIBUTE_SEARCH_IDS = 'search.ids';

/**
* @var string
*/
Expand All @@ -79,7 +144,7 @@ public static function register(): void

hook(
class: Client::class,
gechetspr marked this conversation as resolved.
Show resolved Hide resolved
function: static::METHOD_NAME,
function: static::METHOD_NAME_REQUEST,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
dimitriyTsemma marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -90,7 +155,7 @@ function: static::METHOD_NAME,
$context = Context::getCurrent();

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME)
->spanBuilder(static::SPAN_NAME_REQUEST)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
Expand Down Expand Up @@ -126,5 +191,278 @@ function: static::METHOD_NAME,
$span->end();
},
);

hook(
class: Client::class,
function: static::METHOD_NAME_UPDATE_DOCUMENTS,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$instrumentation = CachedInstrumentation::getCachedInstrumentation();
$request = new RequestProcessor();
$context = Context::getCurrent();

$indexes = static::getIndexesIndexedByIdsFromDocuments($params[0]);

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME_UPDATE_DOCUMENTS)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
->setAttribute(static::ATTRIBUTE_SEARCH_INDEXES, implode(',', $indexes))
->setAttribute(static::ATTRIBUTE_SEARCH_IDS, implode(',', array_keys($indexes)))
->setAttribute(static::ATTRIBUTE_ROOT_URL, $request->getRequest()->getUri())
->setAttribute(TraceAttributes::URL_DOMAIN, $request->getRequest()->headers->get(static::HEADER_HOST))
->startSpan();

Context::storage()->attach($span->storeInContext($context));
},
post: function (Client $client, array $params, $response, ?Throwable $exception): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$scope = Context::storage()->scope();

if ($scope === null) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception !== null) {
$span->recordException($exception);
$span->setStatus(StatusCode::STATUS_ERROR);
} else {
$span->setAttribute(static::ATTRIBUTE_QUERY_TIME, $response->getQueryTime());
$span->setStatus(StatusCode::STATUS_OK);
}

$span->end();
},
);

hook(
class: Client::class,
function: static::METHOD_NAME_ADD_DOCUMENTS,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$instrumentation = CachedInstrumentation::getCachedInstrumentation();
$request = new RequestProcessor();
$context = Context::getCurrent();

$indexes = static::getIndexesIndexedByIdsFromDocuments($params[0]);

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME_ADD_DOCUMENTS)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
->setAttribute(static::ATTRIBUTE_SEARCH_INDEXES, implode(',', $indexes))
->setAttribute(static::ATTRIBUTE_SEARCH_IDS, implode(',', array_keys($indexes)))
->setAttribute(static::ATTRIBUTE_ROOT_URL, $request->getRequest()->getUri())
->setAttribute(TraceAttributes::URL_DOMAIN, $request->getRequest()->headers->get(static::HEADER_HOST))
->startSpan();

Context::storage()->attach($span->storeInContext($context));
},
post: function (Client $client, array $params, $response, ?Throwable $exception): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$scope = Context::storage()->scope();

if ($scope === null) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception !== null) {
$span->recordException($exception);
$span->setStatus(StatusCode::STATUS_ERROR);
} else {
$span->setAttribute(static::ATTRIBUTE_QUERY_TIME, $response->getQueryTime());
$span->setStatus(StatusCode::STATUS_OK);
}

$span->end();
},
);

hook(
class: Client::class,
function: static::METHOD_NAME_UPDATE_DOCUMENT,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$instrumentation = CachedInstrumentation::getCachedInstrumentation();
$request = new RequestProcessor();
$context = Context::getCurrent();

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME_UPDATE_DOCUMENT)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
->setAttribute(static::ATTRIBUTE_SEARCH_ID, $params[0])
->setAttribute(static::ATTRIBUTE_SEARCH_INDEX, $params[2])
->setAttribute(static::ATTRIBUTE_ROOT_URL, $request->getRequest()->getUri())
->setAttribute(TraceAttributes::URL_DOMAIN, $request->getRequest()->headers->get(static::HEADER_HOST))
->startSpan();

Context::storage()->attach($span->storeInContext($context));
},
post: function (Client $client, array $params, $response, ?Throwable $exception): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$scope = Context::storage()->scope();

if ($scope === null) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception !== null) {
$span->recordException($exception);
$span->setStatus(StatusCode::STATUS_ERROR);
} else {
$span->setAttribute(static::ATTRIBUTE_QUERY_TIME, $response->getQueryTime());
$span->setStatus(StatusCode::STATUS_OK);
}

$span->end();
},
);

hook(
class: Client::class,
function: static::METHOD_NAME_DELETE_DOCUMENTS,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$instrumentation = CachedInstrumentation::getCachedInstrumentation();
$request = new RequestProcessor();
$context = Context::getCurrent();

$indexes = static::getIndexesIndexedByIdsFromDocuments($params[0]);

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME_DELETE_DOCUMENTS)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
->setAttribute(static::ATTRIBUTE_SEARCH_INDEXES, implode(',', $indexes))
->setAttribute(static::ATTRIBUTE_SEARCH_IDS, implode(',', array_keys($indexes)))
->setAttribute(static::ATTRIBUTE_ROOT_URL, $request->getRequest()->getUri())
->setAttribute(TraceAttributes::URL_DOMAIN, $request->getRequest()->headers->get(static::HEADER_HOST))
->startSpan();

Context::storage()->attach($span->storeInContext($context));
},
post: function (Client $client, array $params, $response, ?Throwable $exception): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$scope = Context::storage()->scope();

if ($scope === null) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception !== null) {
$span->recordException($exception);
$span->setStatus(StatusCode::STATUS_ERROR);
} else {
$span->setAttribute(static::ATTRIBUTE_QUERY_TIME, $response->getQueryTime());
$span->setStatus(StatusCode::STATUS_OK);
}

$span->end();
},
);

hook(
class: Client::class,
function: static::METHOD_NAME_DELETE_IDS,
pre: function (Client $client, array $params): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$instrumentation = CachedInstrumentation::getCachedInstrumentation();
$request = new RequestProcessor();
$context = Context::getCurrent();

$indexes = static::getIndexesIndexedByIdsFromDocuments($params[0]);

$span = $instrumentation->tracer()
->spanBuilder(static::SPAN_NAME_DELETE_IDS)
->setSpanKind(SpanKind::KIND_CLIENT)
->setParent($context)
->setAttribute(CriticalSpanRatioSampler::IS_CRITICAL_ATTRIBUTE, true)
->setAttribute(static::ATTRIBUTE_SEARCH_INDEXES, implode(',', $indexes))
->setAttribute(static::ATTRIBUTE_SEARCH_IDS, implode(',', array_keys($indexes)))
->setAttribute(static::ATTRIBUTE_ROOT_URL, $request->getRequest()->getUri())
->setAttribute(TraceAttributes::URL_DOMAIN, $request->getRequest()->headers->get(static::HEADER_HOST))
->startSpan();

Context::storage()->attach($span->storeInContext($context));
},
post: function (Client $client, array $params, $response, ?Throwable $exception): void {
if (TraceSampleResult::shouldSkipTraceBody()) {
return;
}

$scope = Context::storage()->scope();

if ($scope === null) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception !== null) {
$span->recordException($exception);
$span->setStatus(StatusCode::STATUS_ERROR);
} else {
$span->setAttribute(static::ATTRIBUTE_QUERY_TIME, $response->getQueryTime());
$span->setStatus(StatusCode::STATUS_OK);
}

$span->end();
},
);
}

/**
* @param array<\Elastica\Document> $documents
*
* @return array<string, string>
*/
protected static function getIndexesIndexedByIdsFromDocuments(array $documents): array
{
$indexes = [];
foreach ($documents as $document) {
$indexes[$document->getId()] = $document->getIndex();
}

return $indexes;
}
}
Loading
Loading