Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2810 Switch to polling monitoring when running within a FaaS environment #1376

Merged
merged 33 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fbbafce
GODRIVER-2810 Switch to polling monitoring when running within a FaaS…
prestonvasquez Sep 5, 2023
2a88672
GODRIVER-2972 Fix wiremessage RequestID race in operation.Execute
prestonvasquez Sep 6, 2023
8c6de75
Merge branch 'GODRIVER-2972' into GODRIVER-2810
prestonvasquez Sep 6, 2023
7a61bdb
GODRIVER-2810 Correct expected order for test assertion
prestonvasquez Sep 6, 2023
d5a6c99
GODRIVER-2810 Fix hello test faas getter reference
prestonvasquez Sep 6, 2023
8277d88
GODRIVER-2935 Use OP_QUERY in connection handshakes
prestonvasquez Sep 6, 2023
fb83d15
GODRIVER-2810 restore legacy hello
prestonvasquez Sep 6, 2023
728f277
GODRIVER-2935 Update sent_message logic to include OP_QUERY for hello
prestonvasquez Sep 6, 2023
9fec743
GODRIVER-2935 Add LegacyHandshake operation logic
prestonvasquez Sep 6, 2023
7c38cd1
Merge branch 'GODRIVER-2935' into GODRIVER-2810
prestonvasquez Sep 6, 2023
fcc4db7
GODRIVER-2810 Clean up handshake prose test
prestonvasquez Sep 6, 2023
fe237bb
GODRIVER-2810 Remove debugging tools
prestonvasquez Sep 6, 2023
c343f12
GODRIVER-2935 Extend legacy check to GetHandshakeInformation
prestonvasquez Sep 7, 2023
25b7685
GODRIVER-2935 Add legacy tests back to auth and client
prestonvasquez Sep 7, 2023
c0085e1
Merge branch 'master' into GODRIVER-2935
prestonvasquez Sep 7, 2023
a62ad25
Merge branch 'GODRIVER-2935' into GODRIVER-2810
prestonvasquez Sep 7, 2023
0e8a3a9
GODRIVER-2810 Mege master
prestonvasquez Sep 7, 2023
3672bb4
GODRIVER-2810 Add licenses
prestonvasquez Sep 7, 2023
b8cba73
GODRIVER-2810 Cleanup code
prestonvasquez Sep 7, 2023
cfb1c66
GODRIVER-2810 Re-organize rtt monitor
prestonvasquez Sep 8, 2023
1a728b0
Merge branch 'master' into GODRIVER-2810
prestonvasquez Sep 8, 2023
2d54de4
GODRIVER-2810 Bump RTT tests to min 4.4
prestonvasquez Sep 8, 2023
2d5ebd9
GODRIVER-2810 Revert unecessary changes
prestonvasquez Sep 12, 2023
1f32ccd
GODRIVER-2810 Make code more dev-friendly
prestonvasquez Sep 22, 2023
9ca3369
GODRIVER-2810 Fix merge conflicts
prestonvasquez Sep 22, 2023
16b14a9
Merge branch 'v1' into GODRIVER-2810
prestonvasquez Sep 25, 2023
6287bab
GODRIVER-2810 Ensure polling does not disable CSOT
prestonvasquez Oct 12, 2023
ce91abf
Merge branch 'v1' into GODRIVER-2810
prestonvasquez Oct 17, 2023
d2798ef
GODRIVER-2810 Bump schema to 1.17
prestonvasquez Oct 17, 2023
aaa1b6a
GODRIVER-2810 Remove unecessary bool logic
prestonvasquez Oct 20, 2023
20d1b17
Merge branch 'v1' into GODRIVER-2810
prestonvasquez Oct 23, 2023
b686f7c
GODRIVER-2810 Guard rttMonitor connection
prestonvasquez Oct 27, 2023
34a8970
GODRIVER-2810 Remove unused rttMonitor field
prestonvasquez Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions internal/driverutil/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (C) MongoDB, Inc. 2023-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driverutil

import (
"os"
"strings"
)

const AwsLambdaPrefix = "AWS_Lambda_"

const (
// FaaS environment variable names

// EnvVarAWSExecutionEnv is the AWS Execution environment variable.
EnvVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
// EnvVarAWSLambdaRuntimeAPI is the AWS Lambda runtime API variable.
EnvVarAWSLambdaRuntimeAPI = "AWS_LAMBDA_RUNTIME_API"
// EnvVarFunctionsWorkerRuntime is the functions worker runtime variable.
EnvVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
// EnvVarKService is the K Service variable.
EnvVarKService = "K_SERVICE"
// EnvVarFunctionName is the function name variable.
EnvVarFunctionName = "FUNCTION_NAME"
// EnvVarVercel is the Vercel variable.
EnvVarVercel = "VERCEL"
)

