Avoid multiple indexing operation for a single change

Before this code change, if active node calls passive node to index
some id and there already was an indexing operation for that id, we
ended up with multiple concurrent indexing operations. Adding tracking
of in-flight indexing operations allows avoiding multiple indexing
for the same id. Set of in-flight ids instead of locking fixes the
issue with occupying receivers threads while waiting for a lock.

Feature: Issue 13637
Change-Id: I660e9b9f9879beb6ca44103f96dc194fd2bcaa7e
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
index 9977ff1..44e35a8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
@@ -16,7 +16,10 @@
 
 import com.google.common.flogger.FluentLogger;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -26,6 +29,7 @@
  */
 public abstract class ForwardedIndexingHandler<T> {
   protected static final FluentLogger log = FluentLogger.forEnclosingClass();
+  private final Set<T> inFlightIndexing = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   public enum Operation {
     INDEX,
@@ -51,21 +55,27 @@
    */
   public void index(T id, Operation operation, Optional<IndexEvent> indexEvent) throws IOException {
     log.atFine().log("%s %s %s", operation, id, indexEvent);
-    try {
-      Context.setForwardedEvent(true);
-      switch (operation) {
-        case INDEX:
-          doIndex(id, indexEvent);
-          break;
-        case DELETE:
-          doDelete(id, indexEvent);
-          break;
-        default:
-          log.atSevere().log("unexpected operation: %s", operation);
-          break;
+    if (inFlightIndexing.add(id)) {
+      try {
+        Context.setForwardedEvent(true);
+        switch (operation) {
+          case INDEX:
+            doIndex(id, indexEvent);
+            break;
+          case DELETE:
+            doDelete(id, indexEvent);
+            break;
+          default:
+            log.atSevere().log("unexpected operation: %s", operation);
+            break;
+        }
+      } finally {
+        Context.unsetForwardedEvent();
+        inFlightIndexing.remove(id);
       }
-    } finally {
-      Context.unsetForwardedEvent();
+    } else {
+      throw new InFlightIndexedException(
+          String.format("Indexing for %s %s %s already in flight", operation, id, indexEvent));
     }
   }
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java
new file mode 100644
index 0000000..87d28b3
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder;
+
+import java.io.IOException;
+
+public class InFlightIndexedException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InFlightIndexedException(String msg) {
+    super(msg);
+  }
+}