Add 'unconfirmed writes' feature to SFTP RemoteFileOutputStream, allowing for major speedups

Thanks to @romainreuillon for the idea and initial implementation! #97
This commit is contained in:
Shikhar Bhushan
2013-02-23 18:15:10 -05:00
parent 9539ff6b7a
commit 0d52441f01
2 changed files with 53 additions and 19 deletions

View File

@@ -15,11 +15,14 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.sftp.Response.StatusCode; import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class RemoteFile public class RemoteFile
@@ -29,14 +32,6 @@ public class RemoteFile
super(requester, path, handle); super(requester, path, handle);
} }
public RemoteFileInputStream getInputStream() {
return new RemoteFileInputStream();
}
public RemoteFileOutputStream getOutputStream() {
return new RemoteFileOutputStream();
}
public FileAttributes fetchAttributes() public FileAttributes fetchAttributes()
throws IOException { throws IOException {
return requester.request(newRequest(PacketType.FSTAT)) return requester.request(newRequest(PacketType.FSTAT))
@@ -77,11 +72,21 @@ public class RemoteFile
public void write(long fileOffset, byte[] data, int off, int len) public void write(long fileOffset, byte[] data, int off, int len)
throws IOException { throws IOException {
requester.request(newRequest(PacketType.WRITE) checkResponse(asyncWrite(fileOffset, data, off, len));
.putUInt64(fileOffset) }
.putUInt32(len - off)
.putRawBytes(data, off, len) protected Promise<Response, SFTPException> asyncWrite(long fileOffset, byte[] data, int off, int len)
).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK(); throws IOException {
return requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset)
.putUInt32(len - off)
.putRawBytes(data, off, len)
);
}
private void checkResponse(Promise<Response, SFTPException> responsePromise)
throws SFTPException {
responsePromise.retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK();
} }
public void setAttributes(FileAttributes attrs) public void setAttributes(FileAttributes attrs)
@@ -103,17 +108,25 @@ public class RemoteFile
public class RemoteFileOutputStream public class RemoteFileOutputStream
extends OutputStream { extends OutputStream {
private final byte[] b = new byte[1]; private final byte[] b = new byte[1];
private final int maxUnconfirmedWrites;
private final Queue<Promise<Response, SFTPException>> unconfirmedWrites;
private long fileOffset; private long fileOffset;
public RemoteFileOutputStream() { public RemoteFileOutputStream() {
this(0); this(0);
} }
public RemoteFileOutputStream(long fileOffset) { public RemoteFileOutputStream(long startingOffset) {
this.fileOffset = fileOffset; this(startingOffset, 0);
}
public RemoteFileOutputStream(long startingOffset, int maxUnconfirmedWrites) {
this.fileOffset = startingOffset;
this.maxUnconfirmedWrites = maxUnconfirmedWrites;
this.unconfirmedWrites = new LinkedList<Promise<Response, SFTPException>>();
} }
@Override @Override
@@ -126,10 +139,27 @@ public class RemoteFile
@Override @Override
public void write(byte[] buf, int off, int len) public void write(byte[] buf, int off, int len)
throws IOException { throws IOException {
RemoteFile.this.write(fileOffset, buf, off, len); if (unconfirmedWrites.size() > maxUnconfirmedWrites) {
checkResponse(unconfirmedWrites.remove());
}
unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len));
fileOffset += len; fileOffset += len;
} }
@Override
public void flush()
throws IOException {
while (!unconfirmedWrites.isEmpty()) {
checkResponse(unconfirmedWrites.remove());
}
}
@Override
public void close()
throws IOException {
flush();
}
} }
public class RemoteFileInputStream public class RemoteFileInputStream

View File

@@ -141,14 +141,16 @@ public class SFTPFileTransfer
final LocalDestFile adjusted = local.getTargetFile(remote.getName()); final LocalDestFile adjusted = local.getTargetFile(remote.getName());
final RemoteFile rf = engine.open(remote.getPath()); final RemoteFile rf = engine.open(remote.getPath());
try { try {
final RemoteFile.RemoteFileInputStream rfis = rf.new RemoteFileInputStream();
final OutputStream os = adjusted.getOutputStream(); final OutputStream os = adjusted.getOutputStream();
try { try {
new StreamCopier(rf.getInputStream(), os) new StreamCopier(rfis, os)
.bufSize(engine.getSubsystem().getLocalMaxPacketSize()) .bufSize(engine.getSubsystem().getLocalMaxPacketSize())
.keepFlushing(false) .keepFlushing(false)
.listener(listener) .listener(listener)
.copy(); .copy();
} finally { } finally {
rfis.close();
os.close(); os.close();
} }
} finally { } finally {
@@ -206,14 +208,16 @@ public class SFTPFileTransfer
OpenMode.TRUNC)); OpenMode.TRUNC));
try { try {
final InputStream fis = local.getInputStream(); final InputStream fis = local.getInputStream();
final RemoteFile.RemoteFileOutputStream rfos = rf.new RemoteFileOutputStream(0, 16);
try { try {
new StreamCopier(fis, rf.getOutputStream()) new StreamCopier(fis, rfos)
.bufSize(engine.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead()) .bufSize(engine.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead())
.keepFlushing(false) .keepFlushing(false)
.listener(listener) .listener(listener)
.copy(); .copy();
} finally { } finally {
fis.close(); fis.close();
rfos.close();
} }
} finally { } finally {
rf.close(); rf.close();