Add extra logging related to publishing of events
Change-Id: Ic9b0c2a1abc3e6856c354e5f72493667f5ab0d66
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 5697431..6a4b3e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -88,7 +88,7 @@
sessionMon.wait(1000);
}
}
- if (!publishEvent(topicEvent) && !queue.offer(topicEvent)) {
+ if (!publishEvent(topicEvent) && !offerToQueue(topicEvent)) {
logger.atSevere().log("Event lost: %s", gson.toJson(topicEvent.event));
}
} catch (InterruptedException e) {
@@ -102,7 +102,7 @@
public void cancel() {
canceled = true;
if (queue.isEmpty()) {
- queue.offer(new TopicEvent(null, EOS, null));
+ offerToQueue(new TopicEvent(null, EOS, null));
}
}
@@ -151,7 +151,7 @@
logger.atFine().log(
"Adding event %s for topic %s to publisher queue", topicEvent.event, topicEvent.topic);
synchronized (lostEventCountLock) {
- if (queue.offer(topicEvent)) {
+ if (offerToQueue(topicEvent)) {
if (lostEventCount > 0) {
logger.atWarning().log(
"Event queue is no longer full, %d events were lost", lostEventCount);
@@ -165,6 +165,11 @@
}
}
+ private boolean offerToQueue(TopicEvent topicEvent) {
+ logger.atFine().log("Event %s is offered to queue", topicEvent.event);
+ return queue.offer(topicEvent);
+ }
+
private boolean isConnected() {
return session != null && session.isOpen();
}
@@ -176,7 +181,20 @@
return false;
}
eventsToBeAcked.put(seqNbr, topicEvent);
- return session.publish(gson.toJson(topicEvent.event), topicEvent.topic);
+ String jsonEvent = gson.toJson(topicEvent.event);
+ boolean published = session.publish(jsonEvent, topicEvent.topic);
+ if (published) {
+ if (logger.atFinest().isEnabled()) {
+ logger.atFinest().log(
+ "Full event publishing to topic %s with sequence number %d in json: %s",
+ topicEvent.topic, seqNbr, jsonEvent);
+ } else {
+ logger.atFine().log(
+ "Publishing event %s with sequence number %d to topic %s",
+ topicEvent.event, seqNbr, topicEvent.topic);
+ }
+ }
+ return published;
}
boolean published = session.publish(gson.toJson(topicEvent.event), topicEvent.topic);
topicEvent.published.set(published);