Commit

WIP

qingyang-hu committed Nov 15, 2023
1 parent ef6679f commit 78e4d2c
Showing 5 changed files with 119 additions and 93 deletions.
6 changes: 3 additions & 3 deletions mongo/integration/unified/unified_spec_runner.go
@@ -33,9 +33,9 @@ var (
// TODO(GODRIVER-2943): Fix and unskip this test case.
"Topology lifecycle": "Test times out. See GODRIVER-2943",

// "Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
// "Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
// "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
}

logMessageValidatorTimeout = 10 * time.Millisecond
13 changes: 5 additions & 8 deletions x/mongo/driver/topology/CMAP_spec_test.go
@@ -523,15 +523,12 @@ func runOperationInThread(t *testing.T, operation map[string]interface{}, testIn
}
return c.Close()
case "clear":
var fns []func()
{
needInterruption, ok := operation["interruptInUseConnections"].(bool)
if ok && needInterruption {
fns = append(fns, s.pool.interruptInUseConnections)
}
needInterruption, ok := operation["interruptInUseConnections"].(bool)
if ok && needInterruption {
s.pool.clearAll(fmt.Errorf("spec test clear"), nil)
} else {
s.pool.clear(fmt.Errorf("spec test clear"), nil)
}

s.pool.clear(fmt.Errorf("spec test clear"), nil, fns...)
case "close":
s.pool.close(context.Background())
case "ready":
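The spec-test runner now picks between clear and clearAll based on the optional interruptInUseConnections flag in the operation document. A minimal, standalone sketch of that flag handling (the operation maps below are hypothetical examples, and the pool calls are replaced with prints):

```go
package main

import "fmt"

func main() {
	// Hypothetical operation documents as the spec-test runner might see them.
	ops := []map[string]interface{}{
		{"name": "clear", "interruptInUseConnections": true},
		{"name": "clear"}, // flag absent: the type assertion below yields ok == false
	}

	for _, op := range ops {
		needInterruption, ok := op["interruptInUseConnections"].(bool)
		if ok && needInterruption {
			fmt.Println("would call pool.clearAll(...)")
		} else {
			fmt.Println("would call pool.clear(...)")
		}
	}
}
```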
178 changes: 104 additions & 74 deletions x/mongo/driver/topology/pool.go
@@ -762,35 +762,52 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro
// checkIn returns an idle connection to the pool. If the connection is perished or the pool is
// closed, it is removed from the connection pool and closed.
func (p *pool) checkIn(conn *connection) error {
if conn == nil {
return nil
}
if conn.pool != p {
return ErrWrongPool
}
return p.checkInWithCallback(conn, func() (reason, bool) {
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
}

if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
}

logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
}

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionCheckedIn,
ConnectionID: conn.driverConnectionID,
Address: conn.addr.String(),
})
}
if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionCheckedIn,
ConnectionID: conn.driverConnectionID,
Address: conn.addr.String(),
})
}

return p.checkInNoEvent(conn)
r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
perished = true
r = reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}
}
return r, perished
})
}

// checkInNoEvent returns a connection to the pool. It behaves identically to checkIn except it does
// not publish events. It is only intended for use by pool-internal functions.
func (p *pool) checkInNoEvent(conn *connection) error {
return p.checkInWithCallback(conn, func() (reason, bool) {
r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
perished = true
r = reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}
}
return r, perished
})
}

func (p *pool) checkInWithCallback(conn *connection, cb func() (reason, bool)) error {
if conn == nil {
return nil
}
@@ -808,20 +825,13 @@ func (p *pool) checkInNoEvent(conn *connection) error {
// connection should never be perished due to max idle time.
conn.bumpIdleDeadline()

if reason, perished := connectionPerished(conn); perished {
_ = p.removeConnection(conn, reason, nil)
go func() {
_ = p.closeConnection(conn)
}()
return nil
var r reason
var perished bool
if cb != nil {
r, perished = cb()
}

if conn.pool.getState() == poolClosed {
_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedPoolClosed,
event: event.ReasonPoolClosed,
}, nil)

if perished {
_ = p.removeConnection(conn, r, nil)
go func() {
_ = p.closeConnection(conn)
}()
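The check-in refactor above funnels both checkIn and checkInNoEvent through a single checkInWithCallback helper whose callback reports whether the connection is perished and why. A reduced, self-contained sketch of that pattern, using simplified stand-in types rather than the driver's internals:

```go
package main

import "fmt"

// Simplified stand-ins for the driver's internal types; names are illustrative only.
type reason struct{ event string }

type connection struct {
	id      uint64
	expired bool
}

type pool struct {
	closed bool
	idle   []*connection
}

// checkInWithCallback centralizes the check-in flow: the callback reports
// whether the connection should be treated as perished and with what reason.
func (p *pool) checkInWithCallback(conn *connection, cb func() (reason, bool)) error {
	if conn == nil {
		return nil
	}
	var r reason
	var perished bool
	if cb != nil {
		r, perished = cb()
	}
	if perished {
		fmt.Printf("closing connection %d: %s\n", conn.id, r.event)
		return nil
	}
	p.idle = append(p.idle, conn)
	return nil
}

// checkIn would also publish events and log messages before delegating;
// that part is omitted here.
func (p *pool) checkIn(conn *connection) error {
	return p.checkInWithCallback(conn, func() (reason, bool) {
		if conn.expired {
			return reason{event: "connectionError"}, true
		}
		if p.closed {
			return reason{event: "poolClosed"}, true
		}
		return reason{}, false
	})
}

func main() {
	p := &pool{}
	_ = p.checkIn(&connection{id: 1})
	_ = p.checkIn(&connection{id: 2, expired: true})
	fmt.Println("idle connections:", len(p.idle))
}
```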
@@ -851,28 +861,43 @@ func (p *pool) checkInNoEvent(conn *connection) error {
return nil
}

func (p *pool) interruptInUseConnections() {
// clearAll does the same as the "clear" method and interrupts all in-use connections as well.
func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
if done := p.clearImpl(err, serviceID, true); !done {
return
}
for _, conn := range p.conns {
if conn.inUse && p.stale(conn) {
_ = p.removeConnection(conn, reason{
loggerConn: logger.ReasonConnClosedStale,
event: event.ReasonStale,
}, nil)
go func(conn *connection) {
if conn.pool != p {
return
_ = conn.closeWithErr(poolClearedError{
err: fmt.Errorf("interrupted"),
address: p.address,
})
_ = p.checkInWithCallback(conn, func() (reason, bool) {
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
}

logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
}

if atomic.LoadInt64(&conn.state) == connConnected {
conn.closeConnectContext()
conn.wait() // Make sure that the connection has finished connecting.
if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.ConnectionCheckedIn,
ConnectionID: conn.driverConnectionID,
Address: conn.addr.String(),
})
}

_ = conn.closeWithErr(poolClearedError{
err: fmt.Errorf("interrupted"),
address: p.address,
})
}(conn)
r, ok := connectionPerished(conn)
if ok {
r = reason{
loggerConn: logger.ReasonConnClosedStale,
event: event.ReasonStale,
}
}
return r, ok
})
}
}
}
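clearAll follows the ordinary clear path and then proactively interrupts in-use connections that belong to a stale generation. A simplified model of that flow (the generation counter and connection bookkeeping below are stand-ins for the driver's pool state, not its actual implementation):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// A reduced model of the pool-clear flow; field and method names are illustrative.
type conn struct {
	id         int
	generation uint64
	inUse      bool
	closed     bool
}

type pool struct {
	mu         sync.Mutex
	generation uint64
	conns      map[int]*conn
}

// stale reports whether the connection predates the current generation.
func (p *pool) stale(c *conn) bool { return c.generation < p.generation }

// clear invalidates the current generation; existing connections become stale
// and are pruned lazily (on check-in or by background maintenance).
func (p *pool) clear(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.generation++
}

// clearAll does the same, then proactively closes stale in-use connections.
func (p *pool) clearAll(err error) {
	p.clear(err)
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, c := range p.conns {
		if c.inUse && p.stale(c) {
			c.closed = true
			c.inUse = false
			fmt.Printf("interrupted connection %d: %v\n", c.id, err)
		}
	}
}

func main() {
	p := &pool{conns: map[int]*conn{
		1: {id: 1, inUse: true},
		2: {id: 2, inUse: false},
	}}
	p.clearAll(errors.New("heartbeat timeout"))
	fmt.Println("conn 2 closed?", p.conns[2].closed) // false: idle conns are pruned lazily
}
```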
@@ -882,9 +907,13 @@ func (p *pool) interruptInUseConnections() {
// "paused". If serviceID is nil, clear marks all connections as stale. If serviceID is not nil,
// clear marks only connections associated with the given serviceID stale (for use in load balancer
// mode).
func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...func()) {
func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
p.clearImpl(err, serviceID, false)
}

func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, isInterruption bool) bool {
if p.getState() == poolClosed {
return
return false
}

p.generation.clear(serviceID)
@@ -906,7 +935,30 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
}
p.lastClearErr = err
p.stateMu.Unlock()
}

if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyServiceID, serviceID,
}

logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
}

if sendEvent && p.monitor != nil {
event := &event.PoolEvent{
Type: event.PoolCleared,
Address: p.address.String(),
ServiceID: serviceID,
Error: err,
}
if isInterruption {
event.Interruption = true
}
p.monitor.Event(event)
}

if serviceID == nil {
pcErr := poolClearedError{err: err, address: p.address}

// Clear the idle connections wait queue.
@@ -934,30 +986,8 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ...
p.createConnectionsCond.L.Unlock()
}

if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyServiceID, serviceID,
}

logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
}

if sendEvent && p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.PoolCleared,
Address: p.address.String(),
ServiceID: serviceID,
Error: err,
})
}

for _, fn := range cleaningupFns {
if fn != nil {
fn()
}
}

p.removePerishedConns()
return true
}

// getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
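From the application side, these pool clears surface as PoolCleared events on a registered PoolMonitor. Assuming the Error and Interruption fields that clearImpl sets in this diff are exposed on event.PoolEvent, a monitor that logs them might look like the sketch below (the URI is a placeholder):

```go
package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// Log pool-cleared events, including whether in-use connections were
	// interrupted (the Interruption flag set on the event in this diff).
	monitor := &event.PoolMonitor{
		Event: func(evt *event.PoolEvent) {
			if evt.Type == event.PoolCleared {
				log.Printf("pool cleared for %s (interruption=%v): %v",
					evt.Address, evt.Interruption, evt.Error)
			}
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().
		ApplyURI("mongodb://localhost:27017"). // placeholder URI
		SetPoolMonitor(monitor))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	// Application workload would go here; PoolCleared events fire when
	// server errors or heartbeat failures clear the pool.
}
```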
13 changes: 6 additions & 7 deletions x/mongo/driver/topology/server.go
@@ -624,7 +624,7 @@ func (s *Server) update() {
s.updateDescription(desc)
// Retry after the first timeout before clearing the pool in case of a FAAS pause as
// described in GODRIVER-2577.
if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 0 {
if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 1 {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
timeoutCnt++
// We want to immediately retry on timeout error. Continue to next loop.
@@ -640,12 +640,11 @@ func (s *Server) update() {
// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
// IDs.
g, _ := s.pool.generation.getGeneration(s.conn.desc.ServiceID)
s.conn.pool.createConnectionsCond.L.Lock()
_, ok := s.conn.pool.conns[s.conn.driverConnectionID]
s.conn.pool.createConnectionsCond.L.Unlock()
fmt.Println("clear", err, s.conn.closed(), ok, s.conn.generation, g)
s.pool.clear(err, nil, s.pool.interruptInUseConnections)
if timeoutCnt > 0 {
s.pool.clearAll(err, nil)
} else {
s.pool.clear(err, nil)
}
}
// We're either not handling a timeout error, or we just handled the 2nd consecutive
// timeout error. In either case, reset the timeout count to 0 and return false to
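The heartbeat loop now tolerates a single timeout (retrying immediately) and only clears the pool with interruption on the second consecutive timeout. A standalone sketch of that counting logic, with hypothetical function names and the pool calls passed in as callbacks rather than the driver's real server/pool types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handleHeartbeatError mirrors the shape of the retry logic: the first
// consecutive timeout is retried immediately; the second clears the pool and
// interrupts in-use connections. Names and signature are illustrative.
func handleHeartbeatError(err error, timeoutCnt int, clear, clearAll func(error)) (newCnt int, retried bool) {
	if err == nil {
		return 0, false
	}
	isTimeout := errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
	if isTimeout && timeoutCnt < 1 {
		// First consecutive timeout: retry the heartbeat without clearing.
		return timeoutCnt + 1, true
	}
	if timeoutCnt > 0 {
		clearAll(err) // second consecutive timeout: interrupt in-use connections too
	} else {
		clear(err) // other errors: ordinary pool clear
	}
	return 0, false
}

func main() {
	clear := func(err error) { fmt.Println("clear:", err) }
	clearAll := func(err error) { fmt.Println("clearAll:", err) }

	cnt := 0
	var retried bool
	cnt, retried = handleHeartbeatError(context.DeadlineExceeded, cnt, clear, clearAll)
	fmt.Println("retried after first timeout:", retried)
	cnt, retried = handleHeartbeatError(context.DeadlineExceeded, cnt, clear, clearAll)
	fmt.Println("retried after second timeout:", retried)
	_ = cnt
}
```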
2 changes: 1 addition & 1 deletion x/mongo/driver/topology/server_test.go
@@ -128,7 +128,6 @@ func (d *timeoutDialer) DialContext(ctx context.Context, network, address string

// TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
func TestServerHeartbeatTimeout(t *testing.T) {
t.Skip("skipping for GODRIVER-2335")
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
@@ -458,6 +457,7 @@ func TestServer(t *testing.T) {
generation, _ := server.pool.generation.getGeneration(&serviceID)
assert.Eventuallyf(t,
func() bool {
generation, _ := server.pool.generation.getGeneration(&serviceID)
numConns := server.pool.generation.getNumConns(&serviceID)
return generation == wantGeneration && numConns == wantNumConns
},
Expand Down
