Skip to content

Commit

Permalink
Fix never ending loop when consumer responds with an error 400
Browse files Browse the repository at this point in the history
Signed-off-by: Md Soharab Ansari <[email protected]>
  • Loading branch information
soharab-ic committed Oct 23, 2024
1 parent b4b8853 commit 0127b89
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions nats-jetstream-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type jetstreamConnector struct {
}

func main() {

host := os.Getenv("NATS_SERVER")
consumer := os.Getenv("CONSUMER")
ackwait := os.Getenv("ACKWAIT")
Expand All @@ -40,7 +39,11 @@ func main() {
log.Fatalf("can't initialize zap logger: %v", err)
}

nc, _ := nats.Connect(host)
nc, err := nats.Connect(host)
if err != nil {
logger.Fatal("error while connecting to NATS:", zap.Error(err))
}

js, err := nc.JetStream()
if err != nil {
logger.Fatal("error while getting jetstream context:", zap.Error(err))
Expand Down Expand Up @@ -69,7 +72,7 @@ func main() {

err = conn.consumeMessage()
if err != nil {
conn.logger.Fatal("error occurred while parsing metadata", zap.Error(err))
conn.logger.Fatal("error occurred while consuming messages", zap.Error(err))
}
}

Expand Down Expand Up @@ -117,7 +120,7 @@ func (conn jetstreamConnector) consumeMessage() error {
// will be reading records from the start from the stream.
}, nats.Durable(conn.consumer), nats.ManualAck(), nats.AckWait(ackwait))
if err != nil {
conn.logger.Debug("error occurred while parsing metadata", zap.Error(err))
conn.logger.Fatal("error occurred while subscribing to topic", zap.Error(err))
return err
}

Expand All @@ -129,20 +132,22 @@ func (conn jetstreamConnector) consumeMessage() error {
conn.logger.Info("received signal", zap.String("signal", sig.String()))
cancel()
<-signalChan
panic("double signal received")
conn.logger.Warn("double signal received, shutting down gracefully")
}()

<-ctx.Done()
conn.logger.Info("unsubscribing and closing connection...")
err = sub.Unsubscribe()
if err != nil {
conn.logger.Error("error while unsubscribing", zap.Error(err))
}

close(conn.concurrentSem)

return nil
}

func (conn jetstreamConnector) handleHTTPRequest(msg *nats.Msg) {

headers := http.Header{
"Topic": {conn.connectordata.Topic},
"RespTopic": {conn.connectordata.ResponseTopic},
Expand All @@ -155,26 +160,24 @@ func (conn jetstreamConnector) handleHTTPRequest(msg *nats.Msg) {

resp, err := common.HandleHTTPRequest(string(msg.Data), headers, conn.connectordata, conn.logger)
if err != nil {
conn.logger.Info(err.Error())
conn.logger.Error("error handling HTTP request", zap.Error(err))
conn.errorHandler(err)
conn.acknowledgeMsg(msg)
} else {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
conn.logger.Info(err.Error())
conn.logger.Error("error reading response body", zap.Error(err))
conn.errorHandler(err)
conn.acknowledgeMsg(msg)
} else {
if success := conn.responseHandler(body); success {
err = msg.Ack()
if err != nil {
conn.logger.Info(err.Error())
conn.errorHandler(err)
}
conn.logger.Info("done processing message",
zap.String("message", string(body)))
conn.acknowledgeMsg(msg)
conn.logger.Info("done processing message", zap.String("message", string(body)))
}
}
}

<-conn.concurrentSem
}

Expand All @@ -199,7 +202,6 @@ func (conn jetstreamConnector) responseHandler(response []byte) bool {
}

func (conn jetstreamConnector) errorHandler(err error) {

if len(conn.connectordata.ErrorTopic) == 0 {
conn.logger.Warn("error topic not set")
return
Expand All @@ -214,3 +216,10 @@ func (conn jetstreamConnector) errorHandler(err error) {
zap.String("topic", conn.connectordata.ErrorTopic))
}
}

func (conn jetstreamConnector) acknowledgeMsg(msg *nats.Msg) {
err := msg.Ack()
if err != nil {
conn.logger.Error("error acknowledging message", zap.Error(err))
}
}

0 comments on commit 0127b89

Please sign in to comment.