Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud Services Entity Relationship Changes #952

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion v3/integrations/nramqp/examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func main() {

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
amqpURL := "amqp://guest:guest@localhost:5672/"
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Expand Down
10 changes: 7 additions & 3 deletions v3/integrations/nramqp/examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ type amqpServer struct {
ch *amqp.Channel
exchange string
routingKey string
url string
}

func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer {
func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string, url string) *amqpServer {
return &amqpServer{
channel,
exchangeName,
routingKeyName,
url,
}
}

Expand All @@ -65,6 +67,7 @@ func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Re
ctx,
serv.exchange, // exchange
serv.routingKey, // routing key
serv.url, // url
false, // mandatory
false, // immediate
amqp.Publishing{
Expand Down Expand Up @@ -94,7 +97,8 @@ func main() {

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
amqpURL := "amqp://guest:guest@localhost:5672/"
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Expand All @@ -112,7 +116,7 @@ func main() {
)
failOnError(err, "Failed to declare a queue")

server := NewServer(ch, "", q.Name)
server := NewServer(ch, "", q.Name, amqpURL)

http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index))
http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage))
Expand Down
43 changes: 35 additions & 8 deletions v3/integrations/nramqp/nramqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nramqp

import (
"context"
"strings"

amqp "github.com/rabbitmq/amqp091-go"

Expand All @@ -16,7 +17,7 @@ const (

func init() { internal.TrackUsage("integration", "messagebroker", "nramqp") }

func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
func createProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
s := newrelic.MessageProducerSegment{
Library: RabbitMQLibrary,
DestinationName: "Default",
Expand All @@ -33,13 +34,34 @@ func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment
return &s
}

func GetHostAndPortFromURL(url string) (string, string) {
// url is of format amqp://user:password@host:port or amqp://host:port
var hostPortPart string

// extract the part after "@" symbol, if present
if parts := strings.Split(url, "@"); len(parts) == 2 {
hostPortPart = parts[1]
} else {
// assume the whole url after "amqp://" is the host:port part
hostPortPart = strings.TrimPrefix(url, "amqp://")
}

// split the host:port part
strippedURL := strings.Split(hostPortPart, ":")
if len(strippedURL) != 2 {
return "", ""
}
return strippedURL[0], strippedURL[1]
}

// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment.
// It will also inject distributed tracing headers into the message.
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key, url string, mandatory, immediate bool, msg amqp.Publishing) error {
host, port := GetHostAndPortFromURL(url)
txn := newrelic.FromContext(ctx)
if txn != nil {
// generate message broker segment
s := creatProducerSegment(exchange, key)
s := createProducerSegment(exchange, key)

// capture telemetry for AMQP producer
if msg.Headers != nil && len(msg.Headers) > 0 {
Expand All @@ -49,15 +71,18 @@ func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key str
}
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr)
}
s.StartTime = txn.StartSegmentNow()

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeSpanKind, "producer")
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerAddress, host)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerPort, port)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, exchange)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageRoutingKey, key)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo)

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)

s.StartTime = txn.StartSegmentNow()
err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
s.End()
return err
Expand Down Expand Up @@ -91,8 +116,10 @@ func Consume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr, nil)
}
}

integrationsupport.AddAgentAttribute(txn, newrelic.AttributeSpanKind, "consumer", nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageDestinationName, queue, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessagingDestinationPublishName, delivery.Exchange, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil)
Expand Down
56 changes: 54 additions & 2 deletions v3/integrations/nramqp/nramqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func BenchmarkCreateProducerSegment(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
creatProducerSegment("exchange", "key")
createProducerSegment("exchange", "key")
}
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestCreateProducerSegment(t *testing.T) {
}

for _, test := range tests {
s := creatProducerSegment(test.exchange, test.key)
s := createProducerSegment(test.exchange, test.key)
if s.DestinationName != test.expect.DestinationName {
t.Errorf("expected destination name %s, got %s", test.expect.DestinationName, s.DestinationName)
}
Expand All @@ -76,3 +76,55 @@ func TestCreateProducerSegment(t *testing.T) {
}

}

