Skip to content

Commit

Permalink
Fire async error on panic in scorch async routine (#1566)
Browse files Browse the repository at this point in the history
* Index is always left closed when opening process failed

* Fire async error on panic in scorch async routine

* s.asyncTasks.Done() call fixed

* asyncTasks.Done call moved to common defer block

Co-authored-by: Pavel Bazika <[email protected]>
  • Loading branch information
pavelbazika and Pavel Bazika authored Sep 2, 2021
1 parent 2442216 commit d199c93
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
13 changes: 11 additions & 2 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ type epochWatcher struct {
}

func (s *Scorch) introducerLoop() {
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "introducer",
Path: s.path,
})
}

s.asyncTasks.Done()
}()

var epochWatchers []*epochWatcher
OUTER:
for {
Expand Down Expand Up @@ -88,8 +99,6 @@ OUTER:
}
epochWatchers = epochWatchersNext
}

s.asyncTasks.Done()
}

func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
Expand Down
14 changes: 11 additions & 3 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,22 @@ import (
)

func (s *Scorch) mergerLoop() {
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "merger",
Path: s.path,
})
}

s.asyncTasks.Done()
}()

var lastEpochMergePlanned uint64
var ctrlMsg *mergerCtrl
mergePlannerOptions, err := s.parseMergePlannerOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
s.asyncTasks.Done()
return
}
ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
Expand Down Expand Up @@ -130,8 +140,6 @@ OUTER:

atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
}

s.asyncTasks.Done()
}

type mergerCtrl struct {
Expand Down
11 changes: 10 additions & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,16 @@ type persisterOptions struct {
type notificationChan chan struct{}

func (s *Scorch) persisterLoop() {
defer s.asyncTasks.Done()
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "persister",
Path: s.path,
})
}

s.asyncTasks.Done()
}()

var persistWatchers []*epochWatcher
var lastPersistedEpoch, lastMergedEpoch uint64
Expand Down
10 changes: 10 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ type Scorch struct {
segPlugin SegmentPlugin
}

// AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
type AsyncPanicError struct {
Source string
Path string
}

func (e *AsyncPanicError) Error() string {
return fmt.Sprintf("%s panic when processing %s", e.Source, e.Path)
}

type internalStats struct {
persistEpoch uint64
persistSnapshotSize uint64
Expand Down

0 comments on commit d199c93

Please sign in to comment.