From 78e4d2c86ae17d03b80f7bd0cab74a624d5b5f8d Mon Sep 17 00:00:00 2001
From: Qingyang Hu
Date: Wed, 15 Nov 2023 12:57:47 -0500
Subject: [PATCH] WIP

---
 .../unified/unified_spec_runner.go        |   6 +-
 x/mongo/driver/topology/CMAP_spec_test.go |  13 +-
 x/mongo/driver/topology/pool.go           | 178 ++++++++++--------
 x/mongo/driver/topology/server.go         |  13 +-
 x/mongo/driver/topology/server_test.go    |   2 +-
 5 files changed, 119 insertions(+), 93 deletions(-)

diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go
index d532b8ccae..f97e807717 100644
--- a/mongo/integration/unified/unified_spec_runner.go
+++ b/mongo/integration/unified/unified_spec_runner.go
@@ -33,9 +33,9 @@ var (
 		// TODO(GODRIVER-2943): Fix and unskip this test case.
 		"Topology lifecycle": "Test times out. See GODRIVER-2943",
 
-		// "Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
-		// "Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
-		// "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
+		"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
+		"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
+		"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
 	}
 
 	logMessageValidatorTimeout = 10 * time.Millisecond
diff --git a/x/mongo/driver/topology/CMAP_spec_test.go b/x/mongo/driver/topology/CMAP_spec_test.go
index 445556c63b..5f0e1c2eef 100644
--- a/x/mongo/driver/topology/CMAP_spec_test.go
+++ b/x/mongo/driver/topology/CMAP_spec_test.go
@@ -523,15 +523,12 @@ func runOperationInThread(t *testing.T, operation map[string]interface{}, testIn
 		}
 		return c.Close()
 	case "clear":
-		var fns []func()
-		{
-			needInterruption, ok := operation["interruptInUseConnections"].(bool)
-			if ok && needInterruption {
-				fns = append(fns, s.pool.interruptInUseConnections)
-			}
+		needInterruption, ok := operation["interruptInUseConnections"].(bool)
+		if ok && needInterruption {
+			s.pool.clearAll(fmt.Errorf("spec test clear"), nil)
+		} else {
+			s.pool.clear(fmt.Errorf("spec test clear"), nil)
 		}
-
-		s.pool.clear(fmt.Errorf("spec test clear"), nil, fns...)
 	case "close":
 		s.pool.close(context.Background())
 	case "ready":
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index c05c9c4f41..ff886c326b 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -762,35 +762,52 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro
 // checkIn returns an idle connection to the pool. If the connection is perished or the pool is
 // closed, it is removed from the connection pool and closed.
 func (p *pool) checkIn(conn *connection) error {
-	if conn == nil {
-		return nil
-	}
-	if conn.pool != p {
-		return ErrWrongPool
-	}
+	return p.checkInWithCallback(conn, func() (reason, bool) {
+		if mustLogPoolMessage(p) {
+			keysAndValues := logger.KeyValues{
+				logger.KeyDriverConnectionID, conn.driverConnectionID,
+			}
 
-	if mustLogPoolMessage(p) {
-		keysAndValues := logger.KeyValues{
-			logger.KeyDriverConnectionID, conn.driverConnectionID,
+			logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
 		}
 
-		logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
-	}
-
-	if p.monitor != nil {
-		p.monitor.Event(&event.PoolEvent{
-			Type:         event.ConnectionCheckedIn,
-			ConnectionID: conn.driverConnectionID,
-			Address:      conn.addr.String(),
-		})
-	}
+		if p.monitor != nil {
+			p.monitor.Event(&event.PoolEvent{
+				Type:         event.ConnectionCheckedIn,
+				ConnectionID: conn.driverConnectionID,
+				Address:      conn.addr.String(),
+			})
+		}
 
-	return p.checkInNoEvent(conn)
+		r, perished := connectionPerished(conn)
+		if !perished && conn.pool.getState() == poolClosed {
+			perished = true
+			r = reason{
+				loggerConn: logger.ReasonConnClosedPoolClosed,
+				event:      event.ReasonPoolClosed,
+			}
+		}
+		return r, perished
+	})
 }
 
 // checkInNoEvent returns a connection to the pool. It behaves identically to checkIn except it does
 // not publish events. It is only intended for use by pool-internal functions.
 func (p *pool) checkInNoEvent(conn *connection) error {
+	return p.checkInWithCallback(conn, func() (reason, bool) {
+		r, perished := connectionPerished(conn)
+		if !perished && conn.pool.getState() == poolClosed {
+			perished = true
+			r = reason{
+				loggerConn: logger.ReasonConnClosedPoolClosed,
+				event:      event.ReasonPoolClosed,
+			}
+		}
+		return r, perished
+	})
+}
+
+func (p *pool) checkInWithCallback(conn *connection, cb func() (reason, bool)) error {
 	if conn == nil {
 		return nil
 	}
@@ -808,20 +825,13 @@ func (p *pool) checkInNoEvent(conn *connection) error {
 	// connection should never be perished due to max idle time.
 	conn.bumpIdleDeadline()
 
-	if reason, perished := connectionPerished(conn); perished {
-		_ = p.removeConnection(conn, reason, nil)
-		go func() {
-			_ = p.closeConnection(conn)
-		}()
-		return nil
+	var r reason
+	var perished bool
+	if cb != nil {
+		r, perished = cb()
 	}
-
-	if conn.pool.getState() == poolClosed {
-		_ = p.removeConnection(conn, reason{
-			loggerConn: logger.ReasonConnClosedPoolClosed,
-			event:      event.ReasonPoolClosed,
-		}, nil)
-
+	if perished {
+		_ = p.removeConnection(conn, r, nil)
 		go func() {
 			_ = p.closeConnection(conn)
 		}()
@@ -851,28 +861,43 @@ func (p *pool) checkInNoEvent(conn *connection) error {
 	return nil
 }
 
-func (p *pool) interruptInUseConnections() {
+// clearAll does the same as the "clear" method and interrupts all in-use connections as well.
+func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
+	if done := p.clearImpl(err, serviceID, true); !done {
+		return
+	}
 	for _, conn := range p.conns {
 		if conn.inUse && p.stale(conn) {
-			_ = p.removeConnection(conn, reason{
-				loggerConn: logger.ReasonConnClosedStale,
-				event:      event.ReasonStale,
-			}, nil)
-			go func(conn *connection) {
-				if conn.pool != p {
-					return
+			_ = conn.closeWithErr(poolClearedError{
+				err:     fmt.Errorf("interrupted"),
+				address: p.address,
+			})
+			_ = p.checkInWithCallback(conn, func() (reason, bool) {
+				if mustLogPoolMessage(p) {
+					keysAndValues := logger.KeyValues{
+						logger.KeyDriverConnectionID, conn.driverConnectionID,
+					}
+
+					logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
 				}
 
-				if atomic.LoadInt64(&conn.state) == connConnected {
-					conn.closeConnectContext()
-					conn.wait() // Make sure that the connection has finished connecting.
+				if p.monitor != nil {
+					p.monitor.Event(&event.PoolEvent{
+						Type:         event.ConnectionCheckedIn,
+						ConnectionID: conn.driverConnectionID,
+						Address:      conn.addr.String(),
+					})
 				}
 
-				_ = conn.closeWithErr(poolClearedError{
-					err:     fmt.Errorf("interrupted"),
-					address: p.address,
-				})
-			}(conn)
+				r, ok := connectionPerished(conn)
+				if ok {
+					r = reason{
+						loggerConn: logger.ReasonConnClosedStale,
+						event:      event.ReasonStale,
+					}
+				}
+				return r, ok
+			})
 		}
 	}
 }
@@ -882,9 +907,13 @@
 // "paused". If serviceID is nil, clear marks all connections as stale. If serviceID is not nil,
 // clear marks only connections associated with the given serviceID stale (for use in load balancer
 // mode).
-func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...func()) {
+func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
+	p.clearImpl(err, serviceID, false)
+}
+
+func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, isInterruption bool) bool {
 	if p.getState() == poolClosed {
-		return
+		return false
 	}
 
 	p.generation.clear(serviceID)
@@ -906,7 +935,30 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
 		}
 		p.lastClearErr = err
 		p.stateMu.Unlock()
 
+	}
+
+	if mustLogPoolMessage(p) {
+		keysAndValues := logger.KeyValues{
+			logger.KeyServiceID, serviceID,
+		}
+
+		logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
+	}
+	if sendEvent && p.monitor != nil {
+		event := &event.PoolEvent{
+			Type:      event.PoolCleared,
+			Address:   p.address.String(),
+			ServiceID: serviceID,
+			Error:     err,
+		}
+		if isInterruption {
+			event.Interruption = true
+		}
+		p.monitor.Event(event)
+	}
+
+	if serviceID == nil {
 		pcErr := poolClearedError{err: err, address: p.address}
 
 		// Clear the idle connections wait queue.
@@ -934,30 +986,8 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
 		}
 		p.createConnectionsCond.L.Unlock()
 	}
 
-	if mustLogPoolMessage(p) {
-		keysAndValues := logger.KeyValues{
-			logger.KeyServiceID, serviceID,
-		}
-
-		logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
-	}
-
-	if sendEvent && p.monitor != nil {
-		p.monitor.Event(&event.PoolEvent{
-			Type:      event.PoolCleared,
-			Address:   p.address.String(),
-			ServiceID: serviceID,
-			Error:     err,
-		})
-	}
-
-	for _, fn := range cleaningupFns {
-		if fn != nil {
-			fn()
-		}
-	}
-	p.removePerishedConns()
+	return true
 }
 
 // getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go
index c109ea10bc..98e5b6d5ee 100644
--- a/x/mongo/driver/topology/server.go
+++ b/x/mongo/driver/topology/server.go
@@ -624,7 +624,7 @@ func (s *Server) update() {
 			s.updateDescription(desc)
 			// Retry after the first timeout before clearing the pool in case of a FAAS pause as
 			// described in GODRIVER-2577.
-			if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 0 {
+			if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 1 {
 				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 					timeoutCnt++
 					// We want to immediately retry on timeout error. Continue to next loop.
@@ -640,12 +640,11 @@ func (s *Server) update() {
 			// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
 			// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
 			// IDs.
-			g, _ := s.pool.generation.getGeneration(s.conn.desc.ServiceID)
-			s.conn.pool.createConnectionsCond.L.Lock()
-			_, ok := s.conn.pool.conns[s.conn.driverConnectionID]
-			s.conn.pool.createConnectionsCond.L.Unlock()
-			fmt.Println("clear", err, s.conn.closed(), ok, s.conn.generation, g)
-			s.pool.clear(err, nil, s.pool.interruptInUseConnections)
+			if timeoutCnt > 0 {
+				s.pool.clearAll(err, nil)
+			} else {
+				s.pool.clear(err, nil)
+			}
 		}
 		// We're either not handling a timeout error, or we just handled the 2nd consecutive
 		// timeout error. In either case, reset the timeout count to 0 and return false to
diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go
index 8566cf02d8..79c4c67137 100644
--- a/x/mongo/driver/topology/server_test.go
+++ b/x/mongo/driver/topology/server_test.go
@@ -128,7 +128,6 @@ func (d *timeoutDialer) DialContext(ctx context.Context, network, address string
 
 // TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
 func TestServerHeartbeatTimeout(t *testing.T) {
-	t.Skip("skipping for GODRIVER-2335")
 	if testing.Short() {
 		t.Skip("skipping integration test in short mode")
 	}
@@ -458,6 +457,7 @@ func TestServer(t *testing.T) {
 
 			generation, _ := server.pool.generation.getGeneration(&serviceID)
 			assert.Eventuallyf(t, func() bool {
+				generation, _ := server.pool.generation.getGeneration(&serviceID)
 				numConns := server.pool.generation.getNumConns(&serviceID)
 				return generation == wantGeneration && numConns == wantNumConns
 			},
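
Behavioral intent of the server.go change above: a heartbeat failure caused by a context timeout or cancellation is retried once before the pool is touched (the GODRIVER-2577 FAAS-pause case), and only a repeated timeout clears the pool via clearAll, which also interrupts in-use connections; every other error keeps the plain clear path. The Go sketch below is a minimal, self-contained illustration of that decision flow only; pool, handleHeartbeatError, and errTimeout are hypothetical stand-ins for this sketch, not the driver's unexported API.

package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("heartbeat timeout")

type pool struct{}

func (p *pool) clear(err error)    { fmt.Println("pool cleared:", err) }
func (p *pool) clearAll(err error) { fmt.Println("pool cleared, in-use connections interrupted:", err) }

// handleHeartbeatError mirrors the patched Server.update flow: the first
// consecutive timeout only bumps the counter and asks the caller to retry;
// any further error clears the pool, using clearAll when a timeout repeats.
func handleHeartbeatError(p *pool, err error, timeoutCnt *int) (retry bool) {
	if errors.Is(err, errTimeout) && *timeoutCnt < 1 {
		*timeoutCnt++
		return true // retry the heartbeat before clearing the pool (FAAS pause case)
	}
	if *timeoutCnt > 0 {
		p.clearAll(err) // second consecutive timeout: also interrupt in-use connections
	} else {
		p.clear(err)
	}
	*timeoutCnt = 0
	return false
}

func main() {
	p := &pool{}
	cnt := 0
	fmt.Println(handleHeartbeatError(p, errTimeout, &cnt)) // true: retried once, pool untouched
	fmt.Println(handleHeartbeatError(p, errTimeout, &cnt)) // false: cleared with interruption
}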