shards: load up to NumCPU() shards in parallel
Fixes #35
Change-Id: I22801ea079934b0895647c1c1510a8f3297cdd22
diff --git a/shards/shards.go b/shards/shards.go
index 7de7618..93ea233 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -16,7 +16,9 @@
import (
"context"
+ "fmt"
"log"
+ "os"
"runtime"
"runtime/debug"
"sort"
@@ -26,6 +28,28 @@
"github.com/google/zoekt/query"
)
+// throttledLoader loads shards into ss, running at most cap(throttle) loadShard calls at once.
+type throttledLoader struct {
+	ss       *shardedSearcher
+	throttle chan struct{}
+}
+
+// load reads the shard at key, logs the outcome and, on success, installs it in tl.ss.
+func (tl *throttledLoader) load(key string) {
+	tl.throttle <- struct{}{} // take a slot for the duration of loadShard only
+	searcher, loadErr := loadShard(key)
+	<-tl.throttle
+	log.Printf("reloading: %s, err %v ", key, loadErr)
+	if loadErr == nil {
+		tl.ss.replace(key, searcher)
+	}
+}
+
+// drop passes a nil searcher for key to tl.ss.replace.
+func (tl *throttledLoader) drop(key string) {
+	tl.ss.replace(key, nil)
+}
+
// NewDirectorySearcher returns a searcher instance that loads all
// shards corresponding to a glob into memory.
func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
@@ -33,7 +57,12 @@
shards: make(map[string]zoekt.Searcher),
throttle: make(chan struct{}, runtime.NumCPU()),
}
- _, err := NewDirectoryWatcher(dir, ss)
+
+ tl := &throttledLoader{
+ ss: ss,
+ throttle: make(chan struct{}, runtime.NumCPU()),
+ }
+ _, err := NewDirectoryWatcher(dir, tl)
if err != nil {
return nil, err
}
@@ -244,20 +273,6 @@
}
}
-func (s *shardedSearcher) load(key string) {
- shard, err := loadShard(key)
- log.Printf("reloading: %s, err %v ", key, err)
- if err != nil {
- return
- }
-
- s.replace(key, shard)
-}
-
-func (s *shardedSearcher) drop(key string) {
- s.replace(key, nil)
-}
-
func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
s.lock()
defer s.unlock()
@@ -272,3 +287,22 @@
s.shards[key] = shard
}
}
+
+// loadShard opens the index file fn and returns a searcher for it. On error it closes f or iFile before returning.
+func loadShard(fn string) (zoekt.Searcher, error) {
+	f, err := os.Open(fn)
+	if err != nil {
+		return nil, err
+	}
+	iFile, err := zoekt.NewIndexFile(f)
+	if err != nil {
+		f.Close() // best effort: do not leak the descriptor when no IndexFile was created to own it
+		return nil, err
+	}
+	s, err := zoekt.NewSearcher(iFile)
+	if err != nil {
+		iFile.Close()
+		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
+	}
+	return s, nil
+}
diff --git a/shards/watcher.go b/shards/watcher.go
index 10b5fea..766f851 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -20,13 +20,14 @@
"log"
"os"
"path/filepath"
+ "sync"
"time"
"github.com/fsnotify/fsnotify"
- "github.com/google/zoekt"
)
type shardLoader interface {
+ // load loads the file filename. It must be safe for concurrent calls: the watcher invokes it from several goroutines at once.
load(filename string)
drop(filename string)
}
@@ -65,25 +66,6 @@
return sw, nil
}
-func loadShard(fn string) (zoekt.Searcher, error) {
- f, err := os.Open(fn)
- if err != nil {
- return nil, err
- }
-
- iFile, err := zoekt.NewIndexFile(f)
- if err != nil {
- return nil, err
- }
- s, err := zoekt.NewSearcher(iFile)
- if err != nil {
- iFile.Close()
- return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
- }
-
- return s, nil
-}
-
func (s *shardWatcher) String() string {
return fmt.Sprintf("shardWatcher(%s)", s.dir)
}
@@ -130,9 +112,15 @@
s.loader.drop(t)
}
+ var wg sync.WaitGroup
for _, t := range toLoad {
- s.loader.load(t)
+ wg.Add(1)
+ go func(k string) {
+ s.loader.load(k)
+ wg.Done()
+ }(t)
}
+ wg.Wait()
return nil
}