| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.eventseiffel; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.inject.Inject; |
| import com.googlesource.gerrit.plugins.eventseiffel.cache.EiffelEventIdCache; |
| import com.googlesource.gerrit.plugins.eventseiffel.cache.EiffelEventIdLookupException; |
| import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey; |
| import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey; |
| import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent; |
| import java.util.Deque; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| |
| public class EiffelEventHubImpl implements EiffelEventHub { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| public static int MAX_SIZE = 16384; |
| |
| private final EiffelEventIdCache idCache; |
| private final EiffelEventHub.Consumer worker; |
| private final Deque<EiffelEvent> eventQueue = Lists.newLinkedList(); |
| private final Object eventQueueLock = new Object(); |
| private final Map<EventKey, EiffelEvent> eventsInQueue = Maps.newConcurrentMap(); |
| private final ReentrantLock takeLock = new ReentrantLock(); |
| private final ReentrantLock putLock = new ReentrantLock(); |
| private final ReentrantLock idLookupLock = new ReentrantLock(); |
| private final Condition readyForTake = takeLock.newCondition(); |
| private final Condition notFull = putLock.newCondition(); |
| private final AtomicInteger count = new AtomicInteger(); |
| private Set<EiffelEvent> taken; |
| private boolean open = false; |
| |
| @Inject |
| public EiffelEventHubImpl(EiffelEventIdCache idCache, EiffelEventHub.Consumer worker) { |
| this.idCache = idCache; |
| this.worker = worker; |
| } |
| |
| @Override |
| public void push(EiffelEvent event, boolean force) throws InterruptedException { |
| checkState(event != null, "Event must not be null."); |
| checkState(EventKey.isSupported(event), "Unsupported type: " + event.meta.type.name()); |
| |
| EventKey key = EventKey.fromEvent(event); |
| checkState( |
| !force || key.supportsForce(), "This event does not support force: " + key.toString()); |
| |
| final ReentrantLock putLock = this.putLock; |
| final ReentrantLock idLookupLock = this.idLookupLock; |
| final AtomicInteger count = this.count; |
| int previousCount = -1; |
| putLock.lock(); |
| try { |
| try { |
| while (open && count.get() == MAX_SIZE) { |
| notFull.await(5, TimeUnit.SECONDS); |
| } |
| } catch (InterruptedException e) { |
| notFull.signal(); |
| throw e; |
| } |
| if (!open) { |
| throw new InterruptedException("EventHub is closed"); |
| } |
| idLookupLock.lock(); |
| try { |
| if (!force && getExistingId(key, true).isPresent()) { |
| logger.atFine().log("Event %s is already pushed.", key); |
| return; |
| } |
| synchronized (eventQueueLock) { |
| eventQueue.add(event); |
| logger.atFine().log("Added event %s to publish-queue.", key); |
| } |
| eventsInQueue.put(key, event); |
| previousCount = count.getAndIncrement(); |
| if (previousCount + 1 < MAX_SIZE) { |
| notFull.signal(); |
| } |
| } finally { |
| idLookupLock.unlock(); |
| } |
| } catch (EiffelEventIdLookupException e) { |
| logger.atSevere().withCause(e).log("Failed to lookup event %s in id cache.", key); |
| } finally { |
| putLock.unlock(); |
| } |
| if (previousCount == 0) { |
| signalReadyForTake(); |
| } |
| } |
| |
| @Override |
| public Optional<UUID> getExistingId(EventKey key) throws EiffelEventIdLookupException { |
| return getExistingId(key, false); |
| } |
| |
| private Optional<UUID> getExistingId(EventKey key, boolean expectingAbsent) |
| throws EiffelEventIdLookupException { |
| final ReentrantLock idLookupLock = this.idLookupLock; |
| idLookupLock.lock(); |
| try { |
| EiffelEvent fromHub = eventsInQueue.getOrDefault(key, null); |
| if (fromHub != null) { |
| if (key.matches(fromHub)) { |
| if (expectingAbsent) { |
| logger.atFine().log( |
| "Unexpected event found in queue: %s, with id: %s", key, fromHub.meta.id); |
| } |
| return Optional.of(fromHub.meta.id); |
| } |
| throw new EiffelEventIdLookupException( |
| "Key: %s does not match found event: %s.", key, EventKey.fromEvent(fromHub)); |
| } |
| Optional<UUID> fromCache = idCache.getEventId(key); |
| if (fromCache.isPresent() && expectingAbsent) { |
| logger.atFine().log("Got unexpected event-id from cache: %s", fromCache.get()); |
| } |
| return fromCache; |
| } finally { |
| idLookupLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Optional<UUID> getScsForCommit(String repo, String commit, List<String> branches) |
| throws EiffelEventIdLookupException { |
| List<SourceChangeEventKey> keys = |
| branches.stream() |
| .map(branch -> SourceChangeEventKey.scsKey(repo, branch, commit)) |
| .collect(Collectors.toList()); |
| final ReentrantLock idLookupLock = this.idLookupLock; |
| idLookupLock.lock(); |
| try { |
| Optional<UUID> id = |
| keys.stream() |
| .map(key -> eventsInQueue.get(key)) |
| .filter(Objects::nonNull) |
| .map(event -> event.meta.id) |
| .findAny(); |
| if (id.isPresent()) return id; |
| return idCache.getScsForCommit(repo, commit, branches); |
| } finally { |
| idLookupLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean isOpen() { |
| return open; |
| } |
| |
| @Override |
| public Set<EiffelEvent> take(int max) throws InterruptedException { |
| Set<EiffelEvent> toTake = Sets.newHashSet(); |
| final AtomicInteger count = this.count; |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| while (open && (count.get() == 0 || isTaken())) { |
| readyForTake.await(2, TimeUnit.SECONDS); |
| } |
| if (!open) { |
| return null; |
| } |
| synchronized (eventQueueLock) { |
| while (toTake.size() < max && !eventQueue.isEmpty()) { |
| toTake.add(eventQueue.poll()); |
| } |
| } |
| taken = toTake; |
| } finally { |
| takeLock.unlock(); |
| } |
| return toTake; |
| } |
| |
| @Override |
| public void ack(Set<EiffelEvent> events) { |
| validateAckAttempt(events); |
| final AtomicInteger count = this.count; |
| final ReentrantLock takeLock = this.takeLock; |
| final ReentrantLock idLookupLock = this.idLookupLock; |
| int previousCount = -1; |
| try { |
| takeLock.lockInterruptibly(); |
| idLookupLock.lockInterruptibly(); |
| try { |
| taken = null; |
| previousCount = count.get(); |
| for (EiffelEvent event : events) { |
| idCache.putId(event); |
| EventKey key = EventKey.fromEvent(event); |
| eventsInQueue.remove(key); |
| count.decrementAndGet(); |
| logger.atFine().log("Event %s have been ack:ed by publisher", key); |
| } |
| readyForTake.signal(); |
| } finally { |
| takeLock.unlock(); |
| idLookupLock.unlock(); |
| } |
| } catch (InterruptedException ie) { |
| logger.atWarning().log("Interupted while ack:ing events, attempting to clean up."); |
| for (EiffelEvent event : events) { |
| EventKey key = EventKey.fromEvent(event); |
| if (eventsInQueue.containsKey(key)) { |
| idCache.putId(event); |
| eventsInQueue.remove(key); |
| count.decrementAndGet(); |
| logger.atFine().log("Event %s have been ack:ed by publisher", key); |
| } |
| } |
| logger.atInfo().log("Cleanup complete."); |
| } |
| if (previousCount == MAX_SIZE) { |
| signalReadyForPut(); |
| } |
| } |
| |
| @Override |
| public void nak(Set<EiffelEvent> events) { |
| if (!open) { |
| return; |
| } |
| validateNakAttempt(events); |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| synchronized (eventQueueLock) { |
| for (EiffelEvent event : events) { |
| eventQueue.addFirst(event); |
| } |
| } |
| taken = null; |
| readyForTake.signal(); |
| } finally { |
| takeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void startPublishing() { |
| worker.start(this); |
| open = true; |
| } |
| |
| @Override |
| public void stopPublishing() { |
| open = false; |
| worker.stop(); |
| /* Wake up event-producing threads. */ |
| signalReadyForPut(); |
| } |
| |
| private boolean isTaken() { |
| return taken != null && !taken.isEmpty(); |
| } |
| |
| private void validateAckAttempt(Set<EiffelEvent> events) { |
| validateAckNakAttempt(events, "ack"); |
| } |
| |
| private void validateNakAttempt(Set<EiffelEvent> events) { |
| validateAckNakAttempt(events, "nak"); |
| } |
| |
| private void validateAckNakAttempt(Set<EiffelEvent> events, String action) { |
| if (!isTaken()) { |
| throw new IllegalArgumentException(String.format("Trying to %s without taken event", action)); |
| } |
| if (!events.equals(taken)) { |
| throw new IllegalArgumentException( |
| String.format("Trying to %s events other than those taken.", action)); |
| } |
| } |
| |
| private void signalReadyForPut() { |
| final ReentrantLock putLock = this.putLock; |
| putLock.lock(); |
| try { |
| notFull.signal(); |
| } finally { |
| putLock.unlock(); |
| } |
| } |
| |
| private void signalReadyForTake() { |
| final ReentrantLock takeLock = this.takeLock; |
| takeLock.lock(); |
| try { |
| if (!isTaken()) { |
| readyForTake.signal(); |
| } |
| } finally { |
| takeLock.unlock(); |
| } |
| } |
| } |