Skip to content

Commit

Permalink
GODRIVER-2810 Re-organize rtt monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Sep 8, 2023
1 parent b8cba73 commit cfb1c66
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
4 changes: 4 additions & 0 deletions x/mongo/driver/topology/rtt_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type rttMonitor struct {
cfg *rttConfig
ctx context.Context
cancelFn context.CancelFunc
started bool
}

var _ driver.RTTMonitor = &rttMonitor{}
Expand All @@ -75,6 +76,7 @@ func newRTTMonitor(cfg *rttConfig) *rttMonitor {

func (r *rttMonitor) connect() {
r.closeWg.Add(1)
r.started = true
go r.start()
}

Expand All @@ -89,6 +91,8 @@ func (r *rttMonitor) start() {

var conn *connection
defer func() {
r.started = false

if conn != nil {
// If the connection exists, we need to wait for it to be connected because
// conn.connect() and conn.close() cannot be called concurrently. If the connection
Expand Down
43 changes: 22 additions & 21 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ func (s *Server) Connect(updateCallback updateTopologyCallback) error {
s.updateTopologyCallback.Store(updateCallback)

if !s.cfg.monitoringDisabled && !s.cfg.loadBalanced {
s.rttMonitor.connect()
s.closewg.Add(1)
go s.update()
}
Expand Down Expand Up @@ -655,12 +654,15 @@ func (s *Server) update() {
// If the server supports streaming or we're already streaming, we want to move to streaming the next response
// without waiting. If the server has transitioned to Unknown from a network error, we want to do another
// check without waiting in case it was a transient error and the server isn't actually down.
serverSupportsStreaming := desc.Kind != description.Unknown && desc.TopologyVersion != nil
connectionIsStreaming := s.conn != nil && s.conn.getCurrentlyStreaming()
transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil &&
previousDescription.Kind != description.Unknown

if serverSupportsStreaming || connectionIsStreaming || transitionedFromNetworkError {
if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started {
s.rttMonitor.connect()
}

if isStreamable(s) || connectionIsStreaming || transitionedFromNetworkError {
continue
}

Expand Down Expand Up @@ -796,23 +798,22 @@ func (s *Server) createBaseOperation(conn driver.Connection) *operation.Hello {
ServerAPI(s.cfg.serverAPI)
}

// isStreamable returns whether or not we can use the streaming protocol to
// optimally reduces the time it takes for a client to discover server state
// changes. Streaming must be disabled if any of the following are true:
//
// - the client is configured with serverMonitoringMode=poll [P], or
// - the client is configured with serverMonitoringMode=auto [A] and a FaaS
// platform is detected [F], or
// - the server does not support streaming (eg MongoDB <4.4) [S].
//
// I.e, streaming must be disabled if: P ∨ (A ∧ F) ∨ (~S) ≡ ~P ∧ (~A ∨ ~F) ∧ S
func isStreamable(previousDesc description.Server, srv *Server) bool {
srvMonitoringMode := srv.cfg.serverMonitoringMode
faas := driverutil.GetFaasEnvName()

return srvMonitoringMode != connstring.ServerMonitoringModePoll && // P
(srvMonitoringMode != connstring.ServerMonitoringModeAuto || faas == "") && // (~A ∨ ~F)
previousDesc.TopologyVersion != nil // S
func isStreamingEnabled(srv *Server) bool {
mode := srv.cfg.serverMonitoringMode

if mode == connstring.ServerMonitoringModeStream {
return true
}

if mode == connstring.ServerMonitoringModeAuto {
return driverutil.GetFaasEnvName() == ""
}

return false
}

func isStreamable(srv *Server) bool {
return srv.Description().Kind != description.Unknown && srv.Description().TopologyVersion != nil
}

func (s *Server) check() (description.Server, error) {
Expand Down Expand Up @@ -850,7 +851,7 @@ func (s *Server) check() (description.Server, error) {
heartbeatConn := initConnection{s.conn}
baseOperation := s.createBaseOperation(heartbeatConn)
previousDescription := s.Description()
streamable := isStreamable(previousDescription, s)
streamable := isStreamingEnabled(s) && isStreamable(s)

s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
switch {
Expand Down

0 comments on commit cfb1c66

Please sign in to comment.