Skip to content

Commit

Permalink
Add Schema Registry to published rpk profile
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstucki committed Nov 14, 2024
1 parent f53a0ad commit 032cd37
Show file tree
Hide file tree
Showing 3 changed files with 764 additions and 28 deletions.
52 changes: 49 additions & 3 deletions charts/redpanda/configmap.tpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func rpkProfile(dot *helmette.Dot) map[string]any {
adminAdvertisedList = append(adminAdvertisedList, fmt.Sprintf("%s:%d", advertisedHost(dot, i), int(advertisedAdminPort(dot, i))))
}

schemaAdvertisedList := []string{}
for i := int32(0); i < values.Statefulset.Replicas; i++ {
schemaAdvertisedList = append(schemaAdvertisedList, fmt.Sprintf("%s:%d", advertisedHost(dot, i), int(advertisedSchemaPort(dot, i))))
}

kafkaTLS := rpkKafkaClientTLSConfiguration(dot)
if _, ok := kafkaTLS["ca_file"]; ok {
kafkaTLS["ca_file"] = "ca.crt"
Expand All @@ -177,6 +182,11 @@ func rpkProfile(dot *helmette.Dot) map[string]any {
adminTLS["ca_file"] = "ca.crt"
}

schemaTLS := rpkSchemaRegistryClientTLSConfiguration(dot)
if _, ok := schemaTLS["ca_file"]; ok {
schemaTLS["ca_file"] = "ca.crt"
}

ka := map[string]any{
"brokers": brokerList,
"tls": nil,
Expand All @@ -195,10 +205,20 @@ func rpkProfile(dot *helmette.Dot) map[string]any {
aa["tls"] = adminTLS
}

sa := map[string]any{
"addresses": schemaAdvertisedList,
"tls": nil,
}

if len(schemaTLS) > 0 {
sa["tls"] = schemaTLS
}

result := map[string]any{
"name": getFirstExternalKafkaListener(dot),
"kafka_api": ka,
"admin_api": aa,
"name": getFirstExternalKafkaListener(dot),
"kafka_api": ka,
"admin_api": aa,
"schema_registry": sa,
}

return result
Expand Down Expand Up @@ -252,6 +272,32 @@ func advertisedAdminPort(dot *helmette.Dot, i int32) int {
return port
}

func advertisedSchemaPort(dot *helmette.Dot, i int32) int {
values := helmette.Unwrap[Values](dot.Values)

keys := helmette.Keys(values.Listeners.SchemaRegistry.External)

helmette.SortAlpha(keys)

externalSchemaListenerName := helmette.First(keys)

listener := values.Listeners.SchemaRegistry.External[externalSchemaListenerName.(string)]

port := int(values.Listeners.SchemaRegistry.Port)

if int(listener.Port) > 1 {
port = int(listener.Port)
}

if len(listener.AdvertisedPorts) > 1 {
port = int(listener.AdvertisedPorts[i])
} else if len(listener.AdvertisedPorts) == 1 {
port = int(listener.AdvertisedPorts[0])
}

return port
}

func advertisedHost(dot *helmette.Dot, i int32) string {
values := helmette.Unwrap[Values](dot.Values)

Expand Down
93 changes: 68 additions & 25 deletions charts/redpanda/templates/_configmap.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@
{{- if $_is_returning -}}
{{- break -}}
{{- end -}}
{{- $schemaAdvertisedList := (list ) -}}
{{- range $_, $i := untilStep (((0 | int) | int)|int) (($values.statefulset.replicas | int)|int) (1|int) -}}
{{- $schemaAdvertisedList = (concat (default (list ) $schemaAdvertisedList) (list (printf "%s:%d" (get (fromJson (include "redpanda.advertisedHost" (dict "a" (list $dot $i) ))) "r") (((get (fromJson (include "redpanda.advertisedSchemaPort" (dict "a" (list $dot $i) ))) "r") | int) | int)))) -}}
{{- end -}}
{{- if $_is_returning -}}
{{- break -}}
{{- end -}}
{{- $kafkaTLS := (get (fromJson (include "redpanda.rpkKafkaClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- $tmp_tuple_3 := (get (fromJson (include "_shims.compact" (dict "a" (list (get (fromJson (include "_shims.dicttest" (dict "a" (list $kafkaTLS "ca_file" (coalesce nil)) ))) "r")) ))) "r") -}}
{{- $ok_3 := $tmp_tuple_3.T2 -}}
Expand All @@ -118,6 +125,12 @@
{{- if $ok_4 -}}
{{- $_ := (set $adminTLS "ca_file" "ca.crt") -}}
{{- end -}}
{{- $schemaTLS := (get (fromJson (include "redpanda.rpkSchemaRegistryClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- $tmp_tuple_5 := (get (fromJson (include "_shims.compact" (dict "a" (list (get (fromJson (include "_shims.dicttest" (dict "a" (list $schemaTLS "ca_file" (coalesce nil)) ))) "r")) ))) "r") -}}
{{- $ok_5 := $tmp_tuple_5.T2 -}}
{{- if $ok_5 -}}
{{- $_ := (set $schemaTLS "ca_file" "ca.crt") -}}
{{- end -}}
{{- $ka := (dict "brokers" $brokerList "tls" (coalesce nil) ) -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $kafkaTLS) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $ka "tls" $kafkaTLS) -}}
Expand All @@ -126,7 +139,11 @@
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $adminTLS) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $aa "tls" $adminTLS) -}}
{{- end -}}
{{- $result := (dict "name" (get (fromJson (include "redpanda.getFirstExternalKafkaListener" (dict "a" (list $dot) ))) "r") "kafka_api" $ka "admin_api" $aa ) -}}
{{- $sa := (dict "addresses" $schemaAdvertisedList "tls" (coalesce nil) ) -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $schemaTLS) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $sa "tls" $schemaTLS) -}}
{{- end -}}
{{- $result := (dict "name" (get (fromJson (include "redpanda.getFirstExternalKafkaListener" (dict "a" (list $dot) ))) "r") "kafka_api" $ka "admin_api" $aa "schema_registry" $sa ) -}}
{{- $_is_returning = true -}}
{{- (dict "r" $result) | toJson -}}
{{- break -}}
Expand Down Expand Up @@ -183,6 +200,32 @@
{{- end -}}
{{- end -}}

{{- define "redpanda.advertisedSchemaPort" -}}
{{- $dot := (index .a 0) -}}
{{- $i := (index .a 1) -}}
{{- range $_ := (list 1) -}}
{{- $_is_returning := false -}}
{{- $values := $dot.Values.AsMap -}}
{{- $keys := (keys $values.listeners.schemaRegistry.external) -}}
{{- $_ := (sortAlpha $keys) -}}
{{- $externalSchemaListenerName := (first $keys) -}}
{{- $listener := (index $values.listeners.schemaRegistry.external (get (fromJson (include "_shims.typeassertion" (dict "a" (list "string" $externalSchemaListenerName) ))) "r")) -}}
{{- $port := (($values.listeners.schemaRegistry.port | int) | int) -}}
{{- if (gt (($listener.port | int) | int) (1 | int)) -}}
{{- $port = (($listener.port | int) | int) -}}
{{- end -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $listener.advertisedPorts) ))) "r") | int) (1 | int)) -}}
{{- $port = ((index $listener.advertisedPorts $i) | int) -}}
{{- else -}}{{- if (eq ((get (fromJson (include "_shims.len" (dict "a" (list $listener.advertisedPorts) ))) "r") | int) (1 | int)) -}}
{{- $port = ((index $listener.advertisedPorts (0 | int)) | int) -}}
{{- end -}}
{{- end -}}
{{- $_is_returning = true -}}
{{- (dict "r" $port) | toJson -}}
{{- break -}}
{{- end -}}
{{- end -}}