const (
// FaaS environment variable names

// EnvVarAWSRegion is the AWS region variable.
EnvVarAWSRegion = "AWS_REGION"
// EnvVarAWSLambdaFunctionMemorySize is the AWS Lambda function memory size variable.
EnvVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
// EnvVarFunctionMemoryMB is the function memory in megabytes variable.
EnvVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
// EnvVarFunctionTimeoutSec is the function timeout in seconds variable.
EnvVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
// EnvVarFunctionRegion is the function region variable.
EnvVarFunctionRegion = "FUNCTION_REGION"
// EnvVarVercelRegion is the Vercel region variable.
EnvVarVercelRegion = "VERCEL_REGION"
)

const (
// FaaS environment names used by the client

// EnvNameAWSLambda is the AWS Lambda environment name.
EnvNameAWSLambda = "aws.lambda"
// EnvNameAzureFunc is the Azure Function environment name.
EnvNameAzureFunc = "azure.func"
// EnvNameGCPFunc is the Google Cloud Function environment name.
EnvNameGCPFunc = "gcp.func"
// EnvNameVercel is the Vercel environment name.
EnvNameVercel = "vercel"
)

// GetFaasEnvName parses the FaaS environment variable name and returns the
// corresponding name used by the client. If none of the variables or variables
// for multiple names are populated the client.env value MUST be entirely
// omitted. When variables for multiple "client.env.name" values are present,
// "vercel" takes precedence over "aws.lambda"; any other combination MUST cause
// "client.env" to be entirely omitted.
func GetFaasEnvName() string {
envVars := []string{
EnvVarAWSExecutionEnv,
EnvVarAWSLambdaRuntimeAPI,
EnvVarFunctionsWorkerRuntime,
EnvVarKService,
EnvVarFunctionName,
EnvVarVercel,
}

// If none of the variables are populated the client.env value MUST be
// entirely omitted.
names := make(map[string]struct{})

for _, envVar := range envVars {
val := os.Getenv(envVar)
if val == "" {
continue
}

var name string

switch envVar {
case EnvVarAWSExecutionEnv:
if !strings.HasPrefix(val, AwsLambdaPrefix) {
continue
}

name = EnvNameAWSLambda
case EnvVarAWSLambdaRuntimeAPI:
name = EnvNameAWSLambda
case EnvVarFunctionsWorkerRuntime:
name = EnvNameAzureFunc
case EnvVarKService, EnvVarFunctionName:
name = EnvNameGCPFunc
case EnvVarVercel:
// "vercel" takes precedence over "aws.lambda".
delete(names, EnvNameAWSLambda)

name = EnvNameVercel
}

names[name] = struct{}{}
if len(names) > 1 {
// If multiple names are populated the client.env value
// MUST be entirely omitted.
names = nil

break
}
}

for name := range names {
return name
}

return ""
}
File renamed without changes.
25 changes: 20 additions & 5 deletions internal/test/faas/awslambda/mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const timeout = 60 * time.Second
// event durations, as well as the number of heartbeats, commands, and open
// conections.
type eventListener struct {
commandCount int
commandDuration int64
heartbeatCount int
heartbeatDuration int64
openConnections int
commandCount int
commandDuration int64
heartbeatAwaitedCount int
heartbeatCount int
heartbeatDuration int64
openConnections int
}

