From 497e4eb6488a40ca36f78c796270ae21c2bf45c7 Mon Sep 17 00:00:00 2001 From: Adam Bouhenguel Date: Sat, 23 Jul 2016 21:36:07 -0700 Subject: [PATCH] First cut of `qa auto` --- Gimmefile | 1 + src/qa/cmd/auto/auto.go | 140 ++++++++++++++++++++++++++++++++ src/qa/fileevents/watchman.go | 149 ++++++++++++++++++++++++++++++++++ 3 files changed, 290 insertions(+) create mode 100644 src/qa/cmd/auto/auto.go create mode 100644 src/qa/fileevents/watchman.go diff --git a/Gimmefile b/Gimmefile index 73e0dce..1c8eca9 100644 --- a/Gimmefile +++ b/Gimmefile @@ -7,6 +7,7 @@ "vendor/go-bindata:go-bindata", // For testing. + "vendor/watchman:watchman", "vendor/ruby:ruby", "vendor/minitest:minitest", "vendor/test-unit:test-unit", diff --git a/src/qa/cmd/auto/auto.go b/src/qa/cmd/auto/auto.go new file mode 100644 index 0000000..7c5aa1a --- /dev/null +++ b/src/qa/cmd/auto/auto.go @@ -0,0 +1,140 @@ +package auto + +import ( + "flag" + "fmt" + "os" + "os/signal" + "path/filepath" + "qa/cmd" + "qa/cmd/run" + "qa/fileevents" + "qa/runner" + "syscall" +) + +// When watchman emits an event, execute a qa run for that test +// Expect: +// { +// "version": "1.6", +// "subscribe": "mysubscriptionname" +// } +type eventFileLister fileevents.Event + +func (s eventFileLister) Dir() string { + return s.Root +} + +func (s eventFileLister) Patterns() []string { + var names []string + for _, file := range s.Files { + names = append(names, file.Name) + } + return names +} + +func (s eventFileLister) ListFiles() ([]string, error) { + return s.Patterns(), nil +} + +func pruneRunEnvForFiles( + runEnv *run.Env, + runnerConfig runner.Config, + event *fileevents.Event) *run.Env { + + pruned := *runEnv + runnerConfig.FileLister = eventFileLister(*event) + pruned.RunnerConfigs = []runner.Config{ + runnerConfig, + } + + return &pruned +} + +func subscribeToRunnerConfigFiles(watcher fileevents.Watcher, runnerConfig runner.Config) (*fileevents.Subscription, error) { + dir, err := filepath.Abs(runnerConfig.FileLister.Dir()) + if err != nil { + return nil, err + } + + dir, err = filepath.EvalSymlinks(dir) + if err != nil { + return nil, err + } + + expr := map[string](interface{}){ + "expression": []string{"match", runnerConfig.FileLister.Patterns()[0], "wholename"}, + "fields": []string{"name", "new", "exists"}, + "defer_vcs": true, + } + + return watcher.Subscribe(dir, "tests", expr) +} + +func Main(env *cmd.Env, args []string) error { + watcher, err := fileevents.StartWatchman("/tmp/watchman") + if err != nil { + return err + } + defer watcher.Close() + + flags := flag.NewFlagSet("auto", flag.ContinueOnError) + + runFlags := run.DefineFlags(flags) + err = flags.Parse(args) + if err != nil { + return err + } + + runEnv, err := runFlags.NewEnv(env, flags.Args()) + if err != nil { + return err + } + + srv := runEnv.Server + defer srv.Close() + go srv.Run() + + runEnvChan := make(chan *run.Env) + + // Handle common process-killing signals so we can gracefully shut down: + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, os.Kill, syscall.SIGTERM) + go func(c chan os.Signal) { + // Wait for signal + sig, ok := <-c + if ok { + fmt.Fprintln(env.Stderr, "Got signal:", sig) + srv.Close() + watcher.Close() + close(runEnvChan) + } + }(sigc) + defer signal.Stop(sigc) + defer close(sigc) + + for _, runnerConfig := range runEnv.RunnerConfigs { + sub, err := subscribeToRunnerConfigFiles(watcher, runnerConfig) + if err != nil { + return nil + } + defer sub.Close() + + go func(sub *fileevents.Subscription, runnerConfig runner.Config) { + for event := range sub.Events { + runEnvChan <- pruneRunEnvForFiles(runEnv, runnerConfig, event) + } + }(sub, runnerConfig) + } + + // Figure out which file changed. If file that changed matches the existing glob, + // start running that test. + for runEnv := range runEnvChan { + _, err := run.Run(runEnv) + if err != nil { + return err + } + } + + return nil +} diff --git a/src/qa/fileevents/watchman.go b/src/qa/fileevents/watchman.go new file mode 100644 index 0000000..22ddabc --- /dev/null +++ b/src/qa/fileevents/watchman.go @@ -0,0 +1,149 @@ +package fileevents + +import ( + "encoding/json" + "fmt" + "io" + "net" + "os" + "os/exec" + "syscall" + "time" +) + +type Event struct { + Version string `json:"version"` + Clock string `json:"clock"` + Files []File `json:"files"` + Root string `json:"root"` + Subscription string `json:"subscription"` +} + +type File struct { + Name string `json:"name"` + New bool `json:"new"` + Exists bool `json:"exists"` +} + +type Subscription struct { + Events chan *Event + closer io.Closer +} + +func (s *Subscription) Close() error { + return s.closer.Close() +} + +type Watcher interface { + Subscribe(root string, name string, expr interface{}) (*Subscription, error) + Close() error +} + +func tolerantDial(sockname string) (net.Conn, error) { + for { + conn, err := net.Dial("unix", sockname) + if err == nil { + return conn, err + } + + opErr, ok := err.(*net.OpError) + if !ok { + return conn, err + } + + syscallError, ok := opErr.Err.(*os.SyscallError) + if !ok { + return conn, err + } + + switch syscallError.Err { + case syscall.ECONNREFUSED: + time.Sleep(50 * time.Millisecond) + continue + case syscall.ENOENT: + time.Sleep(50 * time.Millisecond) + continue + case syscall.ENODATA: + time.Sleep(50 * time.Millisecond) + continue + } + + fmt.Fprintf(os.Stderr, "Saw error %#v\n", syscallError) + return conn, err + } +} + +func StartWatchman(sockname string) (Watcher, error) { + cmd := exec.Command( + "watchman", + "--no-save-state", + "--foreground", + "--sockname", sockname, + "--logfile", "/dev/stdout") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Start() + if err != nil { + return nil, err + } + + return &WatchmanService{process: cmd.Process, sockname: sockname}, nil +} + +type WatchmanService struct { + process *os.Process + sockname string +} + +func (w *WatchmanService) Close() error { + err := w.process.Kill() + if err != nil { + _, err = w.process.Wait() + } + + return err +} + +func (w *WatchmanService) Subscribe(root string, name string, expr interface{}) (*Subscription, error) { + // Open unix socket and keep it open for the future + conn, err := tolerantDial(w.sockname) + if err != nil { + return nil, err + } + + encoder := json.NewEncoder(conn) + message := []interface{}{"subscribe", root, name, expr} + err = encoder.Encode(message) + if err != nil { + return nil, err + } + + decoder := json.NewDecoder(conn) + // Expect response: + // { + // "version": "1.6", + // "subscribe": "mysubscriptionname" + // } + var response interface{} + err = decoder.Decode(&response) + if err != nil { + return nil, err + } + + c := make(chan *Event, 1) + go func() { + defer close(c) + defer conn.Close() + + for { + var event Event + err := decoder.Decode(&event) + if err != nil { + break + } + c <- &event + } + }() + + return &Subscription{Events: c, closer: conn}, nil +}