Make HTTP connection parameters source-aware Different replication sources may be located in different geographical locations with their associated latencies and timeouts. Make the HTTP Client source-aware by allocating a separate connection pool per source with associated timeouts. HTTP Client gets automatically closed and reconfigured upon source shutdown and automatic reload. Feature: Issue 11605 Change-Id: I86419d1854ee405bae3abe947ed6a898c5c1d75b
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java index 90295c0..9add07c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -42,12 +42,12 @@ import com.googlesource.gerrit.plugins.replication.StartReplicationCapability; import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule; import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient; -import com.googlesource.gerrit.plugins.replication.pull.client.HttpClientProvider; +import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient; +import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import org.apache.http.impl.client.CloseableHttpClient; import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.util.FS; @@ -67,7 +67,11 @@ install(new PullReplicationApiModule()); - bind(CloseableHttpClient.class).toProvider(HttpClientProvider.class).in(Scopes.SINGLETON); + install( + new FactoryModuleBuilder() + .implement(HttpClient.class, SourceHttpClient.class) + .build(SourceHttpClient.Factory.class)); + install(new FactoryModuleBuilder().build(Source.Factory.class)); install(new FactoryModuleBuilder().build(FetchRestApiClient.Factory.class));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java index e8eae78..13b1466 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; +import com.google.common.flogger.FluentLogger; import com.google.gerrit.common.data.GroupReference; import com.google.gerrit.entities.AccountGroup; import com.google.gerrit.entities.BranchNameKey; @@ -72,7 +73,9 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.commons.io.FilenameUtils; +import org.apache.http.impl.client.CloseableHttpClient; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.RefUpdate; @@ -84,6 +87,7 @@ public class Source { private static final Logger repLog = PullReplicationLogger.repLog; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); public interface Factory { Source create(SourceConfiguration config); @@ -102,6 +106,7 @@ private final PerThreadRequestScope.Scoper threadScoper; private final SourceConfiguration config; private final DynamicItem<EventDispatcher> eventDispatcher; + private CloseableHttpClient httpClient; protected enum RetryReason { TRANSPORT_ERROR, @@ -193,6 +198,14 @@ threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class); } + public synchronized CloseableHttpClient memoize( + Supplier<CloseableHttpClient> httpClientSupplier) { + if (httpClient == null) { + httpClient = httpClientSupplier.get(); + } + return httpClient; + } + private void addRecursiveParents( AccountGroup.UUID g, Builder<AccountGroup.UUID> builder, @@ -217,12 +230,21 @@ pool = workQueue.createQueue(config.getPoolThreads(), poolName); } - public int shutdown() { + public synchronized int shutdown() { int cnt = 0; if (pool != null) { cnt = pool.shutdownNow().size(); pool = null; } + if (httpClient != null) { + try { + httpClient.close(); + httpClient = null; + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error occurred while closing HTTP client connections"); + } + } + return cnt; } @@ -646,6 +668,22 @@ return config.getApis(); } + public int getConnectionTimeout() { + return config.getConnectionTimeout(); + } + + public int getIdleTimeout() { + return config.getIdleTimeout(); + } + + public int getMaxConnectionsPerRoute() { + return config.getMaxConnectionsPerRoute(); + } + + public int getMaxConnections() { + return config.getMaxConnections(); + } + public int getMaxRetries() { return config.getMaxRetries(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java index 009d952..04fae31 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -26,6 +26,9 @@ static final int DEFAULT_REPLICATION_DELAY = 15; static final int DEFAULT_RESCHEDULE_DELAY = 3; static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900; + static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000; + static final int DEFAULT_CONNECTION_TIMEOUT_MS = 5000; + static final int DEFAULT_CONNECTIONS_PER_ROUTE = 100; private final int delay; private final int rescheduleDelay; @@ -41,6 +44,10 @@ private final ImmutableList<String> authGroupNames; private final RemoteConfig remoteConfig; private final ImmutableList<String> apis; + private final int connectionTimeout; + private final int idleTimeout; + private final int maxConnectionsPerRoute; + private final int maxConnections; private final int maxRetries; private int slowLatencyThreshold; @@ -49,6 +56,12 @@ String name = remoteConfig.getName(); urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url")); apis = ImmutableList.copyOf(cfg.getStringList("remote", name, "apiUrl")); + connectionTimeout = + cfg.getInt("remote", name, "connectionTimeout", DEFAULT_CONNECTION_TIMEOUT_MS); + idleTimeout = cfg.getInt("remote", name, "idleTimeout", DEFAULT_MAX_CONNECTION_INACTIVITY_MS); + maxConnectionsPerRoute = + cfg.getInt("replication", "maxConnectionsPerRoute", DEFAULT_CONNECTIONS_PER_ROUTE); + maxConnections = cfg.getInt("replication", "maxConnections", 2 * maxConnectionsPerRoute); delay = Math.max(0, getInt(remoteConfig, cfg, "replicationdelay", DEFAULT_REPLICATION_DELAY)); rescheduleDelay = Math.max(3, getInt(remoteConfig, cfg, "rescheduledelay", DEFAULT_RESCHEDULE_DELAY)); @@ -109,6 +122,22 @@ return apis; } + public int getConnectionTimeout() { + return connectionTimeout; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public int getMaxConnectionsPerRoute() { + return maxConnectionsPerRoute; + } + + public int getMaxConnections() { + return maxConnections; + } + @Override public ImmutableList<String> getAdminUrls() { return adminUrls;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java index da263aa..502d3b7 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -36,7 +36,6 @@ import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicHeader; import org.apache.http.util.EntityUtils; import org.eclipse.jgit.transport.CredentialItem; @@ -51,18 +50,18 @@ } private final CredentialsFactory credentials; - private final CloseableHttpClient httpClient; + private final SourceHttpClient.Factory httpClientFactory; private final Source source; private final String instanceLabel; @Inject FetchRestApiClient( CredentialsFactory credentials, - CloseableHttpClient httpClient, + SourceHttpClient.Factory httpClientFactory, ReplicationConfig replicationConfig, @Assisted Source source) { this.credentials = credentials; - this.httpClient = httpClient; + this.httpClientFactory = httpClientFactory; this.source = source; this.instanceLabel = replicationConfig.getConfig().getString("replication", null, "instanceLabel"); @@ -81,7 +80,7 @@ String.format("{\"label\":\"%s\", \"ref_name\": \"%s\"}", instanceLabel, refName), StandardCharsets.UTF_8)); post.addHeader(new BasicHeader("Content-Type", "application/json")); - return httpClient.execute(post, this, getContext(targetUri)); + return httpClientFactory.create(source).execute(post, this, getContext(targetUri)); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java new file mode 100644 index 0000000..7bfc7d1 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
@@ -0,0 +1,31 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.client; + +import java.io.IOException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.protocol.HttpContext; + +/** HTTP client for executing URI requests to a remote site */ +public interface HttpClient { + + public <T> T execute( + final HttpUriRequest request, + final ResponseHandler<? extends T> responseHandler, + final HttpContext context) + throws ClientProtocolException, IOException; +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java deleted file mode 100644 index 6d6e803..0000000 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java +++ /dev/null
@@ -1,62 +0,0 @@ -// Copyright (C) 2020 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication.pull.client; - -import com.google.inject.Provider; -import com.google.inject.ProvisionException; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.HttpClientConnectionManager; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; - -/** Provides an HTTP client with SSL capabilities. */ -public class HttpClientProvider implements Provider<CloseableHttpClient> { - private static final int CONNECTIONS_PER_ROUTE = 100; - - // Up to 2 target instances with the max number of connections per host: - private static final int MAX_CONNECTIONS = 2 * CONNECTIONS_PER_ROUTE; - - private static final int MAX_CONNECTION_INACTIVITY_MS = 10000; - private static final int DEFAULT_TIMEOUT_MS = 5000; - - @Override - public CloseableHttpClient get() { - try { - return HttpClients.custom() - .setConnectionManager(customConnectionManager()) - .setDefaultRequestConfig(customRequestConfig()) - .build(); - } catch (Exception e) { - throw new ProvisionException("Couldn't create CloseableHttpClient", e); - } - } - - private RequestConfig customRequestConfig() { - return RequestConfig.custom() - .setConnectTimeout(DEFAULT_TIMEOUT_MS) - .setSocketTimeout(DEFAULT_TIMEOUT_MS) - .setConnectionRequestTimeout(DEFAULT_TIMEOUT_MS) - .build(); - } - - private HttpClientConnectionManager customConnectionManager() throws Exception { - PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); - connManager.setDefaultMaxPerRoute(CONNECTIONS_PER_ROUTE); - connManager.setMaxTotal(MAX_CONNECTIONS); - connManager.setValidateAfterInactivity(MAX_CONNECTION_INACTIVITY_MS); - return connManager; - } -}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java new file mode 100644 index 0000000..ee0fe79 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
@@ -0,0 +1,74 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.client; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import com.googlesource.gerrit.plugins.replication.pull.Source; +import java.io.IOException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; + +/** Apache HTTP client implementation based on Source-specific parameters */ +public class SourceHttpClient implements HttpClient { + private final Source source; + + public interface Factory { + public HttpClient create(Source source); + } + + @Inject + public SourceHttpClient(@Assisted Source source) { + this.source = source; + } + + @Override + public <T> T execute( + HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) + throws ClientProtocolException, IOException { + return source + .memoize( + () -> + HttpClients.custom() + .setConnectionManager(customConnectionManager(source)) + .setDefaultRequestConfig(customRequestConfig(source)) + .build()) + .execute(request, responseHandler, context); + } + + private static RequestConfig customRequestConfig(Source source) { + int connectionTimeout = source.getConnectionTimeout(); + return RequestConfig.custom() + .setConnectTimeout(connectionTimeout) + .setSocketTimeout(connectionTimeout) + .setConnectionRequestTimeout(connectionTimeout) + .build(); + } + + private static HttpClientConnectionManager customConnectionManager(Source source) { + PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); + + connManager.setDefaultMaxPerRoute(source.getMaxConnectionsPerRoute()); + connManager.setMaxTotal(source.getMaxConnections()); + connManager.setValidateAfterInactivity(source.getIdleTimeout()); + return connManager; + } +}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 8fac1d9..a6ea668 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -121,6 +121,16 @@ provided in the remote configuration section which name is equal to instanceLabel. +replication.maxConnectionsPerRoute +: Maximum number of HTTP connections per one HTTP route. + + Default: 100 + +replication.maxConnections +: Total number of HTTP connections pool. + + Default: 2 * replication.maxConnectionsPerRoute + remote.NAME.url : Address of the remote server to fetch from. Single URL can be specified within a single remote block. A remote node can request @@ -148,6 +158,21 @@ different destinations which share the same settings. Gerrit calls all URLs in sequence. +remote.NAME.connectionTimeout +: Defines the socket timeout ({@code SO_TIMEOUT}) in milliseconds, + which is the timeout for waiting for data or, put differently, + a maximum period inactivity between two consecutive data packets. + + Default: 5000 + +remote.NAME.idleTimeout +: Defines period of inactivity in milliseconds after which persistent connections must + be re-validated prior to being leased to the consumer. Non-positive value disables + connection validation. This check helps detect connections that have become stale + (half-closed) while kept inactive in the pool. + + Default: 10000 + remote.NAME.uploadpack : Path of the `git-upload-pack` executable on the remote system, if using the SSH transport.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java index 22dd0e9..669161f 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -34,7 +34,6 @@ import org.apache.http.Header; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicHeader; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.transport.CredentialItem; @@ -56,7 +55,8 @@ public class FetchRestApiClientTest { @Mock CredentialsProvider credentialProvider; @Mock CredentialsFactory credentials; - @Mock CloseableHttpClient httpClient; + @Mock HttpClient httpClient; + @Mock SourceHttpClient.Factory httpClientFactory; @Mock FileBasedConfig config; @Mock ReplicationFileBasedConfig replicationConfig; @Mock Source source; @@ -96,8 +96,9 @@ HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message")); when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult); - - objectUnderTest = new FetchRestApiClient(credentials, httpClient, replicationConfig, source); + when(httpClientFactory.create(any())).thenReturn(httpClient); + objectUnderTest = + new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source); } @Test