blob: 841aa4f9acea316875df0c07b917a6500a7c8e1a [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.eventseiffel;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.eventseiffel.cache.EiffelEventIdCache;
import com.googlesource.gerrit.plugins.eventseiffel.cache.EiffelEventIdLookupException;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
 * Bounded, in-memory implementation of {@link EiffelEventHub}.
 *
 * <p>Producers add events with {@link #push(EiffelEvent, boolean)}. A consumer fetches a batch with
 * {@link #take(int)} and must answer with either {@link #ack(Set)} or {@link #nak(Set)} before the
 * next batch is handed out; only one batch can be outstanding at a time.
 *
 * <p>Locks are acquired in the following orders: {@code putLock -> idLookupLock -> eventQueueLock}
 * (push), {@code takeLock -> idLookupLock} (ack) and {@code takeLock -> eventQueueLock} (take,
 * nak).
 */
public class EiffelEventHubImpl implements EiffelEventHub {
  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  /**
   * Upper bound for {@link #count}; {@link #push(EiffelEvent, boolean)} blocks while the bound is
   * reached. NOTE(review): left public and non-final, presumably so that it can be adjusted from
   * the outside (e.g. tests) - confirm before tightening.
   */
  public static int MAX_SIZE = 16384;

  private final EiffelEventIdCache idCache;
  private final EiffelEventHub.Consumer worker;

  /** Events waiting to be taken, in publishing order. Guarded by {@link #eventQueueLock}. */
  private final Deque<EiffelEvent> eventQueue = Lists.newLinkedList();

  private final Object eventQueueLock = new Object();

  /** Every pushed event that has not been ack:ed yet, including those currently taken. */
  private final Map<EventKey, EiffelEvent> eventsInQueue = Maps.newConcurrentMap();

  private final ReentrantLock takeLock = new ReentrantLock();
  private final ReentrantLock putLock = new ReentrantLock();
  private final ReentrantLock idLookupLock = new ReentrantLock();
  private final Condition readyForTake = takeLock.newCondition();
  private final Condition notFull = putLock.newCondition();

  /** Number of pushed events that have not been ack:ed yet. */
  private final AtomicInteger count = new AtomicInteger();

  /**
   * The batch handed out by the latest {@link #take(int)} that is neither ack:ed nor nak:ed.
   * Volatile since it is read by the ack/nak validation without holding {@link #takeLock}.
   */
  private volatile Set<EiffelEvent> taken;

  /**
   * Whether the hub accepts and hands out events. Volatile since it is written by
   * start/stopPublishing without any lock and read by producer and consumer threads.
   */
  private volatile boolean open = false;

  @Inject
  public EiffelEventHubImpl(EiffelEventIdCache idCache, EiffelEventHub.Consumer worker) {
    this.idCache = idCache;
    this.worker = worker;
  }

  /**
   * Adds {@code event} to the publish-queue, blocking while the hub is full.
   *
   * <p>Unless {@code force} is set, the event is silently dropped when an id for its key is already
   * known to the hub or to the id cache. A failing id lookup is logged and the event is not queued.
   *
   * @param event the event to queue, must be non-null and of a type supported by {@link EventKey}.
   * @param force queue the event even if its key is already known; must be supported by the key.
   * @throws IllegalStateException if any of the argument checks above fails.
   * @throws InterruptedException if the thread is interrupted while waiting for space, or if the
   *     hub is closed.
   */
  @Override
  public void push(EiffelEvent event, boolean force) throws InterruptedException {
    checkState(event != null, "Event must not be null.");
    checkState(EventKey.isSupported(event), "Unsupported type: " + event.meta.type.name());
    EventKey key = EventKey.fromEvent(event);
    checkState(
        !force || key.supportsForce(), "This event does not support force: " + key.toString());

    int previousCount = -1;
    putLock.lock();
    try {
      try {
        // ">=" rather than "==": MAX_SIZE is mutable, so count may already exceed a lowered bound.
        while (open && count.get() >= MAX_SIZE) {
          // Timed wait so that a closed hub is noticed even without a signal.
          notFull.await(5, TimeUnit.SECONDS);
        }
      } catch (InterruptedException e) {
        // Pass a possibly consumed signal on to another waiting producer.
        notFull.signal();
        throw e;
      }
      if (!open) {
        throw new InterruptedException("EventHub is closed");
      }

      // Held across lookup and insert so that no other thread can add the same key in between.
      idLookupLock.lock();
      try {
        if (!force && getExistingId(key, true).isPresent()) {
          logger.atFine().log("Event %s is already pushed.", key);
          return;
        }
        synchronized (eventQueueLock) {
          eventQueue.add(event);
          logger.atFine().log("Added event %s to publish-queue.", key);
        }
        eventsInQueue.put(key, event);
        previousCount = count.getAndIncrement();
        if (previousCount + 1 < MAX_SIZE) {
          // Still room left, let the next waiting producer proceed.
          notFull.signal();
        }
      } finally {
        idLookupLock.unlock();
      }
    } catch (EiffelEventIdLookupException e) {
      logger.atSevere().withCause(e).log("Failed to lookup event %s in id cache.", key);
    } finally {
      putLock.unlock();
    }

    // Signalled outside putLock; previousCount stays -1 when nothing was added.
    if (previousCount == 0) {
      signalReadyForTake();
    }
  }

  /**
   * Looks up the id of the event identified by {@code key}, first among the events that are not
   * yet ack:ed and then in the id cache.
   *
   * @throws EiffelEventIdLookupException if the event stored under {@code key} does not match the
   *     key; the id cache lookup declares the same exception.
   */
  @Override
  public Optional<UUID> getExistingId(EventKey key) throws EiffelEventIdLookupException {
    return getExistingId(key, false);
  }

  /**
   * Same as {@link #getExistingId(EventKey)}; when {@code expectingAbsent} is set a found id is
   * additionally logged at fine level.
   */
  private Optional<UUID> getExistingId(EventKey key, boolean expectingAbsent)
      throws EiffelEventIdLookupException {
    idLookupLock.lock();
    try {
      EiffelEvent fromHub = eventsInQueue.getOrDefault(key, null);
      if (fromHub != null) {
        if (key.matches(fromHub)) {
          if (expectingAbsent) {
            logger.atFine().log(
                "Unexpected event found in queue: %s, with id: %s", key, fromHub.meta.id);
          }
          return Optional.of(fromHub.meta.id);
        }
        throw new EiffelEventIdLookupException(
            "Key: %s does not match found event: %s.", key, EventKey.fromEvent(fromHub));
      }
      Optional<UUID> fromCache = idCache.getEventId(key);
      if (fromCache.isPresent() && expectingAbsent) {
        logger.atFine().log("Got unexpected event-id from cache: %s", fromCache.get());
      }
      return fromCache;
    } finally {
      idLookupLock.unlock();
    }
  }

  /**
   * Returns the id of a source-change-submitted event for {@code commit} in {@code repo} on any of
   * {@code branches}. Events that are not yet ack:ed are consulted first, then the id cache.
   */
  @Override
  public Optional<UUID> getScsForCommit(String repo, String commit, List<String> branches)
      throws EiffelEventIdLookupException {
    List<SourceChangeEventKey> keys =
        branches.stream()
            .map(branch -> SourceChangeEventKey.scsKey(repo, branch, commit))
            .collect(Collectors.toList());
    idLookupLock.lock();
    try {
      Optional<UUID> id =
          keys.stream()
              .map(key -> eventsInQueue.get(key))
              .filter(Objects::nonNull)
              .map(event -> event.meta.id)
              .findAny();
      if (id.isPresent()) {
        return id;
      }
      return idCache.getScsForCommit(repo, commit, branches);
    } finally {
      idLookupLock.unlock();
    }
  }

  @Override
  public boolean isOpen() {
    return open;
  }

  /**
   * Blocks until there are events to publish and no other batch is outstanding, then removes up to
   * {@code max} events from the head of the queue and returns them.
   *
   * @return the taken batch, or {@code null} if the hub is closed.
   * @throws InterruptedException if the thread is interrupted while waiting.
   */
  @Override
  public Set<EiffelEvent> take(int max) throws InterruptedException {
    Set<EiffelEvent> toTake = Sets.newHashSet();
    takeLock.lock();
    try {
      while (open && (count.get() == 0 || isTaken())) {
        // Timed wait so that a closed hub is noticed even without a signal.
        readyForTake.await(2, TimeUnit.SECONDS);
      }
      if (!open) {
        return null;
      }
      synchronized (eventQueueLock) {
        while (toTake.size() < max && !eventQueue.isEmpty()) {
          toTake.add(eventQueue.poll());
        }
      }
      taken = toTake;
    } finally {
      takeLock.unlock();
    }
    return toTake;
  }

  /**
   * Confirms that the taken batch is published: the ids are stored in the id cache and the events
   * are removed from the hub.
   *
   * <p>If the thread is interrupted while acquiring the locks, the same bookkeeping is done
   * without the locks as a best effort and the interrupt status of the thread is restored.
   *
   * @param events must be equal to the batch returned by the latest {@link #take(int)}.
   * @throws IllegalArgumentException if no batch is taken or {@code events} differs from it.
   */
  @Override
  public void ack(Set<EiffelEvent> events) {
    validateAckAttempt(events);
    int previousCount = -1;
    try {
      // One try/finally per lock: an interrupt while waiting for idLookupLock must still release
      // takeLock.
      takeLock.lockInterruptibly();
      try {
        idLookupLock.lockInterruptibly();
        try {
          taken = null;
          previousCount = count.get();
          for (EiffelEvent event : events) {
            idCache.putId(event);
            EventKey key = EventKey.fromEvent(event);
            eventsInQueue.remove(key);
            count.decrementAndGet();
            logger.atFine().log("Event %s have been ack:ed by publisher", key);
          }
          readyForTake.signal();
        } finally {
          idLookupLock.unlock();
        }
      } finally {
        takeLock.unlock();
      }
    } catch (InterruptedException ie) {
      logger.atWarning().log("Interupted while ack:ing events, attempting to clean up.");
      for (EiffelEvent event : events) {
        EventKey key = EventKey.fromEvent(event);
        if (eventsInQueue.containsKey(key)) {
          idCache.putId(event);
          eventsInQueue.remove(key);
          count.decrementAndGet();
          logger.atFine().log("Event %s have been ack:ed by publisher", key);
        }
      }
      // The batch is accounted for; without this a later take() would wait for it indefinitely
      // and a repeated ack of the same batch would pass validation.
      taken = null;
      logger.atInfo().log("Cleanup complete.");
      // Preserve the interrupt for the caller, this method cannot propagate the exception.
      Thread.currentThread().interrupt();
    }
    // The hub was full before this ack, producers may be waiting for space.
    if (previousCount >= MAX_SIZE) {
      signalReadyForPut();
    }
  }

  /**
   * Returns the taken batch to the head of the queue so that it is handed out again. Does nothing
   * if the hub is closed.
   *
   * @param events must be equal to the batch returned by the latest {@link #take(int)}.
   * @throws IllegalArgumentException if no batch is taken or {@code events} differs from it.
   */
  @Override
  public void nak(Set<EiffelEvent> events) {
    if (!open) {
      return;
    }
    validateNakAttempt(events);
    takeLock.lock();
    try {
      synchronized (eventQueueLock) {
        for (EiffelEvent event : events) {
          eventQueue.addFirst(event);
        }
      }
      taken = null;
      readyForTake.signal();
    } finally {
      takeLock.unlock();
    }
  }

  /** Starts the consumer and opens the hub for push and take. */
  @Override
  public void startPublishing() {
    // NOTE(review): the worker is started before the hub is opened. If the worker calls take()
    // right away it may observe a closed hub and get null - confirm that the Consumer
    // implementation copes with that before changing the order.
    worker.start(this);
    open = true;
  }

  /** Closes the hub, stops the consumer and releases all producers blocked in push. */
  @Override
  public void stopPublishing() {
    open = false;
    worker.stop();
    /* Wake up all event-producing threads so that they notice the closed hub promptly. */
    putLock.lock();
    try {
      notFull.signalAll();
    } finally {
      putLock.unlock();
    }
  }

  /** Returns true if a non-empty batch is taken and neither ack:ed nor nak:ed. */
  private boolean isTaken() {
    Set<EiffelEvent> current = taken;
    return current != null && !current.isEmpty();
  }

  private void validateAckAttempt(Set<EiffelEvent> events) {
    validateAckNakAttempt(events, "ack");
  }

  private void validateNakAttempt(Set<EiffelEvent> events) {
    validateAckNakAttempt(events, "nak");
  }

  /**
   * Verifies that {@code events} is exactly the outstanding batch.
   *
   * @param action name of the attempted operation, only used in the exception message.
   * @throws IllegalArgumentException if no batch is taken or {@code events} differs from it.
   */
  private void validateAckNakAttempt(Set<EiffelEvent> events, String action) {
    if (!isTaken()) {
      throw new IllegalArgumentException(String.format("Trying to %s without taken event", action));
    }
    if (!events.equals(taken)) {
      throw new IllegalArgumentException(
          String.format("Trying to %s events other than those taken.", action));
    }
  }

  /** Wakes one producer waiting for space in the hub. */
  private void signalReadyForPut() {
    putLock.lock();
    try {
      notFull.signal();
    } finally {
      putLock.unlock();
    }
  }

  /** Wakes a consumer waiting in take, unless a batch is still outstanding. */
  private void signalReadyForTake() {
    takeLock.lock();
    try {
      if (!isTaken()) {
        readyForTake.signal();
      }
    } finally {
      takeLock.unlock();
    }
  }
}