Make HTTP connection parameters source-aware

Different replication sources may be located in different
geographical locations with their associated latencies and
timeouts.
Make the HTTP Client source-aware by allocating a separate
connection pool per source with associated timeouts.

HTTP Client gets automatically closed and reconfigured
upon source shutdown and automatic reload.

Feature: Issue 11605
Change-Id: I86419d1854ee405bae3abe947ed6a898c5c1d75b
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 90295c0..9add07c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -42,12 +42,12 @@
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
-import com.googlesource.gerrit.plugins.replication.pull.client.HttpClientProvider;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
@@ -67,7 +67,11 @@
 
     install(new PullReplicationApiModule());
 
-    bind(CloseableHttpClient.class).toProvider(HttpClientProvider.class).in(Scopes.SINGLETON);
+    install(
+        new FactoryModuleBuilder()
+            .implement(HttpClient.class, SourceHttpClient.class)
+            .build(SourceHttpClient.Factory.class));
+
     install(new FactoryModuleBuilder().build(Source.Factory.class));
     install(new FactoryModuleBuilder().build(FetchRestApiClient.Factory.class));
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index e8eae78..13b1466 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.data.GroupReference;
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.entities.BranchNameKey;
@@ -72,7 +73,9 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -84,6 +87,7 @@
 
 public class Source {
   private static final Logger repLog = PullReplicationLogger.repLog;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   public interface Factory {
     Source create(SourceConfiguration config);
@@ -102,6 +106,7 @@
   private final PerThreadRequestScope.Scoper threadScoper;
   private final SourceConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private CloseableHttpClient httpClient;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -193,6 +198,14 @@
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
   }
 
+  public synchronized CloseableHttpClient memoize(
+      Supplier<CloseableHttpClient> httpClientSupplier) {
+    if (httpClient == null) {
+      httpClient = httpClientSupplier.get();
+    }
+    return httpClient;
+  }
+
   private void addRecursiveParents(
       AccountGroup.UUID g,
       Builder<AccountGroup.UUID> builder,
@@ -217,12 +230,21 @@
     pool = workQueue.createQueue(config.getPoolThreads(), poolName);
   }
 
-  public int shutdown() {
+  public synchronized int shutdown() {
     int cnt = 0;
     if (pool != null) {
       cnt = pool.shutdownNow().size();
       pool = null;
     }
+    if (httpClient != null) {
+      try {
+        httpClient.close();
+        httpClient = null;
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error occurred while closing HTTP client connections");
+      }
+    }
+
     return cnt;
   }
 
@@ -646,6 +668,22 @@
     return config.getApis();
   }
 
+  public int getConnectionTimeout() {
+    return config.getConnectionTimeout();
+  }
+
+  public int getIdleTimeout() {
+    return config.getIdleTimeout();
+  }
+
+  public int getMaxConnectionsPerRoute() {
+    return config.getMaxConnectionsPerRoute();
+  }
+
+  public int getMaxConnections() {
+    return config.getMaxConnections();
+  }
+
   public int getMaxRetries() {
     return config.getMaxRetries();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 009d952..04fae31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -26,6 +26,9 @@
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
+  static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
+  static final int DEFAULT_CONNECTION_TIMEOUT_MS = 5000;
+  static final int DEFAULT_CONNECTIONS_PER_ROUTE = 100;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,10 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final ImmutableList<String> apis;
+  private final int connectionTimeout;
+  private final int idleTimeout;
+  private final int maxConnectionsPerRoute;
+  private final int maxConnections;
   private final int maxRetries;
   private int slowLatencyThreshold;
 
@@ -49,6 +56,12 @@
     String name = remoteConfig.getName();
     urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
     apis = ImmutableList.copyOf(cfg.getStringList("remote", name, "apiUrl"));
+    connectionTimeout =
+        cfg.getInt("remote", name, "connectionTimeout", DEFAULT_CONNECTION_TIMEOUT_MS);
+    idleTimeout = cfg.getInt("remote", name, "idleTimeout", DEFAULT_MAX_CONNECTION_INACTIVITY_MS);
+    maxConnectionsPerRoute =
+        cfg.getInt("replication", "maxConnectionsPerRoute", DEFAULT_CONNECTIONS_PER_ROUTE);
+    maxConnections = cfg.getInt("replication", "maxConnections", 2 * maxConnectionsPerRoute);
     delay = Math.max(0, getInt(remoteConfig, cfg, "replicationdelay", DEFAULT_REPLICATION_DELAY));
     rescheduleDelay =
         Math.max(3, getInt(remoteConfig, cfg, "rescheduledelay", DEFAULT_RESCHEDULE_DELAY));
@@ -109,6 +122,22 @@
     return apis;
   }
 
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
+
+  public int getIdleTimeout() {
+    return idleTimeout;
+  }
+
+  public int getMaxConnectionsPerRoute() {
+    return maxConnectionsPerRoute;
+  }
+
+  public int getMaxConnections() {
+    return maxConnections;
+  }
+
   @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index da263aa..502d3b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -36,7 +36,6 @@
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.util.EntityUtils;
 import org.eclipse.jgit.transport.CredentialItem;
@@ -51,18 +50,18 @@
   }
 
   private final CredentialsFactory credentials;