func TestHostAndPortParsing(t *testing.T) {
app := createTestApp()
txn := app.StartTransaction("test")
defer txn.End()

type testObject struct {
url string
expectHost string
expectPort string
}

tests := []testObject{
{
"amqp://user:password@host:port",
"host",
"port",
},
{
"amqp://host:port",
"host",
"port",
},
{
"aaa://host:port",
"",
"",
},

{
"amqp://user:password@host",
"",
"",
},
{
"amqp://user:password@host:port:extra",
"",
"",
},
}

for _, test := range tests {
host, port := GetHostAndPortFromURL(test.url)
if host != test.expectHost {
t.Errorf("expected host %s, got %s", test.expectHost, host)
}
if port != test.expectPort {
t.Errorf("expected port %s, got %s", test.expectPort, port)
}
}

}
42 changes: 35 additions & 7 deletions v3/integrations/nrawssdk-v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,45 @@ module github.com/newrelic/go-agent/v3/integrations/nrawssdk-v2

// As of May 2021, the aws-sdk-go-v2 go.mod file uses 1.15:
// https://github.com/aws/aws-sdk-go-v2/blob/master/go.mod
go 1.20
go 1.21

toolchain go1.21.0

require (
github.com/aws/aws-sdk-go-v2 v1.16.15
github.com/aws/aws-sdk-go-v2/config v1.17.6
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.0
github.com/aws/aws-sdk-go-v2/service/lambda v1.24.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.10
github.com/aws/smithy-go v1.13.3
github.com/aws/aws-sdk-go-v2 v1.30.4
github.com/aws/aws-sdk-go-v2/config v1.27.31
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.6
github.com/aws/aws-sdk-go-v2/service/lambda v1.58.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.6
github.com/aws/smithy-go v1.20.4
github.com/newrelic/go-agent/v3 v3.33.1
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

replace github.com/newrelic/go-agent/v3 => ../..
75 changes: 75 additions & 0 deletions v3/integrations/nrawssdk-v2/nrawssdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ package nrawssdk

import (
"context"
"net/url"
"strconv"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go/middleware"
smithymiddle "github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/newrelic/go-agent/v3/internal/integrationsupport"
Expand All @@ -41,6 +46,11 @@ type nrMiddleware struct {
txn *newrelic.Transaction
}

// Context key for SQS service queue
type contextKey string

const queueURLKey contextKey = "QueueURL"

type endable interface{ End() }

// See https://aws.github.io/aws-sdk-go-v2/docs/middleware/ for a description of
Expand Down Expand Up @@ -88,6 +98,24 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
response, ok := out.RawResponse.(*smithyhttp.Response)

if ok {
if serviceName == "sqs" || serviceName == "SQS" {
if queueURL, ok := ctx.Value(queueURLKey).(string); ok {
parsedURL, err := url.Parse(queueURL)
if err == nil {
// Example URL: https://sqs.{region}.amazonaws.com/{account.id}/{queue.name}
pathParts := strings.Split(parsedURL.Path, "/")
if len(pathParts) >= 3 {
accountID := pathParts[1]
queueName := pathParts[2]
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudAccountID, accountID)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudRegion, region)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageSystem, "aws_sqs")
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, queueName)
}
}

}
}
// Set additional span attributes
integrationsupport.AddAgentSpanAttribute(txn,
newrelic.AttributeResponseCode, strconv.Itoa(response.StatusCode))
Expand All @@ -107,6 +135,51 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
smithymiddle.Before)
}

func (m nrMiddleware) serializeMiddleware(stack *middleware.Stack) error {
return stack.Initialize.Add(middleware.InitializeMiddlewareFunc("NRSerializeMiddleware", func(
ctx context.Context, in middleware.InitializeInput, next middleware.InitializeHandler) (
out middleware.InitializeOutput, metadata middleware.Metadata, err error) {

serviceName := awsmiddle.GetServiceID(ctx)
if serviceName == "sqs" || serviceName == "SQS" {
QueueURL := ""
switch params := in.Parameters.(type) {
case *sqs.SendMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ReceiveMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ChangeMessageVisibilityInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ChangeMessageVisibilityBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteMessageBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.SendMessageBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.PurgeQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.GetQueueAttributesInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.SetQueueAttributesInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.TagQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.UntagQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
default:
QueueURL = ""
}
// Store the QueueURL in the context
ctx = context.WithValue(ctx, queueURLKey, QueueURL)
}
return next.HandleInitialize(ctx, in)
}), middleware.After)
}

// AppendMiddlewares inserts New Relic middleware in the given `apiOptions` for
// the AWS SDK V2 for Go. It must be called only once per AWS configuration.
//
Expand Down Expand Up @@ -167,4 +240,6 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
func AppendMiddlewares(apiOptions *[]func(*smithymiddle.Stack) error, txn *newrelic.Transaction) {
m := nrMiddleware{txn: txn}
*apiOptions = append(*apiOptions, m.deserializeMiddleware)
*apiOptions = append(*apiOptions, m.serializeMiddleware)

}
Loading
Loading