Skip to content

Commit

Permalink
fix: fix reader block
Browse files Browse the repository at this point in the history
  • Loading branch information
yywing committed Sep 28, 2023
1 parent 919a893 commit f4ea4b2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
9 changes: 8 additions & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Channel struct {

// closed is set to 1 when the channel has been closed - see Channel.send()
closed int32
close chan struct{}

// true when we will never notify again
noNotify bool
Expand Down Expand Up @@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel {
confirms: newConfirms(),
recv: (*Channel).recvMethod,
errors: make(chan *Error, 1),
close: make(chan struct{}),
}
}

Expand Down Expand Up @@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) {
}

close(ch.errors)
close(ch.close)
ch.noNotify = true
})
}
Expand Down Expand Up @@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) {
// deliveries are in flight and a no-wait cancel has happened

default:
ch.rpc <- msg
select {
case <-ch.close:
return
case ch.rpc <- msg:
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Connection struct {
blocks []chan Blocking

errors chan *Error
// if connection is closed should close this chan
close chan struct{}

Config Config // The negotiated Config after connection.open

Expand Down Expand Up @@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
rpc: make(chan message),
sends: make(chan time.Time),
errors: make(chan *Error, 1),
close: make(chan struct{}),
deadlines: make(chan readDeadliner, 1),
}
go c.reader(conn)
Expand Down Expand Up @@ -597,6 +600,8 @@ func (c *Connection) shutdown(err *Error) {
}

c.conn.Close()
// reader exit
close(c.close)

c.channels = nil
c.allocator = nil
Expand Down Expand Up @@ -634,7 +639,12 @@ func (c *Connection) dispatch0(f frame) {
c <- Blocking{Active: false}
}
default:
c.rpc <- m
select {
case <-c.close:
return
case c.rpc <- m:
}

}
case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this
Expand Down

0 comments on commit f4ea4b2

Please sign in to comment.