It's not a Future, it's a Promise. Rename inspired by https://gist.github.com/959802.

Also Event now delegates to Promise instead of inheriting from it.
This commit is contained in:
Shikhar Bhushan
2011-05-06 23:11:12 +01:00
parent 803b154505
commit 40b401406c
15 changed files with 180 additions and 137 deletions

View File

@@ -17,16 +17,26 @@ package net.schmizz.concurrent;
import java.util.Collection;
public class FutureUtils {
public class ErrorDeliveryUtil {
public static void alertAll(Throwable x, Future... futures) {
for (Future f : futures)
f.error(x);
public static void alertPromises(Throwable x, Promise... promises) {
for (Promise p : promises)
p.deliverError(x);
}
public static void alertAll(Throwable x, Collection<? extends Future> futures) {
for (Future f : futures)
f.error(x);
public static void alertPromises(Throwable x, Collection<? extends Promise> promises) {
for (Promise p : promises)
p.deliverError(x);
}
public static void alertEvents(Throwable x, Event... events) {
for (Event e: events)
e.deliverError(x);
}
public static void alertEvents(Throwable x, Collection<? extends Event> events) {
for (Event e: events)
e.deliverError(x);
}
}

View File

@@ -18,21 +18,17 @@ package net.schmizz.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/*
* Syntactic sugar around Future
*/
/**
* A kind of {@link Future} that caters to boolean values.
* <p/>
* An event can be set, cleared, or awaited, similar to Python's {@code threading.event}. The key difference is that a
* waiter may be delivered an exception of parameterized type {@code T}. Furthermore, an event {@link #isSet()} when it
* is not {@code null} i.e. it can be either {@code true} or {@code false} when set.
*
* @see Future
* waiter may be delivered an exception of parameterized type {@code T}.
* <p/>
* Uses {@link Promise} under the hood.
*/
public class Event<T extends Throwable>
extends Future<Boolean, T> {
public class Event<T extends Throwable> {
private static final Object SOME = new Object();
private final Promise<Object, T> promise;
/**
* Creates this event with given {@code name} and exception {@code chainer}. Allocates a new {@link
@@ -42,7 +38,7 @@ public class Event<T extends Throwable>
* @param chainer {@link ExceptionChainer} that will be used for chaining exceptions
*/
public Event(String name, ExceptionChainer<T> chainer) {
super(name, chainer);
promise = new Promise<Object, T>(name, chainer);
}
/**
@@ -53,12 +49,30 @@ public class Event<T extends Throwable>
* @param lock lock to use
*/
public Event(String name, ExceptionChainer<T> chainer, ReentrantLock lock) {
super(name, chainer, lock);
promise = new Promise<Object, T>(name, chainer, lock);
}
/** Sets this event to be {@code true}. Short for {@code set(true)}. */
public void set() {
super.set(true);
promise.deliver(SOME);
}
/** Clear this event. A cleared event {@code !isSet()}. */
public void clear() {
promise.clear();
}
/** Deliver the error {@code t} (after chaining) to any present or future waiters. */
public void deliverError(Throwable t) {
promise.deliverError(t);
}
/**
* @return whether this event is in a 'set' state. An event is set by a call to {@link set()} or {@link
* deliverError}
*/
public boolean isSet() {
return promise.isDelivered();
}
/**
@@ -68,7 +82,7 @@ public class Event<T extends Throwable>
*/
public void await()
throws T {
super.get();
promise.retrieve();
}
/**
@@ -81,13 +95,13 @@ public class Event<T extends Throwable>
*/
public void await(long timeout, TimeUnit unit)
throws T {
super.get(timeout, unit);
promise.retrieve(timeout, unit);
}
/**
* Await this event to have a definite {@code true} or {@code false} value, for {@code timeout} duration.
*
* If the definite value is not available by the time timeout expires, returns {@code null}.
* <p/>
* If the definite value is not available when the timeout expires, returns {@code false}.
*
* @param timeout timeout
* @param unit the time unit for the timeout
@@ -96,7 +110,32 @@ public class Event<T extends Throwable>
*/
public boolean tryAwait(long timeout, TimeUnit unit)
throws T {
return super.tryGet(timeout, unit) != null;
return promise.tryRetrieve(timeout, unit) != null;
}
/** @return whether there are any threads waiting on this event to be set. */
public boolean hasWaiters() {
return promise.hasWaiters();
}
/** @return whether this event is in an error state i.e. has been delivered an error. */
public boolean inError() {
return promise.inError();
}
/** Acquire the lock associated with this event. */
public void lock() {
promise.lock();
}
/** Release the lock associated with this event. */
public void unlock() {
promise.unlock();
}
@Override
public String toString() {
return promise.toString();
}
}

View File

@@ -24,13 +24,13 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Represents future data of the parameterized type {@code V} and allows waiting on it. An exception may also be
* Represents promised data of the parameterized type {@code V} and allows waiting on it. An exception may also be
* delivered to a waiter, and will be of the parameterized type {@code T}.
* <p/>
* For atomic operations on a future, e.g. checking if a value is set and if it is not then setting it - in other words,
* Compare-And-Set type operations - the associated lock for the future should be acquired while doing so.
* For atomic operations on a promise, e.g. checking if a value is delivered and if it is not then setting it, the
* associated lock for the promise should be acquired while doing so.
*/
public class Future<V, T extends Throwable> {
public class Promise<V, T extends Throwable> {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -43,24 +43,24 @@ public class Future<V, T extends Throwable> {
private T pendingEx;
/**
* Creates this future with given {@code name} and exception {@code chainer}. Allocates a new {@link
* java.util.concurrent.locks.Lock lock} object for this future.
* Creates this promise with given {@code name} and exception {@code chainer}. Allocates a new {@link
* java.util.concurrent.locks.Lock lock} object for this promise.
*
* @param name name of this future
* @param name name of this promise
* @param chainer {@link ExceptionChainer} that will be used for chaining exceptions
*/
public Future(String name, ExceptionChainer<T> chainer) {
public Promise(String name, ExceptionChainer<T> chainer) {
this(name, chainer, null);
}
/**
* Creates this future with given {@code name}, exception {@code chainer}, and associated {@code lock}.
* Creates this promise with given {@code name}, exception {@code chainer}, and associated {@code lock}.
*
* @param name name of this future
* @param name name of this promise
* @param chainer {@link ExceptionChainer} that will be used for chaining exceptions
* @param lock lock to use
*/
public Future(String name, ExceptionChainer<T> chainer, ReentrantLock lock) {
public Promise(String name, ExceptionChainer<T> chainer, ReentrantLock lock) {
this.name = name;
this.chainer = chainer;
this.lock = lock == null ? new ReentrantLock() : lock;
@@ -68,73 +68,73 @@ public class Future<V, T extends Throwable> {
}
/**
* Set this future's value to {@code val}. Any waiters will be delivered this value.
* Set this promise's value to {@code val}. Any waiters will be delivered this value.
*
* @param val the value
*/
public void set(V val) {
lock();
public void deliver(V val) {
lock.lock();
try {
log.debug("Setting <<{}>> to `{}`", name, val);
this.val = val;
cond.signalAll();
} finally {
unlock();
lock.unlock();
}
}
/**
* Queues error that will be thrown in any waiting thread or any thread that attempts to wait on this future
* Queues error that will be thrown in any waiting thread or any thread that attempts to wait on this promise
* hereafter.
*
* @param e the error
*/
public void error(Throwable e) {
lock();
public void deliverError(Throwable e) {
lock.lock();
try {
pendingEx = chainer.chain(e);
cond.signalAll();
} finally {
unlock();
lock.unlock();
}
}
/** Clears this future by setting its value and queued exception to {@code null}. */
/** Clears this promise by setting its value and queued exception to {@code null}. */
public void clear() {
lock();
lock.lock();
try {
pendingEx = null;
set(null);
deliver(null);
} finally {
unlock();
lock.unlock();
}
}
/**
* Wait indefinitely for this future's value to be set.
* Wait indefinitely for this promise's value to be deliver.
*
* @return the value
*
* @throws T in case another thread informs the future of an error meanwhile
* @throws T in case another thread informs the promise of an error meanwhile
*/
public V get()
public V retrieve()
throws T {
return tryGet(0, TimeUnit.SECONDS);
return tryRetrieve(0, TimeUnit.SECONDS);
}
/**
* Wait for {@code timeout} duration for this future's value to be set.
* Wait for {@code timeout} duration for this promise's value to be deliver.
*
* @param timeout the timeout
* @param unit time unit for the timeout
*
* @return the value
*
* @throws T in case another thread informs the future of an error meanwhile, or the timeout expires
* @throws T in case another thread informs the promise of an error meanwhile, or the timeout expires
*/
public V get(long timeout, TimeUnit unit)
public V retrieve(long timeout, TimeUnit unit)
throws T {
final V value = tryGet(timeout, unit);
final V value = tryRetrieve(timeout, unit);
if (value == null)
throw chainer.chain(new TimeoutException("Timeout expired"));
else
@@ -142,20 +142,20 @@ public class Future<V, T extends Throwable> {
}
/**
* Wait for {@code timeout} duration for this future's value to be set.
*
* If the value is not set by the time the timeout expires, returns {@code null}.
* Wait for {@code timeout} duration for this promise's value to be deliver.
* <p/>
* If the value is not deliver by the time the timeout expires, returns {@code null}.
*
* @param timeout the timeout
* @param unit time unit for the timeout
*
* @return the value or {@code null}
*
* @throws T in case another thread informs the future of an error meanwhile
* @throws T in case another thread informs the promise of an error meanwhile
*/
public V tryGet(long timeout, TimeUnit unit)
public V tryRetrieve(long timeout, TimeUnit unit)
throws T {
lock();
lock.lock();
try {
if (pendingEx != null)
throw pendingEx;
@@ -175,52 +175,46 @@ public class Future<V, T extends Throwable> {
} catch (InterruptedException ie) {
throw chainer.chain(ie);
} finally {
unlock();
lock.unlock();
}
}
/** @return whether this future has a value set, and no error waiting to pop. */
public boolean isSet() {
lock();
/** @return whether this promise has a value delivered, and no error waiting to pop. */
public boolean isDelivered() {
lock.lock();
try {
return pendingEx == null && val != null;
} finally {
unlock();
lock.unlock();
}
}
/** @return whether this future currently has an error set. */
public boolean hasError() {
lock();
/** @return whether this promise has been delivered an error. */
public boolean inError() {
lock.lock();
try {
return pendingEx != null;
} finally {
unlock();
lock.unlock();
}
}
/** @return whether this future has threads waiting on it. */
/** @return whether this promise has threads waiting on it. */
public boolean hasWaiters() {
lock();
lock.lock();
try {
return lock.hasWaiters(cond);
} finally {
unlock();
lock.unlock();
}
}
/**
* Lock using the associated lock. Use as part of a {@code try-finally} construct in conjunction with {@link
* #unlock()}.
*/
/** Acquire the lock associated with this promise. */
public void lock() {
lock.lock();
}
/**
* Unlock using the associated lock. Use as part of a {@code try-finally} construct in conjunction with {@link
* #lock()}.
*/
/** Release the lock associated with this promise. */
public void unlock() {
lock.unlock();
}

View File

@@ -108,7 +108,7 @@ public class StreamCopier {
doneEvent.set();
} catch (IOException ioe) {
log.error("In pipe from {} to {}: " + ioe.toString(), in, out);
doneEvent.error(ioe);
doneEvent.deliverError(ioe);
}
}
}.start();

View File

@@ -15,7 +15,7 @@
*/
package net.schmizz.sshj.connection;
import net.schmizz.concurrent.Future;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.common.SSHPacket;
import net.schmizz.sshj.connection.channel.Channel;
import net.schmizz.sshj.connection.channel.OpenFailException;
@@ -89,12 +89,12 @@ public interface Connection {
* @param wantReply whether a reply is requested
* @param specifics {@link SSHPacket} containing fields specific to the request
*
* @return a {@link Future} for the reply data (in case {@code wantReply} is true) which allows waiting on the
* @return a {@link net.schmizz.concurrent.Promise} for the reply data (in case {@code wantReply} is true) which allows waiting on the
* reply, or {@code null} if a reply is not requested.
*
* @throws TransportException if there is an error sending the request
*/
public Future<SSHPacket, ConnectionException> sendGlobalRequest(String name, boolean wantReply,
public Promise<SSHPacket, ConnectionException> sendGlobalRequest(String name, boolean wantReply,
byte[] specifics)
throws TransportException;

View File

@@ -15,8 +15,8 @@
*/
package net.schmizz.sshj.connection;
import net.schmizz.concurrent.Future;
import net.schmizz.concurrent.FutureUtils;
import net.schmizz.concurrent.Promise;
import net.schmizz.concurrent.ErrorDeliveryUtil;
import net.schmizz.sshj.AbstractService;
import net.schmizz.sshj.common.DisconnectReason;
import net.schmizz.sshj.common.ErrorNotifiable;
@@ -30,7 +30,6 @@ import net.schmizz.sshj.connection.channel.forwarded.ForwardedChannelOpener;
import net.schmizz.sshj.transport.Transport;
import net.schmizz.sshj.transport.TransportException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
@@ -50,7 +49,7 @@ public class ConnectionImpl
private final Map<String, ForwardedChannelOpener> openers = new ConcurrentHashMap<String, ForwardedChannelOpener>();
private final Queue<Future<SSHPacket, ConnectionException>> globalReqFutures = new LinkedList<Future<SSHPacket, ConnectionException>>();
private final Queue<Promise<SSHPacket, ConnectionException>> globalReqPromises = new LinkedList<Promise<SSHPacket, ConnectionException>>();
private int windowSize = 2048 * 1024;
private int maxPacketSize = 32 * 1024;
@@ -180,34 +179,34 @@ public class ConnectionImpl
}
@Override
public Future<SSHPacket, ConnectionException> sendGlobalRequest(String name, boolean wantReply,
public Promise<SSHPacket, ConnectionException> sendGlobalRequest(String name, boolean wantReply,
byte[] specifics)
throws TransportException {
synchronized (globalReqFutures) {
synchronized (globalReqPromises) {
log.info("Making global request for `{}`", name);
trans.write(new SSHPacket(Message.GLOBAL_REQUEST).putString(name)
.putBoolean(wantReply).putRawBytes(specifics));
Future<SSHPacket, ConnectionException> future = null;
Promise<SSHPacket, ConnectionException> promise = null;
if (wantReply) {
future = new Future<SSHPacket, ConnectionException>("global req for " + name, ConnectionException.chainer);
globalReqFutures.add(future);
promise = new Promise<SSHPacket, ConnectionException>("global req for " + name, ConnectionException.chainer);
globalReqPromises.add(promise);
}
return future;
return promise;
}
}
private void gotGlobalReqResponse(SSHPacket response)
throws ConnectionException {
synchronized (globalReqFutures) {
Future<SSHPacket, ConnectionException> gr = globalReqFutures.poll();
synchronized (globalReqPromises) {
Promise<SSHPacket, ConnectionException> gr = globalReqPromises.poll();
if (gr == null)
throw new ConnectionException(DisconnectReason.PROTOCOL_ERROR,
"Got a global request response when none was requested");
else if (response == null)
gr.error(new ConnectionException("Global request [" + gr + "] failed"));
gr.deliverError(new ConnectionException("Global request [" + gr + "] failed"));
else
gr.set(response);
gr.deliver(response);
}
}
@@ -235,9 +234,9 @@ public class ConnectionImpl
@Override
public void notifyError(SSHException error) {
super.notifyError(error);
synchronized (globalReqFutures) {
FutureUtils.alertAll(error, globalReqFutures);
globalReqFutures.clear();
synchronized (globalReqPromises) {
ErrorDeliveryUtil.alertPromises(error, globalReqPromises);
globalReqPromises.clear();
}
ErrorNotifiable.Util.alertAll(error, channels.values());
channels.clear();

View File

@@ -36,7 +36,7 @@
package net.schmizz.sshj.connection.channel;
import net.schmizz.concurrent.Event;
import net.schmizz.concurrent.FutureUtils;
import net.schmizz.concurrent.ErrorDeliveryUtil;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.common.ByteArrayUtils;
import net.schmizz.sshj.common.DisconnectReason;
@@ -236,8 +236,8 @@ public abstract class AbstractChannel
public void notifyError(SSHException error) {
log.debug("Channel #{} got notified of {}", getID(), error.toString());
FutureUtils.alertAll(error, open, close);
FutureUtils.alertAll(error, chanReqResponseEvents);
ErrorDeliveryUtil.alertEvents(error, open, close);
ErrorDeliveryUtil.alertEvents(error, chanReqResponseEvents);
in.notifyError(error);
out.notifyError(error);
@@ -258,7 +258,7 @@ public abstract class AbstractChannel
try {
sendClose();
} catch (TransportException e) {
if (!close.hasError())
if (!close.inError())
throw e;
}
close.await(conn.getTimeout(), TimeUnit.SECONDS);
@@ -372,7 +372,7 @@ public abstract class AbstractChannel
if (success)
responseEvent.set();
else
responseEvent.error(new ConnectionException("Request failed"));
responseEvent.deliverError(new ConnectionException("Request failed"));
} else
throw new ConnectionException(
DisconnectReason.PROTOCOL_ERROR,

View File

@@ -73,7 +73,7 @@ public abstract class AbstractDirectChannel
}
private void gotOpenFailure(SSHPacket buf) {
open.error(new OpenFailException(getType(), buf.readInt(), buf.readString()));
open.deliverError(new OpenFailException(getType(), buf.readInt(), buf.readString()));
finishOff();
}

View File

@@ -196,7 +196,7 @@ public class RemotePortForwarder
final byte[] specifics = new Buffer.PlainBuffer().putString(forward.address).putInt(forward.port)
.getCompactData();
return conn.sendGlobalRequest(reqName, true, specifics)
.get(conn.getTimeout(), TimeUnit.SECONDS);
.retrieve(conn.getTimeout(), TimeUnit.SECONDS);
}
/** @return the active forwards. */

View File

@@ -15,7 +15,7 @@
*/
package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Future;
import net.schmizz.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ public class PacketReader
private final Logger log = LoggerFactory.getLogger(getClass());
private final InputStream in;
private final Map<Long, Future<Response, SFTPException>> futures = new ConcurrentHashMap<Long, Future<Response, SFTPException>>();
private final Map<Long, Promise<Response, SFTPException>> promises = new ConcurrentHashMap<Long, Promise<Response, SFTPException>>();
private final SFTPPacket<Response> packet = new SFTPPacket<Response>();
private final byte[] lenBuf = new byte[4];
private final SFTPEngine engine;
@@ -85,25 +85,25 @@ public class PacketReader
handle();
}
} catch (IOException e) {
for (Future<Response, SFTPException> future : futures.values())
future.error(e);
for (Promise<Response, SFTPException> promise : promises.values())
promise.deliverError(e);
}
}
public void handle()
throws SFTPException {
Response resp = new Response(packet, engine.getOperativeProtocolVersion());
Future<Response, SFTPException> future = futures.remove(resp.getRequestID());
Promise<Response, SFTPException> promise = promises.remove(resp.getRequestID());
log.debug("Received {} packet", resp.getType());
if (future == null)
if (promise == null)
throw new SFTPException("Received [" + resp.readType() + "] response for request-id " + resp.getRequestID()
+ ", no such request was made");
else
future.set(resp);
promise.deliver(resp);
}
public void expectResponseTo(Request req) {
futures.put(req.getRequestID(), req.getResponseFuture());
promises.put(req.getRequestID(), req.getResponsePromise());
}
}

View File

@@ -15,20 +15,20 @@
*/
package net.schmizz.sshj.sftp;
import net.schmizz.concurrent.Future;
import net.schmizz.concurrent.Promise;
public class Request
extends SFTPPacket<Request> {
private final PacketType type;
private final long reqID;
private final Future<Response, SFTPException> responseFuture;
private final Promise<Response, SFTPException> responsePromise;
public Request(PacketType type, long reqID) {
super(type);
this.type = type;
this.reqID = reqID;
responseFuture = new Future<Response, SFTPException>("sftp / " + reqID, SFTPException.chainer);
responsePromise = new Promise<Response, SFTPException>("sftp / " + reqID, SFTPException.chainer);
putInt(reqID);
}
@@ -40,8 +40,8 @@ public class Request
return type;
}
public Future<Response, SFTPException> getResponseFuture() {
return responseFuture;
public Promise<Response, SFTPException> getResponsePromise() {
return responsePromise;
}
@Override

View File

@@ -102,7 +102,7 @@ public class SFTPEngine
reader.expectResponseTo(req);
log.debug("Sending {}", req);
transmit(req);
return req.getResponseFuture().get(timeout, TimeUnit.SECONDS);
return req.getResponsePromise().retrieve(timeout, TimeUnit.SECONDS);
}
public RemoteFile open(String path, Set<OpenMode> modes, FileAttributes fa)

View File

@@ -36,7 +36,7 @@
package net.schmizz.sshj.transport;
import net.schmizz.concurrent.Event;
import net.schmizz.concurrent.FutureUtils;
import net.schmizz.concurrent.ErrorDeliveryUtil;
import net.schmizz.sshj.common.Buffer;
import net.schmizz.sshj.common.DisconnectReason;
import net.schmizz.sshj.common.ErrorNotifiable;
@@ -397,7 +397,7 @@ final class KeyExchanger
@Override
public void notifyError(SSHException error) {
log.debug("Got notified of {}", error.toString());
FutureUtils.alertAll(error, kexInitSent, done);
ErrorDeliveryUtil.alertEvents(error, kexInitSent, done);
}
}

View File

@@ -36,7 +36,7 @@
package net.schmizz.sshj.transport;
import net.schmizz.concurrent.Event;
import net.schmizz.concurrent.FutureUtils;
import net.schmizz.concurrent.ErrorDeliveryUtil;
import net.schmizz.sshj.AbstractService;
import net.schmizz.sshj.Config;
import net.schmizz.sshj.Service;
@@ -562,7 +562,7 @@ public final class TransportImpl
disconnectListener.notifyDisconnect(causeOfDeath.getDisconnectReason());
FutureUtils.alertAll(causeOfDeath, close, serviceAccept);
ErrorDeliveryUtil.alertEvents(causeOfDeath, close, serviceAccept);
kexer.notifyError(causeOfDeath);
getService().notifyError(causeOfDeath);
setService(nullService);

View File

@@ -15,7 +15,7 @@
*/
package net.schmizz.sshj.userauth;
import net.schmizz.concurrent.Event;
import net.schmizz.concurrent.Promise;
import net.schmizz.sshj.AbstractService;
import net.schmizz.sshj.Service;
import net.schmizz.sshj.common.DisconnectReason;
@@ -42,7 +42,8 @@ public class UserAuthImpl
private final Deque<UserAuthException> savedEx = new ArrayDeque<UserAuthException>();
private final Event<UserAuthException> result = new Event<UserAuthException>("userauth result", UserAuthException.chainer);
private final Promise<Boolean, UserAuthException> result
= new Promise<Boolean, UserAuthException>("userauth result", UserAuthException.chainer);
private String username;
private AuthMethod currentMethod;
@@ -172,7 +173,7 @@ public class UserAuthImpl
@Override
public void notifyError(SSHException error) {
super.notifyError(error);
result.error(error);
result.deliverError(error);
}
private void clearState() {
@@ -194,14 +195,14 @@ public class UserAuthImpl
currentMethod.request();
else {
saveException(currentMethod.getName() + " auth failed");
result.set(false);
result.deliver(false);
}
}
private void gotSuccess() {
trans.setAuthenticated(); // So it can put delayed compression into force if applicable
trans.setService(nextService); // We aren't in charge anymore, next service is
result.set(true);
result.deliver(true);
}
private void gotUnknown(Message msg, SSHPacket buf)
@@ -215,7 +216,7 @@ public class UserAuthImpl
try {
currentMethod.handle(msg, buf);
} catch (UserAuthException e) {
result.error(e);
result.deliverError(e);
}
}
@@ -234,7 +235,7 @@ public class UserAuthImpl
result.clear();
meth.init(this);
meth.request();
return result.get(timeout, TimeUnit.SECONDS);
return result.retrieve(timeout, TimeUnit.SECONDS);
}
}