shards: load up to NumCPU() shards in parallel
Second try now that #23 is fixed.
Fixes #35
Change-Id: Icc8669ec559f7b1e9ae52556aea183af994afe61
diff --git a/shards/shards.go b/shards/shards.go
index 289fe31..1eba4f9 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -16,7 +16,9 @@
import (
"context"
+ "fmt"
"log"
+ "os"
"runtime"
"runtime/debug"
"sort"
@@ -52,7 +54,11 @@
// shards corresponding to a glob into memory.
func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
ss := newShardedSearcher(int64(runtime.NumCPU()))
- _, err := NewDirectoryWatcher(dir, ss)
+ tl := &throttledLoader{
+ ss: ss,
+ throttle: make(chan struct{}, runtime.NumCPU()),
+ }
+ _, err := NewDirectoryWatcher(dir, tl)
if err != nil {
return nil, err
}
@@ -60,6 +66,28 @@
return ss, nil
}
+// throttledLoader tries to load up to throttle shards in parallel.
+type throttledLoader struct {
+ ss *shardedSearcher
+ throttle chan struct{}
+}
+
+func (tl *throttledLoader) load(key string) {
+ tl.throttle <- struct{}{}
+ shard, err := loadShard(key)
+ <-tl.throttle
+ log.Printf("reloading: %s, err %v ", key, err)
+ if err != nil {
+ return
+ }
+
+ tl.ss.replace(key, shard)
+}
+
+func (tl *throttledLoader) drop(key string) {
+ tl.ss.replace(key, nil)
+}
+
func (ss *shardedSearcher) String() string {
return "shardedSearcher"
}
@@ -249,20 +277,6 @@
s.throttle.Release(s.capacity)
}
-func (s *shardedSearcher) load(key string) {
- shard, err := loadShard(key)
- log.Printf("reloading: %s, err %v ", key, err)
- if err != nil {
- return
- }
-
- s.replace(key, shard)
-}
-
-func (s *shardedSearcher) drop(key string) {
- s.replace(key, nil)
-}
-
func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
s.lock(context.Background())
defer s.unlock()
@@ -277,3 +291,22 @@
s.shards[key] = shard
}
}
+
+func loadShard(fn string) (zoekt.Searcher, error) {
+ f, err := os.Open(fn)
+ if err != nil {
+ return nil, err
+ }
+
+ iFile, err := zoekt.NewIndexFile(f)
+ if err != nil {
+ return nil, err
+ }
+ s, err := zoekt.NewSearcher(iFile)
+ if err != nil {
+ iFile.Close()
+ return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
+ }
+
+ return s, nil
+}
diff --git a/shards/watcher.go b/shards/watcher.go
index 10b5fea..766f851 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -20,13 +20,14 @@
"log"
"os"
"path/filepath"
+ "sync"
"time"
"github.com/fsnotify/fsnotify"
- "github.com/google/zoekt"
)
type shardLoader interface {
+ // Load a new file. Should be safe for concurrent calls.
load(filename string)
drop(filename string)
}
@@ -65,25 +66,6 @@
return sw, nil
}
-func loadShard(fn string) (zoekt.Searcher, error) {
- f, err := os.Open(fn)
- if err != nil {
- return nil, err
- }
-
- iFile, err := zoekt.NewIndexFile(f)
- if err != nil {
- return nil, err
- }
- s, err := zoekt.NewSearcher(iFile)
- if err != nil {
- iFile.Close()
- return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
- }
-
- return s, nil
-}
-
func (s *shardWatcher) String() string {
return fmt.Sprintf("shardWatcher(%s)", s.dir)
}
@@ -130,9 +112,15 @@
s.loader.drop(t)
}
+ var wg sync.WaitGroup
for _, t := range toLoad {
- s.loader.load(t)
+ wg.Add(1)
+ go func(k string) {
+ s.loader.load(k)
+ wg.Done()
+ }(t)
}
+ wg.Wait()
return nil
}