Move the forwarder retry logic out of the http layer

By moving the retry logic above the http layer, we make it possible
to retry requests on a different target URL. This is necessary when
the other peer node restarts and comes back on a different URL.

Change-Id: Ia566a2a80b0bdefc765507cec0fa34e94c1ae17f
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardingException.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardingException.java
new file mode 100644
index 0000000..d7d09ee
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardingException.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+class ForwardingException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isRecoverable;
+
+  ForwardingException(boolean isRecoverable, String message) {
+    super(message);
+    this.isRecoverable = isRecoverable;
+  }
+
+  ForwardingException(boolean isRecoverable, String message, Throwable cause) {
+    super(message, cause);
+    this.isRecoverable = isRecoverable;
+  }
+
+  boolean isRecoverable() {
+    return isRecoverable;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
index 9a0d48f..bdd641e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
@@ -19,11 +19,8 @@
 
 import com.ericsson.gerrit.plugins.highavailability.Configuration;
 
-import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.HttpRequestRetryHandler;
-import org.apache.http.client.ServiceUnavailableRetryStrategy;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
@@ -36,17 +33,14 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.protocol.HttpContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
 
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
@@ -60,7 +54,6 @@
   // Up to 2 target instances with the max number of connections per host:
   private static final int MAX_CONNECTIONS = 2 * CONNECTIONS_PER_ROUTE;
 
-  private static final int ERROR_CODES = 500;
   private static final int MAX_CONNECTION_INACTIVITY = 10000;
 
   private final Configuration cfg;
@@ -78,8 +71,6 @@
         .setConnectionManager(customConnectionManager())
         .setDefaultCredentialsProvider(buildCredentials())
         .setDefaultRequestConfig(customRequestConfig())
-        .setRetryHandler(customRetryHandler())
-        .setServiceUnavailableRetryStrategy(customServiceUnavailRetryStrategy())
         .build();
   }
 
@@ -90,57 +81,6 @@
         .build();
   }
 
