Skip to content

Commit

Permalink
[live] add multiple schemas and Encoding/StypeIn args
Browse files Browse the repository at this point in the history
  • Loading branch information
neomantra committed Jun 3, 2024
1 parent 2fe46f1 commit a7563c2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# CHANGELOG

## v0.0.11 (unreleased)
## v0.0.11 (2024-06-03)

* Expand `dbn-go-hist` tool and add `tests/exercise_dbn-go-hist.sh` example.
* `dbn-go-live` supports multiple schemas and more args
* Add the rest of the Historical Metadata API
* Add custom slog Logger to `LiveClient` and cleanup logging
* Updated for DBN `0.18.0` API changes
Expand Down
50 changes: 36 additions & 14 deletions cmd/dbn-go-live/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ type Config struct {
OutFilename string
ApiKey string
Dataset string
Schema string
STypeIn dbn.SType
Encoding dbn.Encoding
Schemas []string
Symbols []string
StartTime time.Time
Verbose bool
Expand All @@ -34,14 +36,17 @@ type Config struct {
///////////////////////////////////////////////////////////////////////////////

func main() {
var err error
var config Config
var startTimeArg string
var encodingArg, startTimeArg, stypeInArg string
var showHelp bool

pflag.StringVarP(&config.Dataset, "dataset", "d", "", "Dataset to subscribe to ")
pflag.StringVarP(&config.Schema, "schema", "s", "", "Schema to subscribe to")
pflag.StringArrayVarP(&config.Schemas, "schema", "s", []string{}, "Schema to subscribe to (multiple allowed)")
pflag.StringVarP(&config.ApiKey, "key", "k", "", "Databento API key (or set 'DATABENTO_API_KEY' envvar)")
pflag.StringVarP(&config.OutFilename, "out", "o", "", "Output filename for DBN stream ('-' for stdout)")
pflag.StringVarP(&stypeInArg, "stype", "i", "raw", "SType of the symbols")
pflag.StringVarP(&encodingArg, "encoding", "e", "dbn", "Encoding of the output")
pflag.StringVarP(&startTimeArg, "start", "t", "", "Start time to request as ISO 8601 format (default: now)")
pflag.BoolVarP(&config.Verbose, "verbose", "v", false, "Verbose logging")
pflag.BoolVarP(&showHelp, "help", "h", false, "Show help")
Expand All @@ -56,7 +61,6 @@ func main() {
}

if startTimeArg != "" {
var err error
config.StartTime, err = iso8601.ParseString(startTimeArg)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to parse --start as ISO 8601 time: %s\n", err.Error())
Expand All @@ -69,13 +73,29 @@ func main() {
requireValOrExit(config.ApiKey, "missing DataBento API key, use --key or set DATABENTO_API_KEY envvar\n")
}

if len(config.Schemas) == 0 {
fmt.Fprintf(os.Stderr, "requires at least --schema argument\n")
os.Exit(1)
}

if len(config.Symbols) == 0 {
fmt.Fprintf(os.Stderr, "requires at least one symbol argument\n")
os.Exit(1)
}

config.Encoding, err = dbn.EncodingFromString(encodingArg)
if err != nil {
fmt.Fprintf(os.Stderr, "argument --encoding '%s' is unknown\n", encodingArg)
os.Exit(1)
}

config.STypeIn, err = dbn.STypeFromString(stypeInArg)
if err != nil {
fmt.Fprintf(os.Stderr, "argument --stype '%s' is unknown\n", stypeInArg)
os.Exit(1)
}

requireValOrExit(config.Dataset, "missing required --dataset")
requireValOrExit(config.Schema, "missing required --schema")
requireValOrExit(config.OutFilename, "missing required --out")

if err := run(config); err != nil {
Expand Down Expand Up @@ -121,15 +141,17 @@ func run(config Config) error {
}

// Subscribe
subRequest := dbn_live.SubscriptionRequestMsg{
Schema: config.Schema,
StypeIn: dbn.SType_RawSymbol,
Symbols: config.Symbols,
Start: config.StartTime,
Snapshot: false,
}
if err = client.Subscribe(subRequest); err != nil {
return fmt.Errorf("failed to subscribe LiveClient: %w", err)
for _, schema := range config.Schemas {
subRequest := dbn_live.SubscriptionRequestMsg{
Schema: schema,
StypeIn: config.STypeIn,
Symbols: config.Symbols,
Start: config.StartTime,
Snapshot: false,
}
if err = client.Subscribe(subRequest); err != nil {
return fmt.Errorf("failed to subscribe LiveClient: %w", err)
}
}

// Start session
Expand Down
3 changes: 2 additions & 1 deletion live/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type LiveConfig struct {
Logger *slog.Logger
ApiKey string
Dataset string
Encoding dbn.Encoding // nil mean Encoding_Dbn
SendTsOut bool
VersionUpgradePolicy dbn.VersionUpgradePolicy
Verbose bool
Expand Down Expand Up @@ -264,7 +265,7 @@ func (c *LiveClient) Authenticate(apiKey string) (string, error) {
request := AuthenticationRequestMsg{
Auth: auth,
Dataset: c.config.Dataset,
Encoding: dbn.Encoding_Dbn,
Encoding: c.config.Encoding,
TsOut: c.config.SendTsOut,
Client: "Go " + DATABENTO_VERSION,
}
Expand Down

0 comments on commit a7563c2

Please sign in to comment.