Add multiple replication configuration file support

Replication plugin is capable to load both replication.config file and
all *.config files from etc/replication directory. This change allows
to split current monolith replication.config into global and remotes
parts. Main purpose of this change is to simplify future generation
of the diff between old and new remote configurations to decide which
remote configurations needs to be reloaded.

Feature: Issue 12450
Change-Id: I51c1b8fc3b7d95b65b15c37a540fdc6e2087d09c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
new file mode 100644
index 0000000..4a2f8c9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -0,0 +1,179 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.Files.getFileExtension;
+import static com.google.common.io.Files.getNameWithoutExtension;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.internal.storage.file.FileSnapshot;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+public class FanoutReplicationConfig implements ReplicationConfig {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final ReplicationFileBasedConfig replicationConfig;
+  private final Config config;
+  private final Path remoteConfigsDirPath;
+
+  @Inject
+  public FanoutReplicationConfig(SitePaths site, @PluginData Path pluginDataDir)
+      throws IOException, ConfigInvalidException {
+
+    remoteConfigsDirPath = site.etc_dir.resolve("replication");
+    replicationConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+    config = replicationConfig.getConfig();
+    removeRemotes(config);
+
+    try (Stream<Path> files = Files.walk(remoteConfigsDirPath, 1)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(path -> getFileExtension(path.toString()).equals("config"))
+          .map(this::loadConfig)
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .filter(this::isValid)
+          .forEach(cfg -> addRemoteConfig(cfg, config));
+    } catch (IllegalStateException e) {
+      throw new ConfigInvalidException(e.getMessage());
+    }
+  }
+
+  private void removeRemotes(Config config) {
+    Set<String> remoteNames = config.getSubsections("remote");
+    if (remoteNames.size() > 0) {
+      logger.atSevere().log(
+          "When replication directory is present replication.config file cannot contain remote configuration. Ignoring: %s",
+          String.join(",", remoteNames));
+
+      for (String name : remoteNames) {
+        config.unsetSection("remote", name);
+      }
+    }
+  }
+
+  private void addRemoteConfig(FileBasedConfig source, Config destination) {
+    String remoteName = getNameWithoutExtension(source.getFile().getName());
+    for (String name : source.getNames("remote")) {
+      destination.setStringList(
+          "remote",
+          remoteName,
+          name,
+          Lists.newArrayList(source.getStringList("remote", null, name)));
+    }
+  }
+
+  private boolean isValid(Config cfg) {
+    if (cfg.getSections().size() != 1 || !cfg.getSections().contains("remote")) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s must contain only one remote section.", cfg);
+      return false;
+    }
+    if (cfg.getSubsections("remote").size() > 0) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s cannot contain remote subsections.", cfg);
+      return false;
+    }
+
+    return true;
+  }
+
+  private Optional<FileBasedConfig> loadConfig(Path path) {
+    FileBasedConfig cfg = new FileBasedConfig(path.toFile(), FS.DETECTED);
+    try {
+      cfg.load();
+    } catch (IOException | ConfigInvalidException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot load remote replication configuration file %s.", path);
+      return Optional.empty();
+    }
+    return Optional.of(cfg);
+  }
+
+  @Override
+  public boolean isReplicateAllOnPluginStart() {
+    return replicationConfig.isReplicateAllOnPluginStart();
+  }
+
+  @Override
+  public boolean isDefaultForceUpdate() {
+    return replicationConfig.isDefaultForceUpdate();
+  }
+
+  @Override
+  public int getMaxRefsToLog() {
+    return replicationConfig.getMaxRefsToLog();
+  }
+
+  @Override
+  public Path getEventsDirectory() {
+    return replicationConfig.getEventsDirectory();
+  }
+
+  @Override
+  public int getSshConnectionTimeout() {
+    return replicationConfig.getSshConnectionTimeout();
+  }
+
+  @Override
+  public int getSshCommandTimeout() {
+    return replicationConfig.getSshCommandTimeout();
+  }
+
+  @Override
+  public String getVersion() {
+    Hasher hasher = Hashing.murmur3_128().newHasher();
+    hasher.putString(replicationConfig.getVersion(), UTF_8);
+    try (Stream<Path> files = Files.list(remoteConfigsDirPath)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(path -> getFileExtension(path.toString()).equals("config"))
+          .sorted()
+          .map(Path::toFile)
+          .map(FileSnapshot::save)
+          .forEach(
+              fileSnapshot ->
+                  // hashCode is based on file size, file key and last modified time
+                  hasher.putInt(fileSnapshot.hashCode()));
+      return hasher.hash().toString();
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot list remote configuration files from %s. Returning replication.config file version",
+          remoteConfigsDirPath);
+      return replicationConfig.getVersion();
+    }
+  }
+
+  @Override
+  public Config getConfig() {
+    return config;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 456b17f..ed1e348 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -34,6 +34,7 @@
 import com.google.inject.internal.UniqueAnnotations;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -41,10 +42,12 @@
 import org.eclipse.jgit.util.FS;
 
 class ReplicationModule extends AbstractModule {
+  private final SitePaths site;
   private final Path cfgPath;
 
   @Inject
   public ReplicationModule(SitePaths site) {
+    this.site = site;
     cfgPath = site.etc_dir.resolve("replication.config");
   }
 
@@ -82,13 +85,13 @@
     if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
       bind(ReplicationConfig.class)
           .annotatedWith(MainReplicationConfig.class)
-          .to(ReplicationFileBasedConfig.class);
+          .to(getReplicationConfigClass());
       bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class).in(Scopes.SINGLETON);
       bind(LifecycleListener.class)
           .annotatedWith(UniqueAnnotations.create())
           .to(AutoReloadConfigDecorator.class);
     } else {
-      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class).in(Scopes.SINGLETON);
+      bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
@@ -112,4 +115,11 @@
     }
     return config;
   }
