From 075cb26372c2bcdb88486fd3070b07e739337852 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Mon, 5 Aug 2024 17:05:30 +0200 Subject: [PATCH] publish bids to redis async --- services/bidcollect/bid-processor.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 9093d37..c6c3fc5 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -49,6 +49,7 @@ type BidProcessor struct { csvFileEnding string redisClient *redis.Client + redisC chan *types.CommonBid } func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { @@ -58,6 +59,7 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { outFiles: make(map[int64]*OutFiles), bidCache: make(map[uint64]map[string]*types.CommonBid), topBidCache: make(map[uint64]*types.CommonBid), + redisC: make(chan *types.CommonBid, 10), } if opts.OutputTSV { @@ -84,6 +86,12 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { } func (c *BidProcessor) Start() { + // If needed, start publish worker + if c.opts.RedisAddr != "" { + go c.redisPublishWorker() + } + + // Main loop for { time.Sleep(30 * time.Second) c.housekeeping() @@ -120,11 +128,12 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { isNewBid = true } - // Send to Redis if c.redisClient != nil { - err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err() - if err != nil { - c.log.WithError(err).Error("failed to publish bid to redis") + // Publish bid to Redis (async) + select { + case c.redisC <- bid: + default: + c.log.Warnf("redis channel full, dropping bid %s", bid.UniqueKey()) } } @@ -133,6 +142,15 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { } } +func (c *BidProcessor) redisPublishWorker() { + for bid := range c.redisC { + err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err() + if err != nil { + c.log.WithError(err).Error("failed to publish bid to redis") + } + } +} + func (c *BidProcessor) writeBidToFile(bid *types.CommonBid, isNewBid, isTopBid bool) { fAll, fTop, err := c.getFiles(bid) if err != nil {