blob: e0e4d64d8a7a7afbd3d5340cfa21bc6c092c6ef2 [file] [log] [blame]
/*
* Copyright 2013 gitblit.com.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gitblit.fanout;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for Fanout service implementations.
*
* Subclass implementations can be used as a Sparkleshare PubSub notification
* server. This allows Sparkleshare to be used in conjunction with Gitblit
* behind a corporate firewall that restricts or prohibits client internet access
* to the default Sparkleshare PubSub server: notifications.sparkleshare.org
*
* @author James Moger
*
*/
public abstract class FanoutService implements Runnable {
// Class-wide logger.
private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
// Default port constant exposed to callers.
public final static int DEFAULT_PORT = 17000;
// Milliseconds the service thread sleeps in run() after a failed connect() before retrying.
protected final static int serviceTimeout = 5000;
// Host supplied at construction; when null the service thread name uses "all" instead.
protected final String host;
// Port supplied at construction.
protected final int port;
// Display name used in log messages, in the thread name, and by toString().
protected final String name;
// Thread executing run(); created by start() and cleared by stop().
private Thread serviceThread;
// Registered connections, keyed by connection id.
private final Map<String, FanoutServiceConnection> connections;
// Subscribed connections, keyed by channel name.
private final Map<String, Set<FanoutServiceConnection>> subscriptions;
// True while the main loop in run() should keep going; cleared by stop() or on a listen() IOException.
protected final AtomicBoolean isRunning;
// Setting: whether requests must be terminated with \n only.
private final AtomicBoolean strictRequestTermination;
// Setting: whether connections may announce on the ALL channel.
private final AtomicBoolean allowAllChannelAnnouncements;
// Setting: maximum concurrent connections; values <= 0 disable the check in addConnection().
private final AtomicInteger concurrentConnectionLimit;
// Timestamp captured when this object was constructed.
private final Date bootDate;
// Runtime counters reported through getStatistics() and zeroed by resetState().
private final AtomicLong rejectedConnectionCount;
private final AtomicInteger peakConnectionCount;
private final AtomicLong totalConnections;
private final AtomicLong totalAnnouncements;
private final AtomicLong totalMessages;
private final AtomicLong totalSubscribes;
private final AtomicLong totalUnsubscribes;
private final AtomicLong totalPings;
protected FanoutService(String host, int port, String name) {
this.host = host;
this.port = port;
this.name = name;
connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
isRunning = new AtomicBoolean(false);
strictRequestTermination = new AtomicBoolean(false);
allowAllChannelAnnouncements = new AtomicBoolean(false);
concurrentConnectionLimit = new AtomicInteger(0);
bootDate = new Date();
rejectedConnectionCount = new AtomicLong(0);
peakConnectionCount = new AtomicInteger(0);
totalConnections = new AtomicLong(0);
totalAnnouncements = new AtomicLong(0);
totalMessages = new AtomicLong(0);
totalSubscribes = new AtomicLong(0);
totalUnsubscribes = new AtomicLong(0);
totalPings = new AtomicLong(0);
}
/*
* Abstract methods
*
* These hooks are driven by run() and isReady(); the descriptions below
* are derived from how this base class calls them.
*/
/**
 * Consulted by isReady() once the service is running.
 *
 * @return true if the implementation considers itself connected
 */
protected abstract boolean isConnected();
/**
 * Called at the top of each main-loop iteration in run(). When it returns
 * false, run() sleeps for serviceTimeout milliseconds and tries again.
 *
 * @return true if listen() should be invoked
 */
protected abstract boolean connect();
/**
 * Called by run() after a successful connect().
 *
 * @throws IOException
 *             run() logs the error and stops the service
 */
protected abstract void listen() throws IOException;
/**
 * Called by run() both before the main loop starts and after it ends.
 */
protected abstract void disconnect();
/**
 * Reports whether strict request termination is currently enabled.
 *
 * @return true if requests must be terminated with \n
 */
public boolean isStrictRequestTermination() {
    final boolean strict = strictRequestTermination.get();
    return strict;
}
/**
 * Selects how fanout requests must be terminated. With strict termination
 * enabled a request must end with \n. With it disabled a request may end
 * with \n, \r, \r\n, or \n\r, which is convenient when debugging with a
 * telnet client.
 *
 * @param isStrictTermination
 *            true to require \n termination
 */
public void setStrictRequestTermination(boolean isStrictTermination) {
    final boolean strict = isStrictTermination;
    strictRequestTermination.set(strict);
}
/**
 * Reports the configured ceiling on concurrent fanout connections.
 *
 * @return the maximum allowable concurrent connection count
 */
public int getConcurrentConnectionLimit() {
    final int limit = concurrentConnectionLimit.get();
    return limit;
}
/**
 * Configures the ceiling on concurrent fanout connections.
 *
 * @param value
 *            the maximum allowable concurrent connection count; the limit
 *            is only enforced by addConnection when it is greater than 0
 */
public void setConcurrentConnectionLimit(int value) {
    final int limit = value;
    concurrentConnectionLimit.set(limit);
}
/**
 * Reports whether connections may announce on the ALL channel.
 *
 * @return true if connection-sourced announcements on the ALL channel are
 *         permitted
 */
public boolean allowAllChannelAnnouncements() {
    final boolean allowed = allowAllChannelAnnouncements.get();
    return allowed;
}
/**
 * Permits or prohibits connection-sourced announcements on the ALL channel.
 *
 * @param value
 *            true to permit connections to announce on the ALL channel
 */
public void setAllowAllChannelAnnouncements(boolean value) {
    final boolean allowed = value;
    allowAllChannelAnnouncements.set(allowed);
}
/**
 * Exposes the registry of current connections. The returned map is the
 * service's own map, not a copy.
 *
 * @return map of current connections keyed by their id
 */
public Map<String, FanoutServiceConnection> getCurrentConnections() {
    return this.connections;
}
/**
 * Exposes the registry of all subscriptions. The returned map is the
 * service's own map, not a copy.
 *
 * @return map of current subscriptions keyed by channel name
 */
public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
    return this.subscriptions;
}
/**
 * Looks up the connections subscribed to a single channel.
 *
 * @param channel
 *            the channel name to look up
 * @return set of subscribed connections for the specified channel, or null
 *         if the channel has no entry in the subscriptions map
 */