+
+  private Class<? extends ReplicationConfig> getReplicationConfigClass() {
+    if (Files.exists(site.etc_dir.resolve("replication"))) {
+      return FanoutReplicationConfig.class;
+    }
+    return ReplicationFileBasedConfig.class;
+  }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7233061..4b98abe 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -432,6 +432,79 @@
 
 	default: 15 minutes
 
+Directory `replication`
+--------------------
+The optional directory `$site_path/etc/replication` contains Git-style
+config files that controls the replication settings for the replication
+plugin. When present all `remote` sections from `replication.config` file are
+ignored.
+
+Files are composed of one `remote` section. Multiple `remote` sections or any
+other section makes the file invalid and skipped by the replication plugin.
+File name defines remote section name. Each section provides common configuration
+settings for one or more destination URLs. For more details how to setup `remote`
+sections please refer to the `replication.config` section.
+
+### Configuration example:
+
+Static configuration in `$site_path/etc/replication.config`:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+```
+
+Remote sections in `$site_path/etc/replication` directory:
+
+* File `$site_path/etc/replication/host-one.config`
+
+ ```
+ [remote]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+ ```
+
+
+* File `$site_path/etc/replication/pubmirror.config`
+
+ ```
+  [remote]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+ ```
+
+Replication plugin resolves config files to the following configuration:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+
+[remote "host-one"]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+
+[remote "pubmirror"]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+```
 
 File `secure.config`
 --------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 14ac595..c48bdbd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -98,8 +98,12 @@
   }
 
   protected FileBasedConfig newReplicationConfig() {
+    return newReplicationConfig("replication.config");
+  }
+
+  protected FileBasedConfig newReplicationConfig(String path) {
     FileBasedConfig replicationConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+        new FileBasedConfig(sitePaths.etc_dir.resolve(path).toFile(), FS.DETECTED);
     return replicationConfig;
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index b6a3c4d..b1b9453 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Test;
 
@@ -43,7 +45,7 @@
 
     replicationConfig = newReplicationFileBasedConfig();
 
-    newAutoReloadConfig().start();
+    newAutoReloadConfig(() -> newReplicationFileBasedConfig()).start();
 
     DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
     destinationsCollections.startup(workQueueMock);
@@ -66,6 +68,140 @@
   }
 
   @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsAdded() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsRemoved() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    assertThat(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete())
+        .isTrue();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsModified() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "bar";
+    String remoteUrl2 = "ssh://git@git.bar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteUrl3 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig.setString("remote", null, "url", remoteUrl3);
+    remoteConfig.save();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl3);
+  }
+
+  @Test
   public void shouldNotAutoReloadReplicationConfigIfDisabled() throws Exception {
     String remoteName1 = "foo";
     String remoteUrl1 = "ssh://git@git.foo.com/${name}";
@@ -91,7 +227,8 @@
     assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
   }
 
