java.io.IOException java.util.concurrent.atomic.AtomicBoolean



Project 0xdata/h2o in file ...f0a1d02386331db17b6ef/src.main.java.water.RPC.java (2013-08-28)
@@ -1,8 +1,9 @@
 package water;
 
-import java.io.IOException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import jsr166y.CountedCompleter;
@@ -251,6 +252,7 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
     long _started;              // Retry fields for the ackack
     long _retry;
     volatile boolean _computed; // One time transition from false to true
+    transient AtomicBoolean _firstException = new AtomicBoolean(false);
     // To help with asserts, record the size of the sent DTask - if we resend
     // if should remain the same size.
     int _size;
@@ -292,8 +294,12 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
         Log.info("Done  remote task#"+_tsknum+" "+dt.getClass()+" to "+_client);
       _client.record_task_answer(this); // Setup for retrying Ack & AckAck
     }
+    // An exception occurred while processing this task locally: record it and send it back to the caller
     @Override public boolean onExceptionalCompletion( Throwable ex, CountedCompleter caller ) {
-      System.out.println("RPC sees an exceptional completion");
+      if(!_firstException.getAndSet(true)){
+        _dt.setException(ex);
+        onCompletion(caller);
+      }
       return true;
     }
     // Re-send strictly the ack, because we're missing an AckAck
@@ -457,11 +463,20 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
       _done = true;             // Only read one (of many) response packets
       ab._h2o.taskRemove(_tasknum); // Flag as task-completed, even if the result is null
       notifyAll();              // And notify in any case
+      final Exception e = _dt.getDException();
       // Also notify any and all pending completion-style tasks
       if( _fjtasks != null )
         for( final H2OCountedCompleter task : _fjtasks )
           H2O.submitTask(new H2OCountedCompleter() {
-              @Override public void compute2() { task.tryComplete(); }
+              @Override public void compute2() {
+                if(e != null) // re-throw exception on this side as if it happened locally
+                  task.completeExceptionally(e);
+                else try {
+                  task.tryComplete();
+                } catch(Throwable e){
+                  task.completeExceptionally(e);
+                }
+              }
               @Override public byte priority() { return task.priority(); }
             });
     }
Project SpringSource/spring-framework in file ...ing.simp.stomp.StompBrokerRelayMessageHandler.java (2014-04-03)
@@ -16,11 +16,11 @@
 
 package org.springframework.messaging.simp.stomp;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
@@ -607,6 +607,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 		}
 
 		@Override
+		public void handleFailure(Throwable ex) {
+			if (this.tcpConnection == null) {
+				return;
+			}
+			handleTcpConnectionFailure("Closing connection after TCP failure", ex);
+		}
+
+		@Override
 		public void afterConnectionClosed() {
 			if (this.tcpConnection == null) {
 				return;
@@ -629,21 +637,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 			}
 		}
 
