Implement read ahead to speed up transfer rates for downloads by a magnitude.

This commit is contained in:
David Kocher
2014-05-09 13:43:52 +02:00
parent 93f1543af8
commit f354fd6661

View File

@@ -15,9 +15,6 @@
*/
package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -25,6 +22,10 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.sftp.Response.StatusCode;
public class RemoteFile
extends RemoteResource {
@@ -52,10 +53,17 @@ public class RemoteFile
public int read(long fileOffset, byte[] to, int offset, int len)
throws IOException {
final Response res = requester.request(
newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len)
).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
switch (res.getType()) {
final Response res = this.asyncRead(fileOffset, len).retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
return this.checkReadResponse(res, to, offset);
}
protected Promise<Response, SFTPException> asyncRead(long fileOffset, int len)
throws IOException {
return requester.request(newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len));
}
protected int checkReadResponse(Response res, byte[] to, int offset) throws Buffer.BufferException, SFTPException {
switch(res.getType()) {
case DATA:
int recvLen = res.readUInt32AsInt();
System.arraycopy(res.array(), res.rpos(), to, offset, recvLen);
@@ -72,19 +80,19 @@ public class RemoteFile
public void write(long fileOffset, byte[] data, int off, int len)
throws IOException {
checkResponse(asyncWrite(fileOffset, data, off, len));
checkWriteResponse(asyncWrite(fileOffset, data, off, len));
}
protected Promise<Response, SFTPException> asyncWrite(long fileOffset, byte[] data, int off, int len)
throws IOException {
return requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset)
.putUInt32(len - off)
.putRawBytes(data, off, len)
.putUInt64(fileOffset)
.putUInt32(len - off)
.putRawBytes(data, off, len)
);
}
private void checkResponse(Promise<Response, SFTPException> responsePromise)
private void checkWriteResponse(Promise<Response, SFTPException> responsePromise)
throws SFTPException {
responsePromise.retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS).ensureStatusPacketIsOK();
}
@@ -139,8 +147,8 @@ public class RemoteFile
@Override
public void write(byte[] buf, int off, int len)
throws IOException {
if (unconfirmedWrites.size() > maxUnconfirmedWrites) {
checkResponse(unconfirmedWrites.remove());
if(unconfirmedWrites.size() > maxUnconfirmedWrites) {
checkWriteResponse(unconfirmedWrites.remove());
}
unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len));
fileOffset += len;
@@ -149,8 +157,8 @@ public class RemoteFile
@Override
public void flush()
throws IOException {
while (!unconfirmedWrites.isEmpty()) {
checkResponse(unconfirmedWrites.remove());
while(!unconfirmedWrites.isEmpty()) {
checkWriteResponse(unconfirmedWrites.remove());
}
}
@@ -167,6 +175,11 @@ public class RemoteFile
private final byte[] b = new byte[1];
private final int maxUnconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads;
private boolean eof;
private long fileOffset;
private long markPos;
private long readLimit;
@@ -176,7 +189,13 @@ public class RemoteFile
}
public RemoteFileInputStream(long fileOffset) {
this(fileOffset, 0);
}
public RemoteFileInputStream(long fileOffset, int maxUnconfirmedReads) {
this.fileOffset = fileOffset;
this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
}
@Override
@@ -187,7 +206,7 @@ public class RemoteFile
@Override
public void mark(int readLimit) {
this.readLimit = readLimit;
markPos = fileOffset;
this.markPos = fileOffset;
}
@Override
@@ -211,14 +230,45 @@ public class RemoteFile
@Override
public int read(byte[] into, int off, int len)
throws IOException {
int read = RemoteFile.this.read(fileOffset, into, off, len);
if (read != -1) {
fileOffset += read;
if (markPos != 0 && read > readLimit) // Invalidate mark position
markPos = 0;
while(!eof && unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
unconfirmedReads.add(asyncRead(fileOffset, len));
fileOffset += len;
}
return read;
if(unconfirmedReads.isEmpty()) {
// Attempted to read while status was already received
return -1;
}
// Retrieve first in
final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final int recvLen = checkReadResponse(res, into, off);
if(markPos != 0 && recvLen > readLimit) // Invalidate mark position
{
markPos = 0;
}
if(-1 == recvLen) {
eof = true;
}
return recvLen;
}
@Override
public void close() throws IOException {
while(!unconfirmedReads.isEmpty()) {
final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
switch(res.getType()) {
case STATUS:
res.ensureStatusIs(StatusCode.EOF);
break;
case DATA:
log.warn("Pending data packet from read response discarded");
continue;
default:
throw new SFTPException("Unexpected packet: " + res.getType());
}
}
}
}
}