Merge "Make sure that the EventListener receives replication events" into stable-3.4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index 89a9067..dfcba94 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -18,7 +18,11 @@
 
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -26,6 +30,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.transport.URIish;
 
 public class DeleteProjectTask implements Runnable, Completable {
@@ -40,17 +45,20 @@
   private final String uri;
   private final Project.NameKey project;
   private final FetchApiClient.Factory fetchClientFactory;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
   private boolean succeeded;
 
   @Inject
   DeleteProjectTask(
       FetchApiClient.Factory fetchClientFactory,
       IdGenerator ig,
+      DynamicItem<EventDispatcher> eventDispatcher,
       @Assisted Source source,
       @Assisted String uri,
       @Assisted Project.NameKey project) {
     this.fetchClientFactory = fetchClientFactory;
     this.id = ig.next();
+    this.eventDispatcher = eventDispatcher;
     this.uri = uri;
     this.source = source;
     this.project = project;
@@ -71,6 +79,30 @@
           String.format("Cannot delete project %s on remote site %s.", project, uri);
       logger.atWarning().withCause(e).log(errorMessage);
       repLog.warn(errorMessage);
+    } finally {
+      fireEvent();
+    }
+  }
+
+  private void fireEvent() {
+    try {
+      Context.setLocalEvent(true);
+      eventDispatcher
+          .get()
+          .postEvent(
+              new FetchRefReplicatedEvent(
+                  project.get(),
+                  RefNames.REFS_CONFIG,
+                  source.getRemoteConfigName(),
+                  succeeded
+                      ? ReplicationState.RefFetchResult.SUCCEEDED
+                      : ReplicationState.RefFetchResult.FAILED,
+                  RefUpdate.Result.FORCED));
+    } catch (PermissionBackendException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot post event for refs/meta/config on project %s", project.get());
+    } finally {
+      Context.unsetLocalEvent();
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 52f5422..3cd1271 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@
 
     } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
       // Tried to replicate to a remote via anonymous git:// but the repository
-      // does not exist.  In this case NoRemoteRepositoryException is not
+      // does not exist. In this case NoRemoteRepositoryException is not
       // raised.
       String msg = e.getMessage();
       repLog.error(
@@ -439,6 +439,9 @@
       }
 
       runImpl();
+    } catch (IOException e) {
+      notifyRefReplicatedIOException();
+      throw e;
     }
   }
 
@@ -559,6 +562,19 @@
     }
   }
 
