diff --git a/src/main/java/net/schmizz/concurrent/FutureUtils.java b/src/main/java/net/schmizz/concurrent/ErrorDeliveryUtil.java similarity index 52% rename from src/main/java/net/schmizz/concurrent/FutureUtils.java rename to src/main/java/net/schmizz/concurrent/ErrorDeliveryUtil.java index 87ccbdb6..a8ba77d0 100644 --- a/src/main/java/net/schmizz/concurrent/FutureUtils.java +++ b/src/main/java/net/schmizz/concurrent/ErrorDeliveryUtil.java @@ -17,16 +17,26 @@ package net.schmizz.concurrent; import java.util.Collection; -public class FutureUtils { +public class ErrorDeliveryUtil { - public static void alertAll(Throwable x, Future... futures) { - for (Future f : futures) - f.error(x); + public static void alertPromises(Throwable x, Promise... promises) { + for (Promise p : promises) + p.deliverError(x); } - public static void alertAll(Throwable x, Collection futures) { - for (Future f : futures) - f.error(x); + public static void alertPromises(Throwable x, Collection promises) { + for (Promise p : promises) + p.deliverError(x); + } + + public static void alertEvents(Throwable x, Event... events) { + for (Event e: events) + e.deliverError(x); + } + + public static void alertEvents(Throwable x, Collection events) { + for (Event e: events) + e.deliverError(x); } } diff --git a/src/main/java/net/schmizz/concurrent/Event.java b/src/main/java/net/schmizz/concurrent/Event.java index 2b70bf24..999a35c6 100644 --- a/src/main/java/net/schmizz/concurrent/Event.java +++ b/src/main/java/net/schmizz/concurrent/Event.java @@ -18,21 +18,17 @@ package net.schmizz.concurrent; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -/* - * Syntactic sugar around Future - */ - /** - * A kind of {@link Future} that caters to boolean values. - *

* An event can be set, cleared, or awaited, similar to Python's {@code threading.event}. The key difference is that a - * waiter may be delivered an exception of parameterized type {@code T}. Furthermore, an event {@link #isSet()} when it - * is not {@code null} i.e. it can be either {@code true} or {@code false} when set. - * - * @see Future + * waiter may be delivered an exception of parameterized type {@code T}. + *

+ * Uses {@link Promise} under the hood. */ -public class Event - extends Future { +public class Event { + + private static final Object SOME = new Object(); + + private final Promise promise; /** * Creates this event with given {@code name} and exception {@code chainer}. Allocates a new {@link @@ -42,7 +38,7 @@ public class Event * @param chainer {@link ExceptionChainer} that will be used for chaining exceptions */ public Event(String name, ExceptionChainer chainer) { - super(name, chainer); + promise = new Promise(name, chainer); } /** @@ -53,12 +49,30 @@ public class Event * @param lock lock to use */ public Event(String name, ExceptionChainer chainer, ReentrantLock lock) { - super(name, chainer, lock); + promise = new Promise(name, chainer, lock); } /** Sets this event to be {@code true}. Short for {@code set(true)}. */ public void set() { - super.set(true); + promise.deliver(SOME); + } + + /** Clear this event. A cleared event {@code !isSet()}. */ + public void clear() { + promise.clear(); + } + + /** Deliver the error {@code t} (after chaining) to any present or future waiters. */ + public void deliverError(Throwable t) { + promise.deliverError(t); + } + + /** + * @return whether this event is in a 'set' state. An event is set by a call to {@link set()} or {@link + * deliverError} + */ + public boolean isSet() { + return promise.isDelivered(); } /** @@ -68,7 +82,7 @@ public class Event */ public void await() throws T { - super.get(); + promise.retrieve(); } /** @@ -81,13 +95,13 @@ public class Event */ public void await(long timeout, TimeUnit unit) throws T { - super.get(timeout, unit); + promise.retrieve(timeout, unit); } /** * Await this event to have a definite {@code true} or {@code false} value, for {@code timeout} duration. - * - * If the definite value is not available by the time timeout expires, returns {@code null}. + *

+ * If the definite value is not available when the timeout expires, returns {@code false}. * * @param timeout timeout * @param unit the time unit for the timeout @@ -96,7 +110,32 @@ public class Event */ public boolean tryAwait(long timeout, TimeUnit unit) throws T { - return super.tryGet(timeout, unit) != null; + return promise.tryRetrieve(timeout, unit) != null; + } + + /** @return whether there are any threads waiting on this event to be set. */ + public boolean hasWaiters() { + return promise.hasWaiters(); + } + + /** @return whether this event is in an error state i.e. has been delivered an error. */ + public boolean inError() { + return promise.inError(); + } + + /** Acquire the lock associated with this event. */ + public void lock() { + promise.lock(); + } + + /** Release the lock associated with this event. */ + public void unlock() { + promise.unlock(); + } + + @Override + public String toString() { + return promise.toString(); } } \ No newline at end of file diff --git a/src/main/java/net/schmizz/concurrent/Future.java b/src/main/java/net/schmizz/concurrent/Promise.java similarity index 59% rename from src/main/java/net/schmizz/concurrent/Future.java rename to src/main/java/net/schmizz/concurrent/Promise.java index 851698a0..b4c4a007 100644 --- a/src/main/java/net/schmizz/concurrent/Future.java +++ b/src/main/java/net/schmizz/concurrent/Promise.java @@ -24,13 +24,13 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** - * Represents future data of the parameterized type {@code V} and allows waiting on it. An exception may also be + * Represents promised data of the parameterized type {@code V} and allows waiting on it. An exception may also be * delivered to a waiter, and will be of the parameterized type {@code T}. *

- * For atomic operations on a future, e.g. checking if a value is set and if it is not then setting it - in other words, - * Compare-And-Set type operations - the associated lock for the future should be acquired while doing so. + * For atomic operations on a promise, e.g. checking if a value is delivered and if it is not then setting it, the + * associated lock for the promise should be acquired while doing so. */ -public class Future { +public class Promise { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -43,24 +43,24 @@ public class Future { private T pendingEx; /** - * Creates this future with given {@code name} and exception {@code chainer}. Allocates a new {@link - * java.util.concurrent.locks.Lock lock} object for this future. + * Creates this promise with given {@code name} and exception {@code chainer}. Allocates a new {@link + * java.util.concurrent.locks.Lock lock} object for this promise. * - * @param name name of this future + * @param name name of this promise * @param chainer {@link ExceptionChainer} that will be used for chaining exceptions */ - public Future(String name, ExceptionChainer chainer) { + public Promise(String name, ExceptionChainer chainer) { this(name, chainer, null); } /** - * Creates this future with given {@code name}, exception {@code chainer}, and associated {@code lock}. + * Creates this promise with given {@code name}, exception {@code chainer}, and associated {@code lock}. * - * @param name name of this future + * @param name name of this promise * @param chainer {@link ExceptionChainer} that will be used for chaining exceptions * @param lock lock to use */ - public Future(String name, ExceptionChainer chainer, ReentrantLock lock) { + public Promise(String name, ExceptionChainer chainer, ReentrantLock lock) { this.name = name; this.chainer = chainer; this.lock = lock == null ? new ReentrantLock() : lock; @@ -68,73 +68,73 @@ public class Future { } /** - * Set this future's value to {@code val}. Any waiters will be delivered this value. + * Set this promise's value to {@code val}. Any waiters will be delivered this value. * * @param val the value */ - public void set(V val) { - lock(); + public void deliver(V val) { + lock.lock(); try { log.debug("Setting <<{}>> to `{}`", name, val); this.val = val; cond.signalAll(); } finally { - unlock(); + lock.unlock(); } } /** - * Queues error that will be thrown in any waiting thread or any thread that attempts to wait on this future + * Queues error that will be thrown in any waiting thread or any thread that attempts to wait on this promise * hereafter. * * @param e the error */ - public void error(Throwable e) { - lock(); + public void deliverError(Throwable e) { + lock.lock(); try { pendingEx = chainer.chain(e); cond.signalAll(); } finally { - unlock(); + lock.unlock(); } } - /** Clears this future by setting its value and queued exception to {@code null}. */ + /** Clears this promise by setting its value and queued exception to {@code null}. */ public void clear() { - lock(); + lock.lock(); try { pendingEx = null; - set(null); + deliver(null); } finally { - unlock(); + lock.unlock(); } } /** - * Wait indefinitely for this future's value to be set. + * Wait indefinitely for this promise's value to be deliver. * * @return the value * - * @throws T in case another thread informs the future of an error meanwhile + * @throws T in case another thread informs the promise of an error meanwhile */ - public V get() + public V retrieve() throws T { - return tryGet(0, TimeUnit.SECONDS); + return tryRetrieve(0, TimeUnit.SECONDS); } /** - * Wait for {@code timeout} duration for this future's value to be set. + * Wait for {@code timeout} duration for this promise's value to be deliver. * * @param timeout the timeout * @param unit time unit for the timeout * * @return the value * - * @throws T in case another thread informs the future of an error meanwhile, or the timeout expires + * @throws T in case another thread informs the promise of an error meanwhile, or the timeout expires */ - public V get(long timeout, TimeUnit unit) + public V retrieve(long timeout, TimeUnit unit) throws T { - final V value = tryGet(timeout, unit); + final V value = tryRetrieve(timeout, unit); if (value == null) throw chainer.chain(new TimeoutException("Timeout expired")); else @@ -142,20 +142,20 @@ public class Future { } /** - * Wait for {@code timeout} duration for this future's value to be set. - * - * If the value is not set by the time the timeout expires, returns {@code null}. + * Wait for {@code timeout} duration for this promise's value to be deliver. + *

+ * If the value is not deliver by the time the timeout expires, returns {@code null}. * * @param timeout the timeout * @param unit time unit for the timeout * * @return the value or {@code null} * - * @throws T in case another thread informs the future of an error meanwhile + * @throws T in case another thread informs the promise of an error meanwhile */ - public V tryGet(long timeout, TimeUnit unit) + public V tryRetrieve(long timeout, TimeUnit unit) throws T { - lock(); + lock.lock(); try { if (pendingEx != null) throw pendingEx; @@ -175,52 +175,46 @@ public class Future { } catch (InterruptedException ie) { throw chainer.chain(ie); } finally { - unlock(); + lock.unlock(); } } - /** @return whether this future has a value set, and no error waiting to pop. */ - public boolean isSet() { - lock(); + /** @return whether this promise has a value delivered, and no error waiting to pop. */ + public boolean isDelivered() { + lock.lock(); try { return pendingEx == null && val != null; } finally { - unlock(); + lock.unlock(); } } - /** @return whether this future currently has an error set. */ - public boolean hasError() { - lock(); + /** @return whether this promise has been delivered an error. */ + public boolean inError() { + lock.lock(); try { return pendingEx != null; } finally { - unlock(); + lock.unlock(); } } - /** @return whether this future has threads waiting on it. */ + /** @return whether this promise has threads waiting on it. */ public boolean hasWaiters() { - lock(); + lock.lock(); try { return lock.hasWaiters(cond); } finally { - unlock(); + lock.unlock(); } } - /** - * Lock using the associated lock. Use as part of a {@code try-finally} construct in conjunction with {@link - * #unlock()}. - */ + /** Acquire the lock associated with this promise. */ public void lock() { lock.lock(); } - /** - * Unlock using the associated lock. Use as part of a {@code try-finally} construct in conjunction with {@link - * #lock()}. - */ + /** Release the lock associated with this promise. */ public void unlock() { lock.unlock(); } diff --git a/src/main/java/net/schmizz/sshj/common/StreamCopier.java b/src/main/java/net/schmizz/sshj/common/StreamCopier.java index e5786e1c..66fbee76 100644 --- a/src/main/java/net/schmizz/sshj/common/StreamCopier.java +++ b/src/main/java/net/schmizz/sshj/common/StreamCopier.java @@ -108,7 +108,7 @@ public class StreamCopier { doneEvent.set(); } catch (IOException ioe) { log.error("In pipe from {} to {}: " + ioe.toString(), in, out); - doneEvent.error(ioe); + doneEvent.deliverError(ioe); } } }.start(); diff --git a/src/main/java/net/schmizz/sshj/connection/Connection.java b/src/main/java/net/schmizz/sshj/connection/Connection.java index 167c7b11..538590f8 100644 --- a/src/main/java/net/schmizz/sshj/connection/Connection.java +++ b/src/main/java/net/schmizz/sshj/connection/Connection.java @@ -15,7 +15,7 @@ */ package net.schmizz.sshj.connection; -import net.schmizz.concurrent.Future; +import net.schmizz.concurrent.Promise; import net.schmizz.sshj.common.SSHPacket; import net.schmizz.sshj.connection.channel.Channel; import net.schmizz.sshj.connection.channel.OpenFailException; @@ -89,12 +89,12 @@ public interface Connection { * @param wantReply whether a reply is requested * @param specifics {@link SSHPacket} containing fields specific to the request * - * @return a {@link Future} for the reply data (in case {@code wantReply} is true) which allows waiting on the + * @return a {@link net.schmizz.concurrent.Promise} for the reply data (in case {@code wantReply} is true) which allows waiting on the * reply, or {@code null} if a reply is not requested. * * @throws TransportException if there is an error sending the request */ - public Future sendGlobalRequest(String name, boolean wantReply, + public Promise sendGlobalRequest(String name, boolean wantReply, byte[] specifics) throws TransportException; diff --git a/src/main/java/net/schmizz/sshj/connection/ConnectionImpl.java b/src/main/java/net/schmizz/sshj/connection/ConnectionImpl.java index adfb4a13..6144f6c8 100644 --- a/src/main/java/net/schmizz/sshj/connection/ConnectionImpl.java +++ b/src/main/java/net/schmizz/sshj/connection/ConnectionImpl.java @@ -15,8 +15,8 @@ */ package net.schmizz.sshj.connection; -import net.schmizz.concurrent.Future; -import net.schmizz.concurrent.FutureUtils; +import net.schmizz.concurrent.Promise; +import net.schmizz.concurrent.ErrorDeliveryUtil; import net.schmizz.sshj.AbstractService; import net.schmizz.sshj.common.DisconnectReason; import net.schmizz.sshj.common.ErrorNotifiable; @@ -30,7 +30,6 @@ import net.schmizz.sshj.connection.channel.forwarded.ForwardedChannelOpener; import net.schmizz.sshj.transport.Transport; import net.schmizz.sshj.transport.TransportException; -import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Queue; @@ -50,7 +49,7 @@ public class ConnectionImpl private final Map openers = new ConcurrentHashMap(); - private final Queue> globalReqFutures = new LinkedList>(); + private final Queue> globalReqPromises = new LinkedList>(); private int windowSize = 2048 * 1024; private int maxPacketSize = 32 * 1024; @@ -180,34 +179,34 @@ public class ConnectionImpl } @Override - public Future sendGlobalRequest(String name, boolean wantReply, + public Promise sendGlobalRequest(String name, boolean wantReply, byte[] specifics) throws TransportException { - synchronized (globalReqFutures) { + synchronized (globalReqPromises) { log.info("Making global request for `{}`", name); trans.write(new SSHPacket(Message.GLOBAL_REQUEST).putString(name) .putBoolean(wantReply).putRawBytes(specifics)); - Future future = null; + Promise promise = null; if (wantReply) { - future = new Future("global req for " + name, ConnectionException.chainer); - globalReqFutures.add(future); + promise = new Promise("global req for " + name, ConnectionException.chainer); + globalReqPromises.add(promise); } - return future; + return promise; } } private void gotGlobalReqResponse(SSHPacket response) throws ConnectionException { - synchronized (globalReqFutures) { - Future gr = globalReqFutures.poll(); + synchronized (globalReqPromises) { + Promise gr = globalReqPromises.poll(); if (gr == null) throw new ConnectionException(DisconnectReason.PROTOCOL_ERROR, "Got a global request response when none was requested"); else if (response == null) - gr.error(new ConnectionException("Global request [" + gr + "] failed")); + gr.deliverError(new ConnectionException("Global request [" + gr + "] failed")); else - gr.set(response); + gr.deliver(response); } } @@ -235,9 +234,9 @@ public class ConnectionImpl @Override public void notifyError(SSHException error) { super.notifyError(error); - synchronized (globalReqFutures) { - FutureUtils.alertAll(error, globalReqFutures); - globalReqFutures.clear(); + synchronized (globalReqPromises) { + ErrorDeliveryUtil.alertPromises(error, globalReqPromises); + globalReqPromises.clear(); } ErrorNotifiable.Util.alertAll(error, channels.values()); channels.clear(); diff --git a/src/main/java/net/schmizz/sshj/connection/channel/AbstractChannel.java b/src/main/java/net/schmizz/sshj/connection/channel/AbstractChannel.java index c162128a..f1950692 100644 --- a/src/main/java/net/schmizz/sshj/connection/channel/AbstractChannel.java +++ b/src/main/java/net/schmizz/sshj/connection/channel/AbstractChannel.java @@ -36,7 +36,7 @@ package net.schmizz.sshj.connection.channel; import net.schmizz.concurrent.Event; -import net.schmizz.concurrent.FutureUtils; +import net.schmizz.concurrent.ErrorDeliveryUtil; import net.schmizz.sshj.common.Buffer; import net.schmizz.sshj.common.ByteArrayUtils; import net.schmizz.sshj.common.DisconnectReason; @@ -236,8 +236,8 @@ public abstract class AbstractChannel public void notifyError(SSHException error) { log.debug("Channel #{} got notified of {}", getID(), error.toString()); - FutureUtils.alertAll(error, open, close); - FutureUtils.alertAll(error, chanReqResponseEvents); + ErrorDeliveryUtil.alertEvents(error, open, close); + ErrorDeliveryUtil.alertEvents(error, chanReqResponseEvents); in.notifyError(error); out.notifyError(error); @@ -258,7 +258,7 @@ public abstract class AbstractChannel try { sendClose(); } catch (TransportException e) { - if (!close.hasError()) + if (!close.inError()) throw e; } close.await(conn.getTimeout(), TimeUnit.SECONDS); @@ -372,7 +372,7 @@ public abstract class AbstractChannel if (success) responseEvent.set(); else - responseEvent.error(new ConnectionException("Request failed")); + responseEvent.deliverError(new ConnectionException("Request failed")); } else throw new ConnectionException( DisconnectReason.PROTOCOL_ERROR, diff --git a/src/main/java/net/schmizz/sshj/connection/channel/direct/AbstractDirectChannel.java b/src/main/java/net/schmizz/sshj/connection/channel/direct/AbstractDirectChannel.java index dc0c0e6c..358b2557 100644 --- a/src/main/java/net/schmizz/sshj/connection/channel/direct/AbstractDirectChannel.java +++ b/src/main/java/net/schmizz/sshj/connection/channel/direct/AbstractDirectChannel.java @@ -73,7 +73,7 @@ public abstract class AbstractDirectChannel } private void gotOpenFailure(SSHPacket buf) { - open.error(new OpenFailException(getType(), buf.readInt(), buf.readString())); + open.deliverError(new OpenFailException(getType(), buf.readInt(), buf.readString())); finishOff(); } diff --git a/src/main/java/net/schmizz/sshj/connection/channel/forwarded/RemotePortForwarder.java b/src/main/java/net/schmizz/sshj/connection/channel/forwarded/RemotePortForwarder.java index ebaf9c7a..8a14715d 100644 --- a/src/main/java/net/schmizz/sshj/connection/channel/forwarded/RemotePortForwarder.java +++ b/src/main/java/net/schmizz/sshj/connection/channel/forwarded/RemotePortForwarder.java @@ -196,7 +196,7 @@ public class RemotePortForwarder final byte[] specifics = new Buffer.PlainBuffer().putString(forward.address).putInt(forward.port) .getCompactData(); return conn.sendGlobalRequest(reqName, true, specifics) - .get(conn.getTimeout(), TimeUnit.SECONDS); + .retrieve(conn.getTimeout(), TimeUnit.SECONDS); } /** @return the active forwards. */ diff --git a/src/main/java/net/schmizz/sshj/sftp/PacketReader.java b/src/main/java/net/schmizz/sshj/sftp/PacketReader.java index 9ba95b35..71f7fe3f 100644 --- a/src/main/java/net/schmizz/sshj/sftp/PacketReader.java +++ b/src/main/java/net/schmizz/sshj/sftp/PacketReader.java @@ -15,7 +15,7 @@ */ package net.schmizz.sshj.sftp; -import net.schmizz.concurrent.Future; +import net.schmizz.concurrent.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ public class PacketReader private final Logger log = LoggerFactory.getLogger(getClass()); private final InputStream in; - private final Map> futures = new ConcurrentHashMap>(); + private final Map> promises = new ConcurrentHashMap>(); private final SFTPPacket packet = new SFTPPacket(); private final byte[] lenBuf = new byte[4]; private final SFTPEngine engine; @@ -85,25 +85,25 @@ public class PacketReader handle(); } } catch (IOException e) { - for (Future future : futures.values()) - future.error(e); + for (Promise promise : promises.values()) + promise.deliverError(e); } } public void handle() throws SFTPException { Response resp = new Response(packet, engine.getOperativeProtocolVersion()); - Future future = futures.remove(resp.getRequestID()); + Promise promise = promises.remove(resp.getRequestID()); log.debug("Received {} packet", resp.getType()); - if (future == null) + if (promise == null) throw new SFTPException("Received [" + resp.readType() + "] response for request-id " + resp.getRequestID() + ", no such request was made"); else - future.set(resp); + promise.deliver(resp); } public void expectResponseTo(Request req) { - futures.put(req.getRequestID(), req.getResponseFuture()); + promises.put(req.getRequestID(), req.getResponsePromise()); } } diff --git a/src/main/java/net/schmizz/sshj/sftp/Request.java b/src/main/java/net/schmizz/sshj/sftp/Request.java index 62d7da5e..1b529222 100644 --- a/src/main/java/net/schmizz/sshj/sftp/Request.java +++ b/src/main/java/net/schmizz/sshj/sftp/Request.java @@ -15,20 +15,20 @@ */ package net.schmizz.sshj.sftp; -import net.schmizz.concurrent.Future; +import net.schmizz.concurrent.Promise; public class Request extends SFTPPacket { private final PacketType type; private final long reqID; - private final Future responseFuture; + private final Promise responsePromise; public Request(PacketType type, long reqID) { super(type); this.type = type; this.reqID = reqID; - responseFuture = new Future("sftp / " + reqID, SFTPException.chainer); + responsePromise = new Promise("sftp / " + reqID, SFTPException.chainer); putInt(reqID); } @@ -40,8 +40,8 @@ public class Request return type; } - public Future getResponseFuture() { - return responseFuture; + public Promise getResponsePromise() { + return responsePromise; } @Override diff --git a/src/main/java/net/schmizz/sshj/sftp/SFTPEngine.java b/src/main/java/net/schmizz/sshj/sftp/SFTPEngine.java index 6b34ef7b..8fca0092 100644 --- a/src/main/java/net/schmizz/sshj/sftp/SFTPEngine.java +++ b/src/main/java/net/schmizz/sshj/sftp/SFTPEngine.java @@ -102,7 +102,7 @@ public class SFTPEngine reader.expectResponseTo(req); log.debug("Sending {}", req); transmit(req); - return req.getResponseFuture().get(timeout, TimeUnit.SECONDS); + return req.getResponsePromise().retrieve(timeout, TimeUnit.SECONDS); } public RemoteFile open(String path, Set modes, FileAttributes fa) diff --git a/src/main/java/net/schmizz/sshj/transport/KeyExchanger.java b/src/main/java/net/schmizz/sshj/transport/KeyExchanger.java index 48fccac0..440c1dac 100644 --- a/src/main/java/net/schmizz/sshj/transport/KeyExchanger.java +++ b/src/main/java/net/schmizz/sshj/transport/KeyExchanger.java @@ -36,7 +36,7 @@ package net.schmizz.sshj.transport; import net.schmizz.concurrent.Event; -import net.schmizz.concurrent.FutureUtils; +import net.schmizz.concurrent.ErrorDeliveryUtil; import net.schmizz.sshj.common.Buffer; import net.schmizz.sshj.common.DisconnectReason; import net.schmizz.sshj.common.ErrorNotifiable; @@ -397,7 +397,7 @@ final class KeyExchanger @Override public void notifyError(SSHException error) { log.debug("Got notified of {}", error.toString()); - FutureUtils.alertAll(error, kexInitSent, done); + ErrorDeliveryUtil.alertEvents(error, kexInitSent, done); } } \ No newline at end of file diff --git a/src/main/java/net/schmizz/sshj/transport/TransportImpl.java b/src/main/java/net/schmizz/sshj/transport/TransportImpl.java index 9fcb3a4e..8d851d7f 100644 --- a/src/main/java/net/schmizz/sshj/transport/TransportImpl.java +++ b/src/main/java/net/schmizz/sshj/transport/TransportImpl.java @@ -36,7 +36,7 @@ package net.schmizz.sshj.transport; import net.schmizz.concurrent.Event; -import net.schmizz.concurrent.FutureUtils; +import net.schmizz.concurrent.ErrorDeliveryUtil; import net.schmizz.sshj.AbstractService; import net.schmizz.sshj.Config; import net.schmizz.sshj.Service; @@ -562,7 +562,7 @@ public final class TransportImpl disconnectListener.notifyDisconnect(causeOfDeath.getDisconnectReason()); - FutureUtils.alertAll(causeOfDeath, close, serviceAccept); + ErrorDeliveryUtil.alertEvents(causeOfDeath, close, serviceAccept); kexer.notifyError(causeOfDeath); getService().notifyError(causeOfDeath); setService(nullService); diff --git a/src/main/java/net/schmizz/sshj/userauth/UserAuthImpl.java b/src/main/java/net/schmizz/sshj/userauth/UserAuthImpl.java index ec43a66e..577802db 100644 --- a/src/main/java/net/schmizz/sshj/userauth/UserAuthImpl.java +++ b/src/main/java/net/schmizz/sshj/userauth/UserAuthImpl.java @@ -15,7 +15,7 @@ */ package net.schmizz.sshj.userauth; -import net.schmizz.concurrent.Event; +import net.schmizz.concurrent.Promise; import net.schmizz.sshj.AbstractService; import net.schmizz.sshj.Service; import net.schmizz.sshj.common.DisconnectReason; @@ -42,7 +42,8 @@ public class UserAuthImpl private final Deque savedEx = new ArrayDeque(); - private final Event result = new Event("userauth result", UserAuthException.chainer); + private final Promise result + = new Promise("userauth result", UserAuthException.chainer); private String username; private AuthMethod currentMethod; @@ -172,7 +173,7 @@ public class UserAuthImpl @Override public void notifyError(SSHException error) { super.notifyError(error); - result.error(error); + result.deliverError(error); } private void clearState() { @@ -194,14 +195,14 @@ public class UserAuthImpl currentMethod.request(); else { saveException(currentMethod.getName() + " auth failed"); - result.set(false); + result.deliver(false); } } private void gotSuccess() { trans.setAuthenticated(); // So it can put delayed compression into force if applicable trans.setService(nextService); // We aren't in charge anymore, next service is - result.set(true); + result.deliver(true); } private void gotUnknown(Message msg, SSHPacket buf) @@ -215,7 +216,7 @@ public class UserAuthImpl try { currentMethod.handle(msg, buf); } catch (UserAuthException e) { - result.error(e); + result.deliverError(e); } } @@ -234,7 +235,7 @@ public class UserAuthImpl result.clear(); meth.init(this); meth.request(); - return result.get(timeout, TimeUnit.SECONDS); + return result.retrieve(timeout, TimeUnit.SECONDS); } }