Get rid of Requester.doRequest(), replace with request() method that returns the response promise. Make getTimeout() part of the interface.

This commit is contained in:
Shikhar Bhushan
2013-02-23 16:22:28 -05:00
parent 77924fd0be
commit 1ced1d4fdc
7 changed files with 38 additions and 23 deletions

View File

@@ -103,8 +103,11 @@ public class PacketReader
promise.deliver(resp); promise.deliver(resp);
} }
public void expectResponseTo(Request req) { public Promise<Response, SFTPException> expectResponseTo(long requestId) {
promises.put(req.getRequestID(), req.getResponsePromise()); final Promise<Response, SFTPException> promise
= new Promise<Response, SFTPException>("sftp / " + requestId, SFTPException.chainer);
promises.put(requestId, promise);
return promise;
} }
} }

View File

@@ -20,6 +20,7 @@ import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
public class RemoteDirectory public class RemoteDirectory
extends RemoteResource { extends RemoteResource {
@@ -33,7 +34,8 @@ public class RemoteDirectory
List<RemoteResourceInfo> rri = new LinkedList<RemoteResourceInfo>(); List<RemoteResourceInfo> rri = new LinkedList<RemoteResourceInfo>();
loop: loop:
for (; ; ) { for (; ; ) {
Response res = requester.doRequest(newRequest(PacketType.READDIR)); final Response res = requester.request(newRequest(PacketType.READDIR))
.retrieve(requester.getTimeout(), TimeUnit.SECONDS);
switch (res.getType()) { switch (res.getType()) {
case NAME: case NAME:

View File

@@ -20,6 +20,7 @@ import net.schmizz.sshj.sftp.Response.StatusCode;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
public class RemoteFile public class RemoteFile
extends RemoteResource { extends RemoteResource {
@@ -38,7 +39,8 @@ public class RemoteFile
public FileAttributes fetchAttributes() public FileAttributes fetchAttributes()
throws IOException { throws IOException {
return requester.doRequest(newRequest(PacketType.FSTAT)) return requester.request(newRequest(PacketType.FSTAT))
.retrieve(requester.getTimeout(), TimeUnit.SECONDS)
.ensurePacketTypeIs(PacketType.ATTRS) .ensurePacketTypeIs(PacketType.ATTRS)
.readFileAttributes(); .readFileAttributes();
} }
@@ -55,7 +57,9 @@ public class RemoteFile
public int read(long fileOffset, byte[] to, int offset, int len) public int read(long fileOffset, byte[] to, int offset, int len)
throws IOException { throws IOException {
Response res = requester.doRequest(newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len)); final Response res = requester.request(
newRequest(PacketType.READ).putUInt64(fileOffset).putUInt32(len)
).retrieve(requester.getTimeout(), TimeUnit.SECONDS);
switch (res.getType()) { switch (res.getType()) {
case DATA: case DATA:
int recvLen = res.readUInt32AsInt(); int recvLen = res.readUInt32AsInt();
@@ -73,16 +77,17 @@ public class RemoteFile
public void write(long fileOffset, byte[] data, int off, int len) public void write(long fileOffset, byte[] data, int off, int len)
throws IOException { throws IOException {
requester.doRequest(newRequest(PacketType.WRITE) requester.request(newRequest(PacketType.WRITE)
.putUInt64(fileOffset) .putUInt64(fileOffset)
.putUInt32(len - off) .putUInt32(len - off)
.putRawBytes(data, off, len) .putRawBytes(data, off, len)
).ensureStatusPacketIsOK(); ).retrieve(requester.getTimeout(), TimeUnit.SECONDS).ensureStatusPacketIsOK();
} }
public void setAttributes(FileAttributes attrs) public void setAttributes(FileAttributes attrs)
throws IOException { throws IOException {
requester.doRequest(newRequest(PacketType.FSETSTAT).putFileAttributes(attrs)).ensureStatusPacketIsOK(); requester.request(newRequest(PacketType.FSETSTAT).putFileAttributes(attrs))
.retrieve(requester.getTimeout(), TimeUnit.SECONDS).ensureStatusPacketIsOK();
} }
public int getOutgoingPacketOverhead() { public int getOutgoingPacketOverhead() {

View File

@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
public abstract class RemoteResource public abstract class RemoteResource
implements Closeable { implements Closeable {
@@ -49,7 +50,9 @@ public abstract class RemoteResource
public void close() public void close()
throws IOException { throws IOException {
log.debug("Closing `{}`", this); log.debug("Closing `{}`", this);
requester.doRequest(newRequest(PacketType.CLOSE)).ensureStatusPacketIsOK(); requester.request(newRequest(PacketType.CLOSE))
.retrieve(requester.getTimeout(), TimeUnit.SECONDS)
.ensureStatusPacketIsOK();
} }
@Override @Override

View File

@@ -15,20 +15,16 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
public final class Request public final class Request
extends SFTPPacket<Request> { extends SFTPPacket<Request> {
private final PacketType type; private final PacketType type;
private final long reqID; private final long reqID;
private final Promise<Response, SFTPException> responsePromise;
public Request(PacketType type, long reqID) { public Request(PacketType type, long reqID) {
super(type); super(type);
this.type = type; this.type = type;
this.reqID = reqID; this.reqID = reqID;
responsePromise = new Promise<Response, SFTPException>("sftp / " + reqID, SFTPException.chainer);
putUInt32(reqID); putUInt32(reqID);
} }
@@ -40,10 +36,6 @@ public final class Request
return type; return type;
} }
public Promise<Response, SFTPException> getResponsePromise() {
return responsePromise;
}
@Override @Override
public String toString() { public String toString() {
return "Request{" + reqID + ";" + type + "}"; return "Request{" + reqID + ";" + type + "}";

View File

@@ -15,6 +15,8 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import java.io.IOException; import java.io.IOException;
public interface Requester { public interface Requester {
@@ -23,7 +25,9 @@ public interface Requester {
Request newRequest(PacketType type); Request newRequest(PacketType type);
Response doRequest(Request req) Promise<Response, SFTPException> request(Request req)
throws IOException; throws IOException;
int getTimeout();
} }

View File

@@ -15,6 +15,7 @@
*/ */
package net.schmizz.sshj.sftp; package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.SSHException; import net.schmizz.sshj.common.SSHException;
import net.schmizz.sshj.connection.channel.direct.Session.Subsystem; import net.schmizz.sshj.connection.channel.direct.Session.Subsystem;
import net.schmizz.sshj.connection.channel.direct.SessionFactory; import net.schmizz.sshj.connection.channel.direct.SessionFactory;
@@ -116,12 +117,17 @@ public class SFTPEngine
} }
@Override @Override
public Response doRequest(Request req) public Promise<Response, SFTPException> request(Request req)
throws IOException { throws IOException {
reader.expectResponseTo(req); final Promise<Response, SFTPException> promise = reader.expectResponseTo(req.getRequestID());
log.debug("Sending {}", req); log.debug("Sending {}", req);
transmit(req); transmit(req);
return req.getResponsePromise().retrieve(timeout, TimeUnit.SECONDS); return promise;
}
private Response doRequest(Request req)
throws IOException {
return request(req).retrieve(getTimeout(), TimeUnit.SECONDS);
} }
public RemoteFile open(String path, Set<OpenMode> modes, FileAttributes fa) public RemoteFile open(String path, Set<OpenMode> modes, FileAttributes fa)