Skip to content

Commit

Permalink
fix: Use SafeChan as TxnManager.closed (#35763)
Browse files Browse the repository at this point in the history
Resolves: #35762

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 28, 2024
1 parent b75b257 commit 91223de
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
)

// NewTxnManager creates a new transaction manager.
Expand All @@ -25,7 +26,7 @@ func NewTxnManager() *TxnManager {
type TxnManager struct {
mu sync.Mutex
sessions map[message.TxnID]*TxnSession
closed chan struct{}
closed lifetime.SafeChan
}

// BeginNewTxn starts a new transaction with a session.
Expand Down Expand Up @@ -75,7 +76,7 @@ func (m *TxnManager) CleanupTxnUntil(ts uint64) {

// If the manager is on graceful shutdown and all transactions are cleaned up.
if len(m.sessions) == 0 && m.closed != nil {
close(m.closed)
m.closed.Close()
}
}

Expand All @@ -95,12 +96,12 @@ func (m *TxnManager) GetSessionOfTxn(id message.TxnID) (*TxnSession, error) {
func (m *TxnManager) GracefulClose() {
m.mu.Lock()
if m.closed == nil {
m.closed = make(chan struct{})
m.closed = lifetime.NewSafeChan()
if len(m.sessions) == 0 {
close(m.closed)
m.closed.Close()
}
}
m.mu.Unlock()

<-m.closed
<-m.closed.CloseCh()
}

0 comments on commit 91223de

Please sign in to comment.