diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index afa716f..bf0c438 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -30,7 +30,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +44,9 @@
   private final Properties properties;
   private final Gson gson;
   private final Timer monitorTimer = new Timer();
-  private EventListener eventListener;
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
+  private final Object sessionMon = new Object();
+  private EventListener eventListener;
   private CancelableRunnable publisher;
   private Thread publisherThread;
 
@@ -60,15 +60,22 @@
     this.gson = gson;
     this.eventListener =
         new EventListener() {
+          private int lostEventCount = 0;
+
           @Override
           public void onEvent(Event event) {
-            try {
-              if (!publisherThread.isAlive()) {
-                publisherThread.start();
+            if (!publisherThread.isAlive()) {
+              publisherThread.start();
+            }
+            if (queue.offer(event)) {
+              if (lostEventCount > 0) {
+                LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+                lostEventCount = 0;
               }
-              queue.put(event);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Failed to queue event", e);
+            } else {
+              if (lostEventCount++ % 10 == 0) {
+                LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+              }
             }
           }
         };
@@ -81,20 +88,17 @@
           public void run() {
             while (!canceled) {
               try {
-                if (isConnected()) {
-                  Event event = queue.poll(200, TimeUnit.MILLISECONDS);
-                  if (event != null) {
-                    if (isConnected()) {
-                      publishEvent(event);
-                    } else {
-                      queue.put(event);
-                    }
+                Event event = queue.take();
+                while (!isConnected() && !canceled) {
+                  synchronized (sessionMon) {
+                    sessionMon.wait(1000);
                   }
-                } else {
-                  Thread.sleep(1000);
+                }
+                if (!publishEvent(event) && !queue.offer(event)) {
+                  LOGGER.error("Event lost: {}", gson.toJson(event));
                 }
               } catch (InterruptedException e) {
-                LOGGER.warn("Interupted while taking event", e);
+                LOGGER.warn("Interupted while waiting for event or connection.", e);
               }
             }
           }
@@ -119,14 +123,14 @@
     publisherThread = new Thread(publisher);
     publisherThread.start();
     if (!isConnected()) {
-      session.connect();
+      connect();
       monitorTimer.schedule(
           new TimerTask() {
             @Override
             public void run() {
               if (!isConnected()) {
                 LOGGER.info("#start: try to reconnect");
-                session.connect();
+                connect();
               }
             }
           },
@@ -168,7 +172,15 @@
     return session != null && session.isOpen();
   }
 
-  private void publishEvent(Event event) {
-    session.publish(gson.toJson(event));
+  private boolean publishEvent(Event event) {
+    return session.publish(gson.toJson(event));
+  }
+
+  private void connect() {
+    if (!isConnected() && session.connect()) {
+      synchronized (sessionMon) {
+        sessionMon.notifyAll();
+      }
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 180aeda..7d0669f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -16,9 +16,9 @@
 public interface Session {
   boolean isOpen();
 
-  void connect();
+  boolean connect();
 
   void disconnect();
 
-  void publish(String message);
+  boolean publish(String message);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index d0a872b..25ec9f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -133,10 +133,10 @@
   }
 
   @Override
-  public void connect() {
+  public boolean connect() {
     if (connection != null && connection.isOpen()) {
       LOGGER.info(MSG("Already connected."));
-      return;
+      return true;
     }
     AMQP amqp = properties.getSection(AMQP.class);
     LOGGER.info(MSG("Connect to {}..."), amqp.uri);
@@ -157,14 +157,17 @@
         connection = factory.newConnection();
         connection.addShutdownListener(connectionListener);
         LOGGER.info(MSG("Connection established."));
+        return true;
       }
     } catch (URISyntaxException ex) {
       LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
+
     } catch (IOException ex) {
       LOGGER.error(MSG("Connection cannot be opened."), ex);
     } catch (KeyManagementException | NoSuchAlgorithmException ex) {
       LOGGER.error(MSG("Security error when opening connection."), ex);
     }
+    return false;
   }
 
   @Override
@@ -194,7 +197,7 @@
   }
 
   @Override
-  public void publish(String messageBody) {
+  public boolean publish(String messageBody) {
     if (channel == null || !channel.isOpen()) {
       channel = getChannel();
     }
@@ -208,9 +211,13 @@
             message.routingKey,
             properties.getAMQProperties().getBasicProperties(),
             messageBody.getBytes(CharEncoding.UTF_8));
+        return true;
       } catch (IOException ex) {
         LOGGER.error(MSG("Error when sending meessage."), ex);
+        return false;
       }
     }
+    LOGGER.error(MSG("Cannot open channel."));
+    return false;
   }
 }
