| /* | |
| * Copyright 2013 gitblit.com. | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| package com.gitblit.fanout; | |
| import java.io.BufferedInputStream; | |
| import java.io.IOException; | |
| import java.io.OutputStream; | |
| import java.net.InetSocketAddress; | |
| import java.net.ServerSocket; | |
| import java.net.Socket; | |
| import java.net.SocketException; | |
| import java.net.SocketTimeoutException; | |
| import java.text.MessageFormat; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| /** | |
| * A multi-threaded socket implementation of https://github.com/travisghansen/fanout | |
| * | |
| * This implementation creates a master acceptor thread which accepts incoming | |
| * fanout connections and then spawns a daemon thread for each accepted connection. | |
| * If there are 100 concurrent fanout connections, there are 101 threads. | |
| * | |
| * @author James Moger | |
| * | |
| */ | |
| public class FanoutSocketService extends FanoutService { | |
| private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class); | |
| private volatile ServerSocket serviceSocket; | |
| public static void main(String[] args) throws Exception { | |
| FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT); | |
| pubsub.setStrictRequestTermination(false); | |
| pubsub.setAllowAllChannelAnnouncements(false); | |
| pubsub.start(); | |
| } | |
| /** | |
| * Create a multi-threaded fanout service. | |
| * | |
| * @param port | |
| * the port for running the fanout PubSub service | |
| * @throws IOException | |
| */ | |
| public FanoutSocketService(int port) { | |
| this(null, port); | |
| } | |
| /** | |
| * Create a multi-threaded fanout service. | |
| * | |
| * @param bindInterface | |
| * the ip address to bind for the service, may be null | |
| * @param port | |
| * the port for running the fanout PubSub service | |
| * @throws IOException | |
| */ | |
| public FanoutSocketService(String bindInterface, int port) { | |
| super(bindInterface, port, "Fanout socket service"); | |
| } | |
| @Override | |
| protected boolean isConnected() { | |
| return serviceSocket != null; | |
| } | |
| @Override | |
| protected boolean connect() { | |
| if (serviceSocket == null) { | |
| try { | |
| serviceSocket = new ServerSocket(); | |
| serviceSocket.setReuseAddress(true); | |
| serviceSocket.setSoTimeout(serviceTimeout); | |
| serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); | |
| logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", | |
| name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort())); | |
| } catch (IOException e) { | |
| logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", | |
| name, host == null ? "0.0.0.0" : host, port), e); | |
| return false; | |
| } | |
| } | |
| return true; | |
| } | |
| @Override | |
| protected void disconnect() { | |
| try { | |
| if (serviceSocket != null) { | |
| logger.debug(MessageFormat.format("closing {0} server socket", name)); | |
| serviceSocket.close(); | |
| serviceSocket = null; | |
| } | |
| } catch (IOException e) { | |
| logger.error(MessageFormat.format("failed to disconnect {0}", name), e); | |
| } | |
| } | |
| /** | |
| * This accepts incoming fanout connections and spawns connection threads. | |
| */ | |
| @Override | |
| protected void listen() throws IOException { | |
| try { | |
| Socket socket; | |
| socket = serviceSocket.accept(); | |
| configureClientSocket(socket); | |
| FanoutSocketConnection connection = new FanoutSocketConnection(socket); | |
| if (addConnection(connection)) { | |
| // spawn connection daemon thread | |
| Thread connectionThread = new Thread(connection); | |
| connectionThread.setDaemon(true); | |
| connectionThread.setName("Fanout " + connection.id); | |
| connectionThread.start(); | |
| } else { | |
| // synchronously close the connection and remove it | |
| removeConnection(connection); | |
| connection.closeConnection(); | |
| connection = null; | |
| } | |
| } catch (SocketTimeoutException e) { | |
| // ignore accept timeout exceptions | |
| } | |
| } | |
| /** | |
| * FanoutSocketConnection handles reading/writing messages from a remote fanout | |
| * connection. | |
| * | |
| * @author James Moger | |
| * | |
| */ | |
| class FanoutSocketConnection extends FanoutServiceConnection implements Runnable { | |
| Socket socket; | |
| FanoutSocketConnection(Socket socket) { | |
| super(socket); | |
| this.socket = socket; | |
| } | |
| /** | |
| * Connection thread read/write method. | |
| */ | |
| @Override | |
| public void run() { | |
| try { | |
| StringBuilder sb = new StringBuilder(); | |
| BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); | |
| byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH]; | |
| int len = 0; | |
| while (true) { | |
| while (is.available() > 0) { | |
| len = is.read(buffer); | |
| for (int i = 0; i < len; i++) { | |
| byte b = buffer[i]; | |
| if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) { | |
| String req = sb.toString(); | |
| sb.setLength(0); | |
| if (req.length() > 0) { | |
| // ignore empty request strings | |
| processRequest(this, req); | |
| } | |
| } else { | |
| sb.append((char) b); | |
| } | |
| } | |
| } | |
| if (!isRunning.get()) { | |
| // service has stopped, terminate client connection | |
| break; | |
| } else { | |
| Thread.sleep(500); | |
| } | |
| } | |
| } catch (Throwable t) { | |
| if (t instanceof SocketException) { | |
| logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); | |
| } else if (t instanceof SocketTimeoutException) { | |
| logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); | |
| } else { | |
| logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t); | |
| } | |
| } finally { | |
| closeConnection(); | |
| } | |
| logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id)); | |
| } | |
| @Override | |
| protected void reply(String content) throws IOException { | |
| // synchronously send reply | |
| logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content)); | |
| OutputStream os = socket.getOutputStream(); | |
| byte [] bytes = content.getBytes(FanoutConstants.CHARSET); | |
| os.write(bytes); | |
| if (bytes[bytes.length - 1] != 0xa) { | |
| os.write(0xa); | |
| } | |
| os.flush(); | |
| } | |
| protected void closeConnection() { | |
| // close the connection socket | |
| try { | |
| socket.close(); | |
| } catch (IOException e) { | |
| } | |
| socket = null; | |
| // remove this connection from the service | |
| removeConnection(this); | |
| } | |
| } | |
| } |