+		/**
+		 * Forward the given message to the STOMP broker.
+		 *
+		 * <p>The method checks whether we have an active TCP connection and have
+		 * received the STOMP CONNECTED frame. For client messages this should be
+		 * false only if we lose the TCP connection around the same time when a
+		 * client message is being forwarded, so we simply log the ignored message
+		 * at trace level. For messages from within the application being sent on
+		 * the "system" connection an exception is raised so that components sending
+		 * the message have a chance to handle it -- by default the broker message
+		 * channel is synchronous.
+		 *
+		 * <p>Note that if messages arrive concurrently around the same time a TCP
+		 * connection is lost, there is a brief period of time before the connection
+		 * is reset when one or more messages may sneak through and an attempt made
+		 * to forward them. Rather than synchronizing to guard against that, this
+		 * method simply lets them try and fail. For client sessions that may
+		 * result in additional STOMP ERROR frame(s) being sent downstream but
+		 * code handling that downstream should be idempotent in such cases.
+		 *
+		 * @param message the message to send, never {@code null}
+		 * @return a future to wait for the result
+		 */
+		@SuppressWarnings("unchecked")
 		public ListenableFuture<Void> forward(final Message<?> message) {
 
+			TcpConnection<byte[]> conn = this.tcpConnection;
+
 			if (!this.isStompConnected) {
 				if (this.isRemoteClientSession) {
-					if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) {
-						return EMPTY_TASK;
+					if (logger.isTraceEnabled()) {
+						logger.trace("Ignoring client message received " + message +
+								(conn != null ? "before CONNECTED frame" : "after TCP connection closed"));
 					}
-					// Should never happen
-					throw new IllegalStateException("Unexpected client message " + message +
-							(this.tcpConnection != null ?
-									"before STOMP CONNECTED frame" : "after TCP connection closed"));
+					return EMPTY_TASK;
 				}
 				else {
 					throw new IllegalStateException("Cannot forward messages on system connection " +
-							(this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") +
+							(conn != null ? "before STOMP CONNECTED frame" : "while inactive") +
 							". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
 
 				}
@@ -659,8 +691,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 				}
 			}
 
-			@SuppressWarnings("unchecked")
-			ListenableFuture<Void> future = this.tcpConnection.send((Message<byte[]>) message);
+			ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
 
 			future.addCallback(new ListenableFutureCallback<Void>() {
 				@Override
@@ -672,7 +703,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 				}
 				@Override
 				public void onFailure(Throwable t) {
-					handleTcpConnectionFailure("Failed to send message " + message, t);
+					if (tcpConnection == null) {
+						// already reset
+					}
+					else {
+						handleTcpConnectionFailure("Failed to send message " + message, t);
+					}
 				}
 			});
 
Project h2oai/h2o in file ...f0a1d02386331db17b6ef/src.main.java.water.RPC.java (2013-08-28)
@@ -1,8 +1,9 @@
 package water;
 
-import java.io.IOException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import jsr166y.CountedCompleter;
@@ -251,6 +252,7 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
     long _started;              // Retry fields for the ackack
     long _retry;
     volatile boolean _computed; // One time transition from false to true
+    transient AtomicBoolean _firstException = new AtomicBoolean(false);
     // To help with asserts, record the size of the sent DTask - if we resend
     // if should remain the same size.
     int _size;
@@ -292,8 +294,12 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
         Log.info("Done  remote task#"+_tsknum+" "+dt.getClass()+" to "+_client);
       _client.record_task_answer(this); // Setup for retrying Ack & AckAck
     }
+    // An exception occurred while processing this task locally: record it and send it back to the caller
     @Override public boolean onExceptionalCompletion( Throwable ex, CountedCompleter caller ) {
-      System.out.println("RPC sees an exceptional completion");
+      if(!_firstException.getAndSet(true)){
+        _dt.setException(ex);
+        onCompletion(caller);
+      }
       return true;
     }
     // Re-send strictly the ack, because we're missing an AckAck
@@ -457,11 +463,20 @@ public class RPC<V extends DTask> implements Future<V>, Delayed, ForkJoinPool.Ma
       _done = true;             // Only read one (of many) response packets
       ab._h2o.taskRemove(_tasknum); // Flag as task-completed, even if the result is null
       notifyAll();              // And notify in any case
+      final Exception e = _dt.getDException();
       // Also notify any and all pending completion-style tasks
       if( _fjtasks != null )
         for( final H2OCountedCompleter task : _fjtasks )
           H2O.submitTask(new H2OCountedCompleter() {
-              @Override public void compute2() { task.tryComplete(); }
+              @Override public void compute2() {
+                if(e != null) // re-throw exception on this side as if it happened locally
+                  task.completeExceptionally(e);
+                else try {
+                  task.tryComplete();
+                } catch(Throwable e){
+                  task.completeExceptionally(e);
+                }
+              }
               @Override public byte priority() { return task.priority(); }
             });
     }
Project netty/netty in file ...rc.main.java.io.netty.channel.AbstractChannel.java (2014-08-11)
@@ -24,12 +24,12 @@ import io.netty.util.internal.PlatformDependent;
 import io.netty.util.internal.logging.InternalLogger;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.NotYetConnectedException;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A skeletal {@link Channel} implementation.
@@ -59,7 +59,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
 
     private volatile SocketAddress localAddress;
     private volatile SocketAddress remoteAddress;
-    private volatile EventLoop eventLoop;
+    private volatile PausableChannelEventLoop eventLoop;
     private volatile boolean registered;
 
     /** Cache for the string representation of this channel */
@@ -119,7 +119,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
     }
 
     @Override