+  private void notifyRefReplicatedIOException() {
+    for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+      entry
+          .getValue()
+          .notifyRefReplicated(
+              projectName.get(),
+              entry.getKey(),
+              uri,
+              ReplicationState.RefFetchResult.FAILED,
+              RefUpdate.Result.IO_FAILURE);
+    }
+  }
+
   public static class LockFailureException extends TransportException {
     private static final long serialVersionUID = 1L;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
index 3f6673b..f4b6a07 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadAction.java
@@ -15,8 +15,11 @@
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
 import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.api.projects.HeadInput;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
@@ -24,28 +27,43 @@
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestModifyView;
 import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.LocalFS;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.GerritConfigOps;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import java.net.HttpURLConnection;
 import java.util.Optional;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.transport.URIish;
 
 @Singleton
 public class UpdateHeadAction implements RestModifyView<ProjectResource, HeadInput> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final GerritConfigOps gerritConfigOps;
   private final FetchPreconditions preconditions;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
 
   @Inject
-  UpdateHeadAction(GerritConfigOps gerritConfigOps, FetchPreconditions preconditions) {
+  UpdateHeadAction(
+      GerritConfigOps gerritConfigOps,
+      FetchPreconditions preconditions,
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.gerritConfigOps = gerritConfigOps;
     this.preconditions = preconditions;
+    this.eventDispatcher = eventDispatcher;
   }
 
   @Override
   public Response<?> apply(ProjectResource projectResource, HeadInput input)
       throws AuthException, BadRequestException, ResourceConflictException, Exception {
+    Response<String> res = null;
+
     if (input == null || Strings.isNullOrEmpty(input.ref)) {
       throw new BadRequestException("ref required");
     }
@@ -61,15 +79,44 @@
     Optional<URIish> maybeRepo =
         gerritConfigOps.getGitRepositoryURI(String.format("%s.git", projectResource.getName()));
 
-    if (maybeRepo.isPresent()) {
-      if (new LocalFS(maybeRepo.get()).updateHead(projectResource.getNameKey(), ref)) {
-        return Response.ok(ref);
+    try {
+      if (maybeRepo.isPresent()) {
+        if (new LocalFS(maybeRepo.get()).updateHead(projectResource.getNameKey(), ref)) {
+          return res = Response.ok(ref);
+        }
+        throw new UnprocessableEntityException(
+            String.format(
+                "Could not update HEAD of repo %s to ref %s", projectResource.getName(), ref));
       }
-      throw new UnprocessableEntityException(
-          String.format(
-              "Could not update HEAD of repo %s to ref %s", projectResource.getName(), ref));
+    } finally {
+      fireEvent(
+          projectResource.getNameKey(),
+          res != null && res.statusCode() == HttpURLConnection.HTTP_OK);
     }
     throw new ResourceNotFoundException(
         String.format("Could not compute URL for repo: %s", projectResource.getName()));
   }
+
+  private void fireEvent(Project.NameKey projectName, boolean succeeded) {
+    try {
+      Context.setLocalEvent(true);
+      eventDispatcher
+          .get()
+          .postEvent(
+              new FetchRefReplicatedEvent(
+                  projectName.get(),
+                  RefNames.HEAD,
+                  "", // TODO: the remote label is not passed as parameter, hence cannot be
+                  // propagated to the event
+                  succeeded
+                      ? ReplicationState.RefFetchResult.SUCCEEDED
+                      : ReplicationState.RefFetchResult.FAILED,
+                  RefUpdate.Result.FORCED));
+    } catch (PermissionBackendException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot post event for refs/meta/config on project %s", projectName);
+    } finally {
+      Context.unsetLocalEvent();
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index 7d8a164..da7be9f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -31,15 +31,22 @@
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
 import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
@@ -54,12 +61,62 @@
 
 /** Base class to run regular and async acceptance tests */
 public abstract class PullReplicationITAbstract extends PullReplicationSetupBase {
+  private BufferedEventListener eventListener;
 
   public static class PullReplicationTestModule extends PullReplicationModule {
     @Inject
     public PullReplicationTestModule(SitePaths site, InMemoryMetricMaker memMetric) {
       super(site, memMetric);
     }
+
+    @Override
+    protected void configure() {
+      super.configure();
+
+      DynamicSet.bind(binder(), EventListener.class)
+          .to(BufferedEventListener.class)
+          .asEagerSingleton();
+    }
+  }
+
+  @Singleton
+  public static class BufferedEventListener implements EventListener {
+
+    private final List<Event> eventsReceived;
+    private String eventTypeFilter;
+
+    @Inject
+    public BufferedEventListener() {
+      eventsReceived = new ArrayList<>();
+    }
+
+    @Override
+    public void onEvent(Event event) {
+      if (event.getType().equals(eventTypeFilter)) {
+        eventsReceived.add(event);
+      }
+    }
+
+    public void clearFilter(String expectedEventType) {
+      eventsReceived.clear();
+      eventTypeFilter = expectedEventType;
+    }
+
+    public int numEventsReceived() {
+      return eventsReceived.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends Event> Stream<T> eventsStream() {
+      return (Stream<T>) eventsReceived.stream();
+    }
+  }
+
+  @Override
+  protected void setUpTestPlugin(boolean loadExisting) throws Exception {
+    super.setUpTestPlugin(loadExisting);
+
+    eventListener = plugin.getSysInjector().getInstance(BufferedEventListener.class);
   }
 
   @Override
@@ -100,6 +157,8 @@
             ObjectId.zeroId().getName(),
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
+
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -110,6 +169,9 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
+
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   private void assertTasksMetricScheduledAndCompleted(int numTasks) {
@@ -141,6 +203,7 @@
             ObjectId.zeroId().getName(),
             branchRevision,
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -153,7 +216,8 @@
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
 
-    assertTasksMetricScheduledAndCompleted(1);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Test
@@ -171,6 +235,7 @@
             ObjectId.zeroId().getName(),
             branchRevision,
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationFailed(1);
 
@@ -180,6 +245,10 @@
       Ref targetBranchRef = getRef(repo, newBranch);
       assertThat(targetBranchRef).isNull();
     }
+
+    assertThatEventListenerHasReceivedNumEvents(2);
+    assertThatRefReplicatedEventsContainsExactlyStatuses(
+        ReplicationState.RefFetchResult.FAILED, ReplicationState.RefFetchResult.FAILED);
   }
 
   @Test
@@ -213,6 +282,7 @@
             ObjectId.zeroId().getName(),
             branchRevision,
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -223,6 +293,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
 
     TestRepository<InMemoryRepository> testProject = cloneProject(testProjectNameKey);
     fetch(testProject, RefNames.REFS_HEADS + "*:" + RefNames.REFS_HEADS + "*");
@@ -240,6 +312,7 @@
             branchRevision,
             amendedCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(forcedPushEvent);
     waitUntilReplicationCompleted(2);
 
@@ -255,6 +328,8 @@
     }
 
     assertTasksMetricScheduledAndCompleted(2);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Test
@@ -282,6 +357,7 @@
             ObjectId.zeroId().getName(),
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -294,6 +370,8 @@
     }
 
     assertTasksMetricScheduledAndCompleted(1);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Test
@@ -326,6 +404,7 @@
             ObjectId.zeroId().getName(),
             branchRevision,
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -339,6 +418,8 @@
     }
 
     assertTasksMetricScheduledAndCompleted(1);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Test
@@ -363,6 +444,7 @@
             return NotifyHandling.NONE;
           }
         };
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     for (ProjectDeletedListener l : deletedListeners) {
       l.onProjectDeleted(event);
     }
@@ -371,6 +453,8 @@
     waitUntil(() -> !repoManager.list().contains(project));
 
     assertTasksMetricScheduledAndCompleted(1);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Test
@@ -394,6 +478,7 @@
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
 
     HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onHeadUpdated(event);
     waitUntilReplicationCompleted(1);
 
@@ -407,6 +492,8 @@
         });
 
     assertTasksMetricScheduledAndCompleted(1);
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
   @Ignore
