diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-05-01 07:17:43 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-01 07:17:43 +0000 |
| commit | c3df6ede2abf64b1ef2ec157621c65735599eea7 (patch) | |
| tree | 75600aacb5124d304f46e339ac2751a688de3fcc /java | |
| parent | 02e8e01f5e41a6a3f7123c73abdbe229e37af381 (diff) | |
| download | qpid-python-c3df6ede2abf64b1ef2ec157621c65735599eea7.tar.gz | |
Merged revisions 532766-532785,532788-532790,532792-533064,533066-533074,533076,533080-533130,533132-533139,533142-533703,533705-533765 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r532766 | rgreig | 2007-04-26 15:57:04 +0100 (Thu, 26 Apr 2007) | 1 line
Rationlized the performance tests.
........
r532794 | rgreig | 2007-04-26 17:33:10 +0100 (Thu, 26 Apr 2007) | 1 line
Rationalized the performance tests.
........
r533721 | rgodfrey | 2007-04-30 13:24:41 +0100 (Mon, 30 Apr 2007) | 1 line
QPID-476 : Remove duplicate map of channelId to session
........
r533764 | ritchiem | 2007-04-30 15:37:23 +0100 (Mon, 30 Apr 2007) | 4 lines
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client.
Updated to allow the use of durable subscriptions but it will not be as clean as with the extensions.
Selectors are also now disabled.
........
r533765 | ritchiem | 2007-04-30 15:39:18 +0100 (Mon, 30 Apr 2007) | 1 line
QPID-461 Update to CommitRollbackTest. Ensuring messages received have the correct redelivered value, regardless of order.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@533957 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 274 insertions, 251 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0e3d99eeba..347f5728e2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -96,7 +96,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>(); + private String _clientName; @@ -508,7 +509,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); + //_protocolHandler.addSessionByChannel(channelId, session); registerSession(channelId, session); boolean success = false; @@ -527,7 +528,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!success) { - _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); } } @@ -589,7 +589,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { - _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); } @@ -1136,7 +1135,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - _protocolHandler.addSessionByChannel(s.getChannelId(), s); + //_protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,4 +1222,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _taskPool.execute(task); } + + + public AMQSession getSession(int channelId) + { + return _sessions.get(channelId); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 700222dabb..10101976eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -209,6 +209,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final boolean _strictAMQP; + /** System property to enable strickt AMQP compliance */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + /** Strickt AMQP default */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + private final boolean _strictAMQPFATAL; /** System property to enable immediate message prefetching */ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; @@ -429,23 +435,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK); - } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch); - } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); + _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; @@ -493,15 +490,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry()); - } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch); - } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -796,7 +785,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqe = new AMQException("Closing session forcibly", e); } _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + closeProducersAndConsumers(amqe); } } @@ -809,6 +798,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _closed.set(true); _connection.deregisterSession(_channelId); markClosedProducersAndConsumers(); + } private void markClosedProducersAndConsumers() @@ -941,7 +931,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { @@ -1229,13 +1219,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final int prefetchLow, final boolean noLocal, final boolean exclusive, - final String selector, + String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); + final String messageSelector; + + if (_strictAMQP && !(selector == null || selector.equals(""))) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + } + else + { + messageSelector = null; + } + } + else + { + messageSelector = selector; + } return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() { @@ -1246,6 +1253,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQDestination amqd = (AMQDestination) destination; final AMQProtocolHandler protocolHandler = getProtocolHandler(); + // TODO: Define selectors in AMQP // TODO: construct the rawSelector from the selector string if rawSelector == null final FieldTable ft = FieldTableFactory.newFieldTable(); //if (rawSelector != null) @@ -1254,7 +1262,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { ft.addAll(rawSelector); } - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); @@ -1647,6 +1656,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + + checkNotClosed(); AMQTopic origTopic = checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); @@ -1674,13 +1685,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { topicName = new AMQShortString(topic.getTopicName()); } - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + + if (_strictAMQP) { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + deleteQueue(dest.getAMQQueueName()); } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } } subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); @@ -1778,13 +1807,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) + if (_strictAMQP) { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + + " Requesting queue deletion regardless."); + } + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); + + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } } } } @@ -1796,10 +1843,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { - if (isStrictAMQP()) - { - throw new UnsupportedOperationException(); - } // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index e0c4b61333..a219f7d655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -9,119 +9,141 @@ import javax.jms.Queue; import javax.jms.QueueSender; import javax.jms.InvalidDestinationException; -public class QueueSenderAdapter implements QueueSender { - - private BasicMessageProducer _delegate; - private Queue _queue; - private boolean closed = false; - - public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){ - _delegate = msgProducer; - _queue = queue; - } - - public Queue getQueue() throws JMSException { - checkPreConditions(); - return _queue; - } - - public void send(Message msg) throws JMSException { - checkPreConditions(); - _delegate.send(msg); - } - - public void send(Queue queue, Message msg) throws JMSException { - checkPreConditions(queue); - _delegate.send(queue, msg); - } - - public void publish(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(); - _delegate.send(msg, deliveryMode,priority,timeToLive); - } - - public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(queue); - _delegate.send(queue,msg, deliveryMode,priority,timeToLive); - } - - public void close() throws JMSException { - _delegate.close(); - closed = true; - } - - public int getDeliveryMode() throws JMSException { - checkPreConditions(); - return _delegate.getDeliveryMode(); - } - - public Destination getDestination() throws JMSException { - checkPreConditions(); - return _delegate.getDestination(); - } - - public boolean getDisableMessageID() throws JMSException { - checkPreConditions(); - return _delegate.getDisableMessageID(); - } - - public boolean getDisableMessageTimestamp() throws JMSException { - checkPreConditions(); - return _delegate.getDisableMessageTimestamp(); - } - - public int getPriority() throws JMSException { - checkPreConditions(); - return _delegate.getPriority(); - } - - public long getTimeToLive() throws JMSException { - checkPreConditions(); - return _delegate.getTimeToLive(); - } - - public void send(Destination dest, Message msg) throws JMSException { - checkPreConditions((Queue)dest); - _delegate.send(dest,msg); - } - - public void send(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(); - _delegate.send(msg, deliveryMode,priority,timeToLive); - } - - public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions((Queue)dest); - _delegate.send(dest,msg, deliveryMode,priority,timeToLive); - } - - public void setDeliveryMode(int deliveryMode) throws JMSException { - checkPreConditions(); - _delegate.setDeliveryMode(deliveryMode); - } - - public void setDisableMessageID(boolean disableMessageID) throws JMSException { - checkPreConditions(); - _delegate.setDisableMessageID(disableMessageID); - } - - public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { - checkPreConditions(); - _delegate.setDisableMessageTimestamp(disableMessageTimestamp); - } - - public void setPriority(int priority) throws JMSException { - checkPreConditions(); - _delegate.setPriority(priority); - } - - public void setTimeToLive(long timeToLive) throws JMSException { - checkPreConditions(); - _delegate.setTimeToLive(timeToLive); - } +public class QueueSenderAdapter implements QueueSender +{ + + private BasicMessageProducer _delegate; + private Queue _queue; + private boolean closed = false; + + public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue) + { + _delegate = msgProducer; + _queue = queue; + } + + public Queue getQueue() throws JMSException + { + checkPreConditions(); + return _queue; + } + + public void send(Message msg) throws JMSException + { + checkPreConditions(); + _delegate.send(msg); + } + + public void send(Queue queue, Message msg) throws JMSException + { + checkPreConditions(queue); + _delegate.send(queue, msg); + } + + public void publish(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } + + public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(queue); + _delegate.send(queue, msg, deliveryMode, priority, timeToLive); + } + + public void close() throws JMSException + { + _delegate.close(); + closed = true; + } + + public int getDeliveryMode() throws JMSException + { + checkPreConditions(); + return _delegate.getDeliveryMode(); + } + + public Destination getDestination() throws JMSException + { + checkPreConditions(); + return _delegate.getDestination(); + } + + public boolean getDisableMessageID() throws JMSException + { + checkPreConditions(); + return _delegate.getDisableMessageID(); + } + + public boolean getDisableMessageTimestamp() throws JMSException + { + checkPreConditions(); + return _delegate.getDisableMessageTimestamp(); + } + + public int getPriority() throws JMSException + { + checkPreConditions(); + return _delegate.getPriority(); + } + + public long getTimeToLive() throws JMSException + { + checkPreConditions(); + return _delegate.getTimeToLive(); + } + + public void send(Destination dest, Message msg) throws JMSException + { + checkPreConditions((Queue) dest); + _delegate.send(dest, msg); + } + + public void send(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } + + public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException + { + checkPreConditions((Queue) dest); + _delegate.send(dest, msg, deliveryMode, priority, timeToLive); + } + + public void setDeliveryMode(int deliveryMode) throws JMSException + { + checkPreConditions(); + _delegate.setDeliveryMode(deliveryMode); + } + + public void setDisableMessageID(boolean disableMessageID) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageID(disableMessageID); + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageTimestamp(disableMessageTimestamp); + } + + public void setPriority(int priority) throws JMSException + { + checkPreConditions(); + _delegate.setPriority(priority); + } + + public void setTimeToLive(long timeToLive) throws JMSException + { + checkPreConditions(); + _delegate.setTimeToLive(timeToLive); + } private void checkPreConditions() throws JMSException { @@ -130,31 +152,41 @@ public class QueueSenderAdapter implements QueueSender { private void checkPreConditions(Queue queue) throws JMSException { - if (closed){ - throw new javax.jms.IllegalStateException("Publisher is closed"); - } - - AMQSession session = ((BasicMessageProducer) _delegate).getSession(); - - if(session == null || session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - - if(!(queue instanceof AMQDestination)) + if (closed) + { + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + AMQSession session = ((BasicMessageProducer) _delegate).getSession(); + + if (session == null || session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + + if (!(queue instanceof AMQDestination)) { throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); } AMQDestination destination = (AMQDestination) queue; - if(!destination.isValidated() && checkQueueBeforePublish()) + if (!destination.isValidated() && checkQueueBeforePublish()) { - if (_delegate.isBound(destination)) + if (_delegate.getSession().isStrictAMQP()) { + _delegate._logger.warn("AMQP does not support destination validation before publish, "); destination.setValidated(true); } else { - throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + if (_delegate.isBound(destination)) + { + destination.setValidated(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + } } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5bc1555df7..addef94215 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -490,27 +490,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); } - /** - * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol - * handler will ensure that messages are delivered to the consumer(s) on that session. - * - * @param channelId the channel id of the session - * @param session the session instance. - */ - public void addSessionByChannel(int channelId, AMQSession session) - { - _protocolSession.addSessionByChannel(channelId, session); - } - /** - * Convenience method to deregister an AMQSession with the protocol handler. - * - * @param channelId then channel id of the session - */ - public void removeSessionByChannel(int channelId) - { - _protocolSession.removeSessionByChannel(channelId); - } public void closeSession(AMQSession session) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 35aa69bd82..a557fc8027 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -85,7 +85,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQProtocolHandler _protocolHandler; /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -104,26 +104,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); - /** - * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for - * test - */ - public AMQProtocolSession() - { - _protocolHandler = null; - _minaProtocolSession = null; - _stateManager = new AMQStateManager(this); - } + private final AMQConnection _connection; + public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _stateManager = new AMQStateManager(this); + this(protocolHandler,protocolSession,connection, new AMQStateManager()); + } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, @@ -138,6 +125,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); + _connection = connection; } @@ -305,11 +293,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) { - AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId); + AMQSession session = getSession(channelId); session.messageReceived(msg); _channelId2UnprocessedMsgMap.remove(channelId); } + protected AMQSession getSession(int channelId) + { + return _connection.getSession(channelId); + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -335,32 +328,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void addSessionByChannel(int channelId, AMQSession session) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); - } - - if (session == null) - { - throw new IllegalArgumentException("Attempt to register a null session"); - } - - _logger.debug("Add session with channel id " + channelId); - _channelId2SessionMap.put(channelId, session); - } - - public void removeSessionByChannel(int channelId) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); - } - - _logger.debug("Removing session with channelId " + channelId); - _channelId2SessionMap.remove(channelId); - } /** * Starts the process of closing a session @@ -393,11 +360,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { - final Integer chId = channelId; + // if this is not a response to an earlier request to close the channel - if (_closingChannels.remove(chId) == null) + if (_closingChannels.remove(channelId) == null) { - final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + final AMQSession session = getSession(channelId); try { session.closed(new AMQException(code, text)); @@ -469,8 +436,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { - final Integer chId = channelId; - final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + final AMQSession session = getSession(channelId); session.confirmConsumerCancelled(consumerTag); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 4374329fb0..3776ff767f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -32,9 +32,6 @@ public class AMQProtocolSessionTest extends TestCase { private static class AMQProtSession extends AMQProtocolSession { - public AMQProtSession() - { - } public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 685fe20048..8aaa760537 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -390,7 +390,6 @@ public class CommitRollbackTest extends TestCase assertEquals("1", ((TextMessage) result).getText()); assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); - _logger.info("Closing Consumer"); _consumer.close(); @@ -398,31 +397,32 @@ public class CommitRollbackTest extends TestCase _consumer = _session.createConsumer(_jmsQueue); _logger.info("receiving result"); + + // NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. + // Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); + + // The first message back will be either 1 or 2 being redelivered if (result.getJMSRedelivered()) { - assertEquals("1", ((TextMessage) result).getText()); - - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else + else // or it will be msg 2 arriving the first time due to latency. { - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + _logger.info("Message 2 wasn't prefetched so wasn't rejected"); + assertEquals("2", ((TextMessage) result).getText()); + } - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - } result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); + _session.commit(); + } |
