Discussion:
mina-sshd git commit: [SSHD-849] Using a ClientChannelPendingMessagesQueue to regulate messages order while forwarding channel is being set up
l***@apache.org
2018-10-30 11:30:25 UTC
Permalink
Repository: mina-sshd
Updated Branches:
refs/heads/master 0241783b1 -> 515628547


[SSHD-849] Using a ClientChannelPendingMessagesQueue to regulate messages order while forwarding channel is being set up


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/51562854
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/51562854
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/51562854

Branch: refs/heads/master
Commit: 5156285473fe7b42577e747c9c8f458f98fa73ca
Parents: 0241783
Author: Lyor Goldstein <***@apache.org>
Authored: Thu Oct 18 17:34:53 2018 +0300
Committer: Lyor Goldstein <***@apache.org>
Committed: Tue Oct 30 13:35:55 2018 +0200

----------------------------------------------------------------------
.../client/channel/ClientChannelHolder.java | 41 +++
.../ClientChannelPendingMessagesQueue.java | 265 +++++++++++++++++++
.../sshd/client/subsystem/SubsystemClient.java | 12 +-
.../common/forward/DefaultForwardingFilter.java | 67 ++---
.../sshd/common/forward/TcpipClientChannel.java | 32 ++-
5 files changed, 374 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51562854/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelHolder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelHolder.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelHolder.java
new file mode 100644
index 0000000..d639ff3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelHolder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.channel;
+
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelHolder;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:***@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FunctionalInterface
+public interface ClientChannelHolder extends ChannelHolder {
+ @Override
+ default Channel getChannel() {
+ return getClientChannel();
+ }
+
+ /**
+ * @return The underlying {@link ClientChannel} used
+ */
+ ClientChannel getClientChannel();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51562854/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelPendingMessagesQueue.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelPendingMessagesQueue.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelPendingMessagesQueue.java
new file mode 100644
index 0000000..e6ab058
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ClientChannelPendingMessagesQueue.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.channel;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channel;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.sshd.client.future.DefaultOpenFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.logging.AbstractLoggingBean;
+
+/**
+ * This is a specialized {@link SshFutureListener} that is used to enqueue data
+ * that is sent while the channel is being set-up, so that when it is established
+ * it will send them in the same order as they have been received.
+ *
+ * It also serves as a &quot;backstop&quot; in case session is closed (normally)
+ * while the packets as still being written.
+ *
+ * @author <a href="mailto:***@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class ClientChannelPendingMessagesQueue
+ extends AbstractLoggingBean
+ implements SshFutureListener<OpenFuture>, Channel, ClientChannelHolder {
+ protected final Deque<Map.Entry<Buffer, Consumer<? super Throwable>>> pendingQueue = new LinkedList<>();
+ protected final DefaultOpenFuture completedFuture;
+
+ private final ClientChannel clientChannel;
+ private final AtomicBoolean open = new AtomicBoolean(true);
+
+ public ClientChannelPendingMessagesQueue(ClientChannel channel) {
+ this.clientChannel = Objects.requireNonNull(channel, "No channel provided");
+ this.completedFuture = new DefaultOpenFuture(getClass().getSimpleName() + "[" + channel + "]", null);
+ }
+
+ @Override
+ public ClientChannel getClientChannel() {
+ return clientChannel;
+ }
+
+ /**
+ * @return An internal {@link OpenFuture} that can be used to wait
+ * for all internal pending messages to be flushed before actually
+ * signaling that operation is complete
+ */
+ public OpenFuture getCompletedFuture() {
+ return completedFuture;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open.get();
+ }
+
+ @Override
+ public void close() throws IOException {
+ markClosed();
+
+ // NOTE: do not close the channel here - it may need to remain open for other purposes
+ int numPending = clearPendingQueue();
+ if (log.isDebugEnabled()) {
+ log.debug("close({}) cleared {} pending messages", this, numPending);
+ }
+ }
+
+ /**
+ * Marks the queue as closed
+ *
+ * @return {@code true} if was open and now is closed
+ */
+ protected boolean markClosed() {
+ OpenFuture f = getCompletedFuture();
+ if (!f.isDone()) {
+ f.setException(new CancellationException("Cancelled"));
+ }
+ return open.getAndSet(false);
+ }
+
+ /**
+ * Checks if the future is already open and manages the message handling accordingly:
+ * <ul>
+ * <p><li>
+ * If channel is not open yet, it enqueues the request
+ * </li></p>
+ *
+ * <p><li>
+ * If channel is open but there are still pending messages not yet written
+ * out, it will wait for them to be written (or exception signaled) before
+ * proceeding to write out the incoming message.
+ * </li></p>
+ *
+ * <p><li>
+ * Otherwise (i.e., channel is open and no pending messages yet) it will
+ * write the message to the underlying channel immediately.
+ * </li></p>
+ * </ul>
+ * @param buffer The message {@link Buffer}
+ * @param errHandler The error handler to invoke it had to enqueue the
+ * message and was unsuccessful in writing it. Must be non-{@code null} if future not open yet.
+ * Otherwise, if {@code null} and exception occurs it will be simple re-thrown
+ * @return The total number of still pending messages - zero if none and
+ * message was written (either immediately or after waiting for the
+ * pending ones to be written).
+ * @throws IOException If wrote the message directly, encountered an error and
+ * no handler was provided.
+ */
+ public int handleIncomingMessage(Buffer buffer, Consumer<? super Throwable> errHandler) throws IOException {
+ if (!isOpen()) {
+ throw new EOFException("Queue is closed");
+ }
+
+ Objects.requireNonNull(buffer, "No message to enqueue");
+ OpenFuture future = getCompletedFuture();
+ synchronized (pendingQueue) {
+ boolean enqueue = !future.isDone();
+ if (enqueue) {
+ Objects.requireNonNull(errHandler, "No pending message error handler provided");
+ }
+
+ if (enqueue) {
+ pendingQueue.add(new SimpleImmutableEntry<>(buffer, errHandler));
+ } else {
+ writeMessage(buffer, errHandler);
+ }
+
+ return pendingQueue.size();
+ }
+ }
+
+ protected void writeMessage(Buffer buffer, Consumer<? super IOException> errHandler) throws IOException {
+ ClientChannel channel = getClientChannel();
+ try {
+ if (!isOpen()) {
+ throw new EOFException("Queue is marked as closed");
+ }
+
+ OutputStream outputStream = channel.getInvertedIn();
+ outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
+ outputStream.flush();
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("writeMessage({}) failed ({}) to output message: {}",
+ this, e.getClass().getSimpleName(), e.getMessage());
+ }
+ if (errHandler != null) {
+ errHandler.accept(e);
+ }
+
+ markCompletionException(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void operationComplete(OpenFuture future) {
+ Throwable err = future.getException();
+ if (err != null) {
+ markCompletionException(err);
+
+ if (markClosed()) {
+ log.warn("operationComplete({}) {}[{}] signaled",
+ this, err.getClass().getSimpleName(), err.getMessage());
+ } else {
+ log.warn("operationComplete({}) got {}[{}] signal while queue is closed",
+ this, err.getClass().getSimpleName(), err.getMessage());
+ }
+
+ clearPendingQueue();
+ } else {
+ flushPendingQueue();
+ }
+ }
+
+ protected void flushPendingQueue() {
+ int numSent = 0;
+ try {
+ boolean debugEnabled = log.isDebugEnabled();
+ if (debugEnabled) {
+ log.debug("flushPendingQueue({}) start sending pending messages", this);
+ }
+
+ synchronized (pendingQueue) {
+ for (; !pendingQueue.isEmpty(); numSent++) {
+ Map.Entry<Buffer, Consumer<? super Throwable>> msgEntry = pendingQueue.removeFirst();
+ writeMessage(msgEntry.getKey(), msgEntry.getValue());
+ }
+
+ markCompletionSuccessful();
+ }
+
+ if (debugEnabled) {
+ log.debug("flushPendingQueue({}) sent {} pending messages", this, numSent);
+ }
+ } catch (IOException e) {
+ markCompletionException(e);
+
+ boolean closed = markClosed();
+ int numPending = clearPendingQueue();
+ log.warn("flushPendingQueue({}) Failed ({}) after {} successfully sent messages (pending={}, markClosed={}): {}",
+ this, e.getClass().getSimpleName(), numSent, numPending, closed, e.getMessage());
+ }
+ }
+
+ protected OpenFuture markCompletionSuccessful() {
+ OpenFuture f = getCompletedFuture();
+ f.setOpened();
+ return f;
+ }
+
+ protected OpenFuture markCompletionException(Throwable err) {
+ OpenFuture f = getCompletedFuture();
+ f.setException(err);
+ return f;
+ }
+
+ protected int clearPendingQueue() {
+ int numEntries;
+ synchronized (pendingQueue) {
+ numEntries = pendingQueue.size();
+ if (numEntries > 0) {
+ pendingQueue.clear();
+ }
+ pendingQueue.notifyAll(); // in case anyone waiting
+ }
+
+ return numEntries;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName()
+ + "[channel=" + getClientChannel()
+ + ", open=" + isOpen()
+ + "]";
+ }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51562854/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
index cfc7970..2cf7399 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
@@ -21,11 +21,10 @@ package org.apache.sshd.client.subsystem;

import java.nio.channels.Channel;

-import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelHolder;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientSessionHolder;
import org.apache.sshd.common.NamedResource;
-import org.apache.sshd.common.channel.ChannelHolder;
import org.apache.sshd.common.session.SessionHolder;

/**
@@ -35,10 +34,7 @@ public interface SubsystemClient
extends SessionHolder<ClientSession>,
ClientSessionHolder,
NamedResource,
- ChannelHolder,
- Channel {
- /**
- * @return The underlying {@link ClientChannel} used
- */
- ClientChannel getClientChannel();
+ Channel,
+ ClientChannelHolder {
+ // nothing extra
}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51562854/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
index 2644751..04c05bd 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultForwardingFilter.java
@@ -19,7 +19,6 @@
package org.apache.sshd.common.forward;

import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -38,9 +37,11 @@ import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.channel.ClientChannelPendingMessagesQueue;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.Factory;
@@ -85,7 +86,7 @@ public class DefaultForwardingFilter
public static final String FORWARD_REQUEST_TIMEOUT = "tcpip-forward-request-timeout";

/**
- * Default value for {@link #FORWARD_REQUEST_TIMEOUT} if none specified
+ * Default value for {@value #FORWARD_REQUEST_TIMEOUT} if none specified
*/
public static final long DEFAULT_FORWARD_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);

@@ -1047,19 +1048,25 @@ public class DefaultForwardingFilter
channel.close(true);
} else {
/*
- * Make sure channel is fully open in case the client was very fast
+ * Make sure channel is pending messages have all been sent in case the client was very fast
* and sent data + closed the connection before channel open was completed.
*/
OpenFuture openFuture = channel.getOpenFuture();
- // If channel is established then listener is invoked immediately
- openFuture.addListener(f -> {
- Throwable err = f.getException();
- if (err != null) {
- log.warn("sessionClosed({}) closing incompletely open channel={} after {} messages due to {}[{}]",
- session, channel, messagesCounter, err.getClass().getSimpleName(), err.getMessage());
- }
- channel.close(err != null);
- });
+ Throwable err = openFuture.getException();
+ ClientChannelPendingMessagesQueue queue = channel.getPendingMessagesQueue();
+ OpenFuture completedFuture = queue.getCompletedFuture();
+ if (err == null) {
+ err = completedFuture.getException();
+ }
+ boolean immediately = err != null;
+ if (immediately) {
+ channel.close(true);
+ } else {
+ completedFuture.addListener(f -> {
+ Throwable thrown = f.getException();
+ channel.close(immediately || (thrown != null));
+ });
+ }
}
}