public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
    final Set<FanoutServiceConnection> subscribers = subscriptions.get(channel);
    return subscribers;
}
/**
 * Builds a snapshot of the current settings and runtime statistics of this
 * service.
 *
 * @return a newly created stats object
 */
public FanoutStats getStatistics() {
    FanoutStats stats = new FanoutStats();
    // settings
    stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
    stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
    stats.strictRequestTermination = isStrictRequestTermination();
    // runtime stats
    stats.bootDate = bootDate;
    stats.rejectedConnectionCount = rejectedConnectionCount.get();
    stats.peakConnectionCount = peakConnectionCount.get();
    stats.totalConnections = totalConnections.get();
    stats.totalAnnouncements = totalAnnouncements.get();
    stats.totalMessages = totalMessages.get();
    stats.totalSubscribes = totalSubscribes.get();
    stats.totalUnsubscribes = totalUnsubscribes.get();
    stats.totalPings = totalPings.get();
    stats.currentConnections = connections.size();
    stats.currentChannels = subscriptions.size();
    // A subscription is one (channel, connection) pair, so the current
    // count is the sum of the subscriber set sizes. Multiplying channels by
    // connections would only give an upper bound.
    int subscriptionCount = 0;
    for (Set<FanoutServiceConnection> subscribers : subscriptions.values()) {
        subscriptionCount += subscribers.size();
    }
    stats.currentSubscriptions = subscriptionCount;
    return stats;
}
/**
 * Reports whether the service is running and the implementation reports
 * itself connected. isConnected() is only consulted while the service is
 * running.
 *
 * @return true, if the service is ready
 */
public boolean isReady() {
    return isRunning.get() && isConnected();
}
/**
 * Start the Fanout service thread and immediately return.
 *
 * If the service is already running, or a previously started service
 * thread is still alive, a warning is logged and no new thread is created.
 */
public void start() {
    // isRunning is only set once run() begins executing, so the liveness of
    // the previous thread is checked as well. Otherwise two start() calls in
    // quick succession would launch two competing service threads.
    Thread existing = serviceThread;
    if (isRunning.get() || (existing != null && existing.isAlive())) {
        logger.warn(MessageFormat.format("{0} is already running", name));
        return;
    }
    serviceThread = new Thread(this);
    serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
    serviceThread.start();
}
/**
 * Start the Fanout service thread and wait until it is accepting
 * connections.
 *
 * The wait ends early, without the service being ready, if the calling
 * thread is interrupted (its interrupt flag is restored) or if the service
 * thread terminates before the service becomes ready.
 */
