diff --git a/index/scorch/communication.go b/index/scorch/communication.go new file mode 100644 index 000000000..9eac95814 --- /dev/null +++ b/index/scorch/communication.go @@ -0,0 +1,55 @@ +// Copyright (c) 2020 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scorch + +type notificationChan chan struct{} + +type epochWatcher struct { + epoch uint64 + notifyCh notificationChan +} + +type epochWatchers []*epochWatcher + +func (e *epochWatchers) Add(watcher *epochWatcher) { + *e = append(*e, watcher) +} + +func (e *epochWatchers) NotifySatisfiedWatchers(epoch uint64) { + var epochWatchersNext epochWatchers + for _, w := range *e { + if w.epoch < epoch { + close(w.notifyCh) + } else { + epochWatchersNext.Add(w) + } + } + *e = epochWatchersNext +} + +type watcherChan chan *epochWatcher + +func (w watcherChan) NotifyUsAfter(epoch uint64, closeCh chan struct{}) (*epochWatcher, error) { + ew := &epochWatcher{ + epoch: epoch, + notifyCh: make(notificationChan, 1), + } + select { + case <-closeCh: + return nil, ErrClosed + case w <- ew: + } + return ew, nil +} diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 64ca969bd..2f5349767 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -40,13 +40,8 @@ type persistIntroduction struct { applied notificationChan } -type epochWatcher struct { - epoch uint64 - notifyCh notificationChan -} - func (s *Scorch) introducerLoop() { - var epochWatchers []*epochWatcher + var introduceWatchers epochWatchers OUTER: for { atomic.AddUint64(&s.stats.TotIntroduceLoop, 1) @@ -56,7 +51,7 @@ OUTER: break OUTER case epochWatcher := <-s.introducerNotifier: - epochWatchers = append(epochWatchers, epochWatcher) + introduceWatchers.Add(epochWatcher) case nextMerge := <-s.merges: s.introduceMerge(nextMerge) @@ -78,15 +73,7 @@ OUTER: epochCurr = s.root.epoch } s.rootLock.RUnlock() - var epochWatchersNext []*epochWatcher - for _, w := range epochWatchers { - if w.epoch < epochCurr { - close(w.notifyCh) - } else { - epochWatchersNext = append(epochWatchersNext, w) - } - } - epochWatchers = epochWatchersNext + introduceWatchers.NotifySatisfiedWatchers(epochCurr) } s.asyncTasks.Done() diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 37dca529a..c04777270 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -27,12 +27,20 @@ import ( "github.com/blevesearch/bleve/index/scorch/segment" ) -func (s *Scorch) mergerLoop() { +func (s *Scorch) mergerLoop(initialEpoch uint64) { + defer s.asyncTasks.Done() + var lastEpochMergePlanned uint64 mergePlannerOptions, err := s.parseMergePlannerOptions() if err != nil { s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err)) - s.asyncTasks.Done() + return + } + + // tell the persister we're waiting for anything after the initialEpoch + var ew *epochWatcher + ew, err = s.persisterNotifier.NotifyUsAfter(initialEpoch, s.closeCh) + if err != nil { return } @@ -44,7 +52,7 @@ OUTER: case <-s.closeCh: break OUTER - default: + case <-ew.notifyCh: // check to see if there is a new snapshot to persist s.rootLock.Lock() ourSnapshot := s.root @@ -78,32 +86,16 @@ OUTER: } _ = ourSnapshot.DecRef() - // tell the persister we're waiting for changes - // first make a epochWatcher chan - ew := &epochWatcher{ - epoch: lastEpochMergePlanned, - notifyCh: make(notificationChan, 1), - } - - // give it to the persister - select { - case <-s.closeCh: - break OUTER - case s.persisterNotifier <- ew: - } - - // now wait for persister (but also detect close) - select { - case <-s.closeCh: + // update the persister, that we're now waiting for something + // after lastEpochMergePlanned + ew, err = s.persisterNotifier.NotifyUsAfter(lastEpochMergePlanned, s.closeCh) + if err != nil { break OUTER - case <-ew.notifyCh: } } atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1) } - - s.asyncTasks.Done() } func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions, diff --git a/index/scorch/persister.go b/index/scorch/persister.go index ffa656693..2f4f5e0db 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -79,12 +79,10 @@ type persisterOptions struct { MemoryPressurePauseThreshold uint64 } -type notificationChan chan struct{} - -func (s *Scorch) persisterLoop() { +func (s *Scorch) persisterLoop(initialEpoch uint64) { defer s.asyncTasks.Done() - var persistWatchers []*epochWatcher + var persistWatchers epochWatchers var lastPersistedEpoch, lastMergedEpoch uint64 var ew *epochWatcher @@ -97,157 +95,133 @@ func (s *Scorch) persisterLoop() { return } + // tell the introducer we're waiting for changes after the initial epoch + var introducerEpochWatcher *epochWatcher + introducerEpochWatcher, err = s.introducerNotifier.NotifyUsAfter(initialEpoch, s.closeCh) + if err != nil { + return + } + OUTER: for { atomic.AddUint64(&s.stats.TotPersistLoopBeg, 1) + atomic.AddUint64(&s.stats.TotPersistLoopWait, 1) select { case <-s.closeCh: break OUTER case ew = <-s.persisterNotifier: - persistWatchers = append(persistWatchers, ew) - default: - } - if ew != nil && ew.epoch > lastMergedEpoch { + persistWatchers.Add(ew) lastMergedEpoch = ew.epoch - } - lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch, - lastMergedEpoch, persistWatchers, po) + case <-introducerEpochWatcher.notifyCh: + // woken up, next loop should pick up work + atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1) - var ourSnapshot *IndexSnapshot - var ourPersisted []chan error - var ourPersistedCallbacks []index.BatchCallback + lastMergedEpoch, persistWatchers = s.pausePersisterForMergerCatchUp(lastPersistedEpoch, + lastMergedEpoch, persistWatchers, po) - // check to see if there is a new snapshot to persist - s.rootLock.Lock() - if s.root != nil && s.root.epoch > lastPersistedEpoch { - ourSnapshot = s.root - ourSnapshot.AddRef() - ourPersisted = s.rootPersisted - s.rootPersisted = nil - ourPersistedCallbacks = s.persistedCallbacks - s.persistedCallbacks = nil - atomic.StoreUint64(&s.iStats.persistSnapshotSize, uint64(ourSnapshot.Size())) - atomic.StoreUint64(&s.iStats.persistEpoch, ourSnapshot.epoch) - } - s.rootLock.Unlock() + var ourSnapshot *IndexSnapshot + var ourPersisted []chan error + var ourPersistedCallbacks []index.BatchCallback + + // check to see if there is a new snapshot to persist + s.rootLock.Lock() + if s.root != nil && s.root.epoch > lastPersistedEpoch { + ourSnapshot = s.root + ourSnapshot.AddRef() + ourPersisted = s.rootPersisted + s.rootPersisted = nil + ourPersistedCallbacks = s.persistedCallbacks + s.persistedCallbacks = nil + atomic.StoreUint64(&s.iStats.persistSnapshotSize, uint64(ourSnapshot.Size())) + atomic.StoreUint64(&s.iStats.persistEpoch, ourSnapshot.epoch) + } + s.rootLock.Unlock() - if ourSnapshot != nil { - startTime := time.Now() + if ourSnapshot != nil { + startTime := time.Now() - err := s.persistSnapshot(ourSnapshot, po) - for _, ch := range ourPersisted { - if err != nil { - ch <- err + err := s.persistSnapshot(ourSnapshot, po) + for _, ch := range ourPersisted { + if err != nil { + ch <- err + } + close(ch) } - close(ch) - } - if err != nil { - atomic.StoreUint64(&s.iStats.persistEpoch, 0) - if err == segment.ErrClosed { - // index has been closed + if err != nil { + atomic.StoreUint64(&s.iStats.persistEpoch, 0) + if err == segment.ErrClosed { + // index has been closed + _ = ourSnapshot.DecRef() + break OUTER + } + + // save this current snapshot's persistedCallbacks, to invoke during + // the retry attempt + unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...) + + s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err)) _ = ourSnapshot.DecRef() - break OUTER + atomic.AddUint64(&s.stats.TotPersistLoopErr, 1) + continue OUTER } - // save this current snapshot's persistedCallbacks, to invoke during - // the retry attempt - unpersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...) + if unpersistedCallbacks != nil { + // in the event of this being a retry attempt for persisting a snapshot + // that had earlier failed, prepend the persistedCallbacks associated + // with earlier segment(s) to the latest persistedCallbacks + ourPersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...) + unpersistedCallbacks = nil + } - s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err)) - _ = ourSnapshot.DecRef() - atomic.AddUint64(&s.stats.TotPersistLoopErr, 1) - continue OUTER - } + for i := range ourPersistedCallbacks { + ourPersistedCallbacks[i](err) + } - if unpersistedCallbacks != nil { - // in the event of this being a retry attempt for persisting a snapshot - // that had earlier failed, prepend the persistedCallbacks associated - // with earlier segment(s) to the latest persistedCallbacks - ourPersistedCallbacks = append(unpersistedCallbacks, ourPersistedCallbacks...) - unpersistedCallbacks = nil - } + atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch) - for i := range ourPersistedCallbacks { - ourPersistedCallbacks[i](err) - } + lastPersistedEpoch = ourSnapshot.epoch + for _, ew := range persistWatchers { + close(ew.notifyCh) + } - atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch) + persistWatchers = nil + _ = ourSnapshot.DecRef() - lastPersistedEpoch = ourSnapshot.epoch - for _, ew := range persistWatchers { - close(ew.notifyCh) - } + changed := false + s.rootLock.RLock() + if s.root != nil && s.root.epoch != lastPersistedEpoch { + changed = true + } + s.rootLock.RUnlock() - persistWatchers = nil - _ = ourSnapshot.DecRef() + s.fireEvent(EventKindPersisterProgress, time.Since(startTime)) - changed := false - s.rootLock.RLock() - if s.root != nil && s.root.epoch != lastPersistedEpoch { - changed = true + if changed { + atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1) + continue OUTER + } } - s.rootLock.RUnlock() - - s.fireEvent(EventKindPersisterProgress, time.Since(startTime)) - if changed { - atomic.AddUint64(&s.stats.TotPersistLoopProgress, 1) - continue OUTER + // tell the introducer we're waiting for changes after lastPersistedEpoch + introducerEpochWatcher, err = s.introducerNotifier.NotifyUsAfter(lastPersistedEpoch, s.closeCh) + if err != nil { + break OUTER } - } - - // tell the introducer we're waiting for changes - w := &epochWatcher{ - epoch: lastPersistedEpoch, - notifyCh: make(notificationChan, 1), - } - select { - case <-s.closeCh: - break OUTER - case s.introducerNotifier <- w: - } - - s.removeOldData() // might as well cleanup while waiting - - atomic.AddUint64(&s.stats.TotPersistLoopWait, 1) - - select { - case <-s.closeCh: - break OUTER - case <-w.notifyCh: - // woken up, next loop should pick up work - atomic.AddUint64(&s.stats.TotPersistLoopWaitNotified, 1) - case ew = <-s.persisterNotifier: - // if the watchers are already caught up then let them wait, - // else let them continue to do the catch up - persistWatchers = append(persistWatchers, ew) + s.removeOldData() // might as well cleanup while waiting } atomic.AddUint64(&s.stats.TotPersistLoopEnd, 1) } } -func notifyMergeWatchers(lastPersistedEpoch uint64, - persistWatchers []*epochWatcher) []*epochWatcher { - var watchersNext []*epochWatcher - for _, w := range persistWatchers { - if w.epoch < lastPersistedEpoch { - close(w.notifyCh) - } else { - watchersNext = append(watchersNext, w) - } - } - return watchersNext -} - func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, - lastMergedEpoch uint64, persistWatchers []*epochWatcher, - po *persisterOptions) (uint64, []*epochWatcher) { + lastMergedEpoch uint64, persistWatchers epochWatchers, + po *persisterOptions) (uint64, epochWatchers) { // First, let the watchers proceed if they lag behind - persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + persistWatchers.NotifySatisfiedWatchers(lastPersistedEpoch) // Check the merger lag by counting the segment files on disk, numFilesOnDisk, _, _ := s.diskFileStats(nil) @@ -264,9 +238,9 @@ func (s *Scorch) pausePersisterForMergerCatchUp(lastPersistedEpoch uint64, case ew := <-s.persisterNotifier: // unblock the merger in meantime - persistWatchers = append(persistWatchers, ew) + persistWatchers.Add(ew) lastMergedEpoch = ew.epoch - persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + persistWatchers.NotifySatisfiedWatchers(lastPersistedEpoch) atomic.AddUint64(&s.stats.TotPersisterMergerNapBreak, 1) } return lastMergedEpoch, persistWatchers @@ -293,14 +267,14 @@ OUTER: case <-s.closeCh: break OUTER case ew := <-s.persisterNotifier: - persistWatchers = append(persistWatchers, ew) + persistWatchers.Add(ew) lastMergedEpoch = ew.epoch } atomic.AddUint64(&s.stats.TotPersisterSlowMergerResume, 1) // let the watchers proceed if they lag behind - persistWatchers = notifyMergeWatchers(lastPersistedEpoch, persistWatchers) + persistWatchers.NotifySatisfiedWatchers(lastPersistedEpoch) numFilesOnDisk, _, _ = s.diskFileStats(nil) } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 698aaf16a..9d87e8da8 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -65,8 +65,8 @@ type Scorch struct { introductions chan *segmentIntroduction persists chan *persistIntroduction merges chan *segmentMerge - introducerNotifier chan *epochWatcher - persisterNotifier chan *epochWatcher + introducerNotifier watcherChan + persisterNotifier watcherChan rootBolt *bolt.DB asyncTasks sync.WaitGroup @@ -180,14 +180,16 @@ func (s *Scorch) Open() error { return err } + initialEpoch := s.root.epoch + s.asyncTasks.Add(1) go s.introducerLoop() if !s.readOnly && s.path != "" { s.asyncTasks.Add(1) - go s.persisterLoop() + go s.persisterLoop(initialEpoch) s.asyncTasks.Add(1) - go s.mergerLoop() + go s.mergerLoop(initialEpoch) } return nil @@ -238,8 +240,8 @@ func (s *Scorch) openBolt() error { s.introductions = make(chan *segmentIntroduction) s.persists = make(chan *persistIntroduction) s.merges = make(chan *segmentMerge) - s.introducerNotifier = make(chan *epochWatcher, 1) - s.persisterNotifier = make(chan *epochWatcher, 1) + s.introducerNotifier = make(watcherChan, 1) + s.persisterNotifier = make(watcherChan, 1) s.closeCh = make(chan struct{}) if !s.readOnly && s.path != "" {