Skip to content

Commit

Permalink
feat(client): blocking LineSenderPool (#53)
Browse files Browse the repository at this point in the history
Breaking change of an experimental API

The pool now blocks if there are already too many senders in use.
Pooled senders are released implicitly, via the Close call.
  • Loading branch information
puzpuzpuz authored Aug 14, 2024
1 parent 53a2d35 commit 9c27421
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 110 deletions.
48 changes: 33 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer
The library requires Go 1.19 or newer.

Features:
* Context-aware API.
* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API.
* Optimized for batch writes.
* Supports TLS encryption and ILP authentication.
* Automatic write retries and connection reuse for ILP over HTTP.
Expand Down Expand Up @@ -43,23 +43,40 @@ func main() {
}
// Make sure to close the sender on exit to release resources.
defer sender.Close(ctx)

// Send a few ILP messages.
tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
if err != nil {
log.Fatal(err)
}
err = sender.
Table("trades").
Symbol("name", "test_ilp1").
Float64Column("value", 12.4).
AtNow(ctx)
Table("trades_go").
Symbol("pair", "USDGBP").
Symbol("type", "buy").
Float64Column("traded_price", 0.83).
Float64Column("limit_price", 0.84).
Int64Column("qty", 100).
At(ctx, tradedTs)
if err != nil {
log.Fatal(err)
}

tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
if err != nil {
log.Fatal(err)
}
err = sender.
Table("trades").
Symbol("name", "test_ilp2").
Float64Column("value", 11.4).
At(ctx, time.Now().UnixNano())
Table("trades_go").
Symbol("pair", "GBPJPY").
Symbol("type", "sell").
Float64Column("traded_price", 135.97).
Float64Column("limit_price", 0.84).
Int64Column("qty", 400).
At(ctx, tradedTs)
if err != nil {
log.Fatal(err)
}

// Make sure that the messages are sent over the network.
err = sender.Flush(ctx)
if err != nil {
Expand All @@ -80,15 +97,15 @@ To connect via TCP, set the configuration string to:
**Warning: Experimental feature designed for use with HTTP senders ONLY**

Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
to cache previously-used `LineSender`s in memory so they can be reused without
having to allocate and instantiate new senders.
to pool previously-used `LineSender`s so they can be reused without having
to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
across multiple goroutines.

Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
execution block to Release the sender at the end of the goroutine.
execution block to Close the sender at the end of the goroutine.

Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:

Expand All @@ -112,7 +129,7 @@ func main() {
}
}()

sender, err := pool.Acquire(ctx)
sender, err := pool.Sender(ctx)
if err != nil {
panic(err)
}
Expand All @@ -122,7 +139,8 @@ func main() {
Float64Column("price", 123.45).
AtNow(ctx)

if err := pool.Release(ctx, sender); err != nil {
// Close call returns the sender back to the pool
if err := sender.Close(ctx); err != nil {
panic(err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func Messages(s LineSender) string {
}

func MsgCount(s LineSender) int {
if ps, ok := s.(*pooledSender); ok {
hs, _ := ps.wrapped.(*httpLineSender)
return hs.MsgCount()
}
if hs, ok := s.(*httpLineSender); ok {
return hs.MsgCount()
}
Expand Down
9 changes: 4 additions & 5 deletions http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
Expand Down Expand Up @@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
)

if s.closed {
return errors.New("cannot flush a closed LineSender")
return errClosedSenderFlush
}

err := s.buf.LastErr()
Expand All @@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
}
if s.buf.HasTable() {
s.buf.DiscardPendingMsg()
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
return errFlushWithPendingMessage
}

if s.buf.msgCount == 0 {
Expand Down Expand Up @@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {

func (s *httpLineSender) Close(ctx context.Context) error {
if s.closed {
return nil
return errDoubleSenderClose
}

var err error
Expand All @@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {

func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
if s.closed {
return errors.New("cannot queue new messages on a closed LineSender")
return errClosedSenderAt
}

sendTs := true
Expand Down
2 changes: 1 addition & 1 deletion http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) {
assert.NoError(t, err)

err = sender.Close(ctx)
assert.NoError(t, err)
assert.Error(t, err)
}

func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ import (
"time"
)

var (
errClosedSenderFlush = errors.New("cannot flush a closed LineSender")
errFlushWithPendingMessage = errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
errClosedSenderAt = errors.New("cannot queue new messages on a closed LineSender")
errDoubleSenderClose = errors.New("double sender close")
)

// LineSender allows you to insert rows into QuestDB by sending ILP
// messages over HTTP or TCP protocol.
//
Expand Down
Loading

0 comments on commit 9c27421

Please sign in to comment.