|  | // Copyright 2016 Google Inc. All rights reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | // This program manages a zoekt indexing deployment: | 
|  | // * recycling logs | 
|  | // * periodically fetching new data. | 
|  | // * periodically reindexing all git repos. | 
|  |  | 
|  | package main | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "context" | 
|  | "flag" | 
|  | "fmt" | 
|  | "log" | 
|  | "math" | 
|  | "os" | 
|  | "os/exec" | 
|  | "path/filepath" | 
|  | "runtime" | 
|  | "strings" | 
|  | "time" | 
|  |  | 
|  | "github.com/google/zoekt" | 
|  | "github.com/google/zoekt/gitindex" | 
|  | ) | 
|  |  | 
// day is a fixed span of 24 hours; it lets flag defaults be written in days.
const day = 24 * time.Hour
|  |  | 
// loggedRun executes cmd with its stdout and stderr captured in memory and
// returns the captured bytes as out and err respectively. The command line is
// logged before the command starts. If the command fails, the failure and both
// captured streams are logged; the failure itself is not returned to the
// caller.
func loggedRun(cmd *exec.Cmd) (out, err []byte) {
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	log.Printf("run %v", cmd.Args)
	runErr := cmd.Run()
	if runErr != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, runErr, stdout.String(), stderr.String())
	}

	return stdout.Bytes(), stderr.Bytes()
}
|  |  | 
// Options holds the indexserver configuration. Most fields are bound to
// command-line flags by defineFlags; cpuCount and indexFlags are derived from
// the flag values by validate.
type Options struct {
	// Values bound directly to command-line flags.

	// cpuFraction is the fraction of the cores to use for indexing
	// (-cpu_fraction).
	cpuFraction float64
	// indexFlagsStr is the raw, space separated -git_index_flags value.
	indexFlagsStr string
	// mirrorConfigFile names the JSON file holding the mirror configuration
	// (-mirror_config).
	mirrorConfigFile string
	// fetchInterval is how often fetches are run (-fetch_interval).
	fetchInterval time.Duration
	// mirrorInterval is how often new repos are looked for and cloned
	// (-mirror_duration).
	mirrorInterval time.Duration
	// maxLogAge is the age after which index logs are recycled
	// (-max_log_age).
	maxLogAge time.Duration
	// indexTimeout is how long an index job may run before it is killed
	// (-index_timeout).
	indexTimeout time.Duration

	// Values computed by validate.

	// cpuCount is the indexing parallelism derived from cpuFraction; it is
	// at least 1.
	cpuCount int
	// indexFlags is indexFlagsStr split into individual arguments.
	indexFlags []string
}
|  |  | 
|  | func (o *Options) validate() { | 
|  | if o.cpuFraction <= 0.0 || o.cpuFraction > 1.0 { | 
|  | log.Fatal("cpu_fraction must be between 0.0 and 1.0") | 
|  | } | 
|  |  | 
|  | o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction)) | 
|  | if o.cpuCount < 1 { | 
|  | o.cpuCount = 1 | 
|  | } | 
|  | if o.indexFlagsStr != "" { | 
|  | o.indexFlags = strings.Split(o.indexFlagsStr, " ") | 
|  | } | 
|  | } | 
|  |  | 
|  | func (o *Options) defineFlags() { | 
|  | flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time") | 
|  | flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time") | 
|  | flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often") | 
|  | flag.StringVar(&o.mirrorConfigFile, "mirror_config", | 
|  | "", "JSON file holding mirror configuration.") | 
|  |  | 
|  | flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.") | 
|  | flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25, | 
|  | "use this fraction of the cores for indexing.") | 
|  | flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'") | 
|  | } | 
|  |  | 
|  | // periodicFetch runs git-fetch every once in a while. Results are | 
|  | // posted on pendingRepos. | 
|  | func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) { | 
|  | t := time.NewTicker(opts.fetchInterval) | 
|  | for { | 
|  | repos, err := gitindex.FindGitRepos(repoDir) | 
|  | if err != nil { | 
|  | log.Println(err) | 
|  | continue | 
|  | } | 
|  | if len(repos) == 0 { | 
|  | log.Printf("no repos found under %s", repoDir) | 
|  | } | 
|  |  | 
|  | // TODO: Randomize to make sure quota throttling hits everyone. | 
|  |  | 
|  | later := map[string]struct{}{} | 
|  | for _, dir := range repos { | 
|  | if ok := fetchGitRepo(dir); !ok { | 
|  | later[dir] = struct{}{} | 
|  | } else { | 
|  | pendingRepos <- dir | 
|  | } | 
|  | } | 
|  |  | 
|  | for r := range later { | 
|  | pendingRepos <- r | 
|  | } | 
|  |  | 
|  | <-t.C | 
|  | } | 
|  | } | 
|  |  | 
// fetchGitRepo runs "git fetch origin" against the git directory dir. It
// reports true when the command succeeded and wrote something to stdout, which
// is taken as the sign that there was an update. A failed command is logged
// together with its output and reported as false.
func fetchGitRepo(dir string) bool {
	var stdout, stderr bytes.Buffer

	fetch := exec.Command("git", "--git-dir", dir, "fetch", "origin")
	// An empty stdin prevents git from prompting.
	fetch.Stdin = &bytes.Buffer{}
	fetch.Stdout = &stdout
	fetch.Stderr = &stderr

	if runErr := fetch.Run(); runErr != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			fetch.Args, runErr, stdout.String(), stderr.String())
		return false
	}
	return stdout.Len() != 0
}
|  |  | 
|  | // indexPendingRepos consumes the directories on the repos channel and | 
|  | // indexes them, sequentially. | 
|  | func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) { | 
|  | for dir := range repos { | 
|  | indexPendingRepo(dir, indexDir, repoDir, opts) | 
|  | } | 
|  | } | 
|  |  | 
|  | func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) { | 
|  | ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) | 
|  | defer cancel() | 
|  | args := []string{ | 
|  | "-require_ctags", | 
|  | fmt.Sprintf("-parallelism=%d", opts.cpuCount), | 
|  | "-repo_cache", repoDir, | 
|  | "-index", indexDir, | 
|  | "-incremental", | 
|  | } | 
|  | args = append(args, opts.indexFlags...) | 
|  | args = append(args, dir) | 
|  | cmd := exec.CommandContext(ctx, "zoekt-git-index", args...) | 
|  | loggedRun(cmd) | 
|  |  | 
|  | } | 
|  |  | 
// deleteLogs removes every entry directly inside logDir whose modification
// time is more than maxAge in the past. Entries that cannot be inspected are
// skipped. A failed removal is logged and does not stop the sweep. An invalid
// glob pattern is fatal.
func deleteLogs(logDir string, maxAge time.Duration) {
	fs, err := filepath.Glob(filepath.Join(logDir, "*"))
	if err != nil {
		log.Fatalf("filepath.Glob(%s): %v", logDir, err)
	}

	threshold := time.Now().Add(-maxAge)
	for _, fn := range fs {
		fi, err := os.Lstat(fn)
		if err != nil || !fi.ModTime().Before(threshold) {
			continue
		}
		// An entry that vanished in the meantime is not worth reporting;
		// anything else (permissions, non-empty directory) is.
		if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
			log.Printf("deleteLogs: %v", err)
		}
	}
}
|  |  | 
|  | func deleteLogsLoop(logDir string, maxAge time.Duration) { | 
|  | tick := time.NewTicker(maxAge / 100) | 
|  | for { | 
|  | deleteLogs(logDir, maxAge) | 
|  | <-tick.C | 
|  | } | 
|  | } | 
|  |  | 
|  | // Delete the shard if its corresponding git repo can't be found. | 
|  | func deleteIfOrphan(repoDir string, fn string) error { | 
|  | f, err := os.Open(fn) | 
|  | if err != nil { | 
|  | return nil | 
|  | } | 
|  | defer f.Close() | 
|  |  | 
|  | ifile, err := zoekt.NewIndexFile(f) | 
|  | if err != nil { | 
|  | return nil | 
|  | } | 
|  | defer ifile.Close() | 
|  |  | 
|  | repo, _, err := zoekt.ReadMetadata(ifile) | 
|  | if err != nil { | 
|  | return nil | 
|  | } | 
|  |  | 
|  | _, err = os.Stat(repo.Source) | 
|  | if os.IsNotExist(err) { | 
|  | log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source) | 
|  | return os.Remove(fn) | 
|  | } | 
|  |  | 
|  | return err | 
|  | } | 
|  |  | 
|  | func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) { | 
|  | t := time.NewTicker(watchInterval) | 
|  |  | 
|  | expr := indexDir + "/*" | 
|  | for { | 
|  | fs, err := filepath.Glob(expr) | 
|  | if err != nil { | 
|  | log.Printf("Glob(%q): %v", expr, err) | 
|  | } | 
|  |  | 
|  | for _, f := range fs { | 
|  | if err := deleteIfOrphan(repoDir, f); err != nil { | 
|  | log.Printf("deleteIfOrphan(%q): %v", f, err) | 
|  | } | 
|  | } | 
|  | <-t.C | 
|  | } | 
|  | } | 
|  |  | 
|  | func main() { | 
|  | var opts Options | 
|  | opts.defineFlags() | 
|  | dataDir := flag.String("data_dir", | 
|  | filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.") | 
|  | indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/") | 
|  | flag.Parse() | 
|  | opts.validate() | 
|  |  | 
|  | if *dataDir == "" { | 
|  | log.Fatal("must set --data_dir") | 
|  | } | 
|  |  | 
|  | // Automatically prepend our own path at the front, to minimize | 
|  | // required configuration. | 
|  | if l, err := os.Readlink("/proc/self/exe"); err == nil { | 
|  | os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) | 
|  | } | 
|  |  | 
|  | logDir := filepath.Join(*dataDir, "logs") | 
|  | if *indexDir == "" { | 
|  | *indexDir = filepath.Join(*dataDir, "index") | 
|  | } | 
|  | repoDir := filepath.Join(*dataDir, "repos") | 
|  | for _, s := range []string{logDir, *indexDir, repoDir} { | 
|  | if _, err := os.Stat(s); err == nil { | 
|  | continue | 
|  | } | 
|  |  | 
|  | if err := os.MkdirAll(s, 0755); err != nil { | 
|  | log.Fatalf("MkdirAll %s: %v", s, err) | 
|  | } | 
|  | } | 
|  |  | 
|  | _, err := readConfigURL(opts.mirrorConfigFile) | 
|  | if err != nil { | 
|  | log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err) | 
|  | } | 
|  |  | 
|  | pendingRepos := make(chan string, 10) | 
|  | go periodicMirrorFile(repoDir, &opts, pendingRepos) | 
|  | go deleteLogsLoop(logDir, opts.maxLogAge) | 
|  | go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval) | 
|  | go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos) | 
|  | periodicFetch(repoDir, *indexDir, &opts, pendingRepos) | 
|  | } |