don't close underlying RemoteFile when closing streams of that file - reverts f34667521d

This commit is contained in:
shikhar
2014-06-24 14:20:04 +05:30
parent 0a3ad4f68f
commit 0875417dde

View File

@@ -15,6 +15,10 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@@ -22,10 +26,6 @@ import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.sftp.Response.StatusCode;
public class RemoteFile public class RemoteFile
extends RemoteResource { extends RemoteResource {
@@ -62,8 +62,9 @@ public class RemoteFile
return requester.request(newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len)); return requester.request(newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len));
} }
protected int checkReadResponse(Response res, byte[] to, int offset) throws Buffer.BufferException, SFTPException { protected int checkReadResponse(Response res, byte[] to, int offset)
switch(res.getType()) { throws Buffer.BufferException, SFTPException {
switch (res.getType()) {
case DATA: case DATA:
int recvLen = res.readUInt32AsInt(); int recvLen = res.readUInt32AsInt();
System.arraycopy(res.array(), res.rpos(), to, offset, recvLen); System.arraycopy(res.array(), res.rpos(), to, offset, recvLen);
@@ -86,9 +87,9 @@ public class RemoteFile
protected Promise<Response, SFTPException> asyncWrite(long fileOffset, byte[] data, int off, int len) protected Promise<Response, SFTPException> asyncWrite(long fileOffset, byte[] data, int off, int len)
throws IOException { throws IOException {
return requester.request(newRequest(PacketType.WRITE) return requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset) .putUInt64(fileOffset)
.putUInt32(len - off) .putUInt32(len - off)
.putRawBytes(data, off, len) .putRawBytes(data, off, len)
); );
} }
@@ -147,7 +148,7 @@ public class RemoteFile
@Override @Override
public void write(byte[] buf, int off, int len) public void write(byte[] buf, int off, int len)
throws IOException { throws IOException {
if(unconfirmedWrites.size() > maxUnconfirmedWrites) { if (unconfirmedWrites.size() > maxUnconfirmedWrites) {
checkWriteResponse(unconfirmedWrites.remove()); checkWriteResponse(unconfirmedWrites.remove());
} }
unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len)); unconfirmedWrites.add(RemoteFile.this.asyncWrite(fileOffset, buf, off, len));
@@ -157,7 +158,7 @@ public class RemoteFile
@Override @Override
public void flush() public void flush()
throws IOException { throws IOException {
while(!unconfirmedWrites.isEmpty()) { while (!unconfirmedWrites.isEmpty()) {
checkWriteResponse(unconfirmedWrites.remove()); checkWriteResponse(unconfirmedWrites.remove());
} }
} }
@@ -166,8 +167,6 @@ public class RemoteFile
public void close() public void close()
throws IOException { throws IOException {
flush(); flush();
// Close handle
RemoteFile.this.close();
} }
} }
@@ -232,12 +231,12 @@ public class RemoteFile
@Override @Override
public int read(byte[] into, int off, int len) public int read(byte[] into, int off, int len)
throws IOException { throws IOException {
while(!eof && unconfirmedReads.size() <= maxUnconfirmedReads) { while (!eof && unconfirmedReads.size() <= maxUnconfirmedReads) {
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism // Send read requests as long as there is no EOF and we have not reached the maximum parallelism
unconfirmedReads.add(asyncRead(fileOffset, len)); unconfirmedReads.add(asyncRead(fileOffset, len));
fileOffset += len; fileOffset += len;
} }
if(unconfirmedReads.isEmpty()) { if (unconfirmedReads.isEmpty()) {
// Attempted to read while status was already received // Attempted to read while status was already received
return -1; return -1;
} }
@@ -245,22 +244,23 @@ public class RemoteFile
final Response res = unconfirmedReads.remove().retrieve( final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS); requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
final int recvLen = checkReadResponse(res, into, off); final int recvLen = checkReadResponse(res, into, off);
if(markPos != 0 && recvLen > readLimit) // Invalidate mark position if (markPos != 0 && recvLen > readLimit) // Invalidate mark position
{ {
markPos = 0; markPos = 0;
} }
if(-1 == recvLen) { if (-1 == recvLen) {
eof = true; eof = true;
} }
return recvLen; return recvLen;
} }
@Override @Override
public void close() throws IOException { public void close()
while(!unconfirmedReads.isEmpty()) { throws IOException {
while (!unconfirmedReads.isEmpty()) {
final Response res = unconfirmedReads.remove().retrieve( final Response res = unconfirmedReads.remove().retrieve(
requester.getTimeoutMs(), TimeUnit.MILLISECONDS); requester.getTimeoutMs(), TimeUnit.MILLISECONDS);
switch(res.getType()) { switch (res.getType()) {
case STATUS: case STATUS:
res.ensureStatusIs(StatusCode.EOF); res.ensureStatusIs(StatusCode.EOF);
break; break;
@@ -271,8 +271,6 @@ public class RemoteFile
throw new SFTPException("Unexpected packet: " + res.getType()); throw new SFTPException("Unexpected packet: " + res.getType());
} }
} }
// Close handle
RemoteFile.this.close();
} }
} }
} }