summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java24
3 files changed, 28 insertions, 72 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
deleted file mode 100644
index 6596da1f8f..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * Interface that allows classes to register for interest in protocol method frames.
- *
- */
-public interface AMQMethodListener
-{
- /**
- * Invoked when a method frame has been received
- * @param evt the event that contains the method and channel
- * @param protocolSession the protocol session associated with the event
- * @return true if the handler has processed the method frame, false otherwise. Note
- * that this does not prohibit the method event being delivered to subsequent listeners
- * but can be used to determine if nobody has dealt with an incoming method frame.
- * @throws AMQException if an error has occurred. This exception will be delivered
- * to all registered listeners using the error() method (see below) allowing them to
- * perform cleanup if necessary.
- */
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException;
-
- /**
- * Callback when an error has occurred. Allows listeners to clean up.
- * @param e
- */
- void error(AMQException e);
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 415b7a3c68..fa43b8809d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -29,6 +29,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -99,10 +100,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQCodecFactory codecFactory)
throws AMQException
{
- this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _minaProtocolSession = session;
+ session.setAttachment(this);
+
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _codecFactory = codecFactory;
+ _managedObject = createMBean();
+ _managedObject.register();
+// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -208,13 +218,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry);
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if(!_frameListeners.isEmpty())
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+ wasAnyoneInterested = listener.methodReceived(evt) ||
wasAnyoneInterested;
}
}
@@ -233,7 +243,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.error("Closing connection due to: " + e.getMessage());
writeFrame(e.getCloseFrame(frame.channel));
}
- catch (AMQException e)
+ catch (Exception e)
{
_stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index fb78b0d8b7..70e530699e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.log4j.Logger;
@@ -43,7 +43,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
-
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
*/
@@ -58,13 +60,16 @@ public class AMQStateManager implements AMQMethodListener
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager()
+ public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register)
+ protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _protocolSession = protocolSession;
_currentState = initial;
if (register)
{
@@ -149,7 +154,7 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void error(AMQException e)
+ public void error(Exception e)
{
_logger.error("State manager received error notification: " + e, e);
for (StateListener l : _stateListeners)
@@ -158,15 +163,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt);
+ handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
return true;
}
return false;