Skip to content

Commit

Permalink
Merge pull request #290 from nikhan/nsq_ultireader
Browse files Browse the repository at this point in the history
fixes for NSQ and all blocks in general
  • Loading branch information
mikedewar committed Mar 11, 2014
2 parents 2b08702 + 623f580 commit 3b10d56
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
40 changes: 32 additions & 8 deletions st/blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (b *Block) Build(c BlockChans) {
b.QuitChan = c.QuitChan

// route maps
b.inRoutes = make(map[string]chan interface{}, 10) // necessary to stop locking...
b.queryRoutes = make(map[string]chan chan interface{}, 10)
b.inRoutes = make(map[string]chan interface{}) // necessary to stop locking...
b.queryRoutes = make(map[string]chan chan interface{})

// broadcast channel
b.broadcast = make(chan interface{}, 10) // necessary to stop locking...
Expand All @@ -98,13 +98,13 @@ func (b *Block) SetId(Id string){
}

func (b *Block) InRoute(routeName string) chan interface{} {
route := make(chan interface{})
route := make(chan interface{}, 1000)
b.inRoutes[routeName] = route
return route
}

func (b *Block) QueryRoute(routeName string) chan chan interface{} {
route := make(chan chan interface{})
route := make(chan chan interface{}, 1000)
b.queryRoutes[routeName] = route
return route
}
Expand Down Expand Up @@ -205,7 +205,25 @@ func BlockRoutine(bi BlockInterface) {
if !ok {
break
}
b.inRoutes[msg.Route] <- msg.Msg

// every in channel is buffered a 1000 messages.
// if we cannot immediately send to that in channel we place the msg
// in a go routine and notify the user that the block routine's
// buffer has overflowed. this still allows for unrecoverable
// overflows (for example: a stuck run() function), but at least it
// offloads to memory/cpu instead of blocking.
select {
case b.inRoutes[msg.Route] <- msg.Msg:
default:
go func(id string, msgOut interface{}){
b.inRoutes[msg.Route] <- msgOut
loghub.Log <- &loghub.LogMsg{
Type: loghub.ERROR,
Data: "Dropping messages!",
Id: id,
}
}(b.Id, msg.Msg)
}

if msg.Route == "rule" {
go func(id string){
Expand All @@ -222,9 +240,15 @@ func BlockRoutine(bi BlockInterface) {
if !ok {
break
}
go func(){
b.queryRoutes[msg.Route] <- msg.RespChan
}()

select {
case b.queryRoutes[msg.Route] <- msg.RespChan:
default:
go func(){
b.queryRoutes[msg.Route] <- msg.RespChan
}()
}

case msg := <-b.AddChan:
outChans[msg.Route] = msg.Channel
case msg := <-b.DelChan:
Expand Down
6 changes: 5 additions & 1 deletion st/library/fromNSQ.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func (b *FromNSQ) Run() {
b.Error(err)
}

reader, err := nsq.NewReader(topic, channel)
if reader != nil {
reader.Stop()
}

reader, err = nsq.NewReader(topic, channel)
if err != nil {
b.Error(err)
}
Expand Down

0 comments on commit 3b10d56

Please sign in to comment.