Merge "Make sure that the EventListener receives replication events" into stable-3.4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index 89a9067..dfcba94 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -18,7 +18,11 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -26,6 +30,7 @@
import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
import java.io.IOException;
import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.transport.URIish;
public class DeleteProjectTask implements Runnable, Completable {
@@ -40,17 +45,20 @@
private final String uri;
private final Project.NameKey project;
private final FetchApiClient.Factory fetchClientFactory;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
private boolean succeeded;
@Inject
DeleteProjectTask(
FetchApiClient.Factory fetchClientFactory,
IdGenerator ig,
+ DynamicItem<EventDispatcher> eventDispatcher,
@Assisted Source source,
@Assisted String uri,
@Assisted Project.NameKey project) {
this.fetchClientFactory = fetchClientFactory;
this.id = ig.next();
+ this.eventDispatcher = eventDispatcher;
this.uri = uri;
this.source = source;
this.project = project;
@@ -71,6 +79,30 @@
String.format("Cannot delete project %s on remote site %s.", project, uri);
logger.atWarning().withCause(e).log(errorMessage);
repLog.warn(errorMessage);
+ } finally {
+ fireEvent();
+ }
+ }
+
+ private void fireEvent() {
+ try {
+ Context.setLocalEvent(true);
+ eventDispatcher
+ .get()
+ .postEvent(
+ new FetchRefReplicatedEvent(
+ project.get(),
+ RefNames.REFS_CONFIG,
+ source.getRemoteConfigName(),
+ succeeded
+ ? ReplicationState.RefFetchResult.SUCCEEDED
+ : ReplicationState.RefFetchResult.FAILED,
+ RefUpdate.Result.FORCED));
+ } catch (PermissionBackendException e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot post event for refs/meta/config on project %s", project.get());
+ } finally {
+ Context.unsetLocalEvent();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 52f5422..3cd1271 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@
} catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
// Tried to replicate to a remote via anonymous git:// but the repository
- // does not exist. In this case NoRemoteRepositoryException is not
+ // does not exist. In this case NoRemoteRepositoryException is not
// raised.
String msg = e.getMessage();
repLog.error(
@@ -439,6 +439,9 @@
}
runImpl();
+ } catch (IOException e) {
+ notifyRefReplicatedIOException();
+ throw e;
}
}
@@ -559,6 +562,19 @@
}
}
+ private void notifyRefReplicatedIOException() {
+ for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+ entry
+ .getValue()
+ .notifyRefReplicated(
+ projectName.get(),
+ entry.getKey(),
+ uri,
+ ReplicationState.RefFetchResult.FAILED,
+ RefUpdate.Result.IO_FAILURE);
+ }
+ }
+
public static class LockFailureException extends TransportException {
private static final long serialVersionUID = 1L;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
index 3f6673b..f4b6a07 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
@@ -15,8 +15,11 @@
package com.googlesource.gerrit.plugins.replication.pull.api;
import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.api.projects.HeadInput;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.BadRequestException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
@@ -24,28 +27,43 @@
import com.google.gerrit.extensions.restapi.Response;
import com.google.gerrit.extensions.restapi.RestModifyView;
import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectResource;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.replication.LocalFS;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.pull.GerritConfigOps;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import java.net.HttpURLConnection;
import java.util.Optional;
+import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.transport.URIish;
@Singleton
public class UpdateHeadAction implements RestModifyView<ProjectResource, HeadInput> {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final GerritConfigOps gerritConfigOps;
private final FetchPreconditions preconditions;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
@Inject
- UpdateHeadAction(GerritConfigOps gerritConfigOps, FetchPreconditions preconditions) {
+ UpdateHeadAction(
+ GerritConfigOps gerritConfigOps,
+ FetchPreconditions preconditions,
+ DynamicItem<EventDispatcher> eventDispatcher) {
this.gerritConfigOps = gerritConfigOps;
this.preconditions = preconditions;
+ this.eventDispatcher = eventDispatcher;
}
@Override
public Response<?> apply(ProjectResource projectResource, HeadInput input)
throws AuthException, BadRequestException, ResourceConflictException, Exception {
+ Response<String> res = null;
+
if (input == null || Strings.isNullOrEmpty(input.ref)) {
throw new BadRequestException("ref required");
}
@@ -61,15 +79,44 @@
Optional<URIish> maybeRepo =
gerritConfigOps.getGitRepositoryURI(String.format("%s.git", projectResource.getName()));
- if (maybeRepo.isPresent()) {
- if (new LocalFS(maybeRepo.get()).updateHead(projectResource.getNameKey(), ref)) {
- return Response.ok(ref);
+ try {
+ if (maybeRepo.isPresent()) {
+ if (new LocalFS(maybeRepo.get()).updateHead(projectResource.getNameKey(), ref)) {
+ return res = Response.ok(ref);
+ }
+ throw new UnprocessableEntityException(
+ String.format(
+ "Could not update HEAD of repo %s to ref %s", projectResource.getName(), ref));
}
- throw new UnprocessableEntityException(
- String.format(
- "Could not update HEAD of repo %s to ref %s", projectResource.getName(), ref));
+ } finally {
+ fireEvent(
+ projectResource.getNameKey(),
+ res != null && res.statusCode() == HttpURLConnection.HTTP_OK);
}
throw new ResourceNotFoundException(
String.format("Could not compute URL for repo: %s", projectResource.getName()));
}
+
+ private void fireEvent(Project.NameKey projectName, boolean succeeded) {
+ try {
+ Context.setLocalEvent(true);
+ eventDispatcher
+ .get()
+ .postEvent(
+ new FetchRefReplicatedEvent(
+ projectName.get(),
+ RefNames.HEAD,
+ "", // TODO: the remote label is not passed as parameter, hence cannot be
+ // propagated to the event
+ succeeded
+ ? ReplicationState.RefFetchResult.SUCCEEDED
+ : ReplicationState.RefFetchResult.FAILED,
+ RefUpdate.Result.FORCED));
+ } catch (PermissionBackendException e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot post event for refs/meta/config on project %s", projectName);
+ } finally {
+ Context.unsetLocalEvent();
+ }
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index 7d8a164..da7be9f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -31,15 +31,22 @@
import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.lib.ObjectId;
@@ -54,12 +61,62 @@
/** Base class to run regular and async acceptance tests */
public abstract class PullReplicationITAbstract extends PullReplicationSetupBase {
+ private BufferedEventListener eventListener;
public static class PullReplicationTestModule extends PullReplicationModule {
@Inject
public PullReplicationTestModule(SitePaths site, InMemoryMetricMaker memMetric) {
super(site, memMetric);
}
+
+ @Override
+ protected void configure() {
+ super.configure();
+
+ DynamicSet.bind(binder(), EventListener.class)
+ .to(BufferedEventListener.class)
+ .asEagerSingleton();
+ }
+ }
+
+ @Singleton
+ public static class BufferedEventListener implements EventListener {
+
+ private final List<Event> eventsReceived;
+ private String eventTypeFilter;
+
+ @Inject
+ public BufferedEventListener() {
+ eventsReceived = new ArrayList<>();
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (event.getType().equals(eventTypeFilter)) {
+ eventsReceived.add(event);
+ }
+ }
+
+ public void clearFilter(String expectedEventType) {
+ eventsReceived.clear();
+ eventTypeFilter = expectedEventType;
+ }
+
+ public int numEventsReceived() {
+ return eventsReceived.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Event> Stream<T> eventsStream() {
+ return (Stream<T>) eventsReceived.stream();
+ }
+ }
+
+ @Override
+ protected void setUpTestPlugin(boolean loadExisting) throws Exception {
+ super.setUpTestPlugin(loadExisting);
+
+ eventListener = plugin.getSysInjector().getInstance(BufferedEventListener.class);
}
@Override
@@ -100,6 +157,8 @@
ObjectId.zeroId().getName(),
sourceCommit.getId().getName(),
TEST_REPLICATION_REMOTE);
+
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -110,6 +169,9 @@
assertThat(targetBranchRef).isNotNull();
assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
}
+
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
private void assertTasksMetricScheduledAndCompleted(int numTasks) {
@@ -141,6 +203,7 @@
ObjectId.zeroId().getName(),
branchRevision,
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -153,7 +216,8 @@
assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
}
- assertTasksMetricScheduledAndCompleted(1);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Test
@@ -171,6 +235,7 @@
ObjectId.zeroId().getName(),
branchRevision,
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationFailed(1);
@@ -180,6 +245,10 @@
Ref targetBranchRef = getRef(repo, newBranch);
assertThat(targetBranchRef).isNull();
}
+
+ assertThatEventListenerHasReceivedNumEvents(2);
+ assertThatRefReplicatedEventsContainsExactlyStatuses(
+ ReplicationState.RefFetchResult.FAILED, ReplicationState.RefFetchResult.FAILED);
}
@Test
@@ -213,6 +282,7 @@
ObjectId.zeroId().getName(),
branchRevision,
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -223,6 +293,8 @@
assertThat(targetBranchRef).isNotNull();
assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
}
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
TestRepository<InMemoryRepository> testProject = cloneProject(testProjectNameKey);
fetch(testProject, RefNames.REFS_HEADS + "*:" + RefNames.REFS_HEADS + "*");
@@ -240,6 +312,7 @@
branchRevision,
amendedCommit.getId().getName(),
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(forcedPushEvent);
waitUntilReplicationCompleted(2);
@@ -255,6 +328,8 @@
}
assertTasksMetricScheduledAndCompleted(2);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Test
@@ -282,6 +357,7 @@
ObjectId.zeroId().getName(),
sourceCommit.getId().getName(),
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -294,6 +370,8 @@
}
assertTasksMetricScheduledAndCompleted(1);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Test
@@ -326,6 +404,7 @@
ObjectId.zeroId().getName(),
branchRevision,
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -339,6 +418,8 @@
}
assertTasksMetricScheduledAndCompleted(1);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Test
@@ -363,6 +444,7 @@
return NotifyHandling.NONE;
}
};
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
for (ProjectDeletedListener l : deletedListeners) {
l.onProjectDeleted(event);
}
@@ -371,6 +453,8 @@
waitUntil(() -> !repoManager.list().contains(project));
assertTasksMetricScheduledAndCompleted(1);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Test
@@ -394,6 +478,7 @@
plugin.getSysInjector().getInstance(ReplicationQueue.class);
HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onHeadUpdated(event);
waitUntilReplicationCompleted(1);
@@ -407,6 +492,8 @@
});
assertTasksMetricScheduledAndCompleted(1);
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
}
@Ignore
@@ -427,6 +514,7 @@
ObjectId.zeroId().getName(),
sourceCommit.getId().getName(),
TEST_REPLICATION_REMOTE);
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
pullReplicationQueue.onEvent(event);
waitUntilReplicationCompleted(1);
@@ -437,6 +525,29 @@
assertThat(targetBranchRef).isNotNull();
assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
}
+
+ assertThatEventListenerHasReceivedNumEvents(1);
+ assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
+ }
+
+ private void assertThatEventListenerHasReceivedNumEvents(int numExpectedEvents) {
+ assertThat(eventListener.numEventsReceived()).isEqualTo(numExpectedEvents);
+ }
+
+ private void assertThatRefReplicatedEventsContainsStatus(
+ ReplicationState.RefFetchResult refFetchResult) {
+ Stream<FetchRefReplicatedEvent> replicatedStream = eventListener.eventsStream();
+ assertThat(replicatedStream.map(FetchRefReplicatedEvent::getStatus))
+ .contains(refFetchResult.toString());
+ }
+
+ private void assertThatRefReplicatedEventsContainsExactlyStatuses(
+ ReplicationState.RefFetchResult... refFetchResult) {
+ List<String> expectedStatuses =
+ Stream.of(refFetchResult).map(Object::toString).collect(Collectors.toList());
+ Stream<FetchRefReplicatedEvent> replicatedStream = eventListener.eventsStream();
+ assertThat(replicatedStream.map(FetchRefReplicatedEvent::getStatus))
+ .containsExactlyElementsIn(expectedStatuses);
}
private void waitUntilReplicationCompleted(int expected) throws Exception {