-  private final CloseableHttpClient httpClient;
+  private final SourceHttpClient.Factory httpClientFactory;
   private final Source source;
   private final String instanceLabel;
 
   @Inject
   FetchRestApiClient(
       CredentialsFactory credentials,
-      CloseableHttpClient httpClient,
+      SourceHttpClient.Factory httpClientFactory,
       ReplicationConfig replicationConfig,
       @Assisted Source source) {
     this.credentials = credentials;
-    this.httpClient = httpClient;
+    this.httpClientFactory = httpClientFactory;
     this.source = source;
     this.instanceLabel =
         replicationConfig.getConfig().getString("replication", null, "instanceLabel");
@@ -81,7 +80,7 @@
             String.format("{\"label\":\"%s\", \"ref_name\": \"%s\"}", instanceLabel, refName),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
-    return httpClient.execute(post, this, getContext(targetUri));
+    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
new file mode 100644
index 0000000..7bfc7d1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import java.io.IOException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.protocol.HttpContext;
+
+/** HTTP client for executing URI requests to a remote site */
+public interface HttpClient {
+
+  public <T> T execute(
+      final HttpUriRequest request,
+      final ResponseHandler<? extends T> responseHandler,
+      final HttpContext context)
+      throws ClientProtocolException, IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java
deleted file mode 100644
index 6d6e803..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull.client;
-
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.HttpClientConnectionManager;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-
-/** Provides an HTTP client with SSL capabilities. */
-public class HttpClientProvider implements Provider<CloseableHttpClient> {
-  private static final int CONNECTIONS_PER_ROUTE = 100;
-
-  // Up to 2 target instances with the max number of connections per host:
-  private static final int MAX_CONNECTIONS = 2 * CONNECTIONS_PER_ROUTE;
-
-  private static final int MAX_CONNECTION_INACTIVITY_MS = 10000;
-  private static final int DEFAULT_TIMEOUT_MS = 5000;
-
-  @Override
-  public CloseableHttpClient get() {
-    try {
-      return HttpClients.custom()
-          .setConnectionManager(customConnectionManager())
-          .setDefaultRequestConfig(customRequestConfig())
-          .build();
-    } catch (Exception e) {
-      throw new ProvisionException("Couldn't create CloseableHttpClient", e);
-    }
-  }
-
-  private RequestConfig customRequestConfig() {
-    return RequestConfig.custom()
-        .setConnectTimeout(DEFAULT_TIMEOUT_MS)
-        .setSocketTimeout(DEFAULT_TIMEOUT_MS)
-        .setConnectionRequestTimeout(DEFAULT_TIMEOUT_MS)
-        .build();
-  }
-
-  private HttpClientConnectionManager customConnectionManager() throws Exception {
-    PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
-    connManager.setDefaultMaxPerRoute(CONNECTIONS_PER_ROUTE);
-    connManager.setMaxTotal(MAX_CONNECTIONS);
-    connManager.setValidateAfterInactivity(MAX_CONNECTION_INACTIVITY_MS);
-    return connManager;
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
new file mode 100644
index 0000000..ee0fe79
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import java.io.IOException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+
+/** Apache HTTP client implementation based on Source-specific parameters */
+public class SourceHttpClient implements HttpClient {
+  private final Source source;
+
+  public interface Factory {
+    public HttpClient create(Source source);
+  }
+
+  @Inject
+  public SourceHttpClient(@Assisted Source source) {
+    this.source = source;
+  }
+
+  @Override
+  public <T> T execute(
+      HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context)
+      throws ClientProtocolException, IOException {
+    return source
+        .memoize(
+            () ->
+                HttpClients.custom()
+                    .setConnectionManager(customConnectionManager(source))
+                    .setDefaultRequestConfig(customRequestConfig(source))
+                    .build())
+        .execute(request, responseHandler, context);
+  }
+
+  private static RequestConfig customRequestConfig(Source source) {
+    int connectionTimeout = source.getConnectionTimeout();
+    return RequestConfig.custom()
+        .setConnectTimeout(connectionTimeout)
+        .setSocketTimeout(connectionTimeout)
+        .setConnectionRequestTimeout(connectionTimeout)
+        .build();
+  }
+
+  private static HttpClientConnectionManager customConnectionManager(Source source) {
+    PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
+
+    connManager.setDefaultMaxPerRoute(source.getMaxConnectionsPerRoute());
+    connManager.setMaxTotal(source.getMaxConnections());
+    connManager.setValidateAfterInactivity(source.getIdleTimeout());
+    return connManager;
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 8fac1d9..a6ea668 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -121,6 +121,16 @@
 	provided in the remote configuration section which name is equal
 	to instanceLabel.
 
+replication.maxConnectionsPerRoute
+:	Maximum number of HTTP connections per one HTTP route.
+
+	Default: 100
+
+replication.maxConnections
+:	Total number of HTTP connections pool.
+
+	Default: 2 * replication.maxConnectionsPerRoute
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
@@ -148,6 +158,21 @@
 	different destinations which share the same settings. Gerrit calls
 	all URLs in sequence.
 
+remote.NAME.connectionTimeout
+:	Defines the socket timeout ({@code SO_TIMEOUT}) in milliseconds,
+	which is the timeout for waiting for data or, put differently,
+	a maximum period inactivity between two consecutive data packets.
+
+	Default: 5000
+
+remote.NAME.idleTimeout
+:	Defines period of inactivity in milliseconds after which persistent connections must
+	be re-validated prior to being leased to the consumer. Non-positive value disables 
+	connection validation. This check helps detect connections that have become stale
+	(half-closed) while kept inactive in the pool.
+
+	Default: 10000
+
 remote.NAME.uploadpack
 :	Path of the `git-upload-pack` executable on the remote system,
 	if using the SSH transport.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index 22dd0e9..669161f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -34,7 +34,6 @@
 import org.apache.http.Header;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicHeader;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.CredentialItem;
@@ -56,7 +55,8 @@
 public class FetchRestApiClientTest {
   @Mock CredentialsProvider credentialProvider;
   @Mock CredentialsFactory credentials;
-  @Mock CloseableHttpClient httpClient;
+  @Mock HttpClient httpClient;
+  @Mock SourceHttpClient.Factory httpClientFactory;
   @Mock FileBasedConfig config;
   @Mock ReplicationFileBasedConfig replicationConfig;
   @Mock Source source;
@@ -96,8 +96,9 @@
 
     HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
     when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
-
-    objectUnderTest = new FetchRestApiClient(credentials, httpClient, replicationConfig, source);
+    when(httpClientFactory.create(any())).thenReturn(httpClient);
+    objectUnderTest =
+        new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source);
   }
 
   @Test