blob: 43a8ef54e59ee27e2ad648cba7d4cbde4f75d558 [file] [log] [blame]
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.hooks.rtc.filters;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gerrit.common.ChangeListener;
import com.google.gerrit.server.events.ChangeEvent;
public class ChangeListenerAsyncDecorator<T extends ChangeListener> implements
ChangeListener {
private static final int MAX_PENDING_EVENTS = 1024;
private static final int MAX_BATCH_SIZE = 64;
private static final Logger log = LoggerFactory
.getLogger(ChangeListenerAsyncDecorator.class);
private T innerListener;
private final LinkedBlockingQueue<ChangeEvent> queue =
new LinkedBlockingQueue<ChangeEvent>(MAX_PENDING_EVENTS);
private ExecutorService executor;
public class ChangeRunner implements Runnable {
@Override
public void run() {
ArrayList<ChangeEvent> failedEvents = new ArrayList<ChangeEvent>();
for (int i = 0; !queue.isEmpty() && i < MAX_BATCH_SIZE; i++) {
ChangeEvent event = queue.remove();
try {
innerListener.onChangeEvent(event);
} catch (Throwable e) {
log.error("Execution of event " + event.getClass().getName() + "/"
+ event.toString()
+ " FAILED\nEvent requeued for later execution", event);
failedEvents.add(event);
}
}
queue.addAll(failedEvents);
}
}
public ChangeListenerAsyncDecorator(T innerListener, ExecutorService executor) {
this.innerListener = innerListener;
this.executor = executor;
}
@Override
public void onChangeEvent(ChangeEvent event) {
queue.add(event);
executor.submit(new ChangeRunner());
}
public Queue<ChangeEvent> getQueue() {
return queue;
}
}