blob: a676abcd33e6ba04cf3b37fce8612bd96ba3ee07 [file] [log] [blame]
/*
* Copyright 2013 gitblit.com.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gitblit.fanout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Fanout client class.
*
* @author James Moger
*
*/
public class FanoutClient implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
private final int clientTimeout = 500;
private final int reconnectTimeout = 2000;
private final String host;
private final int port;
private final List<FanoutListener> listeners;
private String id;
private volatile Selector selector;
private volatile SocketChannel socketCh;
private Thread clientThread;
private final AtomicBoolean isConnected;
private final AtomicBoolean isRunning;
private final AtomicBoolean isAutomaticReconnect;
private final ByteBuffer writeBuffer;
private final ByteBuffer readBuffer;
private final CharsetDecoder decoder;
private final Set<String> subscriptions;
private boolean resubscribe;
public interface FanoutListener {
public void pong(Date timestamp);
public void announcement(String channel, String message);
}
public static class FanoutAdapter implements FanoutListener {
@Override
public void pong(Date timestamp) { }
@Override
public void announcement(String channel, String message) { }
}
public static void main(String args[]) throws Exception {
FanoutClient client = new FanoutClient("localhost", 2000);
client.addListener(new FanoutAdapter() {
@Override
public void pong(Date timestamp) {
System.out.println("Pong. " + timestamp);
}
@Override
public void announcement(String channel, String message) {
System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
}
});
client.start();
Thread.sleep(5000);
client.ping();
client.subscribe("james");
client.announce("james", "12345");
client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
while (true) {
Thread.sleep(10000);
client.ping();
}
}
public FanoutClient(String host, int port) {
this.host = host;
this.port = port;
readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
subscriptions = new LinkedHashSet<String>();
isRunning = new AtomicBoolean(false);
isConnected = new AtomicBoolean(false);
isAutomaticReconnect = new AtomicBoolean(true);
}
public void addListener(FanoutListener listener) {
listeners.add(listener);
}
public void removeListener(FanoutListener listener) {
listeners.remove(listener);
}
public boolean isAutomaticReconnect() {
return isAutomaticReconnect.get();
}
public void setAutomaticReconnect(boolean value) {
isAutomaticReconnect.set(value);
}
public void ping() {
confirmConnection();
write("ping");
}
public void status() {
confirmConnection();
write("status");
}
public void subscribe(String channel) {
confirmConnection();
if (subscriptions.add(channel)) {
write("subscribe " + channel);
}
}
public void unsubscribe(String channel) {
confirmConnection();
if (subscriptions.remove(channel)) {
write("unsubscribe " + channel);
}
}
public void announce(String channel, String message) {
confirmConnection();
write("announce " + channel + " " + message);
}
private void confirmConnection() {
if (!isConnected()) {
throw new RuntimeException("Fanout client is disconnected!");
}
}
public boolean isConnected() {
return isRunning.get() && socketCh != null && isConnected.get();
}
/**
* Start client connection and return immediately.
*/
public void start() {
if (isRunning.get()) {
logger.warn("Fanout client is already running");
return;
}
clientThread = new Thread(this, "Fanout client");
clientThread.start();
}
/**
* Start client connection and wait until it has connected.
*/
public void startSynchronously() {
start();
while (!isConnected()) {
try {
Thread.sleep(100);
} catch (Exception e) {
}
}
}
/**
* Stops client connection. This method returns when the connection has
* been completely shutdown.
*/
public void stop() {
if (!isRunning.get()) {
logger.warn("Fanout client is not running");
return;
}
isRunning.set(false);
try {
if (clientThread != null) {
clientThread.join();
clientThread = null;
}
} catch (InterruptedException e1) {
}
}
@Override
public void run() {
resetState();
isRunning.set(true);
while (isRunning.get()) {
// (re)connect
if (socketCh == null) {
try {
InetAddress addr = InetAddress.getByName(host);
socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
socketCh.configureBlocking(false);
selector = Selector.open();
id = FanoutConstants.getLocalSocketId(socketCh.socket());
socketCh.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
try {
Thread.sleep(reconnectTimeout);
} catch (InterruptedException x) {
}
continue;
}
}
// read/write
try {
selector.select(clientTimeout);
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
while (i.hasNext()) {
SelectionKey key = i.next();
i.remove();
if (key.isReadable()) {
// read message
String content = read();
String[] lines = content.split("\n");
for (String reply : lines) {
logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
if (!processReply(reply)) {
logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
}
}
} else if (key.isWritable()) {
// resubscribe
if (resubscribe) {
resubscribe = false;
logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
for (String subscription : subscriptions) {
write("subscribe " + subscription);
}
}
socketCh.register(selector, SelectionKey.OP_READ);
}
}
} catch (IOException e) {
logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
closeChannel();
if (!isAutomaticReconnect.get()) {
isRunning.set(false);
continue;
}
}
}
closeChannel();
resetState();
}
protected void resetState() {
readBuffer.clear();
writeBuffer.clear();
isRunning.set(false);
isConnected.set(false);
}
private void closeChannel() {
try {
if (socketCh != null) {
socketCh.close();
socketCh = null;
selector.close();
selector = null;
isConnected.set(false);
}
} catch (IOException x) {
}
}
protected boolean processReply(String reply) {
String[] fields = reply.split("!", 2);
if (fields.length == 1) {
try {
long time = Long.parseLong(fields[0]);
Date date = new Date(time);
firePong(date);
} catch (Exception e) {
}
return true;
} else if (fields.length == 2) {
String channel = fields[0];
String message = fields[1];
if (FanoutConstants.CH_DEBUG.equals(channel)) {
// debug messages are for internal use
if (FanoutConstants.MSG_CONNECTED.equals(message)) {
isConnected.set(true);
resubscribe = subscriptions.size() > 0;
if (resubscribe) {
try {
// register for async resubscribe
socketCh.register(selector, SelectionKey.OP_WRITE);
} catch (Exception e) {
logger.error("an error occurred", e);
}
}
}
logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
} else {
fireAnnouncement(channel, message);
}
return true;
} else {
// unknown message
return false;
}
}
protected void firePong(Date timestamp) {
logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
for (FanoutListener listener : listeners) {
try {
listener.pong(timestamp);
} catch (Throwable t) {
logger.error("FanoutListener threw an exception!", t);
}
}
}
protected void fireAnnouncement(String channel, String message) {
logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
for (FanoutListener listener : listeners) {
try {
listener.announcement(channel, message);
} catch (Throwable t) {
logger.error("FanoutListener threw an exception!", t);
}
}
}
protected synchronized String read() throws IOException {
readBuffer.clear();
long len = socketCh.read(readBuffer);
if (len == -1) {
logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
socketCh.close();
return null;
} else {
readBuffer.flip();
String content = decoder.decode(readBuffer).toString();
readBuffer.clear();
return content;
}
}
protected synchronized boolean write(String message) {
try {
logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
writeBuffer.clear();
writeBuffer.put(bytes);
if (bytes[bytes.length - 1] != 0xa) {
writeBuffer.put((byte) 0xa);
}
writeBuffer.flip();
// loop until write buffer has been completely sent
long written = 0;
long toWrite = writeBuffer.remaining();
while (written != toWrite) {
written += socketCh.write(writeBuffer);
try {
Thread.sleep(10);
} catch (Exception x) {
}
}
return true;
} catch (IOException e) {
logger.error("fanout client {0} error: {1}", id, e.getMessage());
}
return false;
}
}