shards: wait for watcher to stop when closing

Previously we never shutdown the shardWatcher. This meant that when
closing shardedSearcher, the shardWatcher would still interact with it
if it noticed shards changing. This can happen in practice but is
rare. It can more easily be triggered by running e2e tests which cleanup
the index files.

Change-Id: I372e9f1723485ae092d82f0502deb64b4b3c976f
diff --git a/shards/shards.go b/shards/shards.go
index 4eceb32..df86efe 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -156,12 +156,28 @@
 	tl := &loader{
 		ss: ss,
 	}
-	_, err := NewDirectoryWatcher(dir, tl)
+	dw, err := NewDirectoryWatcher(dir, tl)
 	if err != nil {
 		return nil, err
 	}
 
-	return ss, nil
+	return &directorySearcher{
+		Searcher:         ss,
+		directoryWatcher: dw,
+	}, nil
+}
+
+type directorySearcher struct {
+	zoekt.Searcher
+
+	directoryWatcher Stopper
+}
+
+func (s *directorySearcher) Close() {
+	// We need to Stop directoryWatcher first since it calls load/unload on
+	// Searcher.
+	s.directoryWatcher.Stop()
+	s.Searcher.Close()
 }
 
 type loader struct {
diff --git a/shards/watcher.go b/shards/watcher.go
index 4a7dc72..4898586 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -16,11 +16,11 @@
 
 import (
 	"fmt"
-	"io"
 	"log"
 	"os"
 	"path/filepath"
 	"runtime"
+	"sync"
 	"time"
 
 	"github.com/fsnotify/fsnotify"
@@ -36,30 +36,38 @@
 	dir        string
 	timestamps map[string]time.Time
 	loader     shardLoader
-	quit       chan<- struct{}
+
+	closeOnce sync.Once
+	// quit is closed by Close to signal the directory watcher to stop.
+	quit chan struct{}
+	// stopped is closed once the directory watcher has stopped.
+	stopped chan struct{}
 }
 
-func (sw *shardWatcher) Close() error {
-	if sw.quit != nil {
+func (sw *shardWatcher) Stop() {
+	sw.closeOnce.Do(func() {
 		close(sw.quit)
-		sw.quit = nil
-	}
-	return nil
+		<-sw.stopped
+	})
 }
 
-func NewDirectoryWatcher(dir string, loader shardLoader) (io.Closer, error) {
-	quitter := make(chan struct{}, 1)
+type Stopper interface {
+	Stop()
+}
+
+func NewDirectoryWatcher(dir string, loader shardLoader) (Stopper, error) {
 	sw := &shardWatcher{
 		dir:        dir,
 		timestamps: map[string]time.Time{},
 		loader:     loader,
-		quit:       quitter,
+		quit:       make(chan struct{}),
+		stopped:    make(chan struct{}),
 	}
 	if err := sw.scan(); err != nil {
 		return nil, err
 	}
 
-	if err := sw.watch(quitter); err != nil {
+	if err := sw.watch(); err != nil {
 		return nil, err
 	}
 
@@ -144,7 +152,7 @@
 	return nil
 }
 
-func (s *shardWatcher) watch(quitter <-chan struct{}) error {
+func (s *shardWatcher) watch() error {
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
 		return err
@@ -171,7 +179,7 @@
 				if err != nil && err != fsnotify.ErrEventOverflow {
 					log.Println("watcher error:", err)
 				}
-			case <-quitter:
+			case <-s.quit:
 				watcher.Close()
 				close(signal)
 				return
@@ -180,6 +188,7 @@
 	}()
 
 	go func() {
+		defer close(s.stopped)
 		for range signal {
 			s.scan()
 		}
diff --git a/shards/watcher_test.go b/shards/watcher_test.go
index 0678c9b..4db7275 100644
--- a/shards/watcher_test.go
+++ b/shards/watcher_test.go
@@ -65,7 +65,7 @@
 	if err != nil {
 		t.Fatalf("NewDirectoryWatcher: %v", err)
 	}
-	defer dw.Close()
+	defer dw.Stop()
 
 	if got := <-logger.loads; got != shard {
 		t.Fatalf("got load event %v, want %v", got, shard)
@@ -95,7 +95,7 @@
 		t.Fatalf("WriteFile: %v", err)
 	}
 
-	dw.Close()
+	dw.Stop()
 
 	select {
 	case k := <-logger.loads: