// blob: 9d13ba9a9ae2e2f0cc7775564981319bec5f9043 [file] [log] [blame]
/*
* Copyright 2013-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.facebook.buck.event;
import com.facebook.buck.timing.Clock;
import com.facebook.buck.timing.DefaultClock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.eventbus.AsyncEventBus;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Thin wrapper around Guava's {@link AsyncEventBus}.
 * <p>
 * Every event passed to {@link #post(BuckEvent)} is first configured with values read from the
 * {@link Clock} and the thread-id {@link Supplier} held by this bus, and is then handed to the
 * underlying {@link AsyncEventBus}. That bus is backed by a {@link ThreadPoolExecutor} with
 * exactly one worker thread and an unbounded {@link LinkedBlockingQueue}.
 */
public class BuckEventBus {

  /** How long {@link #flushForTesting()} sleeps between checks of the executor's state. */
  private static final long FLUSH_POLL_INTERVAL_MILLIS = 50L;

  /** Supplies the id of whichever thread calls {@link Supplier#get()}. */
  private static final Supplier<Long> DEFAULT_THREAD_ID_SUPPLIER = new Supplier<Long>() {
    @Override
    public Long get() {
      return Thread.currentThread().getId();
    }
  };

  /** This is the {@link Executor} that powers the {@link AsyncEventBus}. */
  private final ThreadPoolExecutor executor;

  private final AsyncEventBus eventBus;
  private final Clock clock;
  private final Supplier<Long> threadIdSupplier;

  /**
   * Creates an event bus that uses a {@link DefaultClock} and the supplier returned by
   * {@link #getDefaultThreadIdSupplier()}.
   */
  public BuckEventBus() {
    this(new DefaultClock(), getDefaultThreadIdSupplier());
  }

  /**
   * Creates an event bus that configures posted events using the given collaborators.
   *
   * @param clock source of the time values passed to each posted event; must not be null
   * @param threadIdSupplier source of the thread id passed to each posted event; must not be null
   * @throws NullPointerException if either argument is null
   */
  public BuckEventBus(Clock clock, Supplier<Long> threadIdSupplier) {
    // Validate arguments before constructing anything else.
    this.clock = Preconditions.checkNotNull(clock);
    this.threadIdSupplier = Preconditions.checkNotNull(threadIdSupplier);

    // We instantiate this.executor using the implementation of Executors.newSingleThreadExecutor().
    // The problem with newSingleThreadExecutor is that it returns an ExecutorService; however, we
    // need to use the getQueue() method that is available in ThreadPoolExecutor, but is not
    // exposed via the ExecutorService interface.
    this.executor = new ThreadPoolExecutor(
        /* corePoolSize */ 1,
        /* maximumPoolSize */ 1,
        /* keepAliveTime */ 0L,
        TimeUnit.MILLISECONDS,
        /* workQueue */ new LinkedBlockingQueue<Runnable>(),
        // Tasks the executor rejects (e.g. once it has been shut down) are dropped silently
        // rather than causing the poster to see an exception.
        new ThreadPoolExecutor.DiscardPolicy());
    this.eventBus = new AsyncEventBus("buck-build-events", executor);
  }

  /**
   * @return the shared supplier that reports the id of the thread invoking it.
   */
  public static Supplier<Long> getDefaultThreadIdSupplier() {
    return DEFAULT_THREAD_ID_SUPPLIER;
  }

  /**
   * Configures {@code event} with {@code clock.currentTimeMillis()}, {@code clock.nanoTime()} and
   * the value produced by the thread-id supplier, then posts it to the underlying
   * {@link AsyncEventBus}.
   *
   * @param event the event to configure and post
   */
  public void post(BuckEvent event) {
    event.configure(clock.currentTimeMillis(), clock.nanoTime(), threadIdSupplier.get());
    eventBus.post(event);
  }

  /**
   * Registers {@code object} with the underlying {@link AsyncEventBus}.
   *
   * @param object the object to register
   */
  public void register(Object object) {
    eventBus.register(object);
  }

  /**
   * Execute {@link Runnable}s in the underlying {@link ExecutorService} until there aren't any
   * left to execute. This differs from {@link ExecutorService#shutdown()} because this method
   * still allows new {@link Runnable}s to be submitted to the {@link ExecutorService} while this
   * method is running.
   * <p>
   * The method waits both for the work queue to be empty and for the worker to report no active
   * task, so that a task which has already been dequeued but is still running is not missed.
   * {@link ThreadPoolExecutor#getActiveCount()} is documented as approximate, so this is a
   * best-effort check rather than a strict barrier.
   * <p>
   * If the calling thread is interrupted while waiting, the wait continues until the executor is
   * idle and the interrupt status is restored before returning.
   * <p>
   * NOTE(review): because this waits for the worker to become idle, it must not be invoked from
   * code that is itself running on this bus's executor thread.
   */
  @VisibleForTesting
  public void flushForTesting() {
    boolean interrupted = false;
    try {
      while (!executor.getQueue().isEmpty() || executor.getActiveCount() > 0) {
        try {
          Thread.sleep(FLUSH_POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
          // Do not re-interrupt here: with the flag set, every subsequent sleep() would throw
          // immediately and this loop would spin. Remember it and restore it on the way out.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Posts the specified event directly to the underlying {@link AsyncEventBus}, without
   * configuring it first, and then waits via {@link #flushForTesting()}.
   * <p>
   * TODO(royw): Try to eliminate this method.
   *
   * @param event the event to post as-is
   */
  @VisibleForTesting
  public void postDirectlyToAsyncEventBusForTesting(BuckEvent event) {
    eventBus.post(event);
    flushForTesting();
  }

  /**
   * @return the underlying {@link ExecutorService} that is being used to process the events posted
   *     to this event bus.
   */
  public ExecutorService getExecutorService() {
    return executor;
  }

  /**
   * @return the {@link Clock} this bus reads when configuring posted events.
   */
  public Clock getClock() {
    return clock;
  }

  /**
   * @return the thread-id {@link Supplier} this bus reads when configuring posted events.
   */
  public Supplier<Long> getThreadIdSupplier() {
    return threadIdSupplier;
  }
}