Drop producer/consumer queue to simplify testing
Change-Id: I4bcd744e8e3fd37ef95089abfb1066ab924ee3d2
diff --git a/cmd/checker/checker.go b/cmd/checker/checker.go
index 10507ca..02b5921 100644
--- a/cmd/checker/checker.go
+++ b/cmd/checker/checker.go
@@ -21,6 +21,7 @@
"errors"
"fmt"
"log"
+ "math/rand"
"net/rpc"
"strconv"
"strings"
@@ -122,7 +123,6 @@
todo: make(chan *gerrit.PendingChecksInfo, 5),
}
- go gc.pendingLoop()
return gc, nil
}
@@ -187,42 +187,51 @@
return msgs, nil
}
-// pendingLoop periodically contacts gerrit to find new checks to
-// execute. It should be executed in a goroutine.
-func (c *gerritChecker) pendingLoop() {
+func (c *gerritChecker) Serve() {
for {
// TODO: real rate limiting.
- time.Sleep(10 * time.Second)
-
- pending, err := c.server.PendingChecksByScheme(checkerScheme)
+ wait, err := c.processPendingChecks()
if err != nil {
- log.Printf("PendingChecksByScheme: %v", err)
- continue
+ log.Printf("checkAllChecks: %v", err)
}
-
- if len(pending) == 0 {
- log.Printf("no pending checks")
- }
-
- for _, pc := range pending {
- select {
- case c.todo <- pc:
- default:
- log.Println("too busy; dropping pending check.")
- }
+ if wait {
+ time.Sleep(10 * time.Second)
}
}
}
-// Serve runs the serve loop, executing formatters for checks that
-// need it.
-func (gc *gerritChecker) Serve() {
- for p := range gc.todo {
- // TODO: parallelism?.
- if err := gc.executeCheck(p); err != nil {
- log.Printf("executeCheck(%v): %v", p, err)
+// processPendingChecks
+func (c *gerritChecker) processPendingChecks() (wait bool, err error) {
+ pending, err := c.server.PendingChecksByScheme(checkerScheme)
+ if err != nil {
+ wait = true
+ return
+ }
+
+ if len(pending) == 0 {
+ wait = true
+ log.Printf("no pending checks")
+ return
+ }
+
+ // Shuffle so we don't always report the first error if there
+ // is a problem.
+ rand.Shuffle(len(pending),
+ func(i, j int) {
+ pending[i], pending[j] = pending[j], pending[i]
+ })
+
+ var aggregateErr error
+ for _, pc := range pending {
+ if err := c.executeCheck(pc); err != nil && aggregateErr == nil {
+ // just register the first error.
+ aggregateErr = err
+ } else if err == nil {
+ // we can try again until we stop making progress.
+ wait = false
}
}
+ return wait, aggregateErr
}
// status encodes the checker states.