Split off shard watching into separate source file.

Change-Id: Ic164eba52dcb64c472016868049a5d4c677e1138
diff --git a/shards/shards.go b/shards/shards.go
index 91c46e4..2abaf57 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -15,10 +15,7 @@
 package shards
 
 import (
-	"fmt"
 	"log"
-	"os"
-	"path/filepath"
 	"runtime"
 	"runtime/debug"
 	"sort"
@@ -26,189 +23,10 @@
 
 	"golang.org/x/net/context"
 
-	"github.com/fsnotify/fsnotify"
 	"github.com/google/zoekt"
 	"github.com/google/zoekt/query"
 )
 
-type searchShard struct {
-	zoekt.Searcher
-	mtime time.Time
-}
-
-type shardWatcher struct {
-	dir string
-
-	// Limit the number of parallel queries. Since searching is
-	// CPU bound, we can't do better than #CPU queries in
-	// parallel.  If we do so, we just create more memory
-	// pressure.
-	throttle chan struct{}
-
-	shards map[string]*searchShard
-	quit   chan struct{}
-}
-
-func loadShard(fn string) (*searchShard, error) {
-	f, err := os.Open(fn)
-	if err != nil {
-		return nil, err
-	}
-	fi, err := f.Stat()
-	if err != nil {
-		return nil, err
-	}
-
-	iFile, err := zoekt.NewIndexFile(f)
-	if err != nil {
-		return nil, err
-	}
-	s, err := zoekt.NewSearcher(iFile)
-	if err != nil {
-		iFile.Close()
-		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
-	}
-
-	return &searchShard{
-		mtime:    fi.ModTime(),
-		Searcher: s,
-	}, nil
-}
-
-func (s *shardWatcher) String() string {
-	return fmt.Sprintf("shardWatcher(%s)", s.dir)
-}
-
-func (s *shardWatcher) scan() error {
-	fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
-	if err != nil {
-		return err
-	}
-
-	if len(fs) == 0 {
-		return fmt.Errorf("directory %s is empty", s.dir)
-	}
-
-	ts := map[string]time.Time{}
-	for _, fn := range fs {
-		key := filepath.Base(fn)
-		fi, err := os.Lstat(fn)
-		if err != nil {
-			continue
-		}
-
-		ts[key] = fi.ModTime()
-	}
-
-	s.lock()
-	var toLoad []string
-	for k, mtime := range ts {
-		if s.shards[k] == nil || s.shards[k].mtime != mtime {
-			toLoad = append(toLoad, k)
-		}
-	}
-
-	var toDrop []string
-	// Unload deleted shards.
-	for k := range s.shards {
-		if _, ok := ts[k]; !ok {
-			toDrop = append(toDrop, k)
-		}
-	}
-	s.unlock()
-
-	for _, t := range toDrop {
-		log.Printf("unloading: %s", t)
-		s.replace(t, nil)
-	}
-
-	for _, t := range toLoad {
-		shard, err := loadShard(filepath.Join(s.dir, t))
-		log.Printf("reloading: %s, err %v ", t, err)
-		if err != nil {
-			continue
-		}
-		s.replace(t, shard)
-	}
-
-	return nil
-}
-
-func (s *shardWatcher) rlock() {
-	s.throttle <- struct{}{}
-}
-
-// getShards returns the currently loaded shards. The shards must be
-// accessed under a rlock call.
-func (s *shardWatcher) getShards() []zoekt.Searcher {
-	var res []zoekt.Searcher
-	for _, sh := range s.shards {
-		res = append(res, sh)
-	}
-	return res
-}
-
-func (s *shardWatcher) runlock() {
-	<-s.throttle
-}
-
-func (s *shardWatcher) lock() {
-	n := cap(s.throttle)
-	for n > 0 {
-		s.throttle <- struct{}{}
-		n--
-	}
-}
-
-func (s *shardWatcher) unlock() {
-	n := cap(s.throttle)
-	for n > 0 {
-		<-s.throttle
-		n--
-	}
-}
-
-func (s *shardWatcher) replace(key string, shard *searchShard) {
-	s.lock()
-	defer s.unlock()
-	old := s.shards[key]
-	if old != nil {
-		old.Close()
-	}
-	if shard != nil {
-		s.shards[key] = shard
-	} else {
-		delete(s.shards, key)
-	}
-}
-
-func (s *shardWatcher) watch() error {
-	watcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		return err
-	}
-	if err := watcher.Add(s.dir); err != nil {
-		return err
-	}
-
-	go func() {
-		for {
-			select {
-			case <-watcher.Events:
-				s.scan()
-			case err := <-watcher.Errors:
-				if err != nil {
-					log.Println("watcher error:", err)
-				}
-			case <-s.quit:
-				watcher.Close()
-				return
-			}
-		}
-	}()
-	return nil
-}
-
 // NewShardedSearcher returns a searcher instance that loads all
 // shards corresponding to a glob into memory.
 func NewShardedSearcher(dir string) (zoekt.Searcher, error) {
diff --git a/shards/watcher.go b/shards/watcher.go
new file mode 100644
index 0000000..8c71b76
--- /dev/null
+++ b/shards/watcher.go
@@ -0,0 +1,228 @@
+// Copyright 2017 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shards
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/fsnotify/fsnotify"
+	"github.com/google/zoekt"
+)
+
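+// searchShard is a zoekt.Searcher annotated with the modification
+// time of the file it was loaded from, so scan can detect updates.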
+type searchShard struct {
+	zoekt.Searcher
+	mtime time.Time
+}
+
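+// shardWatcher maintains the set of shards loaded from a directory,
+// reloading them as the files underneath change.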
+type shardWatcher struct {
+	dir string
+
+	// Limit the number of parallel queries. Since searching is
+	// CPU bound, we can't do better than #CPU queries in
+	// parallel; more would just create memory pressure.
+	throttle chan struct{}
+
+	shards map[string]*searchShard
+	quit   chan struct{}
+}
+
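+// loadShard opens the index file fn and wraps it in a searchShard.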
+func loadShard(fn string) (*searchShard, error) {
+	f, err := os.Open(fn)
+	if err != nil {
+		return nil, err
+	}
+	fi, err := f.Stat()
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+
+	iFile, err := zoekt.NewIndexFile(f)
+	if err != nil {
+		// Assuming NewIndexFile does not close f on failure,
+		// close it here to avoid leaking the descriptor.
+		f.Close()
+		return nil, err
+	}
+	s, err := zoekt.NewSearcher(iFile)
+	if err != nil {
+		iFile.Close()
+		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
+	}
+
+	return &searchShard{
+		mtime:    fi.ModTime(),
+		Searcher: s,
+	}, nil
+}
+
+func (s *shardWatcher) String() string {
+	return fmt.Sprintf("shardWatcher(%s)", s.dir)
+}
+
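+// scan reconciles the loaded shards with the *.zoekt files on disk,
+// loading new or changed shards and dropping deleted ones.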
+func (s *shardWatcher) scan() error {
+	fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
+	if err != nil {
+		return err
+	}
+
+	if len(fs) == 0 {
+		return fmt.Errorf("no shards found in %s", s.dir)
+	}
+
+	ts := map[string]time.Time{}
+	for _, fn := range fs {
+		key := filepath.Base(fn)
+		fi, err := os.Lstat(fn)
+		if err != nil {
+			continue
+		}
+
+		ts[key] = fi.ModTime()
+	}
+
+	s.lock()
+	var toLoad []string
+	for k, mtime := range ts {
+		if s.shards[k] == nil || s.shards[k].mtime != mtime {
+			toLoad = append(toLoad, k)
+		}
+	}
+
+	var toDrop []string
+	// Unload deleted shards.
+	for k := range s.shards {
+		if _, ok := ts[k]; !ok {
+			toDrop = append(toDrop, k)
+		}
+	}
+	s.unlock()
+
+	for _, t := range toDrop {
+		log.Printf("unloading: %s", t)
+		s.replace(t, nil)
+	}
+
+	for _, t := range toLoad {
+		shard, err := loadShard(filepath.Join(s.dir, t))
+		log.Printf("reloading: %s, err %v", t, err)
+		if err != nil {
+			continue
+		}
+		s.replace(t, shard)
+	}
+
+	return nil
+}
+
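+// rlock takes a single throttle slot, so multiple readers can
+// proceed concurrently; lock takes all slots and excludes them.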
+func (s *shardWatcher) rlock() {
+	s.throttle <- struct{}{}
+}
+
+// getShards returns the currently loaded shards. The shards must be
+// accessed under an rlock call.
+func (s *shardWatcher) getShards() []zoekt.Searcher {
+	var res []zoekt.Searcher
+	for _, sh := range s.shards {
+		res = append(res, sh)
+	}
+	return res
+}
+
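+// runlock releases the slot taken by rlock.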
+func (s *shardWatcher) runlock() {
+	<-s.throttle
+}
+
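+// lock fills every throttle slot, blocking until all readers have
+// released theirs; it is the exclusive side of the semaphore.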
+func (s *shardWatcher) lock() {
+	n := cap(s.throttle)
+	for n > 0 {
+		s.throttle <- struct{}{}
+		n--
+	}
+}
+
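+// unlock drains the slots filled by lock.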
+func (s *shardWatcher) unlock() {
+	n := cap(s.throttle)
+	for n > 0 {
+		<-s.throttle
+		n--
+	}
+}
+
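+// replace installs shard under key, closing any previously loaded
+// version. A nil shard deletes the entry.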
+func (s *shardWatcher) replace(key string, shard *searchShard) {
+	s.lock()
+	defer s.unlock()
+	old := s.shards[key]
+	if old != nil {
+		old.Close()
+	}
+	if shard != nil {
+		s.shards[key] = shard
+	} else {
+		delete(s.shards, key)
+	}
+}
+
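+// watch starts a background goroutine that rescans the directory
+// whenever fsnotify signals a change, until quit is closed.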
+func (s *shardWatcher) watch() error {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return err
+	}
+	if err := watcher.Add(s.dir); err != nil {
+		return err
+	}
+
+	go func() {
+		for {
+			select {
+			case <-watcher.Events:
+				if err := s.scan(); err != nil {
+					log.Println("scan:", err)
+				}
+			case err := <-watcher.Errors:
+				// Errors yields nil once the watcher is
+				// closed; only log real errors.
+				if err != nil {
+					log.Println("watcher error:", err)
+				}
+			case <-s.quit:
+				watcher.Close()
+				return
+			}
+		}
+	}()
+	return nil
+}