Skip to content

Commit

Permalink
Add schema registry client function (#1565)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstucki authored Oct 15, 2024
1 parent 569ede5 commit 56e8cc7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 10 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go/modules/k3s v0.32.0
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go v1.18.0
github.com/twmb/franz-go/pkg/sr v1.2.0
github.com/wk8/go-ordered-map/v2 v2.1.8
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
Expand Down Expand Up @@ -152,7 +153,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
Expand All @@ -178,7 +179,7 @@ require (
github.com/texttheater/golang-levenshtein v1.0.1 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/twmb/tlscfg v1.2.1 // indirect
github.com/virtuald/go-ordered-json v0.0.0-20170621173500-b18e6e673d74 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down Expand Up @@ -647,10 +647,12 @@ github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5I
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw=
github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M=
github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs=
github.com/virtuald/go-ordered-json v0.0.0-20170621173500-b18e6e673d74 h1:JwtAtbp7r/7QSyGz8mKUbYJBg2+6Cd7OjM8o/GNOcVo=
Expand Down
65 changes: 64 additions & 1 deletion pkg/redpanda/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (
"errors"
"fmt"
"net"
"net/http"
"slices"
"strings"
"time"

"github.com/redpanda-data/helm-charts/charts/redpanda"
"github.com/redpanda-data/helm-charts/pkg/gotohelm/helmette"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/pkg/sr"

"github.com/redpanda-data/common-go/rpadmin"

Expand Down Expand Up @@ -56,7 +59,7 @@ func AdminClient(dot *helmette.Dot, dialer DialContextFunc) (*rpadmin.AdminAPI,
if redpanda.TLSEnabled(dot) {
prefix = "https://"

tlsConfig, err = tlsConfigFromDot(dot, values.Listeners.Kafka.TLS.Cert)
tlsConfig, err = tlsConfigFromDot(dot, values.Listeners.Admin.TLS.Cert)
if err != nil {
return nil, err
}
Expand All @@ -82,6 +85,66 @@ func AdminClient(dot *helmette.Dot, dialer DialContextFunc) (*rpadmin.AdminAPI,
return rpadmin.NewAdminAPIWithDialer(hosts, auth, tlsConfig, dialer)
}

// SchemaRegistryClient creates a client to talk to a Redpanda cluster admin API based on its helm
// configuration over its internal listeners.
func SchemaRegistryClient(dot *helmette.Dot, dialer DialContextFunc, opts ...sr.ClientOpt) (*sr.Client, error) {
values := helmette.Unwrap[redpanda.Values](dot.Values)
name := redpanda.Fullname(dot)
domain := redpanda.InternalDomain(dot)
prefix := "http://"

// These transport values come from the TLS client options found here:
// https://github.com/twmb/franz-go/blob/cea7aa5d803781e5f0162187795482ba1990c729/pkg/sr/clientopt.go#L48-L68
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
DialContext: dialer,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

if dialer == nil {
transport.DialContext = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
}

if redpanda.TLSEnabled(dot) {
prefix = "https://"

tlsConfig, err := tlsConfigFromDot(dot, values.Listeners.SchemaRegistry.TLS.Cert)
if err != nil {
return nil, err
}
transport.TLSClientConfig = tlsConfig
}

copts := []sr.ClientOpt{sr.HTTPClient(&http.Client{
Timeout: 5 * time.Second,
Transport: transport,
})}

username, password, _, err := authFromDot(dot)
if err != nil {
return nil, err
}

if username != "" {
copts = append(copts, sr.BasicAuth(username, password))
}

hosts := redpanda.ServerList(values.Statefulset.Replicas, prefix, name, domain, values.Listeners.SchemaRegistry.Port)
copts = append(copts, sr.URLs(hosts...))

// finally, override any calculated client opts with whatever was
// passed in
return sr.NewClient(append(copts, opts...)...)
}

// KafkaClient creates a client to talk to a Redpanda cluster based on its helm
// configuration over its internal listeners.
func KafkaClient(dot *helmette.Dot, dialer DialContextFunc, opts ...kgo.Opt) (*kgo.Client, error) {
Expand Down

0 comments on commit 56e8cc7

Please sign in to comment.