shards: load up to NumCPU() shards in parallel

Second try now that #23 is fixed.

Fixes #35

Change-Id: Icc8669ec559f7b1e9ae52556aea183af994afe61
diff --git a/shards/shards.go b/shards/shards.go
index 289fe31..1eba4f9 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -16,7 +16,9 @@
 
 import (
 	"context"
+	"fmt"
 	"log"
+	"os"
 	"runtime"
 	"runtime/debug"
 	"sort"
@@ -52,7 +54,11 @@
 // shards corresponding to a glob into memory.
 func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
 	ss := newShardedSearcher(int64(runtime.NumCPU()))
-	_, err := NewDirectoryWatcher(dir, ss)
+	tl := &throttledLoader{
+		ss:       ss,
+		throttle: make(chan struct{}, runtime.NumCPU()),
+	}
+	_, err := NewDirectoryWatcher(dir, tl)
 	if err != nil {
 		return nil, err
 	}
@@ -60,6 +66,28 @@
 	return ss, nil
 }
 
+// throttledLoader tries to load up to throttle shards in parallel.
+type throttledLoader struct {
+	ss       *shardedSearcher
+	throttle chan struct{}
+}
+
+func (tl *throttledLoader) load(key string) {
+	tl.throttle <- struct{}{}
+	shard, err := loadShard(key)
+	<-tl.throttle
+	log.Printf("reloading: %s, err %v ", key, err)
+	if err != nil {
+		return
+	}
+
+	tl.ss.replace(key, shard)
+}
+
+func (tl *throttledLoader) drop(key string) {
+	tl.ss.replace(key, nil)
+}
+
 func (ss *shardedSearcher) String() string {
 	return "shardedSearcher"
 }
@@ -249,20 +277,6 @@
 	s.throttle.Release(s.capacity)
 }
 
-func (s *shardedSearcher) load(key string) {
-	shard, err := loadShard(key)
-	log.Printf("reloading: %s, err %v ", key, err)
-	if err != nil {
-		return
-	}
-
-	s.replace(key, shard)
-}
-
-func (s *shardedSearcher) drop(key string) {
-	s.replace(key, nil)
-}
-
 func (s *shardedSearcher) replace(key string, shard zoekt.Searcher) {
 	s.lock(context.Background())
 	defer s.unlock()
@@ -277,3 +291,22 @@
 		s.shards[key] = shard
 	}
 }
+
+func loadShard(fn string) (zoekt.Searcher, error) {
+	f, err := os.Open(fn)
+	if err != nil {
+		return nil, err
+	}
+
+	iFile, err := zoekt.NewIndexFile(f)
+	if err != nil {
+		return nil, err
+	}
+	s, err := zoekt.NewSearcher(iFile)
+	if err != nil {
+		iFile.Close()
+		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
+	}
+
+	return s, nil
+}
diff --git a/shards/watcher.go b/shards/watcher.go
index 10b5fea..766f851 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -20,13 +20,14 @@
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/fsnotify/fsnotify"
-	"github.com/google/zoekt"
 )
 
 type shardLoader interface {
+	// Load a new file. Should be safe for concurrent calls.
 	load(filename string)
 	drop(filename string)
 }
@@ -65,25 +66,6 @@
 	return sw, nil
 }
 
-func loadShard(fn string) (zoekt.Searcher, error) {
-	f, err := os.Open(fn)
-	if err != nil {
-		return nil, err
-	}
-
-	iFile, err := zoekt.NewIndexFile(f)
-	if err != nil {
-		return nil, err
-	}
-	s, err := zoekt.NewSearcher(iFile)
-	if err != nil {
-		iFile.Close()
-		return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
-	}
-
-	return s, nil
-}
-
 func (s *shardWatcher) String() string {
 	return fmt.Sprintf("shardWatcher(%s)", s.dir)
 }
@@ -130,9 +112,15 @@
 		s.loader.drop(t)
 	}
 
+	var wg sync.WaitGroup
 	for _, t := range toLoad {
-		s.loader.load(t)
+		wg.Add(1)
+		go func(k string) {
+			s.loader.load(k)
+			wg.Done()
+		}(t)
 	}
+	wg.Wait()
 
 	return nil
 }