public void startSynchronously() {
    start();
    while (!isReady()) {
        Thread thread = serviceThread;
        if (thread != null && !thread.isAlive()) {
            // run() has exited, e.g. after a listen() failure, so the
            // service can never become ready; do not spin forever
            logger.warn(MessageFormat.format("{0} terminated before it became ready", name));
            return;
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // preserve the interrupt for the caller and stop waiting
            Thread.currentThread().interrupt();
            return;
        }
    }
}
/**
 * Stop the Fanout service. This method returns when the service thread has
 * terminated, or early if the calling thread is interrupted while waiting
 * (in which case the interrupt flag is restored).
 */
public void stop() {
    if (!isRunning.get()) {
        logger.warn(MessageFormat.format("{0} is not running", name));
        return;
    }
    logger.info(MessageFormat.format("stopping {0}...", name));
    // signals the main loop in run() to exit
    isRunning.set(false);
    try {
        if (serviceThread != null) {
            serviceThread.join();
            serviceThread = null;
        }
    } catch (InterruptedException e1) {
        logger.error(MessageFormat.format("interrupted while stopping {0}", name), e1);
        // restore the flag so callers can observe the interruption; the
        // service thread may still be shutting down, so do not report it
        // as stopped
        Thread.currentThread().interrupt();
        return;
    }
    logger.info(MessageFormat.format("stopped {0}", name));
}
/**
 * Service thread body. Starts from a clean slate, then loops while the
 * running flag is set: each pass attempts to connect and, on success,
 * listens. A failed connect is retried after a pause of serviceTimeout
 * milliseconds. An IOException from listening is logged and ends the loop.
 * State is torn down again on exit.
 */
@Override
public final void run() {
    // begin from a known state
    disconnect();
    resetState();
    isRunning.set(true);

    while (isRunning.get()) {
        if (!connect()) {
            // not connected; pause before the next attempt
            try {
                Thread.sleep(serviceTimeout);
            } catch (InterruptedException x) {
            }
            continue;
        }

        try {
            listen();
        } catch (IOException e) {
            logger.error(MessageFormat.format("error processing {0}", name), e);
            isRunning.set(false);
        }
    }

    // tear everything down
    disconnect();
    resetState();
}
/**
 * Discards all tracked connections and subscriptions and zeroes every
 * runtime counter. The subscriptions map is emptied completely, including
 * the ALL channel entry created by the constructor.
 */
protected void resetState() {
    // drop registries
    connections.clear();
    subscriptions.clear();

    // zero the statistics
    peakConnectionCount.set(0);
    final AtomicLong[] counters = {
        rejectedConnectionCount,
        totalConnections,
        totalAnnouncements,
        totalMessages,
        totalSubscribes,
        totalUnsubscribes,
        totalPings
    };
    for (AtomicLong counter : counters) {
        counter.set(0);
    }
}
/**
 * Applies the service's socket options to a client socket: keep-alive is
 * enabled and linger is enabled with a zero timeout.
 *
 * @param socket
 *            the client socket to configure
 * @throws SocketException
 *             if a socket option cannot be set
 */
protected void configureClientSocket(Socket socket) throws SocketException {
    final boolean lingerOn = true;
    final int lingerSeconds = 0; // immediately discard any remaining data
    socket.setKeepAlive(true);
    socket.setSoLinger(lingerOn, lingerSeconds);
}
/**
 * Add the connection to the connections map, unless the concurrent
 * connection limit has been reached.
 *
 * On rejection the rejected-connection counter is incremented and the
 * connection's busy() callback is invoked. On acceptance the total and
 * peak connection statistics are updated and the connection's connected()
 * callback is invoked.
 *
 * @param connection
 *            the connection to register
 * @return false if the connection was rejected due to too many concurrent
 *         connections
 */
