diff --git a/Gopkg.lock b/Gopkg.lock index 7f12156..ec6395b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -4,7 +4,10 @@ [[projects]] branch = "master" name = "github.com/alecthomas/template" - packages = [".","parse"] + packages = [ + ".", + "parse" + ] revision = "a0175ee3bccc567396460bf5acd36800cb10c49c" [[projects]] @@ -19,11 +22,17 @@ packages = ["."] revision = "2b8494104d86337cdd41d0a49cbed8e4583c0ab4" +[[projects]] + name = "github.com/go-ini/ini" + packages = ["."] + revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e" + version = "v1.38.2" + [[projects]] name = "github.com/go-sql-driver/mysql" packages = ["."] - revision = "a0583e0143b1624142adab07e0e97fe106d99561" - version = "v1.3" + revision = "d523deb1b23d913de5bdada721a6071e71283618" + version = "v1.4.0" [[projects]] branch = "master" @@ -34,38 +43,41 @@ [[projects]] branch = "master" name = "github.com/gosuri/uiprogress" - packages = [".","util/strutil"] + packages = [ + ".", + "util/strutil" + ] revision = "d0567a9d84a1c40dd7568115ea66f4887bf57b33" [[projects]] - branch = "master" name = "github.com/hashicorp/go-version" packages = ["."] - revision = "4fe82ae3040f80a03d04d2cccb5606a626b8e1ee" + revision = "b5a281d3160aa11950a6182bd9a9dc2cb1e02d50" + version = "v1.0.0" [[projects]] branch = "master" name = "github.com/icrowley/fake" packages = ["."] - revision = "e64cc2cf92049a299f359734c6ea76073f2a8b2c" + revision = "4178557ae428460c3780a381c824a1f3aceb6325" [[projects]] - branch = "master" name = "github.com/kr/pretty" packages = ["."] - revision = "cfb55aafdaf3ec08f0db22699ab822c50091b1c4" + revision = "73f6ac0b30a98e433b289500d779f50c1a6f0712" + version = "v0.1.0" [[projects]] - branch = "master" name = "github.com/kr/text" packages = ["."] - revision = "7cafcd837844e784b526369c9bce262804aebc60" + revision = "e2ffdb16a802fe2bb95e2e35ff34f0e53aeef34f" + version = "v0.1.0" [[projects]] name = "github.com/mattn/go-isatty" packages = ["."] - revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39" - version = "v0.0.3" + revision = "6ca4dbf54d38eea1a992b3c722a76a5d1c4cb25c" + version = "v0.0.4" [[projects]] name = "github.com/pkg/errors" @@ -76,20 +88,29 @@ [[projects]] name = "github.com/sirupsen/logrus" packages = ["."] - revision = "d682213848ed68c0a260ca37d6dd5ace8423f5ba" - version = "v1.0.4" + revision = "3e01752db0189b9157070a0e1668a620f9a85da2" + version = "v1.0.6" [[projects]] branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "a6600008915114d9c087fad9f03d75087b1a74df" + revision = "0709b304e793a5edb4a2c0145f281ecdc20838a4" [[projects]] branch = "master" name = "golang.org/x/sys" - packages = ["unix","windows"] - revision = "2c42eef0765b9837fbdab12011af7830f55f88f0" + packages = [ + "unix", + "windows" + ] + revision = "d0be0721c37eeb5299f245a996a483160fc36940" + +[[projects]] + name = "google.golang.org/appengine" + packages = ["cloudsql"] + revision = "b1f26356af11148e710935ed1ac8a7f5702c7612" + version = "v1.1.0" [[projects]] name = "gopkg.in/alecthomas/kingpin.v2" @@ -100,6 +121,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "1c77a165ee7d064291072d0a898cff06c81a888b7099ed92542977a812e8dbca" + inputs-digest = "2c2fd1aa913bfca3c44d61530b6d12610bc4aef0ff9ced3b763ae0150237dd71" solver-name = "gps-cdcl" solver-version = 1 diff --git a/README.md b/README.md index 51d5b49..e517d32 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,10 @@ https://github.com/Percona-Lab/mysql_random_data_load/releases ## Version history +#### 0.1.9 +- Added support for bunary and varbinary columns +- By default, read connection params from ${HOME}/.my.cnf + #### 0.1.8 - Fixed error for triggers created with MySQL 5.6 - Added Travis-CI diff --git a/main.go b/main.go index acdfc0e..35fc749 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "os" + "os/user" "runtime" "strings" "sync" @@ -12,6 +13,7 @@ import ( "github.com/Percona-Lab/mysql_random_data_load/internal/getters" "github.com/Percona-Lab/mysql_random_data_load/tableparser" + "github.com/go-ini/ini" "github.com/go-sql-driver/mysql" "github.com/gosuri/uiprogress" "github.com/kr/pretty" @@ -20,26 +22,40 @@ import ( kingpin "gopkg.in/alecthomas/kingpin.v2" ) +type cliOptions struct { + app *kingpin.Application + + // Arguments + Schema *string + TableName *string + Rows *int + // Flags + BulkSize *int + ConfigFile *string + Debug *bool + Factor *float64 + Host *string + MaxRetries *int + MaxThreads *int + NoProgress *bool + Pass *string + Port *int + Print *bool + Samples *int64 + User *string + Version *bool +} + +type mysqlOptions struct { + Host string + Password string + Port int + Sock string + User string +} + var ( - app = kingpin.New("mysql_random_data_loader", "MySQL Random Data Loader") - - bulkSize = app.Flag("bulk-size", "Number of rows per insert statement").Default("1000").Int() - debug = app.Flag("debug", "Log debugging information").Bool() - factor = app.Flag("fk-samples-factor", "Percentage used to get random samples for foreign keys fields").Default("0.3").Float64() - host = app.Flag("host", "Host name/IP").Short('h').Default("127.0.0.1").String() - maxRetries = app.Flag("max-retries", "Number of rows to insert").Default("100").Int() - maxThreads = app.Flag("max-threads", "Maximum number of threads to run inserts").Default("1").Int() - noProgress = app.Flag("no-progress", "Show progress bar").Default("false").Bool() - pass = app.Flag("password", "Password").Short('p').String() - port = app.Flag("port", "Port").Short('P').Default("3306").Int() - print = app.Flag("print", "Print queries to the standard output instead of inserting them into the db").Bool() - samples = app.Flag("max-fk-samples", "Maximum number of samples for foreign keys fields").Default("100").Int64() - user = app.Flag("user", "User").Short('u').String() - version = app.Flag("version", "Show version and exit").Bool() - - schema = app.Arg("database", "Database").Required().String() - tableName = app.Arg("table", "Table").Required().String() - rows = app.Arg("rows", "Number of rows to insert").Required().Int() + opts *cliOptions validFunctions = []string{"int", "string", "date", "date_in_range"} maxValues = map[string]int64{ @@ -69,10 +85,19 @@ type getter interface { type insertValues []getter type insertFunction func(*sql.DB, string, chan int, chan bool, *sync.WaitGroup) +const ( + defaultMySQLConfigSection = "client" + defaultConfigFile = "~/.my.cnf" +) + func main() { - _, err := app.Parse(os.Args[1:]) - if *version { + opts, err := processCliParams() + if err != nil { + log.Fatal(err.Error()) + } + + if *opts.Version { fmt.Printf("Version : %s\n", Version) fmt.Printf("Commit : %s\n", Commit) fmt.Printf("Branch : %s\n", Branch) @@ -80,23 +105,19 @@ func main() { fmt.Printf("Go version: %s\n", GoVersion) return } - if err != nil { - log.Errorln(err) - os.Exit(1) - } - address := *host + address := *opts.Host net := "unix" if address != "localhost" { net = "tcp" } - if *port != 0 { - address = fmt.Sprintf("%s:%d", address, *port) + if *opts.Port != 0 { + address = fmt.Sprintf("%s:%d", address, *opts.Port) } dsn := mysql.Config{ - User: *user, - Passwd: *pass, + User: *opts.User, + Passwd: *opts.Pass, Addr: address, Net: net, DBName: "", @@ -116,41 +137,41 @@ func main() { os.Exit(1) } - table, err := tableparser.NewTable(db, *schema, *tableName) + table, err := tableparser.NewTable(db, *opts.Schema, *opts.TableName) if err != nil { - log.Printf("cannot get table %s struct: %s", *tableName, err) + log.Printf("cannot get table %s struct: %s", *opts.TableName, err) db.Close() os.Exit(1) } log.SetFormatter(&log.TextFormatter{FullTimestamp: true}) - if *debug { + if *opts.Debug { log.SetLevel(log.DebugLevel) - *noProgress = true + *opts.NoProgress = true } log.Debug(pretty.Sprint(table)) if len(table.Triggers) > 0 { - log.Warnf("There are triggers on the %s table that might affect this process:", *tableName) + log.Warnf("There are triggers on the %s table that might affect this process:", *opts.TableName) for _, t := range table.Triggers { log.Warnf("Trigger %q, %s %s", t.Trigger, t.Timing, t.Event) log.Warnf("Statement: %s", t.Statement) } } - if *bulkSize > *rows { - *bulkSize = *rows + if *opts.BulkSize > *opts.Rows { + *opts.BulkSize = *opts.Rows } - if maxThreads == nil { - *maxThreads = runtime.NumCPU() * 10 + if opts.MaxThreads == nil { + *opts.MaxThreads = runtime.NumCPU() * 10 } - if *maxThreads < 1 { - *maxThreads = 1 + if *opts.MaxThreads < 1 { + *opts.MaxThreads = 1 } - if !*print { + if !*opts.Print { log.Info("Starting") } @@ -162,37 +183,37 @@ func main() { // And then, we need to run this insert once to complete 11 rows // INSERT INTO table (f1, f2) VALUES (?, ?), (?, ?), (?, ?) newLineOnEachRow := false - count := *rows / *bulkSize - remainder := *rows - count**bulkSize - semaphores := makeSemaphores(*maxThreads) + count := *opts.Rows / *opts.BulkSize + remainder := *opts.Rows - count**opts.BulkSize + semaphores := makeSemaphores(*opts.MaxThreads) rowValues := makeValueFuncs(db, table.Fields) - log.Debugf("Must run %d bulk inserts having %d rows each", count, *bulkSize) + log.Debugf("Must run %d bulk inserts having %d rows each", count, *opts.BulkSize) runInsertFunc := runInsert - if *print { - *maxThreads = 1 - *noProgress = true + if *opts.Print { + *opts.MaxThreads = 1 + *opts.NoProgress = true newLineOnEachRow = true runInsertFunc = func(db *sql.DB, insertQuery string, resultsChan chan int, sem chan bool, wg *sync.WaitGroup) { fmt.Println(insertQuery) - resultsChan <- *bulkSize + resultsChan <- *opts.BulkSize sem <- true wg.Done() } } - bar := uiprogress.AddBar(*rows).AppendCompleted().PrependElapsed() - if !*noProgress { + bar := uiprogress.AddBar(*opts.Rows).AppendCompleted().PrependElapsed() + if !*opts.NoProgress { uiprogress.Start() } - okCount, err := run(db, table, bar, semaphores, rowValues, count, *bulkSize, runInsertFunc, newLineOnEachRow) + okCount, err := run(db, table, bar, semaphores, rowValues, count, *opts.BulkSize, runInsertFunc, newLineOnEachRow) if err != nil { log.Errorln(err) } var okrCount, okiCount int // remainder & individual inserts OK count if remainder > 0 { - log.Debugf("Must run 1 extra bulk insert having %d rows, to complete %d rows", remainder, *rows) + log.Debugf("Must run 1 extra bulk insert having %d rows, to complete %d rows", remainder, *opts.Rows) okrCount, err = run(db, table, bar, semaphores, rowValues, 1, remainder, runInsertFunc, newLineOnEachRow) if err != nil { log.Errorln(err) @@ -203,11 +224,11 @@ func main() { // retry adding individual rows (no bulk inserts) totalOkCount := okCount + okrCount retries := 0 - if totalOkCount < *rows { - log.Debugf("Running extra %d individual inserts (duplicated keys?)", *rows-totalOkCount) + if totalOkCount < *opts.Rows { + log.Debugf("Running extra %d individual inserts (duplicated keys?)", *opts.Rows-totalOkCount) } - for totalOkCount < *rows && retries < *maxRetries { - okiCount, err = run(db, table, bar, semaphores, rowValues, *rows-totalOkCount, 1, runInsertFunc, newLineOnEachRow) + for totalOkCount < *opts.Rows && retries < *opts.MaxRetries { + okiCount, err = run(db, table, bar, semaphores, rowValues, *opts.Rows-totalOkCount, 1, runInsertFunc, newLineOnEachRow) if err != nil { log.Errorf("Cannot run extra insert: %s", err) } @@ -217,7 +238,7 @@ func main() { } time.Sleep(500 * time.Millisecond) // Let the progress bar to update - if !*print { + if !*opts.Print { log.Printf("%d rows inserted", totalOkCount) } db.Close() @@ -359,8 +380,7 @@ func runInsert(db *sql.DB, insertQuery string, resultsChan chan int, sem chan bo func makeValueFuncs(conn *sql.DB, fields []tableparser.Field) insertValues { var values []getter for _, field := range fields { - if !field.IsNullable && field.ColumnKey == "PRI" && - strings.Contains(field.Extra, "auto_increment") { + if !field.IsNullable && field.ColumnKey == "PRI" && strings.Contains(field.Extra, "auto_increment") { continue } if field.Constraint != nil { @@ -385,7 +405,7 @@ func makeValueFuncs(conn *sql.DB, fields []tableparser.Field) insertValues { case "float", "decimal", "double": values = append(values, getters.NewRandomDecimal(field.ColumnName, field.NumericPrecision.Int64-field.NumericScale.Int64, field.IsNullable)) - case "char", "varchar", "varbinary": + case "char", "varchar": values = append(values, getters.NewRandomString(field.ColumnName, field.CharacterMaximumLength.Int64, field.IsNullable)) case "date": @@ -402,6 +422,8 @@ func makeValueFuncs(conn *sql.DB, fields []tableparser.Field) insertValues { int64(time.Now().Year()), field.IsNullable)) case "enum", "set": values = append(values, getters.NewRandomEnum(field.SetEnumVals, field.IsNullable)) + case "binary", "varbinary": + values = append(values, getters.NewRandomBinary(field.ColumnName, field.CharacterMaximumLength.Int64, field.IsNullable)) default: log.Printf("cannot get field type: %s: %s\n", field.ColumnName, field.DataType) } @@ -458,11 +480,15 @@ func getSamples(conn *sql.DB, schema, table, field string, samples int64, dataTy var v int64 err = rows.Scan(&v) val = v - case "char", "varchar", "varbinary", "tinyblob", "tinytext", "blob", "text", - "mediumtext", "mediumblob", "longblob", "longtext": + case "char", "varchar", "blob", "text", "mediumtext", + "mediumblob", "longblob", "longtext": var v string err = rows.Scan(&v) val = v + case "binary", "varbinary": + var v []rune + err = rows.Scan(&v) + val = v case "float", "decimal", "double": var v float64 err = rows.Scan(&v) @@ -516,6 +542,7 @@ func isSupportedType(fieldType string) bool { "mediumtext": true, "longblob": true, "longtext": true, + "binary": true, "varbinary": true, "enum": true, "set": true, @@ -523,3 +550,88 @@ func isSupportedType(fieldType string) bool { _, ok := supportedTypes[fieldType] return ok } + +func processCliParams() (*cliOptions, error) { + app := kingpin.New("mysql_random_data_loader", "MySQL Random Data Loader") + + opts := &cliOptions{ + app: app, + BulkSize: app.Flag("bulk-size", "Number of rows per insert statement").Default("1000").Int(), + ConfigFile: app.Flag("config-file", "MySQL config file").Default(defaultConfigFile).String(), + Debug: app.Flag("debug", "Log debugging information").Bool(), + Factor: app.Flag("fk-samples-factor", "Percentage used to get random samples for foreign keys fields").Default("0.3").Float64(), + Host: app.Flag("host", "Host name/IP").Short('h').Default("127.0.0.1").String(), + MaxRetries: app.Flag("max-retries", "Number of rows to insert").Default("100").Int(), + MaxThreads: app.Flag("max-threads", "Maximum number of threads to run inserts").Default("1").Int(), + NoProgress: app.Flag("no-progress", "Show progress bar").Default("false").Bool(), + Pass: app.Flag("password", "Password").Short('p').String(), + Port: app.Flag("port", "Port").Short('P').Default("3306").Int(), + Print: app.Flag("print", "Print queries to the standard output instead of inserting them into the db").Bool(), + Samples: app.Flag("max-fk-samples", "Maximum number of samples for foreign keys fields").Default("100").Int64(), + User: app.Flag("user", "User").Short('u').String(), + Version: app.Flag("version", "Show version and exit").Bool(), + + Schema: app.Arg("database", "Database").Required().String(), + TableName: app.Arg("table", "Table").Required().String(), + Rows: app.Arg("rows", "Number of rows to insert").Required().Int(), + } + _, err := app.Parse(os.Args[1:]) + + if err != nil { + return nil, err + } + + if mysqlOpts, err := readMySQLConfigFile(*opts.ConfigFile); err == nil { + checkMySQLParams(opts, mysqlOpts) + } + + return opts, nil +} + +func checkMySQLParams(opts *cliOptions, mysqlOpts *mysqlOptions) { + if *opts.Host == "" && mysqlOpts.Host != "" { + *opts.Host = mysqlOpts.Host + } + + if *opts.Port == 0 && mysqlOpts.Port != 0 { + *opts.Port = mysqlOpts.Port + } + + if *opts.User == "" && mysqlOpts.User != "" { + *opts.User = mysqlOpts.User + } + + if *opts.Pass == "" && mysqlOpts.Password != "" { + *opts.Pass = mysqlOpts.Password + } +} + +func readMySQLConfigFile(filename string) (*mysqlOptions, error) { + cfg, err := ini.Load(expandHomeDir(filename)) + if err != nil { + return nil, err + } + + section := cfg.Section(defaultMySQLConfigSection) + port, _ := section.Key("port").Int() + + mysqlOpts := &mysqlOptions{ + Host: section.Key("host").String(), + Port: port, + User: section.Key("user").String(), + Password: section.Key("password").String(), + } + + return mysqlOpts, nil +} + +func expandHomeDir(dir string) string { + if !strings.HasPrefix(dir, "~") { + return dir + } + u, err := user.Current() + if err != nil { + return dir + } + return u.HomeDir + strings.TrimPrefix(dir, "~") +} diff --git a/testutils/testutils.go b/testutils/testutils.go index f8bd88b..554a256 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -1,6 +1,7 @@ package testutils import ( + "bufio" "database/sql" "encoding/json" "fmt" @@ -111,6 +112,47 @@ func GetMinorVersion(tb testing.TB, db *sql.DB) *version.Version { return v } +func LoadFile(tb testing.TB, filename string) []string { + file := filepath.Join("testdata", filename) + fh, err := os.Open(file) + lines := []string{} + reader := bufio.NewReader(fh) + + line, err := reader.ReadString('\n') + for err == nil { + lines = append(lines, strings.TrimRight(line, "\n")) + line, err = reader.ReadString('\n') + } + return lines +} + +func UpdateSampleFile(tb testing.TB, filename string, lines []string) { + if us, _ := strconv.ParseBool(os.Getenv("UPDATE_SAMPLES")); !us { + return + } + WriteFile(tb, filename, lines) +} + +func UpdateSampleJSON(tb testing.TB, filename string, data interface{}) { + if us, _ := strconv.ParseBool(os.Getenv("UPDATE_SAMPLES")); !us { + return + } + WriteJson(tb, filename, data) +} + +func WriteFile(tb testing.TB, filename string, lines []string) { + file := filepath.Join("testdata", filename) + ofh, err := os.Create(file) + if err != nil { + fmt.Printf("%s cannot load json file %q: %s\n\n", caller(), file, err) + tb.FailNow() + } + for _, line := range lines { + ofh.WriteString(line + "\n") + } + ofh.Close() +} + func LoadQueriesFromFile(tb testing.TB, filename string) { conn := GetMySQLConnection(tb) file := filepath.Join("testdata", filename)