-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
87 lines (77 loc) · 2.46 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"time"
"github.com/antongulenko/golib"
"github.com/bitflow-stream/go-bitflow/script/reg"
log "github.com/sirupsen/logrus"
)
const (
	// defaultExecutableName is the binary name searched for in $PATH when no
	// executable is given through the -e flag.
	defaultExecutableName = "bitflow-pipeline"

	// managerNotificationTimeout is the timeout configured on the HTTP client
	// that sends the notification request to the manager.
	managerNotificationTimeout = 2 * time.Second
)
// main is the program entry point. It parses the command line, resolves the
// pipeline executable (falling back to a $PATH lookup), starts the subprocess
// engine, optionally schedules a notification to a manager URL, and finally
// serves the HTTP endpoint.
//
// Every fallible step is passed to golib.Checkerr.
// NOTE(review): golib.Checkerr presumably terminates the process on a non-nil
// error; confirm against the golib package.
func main() {
	var (
		pipelineBinary string
		listenAddr     string
		managerAddr    string
		tags           golib.KeyValueStringSlice
	)

	// Command line flags of the agent itself.
	executableUsage := fmt.Sprintf("Name of the pipeline executable. By default, search $PATH for %v", defaultExecutableName)
	flag.StringVar(&pipelineBinary, "e", "", executableUsage)
	flag.StringVar(&listenAddr, "h", ":8080", "HTTP endpoint for serving the REST API.")
	flag.StringVar(&managerAddr, "m", "", "After initializing the REST API, send a GET request with no further headers or content to the given URL.")
	flag.Var(&tags, "tag", "Additional key=value pairs that will be served through GET /info.")

	// Logging flags must be registered before parsing and applied afterwards.
	golib.RegisterLogFlags()
	flag.Parse()
	golib.ConfigureLogging()

	if len(pipelineBinary) == 0 {
		// No executable was named explicitly: look up the default one in $PATH.
		var lookupErr error
		pipelineBinary, lookupErr = exec.LookPath(defaultExecutableName)
		golib.Checkerr(lookupErr)
		log.Println("Using executable:", pipelineBinary)
	}

	engine := SubprocessEngine{
		Executable: pipelineBinary,
		Tags:       tags.Map(),
	}
	runErr := engine.Run()
	golib.Checkerr(runErr)

	// The notification is only scheduled here; it fires after a short delay so
	// that the HTTP server started below has a chance to come up first.
	if len(managerAddr) > 0 {
		notifyManager(managerAddr)
	}

	serveErr := engine.ServeHttp(listenAddr)
	golib.Checkerr(serveErr)
}
// CapabilitiesFlag is the command line argument passed to the pipeline
// executable by LoadCapabilities. The standard output of that invocation is
// parsed as JSON.
const CapabilitiesFlag = "-capabilities"
// LoadCapabilities runs the given executable with CapabilitiesFlag as its only
// argument and decodes the JSON written to its standard output into obj.
// The standard error of the child process is forwarded to os.Stderr.
//
// If running the command or decoding its output fails, the returned error
// names the executable and includes the underlying cause.
func LoadCapabilities(executable string) (obj reg.ProcessingSteps, err error) {
	// Both failure modes share one message format.
	describe := func(cause error) error {
		return fmt.Errorf("Error obtaining capabilities of %v: %v", executable, cause)
	}

	capabilitiesCmd := exec.Command(executable, CapabilitiesFlag)
	capabilitiesCmd.Stderr = os.Stderr

	stdout, runErr := capabilitiesCmd.Output()
	if runErr != nil {
		return obj, describe(runErr)
	}
	if decodeErr := json.Unmarshal(stdout, &obj); decodeErr != nil {
		return obj, describe(decodeErr)
	}
	return obj, nil
}
// notifyManager schedules a single GET request to managerURL, to be sent
// 200ms after this function returns. The function itself does not block.
//
// The request uses an HTTP client with managerNotificationTimeout as its
// timeout. If the request fails, or the manager answers with anything other
// than 200 OK, the error is logged through log.Fatalln.
func notifyManager(managerURL string) {
	// Defer the management registration shortly, to first initialize the HTTP server
	// that the caller starts right after this function returns.
	// Unfortunately there is no cleaner way to do this.
	time.AfterFunc(200*time.Millisecond, func() {
		log.Printf("Notifying the manager at %v...", managerURL)
		client := http.Client{
			Timeout: managerNotificationTimeout,
		}
		resp, err := client.Get(managerURL)
		if err == nil {
			err = checkManagerResponse(resp, managerURL)
		}
		if err != nil {
			log.Fatalln(err)
		}
	})
}

// checkManagerResponse validates the manager's reply and closes its body.
// It returns nil for a 200 OK response; otherwise it returns an error that
// carries the status, the manager URL and the response body.
func checkManagerResponse(resp *http.Response, managerURL string) error {
	// Close the body on every path so the underlying connection is released.
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// Ignore the read error: the body is only used to enrich the error message.
	body, _ := ioutil.ReadAll(resp.Body)
	return fmt.Errorf("Received non-success response (status %v %v) from manager (URL: %v). Body:\n%v",
		resp.StatusCode, resp.Status, managerURL, string(body))
}