-    public EventLoop eventLoop() {
+    public final EventLoop eventLoop() {
         EventLoop eventLoop = this.eventLoop;
         if (eventLoop == null) {
             throw new IllegalStateException("channel not registered to an event loop");
@@ -198,6 +198,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
 
     @Override
     public ChannelFuture deregister() {
+        /**
+         * One problem of channel deregistration is that after a channel has been deregistered
+         * there may still be tasks, created from within one of the channel's ChannelHandlers,
+         * in the {@link EventLoop}'s task queue. That way, an unfortunate twist of events could lead
+         * to tasks still being in the old {@link EventLoop}'s queue even after the channel has been
+         * registered with a new {@link EventLoop}. This would lead to the tasks being executed by two
+         * different {@link EventLoop}s.
+         *
+         * Our solution to this problem is to always perform the actual deregistration of
+         * the channel as a task and to reject any submission of new tasks, from within
+         * one of the channel's ChannelHandlers, until the channel is registered with
+         * another {@link EventLoop}. That way we can be sure that there are no more tasks regarding
+         * that particular channel after it has been deregistered (because the deregistration
+         * task is the last one).
+         *
+         * This only works for one-time tasks. To see how we handle periodic/delayed tasks, have a look
+         * at {@link io.netty.util.concurrent.ScheduledFutureTask#run()}.
+         *
+         * Also see {@link HeadContext#deregister(ChannelHandlerContext, ChannelPromise)}.
+         */
+        eventLoop.rejectNewTasks();
         return pipeline.deregister();
     }
 
@@ -234,6 +255,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
 
     @Override
     public ChannelFuture deregister(ChannelPromise promise) {
+        eventLoop.rejectNewTasks();
         return pipeline.deregister(promise);
     }
 
@@ -401,7 +423,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
 
         @Override
         public final ChannelHandlerInvoker invoker() {
-            return eventLoop().asInvoker();
+            // return the unwrapped invoker.
+            return ((PausableChannelEventLoop) eventLoop().asInvoker()).unwrapInvoker();
         }
 
         @Override
@@ -424,6 +447,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
             if (eventLoop == null) {
                 throw new NullPointerException("eventLoop");
             }
+            if (promise == null) {
+                throw new NullPointerException("promise");
+            }
             if (isRegistered()) {
                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
                 return;
@@ -434,7 +460,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
                 return;
             }
 
-            AbstractChannel.this.eventLoop = eventLoop;
+            // It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple
+            // objects that do not share a common state.
+            if (AbstractChannel.this.eventLoop == null) {
+                AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
+            } else {
+                AbstractChannel.this.eventLoop.unwrapped = eventLoop;
+            }
 
             if (eventLoop.inEventLoop()) {
                 register0(promise);
@@ -466,6 +498,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
                 }
                 doRegister();
                 registered = true;
+                AbstractChannel.this.eventLoop.acceptNewTasks();
                 safeSetSuccess(promise);
                 pipeline.fireChannelRegistered();
                 if (isActive()) {
@@ -587,17 +620,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
                 outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                 outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
             } finally {
-
                 if (wasActive && !isActive()) {
                     invokeLater(new OneTimeTask() {
                         @Override
                         public void run() {
                             pipeline.fireChannelInactive();
+                            deregister(voidPromise());
+                        }
+                    });
+                } else {
+                    invokeLater(new OneTimeTask() {
+                        @Override
+                        public void run() {
+                            deregister(voidPromise());
                         }
                     });
                 }
-
-                deregister(voidPromise());
             }
         }
 
@@ -610,6 +648,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
             }
         }
 
+        /**
+         * This method must NEVER be called directly, but be executed as an
+         * extra task with a clean call stack instead. The reason for this
+         * is that this method calls {@link ChannelPipeline#fireChannelUnregistered()}
+         * directly, which might lead to an unfortunate nesting of independent inbound/outbound
+         * events. See the comments in {@link #invokeLater(Runnable)} for more details.
+         */
         @Override
         public final void deregister(final ChannelPromise promise) {
             if (!promise.setUncancellable()) {
@@ -624,17 +669,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
             try {
                 doDeregister();
             } catch (Throwable t) {
+                safeSetFailure(promise, t);
                 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
             } finally {
                 if (registered) {
                     registered = false;
-                    invokeLater(new OneTimeTask() {
-                        @Override
-                        public void run() {
-                            pipeline.fireChannelUnregistered();
-                        }
-                    });
                     safeSetSuccess(promise);
+                    pipeline.fireChannelUnregistered();
                 } else {
                     // Some transports like local and AIO does not allow the deregistration of
                     // an open channel.  Their doDeregister() calls close().  Consequently,
@@ -792,7 +833,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
                 //
                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
-                eventLoop().execute(task);
+                eventLoop().unwrap().execute(task);
             } catch (RejectedExecutionException e) {
                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
             }
@@ -895,4 +936,70 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
             return super.trySuccess();
         }
     }
+
+    private final class PausableChannelEventLoop
+            extends PausableChannelEventExecutor implements EventLoop {
+
+        volatile boolean isAcceptingNewTasks = true;
+        volatile EventLoop unwrapped;
+
+        PausableChannelEventLoop(EventLoop unwrapped) {
+            this.unwrapped = unwrapped;
+        }
+
+        @Override
+        public void rejectNewTasks() {
+            isAcceptingNewTasks = false;
+        }
+
+        @Override
+        public void acceptNewTasks() {
+            isAcceptingNewTasks = true;
+        }
+
+        @Override
+        public boolean isAcceptingNewTasks() {
+            return isAcceptingNewTasks;
+        }
+
+        @Override
+        public EventLoopGroup parent() {
+            return unwrap().parent();
+        }
+
+        @Override
+        public EventLoop next() {
+            return unwrap().next();
+        }
+
+        @Override
+        public EventLoop unwrap() {
+            return unwrapped;
+        }
+
+        @Override
+        public ChannelHandlerInvoker asInvoker() {
+            return this;
+        }
+
+        @Override
+        public ChannelFuture register(Channel channel) {
+            return unwrap().register(channel);
+        }
+
+        @Override
+        public ChannelFuture register(Channel channel, ChannelPromise promise) {
+            return unwrap().register(channel, promise);
+        }
+
+        @Override
+        Channel channel() {
+            return AbstractChannel.this;
+        }
+
+        @Override
+        ChannelHandlerInvoker unwrapInvoker() {
+            return unwrapped.asInvoker();
+        }
+    }
 }
