From a8a6f458bbc1d5042322ad1f9b65eeb0b69be9ea Mon Sep 17 00:00:00 2001 From: Yang Song Date: Mon, 5 Aug 2019 15:24:08 -0700 Subject: [PATCH] Add a method Exporter.ExportMetricsServiceRequest that accepts proto metrics (#75) This will be used when Agent sends proto metrics to Collector. Also add proto resource to export requests. Related to https://github.com/open-telemetry/opentelemetry-service/issues/231. --- ocagent.go | 78 ++++++++++++++++++++++++++----------- viewdata_to_metrics_test.go | 1 + 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/ocagent.go b/ocagent.go index 8c43eb9..24f4107 100644 --- a/ocagent.go +++ b/ocagent.go @@ -431,6 +431,49 @@ func (ae *Exporter) ExportView(vd *view.Data) { _ = ae.viewDataBundler.Add(vd, 1) } +// ExportMetricsServiceRequest sends proto metrics with the metrics service client. +func (ae *Exporter) ExportMetricsServiceRequest(batch *agentmetricspb.ExportMetricsServiceRequest) error { + if batch == nil || len(batch.Metrics) == 0 { + return nil + } + + select { + case <-ae.stopCh: + return errStopped + + default: + if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil { + return fmt.Errorf("ExportMetricsServiceRequest: no active connection, last connection error: %v", lastConnectErr) + } + + ae.senderMu.Lock() + err := ae.metricsExporter.Send(batch) + ae.senderMu.Unlock() + if err != nil { + if err == io.EOF { + ae.recvMu.Lock() + // Perform a .Recv to try to find out why the RPC actually ended. + // See: + // * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100 + // * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ + for { + _, err = ae.metricsExporter.Recv() + if err != nil { + break + } + } + ae.recvMu.Unlock() + } + + ae.setStateDisconnected(err) + if err != io.EOF { + return err + } + } + return nil + } +} + func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span { if len(sdl) == 0 { return nil @@ -460,7 +503,8 @@ func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) { } ae.senderMu.Lock() err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{ - Spans: protoSpans, + Spans: protoSpans, + Resource: resourceProtoFromEnv(), }) ae.senderMu.Unlock() if err != nil { @@ -487,30 +531,18 @@ func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric { } func (ae *Exporter) uploadViewData(vdl []*view.Data) { - select { - case <-ae.stopCh: + protoMetrics := ocViewDataToPbMetrics(vdl) + if len(protoMetrics) == 0 { return - - default: - if !ae.connected() { - return - } - - protoMetrics := ocViewDataToPbMetrics(vdl) - if len(protoMetrics) == 0 { - return - } - err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{ - Metrics: protoMetrics, - // TODO:(@odeke-em) - // a) Figure out how to derive a Node from the environment - // b) Figure out how to derive a Resource from the environment - // or better letting users of the exporter configure it. - }) - if err != nil { - ae.setStateDisconnected(err) - } } + req := &agentmetricspb.ExportMetricsServiceRequest{ + Metrics: protoMetrics, + Resource: resourceProtoFromEnv(), + // TODO:(@odeke-em) + // a) Figure out how to derive a Node from the environment + // or better letting users of the exporter configure it. + } + ae.ExportMetricsServiceRequest(req) } func (ae *Exporter) Flush() { diff --git a/viewdata_to_metrics_test.go b/viewdata_to_metrics_test.go index 0f14865..e940ea5 100644 --- a/viewdata_to_metrics_test.go +++ b/viewdata_to_metrics_test.go @@ -135,6 +135,7 @@ func TestExportMetrics_conversionFromViewData(t *testing.T) { }, }, }, + Resource: resourceProtoFromEnv(), }, }