Trigger remote update HEAD

Listen to update HEAD events and trigger a remote update HEAD over REST
API.

Since replicas do not have REST API, this is currently working just for
primary instances. The implementation for replicas will be provided as a
follow up change.

Bug: Issue 15258
Change-Id: Ic19d4ac15387830c98a87438cfd3bcf881a90845
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 647a2f7..29ec93a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -20,6 +20,7 @@
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -103,6 +104,7 @@
     bind(EventBus.class).in(Scopes.SINGLETON);
     bind(ReplicationSources.class).to(SourcesCollection.class);
     DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
+    DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
 
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
     bind(ObservableQueue.class).to(ReplicationQueue.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index b7b509d..0e4ace1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -55,7 +56,8 @@
     implements ObservableQueue,
         LifecycleListener,
         GitReferenceUpdatedListener,
-        ProjectDeletedListener {
+        ProjectDeletedListener,
+        HeadUpdatedListener {
 
   static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
   static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
@@ -312,6 +314,17 @@
     }
   }
 
+  @Override
+  public void onHeadUpdated(HeadUpdatedListener.Event event) {
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    sources.get().getAll().stream()
+        .filter(s -> s.wouldFetchProject(p))
+        .forEach(
+            s ->
+                s.getApis()
+                    .forEach(apiUrl -> s.scheduleUpdateHead(apiUrl, p, event.getNewHeadName())));
+  }
+
   @AutoValue
   abstract static class ReferenceUpdatedEvent {
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index e223036..28aa541 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -70,6 +70,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.fetch.JGitFetch;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.List;
@@ -102,6 +103,7 @@
   }
 
   private final ReplicationStateListener stateLog;
+  private final UpdateHeadTask.Factory updateHeadFactory;
   private final Object stateLock = new Object();
   private final Map<URIish, FetchOne> pending = new HashMap<>();
   private final Map<URIish, FetchOne> inFlight = new HashMap<>();
@@ -190,6 +192,7 @@
                         .implement(Fetch.class, BatchFetchClient.class)
                         .implement(Fetch.class, FetchClientImplementation.class, clientClass)
                         .build(FetchFactory.class));
+                factory(UpdateHeadTask.Factory.class);
               }
 
               @Provides
@@ -214,6 +217,7 @@
     opFactory = child.getInstance(FetchOne.Factory.class);
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
     deleteProjectFactory = child.getInstance(DeleteProjectTask.Factory.class);
+    updateHeadFactory = child.getInstance(UpdateHeadTask.Factory.class);
   }
 
   public synchronized CloseableHttpClient memoize(
@@ -762,6 +766,19 @@
     return config.replicateProjectDeletions();
   }
 
