From 4806b1d6c75abe05fab19e271db7b8c335078dfc Mon Sep 17 00:00:00 2001 From: shikhar Date: Wed, 25 Jun 2014 11:47:01 +0530 Subject: [PATCH] Read-ahead input stream moved to its own class, as it will not play nice with mark/reset/skip. Use it by defautl for SFTPFileTransfer. Closes #76 - no longer a significant difference in scp & sftp transfer speed --- .../net/schmizz/sshj/sftp/RemoteFile.java | 92 ++++++++++--------- .../schmizz/sshj/sftp/SFTPFileTransfer.java | 2 +- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java index 73191737..5d192d62 100644 --- a/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java +++ b/src/main/java/net/schmizz/sshj/sftp/RemoteFile.java @@ -53,8 +53,8 @@ public class RemoteFile public int read(long fileOffset, byte[] to, int offset, int len) throws IOException { - final Response res = this.asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); - return this.checkReadResponse(res, to, offset); + final Response res = asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); + return checkReadResponse(res, to, offset); } protected Promise asyncRead(long fileOffset, int len) @@ -176,11 +176,6 @@ public class RemoteFile private final byte[] b = new byte[1]; - private final int maxUnconfirmedReads; - private final Queue> unconfirmedReads; - - private boolean eof; - private long fileOffset; private long markPos; private long readLimit; @@ -190,13 +185,7 @@ public class RemoteFile } public RemoteFileInputStream(long fileOffset) { - this(fileOffset, 0); - } - - public RemoteFileInputStream(long fileOffset, int maxUnconfirmedReads) { this.fileOffset = fileOffset; - this.maxUnconfirmedReads = maxUnconfirmedReads; - this.unconfirmedReads = new LinkedList>(); } @Override @@ -207,7 +196,7 @@ public class RemoteFile @Override public void mark(int readLimit) { this.readLimit = readLimit; - this.markPos = fileOffset; + markPos = fileOffset; } @Override @@ -228,6 +217,49 @@ public class RemoteFile return read(b, 0, 1) == -1 ? -1 : b[0] & 0xff; } + @Override + public int read(byte[] into, int off, int len) + throws IOException { + int read = RemoteFile.this.read(fileOffset, into, off, len); + if (read != -1) { + fileOffset += read; + if (markPos != 0 && read > readLimit) // Invalidate mark position + markPos = 0; + } + return read; + } + + } + + public class ReadAheadRemoteFileInputStream + extends InputStream { + + private final byte[] b = new byte[1]; + + private final int maxUnconfirmedReads; + private final Queue> unconfirmedReads; + + private long fileOffset; + private boolean eof; + + public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) { + this.maxUnconfirmedReads = maxUnconfirmedReads; + this.unconfirmedReads = new LinkedList>(); + this.fileOffset = 0; + } + + @Override + public long skip(long n) + throws IOException { + throw new IOException("skip is not supported by ReadAheadFileInputStream, use RemoteFileInputStream instead"); + } + + @Override + public int read() + throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0] & 0xff; + } + @Override public int read(byte[] into, int off, int len) throws IOException { @@ -237,40 +269,18 @@ public class RemoteFile fileOffset += len; } if (unconfirmedReads.isEmpty()) { - // Attempted to read while status was already received + assert eof; return -1; } // Retrieve first in - final Response res = unconfirmedReads.remove().retrieve( - requester.getTimeoutMs(), TimeUnit.MILLISECONDS); + final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS); final int recvLen = checkReadResponse(res, into, off); - if (markPos != 0 && recvLen > readLimit) // Invalidate mark position - { - markPos = 0; - } - if (-1 == recvLen) { + if (recvLen == -1) { eof = true; } return recvLen; } - @Override - public void close() - throws IOException { - while (!unconfirmedReads.isEmpty()) { - final Response res = unconfirmedReads.remove().retrieve( - requester.getTimeoutMs(), TimeUnit.MILLISECONDS); - switch (res.getType()) { - case STATUS: - res.ensureStatusIs(StatusCode.EOF); - break; - case DATA: - log.warn("Pending data packet from read response discarded"); - continue; - default: - throw new SFTPException("Unexpected packet: " + res.getType()); - } - } - } } -} + +} \ No newline at end of file diff --git a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java index 19e1c331..49188af9 100644 --- a/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java +++ b/src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java @@ -141,7 +141,7 @@ public class SFTPFileTransfer final LocalDestFile adjusted = local.getTargetFile(remote.getName()); final RemoteFile rf = engine.open(remote.getPath()); try { - final RemoteFile.RemoteFileInputStream rfis = rf.new RemoteFileInputStream(); + final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16); final OutputStream os = adjusted.getOutputStream(); try { new StreamCopier(rfis, os)