| /* | |
| * Copyright 2013 gitblit.com. | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| package com.gitblit.fanout; | |
| import java.io.IOException; | |
| import java.net.Socket; | |
| import java.net.SocketException; | |
| import java.text.MessageFormat; | |
| import java.util.ArrayList; | |
| import java.util.Collection; | |
| import java.util.Date; | |
| import java.util.Iterator; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Set; | |
| import java.util.concurrent.ConcurrentHashMap; | |
| import java.util.concurrent.ConcurrentSkipListSet; | |
| import java.util.concurrent.atomic.AtomicBoolean; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| /** | |
| * Base class for Fanout service implementations. | |
| * | |
| * Subclass implementations can be used as a Sparkleshare PubSub notification | |
| * server. This allows Sparkleshare to be used in conjunction with Gitblit | |
| * behind a corporate firewall that restricts or prohibits client internet access | |
| * to the default Sparkleshare PubSub server: notifications.sparkleshare.org | |
| * | |
| * @author James Moger | |
| * | |
| */ | |
| public abstract class FanoutService implements Runnable { | |
| private final static Logger logger = LoggerFactory.getLogger(FanoutService.class); | |
| public final static int DEFAULT_PORT = 17000; | |
| protected final static int serviceTimeout = 5000; | |
| protected final String host; | |
| protected final int port; | |
| protected final String name; | |
| private Thread serviceThread; | |
| private final Map<String, FanoutServiceConnection> connections; | |
| private final Map<String, Set<FanoutServiceConnection>> subscriptions; | |
| protected final AtomicBoolean isRunning; | |
| private final AtomicBoolean strictRequestTermination; | |
| private final AtomicBoolean allowAllChannelAnnouncements; | |
| private final AtomicInteger concurrentConnectionLimit; | |
| private final Date bootDate; | |
| private final AtomicLong rejectedConnectionCount; | |
| private final AtomicInteger peakConnectionCount; | |
| private final AtomicLong totalConnections; | |
| private final AtomicLong totalAnnouncements; | |
| private final AtomicLong totalMessages; | |
| private final AtomicLong totalSubscribes; | |
| private final AtomicLong totalUnsubscribes; | |
| private final AtomicLong totalPings; | |
| protected FanoutService(String host, int port, String name) { | |
| this.host = host; | |
| this.port = port; | |
| this.name = name; | |
| connections = new ConcurrentHashMap<String, FanoutServiceConnection>(); | |
| subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>(); | |
| subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>()); | |
| isRunning = new AtomicBoolean(false); | |
| strictRequestTermination = new AtomicBoolean(false); | |
| allowAllChannelAnnouncements = new AtomicBoolean(false); | |
| concurrentConnectionLimit = new AtomicInteger(0); | |
| bootDate = new Date(); | |
| rejectedConnectionCount = new AtomicLong(0); | |
| peakConnectionCount = new AtomicInteger(0); | |
| totalConnections = new AtomicLong(0); | |
| totalAnnouncements = new AtomicLong(0); | |
| totalMessages = new AtomicLong(0); | |
| totalSubscribes = new AtomicLong(0); | |
| totalUnsubscribes = new AtomicLong(0); | |
| totalPings = new AtomicLong(0); | |
| } | |
| /* | |
| * Abstract methods | |
| */ | |
| protected abstract boolean isConnected(); | |
| protected abstract boolean connect(); | |
| protected abstract void listen() throws IOException; | |
| protected abstract void disconnect(); | |
| /** | |
| * Returns true if the service requires \n request termination. | |
| * | |
| * @return true if request requires \n termination | |
| */ | |
| public boolean isStrictRequestTermination() { | |
| return strictRequestTermination.get(); | |
| } | |
| /** | |
| * Control the termination of fanout requests. If true, fanout requests must | |
| * be terminated with \n. If false, fanout requests may be terminated with | |
| * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client. | |
| * | |
| * @param isStrictTermination | |
| */ | |
| public void setStrictRequestTermination(boolean isStrictTermination) { | |
| strictRequestTermination.set(isStrictTermination); | |
| } | |
| /** | |
| * Returns the maximum allowable concurrent fanout connections. | |
| * | |
| * @return the maximum allowable concurrent connection count | |
| */ | |
| public int getConcurrentConnectionLimit() { | |
| return concurrentConnectionLimit.get(); | |
| } | |
| /** | |
| * Sets the maximum allowable concurrent fanout connection count. | |
| * | |
| * @param value | |
| */ | |
| public void setConcurrentConnectionLimit(int value) { | |
| concurrentConnectionLimit.set(value); | |
| } | |
| /** | |
| * Returns true if connections are allowed to announce on the all channel. | |
| * | |
| * @return true if connections are allowed to announce on the all channel | |
| */ | |
| public boolean allowAllChannelAnnouncements() { | |
| return allowAllChannelAnnouncements.get(); | |
| } | |
| /** | |
| * Allows/prohibits connections from announcing on the ALL channel. | |
| * | |
| * @param value | |
| */ | |
| public void setAllowAllChannelAnnouncements(boolean value) { | |
| allowAllChannelAnnouncements.set(value); | |
| } | |
| /** | |
| * Returns the current connections | |
| * | |
| * @param channel | |
| * @return map of current connections keyed by their id | |
| */ | |
| public Map<String, FanoutServiceConnection> getCurrentConnections() { | |
| return connections; | |
| } | |
| /** | |
| * Returns all subscriptions | |
| * | |
| * @return map of current subscriptions keyed by channel name | |
| */ | |
| public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() { | |
| return subscriptions; | |
| } | |
| /** | |
| * Returns the subscriptions for the specified channel | |
| * | |
| * @param channel | |
| * @return set of subscribed connections for the specified channel | |
| */ | |
| public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) { | |
| return subscriptions.get(channel); | |
| } | |
| /** | |
| * Returns the runtime statistics object for this service. | |
| * | |
| * @return stats | |
| */ | |
| public FanoutStats getStatistics() { | |
| FanoutStats stats = new FanoutStats(); | |
| // settings | |
| stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements(); | |
| stats.concurrentConnectionLimit = getConcurrentConnectionLimit(); | |
| stats.strictRequestTermination = isStrictRequestTermination(); | |
| // runtime stats | |
| stats.bootDate = bootDate; | |
| stats.rejectedConnectionCount = rejectedConnectionCount.get(); | |
| stats.peakConnectionCount = peakConnectionCount.get(); | |
| stats.totalConnections = totalConnections.get(); | |
| stats.totalAnnouncements = totalAnnouncements.get(); | |
| stats.totalMessages = totalMessages.get(); | |
| stats.totalSubscribes = totalSubscribes.get(); | |
| stats.totalUnsubscribes = totalUnsubscribes.get(); | |
| stats.totalPings = totalPings.get(); | |
| stats.currentConnections = connections.size(); | |
| stats.currentChannels = subscriptions.size(); | |
| stats.currentSubscriptions = subscriptions.size() * connections.size(); | |
| return stats; | |
| } | |
| /** | |
| * Returns true if the service is ready. | |
| * | |
| * @return true, if the service is ready | |
| */ | |
| public boolean isReady() { | |
| if (isRunning.get()) { | |
| return isConnected(); | |
| } | |
| return false; | |
| } | |
| /** | |
| * Start the Fanout service thread and immediatel return. | |
| * | |
| */ | |
| public void start() { | |
| if (isRunning.get()) { | |
| logger.warn(MessageFormat.format("{0} is already running", name)); | |
| return; | |
| } | |
| serviceThread = new Thread(this); | |
| serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port)); | |
| serviceThread.start(); | |
| } | |
| /** | |
| * Start the Fanout service thread and wait until it is accepting connections. | |
| * | |
| */ | |
| public void startSynchronously() { | |
| start(); | |
| while (!isReady()) { | |
| try { | |
| Thread.sleep(100); | |
| } catch (Exception e) { | |
| } | |
| } | |
| } | |
| /** | |
| * Stop the Fanout service. This method returns when the service has been | |
| * completely shutdown. | |
| */ | |
| public void stop() { | |
| if (!isRunning.get()) { | |
| logger.warn(MessageFormat.format("{0} is not running", name)); | |
| return; | |
| } | |
| logger.info(MessageFormat.format("stopping {0}...", name)); | |
| isRunning.set(false); | |
| try { | |
| if (serviceThread != null) { | |
| serviceThread.join(); | |
| serviceThread = null; | |
| } | |
| } catch (InterruptedException e1) { | |
| logger.error("", e1); | |
| } | |
| logger.info(MessageFormat.format("stopped {0}", name)); | |
| } | |
| /** | |
| * Main execution method of the service | |
| */ | |
| @Override | |
| public final void run() { | |
| disconnect(); | |
| resetState(); | |
| isRunning.set(true); | |
| while (isRunning.get()) { | |
| if (connect()) { | |
| try { | |
| listen(); | |
| } catch (IOException e) { | |
| logger.error(MessageFormat.format("error processing {0}", name), e); | |
| isRunning.set(false); | |
| } | |
| } else { | |
| try { | |
| Thread.sleep(serviceTimeout); | |
| } catch (InterruptedException x) { | |
| } | |
| } | |
| } | |
| disconnect(); | |
| resetState(); | |
| } | |
| protected void resetState() { | |
| // reset state data | |
| connections.clear(); | |
| subscriptions.clear(); | |
| rejectedConnectionCount.set(0); | |
| peakConnectionCount.set(0); | |
| totalConnections.set(0); | |
| totalAnnouncements.set(0); | |
| totalMessages.set(0); | |
| totalSubscribes.set(0); | |
| totalUnsubscribes.set(0); | |
| totalPings.set(0); | |
| } | |
| /** | |
| * Configure the client connection socket. | |
| * | |
| * @param socket | |
| * @throws SocketException | |
| */ | |
| protected void configureClientSocket(Socket socket) throws SocketException { | |
| socket.setKeepAlive(true); | |
| socket.setSoLinger(true, 0); // immediately discard any remaining data | |
| } | |
| /** | |
| * Add the connection to the connections map. | |
| * | |
| * @param connection | |
| * @return false if the connection was rejected due to too many concurrent | |
| * connections | |
| */ | |
| protected boolean addConnection(FanoutServiceConnection connection) { | |
| int limit = getConcurrentConnectionLimit(); | |
| if (limit > 0 && connections.size() > limit) { | |
| logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit)); | |
| increment(rejectedConnectionCount); | |
| connection.busy(); | |
| return false; | |
| } | |
| // add the connection to our map | |
| connections.put(connection.id, connection); | |
| // track peak number of concurrent connections | |
| if (connections.size() > peakConnectionCount.get()) { | |
| peakConnectionCount.set(connections.size()); | |
| } | |
| logger.info("fanout new connection " + connection.id); | |
| connection.connected(); | |
| return true; | |
| } | |
| /** | |
| * Remove the connection from the connections list and from subscriptions. | |
| * | |
| * @param connection | |
| */ | |
| protected void removeConnection(FanoutServiceConnection connection) { | |
| connections.remove(connection.id); | |
| Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator(); | |
| while (itr.hasNext()) { | |
| Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next(); | |
| Set<FanoutServiceConnection> subscriptions = entry.getValue(); | |
| subscriptions.remove(connection); | |
| if (!FanoutConstants.CH_ALL.equals(entry.getKey())) { | |
| if (subscriptions.size() == 0) { | |
| itr.remove(); | |
| logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey())); | |
| } | |
| } | |
| } | |
| logger.info(MessageFormat.format("fanout connection {0} removed", connection.id)); | |
| } | |
| /** | |
| * Tests to see if the connection is being monitored by the service. | |
| * | |
| * @param connection | |
| * @return true if the service is monitoring the connection | |
| */ | |
| protected boolean hasConnection(FanoutServiceConnection connection) { | |
| return connections.containsKey(connection.id); | |
| } | |
| /** | |
| * Reply to a connection on the specified channel. | |
| * | |
| * @param connection | |
| * @param channel | |
| * @param message | |
| * @return the reply | |
| */ | |
| protected String reply(FanoutServiceConnection connection, String channel, String message) { | |
| if (channel != null && channel.length() > 0) { | |
| increment(totalMessages); | |
| } | |
| return connection.reply(channel, message); | |
| } | |
| /** | |
| * Service method to broadcast a message to all connections. | |
| * | |
| * @param message | |
| */ | |
| public void broadcastAll(String message) { | |
| broadcast(connections.values(), FanoutConstants.CH_ALL, message); | |
| increment(totalAnnouncements); | |
| } | |
| /** | |
| * Service method to broadcast a message to connections subscribed to the | |
| * channel. | |
| * | |
| * @param message | |
| */ | |
| public void broadcast(String channel, String message) { | |
| List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel)); | |
| broadcast(connections, channel, message); | |
| increment(totalAnnouncements); | |
| } | |
| /** | |
| * Broadcast a message to connections subscribed to the specified channel. | |
| * | |
| * @param connections | |
| * @param channel | |
| * @param message | |
| */ | |
| protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) { | |
| for (FanoutServiceConnection connection : connections) { | |
| reply(connection, channel, message); | |
| } | |
| } | |
| /** | |
| * Process an incoming Fanout request. | |
| * | |
| * @param connection | |
| * @param req | |
| * @return the reply to the request, may be null | |
| */ | |
| protected String processRequest(FanoutServiceConnection connection, String req) { | |
| logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req)); | |
| String[] fields = req.split(" ", 3); | |
| String action = fields[0]; | |
| String channel = fields.length >= 2 ? fields[1] : null; | |
| String message = fields.length >= 3 ? fields[2] : null; | |
| try { | |
| return processRequest(connection, action, channel, message); | |
| } catch (IllegalArgumentException e) { | |
| // invalid action | |
| logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action)); | |
| logger.error(asHexArray(req)); | |
| } | |
| return null; | |
| } | |
| /** | |
| * Process the Fanout request. | |
| * | |
| * @param connection | |
| * @param action | |
| * @param channel | |
| * @param message | |
| * @return the reply to the request, may be null | |
| * @throws IllegalArgumentException | |
| */ | |
| protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException { | |
| if ("ping".equals(action)) { | |
| // ping | |
| increment(totalPings); | |
| return reply(connection, null, "" + System.currentTimeMillis()); | |
| } else if ("info".equals(action)) { | |
| // info | |
| String info = getStatistics().info(); | |
| return reply(connection, null, info); | |
| } else if ("announce".equals(action)) { | |
| // announcement | |
| if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) { | |
| // prohibiting connection-sourced all announcements | |
| logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message)); | |
| } else if ("debug".equals(channel)) { | |
| // prohibiting connection-sourced debug announcements | |
| logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message)); | |
| } else { | |
| // acceptable announcement | |
| List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel)); | |
| connections.remove(connection); // remove announcer | |
| broadcast(connections, channel, message); | |
| increment(totalAnnouncements); | |
| } | |
| } else if ("subscribe".equals(action)) { | |
| // subscribe | |
| if (!subscriptions.containsKey(channel)) { | |
| logger.info(MessageFormat.format("fanout new channel {0}", channel)); | |
| subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>()); | |
| } | |
| subscriptions.get(channel).add(connection); | |
| logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel)); | |
| increment(totalSubscribes); | |
| } else if ("unsubscribe".equals(action)) { | |
| // unsubscribe | |
| if (subscriptions.containsKey(channel)) { | |
| subscriptions.get(channel).remove(connection); | |
| if (subscriptions.get(channel).size() == 0) { | |
| subscriptions.remove(channel); | |
| } | |
| increment(totalUnsubscribes); | |
| } | |
| } else { | |
| // invalid action | |
| throw new IllegalArgumentException(action); | |
| } | |
| return null; | |
| } | |
| private String asHexArray(String req) { | |
| StringBuilder sb = new StringBuilder(); | |
| for (char c : req.toCharArray()) { | |
| sb.append(Integer.toHexString(c)).append(' '); | |
| } | |
| return "[ " + sb.toString().trim() + " ]"; | |
| } | |
| /** | |
| * Increment a long and prevent negative rollover. | |
| * | |
| * @param counter | |
| */ | |
| private void increment(AtomicLong counter) { | |
| long v = counter.incrementAndGet(); | |
| if (v < 0) { | |
| counter.set(0); | |
| } | |
| } | |
| @Override | |
| public String toString() { | |
| return name; | |
| } | |
| } |