| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.eventseiffel; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNull; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.collect.Streams; |
| import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey; |
| import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import org.junit.Ignore; |
| |
| @Ignore |
| public class TestEventPublisher implements Runnable, EiffelEventHub.Consumer { |
| /* Milliseconds after which the publisher-thread is expected to have handled the event. */ |
| public static int MILLIS_TO_WAIT = 500; |
| |
| Map<EventKey, EiffelEvent> publishedEvents = Maps.newConcurrentMap(); |
| List<EventKey> actualOrder = Lists.newArrayList(); |
| boolean keepTrying = true; |
| boolean pause = false; |
| EiffelEventQueue queue; |
| Object takeTrigger = new Object(); |
| Thread publisherThread; |
| |
| @Override |
| public void run() { |
| while (keepTrying) { |
| try { |
| while (pause) { |
| synchronized (takeTrigger) { |
| takeTrigger.wait(); |
| } |
| } |
| EiffelEvent taken = queue.take(1).iterator().next(); |
| EventKey takenKey = EventKey.fromEvent(taken); |
| TestEventStorage.INSTANCE.addId(takenKey); |
| publishedEvents.put(takenKey, taken); |
| actualOrder.add(takenKey); |
| queue.ack(Sets.newHashSet(taken)); |
| } catch (InterruptedException e) { |
| // Do nothing. |
| } |
| } |
| } |
| |
| public void assertOrder(List<? extends EiffelEvent> events) { |
| List<EventKey> keys = events.stream().map(EventKey::fromEvent).collect(Collectors.toList()); |
| waitForIt(); |
| EventKey next = keys.remove(0); |
| for (EventKey key : ImmutableList.copyOf(actualOrder)) { |
| if (key.equals(next)) { |
| if (keys.isEmpty()) { |
| return; |
| } |
| next = keys.remove(0); |
| } else { |
| for (EventKey notNext : keys) { |
| assertNotEquals(String.format("Event %s came before %s", notNext, next), key, notNext); |
| } |
| } |
| } |
| assertEquals( |
| String.format( |
| "Unpublished events when verifying order: %s", |
| String.join( |
| ", ", |
| Streams.concat(Stream.of(next.toString()), keys.stream().map(ek -> ek.toString())) |
| .collect(Collectors.toList()))), |
| 0, |
| keys.size()); |
| } |
| |
| public void assertNotPublished(EventKey key, String message) { |
| waitForIt(); |
| EiffelEvent event = publishedEvents.getOrDefault(key, null); |
| assertNull(message, event); |
| } |
| |
| public EiffelEvent getPublished(EventKey key) { |
| int wait = 50; |
| for (int totalWait = 0; totalWait < MILLIS_TO_WAIT; totalWait += wait) { |
| if (publishedEvents.containsKey(key)) { |
| return publishedEvents.get(key); |
| } |
| waitFor(wait); |
| } |
| return publishedEvents.getOrDefault(key, null); |
| } |
| |
| public void startPublishing() { |
| pause = false; |
| synchronized (takeTrigger) { |
| takeTrigger.notifyAll(); |
| } |
| waitForIt(); |
| } |
| |
| public void stopPublishing() { |
| publisherThread.interrupt(); |
| pause = true; |
| } |
| |
| private void waitForIt() { |
| waitFor(MILLIS_TO_WAIT); |
| } |
| |
| private void waitFor(int millis) { |
| try { |
| Thread.sleep(millis); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Override |
| public void start(EiffelEventQueue queue) { |
| this.queue = queue; |
| publisherThread = new Thread(this); |
| publisherThread.start(); |
| } |
| |
| @Override |
| public void stop() { |
| keepTrying = false; |
| pause = false; |
| publisherThread.interrupt(); |
| } |
| |
| @Override |
| public boolean isRunning() { |
| return publisherThread != null && publisherThread.isAlive(); |
| } |
| } |