Skip to content

Commit

Permalink
feat(yqlclient): add YQL client
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 20, 2023
1 parent 809f10c commit 328edc6
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 0 deletions.
17 changes: 17 additions & 0 deletions internal/yqlclient/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package yqlclient

import (
"fmt"

"github.com/go-faster/oteldb/internal/yqlclient/ytqueryapi"
)

// Error is a wrapper for API error.
type Error struct {
Err ytqueryapi.Error
}

// Error implements error.
func (e *Error) Error() string {
return fmt.Sprintf("code %d: %s", e.Err.Code, e.Err.Message)
}
50 changes: 50 additions & 0 deletions internal/yqlclient/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package yqlclient

import (
"context"
"io"

"github.com/go-faster/errors"
"go.ytsaurus.tech/yt/go/yson"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/yqlclient/ytqueryapi"
)

// ReadResult reads result of given query ID.
func ReadResult[T any](ctx context.Context, c *Client, queryID ytqueryapi.QueryID) (iterators.Iterator[T], error) {
data, err := c.client.ReadQueryResult(ctx, ytqueryapi.ReadQueryResultParams{
QueryID: queryID,
OutputFormat: ytqueryapi.OutputFormatYson,
})
if err != nil {
return nil, errors.Wrap(err, "read query result")
}

return &resultIterator[T]{dec: yson.NewDecoder(data)}, nil
}

type resultIterator[T any] struct {
dec *yson.Decoder
err error
}

func (i *resultIterator[T]) Next(t *T) bool {
switch err := i.dec.Decode(t); err {
case io.EOF:
return false
case nil:
return true
default:
i.err = err
return false
}
}

func (i *resultIterator[T]) Err() error {
return i.err
}

func (i *resultIterator[T]) Close() error {
return nil
}
199 changes: 199 additions & 0 deletions internal/yqlclient/yqlclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Package yqlclient provides YTSaurus YQL client.
package yqlclient

import (
"context"
"net/http"
"time"

"github.com/go-faster/errors"
"github.com/ogen-go/ogen/ogenerrors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/yqlclient/ytqueryapi"
)

// Client is a YQL client.
type Client struct {
client *ytqueryapi.Client
tracer trace.Tracer
}

// ClientOptions is a Client creation options.
type ClientOptions struct {
// Token to use. If empty, authentication would not be used.
Token string

// Client to use. Defaults to http.DefaultClient.
Client *http.Client

// TracerProvider is a tracer provider. Defaults to otel.GetTracerProvider.
TracerProvider trace.TracerProvider
// MeterProvider is a meter provider. Defaults to otel.GetMeterProvider.
MeterProvider metric.MeterProvider
}

func (opts *ClientOptions) setDefaults() {
if opts.Client == nil {
opts.Client = http.DefaultClient
}
if opts.TracerProvider == nil {
opts.TracerProvider = otel.GetTracerProvider()
}
}

type securitySource struct {
Token string
}

func (s *securitySource) YTToken(context.Context, string) (t ytqueryapi.YTToken, _ error) {
if s.Token == "" {
return t, ogenerrors.ErrSkipClientSecurity
}
t.APIKey = s.Token
return t, nil
}

// NewClient creates new Client.
func NewClient(proxyURL string, opts ClientOptions) (*Client, error) {
opts.setDefaults()

client, err := ytqueryapi.NewClient(
proxyURL,
&securitySource{Token: opts.Token},
ytqueryapi.WithClient(opts.Client),
ytqueryapi.WithTracerProvider(opts.TracerProvider),
ytqueryapi.WithMeterProvider(opts.MeterProvider),
)
if err != nil {
return nil, errors.Wrap(err, "create ogen client")
}

return &Client{
client: client,
tracer: otel.Tracer("yqlclient"),
}, nil
}

// RawClient returns raw client.
func (c *Client) RawClient() *ytqueryapi.Client {
return c.client
}

// ExecuteQueryParams sets ExecuteQuery parameters.
type ExecuteQueryParams struct {
// PollInterval is a query result polling interval. Defaults to 1s.
PollInterval time.Duration

// AbortTimeout sets timeout for aborting query. Defaults to 10s.
AbortTimeout time.Duration

// Engine to run query. Defaults to YQL.
Engine ytqueryapi.Engine
}

func (p *ExecuteQueryParams) setDefaults() {
if p.PollInterval == 0 {
p.PollInterval = time.Second
}
if p.AbortTimeout == 0 {
p.AbortTimeout = 10 * time.Second
}
if p.Engine == "" {
p.Engine = ytqueryapi.EngineYql
}
}

func (c *Client) abortQuery(queryID ytqueryapi.QueryID, timeout time.Duration) error {
abortCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := c.client.AbortQuery(abortCtx, ytqueryapi.AbortQueryParams{QueryID: queryID}); err != nil {
return errors.Wrapf(err, "abort query %s", queryID)
}
return nil
}

// ExecuteQuery starts query and waits for query completion.
//
// Caller may abort the query by canceling the context.
func (c *Client) ExecuteQuery(ctx context.Context, q string, params ExecuteQueryParams) (queryID ytqueryapi.QueryID, rerr error) {
params.setDefaults()

ctx, span := c.tracer.Start(ctx, "ExecuteQuery", trace.WithAttributes(
attribute.String("engine", string(params.Engine)),
))
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

started, err := c.client.StartQuery(ctx, ytqueryapi.StartQueryParams{
Query: q,
Engine: params.Engine,
})
if err != nil {
return queryID, errors.Wrap(err, "start query")
}
queryID = started.QueryID
span.SetAttributes(attribute.String("yt.query_id", string(queryID)))
span.AddEvent("QueryStarted")

t := time.NewTicker(params.PollInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
span.AddEvent("QueryCanceled")
return queryID, multierr.Append(
ctx.Err(),
c.abortQuery(queryID, params.AbortTimeout),
)
case <-t.C:
status, err := c.client.GetQuery(ctx, ytqueryapi.GetQueryParams{QueryID: queryID})
if err != nil {
return queryID, errors.Wrapf(err, "get query %s status", queryID)
}

switch status.State {
case ytqueryapi.OperationStateAborted:
span.AddEvent("QueryAborted")
return queryID, errors.Wrapf(err, "query %s aborted", queryID)
case ytqueryapi.OperationStateFailed:
span.AddEvent("QueryFailed", trace.WithAttributes(
attribute.Int("yt.error_code", status.Error.Code),
attribute.String("yt.error_message", status.Error.Message),
))
return queryID, &Error{Err: status.Error}
case ytqueryapi.OperationStateCompleted:
span.AddEvent("QueryCompleted", trace.WithAttributes(
attribute.Int("yt.result_count", status.ResultCount.Or(0)),
))
return queryID, nil
}
}
}
}

// YQLQuery makes an YQL query.
func YQLQuery[T any](ctx context.Context, c *Client, q string) (iterators.Iterator[T], error) {
queryID, err := c.ExecuteQuery(ctx, q, ExecuteQueryParams{
Engine: ytqueryapi.EngineYql,
})
if err != nil {
return nil, errors.Wrap(err, "execute query")
}

iter, err := ReadResult[T](ctx, c, queryID)
if err != nil {
return nil, errors.Wrap(err, "read result")
}
return iter, nil
}

0 comments on commit 328edc6

Please sign in to comment.