Read-ahead input stream moved to its own class, as it will not play nice with mark/reset/skip. Use it by defautl for SFTPFileTransfer.

Closes #76 - no longer a significant difference in scp & sftp transfer speed
This commit is contained in:
shikhar
2014-06-25 11:47:01 +05:30
parent ecc1d06dc2
commit 4806b1d6c7
2 changed files with 52 additions and 42 deletions

View File

@@ -53,8 +53,8 @@ public class RemoteFile
public int read(long fileOffset, byte[] to, int offset, int len)
throws IOException {
final Response res = this.asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
return this.checkReadResponse(res, to, offset);
final Response res = asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
return checkReadResponse(res, to, offset);
}
protected Promise<Response, SFTPException> asyncRead(long fileOffset, int len)
@@ -176,11 +176,6 @@ public class RemoteFile
private final byte[] b = new byte[1];
private final int maxUnconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads;
private boolean eof;
private long fileOffset;
private long markPos;
private long readLimit;
@@ -190,13 +185,7 @@ public class RemoteFile
}
public RemoteFileInputStream(long fileOffset) {
this(fileOffset, 0);
}
public RemoteFileInputStream(long fileOffset, int maxUnconfirmedReads) {
this.fileOffset = fileOffset;
this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
}
@Override
@@ -207,7 +196,7 @@ public class RemoteFile
@Override
public void mark(int readLimit) {
this.readLimit = readLimit;
this.markPos = fileOffset;
markPos = fileOffset;
}
@Override
@@ -228,6 +217,49 @@ public class RemoteFile
return read(b, 0, 1) == -1 ? -1 : b[0] & 0xff;
}
@Override
public int read(byte[] into, int off, int len)
throws IOException {
int read = RemoteFile.this.read(fileOffset, into, off, len);
if (read != -1) {
fileOffset += read;
if (markPos != 0 && read > readLimit) // Invalidate mark position
markPos = 0;
}
return read;
}
}
public class ReadAheadRemoteFileInputStream
extends InputStream {
private final byte[] b = new byte[1];
private final int maxUnconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads;
private long fileOffset;
private boolean eof;
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
this.fileOffset = 0;
}
@Override
public long skip(long n)
throws IOException {
throw new IOException("skip is not supported by ReadAheadFileInputStream, use RemoteFileInputStream instead");
}
@Override
public int read()
throws IOException {
return read(b, 0, 1) == -1 ? -1 : b[0] & 0xff;
}
@Override
public int read(byte[] into, int off, int len)
throws IOException {
@@ -237,40 +269,18 @@ public class RemoteFile
fileOffset += len;
}
if (unconfirmedReads.isEmpty()) {
// Attempted to read while status was already received
assert eof;
return -1;
}
// Retrieve first in
final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final int recvLen = checkReadResponse(res, into, off);
if (markPos != 0 && recvLen > readLimit) // Invalidate mark position
{
markPos = 0;
}
if (-1 == recvLen) {
if (recvLen == -1) {
eof = true;
}
return recvLen;
}
@Override
public void close()
throws IOException {
while (!unconfirmedReads.isEmpty()) {
final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
switch (res.getType()) {
case STATUS:
res.ensureStatusIs(StatusCode.EOF);
break;
case DATA:
log.warn("Pending data packet from read response discarded");
continue;
default:
throw new SFTPException("Unexpected packet: " + res.getType());
}
}
}
}
}
}

View File

@@ -141,7 +141,7 @@ public class SFTPFileTransfer
final LocalDestFile adjusted = local.getTargetFile(remote.getName());
final RemoteFile rf = engine.open(remote.getPath());
try {
final RemoteFile.RemoteFileInputStream rfis = rf.new RemoteFileInputStream();
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
final OutputStream os = adjusted.getOutputStream();
try {
new StreamCopier(rfis, os)