Force async fetch as a fallback for sync replication
When running sync replication, the apply-object or fetch may
fail or timeout, causing the client to call the fetch operation
as a fallback.
Before this change, the modality for running the fallback fetch
was dependent on the replication.syncRefs configuration settings,
causing the fallback to potentially fail for all refs that were
included in the replication.syncRefs regex.
Force the async fetch when running the fallback execution to
a failed sync replication API, so that it can reliably work
for any refs independently from the replication.syncRefs settings.
It is tough to write a test for this specific condition
and verifying that the fetch happened asynchronously, hence
this logic has only been unit-tested at the RepicationQueue
level.
Bug: Issue 305653639
Change-Id: I5ea3f5d14b31cdc9cecc28996b175cc302f31a27
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index ef95596..cdef514 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication.pull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
@@ -38,6 +40,7 @@
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import com.googlesource.gerrit.plugins.replication.pull.filter.ApplyObjectsRefsFilter;
import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
@@ -288,7 +291,7 @@
}
if (!callSuccessful) {
- callFetch(source, project, refName, state);
+ callFetch(source, project, refName, state, FetchRestApiClient.FORCE_ASYNC);
}
};
}
@@ -334,7 +337,7 @@
state);
}
- return (source) -> callFetch(source, project, refName, state);
+ return (source) -> callFetch(source, project, refName, state, false);
}
private boolean callSendObject(
@@ -461,7 +464,11 @@
}
private boolean callFetch(
- Source source, Project.NameKey project, String refName, ReplicationState state) {
+ Source source,
+ Project.NameKey project,
+ String refName,
+ ReplicationState state,
+ boolean forceAsyncCall) {
boolean resultIsSuccessful = true;
if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
for (String apiUrl : source.getApis()) {
@@ -470,7 +477,13 @@
FetchApiClient fetchClient = fetchClientFactory.create(source);
repLog.info("Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName());
- HttpResult result = fetchClient.callFetch(project, refName, uri);
+ HttpResult result =
+ fetchClient.callFetch(
+ project,
+ refName,
+ uri,
+ MILLISECONDS.toNanos(System.currentTimeMillis()),
+ forceAsyncCall);
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop());
boolean resultSuccessful = result.isSuccessful();
repLog.info(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index f7ed4cb..38d5d57 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -32,12 +32,17 @@
}
HttpResult callFetch(
- Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
+ Project.NameKey project,
+ String refName,
+ URIish targetUri,
+ long startTimeNanos,
+ boolean forceAsyncCall)
throws ClientProtocolException, IOException;
default HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
throws ClientProtocolException, IOException {
- return callFetch(project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
+ return callFetch(
+ project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()), false);
}
/**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 7607e4b..5579415 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -63,6 +63,8 @@
import org.eclipse.jgit.transport.URIish;
public class FetchRestApiClient implements FetchApiClient, ResponseHandler<HttpResult> {
+ public static final boolean FORCE_ASYNC = true;
+
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
@@ -107,15 +109,31 @@
this.urlAuthenticationPrefix = bearerTokenProvider.get().map(br -> "").orElse("a/");
}
- /* (non-Javadoc)
- * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callFetch(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
- */
@Override
public HttpResult callFetch(
- Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
+ NameKey project,
+ String refName,
+ URIish targetUri,
+ long startTimeNanos,
+ boolean forceAsyncFetch)
+ throws ClientProtocolException, IOException {
+ return doCallFetch(
+ project,
+ refName,
+ targetUri,
+ startTimeNanos,
+ forceAsyncFetch || !syncRefsFilter.match(refName));
+ }
+
+ private HttpResult doCallFetch(
+ Project.NameKey project,
+ String refName,
+ URIish targetUri,
+ long startTimeNanos,
+ boolean callAsync)
throws IOException {
String url = formatUrl(targetUri.toString(), project, "fetch");
- Boolean callAsync = !syncRefsFilter.match(refName);
+
HttpPost post = new HttpPost(url);
post.setEntity(
new StringEntity(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index a07aa55..3d4d531 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -61,6 +61,7 @@
import java.util.Optional;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.FS;
import org.junit.Before;
import org.junit.Test;
@@ -152,7 +153,6 @@
.thenReturn(successfulHttpResult);
when(successfulHttpResult.isSuccessful()).thenReturn(true);
when(httpResult.isSuccessful()).thenReturn(true);
- when(fetchHttpResult.isSuccessful()).thenReturn(true);
when(httpResult.isProjectMissing(any())).thenReturn(false);
when(applyObjectsRefsFilter.match(any())).thenReturn(false);
@@ -250,31 +250,31 @@
@Test
public void shouldFallbackToCallFetchWhenIOException() throws Exception {
- Event event = new TestEvent("refs/changes/01/1/meta");
+ RefUpdatedEvent event = new TestEvent("refs/changes/01/1/meta");
objectUnderTest.start();
when(revReader.read(any(), any(), anyString(), anyInt())).thenThrow(IOException.class);
objectUnderTest.onEvent(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verifyFallbackToRestApiClientFetchAsync(event);
}
@Test
public void shouldFallbackToCallFetchWhenLargeRef() throws Exception {
- Event event = new TestEvent("refs/changes/01/1/1");
+ RefUpdatedEvent event = new TestEvent("refs/changes/01/1/1");
objectUnderTest.start();
when(revReader.read(any(), any(), anyString(), anyInt())).thenReturn(Optional.empty());
objectUnderTest.onEvent(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verifyFallbackToRestApiClientFetchAsync(event);
}
@Test
public void shouldFallbackToCallFetchWhenParentObjectIsMissing() throws Exception {
- Event event = new TestEvent("refs/changes/01/1/1");
+ RefUpdatedEvent event = new TestEvent("refs/changes/01/1/1");
objectUnderTest.start();
when(httpResult.isSuccessful()).thenReturn(false);
@@ -284,7 +284,7 @@
objectUnderTest.onEvent(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verifyFallbackToRestApiClientFetchAsync(event);
}
@Test
@@ -486,4 +486,14 @@
return projectName;
}
}
+
+ private void verifyFallbackToRestApiClientFetchAsync(RefUpdatedEvent event) throws IOException {
+ verify(fetchRestApiClient)
+ .callFetch(
+ eq(event.getProjectNameKey()),
+ eq(event.getRefName()),
+ any(URIish.class),
+ any(Long.class),
+ eq(FetchRestApiClient.FORCE_ASYNC));
+ }
}