protected boolean addConnection(FanoutServiceConnection connection) {
    int limit = getConcurrentConnectionLimit();
    // >= so that at most 'limit' connections are ever registered; the new
    // connection has not been added to the map yet at this point
    if (limit > 0 && connections.size() >= limit) {
        logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", limit));
        increment(rejectedConnectionCount);
        connection.busy();
        return false;
    }
    // add the connection to our map
    connections.put(connection.id, connection);
    increment(totalConnections);
    // track peak number of concurrent connections; retry the compare-and-set
    // so a concurrent update cannot overwrite a higher peak with a lower one
    int current = connections.size();
    int peak = peakConnectionCount.get();
    while (current > peak && !peakConnectionCount.compareAndSet(peak, current)) {
        peak = peakConnectionCount.get();
    }
    logger.info("fanout new connection " + connection.id);
    connection.connected();
    return true;
}
/**
 * Unregisters a connection and drops it from every channel it is
 * subscribed to. A channel left without subscribers is removed, except for
 * the ALL channel, which is always kept.
 *
 * @param connection
 *            the connection to remove
 */
protected void removeConnection(FanoutServiceConnection connection) {
    connections.remove(connection.id);

    for (Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> channels = subscriptions.entrySet().iterator(); channels.hasNext();) {
        final Map.Entry<String, Set<FanoutServiceConnection>> channelEntry = channels.next();
        final String channelName = channelEntry.getKey();
        final Set<FanoutServiceConnection> subscribers = channelEntry.getValue();

        subscribers.remove(connection);

        final boolean removable = !FanoutConstants.CH_ALL.equals(channelName);
        if (removable && subscribers.isEmpty()) {
            channels.remove();
            logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", channelName));
        }
    }

    logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
}
/**
 * Checks whether the connection's id is present in the connections map.
 *
 * @param connection
 *            the connection to look for
 * @return true if the service is monitoring the connection
 */
protected boolean hasConnection(FanoutServiceConnection connection) {
    final String id = connection.id;
    return connections.containsKey(id);
}
/**
 * Sends a reply to a single connection. Replies addressed to a non-empty
 * channel are counted in the total message statistic; replies with a null
 * or empty channel are not.
 *
 * @param connection
 *            the connection to reply to
 * @param channel
 *            the channel of the reply, may be null
 * @param message
 *            the reply content
 * @return the reply
 */
protected String reply(FanoutServiceConnection connection, String channel, String message) {
    final boolean hasChannel = channel != null && channel.length() > 0;
    if (hasChannel) {
        increment(totalMessages);
    }
    final String sent = connection.reply(channel, message);
    return sent;
}
/**
 * Sends a message on the ALL channel to every registered connection,
 * whether or not it has subscribed to anything, and counts it as one
 * announcement.
 *
 * @param message
 *            the message to send
 */
public void broadcastAll(String message) {
    final Collection<FanoutServiceConnection> everyone = connections.values();
    broadcast(everyone, FanoutConstants.CH_ALL, message);
    increment(totalAnnouncements);
}
/**
 * Service method to broadcast a message to connections subscribed to the
 * channel. The announcement is counted even when the channel currently has
 * no subscribers, in which case nothing is sent.
 *
 * @param channel
 *            the channel to broadcast on
 * @param message
 *            the message to send
 */
public void broadcast(String channel, String message) {
    // The subscriptions map rejects null keys and has no entry for a channel
    // without subscribers; guard both cases instead of failing with a
    // NullPointerException.
    Set<FanoutServiceConnection> subscribers = channel == null ? null : subscriptions.get(channel);
    if (subscribers != null) {
        // iterate over a snapshot so concurrent (un)subscribes do not
        // interfere with the broadcast
        List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscribers);
        broadcast(connections, channel, message);
    }
    increment(totalAnnouncements);
}
/**
 * Delivers a message on the given channel to each connection in the
 * supplied collection, one reply per connection.
 *
 * @param connections
 *            the recipients
 * @param channel
 *            the channel the message is sent on
 * @param message
 *            the message to send
 */
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
    final Iterator<FanoutServiceConnection> recipients = connections.iterator();
    while (recipients.hasNext()) {
        final FanoutServiceConnection recipient = recipients.next();
        reply(recipient, channel, message);
    }
}
/**
 * Parses a raw Fanout request line and dispatches it. The line is split on
 * single spaces into at most three parts: the action, an optional channel,
 * and an optional message (which keeps any further spaces). An unknown
 * action is logged together with a hex dump of the request.
 *
 * @param connection
 *            the connection that sent the request
 * @param req
 *            the raw request line
 * @return the reply to the request, may be null
 */