-  private AutoReloadConfigDecorator newAutoReloadConfig() {
+  private AutoReloadConfigDecorator newAutoReloadConfig(
+      Supplier<ReplicationConfig> configSupplier) {
     AutoReloadRunnable autoReloadRunnable =
         new AutoReloadRunnable(
             configParser,
@@ -99,7 +236,7 @@
 
               @Override
               public ReplicationConfig get() {
-                return newReplicationFileBasedConfig();
+                return configSupplier.get();
               }
             },
             eventBus,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
new file mode 100644
index 0000000..123e3e1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
@@ -0,0 +1,286 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.io.MoreFiles;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FanoutReplicationConfigTest extends AbstractConfigTest {
+
+  public FanoutReplicationConfigTest() throws IOException {
+    super();
+  }
+
+  String remoteName1 = "foo";
+  String remoteUrl1 = "ssh://git@git.somewhere.com/${name}";
+  String remoteName2 = "bar";
+  String remoteUrl2 = "ssh://git@git.elsewhere.com/${name}";
+
+  @Before
+  public void setupTests() {
+    FileBasedConfig config = newReplicationConfig();
+    try {
+      config.save();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void shouldSkipRemoteConfigFromReplicationConfig() throws Exception {
+    String remoteName = "foo";
+    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
+
+    FileBasedConfig config = newReplicationConfig();
+    config.setString("remote", remoteName, "url", remoteUrl);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldLoadDestinationsFromMultipleFiles() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+    assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldIgnoreDestinationsFromSubdirectories() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig("subdirectory/" + remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldIgnoreNonConfigFiles() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".yaml").toFile(),
+            FS.DETECTED);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test(expected = ConfigInvalidException.class)
+  public void shouldThrowConfigInvalidExceptionWhenUrlIsMissingName() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", "ssh://git@git.elsewhere.com/name");
+    config.save();
+
+    newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+  }
+
+  @Test
+  public void shouldIgnoreEmptyConfigFile() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldIgnoreConfigWhenMoreThanOneRemoteInASingleFile() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.setString("remote", remoteName2, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldIgnoreConfigRemoteSection() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("replication", null, "url", remoteUrl1);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldReturnSameVersionWhenNoChanges() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    objectUnderTest = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    assertThat(objectUnderTest.getVersion()).isEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileAdded() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileIsModified() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileRemoved() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+    assertThat(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete())
+        .isTrue();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnReplicationConfigVersionWhenReplicationConfigDirectoryRemoved()
+      throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String replicationConfigVersion =
+        new ReplicationFileBasedConfig(sitePaths, pluginDataPath).getVersion();
+
+    MoreFiles.deleteRecursively(sitePaths.etc_dir.resolve("replication"), ALLOW_INSECURE);
+
+    assertThat(objectUnderTest.getVersion()).isEqualTo(replicationConfigVersion);
+  }
+
+  protected FileBasedConfig newRemoteConfig(String configFileName) {
+    return new FileBasedConfig(
+        sitePaths.etc_dir.resolve("replication/" + configFileName + ".config").toFile(),
+        FS.DETECTED);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
new file mode 100644
index 0000000..adb7fc8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -0,0 +1,266 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.After;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationFanoutIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setAutoReload();
+    config.save();
+
+    setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project"));
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    tasksStorage.disableDeleteForTesting(true);
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    if (Files.exists(sitePaths.etc_dir.resolve("replication"))) {
+      MoreFiles.deleteRecursively(
+          sitePaths.etc_dir.resolve("replication"), RecursiveDeleteOption.ALLOW_INSECURE);
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject(project + "replica1");
+    Project.NameKey targetProject2 = createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
+      throws IOException {
+    FileBasedConfig remoteConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(),
+            FS.DETECTED);
+
+    setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
+  }
+
+  private void setAutoReload() throws IOException {
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  private void setReplicationDestination(
+      FileBasedConfig config, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", null, "url", replicaUrls);
+    config.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", null, "projects", prj));
+
+    config.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
+      for (Path path : files) {
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
+      }
+    }
+  }
+}