Make the FileLfsRepository thread safe

The FileLfsRepository.out member could have been accessed from multiple
threads which would corrupt the content.

Don't store the AtomicObjectOutputStream in the FileLfsRepository.out but
move it to the ObjectUploadListener which is instantiated per-request.

Add a parallel upload test.

Change-Id: I62298630e99c46b500d376843ffcde934436215b
Signed-off-by: Saša Živkov <sasa.zivkov@sap.com>
Signed-off-by: Matthias Sohn <matthias.sohn@sap.com>
diff --git a/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java b/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java
index 35bf09b..1fb91bd 100644
--- a/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java
+++ b/org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java
@@ -51,6 +51,13 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.eclipse.jgit.lfs.lib.AnyLongObjectId;
 import org.eclipse.jgit.lfs.lib.LongObjectId;
@@ -98,4 +105,36 @@ public void testLargeFileUpload() throws Exception {
 		assertEquals("expected object length " + Files.size(f), Files.size(f),
 				repository.getSize(id));
 	}
+
+	@Test
+	public void testParallelUploads() throws Exception {
+		int count = 10;
+		List<Path> paths = new ArrayList<>(count);
+
+		for (int i = 0; i < count; i++) {
+			Path f = Paths.get(getTempDirectory().toString(),
+					"largeRandomFile_" + i);
+			createPseudoRandomContentFile(f, 1 * MiB);
+			paths.add(f);
+		}
+
+		final CyclicBarrier barrier = new CyclicBarrier(count);
+
+		ExecutorService e = Executors.newFixedThreadPool(count);
+		try {
+			for (final Path p : paths) {
+				e.submit(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						barrier.await();
+						putContent(p);
+						return null;
+					}
+				});
+			}
+		} finally {
+			e.shutdown();
+			e.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+		}
+	}
 }
diff --git a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java
index 12da271..2e71c04 100644
--- a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java
+++ b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java
@@ -45,10 +45,8 @@
 import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION;
 
 import java.io.IOException;
-import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -70,7 +68,6 @@ public class FileLfsRepository implements LargeFileRepository {
 
 	private final String url;
 	private final Path dir;
-	private AtomicObjectOutputStream out;
 
 	/**
 	 * @param url
@@ -147,21 +144,11 @@ ReadableByteChannel getReadChannel(AnyLongObjectId id)
 		return FileChannel.open(getPath(id), StandardOpenOption.READ);
 	}
 
-	WritableByteChannel getWriteChannel(AnyLongObjectId id)
+	AtomicObjectOutputStream getOutputStream(AnyLongObjectId id)
 			throws IOException {
 		Path path = getPath(id);
 		Files.createDirectories(path.getParent());
-		out = new AtomicObjectOutputStream(path, id);
-		return Channels.newChannel(out);
-	}
-
-	/**
-	 * Abort the output stream
-	 */
-	void abortWrite() {
-		if (out != null) {
-			out.abort();
-		}
+		return new AtomicObjectOutputStream(path, id);
 	}
 
 	private static char[] toHexCharArray(int b) {
diff --git a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java
index 05970ad..e524ac6 100644
--- a/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java
+++ b/org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java
@@ -74,13 +74,13 @@ class ObjectUploadListener implements ReadListener {
 
 	private final HttpServletResponse response;
 
-	private FileLfsRepository repository;
-
 	private final ServletInputStream in;
 
 	private final ReadableByteChannel inChannel;
 
-	private WritableByteChannel out;
+	private final AtomicObjectOutputStream out;
+
+	private WritableByteChannel channel;
 
 	private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
 
@@ -98,12 +98,12 @@ public ObjectUploadListener(FileLfsRepository repository,
 			AsyncContext context, HttpServletRequest request,
 			HttpServletResponse response, AnyLongObjectId id)
 					throws FileNotFoundException, IOException {
-		this.repository = repository;
 		this.context = context;
 		this.response = response;
 		this.in = request.getInputStream();
 		this.inChannel = Channels.newChannel(in);
-		this.out = repository.getWriteChannel(id);
+		this.out = repository.getOutputStream(id);
+		this.channel = Channels.newChannel(out);
 		response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON);
 	}
 
@@ -117,12 +117,12 @@ public void onDataAvailable() throws IOException {
 		while (in.isReady()) {
 			if (inChannel.read(buffer) > 0) {
 				buffer.flip();
-				out.write(buffer);
+				channel.write(buffer);
 				buffer.compact();
 			} else {
 				buffer.flip();
 				while (buffer.hasRemaining()) {
-					out.write(buffer);
+					channel.write(buffer);
 				}
 				close();
 				return;
@@ -141,7 +141,7 @@ public void onAllDataRead() throws IOException {
 	protected void close() throws IOException {
 		try {
 			inChannel.close();
-			out.close();
+			channel.close();
 			// TODO check if status 200 is ok for PUT request, HTTP foresees 204
 			// for successful PUT without response body
 			response.setStatus(HttpServletResponse.SC_OK);
@@ -157,9 +157,9 @@ protected void close() throws IOException {
 	@Override
 	public void onError(Throwable e) {
 		try {
-			repository.abortWrite();
+			out.abort();
 			inChannel.close();
-			out.close();
+			channel.close();
 			int status;
 			if (e instanceof CorruptLongObjectException) {
 				status = HttpStatus.SC_BAD_REQUEST;