blob: ebbd9402d2b172c65fc4029f4c05838d34318cdc [file] [log] [blame]
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@Singleton
public class RestForwarderScheduler {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private final ScheduledExecutorService executor;
private final long retryIntervalMs;
public class CompletablePromise<V> extends CompletableFuture<V> {
private Future<V> future;
public CompletablePromise(Future<V> future) {
this.future = future;
executor.execute(this::tryToComplete);
}
private void tryToComplete() {
if (future.isDone()) {
try {
complete(future.get());
} catch (InterruptedException e) {
completeExceptionally(e);
} catch (ExecutionException e) {
completeExceptionally(e.getCause());
}
return;
}
if (future.isCancelled()) {
cancel(true);
return;
}
executor.execute(this::tryToComplete);
}
}
@Inject
public RestForwarderScheduler(
WorkQueue workQueue, Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider) {
int executorSize = peerInfoProvider.get().size() * cfg.index().threadPoolSize();
retryIntervalMs = cfg.index().retryInterval();
this.executor = workQueue.createQueue(executorSize, "RestForwarderScheduler");
}
@VisibleForTesting
public RestForwarderScheduler(ScheduledExecutorService executor) {
this.executor = executor;
retryIntervalMs = 0;
}
public CompletableFuture<Boolean> execute(RestForwarder.Request request) {
return execute(request, 0);
}
public CompletableFuture<Boolean> execute(RestForwarder.Request request, long delayMs) {
return supplyAsync(
request.toString(),
() -> {
try {
if (!request.execute()) {
log.atWarning().log(
"Rescheduling %s for retry after %d msec", request, retryIntervalMs);
return execute(request, retryIntervalMs);
}
return CompletableFuture.completedFuture(true);
} catch (ForwardingException e) {
log.atSevere().withCause(e).log("Forwarding of %s has failed", request);
return CompletableFuture.completedFuture(false);
}
},
executor,
delayMs);
}
private CompletableFuture<Boolean> supplyAsync(
String taskName,
Supplier<CompletableFuture<Boolean>> fn,
ScheduledExecutorService executor,
long delayMs) {
BooleanAsyncSupplier asyncSupplier = new BooleanAsyncSupplier(taskName, fn);
executor.schedule(asyncSupplier, delayMs, TimeUnit.MILLISECONDS);
return asyncSupplier.future();
}
static class BooleanAsyncSupplier implements Runnable {
private CompletableFuture<CompletableFuture<Boolean>> dep;
private Supplier<CompletableFuture<Boolean>> fn;
private String taskName;
BooleanAsyncSupplier(String taskName, Supplier<CompletableFuture<Boolean>> fn) {
this.taskName = taskName;
this.dep = new CompletableFuture<>();
this.fn = fn;
}
public CompletableFuture<Boolean> future() {
return dep.thenCompose(Function.identity());
}
@Override
public void run() {
try {
dep.complete(fn.get());
} catch (Throwable ex) {
dep.completeExceptionally(ex);
}
}
@Override
public String toString() {
return taskName;
}
}
}