Split out watching and loading
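
shardWatcher previously both watched the index directory and owned the
loaded shards. Split those roles: shardWatcher now only tracks shard
file timestamps and notifies a shardLoader of load/drop events, while
shardedSearcher owns the shard map and the query throttle and
implements shardLoader. NewDirectorySearcher wires the two together via
NewDirectoryWatcher.

A rough in-package sketch of the new seam (loggingLoader is a
hypothetical type, for illustration only; the shardLoader interface is
unexported, so real implementations live in this package):

    package shards

    import "log"

    // loggingLoader is a stand-in shardLoader that only records events,
    // showing that watching no longer depends on shardedSearcher.
    type loggingLoader struct{}

    func (l *loggingLoader) load(filename string) { log.Printf("load %s", filename) }
    func (l *loggingLoader) drop(filename string) { log.Printf("drop %s", filename) }

    func watchExample(dir string) error {
        // NewDirectoryWatcher scans dir once, then keeps calling
        // load/drop as shard files appear, change or disappear.
        w, err := NewDirectoryWatcher(dir, &loggingLoader{})
        if err != nil {
            return err
        }
        defer w.Close()
        // ... keep the watcher alive while serving queries ...
        return nil
    }
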
Change-Id: Iff886aac9788ae659f57dddbdec4ad5818529fcd
diff --git a/build/e2e_test.go b/build/e2e_test.go
index 1e77dab..866263b 100644
--- a/build/e2e_test.go
+++ b/build/e2e_test.go
@@ -172,7 +172,11 @@
if repos, err = ss.List(ctx, &query.Repo{Pattern: "repo"}); err != nil {
t.Fatalf("List: %v", err)
} else if len(repos.Repos) != 1 {
- t.Errorf("List(repo): got %v, want 1 repo", repos.Repos)
+ var names []string
+ for _, r := range repos.Repos {
+ names = append(names, r.Repository.Name)
+ }
+ t.Errorf("List(repo): got %v, want 1 repo", names)
}
}
diff --git a/shards/shards.go b/shards/shards.go
index b6d8b2e..7de7618 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -29,27 +29,24 @@
// NewDirectorySearcher returns a searcher instance that loads all
// shards corresponding to a glob into memory.
func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
- ss := shardWatcher{
- dir: dir,
- shards: make(map[string]*searchShard),
- quit: make(chan struct{}, 1),
+ ss := &shardedSearcher{
+ shards: make(map[string]zoekt.Searcher),
throttle: make(chan struct{}, runtime.NumCPU()),
}
-
- if err := ss.scan(); err != nil {
+ _, err := NewDirectoryWatcher(dir, ss)
+ if err != nil {
return nil, err
}
- if err := ss.watch(); err != nil {
- return nil, err
- }
+ return ss, nil
+}
- return &shardedSearcher{&ss}, nil
+func (ss *shardedSearcher) String() string {
+ return "shardedSearcher"
}
// Close closes references to open files. It may be called only once.
-func (ss *shardWatcher) Close() {
- close(ss.quit)
+func (ss *shardedSearcher) Close() {
ss.lock()
defer ss.unlock()
for _, s := range ss.shards {
@@ -57,16 +54,14 @@
}
}
-type shardLoader interface {
- Close()
- getShards() []zoekt.Searcher
- rlock()
- runlock()
- String() string
-}
-
type shardedSearcher struct {
- shardLoader
+ // Limit the number of parallel queries. Since searching is
+ // CPU bound, we can't do better than #CPU queries in
+ // parallel. Running more than that only adds memory
+ // pressure.
+ throttle chan struct{}
+
+ shards map[string]zoekt.Searcher
}
func (ss *shardedSearcher) Search(ctx context.Context, pat query.Q, opts *zoekt.SearchOptions) (*zoekt.SearchResult, error) {
@@ -83,8 +78,8 @@
// This critical section is large, but we don't want to deal with
// searches on shards that have just been closed.
- ss.shardLoader.rlock()
- defer ss.shardLoader.runlock()
+ ss.rlock()
+ defer ss.runlock()
aggregate.Wait = time.Now().Sub(start)
start = time.Now()
@@ -214,3 +209,70 @@
Crashes: crashes,
}, nil
}
+
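+// rlock acquires one throttle slot; at most cap(throttle) searches run concurrently.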
+func (s *shardedSearcher) rlock() {
+ s.throttle <- struct{}{}
+}
+
+// getShards returns the currently loaded shards. The shards must only
+// be accessed while holding rlock.
+func (s *shardedSearcher) getShards() []zoekt.Searcher {
+ var res []zoekt.Searcher
+ for _, sh := range s.shards {
+ res = append(res, sh)
+ }
+ return res
+}
+
+func (s *shardedSearcher) runlock() {
+ <-s.throttle
+}
+
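+// lock acquires every throttle slot, excluding all searches while shards are swapped.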
+func (s *shardedSearcher) lock() {
+ n := cap(s.throttle)
+ for n > 0 {
+ s.throttle <- struct{}{}
+ n--
+ }
+}
+
+func (s *shardedSearcher) unlock() {
+ n := cap(s.throttle)
+ for n > 0 {
+ <-s.throttle
+ n--
+ }
+}
+
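+// load loads or reloads the shard file at key; together with drop it implements shardLoader.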
+func (s *shardedSearcher) load(key string) {
+ shard, err := loadShard(key)
+ log.Printf("reloading: %s, err %v ", key, err)
+ if err != nil {
+ return
+ }
+
+ s.replace(key, shard)
+}
+
+func (s *shardedSearcher) drop(key string) {
+ s.replace(key, nil)
+}
+
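+// replace installs shard under key, closing any previous shard; a nil shard removes the entry.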
+func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
+ s.lock()
+ defer s.unlock()
+ old := s.shards[key]
+ if old != nil {
+ old.Close()
+ }
+
+ if shard == nil {
+ delete(s.shards, key)
+ } else {
+ s.shards[key] = shard
+ }
+}
diff --git a/shards/shards_test.go b/shards/shards_test.go
index 03dd37a..23a3652 100644
--- a/shards/shards_test.go
+++ b/shards/shards_test.go
@@ -60,7 +60,12 @@
out := &bytes.Buffer{}
log.SetOutput(out)
defer log.SetOutput(os.Stderr)
- ss := &shardedSearcher{&testLoader{[]zoekt.Searcher{&crashSearcher{}}}}
+ ss := &shardedSearcher{
+ shards: map[string]zoekt.Searcher{
+ "x": &crashSearcher{},
+ },
+ throttle: make(chan struct{}, 2),
+ }
q := &query.Substring{Pattern: "hoi"}
opts := &zoekt.SearchOptions{}
diff --git a/shards/watcher.go b/shards/watcher.go
index d36c71a..906a0ed 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -16,6 +16,7 @@
import (
"fmt"
+ "io"
"log"
"os"
"path/filepath"
@@ -25,32 +26,51 @@
"github.com/google/zoekt"
)
-type searchShard struct {
- zoekt.Searcher
- mtime time.Time
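+// shardLoader is notified by a directory watcher when a shard file
+// must be (re)loaded or dropped.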
+type shardLoader interface {
+ load(filename string)
+ drop(filename string)
}
type shardWatcher struct {
- dir string
-
- // Limit the number of parallel queries. Since searching is
- // CPU bound, we can't do better than #CPU queries in
- // parallel. If we do so, we just create more memory
- // pressure.
- throttle chan struct{}
-
- shards map[string]*searchShard
- quit chan struct{}
+ dir string
+ timestamps map[string]time.Time
+ loader shardLoader
+ quit chan struct{}
}
-func loadShard(fn string) (*searchShard, error) {
- f, err := os.Open(fn)
- if err != nil {
+func (sw *shardWatcher) Close() error {
+ if sw.quit != nil {
+ close(sw.quit)
+ sw.quit = nil
+ }
+ return nil
+}
+
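+// NewDirectoryWatcher scans dir for shard files and keeps calling
+// loader.load and loader.drop as they appear, change or disappear.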
+func NewDirectoryWatcher(dir string, loader shardLoader) (io.Closer, error) {
+ sw := &shardWatcher{
+ dir: dir,
+ timestamps: map[string]time.Time{},
+ loader: loader,
+ quit: make(chan struct{}, 1),
+ }
+ if err := sw.scan(); err != nil {
return nil, err
}
- fi, err := f.Stat()
+
+ if err := sw.watch(); err != nil {
+ return nil, err
+ }
+
+ return sw, nil
+}
+
+func loadShard(fn string) (zoekt.Searcher, error) {
+ f, err := os.Open(fn)
if err != nil {
- f.Close()
return nil, err
}
@@ -64,10 +80,7 @@
return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
}
- return &searchShard{
- mtime: fi.ModTime(),
- Searcher: s,
- }, nil
+ return s, nil
}
func (s *shardWatcher) String() string {
@@ -86,97 +99,43 @@
ts := map[string]time.Time{}
for _, fn := range fs {
- key := filepath.Base(fn)
fi, err := os.Lstat(fn)
if err != nil {
continue
}
- ts[key] = fi.ModTime()
+ ts[fn] = fi.ModTime()
}
- s.lock()
var toLoad []string
for k, mtime := range ts {
- if s.shards[k] == nil || s.shards[k].mtime != mtime {
+ if t, ok := s.timestamps[k]; !ok || t != mtime {
toLoad = append(toLoad, k)
+ s.timestamps[k] = mtime
}
}
var toDrop []string
// Unload deleted shards.
- for k := range s.shards {
+ for k := range s.timestamps {
if _, ok := ts[k]; !ok {
toDrop = append(toDrop, k)
}
}
- s.unlock()
for _, t := range toDrop {
log.Printf("unloading: %s", t)
- s.replace(t, nil)
+ delete(s.timestamps, t)
+ s.loader.drop(t)
}
for _, t := range toLoad {
- shard, err := loadShard(filepath.Join(s.dir, t))
- log.Printf("reloading: %s, err %v ", t, err)
- if err != nil {
- continue
- }
- s.replace(t, shard)
+ s.loader.load(t)
}
return nil
}
-func (s *shardWatcher) rlock() {
- s.throttle <- struct{}{}
-}
-
-// getShards returns the currently loaded shards. The shards must be
-// accessed under a rlock call.
-func (s *shardWatcher) getShards() []zoekt.Searcher {
- var res []zoekt.Searcher
- for _, sh := range s.shards {
- res = append(res, sh)
- }
- return res
-}
-
-func (s *shardWatcher) runlock() {
- <-s.throttle
-}
-
-func (s *shardWatcher) lock() {
- n := cap(s.throttle)
- for n > 0 {
- s.throttle <- struct{}{}
- n--
- }
-}
-
-func (s *shardWatcher) unlock() {
- n := cap(s.throttle)
- for n > 0 {
- <-s.throttle
- n--
- }
-}
-
-func (s *shardWatcher) replace(key string, shard *searchShard) {
- s.lock()
- defer s.unlock()
- old := s.shards[key]
- if old != nil {
- old.Close()
- }
- if shard != nil {
- s.shards[key] = shard
- } else {
- delete(s.shards, key)
- }
-}
-
func (s *shardWatcher) watch() error {
watcher, err := fsnotify.NewWatcher()
if err != nil {