Skip to content

Commit

Permalink
Merge pull request #1719 from actiontech/fixissue-1718
Browse files Browse the repository at this point in the history
fix-issue1718
  • Loading branch information
ColdWaterLW authored Aug 2, 2023
2 parents baa698a + dcae3d9 commit 098203b
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 180 deletions.
14 changes: 2 additions & 12 deletions sqle/driver/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
"github.com/actiontech/sqle/sqle/log"
"github.com/actiontech/sqle/sqle/pkg/params"
"github.com/actiontech/sqle/sqle/utils"
"github.com/pingcap/parser/ast"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -226,17 +225,8 @@ func (i *MysqlDriverImpl) KillProcess(ctx context.Context) error {
}
defer killConn.Db.Close()
killSQL := fmt.Sprintf("KILL %v", connID)
killFunc := func() error {
_, err := killConn.Db.Exec(killSQL)
return err
}
err = utils.AsyncCallTimeout(ctx, killFunc)
if err != nil {
err = fmt.Errorf("exec sql(%v) failed, err: %v", killSQL, err)
return err
}
logEntry.Infof("exec sql(%v) successfully", killSQL)
return nil
err = util.KillProcess(ctx, killSQL, killConn, logEntry)
return err
}

func (i *MysqlDriverImpl) query(ctx context.Context, query string, args ...interface{}) ([]map[string]sql.NullString, error) {
Expand Down
16 changes: 16 additions & 0 deletions sqle/driver/mysql/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"strings"

"github.com/actiontech/sqle/sqle/driver/mysql/executor"
"github.com/actiontech/sqle/sqle/utils"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/format"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/sirupsen/logrus"
)

var ErrUnsupportedSqlType = errors.New("unsupported sql type")
Expand Down Expand Up @@ -215,3 +217,17 @@ func checkSql(affectRowSql string) error {

return nil
}

func KillProcess(ctx context.Context, killSQL string, killConn *executor.Executor, logEntry *logrus.Entry) error {
killFunc := func() error {
_, err := killConn.Db.Exec(killSQL)
return err
}
err := utils.AsyncCallTimeout(ctx, killFunc)
if err != nil {
err = fmt.Errorf("exec sql(%v) failed, err: %v", killSQL, err)
return err
}
logEntry.Infof("exec sql(%v) successfully", killSQL)
return nil
}
16 changes: 14 additions & 2 deletions sqle/driver/plugin_adapter_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
sqlDriver "database/sql/driver"
"fmt"
"sync"
"errors"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
protoV2 "github.com/actiontech/sqle/sqle/driver/v2/proto"
"github.com/actiontech/sqle/sqle/errors"
"github.com/actiontech/sqle/sqle/log"
"github.com/actiontech/sqle/sqle/pkg/params"

Expand Down Expand Up @@ -174,7 +174,19 @@ func (s *PluginImplV2) Close(ctx context.Context) {
}

func (s *PluginImplV2) KillProcess(ctx context.Context) error {
return errors.NewNotImplementedError("KillProcess not support yet")
api := "Kill Process"
s.preLog(api)
rs, err := s.client.KillProcess(ctx, &protoV2.KillProcessRequest{
Session: s.Session,
})
s.afterLog(api, err)
if err != nil {
return err
}
if rs.ErrMessage != "" {
return errors.New(rs.ErrMessage)
}
return nil
}

// audit
Expand Down
14 changes: 14 additions & 0 deletions sqle/driver/v2/driver_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,17 @@ func (d *DriverGrpcServer) EstimateSQLAffectRows(ctx context.Context, req *proto
ErrMessage: ar.ErrMessage,
}, nil
}

func (d *DriverGrpcServer) KillProcess(ctx context.Context, req *protoV2.KillProcessRequest) (*protoV2.KillProcessResponse, error) {
driver, err := d.getDriverBySession(req.Session)
if err != nil {
return &protoV2.KillProcessResponse{}, err
}
info, err := driver.KillProcess(ctx)
if err != nil {
return &protoV2.KillProcessResponse{}, err
}
return &protoV2.KillProcessResponse{
ErrMessage: info.ErrMessage,
}, nil
}
5 changes: 5 additions & 0 deletions sqle/driver/v2/driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Driver interface {
GetTableMeta(ctx context.Context, table *Table) (*TableMeta, error)
ExtractTableFromSQL(ctx context.Context, sql string) ([]*Table, error)
EstimateSQLAffectRows(ctx context.Context, sql string) (*EstimatedAffectRows, error)
KillProcess(ctx context.Context) (*KillProcessInfo, error)
}

type Node struct {
Expand Down Expand Up @@ -286,3 +287,7 @@ type EstimatedAffectRows struct {
Count int64
ErrMessage string
}

type KillProcessInfo struct {
ErrMessage string
}
Loading

0 comments on commit 098203b

Please sign in to comment.