Skip to content

Commit

Permalink
publish bids to redis async
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 5, 2024
1 parent 53a6055 commit 075cb26
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type BidProcessor struct {
csvFileEnding string

redisClient *redis.Client
redisC chan *types.CommonBid
}

func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) {
Expand All @@ -58,6 +59,7 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) {
outFiles: make(map[int64]*OutFiles),
bidCache: make(map[uint64]map[string]*types.CommonBid),
topBidCache: make(map[uint64]*types.CommonBid),
redisC: make(chan *types.CommonBid, 10),
}

if opts.OutputTSV {
Expand All @@ -84,6 +86,12 @@ func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) {
}

func (c *BidProcessor) Start() {
// If needed, start publish worker
if c.opts.RedisAddr != "" {
go c.redisPublishWorker()
}

// Main loop
for {
time.Sleep(30 * time.Second)
c.housekeeping()
Expand Down Expand Up @@ -120,11 +128,12 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) {
isNewBid = true
}

// Send to Redis
if c.redisClient != nil {
err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err()
if err != nil {
c.log.WithError(err).Error("failed to publish bid to redis")
// Publish bid to Redis (async)
select {
case c.redisC <- bid:
default:
c.log.Warnf("redis channel full, dropping bid %s", bid.UniqueKey())
}
}

Expand All @@ -133,6 +142,15 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) {
}
}

func (c *BidProcessor) redisPublishWorker() {
for bid := range c.redisC {
err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err()
if err != nil {
c.log.WithError(err).Error("failed to publish bid to redis")
}
}
}

func (c *BidProcessor) writeBidToFile(bid *types.CommonBid, isNewBid, isTopBid bool) {
fAll, fTop, err := c.getFiles(bid)
if err != nil {
Expand Down

0 comments on commit 075cb26

Please sign in to comment.