@@ -427,6 +514,7 @@
             ObjectId.zeroId().getName(),
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
     pullReplicationQueue.onEvent(event);
     waitUntilReplicationCompleted(1);
 
@@ -437,6 +525,29 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
+
+    assertThatEventListenerHasReceivedNumEvents(1);
+    assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
+  }
+
+  private void assertThatEventListenerHasReceivedNumEvents(int numExpectedEvents) {
+    assertThat(eventListener.numEventsReceived()).isEqualTo(numExpectedEvents);
+  }
+
+  private void assertThatRefReplicatedEventsContainsStatus(
+      ReplicationState.RefFetchResult refFetchResult) {
+    Stream<FetchRefReplicatedEvent> replicatedStream = eventListener.eventsStream();
+    assertThat(replicatedStream.map(FetchRefReplicatedEvent::getStatus))
+        .contains(refFetchResult.toString());
+  }
+
+  private void assertThatRefReplicatedEventsContainsExactlyStatuses(
+      ReplicationState.RefFetchResult... refFetchResult) {
+    List<String> expectedStatuses =
+        Stream.of(refFetchResult).map(Object::toString).collect(Collectors.toList());
+    Stream<FetchRefReplicatedEvent> replicatedStream = eventListener.eventsStream();
+    assertThat(replicatedStream.map(FetchRefReplicatedEvent::getStatus))
+        .containsExactlyElementsIn(expectedStatuses);
   }
 
   private void waitUntilReplicationCompleted(int expected) throws Exception {