-  private HttpRequestRetryHandler customRetryHandler() {
-    return new HttpRequestRetryHandler() {
-
-      @Override
-      public boolean retryRequest(IOException exception, int executionCount,
-          HttpContext context) {
-        if (executionCount > cfg.getMaxTries()
-            || exception instanceof SSLException) {
-          return false;
-        }
-        logRetry(exception.getMessage(), context);
-        try {
-          Thread.sleep(cfg.getRetryInterval());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return false;
-        }
-        return true;
-      }
-    };
-  }
-
-  private ServiceUnavailableRetryStrategy customServiceUnavailRetryStrategy() {
-    return new ServiceUnavailableRetryStrategy() {
-      @Override
-      public boolean retryRequest(HttpResponse response, int executionCount,
-          HttpContext context) {
-        if (executionCount > cfg.getMaxTries()) {
-          return false;
-        }
-        if (response.getStatusLine().getStatusCode() >= ERROR_CODES) {
-          logRetry(response.getStatusLine().getReasonPhrase(), context);
-          return true;
-        }
-        return false;
-      }
-
-      @Override
-      public long getRetryInterval() {
-        return cfg.getRetryInterval();
-      }
-    };
-  }
-
-  private void logRetry(String cause, HttpContext context) {
-    if(log.isDebugEnabled()){
-      log.debug("Retrying request caused by '" + cause + "', request: '"
-          + context.getAttribute("http.request") + "'");
-    }
-  }
-
   private HttpClientConnectionManager customConnectionManager() {
     Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
         .<ConnectionSocketFactory> create().register("https", sslSocketFactory)
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index fddec98..cc58e3c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -22,6 +22,7 @@
 import com.google.gson.GsonBuilder;
 import com.google.inject.Inject;
 
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
 
@@ -30,18 +31,23 @@
 
 import java.io.IOException;
 
+import javax.net.ssl.SSLException;
+
 class RestForwarder implements Forwarder {
   private static final Logger log =
       LoggerFactory.getLogger(RestForwarder.class);
 
   private final HttpSession httpSession;
   private final String pluginRelativePath;
+  private final Configuration cfg;
 
   @Inject
   RestForwarder(HttpSession httpClient,
-      @PluginName String pluginName) {
+      @PluginName String pluginName,
+      Configuration cfg) {
     this.httpSession = httpClient;
     this.pluginRelativePath = Joiner.on("/").join("/plugins", pluginName);
+    this.cfg = cfg;
   }
 
   @Override
@@ -97,24 +103,61 @@
 
   private abstract class Request {
     private String name;
+    private int execCnt;
 
     Request(String name) {
       this.name = name;
     }
 
     boolean execute() {
+      for (;;) {
+        try {
+          execCnt++;
+          tryOnce();
+          return true;
+        } catch (ForwardingException e) {
+          if (!e.isRecoverable()) {
+            log.error("Failed to {}", name, e);
+            return false;
+          }
+          if (execCnt >= cfg.getMaxTries()) {
+            log.error("Failed to {}, after {} tries", name, cfg.getMaxTries());
+            return false;
+          }
+
+          logRetry(e);
+          try {
+            Thread.sleep(cfg.getRetryInterval());
+          } catch (InterruptedException ie) {
+            log.error("{} was interrupted, giving up", name, ie);
+            Thread.currentThread().interrupt();
+            return false;
+          }
+        }
+      }
+    }
+
+    void tryOnce() throws ForwardingException {
       try {
         HttpResult result = send();
-        if (result.isSuccessful()) {
-          return true;
+        if (!result.isSuccessful()) {
+          throw new ForwardingException(true, "Unable to " + name + ": " + result.getMessage());
         }
-        log.error("Unable to {}: {}", name, result.getMessage());
       } catch (IOException e) {
-        log.error("Error trying to {}", name, e);
+        throw new ForwardingException(isRecoverable(e), e.getMessage(), e);
       }
-      return false;
     }
 
     abstract HttpResult send() throws IOException;
+
+    boolean isRecoverable(IOException e) {
+      return !(e instanceof SSLException);
+    }
+
+    void logRetry(Throwable cause) {
+      if (log.isDebugEnabled()) {
+        log.debug("Retrying to {} caused by '{}'", name, cause);
+      }
+    }
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionIT.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionIT.java
index fc492f5..6649d01 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionIT.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionIT.java
@@ -50,7 +50,8 @@
   @Test
   @GerritConfigs({
       @GerritConfig(name = "plugin.high-availability.url", value = "http://localhost:18888"),
-      @GerritConfig(name = "plugin.high-availability.user", value = "admin")})
+      @GerritConfig(name = "plugin.high-availability.user", value = "admin"),
+      @GerritConfig(name = "plugin.high-availability.cacheThreadPoolSize", value = "10")})
   public void flushAndSendPost() throws Exception {
     final String flushRequest =
         "/plugins/high-availability/cache/" + Constants.PROJECT_LIST;
@@ -72,7 +73,7 @@
 
     adminSshSession
         .exec("gerrit flush-caches --cache " + Constants.PROJECT_LIST);
-    checkPoint.await(5, TimeUnit.SECONDS);
+    checkPoint.await(50, TimeUnit.SECONDS);
     verify(postRequestedFor(urlEqualTo(flushRequest)));
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
index 7834cf0..4280ac5 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
@@ -51,7 +51,6 @@
   private static final String REQUEST_MADE = "Request made";
   private static final String SECOND_TRY = "Second try";
   private static final String THIRD_TRY = "Third try";
-  private static final String RETRY_AT_ERROR = "Retry at error";
   private static final String RETRY_AT_DELAY = "Retry at delay";
 
   private HttpSession httpSession;
@@ -123,18 +122,6 @@
   }
 
   @Test
-  public void testBadResponseRetryThenOK() throws Exception {
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_ERROR)
-        .whenScenarioStateIs(Scenario.STARTED).willSetStateTo(REQUEST_MADE)
-        .willReturn(aResponse().withStatus(ERROR)));
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_ERROR)
-        .whenScenarioStateIs(REQUEST_MADE)
-        .willReturn(aResponse().withStatus(NO_CONTENT)));
-
-    assertThat(httpSession.post(ENDPOINT).isSuccessful()).isTrue();
-  }
-
-  @Test
   public void testBadResponseRetryThenGiveUp() throws Exception {
     wireMockRule.givenThat(post(urlEqualTo(ENDPOINT))
         .willReturn(aResponse().withStatus(ERROR).withBody(ERROR_MESSAGE)));
@@ -144,36 +131,6 @@
     assertThat(result.getMessage()).isEqualTo(ERROR_MESSAGE);
   }
 
-  @Test
-  public void testRetryAfterDelay() throws Exception {
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(Scenario.STARTED).willSetStateTo(REQUEST_MADE)
-        .willReturn(aResponse().withStatus(ERROR).withFixedDelay(TIMEOUT / 2)));
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(REQUEST_MADE)
-        .willReturn(aResponse().withStatus(NO_CONTENT)));
-
-    assertThat(httpSession.post(ENDPOINT).isSuccessful()).isTrue();
-  }
-
-  @Test
-  public void testRetryAfterTimeoutThenOK() throws Exception {
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(Scenario.STARTED).willSetStateTo(REQUEST_MADE)
-        .willReturn(aResponse().withFixedDelay(TIMEOUT)));
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(REQUEST_MADE).willSetStateTo(SECOND_TRY)
-        .willReturn(aResponse().withFixedDelay(TIMEOUT)));
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(SECOND_TRY).willSetStateTo(THIRD_TRY)
-        .willReturn(aResponse().withFixedDelay(TIMEOUT)));
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(THIRD_TRY)
-        .willReturn(aResponse().withStatus(NO_CONTENT)));
-
-    assertThat(httpSession.post(ENDPOINT).isSuccessful()).isTrue();
-  }
-
   @Test(expected = SocketTimeoutException.class)
   public void testMaxRetriesAfterTimeoutThenGiveUp() throws Exception {
     wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
@@ -193,15 +150,6 @@
   }
 
   @Test
