diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 9093d37..c6c3fc5 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -49,6 +49,7 @@ type BidProcessor struct { csvFileEnding string redisClient *redis.Client + redisC chan *types.CommonBid } func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { @@ -58,6 +59,7 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { outFiles: make(map[int64]*OutFiles), bidCache: make(map[uint64]map[string]*types.CommonBid), topBidCache: make(map[uint64]*types.CommonBid), + redisC: make(chan *types.CommonBid, 10), } if opts.OutputTSV { @@ -84,6 +86,12 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) { } func (c *BidProcessor) Start() { + // If needed, start publish worker + if c.opts.RedisAddr != "" { + go c.redisPublishWorker() + } + + // Main loop for { time.Sleep(30 * time.Second) c.housekeeping() @@ -120,11 +128,12 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { isNewBid = true } - // Send to Redis if c.redisClient != nil { - err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err() - if err != nil { - c.log.WithError(err).Error("failed to publish bid to redis") + // Publish bid to Redis (async) + select { + case c.redisC <- bid: + default: + c.log.Warnf("redis channel full, dropping bid %s", bid.UniqueKey()) } } @@ -133,6 +142,15 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { } } +func (c *BidProcessor) redisPublishWorker() { + for bid := range c.redisC { + err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err() + if err != nil { + c.log.WithError(err).Error("failed to publish bid to redis") + } + } +} + func (c *BidProcessor) writeBidToFile(bid *types.CommonBid, isNewBid, isTopBid bool) { fAll, fTop, err := c.getFiles(bid) if err != nil {