{{- define "redpanda.advertisedHost" -}}
{{- $dot := (index .a 0) -}}
{{- $i := (index .a 1) -}}
Expand Down Expand Up @@ -251,19 +294,19 @@
{{- $values := $dot.Values.AsMap -}}
{{- $brokerList := (get (fromJson (include "redpanda.BrokerList" (dict "a" (list $dot ($values.statefulset.replicas | int) ($values.listeners.kafka.port | int)) ))) "r") -}}
{{- $adminTLS := (coalesce nil) -}}
{{- $tls_5 := (get (fromJson (include "redpanda.rpkAdminAPIClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_5) ))) "r") | int) (0 | int)) -}}
{{- $adminTLS = $tls_5 -}}
{{- $tls_6 := (get (fromJson (include "redpanda.rpkAdminAPIClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_6) ))) "r") | int) (0 | int)) -}}
{{- $adminTLS = $tls_6 -}}
{{- end -}}
{{- $brokerTLS := (coalesce nil) -}}
{{- $tls_6 := (get (fromJson (include "redpanda.rpkKafkaClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_6) ))) "r") | int) (0 | int)) -}}
{{- $brokerTLS = $tls_6 -}}
{{- $tls_7 := (get (fromJson (include "redpanda.rpkKafkaClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_7) ))) "r") | int) (0 | int)) -}}
{{- $brokerTLS = $tls_7 -}}
{{- end -}}
{{- $schemaRegistryTLS := (coalesce nil) -}}
{{- $tls_7 := (get (fromJson (include "redpanda.rpkSchemaRegistryClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_7) ))) "r") | int) (0 | int)) -}}
{{- $schemaRegistryTLS = $tls_7 -}}
{{- $tls_8 := (get (fromJson (include "redpanda.rpkSchemaRegistryClientTLSConfiguration" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_8) ))) "r") | int) (0 | int)) -}}
{{- $schemaRegistryTLS = $tls_8 -}}
{{- end -}}
{{- $result := (dict "overprovisioned" (get (fromJson (include "redpanda.RedpandaResources.GetOverProvisionValue" (dict "a" (list $values.resources) ))) "r") "enable_memory_locking" (get (fromJson (include "_shims.ptr_Deref" (dict "a" (list $values.resources.memory.enable_memory_locking false) ))) "r") "additional_start_flags" (get (fromJson (include "redpanda.RedpandaAdditionalStartFlags" (dict "a" (list $dot ((get (fromJson (include "redpanda.RedpandaSMP" (dict "a" (list $dot) ))) "r") | int64)) ))) "r") "kafka_api" (dict "brokers" $brokerList "tls" $brokerTLS ) "admin_api" (dict "addresses" (get (fromJson (include "redpanda.Listeners.AdminList" (dict "a" (list $values.listeners ($values.statefulset.replicas | int) (get (fromJson (include "redpanda.Fullname" (dict "a" (list $dot) ))) "r") (get (fromJson (include "redpanda.InternalDomain" (dict "a" (list $dot) ))) "r")) ))) "r") "tls" $adminTLS ) "schema_registry" (dict "addresses" (get (fromJson (include "redpanda.Listeners.SchemaRegistryList" (dict "a" (list $values.listeners ($values.statefulset.replicas | int) (get (fromJson (include "redpanda.Fullname" (dict "a" (list $dot) ))) "r") (get (fromJson (include "redpanda.InternalDomain" (dict "a" (list $dot) ))) "r")) ))) "r") "tls" $schemaRegistryTLS ) ) -}}
{{- $result = (merge (dict ) $result (get (fromJson (include "redpanda.Tuning.Translate" (dict "a" (list $values.tuning) ))) "r")) -}}
Expand Down Expand Up @@ -381,18 +424,18 @@
{{- $_ := (set $redpanda "kafka_api" (get (fromJson (include "redpanda.KafkaListeners.Listeners" (dict "a" (list $values.listeners.kafka $values.auth) ))) "r")) -}}
{{- $_ := (set $redpanda "rpc_server" (get (fromJson (include "redpanda.rpcListeners" (dict "a" (list $dot) ))) "r")) -}}
{{- $_ := (set $redpanda "admin_api_tls" (coalesce nil)) -}}
{{- $tls_8 := (get (fromJson (include "redpanda.AdminListeners.ListenersTLS" (dict "a" (list $values.listeners.admin $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_8) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $redpanda "admin_api_tls" $tls_8) -}}
{{- end -}}
{{- $_ := (set $redpanda "kafka_api_tls" (coalesce nil)) -}}
{{- $tls_9 := (get (fromJson (include "redpanda.KafkaListeners.ListenersTLS" (dict "a" (list $values.listeners.kafka $values.tls) ))) "r") -}}
{{- $tls_9 := (get (fromJson (include "redpanda.AdminListeners.ListenersTLS" (dict "a" (list $values.listeners.admin $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_9) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $redpanda "kafka_api_tls" $tls_9) -}}
{{- $_ := (set $redpanda "admin_api_tls" $tls_9) -}}
{{- end -}}
{{- $tls_10 := (get (fromJson (include "redpanda.rpcListenersTLS" (dict "a" (list $dot) ))) "r") -}}
{{- $_ := (set $redpanda "kafka_api_tls" (coalesce nil)) -}}
{{- $tls_10 := (get (fromJson (include "redpanda.KafkaListeners.ListenersTLS" (dict "a" (list $values.listeners.kafka $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_10) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $redpanda "rpc_server_tls" $tls_10) -}}
{{- $_ := (set $redpanda "kafka_api_tls" $tls_10) -}}
{{- end -}}
{{- $tls_11 := (get (fromJson (include "redpanda.rpcListenersTLS" (dict "a" (list $dot) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_11) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $redpanda "rpc_server_tls" $tls_11) -}}
{{- end -}}
{{- end -}}
{{- end -}}
Expand All @@ -405,9 +448,9 @@
{{- $pandaProxy := (dict ) -}}
{{- $_ := (set $pandaProxy "pandaproxy_api" (get (fromJson (include "redpanda.HTTPListeners.Listeners" (dict "a" (list $values.listeners.http (get (fromJson (include "redpanda.Auth.IsSASLEnabled" (dict "a" (list $values.auth) ))) "r")) ))) "r")) -}}
{{- $_ := (set $pandaProxy "pandaproxy_api_tls" (coalesce nil)) -}}
{{- $tls_11 := (get (fromJson (include "redpanda.HTTPListeners.ListenersTLS" (dict "a" (list $values.listeners.http $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_11) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $pandaProxy "pandaproxy_api_tls" $tls_11) -}}
{{- $tls_12 := (get (fromJson (include "redpanda.HTTPListeners.ListenersTLS" (dict "a" (list $values.listeners.http $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_12) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $pandaProxy "pandaproxy_api_tls" $tls_12) -}}
{{- end -}}
{{- $_is_returning = true -}}
{{- (dict "r" $pandaProxy) | toJson -}}
Expand All @@ -423,9 +466,9 @@
{{- $schemaReg := (dict ) -}}
{{- $_ := (set $schemaReg "schema_registry_api" (get (fromJson (include "redpanda.SchemaRegistryListeners.Listeners" (dict "a" (list $values.listeners.schemaRegistry (get (fromJson (include "redpanda.Auth.IsSASLEnabled" (dict "a" (list $values.auth) ))) "r")) ))) "r")) -}}
{{- $_ := (set $schemaReg "schema_registry_api_tls" (coalesce nil)) -}}
{{- $tls_12 := (get (fromJson (include "redpanda.SchemaRegistryListeners.ListenersTLS" (dict "a" (list $values.listeners.schemaRegistry $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_12) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $schemaReg "schema_registry_api_tls" $tls_12) -}}
{{- $tls_13 := (get (fromJson (include "redpanda.SchemaRegistryListeners.ListenersTLS" (dict "a" (list $values.listeners.schemaRegistry $values.tls) ))) "r") -}}
{{- if (gt ((get (fromJson (include "_shims.len" (dict "a" (list $tls_13) ))) "r") | int) (0 | int)) -}}
{{- $_ := (set $schemaReg "schema_registry_api_tls" $tls_13) -}}
{{- end -}}
{{- $_is_returning = true -}}
{{- (dict "r" $schemaReg) | toJson -}}
Expand Down
Loading

0 comments on commit 032cd37

Please sign in to comment.