Trigger remote update HEAD
Listen to update HEAD events and trigger a remote update HEAD over REST
API.
Since replicas do not have REST API, this is currently working just for
primary instances. The implementation for replicas will be provided as a
follow up change.
Bug: Issue 15258
Change-Id: Ic19d4ac15387830c98a87438cfd3bcf881a90845
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 647a2f7..29ec93a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -20,6 +20,7 @@
import com.google.gerrit.extensions.annotations.Exports;
import com.google.gerrit.extensions.config.CapabilityDefinition;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
@@ -103,6 +104,7 @@
bind(EventBus.class).in(Scopes.SINGLETON);
bind(ReplicationSources.class).to(SourcesCollection.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
+ DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
bind(ReplicationQueue.class).in(Scopes.SINGLETON);
bind(ObservableQueue.class).to(ReplicationQueue.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index b7b509d..0e4ace1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -19,6 +19,7 @@
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.Project.NameKey;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
@@ -55,7 +56,8 @@
implements ObservableQueue,
LifecycleListener,
GitReferenceUpdatedListener,
- ProjectDeletedListener {
+ ProjectDeletedListener,
+ HeadUpdatedListener {
static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
@@ -312,6 +314,17 @@
}
}
+ @Override
+ public void onHeadUpdated(HeadUpdatedListener.Event event) {
+ Project.NameKey p = Project.nameKey(event.getProjectName());
+ sources.get().getAll().stream()
+ .filter(s -> s.wouldFetchProject(p))
+ .forEach(
+ s ->
+ s.getApis()
+ .forEach(apiUrl -> s.scheduleUpdateHead(apiUrl, p, event.getNewHeadName())));
+ }
+
@AutoValue
abstract static class ReferenceUpdatedEvent {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index e223036..28aa541 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -70,6 +70,7 @@
import com.googlesource.gerrit.plugins.replication.pull.fetch.JGitFetch;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
@@ -102,6 +103,7 @@
}
private final ReplicationStateListener stateLog;
+ private final UpdateHeadTask.Factory updateHeadFactory;
private final Object stateLock = new Object();
private final Map<URIish, FetchOne> pending = new HashMap<>();
private final Map<URIish, FetchOne> inFlight = new HashMap<>();
@@ -190,6 +192,7 @@
.implement(Fetch.class, BatchFetchClient.class)
.implement(Fetch.class, FetchClientImplementation.class, clientClass)
.build(FetchFactory.class));
+ factory(UpdateHeadTask.Factory.class);
}
@Provides
@@ -214,6 +217,7 @@
opFactory = child.getInstance(FetchOne.Factory.class);
threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
deleteProjectFactory = child.getInstance(DeleteProjectTask.Factory.class);
+ updateHeadFactory = child.getInstance(UpdateHeadTask.Factory.class);
}
public synchronized CloseableHttpClient memoize(
@@ -762,6 +766,19 @@
return config.replicateProjectDeletions();
}
+ void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) {
+ try {
+ URIish apiURI = new URIish(apiUrl);
+ @SuppressWarnings("unused")
+ ScheduledFuture<?> ignored =
+ pool.schedule(
+ updateHeadFactory.create(this, apiURI, project, newHead), 0, TimeUnit.SECONDS);
+ } catch (URISyntaxException e) {
+ logger.atSevere().withCause(e).log(
+ "Could not schedule HEAD pull-replication for project {}", project.get());
+ }
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
new file mode 100644
index 0000000..943ea92
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -0,0 +1,87 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationQueue.repLog;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
+import java.io.IOException;
+import org.eclipse.jgit.transport.URIish;
+
+public class UpdateHeadTask implements Runnable {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private final FetchRestApiClient.Factory fetchClientFactory;
+ private final Source source;
+ private final URIish apiURI;
+ private final Project.NameKey project;
+ private final String newHead;
+ private final int id;
+
+ interface Factory {
+ UpdateHeadTask create(Source source, URIish apiURI, Project.NameKey project, String newHead);
+ }
+
+ @Inject
+ UpdateHeadTask(
+ FetchRestApiClient.Factory fetchClientFactory,
+ IdGenerator ig,
+ @Assisted Source source,
+ @Assisted URIish apiURI,
+ @Assisted Project.NameKey project,
+ @Assisted String newHead) {
+ this.fetchClientFactory = fetchClientFactory;
+ this.id = ig.next();
+ this.source = source;
+ this.apiURI = apiURI;
+ this.project = project;
+ this.newHead = newHead;
+ }
+
+ @Override
+ public void run() {
+ try {
+ HttpResult httpResult =
+ fetchClientFactory.create(source).updateHead(project, newHead, apiURI);
+ if (!httpResult.isSuccessful()) {
+ throw new IOException(httpResult.getMessage().orElse("Unknown"));
+ }
+ logger.atFine().log(
+ "Successfully updated HEAD of project {} on remote {}",
+ project.get(),
+ apiURI.toASCIIString());
+ } catch (IOException e) {
+ String errorMessage =
+ String.format(
+ "Cannot update HEAD of project %s remote site %s",
+ project.get(), apiURI.toASCIIString());
+ logger.atWarning().withCause(e).log(errorMessage);
+ repLog.warn(errorMessage);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "[%s] update-head %s at %s to %s",
+ HexFormat.fromInt(id), project.get(), apiURI.toASCIIString(), newHead);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index a7cc3e7..df97609 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -129,6 +129,18 @@
return httpClientFactory.create(source).execute(delete, this, getContext(apiUri));
}
+ public HttpResult updateHead(Project.NameKey project, String newHead, URIish apiUri)
+ throws IOException {
+ logger.atFine().log("Updating head of %s on %s", project.get(), newHead);
+ String url =
+ String.format("%s/%s", apiUri.toASCIIString(), getProjectUpdateHeadUrl(project.get()));
+ HttpPut req = new HttpPut(url);
+ req.setEntity(
+ new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), StandardCharsets.UTF_8));
+ req.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+ return httpClientFactory.create(source).execute(req, this, getContext(apiUri));
+ }
+
public HttpResult callSendObject(
Project.NameKey project, String refName, RevisionData revisionData, URIish targetUri)
throws ClientProtocolException, IOException {
@@ -180,4 +192,8 @@
String getProjectDeletionUrl(String projectName) {
return String.format("a/projects/%s", Url.encode(projectName));
}
+
+ String getProjectUpdateHeadUrl(String projectName) {
+ return String.format("a/projects/%s/HEAD", Url.encode(projectName));
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java
new file mode 100644
index 0000000..d1769ab
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeHeadUpdateEvent.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.api.changes.NotifyHandling;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
+
+class FakeHeadUpdateEvent implements HeadUpdatedListener.Event {
+
+ private final String oldName;
+ private final String newName;
+ private final String projectName;
+
+ FakeHeadUpdateEvent(String oldName, String newName, String projectName) {
+
+ this.oldName = oldName;
+ this.newName = newName;
+ this.projectName = projectName;
+ }
+
+ @Override
+ public NotifyHandling getNotify() {
+ return NotifyHandling.NONE;
+ }
+
+ @Override
+ public String getOldHeadName() {
+ return oldName;
+ }
+
+ @Override
+ public String getNewHeadName() {
+ return newName;
+ }
+
+ @Override
+ public String getProjectName() {
+ return projectName;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index c8c294f..c2335e8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -30,12 +30,14 @@
import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.Input;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.BadRequestException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.extensions.restapi.RestApiModule;
import com.google.gerrit.extensions.restapi.RestModifyView;
import com.google.gerrit.server.config.SitePaths;
@@ -284,6 +286,38 @@
waitUntil(() -> fakeDeleteProjectPlugin.getDeleteEndpointCalls() == 1);
}
+ @Test
+ public void shouldReplicateHeadUpdate() throws Exception {
+ String testProjectName = project.get();
+ setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(testProjectName));
+ config.save();
+ AutoReloadConfigDecorator autoReloadConfigDecorator =
+ getInstance(AutoReloadConfigDecorator.class);
+ autoReloadConfigDecorator.reload();
+
+ String newBranch = "refs/heads/mybranch";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(testProjectName).branch(newBranch).create(input);
+ String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
+
+ ReplicationQueue pullReplicationQueue =
+ plugin.getSysInjector().getInstance(ReplicationQueue.class);
+
+ HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
+ pullReplicationQueue.onHeadUpdated(event);
+
+ waitUntil(
+ () -> {
+ try {
+ return gApi.projects().name(testProjectName).head().equals(newBranch);
+ } catch (RestApiException e) {
+ return false;
+ }
+ });
+ }
+
private Ref getRef(Repository repo, String branchName) throws IOException {
return repo.getRefDatabase().exactRef(branchName);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 4779d63..78eb3f7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -298,6 +298,34 @@
verify(source, never()).scheduleDeleteProject(any(), any());
}
+ @Test
+ public void shouldScheduleUpdateHeadWhenWouldFetchProject() throws IOException {
+ when(source.wouldFetchProject(any())).thenReturn(true);
+
+ String projectName = "aProject";
+ String newHEAD = "newHEAD";
+
+ objectUnderTest.start();
+ objectUnderTest.onHeadUpdated(new FakeHeadUpdateEvent("oldHead", newHEAD, projectName));
+ verify(source, times(1))
+ .scheduleUpdateHead(any(), projectNameKeyCaptor.capture(), stringCaptor.capture());
+
+ assertThat(stringCaptor.getValue()).isEqualTo(newHEAD);
+ assertThat(projectNameKeyCaptor.getValue()).isEqualTo(Project.NameKey.parse(projectName));
+ }
+
+ @Test
+ public void shouldNotScheduleUpdateHeadWhenNotWouldFetchProject() throws IOException {
+ when(source.wouldFetchProject(any())).thenReturn(false);
+
+ String projectName = "aProject";
+ String newHEAD = "newHEAD";
+
+ objectUnderTest.start();
+ objectUnderTest.onHeadUpdated(new FakeHeadUpdateEvent("oldHead", newHEAD, projectName));
+ verify(source, never()).scheduleUpdateHead(any(), any(), any());
+ }
+
protected static Path createTempPath(String prefix) throws IOException {
return createTempDirectory(prefix);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index bde86fc..ed1eef4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -23,7 +23,9 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
@@ -33,6 +35,7 @@
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Optional;
@@ -376,6 +379,25 @@
assertThat(httpDelete.getURI().getPath()).isEqualTo("/a/projects/test_repo");
}
+ @Test
+ public void shouldCallUpdateHEADEndpoint() throws IOException, URISyntaxException {
+ String newHead = "newHead";
+ String projectName = "aProject";
+ objectUnderTest.updateHead(Project.nameKey(projectName), newHead, new URIish(api));
+
+ verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any(), any());
+
+ HttpPut httpPut = httpPutCaptor.getValue();
+ String payload =
+ CharStreams.toString(
+ new InputStreamReader(httpPut.getEntity().getContent(), Charsets.UTF_8));
+
+ assertThat(httpPut.getURI().getHost()).isEqualTo("gerrit-host");
+ assertThat(httpPut.getURI().getPath())
+ .isEqualTo(String.format("/a/projects/%s/HEAD", projectName));
+ assertThat(payload).isEqualTo(String.format("{\"ref\": \"%s\"}", newHead));
+ }
+
public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException {
ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024);
return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim();