Update NailGun.
Summary:
Update the version of NailGun that buckd uses to
https://github.com/martylamb/nailgun/commits/fcc24ecd63ead63f160e96f6a0640f6b4280f72d,
which uses a FixedThreadPool instead of CachedThreadPool to reduce the number of threads
that Buck uses and notifies blocked threads on client disconnection so that threads aren't leaked.
Test Plan:
0) buck test --all
1) buckd
2) buck build buck
3) buck build buck
4) check that buckd is using <70 threads
diff --git a/lib/README.txt b/lib/README.txt
index 53963da..38f1962 100644
--- a/lib/README.txt
+++ b/lib/README.txt
@@ -9,15 +9,14 @@
nailgun-server-0.9.2-SNAPSHOT.jar and nailgun-server-0.9.2-SNAPSHOT-sources.jar were
-built from a fork of Nailgun which adds support for detecting client disconnection at
-https://github.com/jimpurbrick/nailgun.git
+built from https://github.com/martylamb/nailgun.git
To regenerate these jars:
0) install maven (brew install maven)
- 1) git clone https://github.com/jimpurbrick/nailgun.git
+ 1) git clone https://github.com/martylamb/nailgun
2) cd nailgun
- 3) git checkout d005c16f13d42489ac1ab428b15c3d9cdb1ada31
+ 3) git checkout fcc24ecd63ead63f160e96f6a0640f6b4280f72d
4) mvn clean install
5) copy nailgun-server/target/nailgun-server-0.9.2-SNAPSHOT.jar and
nailgun-server/target/nailgun-server-0.9.2-SNAPSHOT-sources.jar to your buck/lib directory
diff --git a/lib/nailgun-server-0.9.2-SNAPSHOT-sources.jar b/lib/nailgun-server-0.9.2-SNAPSHOT-sources.jar
index a595667..4513f48 100644
--- a/lib/nailgun-server-0.9.2-SNAPSHOT-sources.jar
+++ b/lib/nailgun-server-0.9.2-SNAPSHOT-sources.jar
Binary files differ
diff --git a/lib/nailgun-server-0.9.2-SNAPSHOT.jar b/lib/nailgun-server-0.9.2-SNAPSHOT.jar
index 8d45dda..308dd9f 100644
--- a/lib/nailgun-server-0.9.2-SNAPSHOT.jar
+++ b/lib/nailgun-server-0.9.2-SNAPSHOT.jar
Binary files differ
diff --git a/third-party/nailgun/README.md b/third-party/nailgun/README.md
index 213d4f4..df471ba 100644
--- a/third-party/nailgun/README.md
+++ b/third-party/nailgun/README.md
@@ -16,6 +16,5 @@
For more information, see [the nailgun website](http://martiansoftware.com/nailgun/).
-Buck currently uses a fork of nailgun hosted at https://github.com/jimpurbrick/nailgun.
-This fork adds support for interrupting server processing when client disconnection is
-detected, changes which are currently being accepted in to the main nailgun repository.
\ No newline at end of file
+Buck currently uses https://github.com/martylamb/nailgun at
+fcc24ecd63ead63f160e96f6a0640f6b4280f72d
diff --git a/third-party/nailgun/nailgun-client/ng.c b/third-party/nailgun/nailgun-client/ng.c
index bd7a390..059b05f 100644
--- a/third-party/nailgun/nailgun-client/ng.c
+++ b/third-party/nailgun/nailgun-client/ng.c
@@ -27,7 +27,7 @@
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
- #include <sys/time.h>
+ #include <sys/time.h>
#include <sys/types.h>
#endif
@@ -96,6 +96,8 @@
#define CHUNKTYPE_SENDINPUT 'S'
#define CHUNKTYPE_HEARTBEAT 'H'
+#define HEARTBEAT_TIMEOUT_MILLIS 500
+
/*
the following is required to compile for hp-ux
originally posted at http://jira.codehaus.org/browse/JRUBY-2346
@@ -111,7 +113,12 @@
char buf[BUFSIZE];
/* track whether server is ready to receive */
-int readyToSend = 0;
+#ifdef WIN32
+ HANDLE readyToSend = 0;
+ HANDLE sending = 0;
+#else
+ int readyToSend = 0;
+#endif
/**
* Clean up the application.
@@ -192,12 +199,13 @@
}
/**
- * Sends a chunk header noting the specified payload size and chunk type.
+ * Sends a chunk noting the specified payload size and chunk type.
+ * Waits for sending mutex on Win32.
*
* @param size the payload size
* @param chunkType the chunk type identifier
*/
-void sendHeader(unsigned int size, char chunkType) {
+void sendChunk(unsigned int size, char chunkType, char* buf) {
/* buffer used for reading and writing chunk headers */
char header[CHUNK_HEADER_LEN];
@@ -207,7 +215,20 @@
header[3] = size & 0xff;
header[4] = chunkType;
+#ifdef WIN32
+ if (WaitForSingleObject(sending, INFINITE) != WAIT_OBJECT_0) {
+ handleError();
+ }
+#endif
+
sendAll(nailgunsocket, header, CHUNK_HEADER_LEN);
+ if (size > 0) {
+ sendAll(nailgunsocket, buf, size);
+ }
+
+#ifdef WIN32
+ ReleaseMutex(sending);
+#endif
}
/**
@@ -228,15 +249,14 @@
i = read(f, buf, BUFSIZE);
while (i > 0) {
- sendHeader(i, CHUNKTYPE_LONGARG);
- sendAll(nailgunsocket, buf, i);
+ sendChunk(i, CHUNKTYPE_LONGARG, buf);
i = read(f, buf, BUFSIZE);
}
if (i < 0) {
perror("--nailgun-filearg");
return 1;
}
- sendHeader(0, CHUNKTYPE_LONGARG);
+ sendChunk(0, CHUNKTYPE_LONGARG, buf);
close(f);
return 0;
@@ -250,8 +270,7 @@
*/
void sendText(char chunkType, char *text) {
int len = text ? strlen(text) : 0;
- sendHeader(len, chunkType);
- sendAll(nailgunsocket, text, len);
+ sendChunk(len, chunkType, text);
}
/**
@@ -278,9 +297,9 @@
int thisPass = 0;
thisPass = recv(nailgunsocket, buf, bytesToRead, MSG_WAITALL);
- if (thisPass < bytesToRead) handleSocketClose();
-
-
+ if (thisPass == 0) {
+ handleSocketClose();
+ }
bytesRead += thisPass;
bytesCopied = 0;
@@ -304,6 +323,17 @@
}
}
+unsigned long recvToBuffer(unsigned long len) {
+ unsigned long bytesRead = 0;
+ while(bytesRead < len) {
+ int thisPass = recv(nailgunsocket, buf + bytesRead, len - bytesRead, MSG_WAITALL);
+ if (thisPass == 0) {
+ handleSocketClose();
+ }
+ bytesRead += thisPass;
+ }
+ return bytesRead;
+}
/**
* Processes an exit chunk from the server. This is just a string
@@ -315,7 +345,7 @@
void processExit(char *buf, unsigned long len) {
int exitcode;
int bytesToRead = (BUFSIZE - 1 < len) ? BUFSIZE - 1 : len;
- int bytesRead = recv(nailgunsocket, buf, bytesToRead, MSG_WAITALL);
+ int bytesRead = recvToBuffer(bytesToRead);
if (bytesRead < 0) {
handleSocketClose();
@@ -336,26 +366,44 @@
* @param len the number of bytes to send
*/
void sendStdin(char *buf, unsigned int len) {
+#ifndef WIN32
readyToSend = 0;
- sendHeader(len, CHUNKTYPE_STDIN);
- sendAll(nailgunsocket, buf, len);
+#endif
+ sendChunk(len, CHUNKTYPE_STDIN, buf);
}
/**
* Sends a stdin-eof chunk to the nailgun server
*/
void processEof() {
- sendHeader(0, CHUNKTYPE_STDIN_EOF);
+ sendChunk(0, CHUNKTYPE_STDIN_EOF, buf);
}
/**
* Sends a heartbeat chunk to let the server know the client is still alive.
*/
void sendHeartbeat() {
- sendHeader(0, CHUNKTYPE_HEARTBEAT);
+ sendChunk(0, CHUNKTYPE_HEARTBEAT, buf);
}
#ifdef WIN32
+
+HANDLE createEvent(BOOL manualReset) {
+ return CreateEvent(NULL, /* default security */
+ manualReset,
+ FALSE, /* initial state unsignalled */
+ NULL /* unnamed event */);
+}
+
+DWORD WINAPI sendHeartbeats(LPVOID lpParameter) {
+
+ /* this could be made more efficient by only sending heartbeats when stdin chunks aren't being sent */
+ for (;;) {
+ Sleep(HEARTBEAT_TIMEOUT_MILLIS);
+ sendHeartbeat();
+ }
+}
+
/**
* Thread main for reading from stdin and sending
*/
@@ -363,23 +411,32 @@
/* buffer used for reading and sending stdin chunks */
char wbuf[BUFSIZE];
- for (;;) {
- DWORD numberOfBytes = 0;
+ /* number of bytes read */
+ DWORD numberOfBytes;
- if (readyToSend && !ReadFile(NG_STDIN_FILENO, wbuf, BUFSIZE, &numberOfBytes, NULL)) {
- if (numberOfBytes != 0) {
- handleError();
- }
+ for (;;) {
+
+ /* wait for ready to send */
+ if(WaitForSingleObject(readyToSend, INFINITE) != WAIT_OBJECT_0) {
+ handleError();
}
+ /* read data from stdin */
+ if (! ReadFile(NG_STDIN_FILENO, wbuf, BUFSIZE, &numberOfBytes, NULL)) {
+ if (numberOfBytes != 0) {
+ handleError();
+ }
+ }
+
+ /* send data to server */
if (numberOfBytes > 0) {
sendStdin(wbuf, numberOfBytes);
} else {
processEof();
break;
}
- }
+ }
return 0;
}
#else
@@ -408,6 +465,10 @@
WSADATA win_socket_data; /* required to initialise winsock */
WSAStartup(2, &win_socket_data);
+
+ /* create flow control event and mutex */
+ readyToSend = createEvent(FALSE);
+ sending = CreateMutex(NULL, FALSE, NULL);
}
#endif
@@ -440,6 +501,10 @@
if (!CreateThread(&securityAttributes, 0, &processStdin, NULL, 0, &threadId)) {
handleError();
}
+
+ if (!CreateThread(&securityAttributes, 0, &sendHeartbeats, NULL, 0, &threadId)) {
+ handleError();
+ }
}
#endif
@@ -449,16 +514,11 @@
void processnailgunstream() {
/*for (;;) {*/
- int bytesRead = 0;
unsigned long len;
char chunkType;
- bytesRead = recv(nailgunsocket, buf, CHUNK_HEADER_LEN, MSG_WAITALL);
+ recvToBuffer(CHUNK_HEADER_LEN);
- if (bytesRead < CHUNK_HEADER_LEN) {
- handleSocketClose();
- }
-
len = ((buf[0] << 24) & 0xff000000)
| ((buf[1] << 16) & 0x00ff0000)
| ((buf[2] << 8) & 0x0000ff00)
@@ -474,7 +534,11 @@
case CHUNKTYPE_EXIT: processExit(buf, len);
break;
case CHUNKTYPE_SENDINPUT:
- readyToSend = 1;
+#ifdef WIN32
+ SetEvent(readyToSend);
+#else
+ readyToSend = 1;
+#endif
break;
default: fprintf(stderr, "Unexpected chunk type %d ('%c')\n", chunkType, chunkType);
cleanUpAndExit(NAILGUN_UNEXPECTED_CHUNKTYPE);
@@ -556,7 +620,6 @@
fd_set readfds;
int eof = 0;
struct timeval readtimeout;
-
#endif
#ifdef WIN32
@@ -694,6 +757,7 @@
/* initialise the std-* handles and the thread to send stdin to the server */
#ifdef WIN32
initIo();
+ winStartInput();
#endif
/* stream forwarding loop */
@@ -709,22 +773,22 @@
FD_SET(nailgunsocket, &readfds);
memset(&readtimeout, '\0', sizeof(readtimeout));
- readtimeout.tv_usec = 100000;
+ readtimeout.tv_usec = HEARTBEAT_TIMEOUT_MILLIS * 1000;
if(select (nailgunsocket + 1, &readfds, NULL, NULL, &readtimeout) == -1) {
perror("select");
}
if (FD_ISSET(nailgunsocket, &readfds)) {
#endif
- processnailgunstream();
+ processnailgunstream();
#ifndef WIN32
} else if (FD_ISSET(NG_STDIN_FILENO, &readfds)) {
- if (!processStdin()) {
- FD_CLR(NG_STDIN_FILENO, &readfds);
- eof = 1;
- }
+ if (!processStdin()) {
+ FD_CLR(NG_STDIN_FILENO, &readfds);
+ eof = 1;
+ }
} else {
- sendHeartbeat();
+ sendHeartbeat();
}
#endif
}