/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** | |
* Base class for Fanout service implementations. | |
* | |
* Subclass implementations can be used as a Sparkleshare PubSub notification | |
* server. This allows Sparkleshare to be used in conjunction with Gitblit | |
* behind a corporate firewall that restricts or prohibits client internet access | |
* to the default Sparkleshare PubSub server: notifications.sparkleshare.org | |
* | |
* @author James Moger | |
* | |
*/ | |
public abstract class FanoutService implements Runnable { | |
private final static Logger logger = LoggerFactory.getLogger(FanoutService.class); | |
public final static int DEFAULT_PORT = 17000; | |
protected final static int serviceTimeout = 5000; | |
protected final String host; | |
protected final int port; | |
protected final String name; | |
private Thread serviceThread; | |
private final Map<String, FanoutServiceConnection> connections; | |
private final Map<String, Set<FanoutServiceConnection>> subscriptions; | |
protected final AtomicBoolean isRunning; | |
private final AtomicBoolean strictRequestTermination; | |
private final AtomicBoolean allowAllChannelAnnouncements; | |
private final AtomicInteger concurrentConnectionLimit; | |
private final Date bootDate; | |
private final AtomicLong rejectedConnectionCount; | |
private final AtomicInteger peakConnectionCount; | |
private final AtomicLong totalConnections; | |
private final AtomicLong totalAnnouncements; | |
private final AtomicLong totalMessages; | |
private final AtomicLong totalSubscribes; | |
private final AtomicLong totalUnsubscribes; | |
private final AtomicLong totalPings; | |
protected FanoutService(String host, int port, String name) { | |
this.host = host; | |
this.port = port; | |
this.name = name; | |
connections = new ConcurrentHashMap<String, FanoutServiceConnection>(); | |
subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>(); | |
subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>()); | |
isRunning = new AtomicBoolean(false); | |
strictRequestTermination = new AtomicBoolean(false); | |
allowAllChannelAnnouncements = new AtomicBoolean(false); | |
concurrentConnectionLimit = new AtomicInteger(0); | |
bootDate = new Date(); | |
rejectedConnectionCount = new AtomicLong(0); | |
peakConnectionCount = new AtomicInteger(0); | |
totalConnections = new AtomicLong(0); | |
totalAnnouncements = new AtomicLong(0); | |
totalMessages = new AtomicLong(0); | |
totalSubscribes = new AtomicLong(0); | |
totalUnsubscribes = new AtomicLong(0); | |
totalPings = new AtomicLong(0); | |
} | |
/* | |
* Abstract methods | |
*/ | |
protected abstract boolean isConnected(); | |
protected abstract boolean connect(); | |
protected abstract void listen() throws IOException; | |
protected abstract void disconnect(); | |
/** | |
* Returns true if the service requires \n request termination. | |
* | |
* @return true if request requires \n termination | |
*/ | |
public boolean isStrictRequestTermination() { | |
return strictRequestTermination.get(); | |
} | |
/** | |
* Control the termination of fanout requests. If true, fanout requests must | |
* be terminated with \n. If false, fanout requests may be terminated with | |
* \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client. | |
* | |
* @param isStrictTermination | |
*/ | |
public void setStrictRequestTermination(boolean isStrictTermination) { | |
strictRequestTermination.set(isStrictTermination); | |
} | |
/** | |
* Returns the maximum allowable concurrent fanout connections. | |
* | |
* @return the maximum allowable concurrent connection count | |
*/ | |
public int getConcurrentConnectionLimit() { | |
return concurrentConnectionLimit.get(); | |
} | |
/** | |
* Sets the maximum allowable concurrent fanout connection count. | |
* | |
* @param value | |
*/ | |
public void setConcurrentConnectionLimit(int value) { | |
concurrentConnectionLimit.set(value); | |
} | |
/** | |
* Returns true if connections are allowed to announce on the all channel. | |
* | |
* @return true if connections are allowed to announce on the all channel | |
*/ | |
public boolean allowAllChannelAnnouncements() { | |
return allowAllChannelAnnouncements.get(); | |
} | |
/** | |
* Allows/prohibits connections from announcing on the ALL channel. | |
* | |
* @param value | |
*/ | |
public void setAllowAllChannelAnnouncements(boolean value) { | |
allowAllChannelAnnouncements.set(value); | |
} | |
/** | |
* Returns the current connections | |
* | |
* @param channel | |
* @return map of current connections keyed by their id | |
*/ | |
public Map<String, FanoutServiceConnection> getCurrentConnections() { | |
return connections; | |
} | |
/** | |
* Returns all subscriptions | |
* | |
* @return map of current subscriptions keyed by channel name | |
*/ | |
public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() { | |
return subscriptions; | |
} | |
/** | |
* Returns the subscriptions for the specified channel | |
* | |
* @param channel | |
* @return set of subscribed connections for the specified channel | |
*/ | |
public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) { | |
return subscriptions.get(channel); | |
} | |
/** | |
* Returns the runtime statistics object for this service. | |
* | |
* @return stats | |
*/ | |
public FanoutStats getStatistics() { | |
FanoutStats stats = new FanoutStats(); | |
// settings | |
stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements(); | |
stats.concurrentConnectionLimit = getConcurrentConnectionLimit(); | |
stats.strictRequestTermination = isStrictRequestTermination(); | |
// runtime stats | |
stats.bootDate = bootDate; | |
stats.rejectedConnectionCount = rejectedConnectionCount.get(); | |
stats.peakConnectionCount = peakConnectionCount.get(); | |
stats.totalConnections = totalConnections.get(); | |
stats.totalAnnouncements = totalAnnouncements.get(); | |
stats.totalMessages = totalMessages.get(); | |
stats.totalSubscribes = totalSubscribes.get(); | |
stats.totalUnsubscribes = totalUnsubscribes.get(); | |
stats.totalPings = totalPings.get(); | |
stats.currentConnections = connections.size(); | |
stats.currentChannels = subscriptions.size(); | |
stats.currentSubscriptions = subscriptions.size() * connections.size(); | |
return stats; | |
} | |
/** | |
* Returns true if the service is ready. | |
* | |
* @return true, if the service is ready | |
*/ | |
public boolean isReady() { | |
if (isRunning.get()) { | |
return isConnected(); | |
} | |
return false; | |
} | |
/** | |
* Start the Fanout service thread and immediatel return. | |
* | |
*/ | |
public void start() { | |
if (isRunning.get()) { | |
logger.warn(MessageFormat.format("{0} is already running", name)); | |
return; | |
} | |
serviceThread = new Thread(this); | |
serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port)); | |
serviceThread.start(); | |
} | |
/** | |
* Start the Fanout service thread and wait until it is accepting connections. | |
* | |
*/ | |
public void startSynchronously() { | |
start(); | |
while (!isReady()) { | |
try { | |
Thread.sleep(100); | |
} catch (Exception e) { | |
} | |
} | |
} | |
/** | |
* Stop the Fanout service. This method returns when the service has been | |
* completely shutdown. | |
*/ | |
public void stop() { | |
if (!isRunning.get()) { | |
logger.warn(MessageFormat.format("{0} is not running", name)); | |
return; | |
} | |
logger.info(MessageFormat.format("stopping {0}...", name)); | |
isRunning.set(false); | |
try { | |
if (serviceThread != null) { | |
serviceThread.join(); | |
serviceThread = null; | |
} | |
} catch (InterruptedException e1) { | |
logger.error("", e1); | |
} | |
logger.info(MessageFormat.format("stopped {0}", name)); | |
} | |
/** | |
* Main execution method of the service | |
*/ | |
@Override | |
public final void run() { | |
disconnect(); | |
resetState(); | |
isRunning.set(true); | |
while (isRunning.get()) { | |
if (connect()) { | |
try { | |
listen(); | |
} catch (IOException e) { | |
logger.error(MessageFormat.format("error processing {0}", name), e); | |
isRunning.set(false); | |
} | |
} else { | |
try { | |
Thread.sleep(serviceTimeout); | |
} catch (InterruptedException x) { | |
} | |
} | |
} | |
disconnect(); | |
resetState(); | |
} | |
protected void resetState() { | |
// reset state data | |
connections.clear(); | |
subscriptions.clear(); | |
rejectedConnectionCount.set(0); | |
peakConnectionCount.set(0); | |
totalConnections.set(0); | |
totalAnnouncements.set(0); | |
totalMessages.set(0); | |
totalSubscribes.set(0); | |
totalUnsubscribes.set(0); | |
totalPings.set(0); | |
} | |
/** | |
* Configure the client connection socket. | |
* | |
* @param socket | |
* @throws SocketException | |
*/ | |
protected void configureClientSocket(Socket socket) throws SocketException { | |
socket.setKeepAlive(true); | |
socket.setSoLinger(true, 0); // immediately discard any remaining data | |
} | |
/** | |
* Add the connection to the connections map. | |
* | |
* @param connection | |
* @return false if the connection was rejected due to too many concurrent | |
* connections | |
*/ | |
protected boolean addConnection(FanoutServiceConnection connection) { | |
int limit = getConcurrentConnectionLimit(); | |
if (limit > 0 && connections.size() > limit) { | |
logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit)); | |
increment(rejectedConnectionCount); | |
connection.busy(); | |
return false; | |
} | |
// add the connection to our map | |
connections.put(connection.id, connection); | |
// track peak number of concurrent connections | |
if (connections.size() > peakConnectionCount.get()) { | |
peakConnectionCount.set(connections.size()); | |
} | |
logger.info("fanout new connection " + connection.id); | |
connection.connected(); | |
return true; | |
} | |
/** | |
* Remove the connection from the connections list and from subscriptions. | |
* | |
* @param connection | |
*/ | |
protected void removeConnection(FanoutServiceConnection connection) { | |
connections.remove(connection.id); | |
Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator(); | |
while (itr.hasNext()) { | |
Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next(); | |
Set<FanoutServiceConnection> subscriptions = entry.getValue(); | |
subscriptions.remove(connection); | |
if (!FanoutConstants.CH_ALL.equals(entry.getKey())) { | |
if (subscriptions.size() == 0) { | |
itr.remove(); | |
logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey())); | |
} | |
} | |
} | |
logger.info(MessageFormat.format("fanout connection {0} removed", connection.id)); | |
} | |
/** | |
* Tests to see if the connection is being monitored by the service. | |
* | |
* @param connection | |
* @return true if the service is monitoring the connection | |
*/ | |
protected boolean hasConnection(FanoutServiceConnection connection) { | |
return connections.containsKey(connection.id); | |
} | |
/** | |
* Reply to a connection on the specified channel. | |
* | |
* @param connection | |
* @param channel | |
* @param message | |
* @return the reply | |
*/ | |
protected String reply(FanoutServiceConnection connection, String channel, String message) { | |
if (channel != null && channel.length() > 0) { | |
increment(totalMessages); | |
} | |
return connection.reply(channel, message); | |
} | |
/** | |
* Service method to broadcast a message to all connections. | |
* | |
* @param message | |
*/ | |
public void broadcastAll(String message) { | |
broadcast(connections.values(), FanoutConstants.CH_ALL, message); | |
increment(totalAnnouncements); | |
} | |
/** | |
* Service method to broadcast a message to connections subscribed to the | |
* channel. | |
* | |
* @param message | |
*/ | |
public void broadcast(String channel, String message) { | |
List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel)); | |
broadcast(connections, channel, message); | |
increment(totalAnnouncements); | |
} | |
/** | |
* Broadcast a message to connections subscribed to the specified channel. | |
* | |
* @param connections | |
* @param channel | |
* @param message | |
*/ | |
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) { | |
for (FanoutServiceConnection connection : connections) { | |
reply(connection, channel, message); | |
} | |
} | |
/** | |
* Process an incoming Fanout request. | |
* | |
* @param connection | |
* @param req | |
* @return the reply to the request, may be null | |
*/ | |
protected String processRequest(FanoutServiceConnection connection, String req) { | |
logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req)); | |
String[] fields = req.split(" ", 3); | |
String action = fields[0]; | |
String channel = fields.length >= 2 ? fields[1] : null; | |
String message = fields.length >= 3 ? fields[2] : null; | |
try { | |
return processRequest(connection, action, channel, message); | |
} catch (IllegalArgumentException e) { | |
// invalid action | |
logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action)); | |
logger.error(asHexArray(req)); | |
} | |
return null; | |
} | |
/** | |
* Process the Fanout request. | |
* | |
* @param connection | |
* @param action | |
* @param channel | |
* @param message | |
* @return the reply to the request, may be null | |
* @throws IllegalArgumentException | |
*/ | |
protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException { | |
if ("ping".equals(action)) { | |
// ping | |
increment(totalPings); | |
return reply(connection, null, "" + System.currentTimeMillis()); | |
} else if ("info".equals(action)) { | |
// info | |
String info = getStatistics().info(); | |
return reply(connection, null, info); | |
} else if ("announce".equals(action)) { | |
// announcement | |
if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) { | |
// prohibiting connection-sourced all announcements | |
logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message)); | |
} else if ("debug".equals(channel)) { | |
// prohibiting connection-sourced debug announcements | |
logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message)); | |
} else { | |
// acceptable announcement | |
List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel)); | |
connections.remove(connection); // remove announcer | |
broadcast(connections, channel, message); | |
increment(totalAnnouncements); | |
} | |
} else if ("subscribe".equals(action)) { | |
// subscribe | |
if (!subscriptions.containsKey(channel)) { | |
logger.info(MessageFormat.format("fanout new channel {0}", channel)); | |
subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>()); | |
} | |
subscriptions.get(channel).add(connection); | |
logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel)); | |
increment(totalSubscribes); | |
} else if ("unsubscribe".equals(action)) { | |
// unsubscribe | |
if (subscriptions.containsKey(channel)) { | |
subscriptions.get(channel).remove(connection); | |
if (subscriptions.get(channel).size() == 0) { | |
subscriptions.remove(channel); | |
} | |
increment(totalUnsubscribes); | |
} | |
} else { | |
// invalid action | |
throw new IllegalArgumentException(action); | |
} | |
return null; | |
} | |
private String asHexArray(String req) { | |
StringBuilder sb = new StringBuilder(); | |
for (char c : req.toCharArray()) { | |
sb.append(Integer.toHexString(c)).append(' '); | |
} | |
return "[ " + sb.toString().trim() + " ]"; | |
} | |
/** | |
* Increment a long and prevent negative rollover. | |
* | |
* @param counter | |
*/ | |
private void increment(AtomicLong counter) { | |
long v = counter.incrementAndGet(); | |
if (v < 0) { | |
counter.set(0); | |
} | |
} | |
@Override | |
public String toString() { | |
return name; | |
} | |
} |