From 2857142b80128a7f0cfb9a2eff08f2faecccdf31 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Fri, 6 Sep 2024 10:59:48 -0500 Subject: [PATCH] make the maximum accepted gRPC message size configurable For Gazette and consumers. Previously, the default (4MB) was used and could not be changed. Now it's configurable by flag and environment variable for brokers and consumers: --broker.max-grpc-recv-size= Maximum size of gRPC messages accepted by this server, in bytes (default: 4194304) [$BROKER_MAX_GRPC_RECV_SIZE] --- cmd/gazette/main.go | 2 +- mainboilerplate/runconsumer/run_consumer.go | 2 +- mainboilerplate/service.go | 1 + server/server.go | 5 +++-- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/gazette/main.go b/cmd/gazette/main.go index 3e28000f..2fdbcd77 100644 --- a/cmd/gazette/main.go +++ b/cmd/gazette/main.go @@ -85,7 +85,7 @@ func (cmdServe) Execute(args []string) error { } // Bind our server listener, grabbing a random available port if Port is zero. - srv, err := server.New("", Config.Broker.Host, Config.Broker.Port, serverTLS, peerTLS) + srv, err := server.New("", Config.Broker.Host, Config.Broker.Port, serverTLS, peerTLS, Config.Broker.MaxGRPCRecvSize) mbp.Must(err, "building Server instance") // If a file:// root was provided, ensure it exists and apply it. diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 8c9fb4eb..6b044cdb 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -149,7 +149,7 @@ func (sc Cmd) Execute(args []string) error { } // Bind our server listener, grabbing a random available port if Port is zero. - srv, err := server.New("", bc.Consumer.Host, bc.Consumer.Port, serverTLS, peerTLS) + srv, err := server.New("", bc.Consumer.Host, bc.Consumer.Port, serverTLS, peerTLS, bc.Consumer.MaxGRPCRecvSize) mbp.Must(err, "building Server instance") if bc.Broker.Cache.Size <= 0 { diff --git a/mainboilerplate/service.go b/mainboilerplate/service.go index a1f36b5a..c91086fd 100644 --- a/mainboilerplate/service.go +++ b/mainboilerplate/service.go @@ -23,6 +23,7 @@ type ServiceConfig struct { PeerCertFile string `long:"peer-cert-file" env:"PEER_CERT_FILE" default:"" description:"Path to the client TLS certificate for peer-to-peer requests"` PeerCertKeyFile string `long:"peer-cert-key-file" env:"PEER_CERT_KEY_FILE" default:"" description:"Path to the client TLS private key for peer-to-peer requests"` PeerCAFile string `long:"peer-ca-file" env:"PEER_CA_FILE" default:"" description:"Path to the trusted CA for client verification of peer server certificates. When absent, the system CA pool is used instead."` + MaxGRPCRecvSize uint32 `long:"max-grpc-recv-size" env:"MAX_GRPC_RECV_SIZE" default:"4194304" description:"Maximum size of gRPC messages accepted by this server, in bytes"` } // ProcessSpec of the ServiceConfig. diff --git a/server/server.go b/server/server.go index 17ecdbe1..8f3d38fb 100644 --- a/server/server.go +++ b/server/server.go @@ -51,7 +51,7 @@ type Server struct { // and `port` for serving traffic directed at `host`. // `port` may be empty, in which case a random free port is assigned. // if `tlsConfig` is non-nil, the Server uses TLS (and is otherwise in the clear). -func New(iface, host, port string, serverTLS, peerTLS *tls.Config) (*Server, error) { +func New(iface, host, port string, serverTLS, peerTLS *tls.Config, maxSize uint32) (*Server, error) { var network, bind string if port == "" { network, bind = "tcp", fmt.Sprintf("%s:0", iface) // Assign a random free port. @@ -100,6 +100,7 @@ func New(iface, host, port string, serverTLS, peerTLS *tls.Config) (*Server, err GRPCServer: grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + grpc.MaxRecvMsgSize(int(maxSize)), ), } if serverTLS != nil { @@ -192,7 +193,7 @@ func BuildTLSConfig(certPath, keyPath, trustedCAPath string) (*tls.Config, error // MustLoopback builds and returns a new Server instance bound to a random // port on the loopback interface. It panics on error. func MustLoopback() *Server { - if srv, err := New("127.0.0.1", "127.0.0.1", "", nil, nil); err != nil { + if srv, err := New("127.0.0.1", "127.0.0.1", "", nil, nil, 1<<20); err != nil { log.WithField("err", err).Panic("failed to build Server") panic("not reached") } else {