commit | 6a7af2d433f05414e14bb9736d66f186eb67921a | [log] [tgz] |
---|---|---|
author | Marcin Czech <maczech@gmail.com> | Fri Aug 20 14:03:49 2021 +0200 |
committer | Marcin Czech <maczech@gmail.com> | Fri Aug 27 18:43:41 2021 +0200 |
tree | a89476d17ad6b547029cdcbf8889575e7ea77c5a | |
parent | 6ad7192d1ee2ef17cf1d27370605d8a2f48e61d3 [diff] |
Make publishing stream events wait for the message to be published To allow message publishing in a ordered way wait for the message publishing to be ACK by the broker before publishing next one. This ensure producing messages in an ordered way even if the underlying broker implementation does not supports message delivery ordering. The messages are ordered up until the events-broker, but not necessarily in the broker implementation. Waiting for ACK may introduce latency, because of the broker availability. To avoid blocking the main thread(UI or replication) wait for the ACK in a separate thread. NOTE: This change does not guarantee message *delivery* ordering but message producing ordering. Message delivery ordering depends on the broker implementation and should be addressed in a separate change. Bug: Issue 14909 Change-Id: I99628f78fd9fe60bfadf3ab913d47bb9280727ae
API of a generic events broker for use with Gerrit Code Review.
Enables the de-coupling between Gerrit, plugins and the different implementations of a generic events broker.
It is a quite common use case for consumers of this library to listen for Gerrit events and to stream them on a specific topic.
Since the implementation of such logic is always the same, this library provides a generic stream events publisher which will perform the relevant operations.
In order to listen and stream gerrit events, consumers of this API need to provide a named annotation with the name of the stream events topic and java.util.concurrent.Executor
binding annotated with StreamEventPublisherExecutor
annotation. A default single threaded implementation (StreamEventPublisherExecutor
) is provided by the library. The last step is to explicitly bind the Stream Events Publisher, as such:
import com.gerritforge.gerrit.eventbroker.StreamEventPublisher; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.server.events.EventListener; import com.google.inject.AbstractModule; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; public class SomeModule extends AbstractModule { @Override protected void configure() { bind(new TypeLiteral<String>() { }) .annotatedWith(Names.named(StreamEventPublisher.STREAM_EVENTS_TOPIC)) .toInstance("name_of_the_stream_events_topic"); bind(Executor.class).annotatedWith(StreamEventPublisherExecutor.class).toProvider(StreamEventPublisherExecutorProvider.class); DynamicSet.bind(binder(), EventListener.class).to(StreamEventPublisher.class); } }
Note: To avoid message duplication Stream Events Publisher uses gerrit.instanceId and Event.instanceId to filter out forwarded events.