shards: throttle loading in watcher
By throttling in the watcher we can prevent spinning up N goroutines. On
large instances when starting up N can be very large. Additionally since
only one "scan" is running at a time, we can move the throttle semaphore
to a local variable.
We also rename throttledLoader to loader since it no longer does
throttling.
Change-Id: Ifece1d85b4d40f5a331369b3a9d7566cf08ce5e8
diff --git a/shards/shards.go b/shards/shards.go
index e361095..98e51db 100644
--- a/shards/shards.go
+++ b/shards/shards.go
@@ -153,9 +153,8 @@
// shards corresponding to a glob into memory.
func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
- tl := &throttledLoader{
- ss: ss,
- throttle: make(chan struct{}, runtime.GOMAXPROCS(0)),
+ tl := &loader{
+ ss: ss,
}
_, err := NewDirectoryWatcher(dir, tl)
if err != nil {
@@ -165,16 +164,12 @@
return ss, nil
}
-// throttledLoader tries to load up to throttle shards in parallel.
-type throttledLoader struct {
- ss *shardedSearcher
- throttle chan struct{}
+type loader struct {
+ ss *shardedSearcher
}
-func (tl *throttledLoader) load(key string) {
- tl.throttle <- struct{}{}
+func (tl *loader) load(key string) {
shard, err := loadShard(key)
- <-tl.throttle
if err != nil {
metricShardsLoadFailedTotal.Inc()
log.Printf("reloading: %s, err %v ", key, err)
@@ -185,7 +180,7 @@
tl.ss.replace(key, shard)
}
-func (tl *throttledLoader) drop(key string) {
+func (tl *loader) drop(key string) {
tl.ss.replace(key, nil)
}
diff --git a/shards/watcher.go b/shards/watcher.go
index 7cc7094..4bdb8c1 100644
--- a/shards/watcher.go
+++ b/shards/watcher.go
@@ -20,7 +20,7 @@
"log"
"os"
"path/filepath"
- "sync"
+ "runtime"
"time"
"github.com/fsnotify/fsnotify"
@@ -112,15 +112,18 @@
s.loader.drop(t)
}
- var wg sync.WaitGroup
+ // Limit amount of concurrent shard loads.
+ throttle := make(chan struct{}, runtime.GOMAXPROCS(0))
for _, t := range toLoad {
- wg.Add(1)
+ throttle <- struct{}{}
go func(k string) {
s.loader.load(k)
- wg.Done()
+ <-throttle
}(t)
}
- wg.Wait()
+ for i := 0; i < cap(throttle); i++ {
+ throttle <- struct{}{}
+ }
return nil
}