Project spring-projects/spring-framework in file ...ing.simp.stomp.StompBrokerRelayMessageHandler.java (2014-04-03)
@@ -16,11 +16,11 @@
 
 package org.springframework.messaging.simp.stomp;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
@@ -607,6 +607,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 		}
 
 		@Override
+		public void handleFailure(Throwable ex) {
+			if (this.tcpConnection == null) {
+				return;
+			}
+			handleTcpConnectionFailure("Closing connection after TCP failure", ex);
+		}
+
+		@Override
 		public void afterConnectionClosed() {
 			if (this.tcpConnection == null) {
 				return;
@@ -629,21 +637,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 			}
 		}
 
+		/**
+		 * Forward the given message to the STOMP broker.
+		 *
+		 * <p>The method checks whether we have an active TCP connection and have
+		 * received the STOMP CONNECTED frame. For client messages this should be
+		 * false only if we lose the TCP connection around the same time when a
+		 * client message is being forwarded, so we simply log the ignored message
+		 * at trace level. For messages from within the application being sent on
+		 * the "system" connection an exception is raised so that components sending
+		 * the message have a chance to handle it -- by default the broker message
+		 * channel is synchronous.
+		 *
+		 * <p>Note that if messages arrive concurrently around the same time a TCP
+		 * connection is lost, there is a brief period of time before the connection
+		 * is reset when one or more messages may sneak through and an attempt made
+		 * to forward them. Rather than synchronizing to guard against that, this
+		 * method simply lets them try and fail. For client sessions that may
+		 * result in additional STOMP ERROR frame(s) being sent downstream but
+		 * code handling that downstream should be idempotent in such cases.
+		 *
+		 * @param message the message to send, never {@code null}
+		 * @return a future to wait for the result
+		 */
+		@SuppressWarnings("unchecked")
 		public ListenableFuture<Void> forward(final Message<?> message) {
 
+			TcpConnection<byte[]> conn = this.tcpConnection;
+
 			if (!this.isStompConnected) {
 				if (this.isRemoteClientSession) {
-					if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) {
-						return EMPTY_TASK;
+					if (logger.isTraceEnabled()) {
+						logger.trace("Ignoring client message received " + message +
+								(conn != null ? "before CONNECTED frame" : "after TCP connection closed"));
 					}
-					// Should never happen
-					throw new IllegalStateException("Unexpected client message " + message +
-							(this.tcpConnection != null ?
-									"before STOMP CONNECTED frame" : "after TCP connection closed"));
+					return EMPTY_TASK;
 				}
 				else {
 					throw new IllegalStateException("Cannot forward messages on system connection " +
-							(this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") +
+							(conn != null ? "before STOMP CONNECTED frame" : "while inactive") +
 							". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
 
 				}
@@ -659,8 +691,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 				}
 			}
 
-			@SuppressWarnings("unchecked")
-			ListenableFuture<Void> future = this.tcpConnection.send((Message<byte[]>) message);
+			ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
 
 			future.addCallback(new ListenableFutureCallback<Void>() {
 				@Override
@@ -672,7 +703,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
 				}
 				@Override
 				public void onFailure(Throwable t) {
-					handleTcpConnectionFailure("Failed to send message " + message, t);
+					if (tcpConnection == null) {
+						// already reset
+					}
+					else {
+						handleTcpConnectionFailure("Failed to send message " + message, t);
+					}
 				}
 			});