Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Add a method Exporter.ExportMetricsServiceRequest that accepts proto …
Browse files Browse the repository at this point in the history
…metrics (#75)

This will be used when Agent sends proto metrics to Collector.
Also add proto resource to export requests.

Related to open-telemetry/opentelemetry-collector#231.
  • Loading branch information
songy23 authored Aug 5, 2019
1 parent 8110e6c commit a8a6f45
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
78 changes: 55 additions & 23 deletions ocagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,49 @@ func (ae *Exporter) ExportView(vd *view.Data) {
_ = ae.viewDataBundler.Add(vd, 1)
}

// ExportMetricsServiceRequest sends proto metrics with the metrics service client.
func (ae *Exporter) ExportMetricsServiceRequest(batch *agentmetricspb.ExportMetricsServiceRequest) error {
if batch == nil || len(batch.Metrics) == 0 {
return nil
}

select {
case <-ae.stopCh:
return errStopped

default:
if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil {
return fmt.Errorf("ExportMetricsServiceRequest: no active connection, last connection error: %v", lastConnectErr)
}

ae.senderMu.Lock()
err := ae.metricsExporter.Send(batch)
ae.senderMu.Unlock()
if err != nil {
if err == io.EOF {
ae.recvMu.Lock()
// Perform a .Recv to try to find out why the RPC actually ended.
// See:
// * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100
// * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ
for {
_, err = ae.metricsExporter.Recv()
if err != nil {
break
}
}
ae.recvMu.Unlock()
}

ae.setStateDisconnected(err)
if err != io.EOF {
return err
}
}
return nil
}
}

func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
if len(sdl) == 0 {
return nil
Expand Down Expand Up @@ -460,7 +503,8 @@ func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
}
ae.senderMu.Lock()
err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{
Spans: protoSpans,
Spans: protoSpans,
Resource: resourceProtoFromEnv(),
})
ae.senderMu.Unlock()
if err != nil {
Expand All @@ -487,30 +531,18 @@ func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
}

func (ae *Exporter) uploadViewData(vdl []*view.Data) {
select {
case <-ae.stopCh:
protoMetrics := ocViewDataToPbMetrics(vdl)
if len(protoMetrics) == 0 {
return

default:
if !ae.connected() {
return
}

protoMetrics := ocViewDataToPbMetrics(vdl)
if len(protoMetrics) == 0 {
return
}
err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{
Metrics: protoMetrics,
// TODO:(@odeke-em)
// a) Figure out how to derive a Node from the environment
// b) Figure out how to derive a Resource from the environment
// or better letting users of the exporter configure it.
})
if err != nil {
ae.setStateDisconnected(err)
}
}
req := &agentmetricspb.ExportMetricsServiceRequest{
Metrics: protoMetrics,
Resource: resourceProtoFromEnv(),
// TODO:(@odeke-em)
// a) Figure out how to derive a Node from the environment
// or better letting users of the exporter configure it.
}
ae.ExportMetricsServiceRequest(req)
}

func (ae *Exporter) Flush() {
Expand Down
1 change: 1 addition & 0 deletions viewdata_to_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func TestExportMetrics_conversionFromViewData(t *testing.T) {
},
},
},
Resource: resourceProtoFromEnv(),
},
}

Expand Down

0 comments on commit a8a6f45

Please sign in to comment.