protected String processRequest(FanoutServiceConnection connection, String req) {
    logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));

    final String[] parts = req.split(" ", 3);
    final String action = parts[0];
    String channel = null;
    String message = null;
    if (parts.length >= 2) {
        channel = parts[1];
    }
    if (parts.length >= 3) {
        message = parts[2];
    }

    String response = null;
    try {
        response = processRequest(connection, action, channel, message);
    } catch (IllegalArgumentException e) {
        // invalid action
        logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
        logger.error(asHexArray(req));
    }
    return response;
}
/**
 * Process the Fanout request.
 *
 * Supported actions are ping, info, announce, subscribe and unsubscribe.
 * The announce, subscribe and unsubscribe actions require a channel; a
 * request without one is logged and ignored. Announcing on a channel with
 * no subscribers is counted but delivers nothing.
 *
 * @param connection
 *            the connection that sent the request
 * @param action
 *            the requested action
 * @param channel
 *            the channel the action applies to, may be null
 * @param message
 *            the announcement content, may be null
 * @return the reply to the request, may be null
 * @throws IllegalArgumentException
 *             if the action is not recognized
 */
protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
    if ("ping".equals(action)) {
        // ping
        increment(totalPings);
        return reply(connection, null, "" + System.currentTimeMillis());
    } else if ("info".equals(action)) {
        // info
        String info = getStatistics().info();
        return reply(connection, null, info);
    } else if ("announce".equals(action)) {
        // announcement
        if (channel == null) {
            // the subscriptions map rejects null keys, so a channel-less
            // request must not reach it
            logger.warn(MessageFormat.format("fanout connection {0} sent {1} request without a channel", connection.id, action));
        } else if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
            // prohibiting connection-sourced all announcements
            logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
        } else if ("debug".equals(channel)) {
            // prohibiting connection-sourced debug announcements
            logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
        } else {
            // acceptable announcement; the channel may have no subscribers
            Set<FanoutServiceConnection> subscribers = subscriptions.get(channel);
            if (subscribers != null) {
                List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscribers);
                connections.remove(connection); // remove announcer
                broadcast(connections, channel, message);
            }
            increment(totalAnnouncements);
        }
    } else if ("subscribe".equals(action)) {
        // subscribe
        if (channel == null) {
            logger.warn(MessageFormat.format("fanout connection {0} sent {1} request without a channel", connection.id, action));
        } else {
            Set<FanoutServiceConnection> subscribers = subscriptions.get(channel);
            if (subscribers == null) {
                logger.info(MessageFormat.format("fanout new channel {0}", channel));
                subscribers = new ConcurrentSkipListSet<FanoutServiceConnection>();
                subscriptions.put(channel, subscribers);
            }
            subscribers.add(connection);
            logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
            increment(totalSubscribes);
        }
    } else if ("unsubscribe".equals(action)) {
        // unsubscribe
        if (channel == null) {
            logger.warn(MessageFormat.format("fanout connection {0} sent {1} request without a channel", connection.id, action));
        } else {
            // look the set up once so a concurrent removal of the channel
            // cannot produce a null between lookups
            Set<FanoutServiceConnection> subscribers = subscriptions.get(channel);
            if (subscribers != null) {
                subscribers.remove(connection);
                if (subscribers.size() == 0) {
                    subscriptions.remove(channel);
                }
                increment(totalUnsubscribes);
            }
        }
    } else {
        // invalid action
        throw new IllegalArgumentException(action);
    }
    return null;
}
/**
 * Renders each character of the request as its hexadecimal code, separated
 * by single spaces and wrapped in square brackets, e.g. "[ 41 42 ]".
 *
 * @param req
 *            the request text to dump
 * @return the bracketed hex representation
 */
private String asHexArray(String req) {
    final StringBuilder hex = new StringBuilder();
    for (int i = 0; i < req.length(); i++) {
        hex.append(Integer.toHexString(req.charAt(i)));
        hex.append(' ');
    }
    final String body = hex.toString().trim();
    return "[ " + body + " ]";
}
/**
 * Adds one to the counter. If the increment wraps the value around to a
 * negative number, the counter is reset to zero.
 *
 * @param counter
 *            the counter to increment
 */
private void increment(AtomicLong counter) {
    final long updated = counter.incrementAndGet();
    if (updated >= 0) {
        return;
    }
    // overflowed past Long.MAX_VALUE; start over
    counter.set(0);
}
/**
 * @return the display name of this service
 */
@Override
public String toString() {
    return this.name;
}
}