Split out watching and loading

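The shardWatcher now only tracks modification times of the files in the
index directory and reports changes through the new shardLoader
interface. shardedSearcher owns the shard map and the search throttle
and implements shardLoader; NewDirectoryWatcher wires the two together.

A minimal sketch of the new interface, for illustration only; the
exampleLoader type and exampleWatch function are hypothetical names and
would have to live in package shards, since shardLoader is unexported:

    type exampleLoader struct{}

    func (l *exampleLoader) load(filename string) { log.Printf("load %s", filename) }
    func (l *exampleLoader) drop(filename string) { log.Printf("drop %s", filename) }

    func exampleWatch(dir string) (io.Closer, error) {
            // Scan dir once, then keep receiving load/drop calls as
            // shard files appear, change, or disappear.
            return NewDirectoryWatcher(dir, &exampleLoader{})
    }
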
Change-Id: Iff886aac9788ae659f57dddbdec4ad5818529fcd
diff --git a/build/e2e_test.go b/build/e2e_test.go
index 1e77dab..866263b 100644
--- a/build/e2e_test.go
+++ b/build/e2e_test.go
@@ -172,7 +172,11 @@
 	if repos, err = ss.List(ctx, &query.Repo{Pattern: "repo"}); err != nil {
 		t.Fatalf("List: %v", err)
 	} else if len(repos.Repos) != 1 {
-		t.Errorf("List(repo): got %v, want 1 repo", repos.Repos)
+		var names []string
+		for _, r := range repos.Repos {
+			names = append(names, r.Repository.Name)
+		}
+		t.Errorf("List(repo): got %v, want 1 repo", names)
 	}
 }
 
diff --git a/shards/shards.go b/shards/shards.go
index b6d8b2e..7de7618 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -29,27 +29,24 @@
 // NewDirectorySearcher returns a searcher instance that loads all
 // shards corresponding to a glob into memory.
 func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
-	ss := shardWatcher{
-		dir:      dir,
-		shards:   make(map[string]*searchShard),
-		quit:     make(chan struct{}, 1),
+	ss := &shardedSearcher{
+		shards:   make(map[string]zoekt.Searcher),
 		throttle: make(chan struct{}, runtime.NumCPU()),
 	}
-
-	if err := ss.scan(); err != nil {
+	_, err := NewDirectoryWatcher(dir, ss)
+	if err != nil {
 		return nil, err
 	}
 
-	if err := ss.watch(); err != nil {
-		return nil, err
-	}
+	return ss, nil
+}
 
-	return &shardedSearcher{&ss}, nil
+func (ss *shardedSearcher) String() string {
+	return "shardedSearcher"
 }
 
 // Close closes references to open files. It may be called only once.
-func (ss *shardWatcher) Close() {
-	close(ss.quit)
+func (ss *shardedSearcher) Close() {
 	ss.lock()
 	defer ss.unlock()
 	for _, s := range ss.shards {
@@ -57,16 +54,14 @@
 	}
 }
 
-type shardLoader interface {
-	Close()
-	getShards() []zoekt.Searcher
-	rlock()
-	runlock()
-	String() string
-}
-
 type shardedSearcher struct {
-	shardLoader
+	// Limit the number of parallel queries. Since searching is
+	// CPU bound, we can't do better than #CPU queries in
+	// parallel.  If we do so, we just create more memory
+	// pressure.
+	throttle chan struct{}
+
+	shards map[string]zoekt.Searcher
 }
 
 func (ss *shardedSearcher) Search(ctx context.Context, pat query.Q, opts *zoekt.SearchOptions) (*zoekt.SearchResult, error) {
@@ -83,8 +78,8 @@
 
 	// This critical section is large, but we don't want to deal with
 	// searches on shards that have just been closed.
-	ss.shardLoader.rlock()
-	defer ss.shardLoader.runlock()
+	ss.rlock()
+	defer ss.runlock()
 	aggregate.Wait = time.Now().Sub(start)
 	start = time.Now()
 
@@ -214,3 +209,73 @@
 		Crashes: crashes,
 	}, nil
 }
+
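+// rlock takes one throttle slot, allowing up to cap(throttle) concurrent searches.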
+func (s *shardedSearcher) rlock() {
+	s.throttle <- struct{}{}
+}
+
+// getShards returns the currently loaded shards. The shards must be
+// accessed under a rlock call.
+func (s *shardedSearcher) getShards() []zoekt.Searcher {
+	var res []zoekt.Searcher
+	for _, sh := range s.shards {
+		res = append(res, sh)
+	}
+	return res
+}
+
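+// runlock releases the slot taken by rlock.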
+func (s *shardedSearcher) runlock() {
+	<-s.throttle
+}
+
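+// lock fills every throttle slot, excluding concurrent searches and other writers.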
+func (s *shardedSearcher) lock() {
+	n := cap(s.throttle)
+	for n > 0 {
+		s.throttle <- struct{}{}
+		n--
+	}
+}
+
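+// unlock drains the slots taken by lock.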
+func (s *shardedSearcher) unlock() {
+	n := cap(s.throttle)
+	for n > 0 {
+		<-s.throttle
+		n--
+	}
+}
+
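+// load (re)loads the shard file at key and installs it in the shard map.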
+func (s *shardedSearcher) load(key string) {
+	shard, err := loadShard(key)
+	log.Printf("reloading: %s, err %v ", key, err)
+	if err != nil {
+		return
+	}
+
+	s.replace(key, shard)
+}
+
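+// drop closes and removes the shard stored under key.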
+func (s *shardedSearcher) drop(key string) {
+	s.replace(key, nil)
+}
+
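+// replace installs shard under key, closing the old one; a nil shard removes the entry.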
+func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
+	s.lock()
+	defer s.unlock()
+	old := s.shards[key]
+	if old != nil {
+		old.Close()
+	}
+
+	if shard == nil {
+		delete(s.shards, key)
+	} else {
+		s.shards[key] = shard
+	}
+}
diff --git a/shards/shards_test.go b/shards/shards_test.go
index 03dd37a..23a3652 100644
--- a/shards/shards_test.go
+++ b/shards/shards_test.go
@@ -60,7 +60,12 @@
 	out := &bytes.Buffer{}
 	log.SetOutput(out)
 	defer log.SetOutput(os.Stderr)
-	ss := &shardedSearcher{&testLoader{[]zoekt.Searcher{&crashSearcher{}}}}
+	ss := &shardedSearcher{
+		shards: map[string]zoekt.Searcher{
+			"x": &crashSearcher{},
+		},
+		throttle: make(chan struct{}, 2),
+	}
 
 	q := &query.Substring{Pattern: "hoi"}
 	opts := &zoekt.SearchOptions{}
diff --git a/shards/watcher.go b/shards/watcher.go
index d36c71a..906a0ed 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -16,6 +16,7 @@
 
 import (
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
@@ -25,32 +26,51 @@
 	"github.com/google/zoekt"
 )
 
-type searchShard struct {
-	zoekt.Searcher
-	mtime time.Time
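+// shardLoader is notified when an index shard on disk is added, changed or removed.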
+type shardLoader interface {
+	load(filename string)
+	drop(filename string)
 }
 
 type shardWatcher struct {
-	dir string
-
-	// Limit the number of parallel queries. Since searching is
-	// CPU bound, we can't do better than #CPU queries in
-	// parallel.  If we do so, we just create more memory
-	// pressure.
-	throttle chan struct{}
-
-	shards map[string]*searchShard
-	quit   chan struct{}
+	dir        string
+	timestamps map[string]time.Time
+	loader     shardLoader
+	quit       chan struct{}
 }
 
-func loadShard(fn string) (*searchShard, error) {
-	f, err := os.Open(fn)
-	if err != nil {
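+// Close signals the watcher to stop.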
+func (sw *shardWatcher) Close() error {
+	if sw.quit != nil {
+		close(sw.quit)
+		sw.quit = nil
+	}
+	return nil
+}
+
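+// NewDirectoryWatcher scans dir once, feeds the result to loader, and
+// then watches dir, forwarding later changes to loader.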
+func NewDirectoryWatcher(dir string, loader shardLoader) (io.Closer, error) {
+	sw := &shardWatcher{
+		dir:        dir,
+		timestamps: map[string]time.Time{},
+		loader:     loader,
+		quit:       make(chan struct{}, 1),
+	}
+	if err := sw.scan(); err != nil {
 		return nil, err
 	}
-	fi, err := f.Stat()
+
+	if err := sw.watch(); err != nil {
+		return nil, err
+	}
+
+	return sw, nil
+}
+
+func loadShard(fn string) (zoekt.Searcher, error) {
+	f, err := os.Open(fn)
 	if err != nil {
-		f.Close()
 		return nil, err
 	}
 
@@ -64,10 +80,7 @@
 		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
 	}
 
-	return &searchShard{
-		mtime:    fi.ModTime(),
-		Searcher: s,
-	}, nil
+	return s, nil
 }
 
 func (s *shardWatcher) String() string {
@@ -86,97 +99,42 @@
 
 	ts := map[string]time.Time{}
 	for _, fn := range fs {
-		key := filepath.Base(fn)
 		fi, err := os.Lstat(fn)
 		if err != nil {
 			continue
 		}
 
-		ts[key] = fi.ModTime()
+		ts[fn] = fi.ModTime()
 	}
 
-	s.lock()
 	var toLoad []string
 	for k, mtime := range ts {
-		if s.shards[k] == nil || s.shards[k].mtime != mtime {
+		if t, ok := s.timestamps[k]; !ok || t != mtime {
 			toLoad = append(toLoad, k)
+			s.timestamps[k] = mtime
 		}
 	}
 
 	var toDrop []string
 	// Unload deleted shards.
-	for k := range s.shards {
+	for k := range s.timestamps {
 		if _, ok := ts[k]; !ok {
 			toDrop = append(toDrop, k)
 		}
 	}
-	s.unlock()
 
 	for _, t := range toDrop {
 		log.Printf("unloading: %s", t)
-		s.replace(t, nil)
+		s.loader.drop(t)
 	}
 
 	for _, t := range toLoad {
-		shard, err := loadShard(filepath.Join(s.dir, t))
-		log.Printf("reloading: %s, err %v ", t, err)
-		if err != nil {
-			continue
-		}
-		s.replace(t, shard)
+		s.loader.load(t)
 	}
 
 	return nil
 }
 
-func (s *shardWatcher) rlock() {
-	s.throttle <- struct{}{}
-}
-
-// getShards returns the currently loaded shards. The shards must be
-// accessed under a rlock call.
-func (s *shardWatcher) getShards() []zoekt.Searcher {
-	var res []zoekt.Searcher
-	for _, sh := range s.shards {
-		res = append(res, sh)
-	}
-	return res
-}
-
-func (s *shardWatcher) runlock() {
-	<-s.throttle
-}
-
-func (s *shardWatcher) lock() {
-	n := cap(s.throttle)
-	for n > 0 {
-		s.throttle <- struct{}{}
-		n--
-	}
-}
-
-func (s *shardWatcher) unlock() {
-	n := cap(s.throttle)
-	for n > 0 {
-		<-s.throttle
-		n--
-	}
-}
-
-func (s *shardWatcher) replace(key string, shard *searchShard) {
-	s.lock()
-	defer s.unlock()
-	old := s.shards[key]
-	if old != nil {
-		old.Close()
-	}
-	if shard != nil {
-		s.shards[key] = shard
-	} else {
-		delete(s.shards, key)
-	}
-}
-
 func (s *shardWatcher) watch() error {
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {