| // Copyright 2016 Google Inc. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // This program manages a zoekt indexing deployment: |
| // * recycling logs |
| // * periodically fetching new data. |
| // * periodically reindexing all git repos. |
| |
| package main |
| |
| import ( |
| "bytes" |
| "context" |
| "flag" |
| "fmt" |
| "log" |
| "math" |
| "os" |
| "os/exec" |
| "path/filepath" |
| "runtime" |
| "strings" |
| "time" |
| |
| "github.com/google/zoekt" |
| "github.com/google/zoekt/gitindex" |
| ) |
| |
// day is the length of one 24-hour day, used to express flag defaults.
const day = 24 * time.Hour
| |
// loggedRun runs cmd with its stdout and stderr captured in memory and
// returns both captured streams. The command line is logged before it
// starts; if the command fails, the error and both streams are logged
// as well. The failure itself is not reported to the caller.
func loggedRun(cmd *exec.Cmd) (out, err []byte) {
	var stdout, stderr bytes.Buffer
	cmd.Stdout, cmd.Stderr = &stdout, &stderr

	log.Printf("run %v", cmd.Args)
	if runErr := cmd.Run(); runErr != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, runErr, stdout.String(), stderr.String())
	}

	return stdout.Bytes(), stderr.Bytes()
}
| |
// Options holds the runtime configuration of the index server. The
// flag-backed fields are populated by defineFlags plus flag.Parse;
// cpuCount and indexFlags are derived by validate.
type Options struct {
	// cpuFraction is the fraction of the available cores to use for
	// indexing; validate requires it to be in (0.0, 1.0].
	cpuFraction float64
	// cpuCount is the indexing parallelism derived from cpuFraction.
	cpuCount int
	// fetchInterval is the period between fetch passes.
	fetchInterval time.Duration
	// mirrorInterval is the period between runs that find and clone
	// new repos.
	mirrorInterval time.Duration
	// indexFlagsStr is the raw, whitespace-separated list of extra
	// flags for zoekt-git-index.
	indexFlagsStr string
	// indexFlags is indexFlagsStr split into individual arguments.
	indexFlags []string
	// mirrorConfigFile names the JSON file holding mirror configuration.
	mirrorConfigFile string
	// maxLogAge is the age after which log files are deleted.
	maxLogAge time.Duration
	// indexTimeout is the maximum run time of a single index job.
	indexTimeout time.Duration
}

// validate checks the parsed options and fills in the derived fields
// cpuCount and indexFlags. It terminates the program through log.Fatal
// when cpuFraction is outside (0.0, 1.0].
func (o *Options) validate() {
	// Written as a negated "in range" test so that NaN, which compares
	// false against everything, is rejected too.
	if !(o.cpuFraction > 0.0 && o.cpuFraction <= 1.0) {
		log.Fatal("cpu_fraction must be between 0.0 and 1.0")
	}

	o.cpuCount = int(math.Trunc(float64(runtime.GOMAXPROCS(0)) * o.cpuFraction))
	if o.cpuCount < 1 {
		// Always index with at least one core.
		o.cpuCount = 1
	}
	if o.indexFlagsStr != "" {
		// strings.Fields drops the empty elements that repeated,
		// leading or trailing whitespace would otherwise produce; an
		// empty element would reach zoekt-git-index as a bogus
		// positional argument.
		o.indexFlags = strings.Fields(o.indexFlagsStr)
	}
}
| |
| func (o *Options) defineFlags() { |
| flag.DurationVar(&o.indexTimeout, "index_timeout", time.Hour, "kill index job after this much time") |
| flag.DurationVar(&o.maxLogAge, "max_log_age", 3*day, "recycle index logs after this much time") |
| flag.DurationVar(&o.fetchInterval, "fetch_interval", time.Hour, "run fetches this often") |
| flag.StringVar(&o.mirrorConfigFile, "mirror_config", |
| "", "JSON file holding mirror configuration.") |
| |
| flag.DurationVar(&o.mirrorInterval, "mirror_duration", 24*time.Hour, "find and clone new repos at this frequency.") |
| flag.Float64Var(&o.cpuFraction, "cpu_fraction", 0.25, |
| "use this fraction of the cores for indexing.") |
| flag.StringVar(&o.indexFlagsStr, "git_index_flags", "", "space separated list of flags passed through to zoekt-git-index (e.g. -git_index_flags='-symbols=false -submodules=false'") |
| } |
| |
| // periodicFetch runs git-fetch every once in a while. Results are |
| // posted on pendingRepos. |
| func periodicFetch(repoDir, indexDir string, opts *Options, pendingRepos chan<- string) { |
| t := time.NewTicker(opts.fetchInterval) |
| for { |
| repos, err := gitindex.FindGitRepos(repoDir) |
| if err != nil { |
| log.Println(err) |
| continue |
| } |
| if len(repos) == 0 { |
| log.Printf("no repos found under %s", repoDir) |
| } |
| |
| // TODO: Randomize to make sure quota throttling hits everyone. |
| |
| later := map[string]struct{}{} |
| for _, dir := range repos { |
| if ok := fetchGitRepo(dir); !ok { |
| later[dir] = struct{}{} |
| } else { |
| pendingRepos <- dir |
| } |
| } |
| |
| for r := range later { |
| pendingRepos <- r |
| } |
| |
| <-t.C |
| } |
| } |
| |
// fetchGitRepo runs "git fetch origin" against the git directory dir.
// It returns true only when the command succeeds and writes something
// to its standard output, which is taken as the sign of an update. A
// failed command is logged along with its output and yields false.
//
// NOTE(review): git appears to report fetched refs on stderr rather
// than stdout; confirm that the stdout check detects updates.
func fetchGitRepo(dir string) bool {
	var stdout, stderr bytes.Buffer

	cmd := exec.Command("git", "--git-dir", dir, "fetch", "origin")
	// An empty stdin prevents git from prompting.
	cmd.Stdin = &bytes.Buffer{}
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Run(); err != nil {
		log.Printf("command %s failed: %v\nOUT: %s\nERR: %s",
			cmd.Args, err, stdout.String(), stderr.String())
		return false
	}
	return stdout.Len() != 0
}
| |
| // indexPendingRepos consumes the directories on the repos channel and |
| // indexes them, sequentially. |
| func indexPendingRepos(indexDir, repoDir string, opts *Options, repos <-chan string) { |
| for dir := range repos { |
| indexPendingRepo(dir, indexDir, repoDir, opts) |
| } |
| } |
| |
| func indexPendingRepo(dir, indexDir, repoDir string, opts *Options) { |
| ctx, cancel := context.WithTimeout(context.Background(), opts.indexTimeout) |
| defer cancel() |
| args := []string{ |
| "-require_ctags", |
| fmt.Sprintf("-parallelism=%d", opts.cpuCount), |
| "-repo_cache", repoDir, |
| "-index", indexDir, |
| "-incremental", |
| } |
| args = append(args, opts.indexFlags...) |
| args = append(args, dir) |
| cmd := exec.CommandContext(ctx, "zoekt-git-index", args...) |
| loggedRun(cmd) |
| |
| } |
| |
// deleteLogs removes every entry directly inside logDir whose
// modification time is more than maxAge in the past.
//
// The directory is listed directly instead of through a glob pattern,
// so a logDir containing glob metacharacters such as '[' or '*' is
// handled literally. Failures to list or remove are logged and do not
// stop the program.
func deleteLogs(logDir string, maxAge time.Duration) {
	d, err := os.Open(logDir)
	if err != nil {
		log.Printf("deleteLogs: %v", err)
		return
	}
	// Readdir describes each entry without following symlinks.
	entries, err := d.Readdir(-1)
	d.Close()
	if err != nil {
		// Readdir may still have returned some of the entries; carry
		// on with whatever was read.
		log.Printf("deleteLogs: reading %s: %v", logDir, err)
	}

	threshold := time.Now().Add(-maxAge)
	for _, fi := range entries {
		if !fi.ModTime().Before(threshold) {
			continue
		}
		fn := filepath.Join(logDir, fi.Name())
		if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
			log.Printf("deleteLogs: %v", err)
		}
	}
}
| |
| func deleteLogsLoop(logDir string, maxAge time.Duration) { |
| tick := time.NewTicker(maxAge / 100) |
| for { |
| deleteLogs(logDir, maxAge) |
| <-tick.C |
| } |
| } |
| |
| // Delete the shard if its corresponding git repo can't be found. |
| func deleteIfOrphan(repoDir string, fn string) error { |
| f, err := os.Open(fn) |
| if err != nil { |
| return nil |
| } |
| defer f.Close() |
| |
| ifile, err := zoekt.NewIndexFile(f) |
| if err != nil { |
| return nil |
| } |
| defer ifile.Close() |
| |
| repo, _, err := zoekt.ReadMetadata(ifile) |
| if err != nil { |
| return nil |
| } |
| |
| _, err = os.Stat(repo.Source) |
| if os.IsNotExist(err) { |
| log.Printf("deleting orphan shard %s; source %q not found", fn, repo.Source) |
| return os.Remove(fn) |
| } |
| |
| return err |
| } |
| |
| func deleteOrphanIndexes(indexDir, repoDir string, watchInterval time.Duration) { |
| t := time.NewTicker(watchInterval) |
| |
| expr := indexDir + "/*" |
| for { |
| fs, err := filepath.Glob(expr) |
| if err != nil { |
| log.Printf("Glob(%q): %v", expr, err) |
| } |
| |
| for _, f := range fs { |
| if err := deleteIfOrphan(repoDir, f); err != nil { |
| log.Printf("deleteIfOrphan(%q): %v", f, err) |
| } |
| } |
| <-t.C |
| } |
| } |
| |
| func main() { |
| var opts Options |
| opts.defineFlags() |
| dataDir := flag.String("data_dir", |
| filepath.Join(os.Getenv("HOME"), "zoekt-serving"), "directory holding all data.") |
| indexDir := flag.String("index_dir", "", "directory holding index shards. Defaults to $data_dir/index/") |
| flag.Parse() |
| opts.validate() |
| |
| if *dataDir == "" { |
| log.Fatal("must set --data_dir") |
| } |
| |
| // Automatically prepend our own path at the front, to minimize |
| // required configuration. |
| if l, err := os.Readlink("/proc/self/exe"); err == nil { |
| os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) |
| } |
| |
| logDir := filepath.Join(*dataDir, "logs") |
| if *indexDir == "" { |
| *indexDir = filepath.Join(*dataDir, "index") |
| } |
| repoDir := filepath.Join(*dataDir, "repos") |
| for _, s := range []string{logDir, *indexDir, repoDir} { |
| if _, err := os.Stat(s); err == nil { |
| continue |
| } |
| |
| if err := os.MkdirAll(s, 0755); err != nil { |
| log.Fatalf("MkdirAll %s: %v", s, err) |
| } |
| } |
| |
| _, err := readConfigURL(opts.mirrorConfigFile) |
| if err != nil { |
| log.Fatalf("readConfigURL(%s): %v", opts.mirrorConfigFile, err) |
| } |
| |
| pendingRepos := make(chan string, 10) |
| go periodicMirrorFile(repoDir, &opts, pendingRepos) |
| go deleteLogsLoop(logDir, opts.maxLogAge) |
| go deleteOrphanIndexes(*indexDir, repoDir, opts.fetchInterval) |
| go indexPendingRepos(*indexDir, repoDir, &opts, pendingRepos) |
| periodicFetch(repoDir, *indexDir, &opts, pendingRepos) |
| } |