Revert "shards: load up to NumCPU() shards in parallel"
This reverts commit 3561dc98352f3c706a4f48ca33dad8d3f55c42de.
Reason for revert: causes unexplained deadlocks, but now on startup.
Change-Id: I85d1dcb25d5fd219749cc7f9f69a589e60693f79
diff --git a/shards/shards.go b/shards/shards.go
index 93ea233..7de7618 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -16,9 +16,7 @@
import (
"context"
- "fmt"
"log"
- "os"
"runtime"
"runtime/debug"
"sort"
@@ -28,28 +26,6 @@
"github.com/google/zoekt/query"
)
-// throttledLoader tries to load up to throttle shards in parallel.
-type throttledLoader struct {
- ss *shardedSearcher
- throttle chan struct{}
-}
-
-func (tl *throttledLoader) load(key string) {
- tl.throttle <- struct{}{}
- shard, err := loadShard(key)
- <-tl.throttle
- log.Printf("reloading: %s, err %v ", key, err)
- if err != nil {
- return
- }
-
- tl.ss.replace(key, shard)
-}
-
-func (tl *throttledLoader) drop(key string) {
- tl.ss.replace(key, nil)
-}
-
// NewDirectorySearcher returns a searcher instance that loads all
// shards corresponding to a glob into memory.
func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
@@ -57,12 +33,7 @@
shards: make(map[string]zoekt.Searcher),
throttle: make(chan struct{}, runtime.NumCPU()),
}
-
- tl := &throttledLoader{
- ss: ss,
- throttle: make(chan struct{}, runtime.NumCPU()),
- }
- _, err := NewDirectoryWatcher(dir, tl)
+ _, err := NewDirectoryWatcher(dir, ss)
if err != nil {
return nil, err
}
@@ -273,6 +244,20 @@
}
}
+func (s *shardedSearcher) load(key string) {
+ shard, err := loadShard(key)
+ log.Printf("reloading: %s, err %v ", key, err)
+ if err != nil {
+ return
+ }
+
+ s.replace(key, shard)
+}
+
+func (s *shardedSearcher) drop(key string) {
+ s.replace(key, nil)
+}
+
func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
s.lock()
defer s.unlock()
@@ -287,22 +272,3 @@
s.shards[key] = shard
}
}
-
-func loadShard(fn string) (zoekt.Searcher, error) {
- f, err := os.Open(fn)
- if err != nil {
- return nil, err
- }
-
- iFile, err := zoekt.NewIndexFile(f)
- if err != nil {
- return nil, err
- }
- s, err := zoekt.NewSearcher(iFile)
- if err != nil {
- iFile.Close()
- return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
- }
-
- return s, nil
-}
diff --git a/shards/watcher.go b/shards/watcher.go
index 766f851..10b5fea 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -20,14 +20,13 @@
"log"
"os"
"path/filepath"
- "sync"
"time"
"github.com/fsnotify/fsnotify"
+ "github.com/google/zoekt"
)
type shardLoader interface {
- // Load a new file. Should be safe for concurrent calls.
load(filename string)
drop(filename string)
}
@@ -66,6 +65,25 @@
return sw, nil
}
+func loadShard(fn string) (zoekt.Searcher, error) {
+ f, err := os.Open(fn)
+ if err != nil {
+ return nil, err
+ }
+
+ iFile, err := zoekt.NewIndexFile(f)
+ if err != nil {
+ return nil, err
+ }
+ s, err := zoekt.NewSearcher(iFile)
+ if err != nil {
+ iFile.Close()
+ return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
+ }
+
+ return s, nil
+}
+
func (s *shardWatcher) String() string {
return fmt.Sprintf("shardWatcher(%s)", s.dir)
}
@@ -112,15 +130,9 @@
s.loader.drop(t)
}
- var wg sync.WaitGroup
for _, t := range toLoad {
- wg.Add(1)
- go func(k string) {
- s.loader.load(k)
- wg.Done()
- }(t)
+ s.loader.load(t)
}
- wg.Wait()
return nil
}