// commandMonitor initializes an event.CommandMonitor that will count the number
Expand Down Expand Up @@ -61,11 +62,19 @@ func (listener *eventListener) serverMonitor() *event.ServerMonitor {
succeeded := func(e *event.ServerHeartbeatSucceededEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

failed := func(e *event.ServerHeartbeatFailedEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

return &event.ServerMonitor{
Expand Down Expand Up @@ -150,6 +159,12 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
return gateway500(), fmt.Errorf("failed to delete: %w", err)
}

// Driver must switch to polling monitoring when running within a FaaS
// environment.
if listener.heartbeatAwaitedCount > 0 {
return gateway500(), fmt.Errorf("FaaS environment fialed to switch to polling")
}

var avgCmdDur float64
if count := listener.commandCount; count != 0 {
avgCmdDur = float64(listener.commandDuration) / float64(count)
Expand Down
103 changes: 43 additions & 60 deletions mongo/integration/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/driverutil"
"go.mongodb.org/mongo-driver/internal/handshake"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
Expand Down Expand Up @@ -51,31 +52,18 @@ func TestHandshakeProse(t *testing.T) {
return elems
}

const (
envVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
envVarAWSRegion = "AWS_REGION"
envVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
envVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
envVarKService = "K_SERVICE"
envVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
envVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
envVarFunctionRegion = "FUNCTION_REGION"
envVarVercel = "VERCEL"
envVarVercelRegion = "VERCEL_REGION"
)

// Reset the environment variables to avoid environment namespace
// collision.
t.Setenv(envVarAWSExecutionEnv, "")
t.Setenv(envVarFunctionsWorkerRuntime, "")
t.Setenv(envVarKService, "")
t.Setenv(envVarVercel, "")
t.Setenv(envVarAWSRegion, "")
t.Setenv(envVarAWSLambdaFunctionMemorySize, "")
t.Setenv(envVarFunctionMemoryMB, "")
t.Setenv(envVarFunctionTimeoutSec, "")
t.Setenv(envVarFunctionRegion, "")
t.Setenv(envVarVercelRegion, "")
t.Setenv(driverutil.EnvVarAWSExecutionEnv, "")
t.Setenv(driverutil.EnvVarFunctionsWorkerRuntime, "")
t.Setenv(driverutil.EnvVarKService, "")
t.Setenv(driverutil.EnvVarVercel, "")
t.Setenv(driverutil.EnvVarAWSRegion, "")
t.Setenv(driverutil.EnvVarAWSLambdaFunctionMemorySize, "")
t.Setenv(driverutil.EnvVarFunctionMemoryMB, "")
t.Setenv(driverutil.EnvVarFunctionTimeoutSec, "")
t.Setenv(driverutil.EnvVarFunctionRegion, "")
t.Setenv(driverutil.EnvVarVercelRegion, "")

for _, test := range []struct {
name string
Expand All @@ -85,9 +73,9 @@ func TestHandshakeProse(t *testing.T) {
{
name: "1. valid AWS",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: "us-east-2",
envVarAWSLambdaFunctionMemorySize: "1024",
driverutil.EnvVarAWSExecutionEnv: "AWS_Lambda_java8",
driverutil.EnvVarAWSRegion: "us-east-2",
driverutil.EnvVarAWSLambdaFunctionMemorySize: "1024",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -98,7 +86,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "2. valid Azure",
env: map[string]string{
envVarFunctionsWorkerRuntime: "node",
driverutil.EnvVarFunctionsWorkerRuntime: "node",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "azure.func"},
Expand All @@ -107,10 +95,10 @@ func TestHandshakeProse(t *testing.T) {
{
name: "3. valid GCP",
env: map[string]string{
envVarKService: "servicename",
envVarFunctionMemoryMB: "1024",
envVarFunctionTimeoutSec: "60",
envVarFunctionRegion: "us-central1",
driverutil.EnvVarKService: "servicename",
driverutil.EnvVarFunctionMemoryMB: "1024",
driverutil.EnvVarFunctionTimeoutSec: "60",
driverutil.EnvVarFunctionRegion: "us-central1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "gcp.func"},
Expand All @@ -122,8 +110,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "4. valid Vercel",
env: map[string]string{
envVarVercel: "1",
envVarVercelRegion: "cdg1",
driverutil.EnvVarVercel: "1",
driverutil.EnvVarVercelRegion: "cdg1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "vercel"},
Expand All @@ -133,16 +121,16 @@ func TestHandshakeProse(t *testing.T) {
{
name: "5. invalid multiple providers",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarFunctionsWorkerRuntime: "node",
driverutil.EnvVarAWSExecutionEnv: "AWS_Lambda_java8",
driverutil.EnvVarFunctionsWorkerRuntime: "node",
},
want: clientMetadata(nil),
},
{
name: "6. invalid long string",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: func() string {
driverutil.EnvVarAWSExecutionEnv: "AWS_Lambda_java8",
driverutil.EnvVarAWSRegion: func() string {
var s string
for i := 0; i < 512; i++ {
s += "a"
Expand All @@ -157,8 +145,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "7. invalid wrong types",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSLambdaFunctionMemorySize: "big",
driverutil.EnvVarAWSExecutionEnv: "AWS_Lambda_java8",
driverutil.EnvVarAWSLambdaFunctionMemorySize: "big",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -167,7 +155,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "8. Invalid - AWS_EXECUTION_ENV does not start with \"AWS_Lambda_\"",
env: map[string]string{
envVarAWSExecutionEnv: "EC2",
driverutil.EnvVarAWSExecutionEnv: "EC2",
},
want: clientMetadata(nil),
},
Expand All @@ -184,32 +172,27 @@ func TestHandshakeProse(t *testing.T) {
require.NoError(mt, err, "Ping error: %v", err)

messages := mt.GetProxiedMessages()
handshakeMessage := messages[:1][0]

// First two messages are handshake messages
for idx, pair := range messages[:2] {
hello := handshake.LegacyHello
// Expect "hello" command name with API version.
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

assert.Equal(mt, pair.CommandName, hello, "expected and actual command name at index %d are different", idx)
hello := handshake.LegacyHello
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

sent := pair.Sent
assert.Equal(mt, hello, handshakeMessage.CommandName)

// Lookup the "client" field in the command document.
clientVal, err := sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s at index %d to contain client field", sent.Command, idx)
// Lookup the "client" field in the command document.
clientVal, err := handshakeMessage.Sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s to contain client field", handshakeMessage.Sent.Command)

got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)

wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)
wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)

want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
}
want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
})
}
}
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "serverselectiontimeoutms":
clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "servermonitoringmode":
clientOpts.SetServerMonitoringMode(value.(string))
default:
return fmt.Errorf("unrecognized URI option %s", key)
}
Expand Down
Loading
Loading