From 0d52441f01fd0f0cde22686d3cd0ec8ececb7019 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Sat, 23 Feb 2013 18:15:10 -0500 Subject: [PATCH] Add 'unconfirmed writes' feature to SFTP RemoteFileOutputStream, allowing for major speedups Thanks to @romainreuillon for the idea and initial implementation! #97 --- .../net/schmizz/sshj/sftp/RemoteFile.java | 64 ++++++++++++++----- .../schmizz/sshj/sftp/SFTPFileTransfer.java | 8 ++- 2 files changed, 53 insertions(+), 19 deletions(-) diff --git a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java index d7582bb0..2efe656a 100644 --- a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java +++ b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java @@ -15,11 +15,14 @@ */ package net.schmizz.sshj.sftp; +import net.schmizz.concurrent.Promise; import net.schmizz.sshj.sftp.Response.StatusCode; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.TimeUnit; public class RemoteFile @@ -29,14 +32,6 @@ public class RemoteFile super(requester, path, handle); } - public RemoteFileInputStream getInputStream() { - return new RemoteFileInputStream(); - } - - public RemoteFileOutputStream getOutputStream() { - return new RemoteFileOutputStream(); - } - public FileAttributes fetchAttributes() throws IOException { return requester.request(newRequest(PacketType.FSTAT)) @@ -77,11 +72,21 @@ public class RemoteFile public void write(long fileOffset, byte[] data, int off, int len) throws IOException { - requester.request(newRequest(PacketType.WRITE) - .putUInt64(fileOffset) - .putUInt32(len - off) - .putRawBytes(data, off, len) - ).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK(); + checkResponse(asyncWrite(fileOffset, data, off, len)); + } + + protected Promise asyncWrite(long fileOffset, byte[] data, int off, int len) + throws IOException { + return requester.request(newRequest(PacketType.WRITE) + .putUInt64(fileOffset) + .putUInt32(len - off) + .putRawBytes(data, off, len) + ); + } + + private void checkResponse(Promise responsePromise) + throws SFTPException { + responsePromise.retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK(); } public void setAttributes(FileAttributes attrs) @@ -103,17 +108,25 @@ public class RemoteFile public class RemoteFileOutputStream extends OutputStream { - private final byte[] b = new byte[1]; + private final int maxUnconfirmedWrites; + private final Queue> unconfirmedWrites; + private long fileOffset; public RemoteFileOutputStream() { this(0); } - public RemoteFileOutputStream(long fileOffset) { - this.fileOffset = fileOffset; + public RemoteFileOutputStream(long startingOffset) { + this(startingOffset, 0); + } + + public RemoteFileOutputStream(long startingOffset, int maxUnconfirmedWrites) { + this.fileOffset = startingOffset; + this.maxUnconfirmedWrites = maxUnconfirmedWrites; + this.unconfirmedWrites = new LinkedList>(); } @Override @@ -126,10 +139,27 @@ public class RemoteFile @Override public void write(byte[] buf, int off, int len) throws IOException { - RemoteFile.this.write(fileOffset, buf, off, len); + if (unconfirmedWrites.size() > maxUnconfirmedWrites) { + checkResponse(unconfirmedWrites.remove()); + } + unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len)); fileOffset += len; } + @Override + public void flush() + throws IOException { + while (!unconfirmedWrites.isEmpty()) { + checkResponse(unconfirmedWrites.remove()); + } + } + + @Override + public void close() + throws IOException { + flush(); + } + } public class RemoteFileInputStream diff --git a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java index fd734285..19e1c331 100644 --- a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java @@ -141,14 +141,16 @@ public class SFTPFileTransfer final LocalDestFile adjusted = local.getTargetFile(remote.getName()); final RemoteFile rf = engine.open(remote.getPath()); try { + final RemoteFile.RemoteFileInputStream rfis = rf.new RemoteFileInputStream(); final OutputStream os = adjusted.getOutputStream(); try { - new StreamCopier(rf.getInputStream(), os) + new StreamCopier(rfis, os) .bufSize(engine.getSubsystem().getLocalMaxPacketSize()) .keepFlushing(false) .listener(listener) .copy(); } finally { + rfis.close(); os.close(); } } finally { @@ -206,14 +208,16 @@ public class SFTPFileTransfer OpenMode.TRUNC)); try { final InputStream fis = local.getInputStream(); + final RemoteFile.RemoteFileOutputStream rfos = rf.new RemoteFileOutputStream(0, 16); try { - new StreamCopier(fis, rf.getOutputStream()) + new StreamCopier(fis, rfos) .bufSize(engine.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead()) .keepFlushing(false) .listener(listener) .copy(); } finally { fis.close(); + rfos.close(); } } finally { rf.close();