Extract pack-copy logic into ProjectRepairer Move the pack-copy logic from RepairCommand into a new shared ProjectRepairer class so it can be reused by other callers (for example, an automated repair flow). RepairCommand now delegates the per-destination copy to ProjectRepairer. Change-Id: I86166730f02bf66963ce603cc929c49fbd895fff
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ProjectRepairer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ProjectRepairer.java new file mode 100644 index 0000000..5866569 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ProjectRepairer.java
@@ -0,0 +1,157 @@ +// Copyright (C) 2026 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + +import com.google.common.base.Strings; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.git.GitRepositoryManager; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.replication.api.ReplicationConfig; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.util.QuotedString; +import org.eclipse.jgit.util.io.StreamCopyThread; + +@Singleton +public class ProjectRepairer { + private final GitRepositoryManager gitManager; + private final ReplicationConfig replicationConfig; + + @Inject + ProjectRepairer(GitRepositoryManager gitManager, ReplicationConfig replicationConfig) { + this.gitManager = gitManager; + this.replicationConfig = replicationConfig; + } + + public boolean repair(Project.NameKey project, URIish uri, OutputStream out, boolean copyPacks) { + if (copyPacks && !copyPackTo(project, uri, out)) { + repLog.atSevere().log("Repair failed for %s on %s", project.get(), uri); + return false; + } + return true; + } + + public static boolean canCopy(URIish uri) { + return AdminApiFactory.isSSH(uri) && !AdminApiFactory.isGerrit(uri); + } + + private boolean copyPackTo(Project.NameKey project, URIish uri, OutputStream out) { + if (Strings.isNullOrEmpty(uri.getHost())) { + repLog.atSevere().log("Cannot repair %s: URI has no host: %s", project.get(), uri); + return false; + } + if (Strings.isNullOrEmpty(uri.getPath())) { + repLog.atSevere().log("Cannot repair %s: URI has no path: %s", project.get(), uri); + return false; + } + + Path packDir; + try (Repository repo = gitManager.openRepository(project)) { + packDir = repo.getDirectory().toPath().resolve("objects").resolve("pack"); + } catch (IOException e) { + repLog.atSevere().withCause(e).log("Cannot open repository %s for repair", project.get()); + return false; + } + + if (!Files.isDirectory(packDir)) { + repLog.atSevere().log("No objects/pack directory for project %s", project.get()); + return false; + } + + return copyInOrder(packDir, uri, out); + } + + private boolean copyInOrder(Path packDir, URIish uri, OutputStream out) { + try { + return copy(packDir, uri, out, "*.pack") == 0 + && copy(packDir, uri, out, "*.idx", "*.bitmap", "*.rev") == 0; + } catch (InterruptedException e) { + repLog.atWarning().withCause(e).log("Interrupted during copy to %s", uri); + return false; + } + } + + private int copy(Path src, URIish uri, OutputStream out, String... includes) + throws InterruptedException { + List<String> cmd = new ArrayList<>(); + cmd.add(replicationConfig.getRsyncPath()); + cmd.add("-avP"); + cmd.add("-e"); + cmd.add(buildSshTransport(uri)); + for (String inc : includes) { + cmd.add("--include=" + inc); + } + cmd.add("--exclude=*"); + cmd.add(src.toAbsolutePath().normalize() + "/"); + cmd.add(buildCopyDestination(uri)); + + repLog.atInfo().log("Running repair cmd: %s", String.join(" ", cmd)); + + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.redirectErrorStream(true); + Process p; + try { + p = pb.start(); + } catch (IOException e) { + repLog.atWarning().withCause(e).log("Copy to %s failed", uri); + return -1; + } + + StreamCopyThread outStream = new StreamCopyThread(p.getInputStream(), out); + outStream.setName("copy-packs-output"); + outStream.start(); + try { + int code = p.waitFor(); + outStream.join(); + if (code != 0) { + repLog.atWarning().log("Copy to %s failed with exit code %d", uri, code); + } + return code; + } catch (InterruptedException e) { + p.destroyForcibly(); + outStream.halt(); + return -1; + } + } + + private static String buildCopyDestination(URIish uri) { + String host = uri.getHost(); + String path = uri.getPath(); + String remotePackPath = QuotedString.BOURNE.quote(path + "/objects/pack/"); + String user = uri.getUser(); + if (user != null && !user.isEmpty()) { + return user + "@" + host + ":" + remotePackPath; + } + return host + ":" + remotePackPath; + } + + private static String buildSshTransport(URIish uri) { + StringBuilder sb = new StringBuilder("ssh -o BatchMode=yes"); + int port = uri.getPort(); + if (port > 0) { + sb.append(" -p ").append(port); + } + return sb.toString(); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RepairCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RepairCommand.java index 92af46a..0371c9c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RepairCommand.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RepairCommand.java
@@ -14,11 +14,9 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.common.flogger.FluentLogger; import com.google.gerrit.entities.Project; import com.google.gerrit.exceptions.StorageException; import com.google.gerrit.extensions.annotations.RequiresCapability; -import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.sshd.CommandMetaData; import com.google.gerrit.sshd.SshCommand; @@ -26,27 +24,19 @@ import com.googlesource.gerrit.plugins.replication.api.ReplicationConfig; import java.io.IOException; import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.transport.URIish; -import org.eclipse.jgit.util.QuotedString; -import org.eclipse.jgit.util.io.StreamCopyThread; import org.kohsuke.args4j.Argument; import org.kohsuke.args4j.Option; @RequiresCapability(StartReplicationCapability.START_REPLICATION) @CommandMetaData(name = "repair", description = "Repair a project on replication destinations") final class RepairCommand extends SshCommand implements PushResultProcessing.SshOutputCommand { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name") private String projectName; @@ -64,11 +54,10 @@ @Option(name = "--full", usage = "run all supported repair actions (default)") private boolean full; - @Inject private GitRepositoryManager gitManager; @Inject private ProjectCache projectCache; @Inject private ReplicationDestinations destinations; @Inject private ReplicationStarter replicationStarter; - @Inject private ReplicationConfig replicationConfig; + @Inject private ProjectRepairer projectRepairer; private final Object outputLock = new Object(); @@ -94,24 +83,13 @@ } private Set<URIish> repair(Project.NameKey project) throws Failure { - Path packDir; - try (Repository repo = gitManager.openRepository(project)) { - packDir = repo.getDirectory().toPath().resolve("objects").resolve("pack"); - } catch (IOException e) { - throw die(e); - } - - if (!Files.isDirectory(packDir)) { - throw die("No objects/pack directory for project " + projectName); - } - Set<URIish> copyTargets = new HashSet<>(); Collection<URIish> destUris = destinations .getURIs(Optional.empty(), project, ReplicationConfig.FilterType.ALL, urlMatch) .values(); for (URIish uri : destUris) { - if (!canCopy(uri)) { + if (!ProjectRepairer.canCopy(uri)) { writeStdErrSync( "Warning: skipping " + uri + " as copy-packs only supports plain SSH destinations"); continue; @@ -124,86 +102,26 @@ } Set<URIish> failedUris = new HashSet<>(); + OutputStream out = getFlushingOutputStream(); for (URIish uri : copyTargets) { writeStdOutSync("\nRepairing " + uri + " ..."); - if ((full || copyPacks) && !copyInOrder(packDir, uri)) { + if (projectRepairer.repair(project, uri, out, full || copyPacks)) { + writeStdOutSync( + "\nRunning replication start for " + project.get() + " to " + uri.toString() + " ..."); + replicationStarter.start( + uri.toString(), + Set.of(), + new ReplicationFilter(List.of(project.get()), Collections.emptyList()), + /* now= */ true, + /* wait= */ true, + this); + } else { failedUris.add(uri); - continue; } - writeStdOutSync( - "\nRunning replication start for " + project.get() + " to " + uri.toString() + " ..."); - replicationStarter.start( - uri.toString(), - Set.of(), - new ReplicationFilter(List.of(project.get()), Collections.emptyList()), - /* now= */ true, - /* wait= */ true, - this); } return failedUris; } - private boolean copyInOrder(Path packDir, URIish uri) throws Failure { - return copyWithDie(packDir, uri, "*.pack") - && copyWithDie(packDir, uri, "*.idx", "*.bitmap", "*.rev"); - } - - private boolean copyWithDie(Path src, URIish uri, String... includes) throws Failure { - int retCode; - try { - retCode = copy(src, uri, includes); - } catch (IOException e) { - throw die(e); - } catch (InterruptedException e) { - throw die("Interrupted during copy to " + uri, e); - } - if (retCode != 0) { - writeStdErrSync("Warning: copy to " + uri + " failed with exit code " + retCode); - return false; - } - return true; - } - - private static boolean canCopy(URIish uri) { - return AdminApiFactory.isSSH(uri) && !AdminApiFactory.isGerrit(uri); - } - - private int copy(Path src, URIish uri, String... includes) - throws IOException, InterruptedException, UnloggedFailure { - List<String> cmd = new ArrayList<>(); - cmd.add(replicationConfig.getRsyncPath()); - cmd.add("-avP"); - cmd.add("-e"); - cmd.add(buildSshTransport(uri)); - for (String inc : includes) { - cmd.add("--include=" + inc); - } - cmd.add("--exclude=*"); - cmd.add(src.toAbsolutePath().normalize() + "/"); - cmd.add(buildCopyDestination(uri)); - - logger.atInfo().log("Running repair cmd: %s", String.join(" ", cmd)); - - ProcessBuilder pb = new ProcessBuilder(cmd); - pb.redirectErrorStream(true); - Process p = pb.start(); - - stdout.flush(); - StreamCopyThread outStream = - new StreamCopyThread(p.getInputStream(), getFlushingOutputStream()); - outStream.setName("copy-packs-output"); - outStream.start(); - try { - int code = p.waitFor(); - outStream.join(); - return code; - } catch (InterruptedException e) { - p.destroyForcibly(); - outStream.halt(); - throw e; - } - } - private OutputStream getFlushingOutputStream() { return new OutputStream() { @Override @@ -220,35 +138,6 @@ }; } - private String buildCopyDestination(URIish uri) throws UnloggedFailure { - String host = uri.getHost(); - if (host == null || host.isEmpty()) { - throw die("URI has no host: " + uri); - } - - String path = uri.getPath(); - if (path == null || path.isEmpty()) { - throw die("URI has no path: " + uri); - } - - String remotePackPath = QuotedString.BOURNE.quote(path + "/objects/pack/"); - - String user = uri.getUser(); - if (user != null && !user.isEmpty()) { - return user + "@" + host + ":" + remotePackPath; - } - return host + ":" + remotePackPath; - } - - private static String buildSshTransport(URIish uri) { - StringBuilder sb = new StringBuilder("ssh -o BatchMode=yes"); - int port = uri.getPort(); - if (port > 0) { - sb.append(" -p ").append(port); - } - return sb.toString(); - } - @Override public void writeStdOutSync(String message) { synchronized (outputLock) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java index e3e4021..bae633d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -106,6 +106,8 @@ bind(ReplicationQueue.class).in(Scopes.SINGLETON); bind(ReplicationDestinations.class).to(DestinationsCollection.class); + bind(ProjectRepairer.class).in(Scopes.SINGLETON); + install(new FactoryModuleBuilder().build(Destination.Factory.class)); install(new FactoryModuleBuilder().build(ProjectDeletionState.Factory.class)); }