fix for issue 183 (sftp.RemoteFile.ReadAheadRemoteFileInputStream)

This commit is contained in:
Björn Karge
2015-06-03 14:36:23 +08:00
parent a18d623f44
commit 8e74330b0b

View File

@@ -19,6 +19,7 @@ import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -88,8 +89,7 @@ public class RemoteFile
throws IOException {
return requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset)
// TODO The SFTP spec claims this field is unneeded...? See #187
.putUInt32(len)
.putUInt32(len - off)
.putRawBytes(data, off, len)
);
}
@@ -238,27 +238,56 @@ public class RemoteFile
private final byte[] b = new byte[1];
private final int maxUnconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>();
private long fileOffset;
private long requestOffset;
private long responseOffset;
private boolean eof;
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
assert 0 <= maxUnconfirmedReads;
this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
this.fileOffset = 0;
}
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
assert 0 <= maxUnconfirmedReads;
assert 0 <= fileOffset;
this.maxUnconfirmedReads = maxUnconfirmedReads;
this.unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
this.fileOffset = fileOffset;
this.requestOffset = this.responseOffset = fileOffset;
}
@Override
public long skip(long n)
throws IOException {
throw new IOException("skip is not supported by ReadAheadFileInputStream, use RemoteFileInputStream instead");
private ByteArrayInputStream pending = new ByteArrayInputStream( new byte[0] );
private boolean retrieveUnconfirmedRead(boolean blocking) throws IOException {
if (unconfirmedReads.size() <= 0)
return false;
if (!blocking && !unconfirmedReads.peek().isDelivered())
return false;
unconfirmedReadOffsets.remove();
final Response res = unconfirmedReads.remove().retrieve( requester.getTimeoutMs(), TimeUnit.MILLISECONDS );
switch (res.getType()) {
case DATA:
int recvLen = res.readUInt32AsInt();
responseOffset += recvLen;
pending = new ByteArrayInputStream( res.array(), res.rpos(), recvLen );
break;
case STATUS:
res.ensureStatusIs( Response.StatusCode.EOF );
eof = true;
break;
default:
throw new SFTPException( "Unexpected packet: " + res.getType() );
}
return true;
}
@Override
@@ -268,26 +297,64 @@ public class RemoteFile
}
@Override
public int read(byte[] into, int off, int len)
throws IOException {
while (!eof && unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
unconfirmedReads.add(asyncRead(fileOffset, len));
fileOffset += len;
public int read(byte[] into, int off, int len) throws IOException {
while (!eof && pending.available() <= 0) {
// we also need to go here for len <= 0, because pending may be at
// EOF in which case it would return -1 instead of 0
while (unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
int reqLen = Math.max( 1024, len ); // don't be shy!
unconfirmedReads.add( RemoteFile.this.asyncRead( requestOffset, reqLen ) );
unconfirmedReadOffsets.add( requestOffset );
requestOffset += reqLen;
}
long nextOffset = unconfirmedReadOffsets.peek();
if (responseOffset != nextOffset) {
// the server could not give us all the data we needed, so
// we try to fill the gap synchronously
assert responseOffset < nextOffset;
assert 0 < (nextOffset - responseOffset);
assert (nextOffset - responseOffset) <= Integer.MAX_VALUE;
byte[] buf = new byte[(int) (nextOffset - responseOffset)];
int recvLen = RemoteFile.this.read( responseOffset, buf, 0, buf.length );
if (recvLen < 0) {
eof = true;
return -1;
}
if (0 == recvLen) // avoid infinite loops
throw new SFTPException( "Unexpected response size (0), bailing out" );
responseOffset += recvLen;
pending = new ByteArrayInputStream( buf, 0, recvLen );
}
else
if (!retrieveUnconfirmedRead( true /*blocking*/ )) {
// this may happen if we change prefetch strategy
// currently, we should never get here...
throw new IllegalStateException( "Could not retrieve data for pending read request" );
}
}
if (unconfirmedReads.isEmpty()) {
assert eof;
return -1;
}
// Retrieve first in
final Response res = unconfirmedReads.remove().retrieve(requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final int recvLen = checkReadResponse(res, into, off);
if (recvLen == -1) {
eof = true;
}
return recvLen;
return pending.read( into, off, len );
}
@Override
public int available() throws IOException {
while (!eof && (pending.available() <= 0) && retrieveUnconfirmedRead( false /*blocking*/ ))
/*loop*/;
return pending.available();
}
}
}