diff --git a/x/mongo/driver/topology/rtt_monitor.go b/x/mongo/driver/topology/rtt_monitor.go index 998d2a0253..1a1c37b296 100644 --- a/x/mongo/driver/topology/rtt_monitor.go +++ b/x/mongo/driver/topology/rtt_monitor.go @@ -51,6 +51,7 @@ type rttMonitor struct { cfg *rttConfig ctx context.Context cancelFn context.CancelFunc + started bool } var _ driver.RTTMonitor = &rttMonitor{} @@ -75,6 +76,7 @@ func newRTTMonitor(cfg *rttConfig) *rttMonitor { func (r *rttMonitor) connect() { r.closeWg.Add(1) + r.started = true go r.start() } @@ -89,6 +91,8 @@ func (r *rttMonitor) start() { var conn *connection defer func() { + r.started = false + if conn != nil { // If the connection exists, we need to wait for it to be connected because // conn.connect() and conn.close() cannot be called concurrently. If the connection diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index 0c9144f5f4..e88cff8084 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -246,7 +246,6 @@ func (s *Server) Connect(updateCallback updateTopologyCallback) error { s.updateTopologyCallback.Store(updateCallback) if !s.cfg.monitoringDisabled && !s.cfg.loadBalanced { - s.rttMonitor.connect() s.closewg.Add(1) go s.update() } @@ -655,12 +654,15 @@ func (s *Server) update() { // If the server supports streaming or we're already streaming, we want to move to streaming the next response // without waiting. If the server has transitioned to Unknown from a network error, we want to do another // check without waiting in case it was a transient error and the server isn't actually down. - serverSupportsStreaming := desc.Kind != description.Unknown && desc.TopologyVersion != nil connectionIsStreaming := s.conn != nil && s.conn.getCurrentlyStreaming() transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil && previousDescription.Kind != description.Unknown - if serverSupportsStreaming || connectionIsStreaming || transitionedFromNetworkError { + if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started { + s.rttMonitor.connect() + } + + if isStreamable(s) || connectionIsStreaming || transitionedFromNetworkError { continue } @@ -796,23 +798,22 @@ func (s *Server) createBaseOperation(conn driver.Connection) *operation.Hello { ServerAPI(s.cfg.serverAPI) } -// isStreamable returns whether or not we can use the streaming protocol to -// optimally reduces the time it takes for a client to discover server state -// changes. Streaming must be disabled if any of the following are true: -// -// - the client is configured with serverMonitoringMode=poll [P], or -// - the client is configured with serverMonitoringMode=auto [A] and a FaaS -// platform is detected [F], or -// - the server does not support streaming (eg MongoDB <4.4) [S]. -// -// I.e, streaming must be disabled if: P ∨ (A ∧ F) ∨ (~S) ≡ ~P ∧ (~A ∨ ~F) ∧ S -func isStreamable(previousDesc description.Server, srv *Server) bool { - srvMonitoringMode := srv.cfg.serverMonitoringMode - faas := driverutil.GetFaasEnvName() - - return srvMonitoringMode != connstring.ServerMonitoringModePoll && // P - (srvMonitoringMode != connstring.ServerMonitoringModeAuto || faas == "") && // (~A ∨ ~F) - previousDesc.TopologyVersion != nil // S +func isStreamingEnabled(srv *Server) bool { + mode := srv.cfg.serverMonitoringMode + + if mode == connstring.ServerMonitoringModeStream { + return true + } + + if mode == connstring.ServerMonitoringModeAuto { + return driverutil.GetFaasEnvName() == "" + } + + return false +} + +func isStreamable(srv *Server) bool { + return srv.Description().Kind != description.Unknown && srv.Description().TopologyVersion != nil } func (s *Server) check() (description.Server, error) { @@ -850,7 +851,7 @@ func (s *Server) check() (description.Server, error) { heartbeatConn := initConnection{s.conn} baseOperation := s.createBaseOperation(heartbeatConn) previousDescription := s.Description() - streamable := isStreamable(previousDescription, s) + streamable := isStreamingEnabled(s) && isStreamable(s) s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable) switch {