-  public void testGiveUpAtTimeout() throws Exception {
-    wireMockRule.givenThat(post(urlEqualTo(ENDPOINT)).inScenario(RETRY_AT_DELAY)
-        .whenScenarioStateIs(Scenario.STARTED).willSetStateTo(REQUEST_MADE)
-        .willReturn(aResponse().withStatus(ERROR).withFixedDelay(TIMEOUT)));
-
-    assertThat(httpSession.post(ENDPOINT).isSuccessful()).isFalse();
-  }
-
-  @Test
   public void testResponseWithMalformedResponse() throws Exception {
     wireMockRule.givenThat(post(urlEqualTo(ENDPOINT))
         .willReturn(aResponse().withFault(Fault.MALFORMED_RESPONSE_CHUNK)));
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 2c65f84..ecc8fbf 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -15,6 +15,7 @@
 package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -25,6 +26,7 @@
 import com.google.gerrit.server.events.Event;
 import com.google.gson.GsonBuilder;
 
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.cache.Constants;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
 
@@ -32,6 +34,8 @@
 
 import java.io.IOException;
 
+import javax.net.ssl.SSLException;
+
 public class RestForwarderTest {
   private static final String PLUGIN_NAME = "high-availability";
   private static final String EMPTY_MSG = "";
@@ -42,6 +46,9 @@
   private static final boolean DO_NOT_THROW_EXCEPTION = false;
   private static final boolean THROW_EXCEPTION = true;
 
+  private static final int MAX_TRIES = 3;
+  private static final int RETRY_INTERVAL = 250;
+
   //Index
   private static final int CHANGE_NUMBER = 1;
   private static final String DELETE_OP = "delete";
@@ -111,7 +118,10 @@
         when(httpSession.delete(request)).thenReturn(result);
       }
     }
-    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME);
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(MAX_TRIES);
+    when(cfg.getRetryInterval()).thenReturn(RETRY_INTERVAL);
+    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
   }
 
   @Test
@@ -144,7 +154,10 @@
       HttpResult result = new HttpResult(isOperationSuccessful, msg);
       when(httpSession.post(request, content)).thenReturn(result);
     }
-    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME);
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(MAX_TRIES);
+    when(cfg.getRetryInterval()).thenReturn(RETRY_INTERVAL);
+    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
     return event;
   }
 
@@ -227,6 +240,76 @@
       HttpResult result = new HttpResult(isOperationSuccessful, "Error");
       when(httpSession.post(request, json)).thenReturn(result);
     }
-    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME);
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(MAX_TRIES);
+    when(cfg.getRetryInterval()).thenReturn(RETRY_INTERVAL);
+    restForwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
+  }
+
+  @Test
+  public void testRetryOnErrorThenSuccess() throws IOException {
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(3);
+    when(cfg.getRetryInterval()).thenReturn(10);
+
+    HttpSession httpSession = mock(HttpSession.class);
+    when(httpSession.post(anyString(), anyString()))
+        .thenReturn(new HttpResult(false, "Error"))
+        .thenReturn(new HttpResult(false, "Error"))
+        .thenReturn(new HttpResult(true, "Success"));
+
+    RestForwarder forwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
+    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object()))
+        .isTrue();
+  }
+
+  @Test
+  public void testRetryOnIoExceptionThenSuccess() throws IOException {
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(3);
+    when(cfg.getRetryInterval()).thenReturn(10);
+
+    HttpSession httpSession = mock(HttpSession.class);
+    when(httpSession.post(anyString(), anyString()))
+        .thenThrow(new IOException())
+        .thenThrow(new IOException())
+        .thenReturn(new HttpResult(true, "Success"));
+
+    RestForwarder forwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
+    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object()))
+        .isTrue();
+  }
+
+  @Test
+  public void testNoRetryAfterNonRecoverableException() throws IOException {
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(3);
+    when(cfg.getRetryInterval()).thenReturn(10);
+
+    HttpSession httpSession = mock(HttpSession.class);
+    when(httpSession.post(anyString(), anyString()))
+        .thenThrow(new SSLException("Non Recoverable"))
+        .thenReturn(new HttpResult(true, "Success"));
+
+    RestForwarder forwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
+    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object()))
+        .isFalse();
+  }
+
+  @Test
+  public void testFailureAfterMaxTries() throws IOException {
+    Configuration cfg = mock(Configuration.class);
+    when(cfg.getMaxTries()).thenReturn(3);
+    when(cfg.getRetryInterval()).thenReturn(10);
+
+    HttpSession httpSession = mock(HttpSession.class);
+    when(httpSession.post(anyString(), anyString()))
+        .thenReturn(new HttpResult(false, "Error"))
+        .thenReturn(new HttpResult(false, "Error"))
+        .thenReturn(new HttpResult(false, "Error"));
+
+    RestForwarder forwarder = new RestForwarder(httpSession, PLUGIN_NAME, cfg);
+    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object()))
+        .isFalse();
   }
 }