@@ -1072,30 +1079,24 @@ public class DefaultForwardingFilter

if (traceEnabled) {
log.trace("messageReceived({}) channel={}, count={}, handle len={}",
- session, channel, totalMessages, message.available());
+ session, channel, totalMessages, message.available());
}

OpenFuture future = channel.getOpenFuture();
- if (future.isOpened()) {
- OutputStream outputStream = channel.getInvertedIn();
- outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
- outputStream.flush();
- } else {
- future.addListener(f -> {
- try {
- OutputStream outputStream = channel.getInvertedIn();
- outputStream.write(buffer.array(), buffer.rpos(), buffer.available());
- outputStream.flush();
- } catch (IOException e) {
- try {
- exceptionCaught(session, e);
- } catch (Exception err) {
- log.warn("messageReceived({}) failed ({}) to signal {}[{}] on channel={}: {}",
- session, err.getClass().getSimpleName(), e.getClass().getSimpleName(),
- e.getMessage(), channel, err.getMessage());
- }
- }
- });
+ Consumer<Throwable> errHandler = future.isOpened() ? null : e -> {
+ try {
+ exceptionCaught(session, e);
+ } catch (Exception err) {
+ log.warn("messageReceived({}) failed ({}) to signal {}[{}] on channel={}: {}",
+ session, err.getClass().getSimpleName(), e.getClass().getSimpleName(),
+ e.getMessage(), channel, err.getMessage());
+ }
+ };
+ ClientChannelPendingMessagesQueue messagesQueue = channel.getPendingMessagesQueue();
+ int pendCount = messagesQueue.handleIncomingMessage(buffer, errHandler);
+ if (traceEnabled) {
+ log.trace("messageReceived({}) channel={} pend count={} after processing message",
+ session, channel, pendCount);
}
}


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51562854/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 82d6d2f..ecccfc9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;

