Wait for confirms of published events and add them to cache
Solves: Jira GER-1561
Change-Id: Ie0a41d2db66bfd49f01e0721c1774759d0f7f8a6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
index 8db4ce8..96e4093 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
@@ -205,22 +205,36 @@
final ReentrantLock takeLock = this.takeLock;
final ReentrantLock idLookupLock = this.idLookupLock;
int previousCount = -1;
- takeLock.lock();
- idLookupLock.lock();
try {
- taken = null;
- previousCount = count.get();
- for (EiffelEvent event : events) {
- idCache.putId(event);
- EventKey key = EventKey.fromEvent(event);
- eventsInQueue.remove(key);
- count.decrementAndGet();
- logger.atFine().log("Event %s have been ack:ed by publisher", key);
+ takeLock.lockInterruptibly();
+ idLookupLock.lockInterruptibly();
+ try {
+ taken = null;
+ previousCount = count.get();
+ for (EiffelEvent event : events) {
+ idCache.putId(event);
+ EventKey key = EventKey.fromEvent(event);
+ eventsInQueue.remove(key);
+ count.decrementAndGet();
+ logger.atFine().log("Event %s have been ack:ed by publisher", key);
+ }
+ readyForTake.signal();
+ } finally {
+ takeLock.unlock();
+ idLookupLock.unlock();
}
- readyForTake.signal();
- } finally {
- takeLock.unlock();
- idLookupLock.unlock();
+ } catch (InterruptedException ie) {
+ logger.atWarning().log("Interupted while ack:ing events, attempting to clean up.");
+ for (EiffelEvent event : events) {
+ EventKey key = EventKey.fromEvent(event);
+ if (eventsInQueue.containsKey(key)) {
+ idCache.putId(event);
+ eventsInQueue.remove(key);
+ count.decrementAndGet();
+ logger.atFine().log("Event %s have been ack:ed by publisher", key);
+ }
+ }
+ logger.atInfo().log("Cleanup complete.");
}
if (previousCount == MAX_SIZE) {
signalReadyForPut();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
index b192bb3..34f4e5d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
@@ -41,10 +41,10 @@
try {
while (running) {
events = eventQueue.take(publisher.maxBatchSize());
- boolean unpublishedEvent = !events.isEmpty();
- while (running && unpublishedEvent) {
- unpublishedEvent = !publisher.publish(events);
- if (!unpublishedEvent) {
+ boolean unpublishedEvents = !events.isEmpty();
+ while (running && unpublishedEvents) {
+ unpublishedEvents = !publisher.publish(events);
+ if (!unpublishedEvents) {
eventQueue.ack(events);
events = null;
} else {
@@ -83,7 +83,11 @@
public void stop() {
logger.atInfo().log("Stopping publisher thread");
this.running = false;
- thread.interrupt();
+ try {
+ thread.join(5000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
index 564c91e..7d45b66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
@@ -26,6 +26,7 @@
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
@@ -63,9 +64,20 @@
if (config.confirmsEnabled()) {
try {
return channel.waitForConfirms(config.waitForConfirms());
- } catch (InterruptedException | TimeoutException e) {
- logger.atWarning().withCause(e).log("Failed to confirm delivery");
+ } catch (TimeoutException te) {
+ logger.atWarning().withCause(te).log("Failed to confirm delivery");
return false;
+ } catch (InterruptedException ie) {
+ logger.atWarning().log("Interupted while waiting for confirms, new attempt");
+ try {
+ channel.waitForConfirms(2000);
+ return true;
+ } catch (InterruptedException | TimeoutException e) {
+ logger.atSevere().withCause(e).log(
+ "Failed to confirm pushed events before cleanup: %s",
+ Arrays.toString(events.toArray()));
+ return false;
+ }
}
}
return true;