Skip to content

Commit

Permalink
Gracefully shutdown VTGate instances (#14219)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpiegza authored Oct 23, 2023
1 parent 7b49ddb commit ad26d15
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 8 deletions.
9 changes: 4 additions & 5 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,10 @@ func (c *Conn) IsMarkedForClose() bool {
return c.closing
}

func (c *Conn) IsShuttingDown() bool {
return c.listener.shutdown.Load()
}

// ConnCheck ensures that this connection to the MySQL server hasn't been broken.
// This is a fast, non-blocking check. For details on its implementation, please read
// "Three Bugs in the Go MySQL Driver" (Vicent Marti, GitHub, 2020)
Expand Down Expand Up @@ -1745,8 +1749,3 @@ func (c *Conn) ConnCheck() error {
}
return nil
}

// GetTestConn returns a conn for testing purpose only.
func GetTestConn() *Conn {
return newConn(testConn{})
}
11 changes: 11 additions & 0 deletions go/mysql/conn_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,14 @@ func (m mockAddress) String() string {
}

var _ net.Addr = (*mockAddress)(nil)

// GetTestConn returns a conn for testing purpose only.
func GetTestConn() *Conn {
return newConn(testConn{})
}

// GetTestServerConn is only meant to be used for testing.
// It creates a server connection using a testConn and the provided listener.
func GetTestServerConn(listener *Listener) *Conn {
return newServerConn(testConn{}, listener)
}
11 changes: 8 additions & 3 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
session := vh.session(c)
if c.IsShuttingDown() && !session.InTransaction {
c.MarkForClose()
return sqlerror.NewSQLError(sqlerror.ERServerShutdown, sqlerror.SSNetError, "Server shutdown in progress")
}

ctx, cancel := context.WithCancel(context.Background())
c.UpdateCancelCtx(cancel)

Expand Down Expand Up @@ -229,7 +235,6 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session := vh.session(c)
if !session.InTransaction {
vh.busyConnections.Add(1)
}
Expand Down Expand Up @@ -614,11 +619,11 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
if srv.tcpListener != nil {
srv.tcpListener.Close()
srv.tcpListener.Shutdown()
srv.tcpListener = nil
}
if srv.unixListener != nil {
srv.unixListener.Close()
srv.unixListener.Shutdown()
srv.unixListener = nil
}
if srv.sigChan != nil {
Expand Down
77 changes: 77 additions & 0 deletions go/vt/vtgate/plugin_mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,80 @@ func TestKillMethods(t *testing.T) {
require.EqualError(t, cancelCtx.Err(), "context canceled")
require.True(t, mysqlConn.IsMarkedForClose())
}

func TestGracefulShutdown(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)

vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected})
th := &testHandler{}
listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0)
require.NoError(t, err)
defer listener.Close()

// add a connection
mysqlConn := mysql.GetTestServerConn(listener)
mysqlConn.ConnectionID = 1
mysqlConn.UserData = &mysql.StaticUserData{}
vh.connections[1] = mysqlConn

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

listener.Shutdown()

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")

require.True(t, mysqlConn.IsMarkedForClose())
}

func TestGracefulShutdownWithTransaction(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)

vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected})
th := &testHandler{}
listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0)
require.NoError(t, err)
defer listener.Close()

// add a connection
mysqlConn := mysql.GetTestServerConn(listener)
mysqlConn.ConnectionID = 1
mysqlConn.UserData = &mysql.StaticUserData{}
vh.connections[1] = mysqlConn

err = vh.ComQuery(mysqlConn, "BEGIN", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

listener.Shutdown()

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

err = vh.ComQuery(mysqlConn, "COMMIT", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

require.False(t, mysqlConn.IsMarkedForClose())

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")

require.True(t, mysqlConn.IsMarkedForClose())
}

0 comments on commit ad26d15

Please sign in to comment.