import org.apache.sshd.client.channel.AbstractClientChannel;
+import org.apache.sshd.client.channel.ClientChannelPendingMessagesQueue;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
@@ -40,6 +41,7 @@ import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.IoUtils;
import org.apache.sshd.common.util.net.SshdSocketAddress;

/**
@@ -74,6 +76,7 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
private final Type typeEnum;
private final IoSession serverSession;
private final SshdSocketAddress remote;
+ private final ClientChannelPendingMessagesQueue messagesQueue;
private SshdSocketAddress tunnelEntrance;
private SshdSocketAddress tunnelExit;

@@ -82,6 +85,7 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
this.typeEnum = type;
this.serverSession = Objects.requireNonNull(serverSession, "No server session provided");
this.remote = remote;
+ this.messagesQueue = new ClientChannelPendingMessagesQueue(this);
}

public OpenFuture getOpenFuture() {
@@ -92,6 +96,10 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
return typeEnum;
}

+ public ClientChannelPendingMessagesQueue getPendingMessagesQueue() {
+ return messagesQueue;
+ }
+
@Override
public synchronized OpenFuture open() throws IOException {
InetSocketAddress src;
@@ -120,7 +128,9 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
throw new SshException("Session has been closed");
}

- openFuture = new DefaultOpenFuture(src, lock);
+ // make sure the pending messages queue is 1st in line
+ openFuture = new DefaultOpenFuture(src, lock)
+ .addListener(getPendingMessagesQueue());
if (log.isDebugEnabled()) {
log.debug("open({}) send SSH_MSG_CHANNEL_OPEN", this);
}
@@ -156,8 +166,26 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward
}

@Override
+ protected void preClose() {
+ IOException err = IoUtils.closeQuietly(getPendingMessagesQueue());
+ if (err != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("preClose({}) Failed ({}) to close pending messages queue: {}",
+ this, err.getClass().getSimpleName(), err.getMessage());
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("preClose(" + this + ") pending messages queue close failure details", err);
+ }
+ }
+
+ super.preClose();
+ }
+
+ @Override
protected Closeable getInnerCloseable() {
- return builder().sequential(serverSession, super.getInnerCloseable()).build();
+ return builder()
+ .sequential(serverSession, super.getInnerCloseable())
+ .build();
}

@Override

Loading...