+  void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) {
+    try {
+      URIish apiURI = new URIish(apiUrl);
+      @SuppressWarnings("unused")
+      ScheduledFuture<?> ignored =
+          pool.schedule(
+              updateHeadFactory.create(this, apiURI, project, newHead), 0, TimeUnit.SECONDS);
+    } catch (URISyntaxException e) {
+      logger.atSevere().withCause(e).log(
+          "Could not schedule HEAD pull-replication for project {}", project.get());
+    }
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
new file mode 100644
index 0000000..943ea92
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -0,0 +1,87 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationQueue.repLog;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
+import java.io.IOException;
+import org.eclipse.jgit.transport.URIish;
+
+public class UpdateHeadTask implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private final FetchRestApiClient.Factory fetchClientFactory;
+  private final Source source;
+  private final URIish apiURI;
+  private final Project.NameKey project;
+  private final String newHead;
+  private final int id;
+
+  interface Factory {
+    UpdateHeadTask create(Source source, URIish apiURI, Project.NameKey project, String newHead);
+  }
+
+  @Inject
+  UpdateHeadTask(
+      FetchRestApiClient.Factory fetchClientFactory,
+      IdGenerator ig,
+      @Assisted Source source,
+      @Assisted URIish apiURI,
+      @Assisted Project.NameKey project,
+      @Assisted String newHead) {
+    this.fetchClientFactory = fetchClientFactory;
+    this.id = ig.next();
+    this.source = source;
+    this.apiURI = apiURI;
+    this.project = project;
+    this.newHead = newHead;
+  }
+
+  @Override
+  public void run() {
+    try {
+      HttpResult httpResult =
+          fetchClientFactory.create(source).updateHead(project, newHead, apiURI);
+      if (!httpResult.isSuccessful()) {
+        throw new IOException(httpResult.getMessage().orElse("Unknown"));
+      }
+      logger.atFine().log(
+          "Successfully updated HEAD of project {} on remote {}",
+          project.get(),
+          apiURI.toASCIIString());
+    } catch (IOException e) {
+      String errorMessage =
+          String.format(
+              "Cannot update HEAD of project %s remote site %s",
+              project.get(), apiURI.toASCIIString());
+      logger.atWarning().withCause(e).log(errorMessage);
+      repLog.warn(errorMessage);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "[%s] update-head %s at %s to %s",
+        HexFormat.fromInt(id), project.get(), apiURI.toASCIIString(), newHead);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index a7cc3e7..df97609 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -129,6 +129,18 @@
     return httpClientFactory.create(source).execute(delete, this, getContext(apiUri));
   }
 
+  public HttpResult updateHead(Project.NameKey project, String newHead, URIish apiUri)
+      throws IOException {
+    logger.atFine().log("Updating head of %s on %s", project.get(), newHead);
+    String url =
+        String.format("%s/%s", apiUri.toASCIIString(), getProjectUpdateHeadUrl(project.get()));
+    HttpPut req = new HttpPut(url);
+    req.setEntity(
+        new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), StandardCharsets.UTF_8));
+    req.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+    return httpClientFactory.create(source).execute(req, this, getContext(apiUri));
+  }
+
   public HttpResult callSendObject(
       Project.NameKey project, String refName, RevisionData revisionData, URIish targetUri)
       throws ClientProtocolException, IOException {
@@ -180,4 +192,8 @@
   String getProjectDeletionUrl(String projectName) {
     return String.format("a/projects/%s", Url.encode(projectName));
   }
+
+  String getProjectUpdateHeadUrl(String projectName) {
+    return String.format("a/projects/%s/HEAD", Url.encode(projectName));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java
new file mode 100644
index 0000000..d1769ab
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.api.changes.NotifyHandling;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
+
+class FakeHeadUpdateEvent implements HeadUpdatedListener.Event {
+
+  private final String oldName;
+  private final String newName;
+  private final String projectName;
+
+  FakeHeadUpdateEvent(String oldName, String newName, String projectName) {
+
+    this.oldName = oldName;
+    this.newName = newName;
+    this.projectName = projectName;
+  }
+
+  @Override
+  public NotifyHandling getNotify() {
+    return NotifyHandling.NONE;
+  }
+
+  @Override
+  public String getOldHeadName() {
+    return oldName;
+  }
+
+  @Override
+  public String getNewHeadName() {
+    return newName;
+  }
+
+  @Override
+  public String getProjectName() {
+    return projectName;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index c8c294f..c2335e8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -30,12 +30,14 @@
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.Input;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.extensions.restapi.RestApiModule;
 import com.google.gerrit.extensions.restapi.RestModifyView;
 import com.google.gerrit.server.config.SitePaths;
@@ -284,6 +286,38 @@
     waitUntil(() -> fakeDeleteProjectPlugin.getDeleteEndpointCalls() == 1);
   }
 
+  @Test
+  public void shouldReplicateHeadUpdate() throws Exception {
+    String testProjectName = project.get();
+    setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(testProjectName));
+    config.save();
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+    autoReloadConfigDecorator.reload();
+
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
+
+    ReplicationQueue pullReplicationQueue =
+        plugin.getSysInjector().getInstance(ReplicationQueue.class);
+
+    HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
+    pullReplicationQueue.onHeadUpdated(event);
+
+    waitUntil(
+        () -> {
+          try {
+            return gApi.projects().name(testProjectName).head().equals(newBranch);
+          } catch (RestApiException e) {
+            return false;
+          }
+        });
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 4779d63..78eb3f7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -298,6 +298,34 @@
     verify(source, never()).scheduleDeleteProject(any(), any());
   }
 
+  @Test
+  public void shouldScheduleUpdateHeadWhenWouldFetchProject() throws IOException {
+    when(source.wouldFetchProject(any())).thenReturn(true);
+
+    String projectName = "aProject";
+    String newHEAD = "newHEAD";
+
+    objectUnderTest.start();
+    objectUnderTest.onHeadUpdated(new FakeHeadUpdateEvent("oldHead", newHEAD, projectName));
+    verify(source, times(1))
+        .scheduleUpdateHead(any(), projectNameKeyCaptor.capture(), stringCaptor.capture());
+
+    assertThat(stringCaptor.getValue()).isEqualTo(newHEAD);
+    assertThat(projectNameKeyCaptor.getValue()).isEqualTo(Project.NameKey.parse(projectName));
+  }
+
+  @Test
+  public void shouldNotScheduleUpdateHeadWhenNotWouldFetchProject() throws IOException {
+    when(source.wouldFetchProject(any())).thenReturn(false);
+
+    String projectName = "aProject";
+    String newHEAD = "newHEAD";
+
+    objectUnderTest.start();
+    objectUnderTest.onHeadUpdated(new FakeHeadUpdateEvent("oldHead", newHEAD, projectName));
+    verify(source, never()).scheduleUpdateHead(any(), any(), any());
+  }
+
   protected static Path createTempPath(String prefix) throws IOException {
     return createTempDirectory(prefix);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index bde86fc..ed1eef4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -23,7 +23,9 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
@@ -33,6 +35,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -376,6 +379,25 @@
     assertThat(httpDelete.getURI().getPath()).isEqualTo("/a/projects/test_repo");
   }
 
+  @Test
+  public void shouldCallUpdateHEADEndpoint() throws IOException, URISyntaxException {
+    String newHead = "newHead";
+    String projectName = "aProject";
+    objectUnderTest.updateHead(Project.nameKey(projectName), newHead, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any(), any());
+
+    HttpPut httpPut = httpPutCaptor.getValue();
+    String payload =
+        CharStreams.toString(
+            new InputStreamReader(httpPut.getEntity().getContent(), Charsets.UTF_8));
+
+    assertThat(httpPut.getURI().getHost()).isEqualTo("gerrit-host");
+    assertThat(httpPut.getURI().getPath())
+        .isEqualTo(String.format("/a/projects/%s/HEAD", projectName));
+    assertThat(payload).isEqualTo(String.format("{\"ref\": \"%s\"}", newHead));
+  }
+
   public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException {
     ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024);
     return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim();