Merge branch 'stable-3.4'
* stable-3.4:
Fix 'illegal format conversion' compilation error
Use stream events to fetch refs and create repositories
Use gerrit.instanceId instead of replication.instanceLabel
Extract FetchApiClient interface
Add user/password authentication for CGit client
Change-Id: I4d9daab055d364a9773325ac6e713766424ed749
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index ed584b7..527b746 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -22,7 +22,7 @@
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -39,11 +39,11 @@
private final Source source;
private final String uri;
private final Project.NameKey project;
- private final FetchRestApiClient.Factory fetchClientFactory;
+ private final FetchApiClient.Factory fetchClientFactory;
@Inject
DeleteProjectTask(
- FetchRestApiClient.Factory fetchClientFactory,
+ FetchApiClient.Factory fetchClientFactory,
IdGenerator ig,
@Assisted Source source,
@Assisted String uri,
@@ -63,7 +63,7 @@
if (!httpResult.isSuccessful()) {
throw new IOException(httpResult.getMessage().orElse("Unknown"));
}
- logger.atFine().log("Successfully deleted project {} on remote {}", project.get(), uri);
+ logger.atFine().log("Successfully deleted project %s on remote %s", project.get(), uri);
} catch (URISyntaxException | IOException e) {
String errorMessage =
String.format("Cannot delete project %s on remote site %s.", project, uri);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 5ab744c..56eb483 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -64,7 +64,7 @@
*/
public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
private final ReplicationStateListener stateLog;
- static final String ALL_REFS = "..all..";
+ public static final String ALL_REFS = "..all..";
static final String ID_KEY = "fetchOneId";
interface Factory {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 29ec93a..ebc36b3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -43,16 +43,19 @@
import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
+import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
@@ -82,7 +85,10 @@
.build(SourceHttpClient.Factory.class));
install(new FactoryModuleBuilder().build(Source.Factory.class));
- install(new FactoryModuleBuilder().build(FetchRestApiClient.Factory.class));
+ install(
+ new FactoryModuleBuilder()
+ .implement(FetchApiClient.class, FetchRestApiClient.class)
+ .build(FetchApiClient.Factory.class));
bind(FetchReplicationMetrics.class).in(Scopes.SINGLETON);
@@ -116,7 +122,8 @@
bind(ConfigParser.class).to(SourceConfigParser.class).in(Scopes.SINGLETON);
- if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+ Config replicationConfig = getReplicationConfig();
+ if (replicationConfig.getBoolean("gerrit", "autoReload", false)) {
bind(ReplicationConfig.class)
.annotatedWith(MainReplicationConfig.class)
.to(getReplicationConfigClass());
@@ -128,6 +135,10 @@
bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
}
+ if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
+ install(new StreamEventModule());
+ }
+
DynamicSet.setOf(binder(), ReplicationStateListener.class);
DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 7449263..cfe01e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -31,7 +31,7 @@
import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
-import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
import java.io.IOException;
@@ -71,7 +71,7 @@
private volatile boolean running;
private volatile boolean replaying;
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
- private FetchRestApiClient.Factory fetchClientFactory;
+ private FetchApiClient.Factory fetchClientFactory;
private Integer fetchCallsTimeout;
private ExcludedRefsFilter refsFilter;
private RevisionReader revisionReader;
@@ -82,7 +82,7 @@
Provider<SourcesCollection> rd,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
- FetchRestApiClient.Factory fetchClientFactory,
+ FetchApiClient.Factory fetchClientFactory,
ExcludedRefsFilter refsFilter,
RevisionReader revReader) {
workQueue = wq;
@@ -273,7 +273,7 @@
for (String apiUrl : source.getApis()) {
try {
URIish uri = new URIish(apiUrl);
- FetchRestApiClient fetchClient = fetchClientFactory.create(source);
+ FetchApiClient fetchClient = fetchClientFactory.create(source);
HttpResult result = fetchClient.callSendObject(project, refName, isDelete, revision, uri);
if (isProjectMissing(result, project) && source.isCreateMissingRepositories()) {
@@ -309,7 +309,7 @@
for (String apiUrl : source.getApis()) {
try {
URIish uri = new URIish(apiUrl);
- FetchRestApiClient fetchClient = fetchClientFactory.create(source);
+ FetchApiClient fetchClient = fetchClientFactory.create(source);
HttpResult result = fetchClient.callFetch(project, refName, uri);
if (isProjectMissing(result, project) && source.isCreateMissingRepositories()) {
result = initProject(project, uri, fetchClient, result);
@@ -344,7 +344,7 @@
}
private HttpResult initProject(
- Project.NameKey project, URIish uri, FetchRestApiClient fetchClient, HttpResult result)
+ Project.NameKey project, URIish uri, FetchApiClient fetchClient, HttpResult result)
throws IOException, ClientProtocolException {
HttpResult initProjectResult = fetchClient.initProject(project, uri);
if (initProjectResult.isSuccessful()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 74a4456..10a8997 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -774,7 +774,7 @@
updateHeadFactory.create(this, apiURI, project, newHead), 0, TimeUnit.SECONDS);
} catch (URISyntaxException e) {
logger.atSevere().withCause(e).log(
- "Could not schedule HEAD pull-replication for project {}", project.get());
+ "Could not schedule HEAD pull-replication for project %s", project.get());
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
index 943ea92..e169eb3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -22,14 +22,14 @@
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import java.io.IOException;
import org.eclipse.jgit.transport.URIish;
public class UpdateHeadTask implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final FetchRestApiClient.Factory fetchClientFactory;
+ private final FetchApiClient.Factory fetchClientFactory;
private final Source source;
private final URIish apiURI;
private final Project.NameKey project;
@@ -42,7 +42,7 @@
@Inject
UpdateHeadTask(
- FetchRestApiClient.Factory fetchClientFactory,
+ FetchApiClient.Factory fetchClientFactory,
IdGenerator ig,
@Assisted Source source,
@Assisted URIish apiURI,
@@ -65,9 +65,8 @@
throw new IOException(httpResult.getMessage().orElse("Unknown"));
}
logger.atFine().log(
- "Successfully updated HEAD of project {} on remote {}",
- project.get(),
- apiURI.toASCIIString());
+ "Successfully updated HEAD of project %s on remote %s",
+ project.get(), apiURI.toASCIIString());
} catch (IOException e) {
String errorMessage =
String.format(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 2767f1d..23b7018 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -111,7 +111,7 @@
return Response.accepted(url.get());
}
- private static class FetchJob implements Runnable {
+ public static class FetchJob implements Runnable {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private FetchCommand command;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 01635e6..63d32c4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -103,13 +103,15 @@
"Cannot initialize project " + projectName);
}
- protected boolean initProject(String projectName)
- throws AuthException, PermissionBackendException {
- permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
-
+ public boolean initProject(String projectName) throws AuthException, PermissionBackendException {
+ // When triggered internally(for example by consuming stream events) user is not provided
+ // and internal user is returned. Project creation should be always allowed for internal user.
+ if (!userProvider.get().isInternalUser()) {
+ permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
+ }
Optional<URIish> maybeUri = gerritConfigOps.getGitRepositoryURI(projectName);
if (!maybeUri.isPresent()) {
- logger.atSevere().log("Cannot initialize project '{}'", projectName);
+ logger.atSevere().log("Cannot initialize project '%s'", projectName);
return false;
}
LocalFS localFS = new LocalFS(maybeUri.get());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
new file mode 100644
index 0000000..476a35b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import com.google.gerrit.entities.Project;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import java.io.IOException;
+import org.apache.http.client.ClientProtocolException;
+import org.eclipse.jgit.transport.URIish;
+
+public interface FetchApiClient {
+
+ public interface Factory {
+ FetchApiClient create(Source source);
+ }
+
+ HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+ throws ClientProtocolException, IOException;
+
+ HttpResult initProject(Project.NameKey project, URIish uri) throws IOException;
+
+ HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException;
+
+ HttpResult updateHead(Project.NameKey project, String newHead, URIish apiUri) throws IOException;
+
+ HttpResult callSendObject(
+ Project.NameKey project,
+ String refName,
+ boolean isDelete,
+ RevisionData revisionData,
+ URIish targetUri)
+ throws ClientProtocolException, IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index ab3d3c5..f1d486d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -25,6 +25,7 @@
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.restapi.Url;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
@@ -56,21 +57,17 @@
import org.eclipse.jgit.transport.CredentialItem;
import org.eclipse.jgit.transport.URIish;
-public class FetchRestApiClient implements ResponseHandler<HttpResult> {
+public class FetchRestApiClient implements FetchApiClient, ResponseHandler<HttpResult> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
private static final Gson GSON =
new GsonBuilder().setFieldNamingPolicy(LOWER_CASE_WITH_UNDERSCORES).create();
- public interface Factory {
- FetchRestApiClient create(Source source);
- }
-
private final CredentialsFactory credentials;
private final SourceHttpClient.Factory httpClientFactory;
private final Source source;
- private final String instanceLabel;
+ private final String instanceId;
private final String pluginName;
private final SyncRefsFilter syncRefsFilter;
@@ -81,20 +78,28 @@
ReplicationConfig replicationConfig,
SyncRefsFilter syncRefsFilter,
@PluginName String pluginName,
+ @Nullable @GerritInstanceId String instanceId,
@Assisted Source source) {
this.credentials = credentials;
this.httpClientFactory = httpClientFactory;
this.source = source;
this.pluginName = pluginName;
this.syncRefsFilter = syncRefsFilter;
- this.instanceLabel =
- Strings.nullToEmpty(
+ this.instanceId =
+ Optional.ofNullable(
replicationConfig.getConfig().getString("replication", null, "instanceLabel"))
+ .orElse(instanceId)
.trim();
+
requireNonNull(
- Strings.emptyToNull(instanceLabel), "replication.instanceLabel cannot be null or empty");
+ Strings.emptyToNull(this.instanceId),
+ "gerrit.instanceId or replication.instanceLabel must be set");
}
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callFetch(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
+ */
+ @Override
public HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
throws ClientProtocolException, IOException {
String url =
@@ -107,12 +112,16 @@
new StringEntity(
String.format(
"{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
- instanceLabel, refName, callAsync),
+ instanceId, refName, callAsync),
StandardCharsets.UTF_8));
post.addHeader(new BasicHeader("Content-Type", "application/json"));
return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
}
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#initProject(com.google.gerrit.entities.Project.NameKey, org.eclipse.jgit.transport.URIish)
+ */
+ @Override
public HttpResult initProject(Project.NameKey project, URIish uri) throws IOException {
String url =
String.format(
@@ -123,6 +132,10 @@
return httpClientFactory.create(source).execute(put, this, getContext(uri));
}
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#deleteProject(com.google.gerrit.entities.Project.NameKey, org.eclipse.jgit.transport.URIish)
+ */
+ @Override
public HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException {
String url =
String.format("%s/%s", apiUri.toASCIIString(), getProjectDeletionUrl(project.get()));
@@ -130,6 +143,10 @@
return httpClientFactory.create(source).execute(delete, this, getContext(apiUri));
}
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#updateHead(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
+ */
+ @Override
public HttpResult updateHead(Project.NameKey project, String newHead, URIish apiUri)
throws IOException {
logger.atFine().log("Updating head of %s on %s", project.get(), newHead);
@@ -142,6 +159,10 @@
return httpClientFactory.create(source).execute(req, this, getContext(apiUri));
}
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callSendObject(com.google.gerrit.entities.Project.NameKey, java.lang.String, boolean, com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData, org.eclipse.jgit.transport.URIish)
+ */
+ @Override
public HttpResult callSendObject(
Project.NameKey project,
String refName,
@@ -156,7 +177,7 @@
} else {
requireNull(revisionData, "DELETE ref-updates cannot be associated with a RevisionData");
}
- RevisionInput input = new RevisionInput(instanceLabel, refName, revisionData);
+ RevisionInput input = new RevisionInput(instanceId, refName, revisionData);
String url =
String.format(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
new file mode 100644
index 0000000..4360fcf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -0,0 +1,106 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchCommand;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import org.eclipse.jgit.lib.ObjectId;
+
+public class StreamEventListener implements EventListener {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private String instanceId;
+ private FetchCommand fetchCommand;
+ private WorkQueue workQueue;
+ private ProjectInitializationAction projectInitializationAction;
+
+ @Inject
+ public StreamEventListener(
+ @Nullable @GerritInstanceId String instanceId,
+ FetchCommand command,
+ ProjectInitializationAction projectInitializationAction,
+ WorkQueue workQueue) {
+ this.instanceId = instanceId;
+ this.fetchCommand = command;
+ this.projectInitializationAction = projectInitializationAction;
+ this.workQueue = workQueue;
+
+ requireNonNull(
+ Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (!instanceId.equals(event.instanceId)) {
+ if (event instanceof RefUpdatedEvent) {
+ RefUpdatedEvent refUpdatedEvent = (RefUpdatedEvent) event;
+ if (!isProjectDelete(refUpdatedEvent)) {
+ fetchRefsAsync(
+ refUpdatedEvent.getRefName(),
+ refUpdatedEvent.instanceId,
+ refUpdatedEvent.getProjectNameKey());
+ }
+ }
+ if (event instanceof ProjectCreatedEvent) {
+ ProjectCreatedEvent projectCreatedEvent = (ProjectCreatedEvent) event;
+ try {
+ projectInitializationAction.initProject(getProjectRepositoryName(projectCreatedEvent));
+ fetchRefsAsync(
+ FetchOne.ALL_REFS,
+ projectCreatedEvent.instanceId,
+ projectCreatedEvent.getProjectNameKey());
+ } catch (AuthException | PermissionBackendException e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot initialise project:%s", projectCreatedEvent.projectName);
+ }
+ }
+ }
+ }
+
+ private boolean isProjectDelete(RefUpdatedEvent event) {
+ return RefNames.isConfigRef(event.getRefName())
+ && ObjectId.zeroId().equals(ObjectId.fromString(event.refUpdate.get().newRev));
+ }
+
+ protected void fetchRefsAsync(String refName, String sourceInstanceId, NameKey projectNameKey) {
+ FetchAction.Input input = new FetchAction.Input();
+ input.refName = refName;
+ input.label = sourceInstanceId;
+ workQueue.getDefaultQueue().submit(new FetchJob(fetchCommand, projectNameKey, input));
+ }
+
+ private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
+ return String.format("%s.git", projectCreatedEvent.projectName);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
new file mode 100644
index 0000000..2389678
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.AbstractModule;
+
+public class StreamEventModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
index 9f055c8..c404e30 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
import java.io.BufferedReader;
import java.io.File;
@@ -30,6 +31,8 @@
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.CredentialItem;
+import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.URIish;
@@ -40,17 +43,20 @@
private int timeout;
@Inject
- public CGitFetch(SourceConfiguration config, @Assisted URIish uri, @Assisted Repository git) {
-
+ public CGitFetch(
+ SourceConfiguration config,
+ CredentialsFactory cpFactory,
+ @Assisted URIish uri,
+ @Assisted Repository git) {
this.localProjectDirectory = git.getDirectory();
- this.uri = uri;
+ this.uri = appendCredentials(uri, cpFactory.create(config.getRemoteConfig().getName()));
this.timeout = config.getRemoteConfig().getTimeout();
}
@Override
public List<RefUpdateState> fetch(List<RefSpec> refsSpec) throws IOException {
List<String> refs = refsSpec.stream().map(s -> s.toString()).collect(Collectors.toList());
- List<String> command = Lists.newArrayList("git", "fetch", uri.toASCIIString());
+ List<String> command = Lists.newArrayList("git", "fetch", uri.toPrivateASCIIString());
command.addAll(refs);
ProcessBuilder pb = new ProcessBuilder().command(command).directory(localProjectDirectory);
repLog.info("Fetch references {} from {}", refs, uri);
@@ -83,6 +89,19 @@
}
}
+ protected URIish appendCredentials(URIish uri, CredentialsProvider credentialsProvider) {
+ CredentialItem.Username user = new CredentialItem.Username();
+ CredentialItem.Password pass = new CredentialItem.Password();
+ if (credentialsProvider.supports(user, pass)
+ && credentialsProvider.get(uri, user, pass)
+ && uri.getScheme() != null
+ && !"ssh".equalsIgnoreCase(uri.getScheme())) {
+ return uri.setUser(user.getValue()).setPass(String.valueOf(pass.getValue()));
+ }
+
+ return uri;
+ }
+
public boolean waitForTaskToFinish(Process process) throws InterruptedException {
if (timeout == 0) {
process.waitFor();
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b2470a4..f29e572 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -31,8 +31,13 @@
threads = 3
authGroup = Public Mirror Group
authGroup = Second Public Mirror Group
- [replication]
- instanceLabel = host-one
+```
+
+And make sure that instanceId is set up in `$site_path/etc/gerrit.config`:
+
+```
+[gerrit]
+ instanceId = host-one
```
Then reload the replication plugin to pick up the new configuration:
@@ -121,6 +126,21 @@
provided in the remote configuration section which name is equal
to instanceLabel.
+ Deprecated: This property is kept for backward compatibility and
+ will be removed in a future release. Use [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
+ instead.
+
+replication.consumeStreamEvents
+: Use stream events to trigger pull-replication actions alongside the
+ REST approach. This mechanism is useful together with event-broker
+ and multi-site to provide a backfill mechanism when a node has to
+ catch up with the events after being unreachable.
+
+ NOTE: When `consumeStreamEvents` is enabled, gerrit.instanceId must be used
+ instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel).
+
+ Default: false
+
replication.maxConnectionsPerRoute
: Maximum number of HTTP connections per one HTTP route.
@@ -441,7 +461,6 @@
autoReload = true
replicateOnStartup = false
[replication]
- instanceLabel = host-one
lockErrorMaxRetries = 5
maxRetries = 5
```
@@ -477,7 +496,6 @@
autoReload = true
replicateOnStartup = false
[replication]
- instanceLabel = host-one
lockErrorMaxRetries = 5
maxRetries = 5
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
index a02f43f..406cd8c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
@@ -34,7 +34,12 @@
import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
+import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
@@ -270,6 +275,10 @@
try {
RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
SourceConfiguration sourceConfig = new SourceConfiguration(remoteConfig, cf);
+ bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+ bind(CredentialsFactory.class)
+ .to(AutoReloadSecureCredentialsFactoryDecorator.class)
+ .in(Scopes.SINGLETON);
bind(SourceConfiguration.class).toInstance(sourceConfig);
install(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
index cb5af42..ee5876f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
@@ -23,6 +23,7 @@
import com.google.gerrit.acceptance.SkipProjectClone;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.api.projects.BranchInput;
@@ -95,6 +96,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewChangeRef() throws Exception {
testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
@@ -122,6 +124,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewChangeRefAfterConfigReloaded() throws Exception {
testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
@@ -157,6 +160,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewBranch() throws Exception {
String testProjectName = project + TEST_REPLICATION_SUFFIX;
createTestProject(testProjectName);
@@ -190,6 +194,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldAutoReloadConfiguration() throws Exception {
SourcesCollection sources = getInstance(SourcesCollection.class);
AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -202,6 +207,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldAutoReloadConfigurationWhenRemoteConfigAdded() throws Exception {
FileBasedConfig newRemoteConfig =
new FileBasedConfig(
@@ -224,6 +230,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldAutoReloadConfigurationWhenRemoteConfigDeleted() throws Exception {
SourcesCollection sources = getInstance(SourcesCollection.class);
AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -259,7 +266,6 @@
}
private void setReplicationSource(String remoteName) throws IOException {
- config.setString("replication", null, "instanceLabel", remoteName);
config.setBoolean("gerrit", null, "autoReload", true);
config.save();
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index cf94086..13b3460 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -27,6 +27,7 @@
import com.google.gerrit.acceptance.SkipProjectClone;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
import com.google.gerrit.entities.Permission;
import com.google.gerrit.entities.Project;
@@ -115,6 +116,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewChangeRef() throws Exception {
testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
@@ -142,6 +144,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewBranch() throws Exception {
String testProjectName = project + TEST_REPLICATION_SUFFIX;
createTestProject(testProjectName);
@@ -176,6 +179,7 @@
@Test
@UseLocalDisk
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateForceUpdatedBranch() throws Exception {
boolean forcedPush = true;
String testProjectName = project + TEST_REPLICATION_SUFFIX;
@@ -245,6 +249,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewChangeRefCGitClient() throws Exception {
AutoReloadConfigDecorator autoReloadConfigDecorator =
getInstance(AutoReloadConfigDecorator.class);
@@ -280,6 +285,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateNewBranchCGitClient() throws Exception {
AutoReloadConfigDecorator autoReloadConfigDecorator =
getInstance(AutoReloadConfigDecorator.class);
@@ -321,6 +327,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateProjectDeletion() throws Exception {
String projectToDelete = project.get();
setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(projectToDelete));
@@ -349,6 +356,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
public void shouldReplicateHeadUpdate() throws Exception {
String testProjectName = project.get();
setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(testProjectName));
@@ -407,7 +415,6 @@
replicaSuffixes.stream()
.map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
.collect(toList());
- config.setString("replication", null, "instanceLabel", remoteName);
config.setStringList("remote", remoteName, "url", replicaUrls);
config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 82dc2b8..4e831bc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -43,6 +43,7 @@
import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
@@ -73,7 +74,7 @@
@Mock private DynamicItem<EventDispatcher> dis;
@Mock ReplicationStateListeners sl;
@Mock FetchRestApiClient fetchRestApiClient;
- @Mock FetchRestApiClient.Factory fetchClientFactory;
+ @Mock FetchApiClient.Factory fetchClientFactory;
@Mock AccountInfo accountInfo;
@Mock RevisionReader revReader;
@Mock RevisionData revisionData;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
index cef1051..e6f788a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
@@ -199,7 +199,6 @@
replicaSuffixes.stream()
.map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
.collect(toList());
- config.setString("replication", null, "instanceLabel", remoteName);
config.setStringList("remote", remoteName, "url", replicaUrls);
config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
config.setString("remote", remoteName, "fetch", "+refs/tags/*:refs/tags/*");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index 394a4a4..f0b8b99 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -79,7 +79,7 @@
String api = "http://gerrit-host";
String pluginName = "pull-replication";
- String label = "Replication";
+ String instanceId = "Replication";
String refName = RefNames.REFS_HEADS + "master";
String expectedPayload =
@@ -135,7 +135,7 @@
+ " ]\n"
+ "}";
- FetchRestApiClient objectUnderTest;
+ FetchApiClient objectUnderTest;
@Before
public void setup() throws ClientProtocolException, IOException {
@@ -159,7 +159,6 @@
when(replicationConfig.getConfig()).thenReturn(config);
when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
when(source.getRemoteConfigName()).thenReturn("Replication");
- when(config.getString("replication", null, "instanceLabel")).thenReturn(label);
HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
@@ -167,7 +166,13 @@
syncRefsFilter = new SyncRefsFilter(replicationConfig);
objectUnderTest =
new FetchRestApiClient(
- credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+ credentials,
+ httpClientFactory,
+ replicationConfig,
+ syncRefsFilter,
+ pluginName,
+ instanceId,
+ source);
}
@Test
@@ -191,7 +196,13 @@
syncRefsFilter = new SyncRefsFilter(replicationConfig);
objectUnderTest =
new FetchRestApiClient(
- credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+ credentials,
+ httpClientFactory,
+ replicationConfig,
+ syncRefsFilter,
+ pluginName,
+ instanceId,
+ source);
objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -210,7 +221,13 @@
syncRefsFilter = new SyncRefsFilter(replicationConfig);
objectUnderTest =
new FetchRestApiClient(
- credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+ credentials,
+ httpClientFactory,
+ replicationConfig,
+ syncRefsFilter,
+ pluginName,
+ instanceId,
+ source);
objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -232,7 +249,13 @@
syncRefsFilter = new SyncRefsFilter(replicationConfig);
objectUnderTest =
new FetchRestApiClient(
- credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+ credentials,
+ httpClientFactory,
+ replicationConfig,
+ syncRefsFilter,
+ pluginName,
+ instanceId,
+ source);
objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
@@ -321,7 +344,6 @@
@Test
public void shouldThrowExceptionWhenInstanceLabelIsNull() {
- when(config.getString("replication", null, "instanceLabel")).thenReturn(null);
assertThrows(
NullPointerException.class,
() ->
@@ -331,12 +353,12 @@
replicationConfig,
syncRefsFilter,
pluginName,
+ null,
source));
}
@Test
public void shouldTrimInstanceLabel() {
- when(config.getString("replication", null, "instanceLabel")).thenReturn(" ");
assertThrows(
NullPointerException.class,
() ->
@@ -346,12 +368,12 @@
replicationConfig,
syncRefsFilter,
pluginName,
+ " ",
source));
}
@Test
public void shouldThrowExceptionWhenInstanceLabelIsEmpty() {
- when(config.getString("replication", null, "instanceLabel")).thenReturn("");
assertThrows(
NullPointerException.class,
() ->
@@ -361,10 +383,32 @@
replicationConfig,
syncRefsFilter,
pluginName,
+ "",
source));
}
@Test
+ public void shouldUseReplicationLabelWhenProvided()
+ throws ClientProtocolException, IOException, URISyntaxException {
+ when(config.getString("replication", null, "instanceLabel")).thenReturn(instanceId);
+ FetchRestApiClient objectUnderTest =
+ new FetchRestApiClient(
+ credentials,
+ httpClientFactory,
+ replicationConfig,
+ syncRefsFilter,
+ pluginName,
+ "",
+ source);
+ objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+ // The client got "" as instance id; replication.instanceLabel is stubbed to return a value.
+ verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+ // The body of the single captured POST must equal expectedPayload.
+ HttpPost httpPost = httpPostCaptor.getValue();
+ assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+ }
+
+ @Test
public void shouldCallInitProjectEndpoint() throws IOException, URISyntaxException {
objectUnderTest.initProject(Project.nameKey("test_repo"), new URIish(api));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
new file mode 100644
index 0000000..8f80883
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchCommand;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import java.util.concurrent.ScheduledExecutorService;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamEventListenerTest {
+ // Unit tests for StreamEventListener.onEvent(), run against Mockito mocks only.
+ private static final String TEST_PROJECT = "test-project";
+ private static final String INSTANCE_ID = "node_instance_id";
+ private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+ // Mocked collaborators; setup() makes workQueue.getDefaultQueue() return the executor mock.
+ @Mock private FetchCommand fetchCommand;
+ @Mock private ProjectInitializationAction projectInitializationAction;
+ @Mock private WorkQueue workQueue;
+ @Mock private ScheduledExecutorService executor;
+
+ private StreamEventListener objectUnderTest;
+ // INSTANCE_ID is the listener's first constructor argument; tests use it as "same instance".
+ @Before
+ public void setup() {
+ when(workQueue.getDefaultQueue()).thenReturn(executor);
+ objectUnderTest =
+ new StreamEventListener(INSTANCE_ID, fetchCommand, projectInitializationAction, workQueue);
+ }
+
+ @Test
+ public void shouldSkipEventsGeneratedByTheSameInstance() {
+ Event event = new RefUpdatedEvent();
+ event.instanceId = INSTANCE_ID;
+ objectUnderTest.onEvent(event);
+ // The event carries the listener's own INSTANCE_ID, so nothing may be submitted.
+ verify(executor, never()).submit(any(Runnable.class));
+ }
+
+ @Test
+ public void shouldSkipFetchForProjectDeleteEvent() {
+ RefUpdatedEvent event = new RefUpdatedEvent();
+ RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+ refUpdate.refName = RefNames.REFS_CONFIG;
+ refUpdate.newRev = ObjectId.zeroId().getName();
+ refUpdate.project = TEST_PROJECT;
+
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.refUpdate = () -> refUpdate;
+
+ objectUnderTest.onEvent(event);
+ // RefNames.REFS_CONFIG updated to the zero id by a remote instance: no job is expected.
+ verify(executor, never()).submit(any(Runnable.class));
+ }
+
+ @Test
+ public void shouldScheduleFetchJobForRefUpdateEvent() {
+ RefUpdatedEvent event = new RefUpdatedEvent();
+ RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+ refUpdate.refName = "refs/changes/01/1/1";
+ refUpdate.project = TEST_PROJECT;
+
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.refUpdate = () -> refUpdate;
+
+ objectUnderTest.onEvent(event);
+ // A ref update from a remote instance must reach the executor as a FetchJob.
+ verify(executor).submit(any(FetchJob.class));
+ }
+
+ @Test
+ public void shouldCreateProjectForProjectCreatedEvent()
+ throws AuthException, PermissionBackendException {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.projectName = TEST_PROJECT;
+
+ objectUnderTest.onEvent(event);
+ // initProject is expected exactly once, with ".git" appended to the project name.
+ verify(projectInitializationAction, times(1))
+ .initProject(String.format("%s.git", TEST_PROJECT));
+ }
+
+ @Test
+ public void shouldScheduleAllRefsFetchForProjectCreatedEvent() {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.projectName = TEST_PROJECT;
+
+ objectUnderTest.onEvent(event);
+ // A ProjectCreatedEvent from a remote instance must also submit a FetchJob.
+ verify(executor).submit(any(FetchJob.class));
+ }
+}