diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-05 19:54:24 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-05 19:54:24 +0000 |
| commit | 4cbd6e951597f78b8d5d0b11e72c02ed534f83ea (patch) | |
| tree | 9fc2ca75229a3449c29a5569963b170d481d318e | |
| parent | ba9a064e7a85aa1b7d9f78a6e155c0e678b22378 (diff) | |
| download | qpid-python-4cbd6e951597f78b8d5d0b11e72c02ed534f83ea.tar.gz | |
Separated 0-8 functionality from the AMQSession,/BasicMessageProducer and BasicMessageConsumer and made them abstract
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573039 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 765 insertions, 433 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 1ac43f4388..c001e97ec7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -535,7 +535,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); // _protocolHandler.addSessionByChannel(channelId, session); registerSession(channelId, session); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bcd9337bcc..bc4036ac18 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,41 @@ package org.apache.qpid.client; +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; @@ -129,7 +164,7 @@ import java.util.concurrent.atomic.AtomicLong; * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, * after looking at worse bottlenecks first. */ -public class AMQSession extends Closeable implements Session, QueueSession, TopicSession +public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -174,16 +209,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; /** The connection to which this session belongs. */ - private AMQConnection _connection; + protected AMQConnection _connection; /** Used to indicate whether or not this is a transactional session. */ - private boolean _transacted; + protected boolean _transacted; /** Holds the sessions acknowledgement mode. */ - private int _acknowledgeMode; + protected int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ - private int _channelId; + protected int _channelId; /** @todo This does not appear to be set? */ private int _ticket; @@ -231,7 +266,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Dispatcher _dispatcher; /** Holds the message factory factory for this session. */ - private MessageFactoryRegistry _messageFactoryRegistry; + protected MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); @@ -246,7 +281,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - private Map<AMQShortString, BasicMessageConsumer> _consumers = + protected Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ @@ -428,19 +463,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @todo Be aware of possible changes to parameter order as versions change. */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) - { - final AMQFrame ackFrame = - BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - multiple); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); - } - - getProtocolHandler().writeFrame(ackFrame); - } + public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); /** * Binds the named queue, with the specified routing key, to the named exchange. @@ -466,22 +489,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public Object execute() throws AMQException, FailoverException { - AMQFrame queueBind = - QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); - + sendQueueBind(queueName,routingKey,arguments,exchangeName); return null; } }, _connection).execute(); } + public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName) throws AMQException, FailoverException; + /** * Closes the session. @@ -525,20 +541,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - - getProtocolHandler().closeSession(this); - - final AMQFrame frame = - ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText - - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. + sendClose(timeout); } catch (AMQException e) { @@ -562,6 +565,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendClose(long timeout) throws AMQException, FailoverException; + /** * Called when the server initiates the closure of the session unilaterally. * @@ -620,10 +625,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Commits outstanding messages sent and outstanding acknowledgements. - final AMQProtocolHandler handler = getProtocolHandler(); - - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), - TxCommitOkBody.class); + sendCommit(); } catch (AMQException e) { @@ -635,6 +637,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendCommit() throws AMQException, FailoverException; + public void confirmConsumerCancelled(AMQShortString consumerTag) { @@ -968,24 +972,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public Object execute() throws AMQException, FailoverException { - AMQFrame queueDeclare = - QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - + sendCreateQueue(name, autoDelete,durable,exclusive); return null; } }, _connection).execute(); } + public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive)throws AMQException, FailoverException; /** * Creates a QueueReceiver * @@ -1356,20 +1350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _dispatcher.rollback(); } - if (isStrictAMQP()) - { - // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); - } - else - { - - _connection.getProtocolHandler().syncWrite( - BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue - , BasicRecoverOkBody.class); - } + sendRecover(); if (!isSuspended) { @@ -1386,6 +1367,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendRecover() throws AMQException, FailoverException; + public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -1408,22 +1391,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void rejectMessage(long deliveryTag, boolean requeue) - { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting delivery tag:" + deliveryTag); - } - - AMQFrame basicRejectBody = - BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, - requeue); - - _connection.getProtocolHandler().writeFrame(basicRejectBody); - } - } + public abstract void rejectMessage(long deliveryTag, boolean requeue); /** * Commits all messages done in this transaction and releases any locks currently held. @@ -1458,8 +1426,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _dispatcher.rollback(); } - _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + sendRollback(); if (!isSuspended) { @@ -1477,6 +1444,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public void sendRollback() throws AMQException, FailoverException + { + _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + } + public void run() { throw new java.lang.UnsupportedOperationException(); @@ -1591,7 +1565,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQDestination amqd = (AMQDestination) destination; - final AMQProtocolHandler protocolHandler = getProtocolHandler(); // TODO: Define selectors in AMQP // TODO: construct the rawSelector from the selector string if rawSelector == null final FieldTable ft = FieldTableFactory.newFieldTable(); @@ -1602,10 +1575,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi ft.addAll(rawSelector); } - BasicMessageConsumer consumer = - new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, - exclusive, _acknowledgeMode, noConsume, autoClose); + BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow, + noLocal,exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) { @@ -1653,6 +1624,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi }, _connection).execute(); } + public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose); + /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer * instance. @@ -1710,38 +1685,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @todo Be aware of possible changes to parameter order as versions change. */ - boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException - { - try - { - AMQMethodEvent response = - new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() - { - public AMQMethodEvent execute() throws AMQException, FailoverException - { - AMQFrame boundFrame = - ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), - getProtocolMinorVersion(), exchangeName, // exchange - queueName, // queue - routingKey); // routingKey - - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - - } - }, _connection).execute(); - - // Extract and return the response code from the query. - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - - return (responseBody.replyCode == 0); - } - catch (AMQException e) - { - throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); - } - } + public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover @@ -2048,50 +1993,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { - // need to generate a consumer tag on the client so we can exploit the nowait flag + //need to generate a consumer tag on the client so we can exploit the nowait flag AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); - - FieldTable arguments = FieldTableFactory.newFieldTable(); - if ((messageSelector != null) && !messageSelector.equals("")) - { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); - } - - if (consumer.isAutoClose()) - { - arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); - } - - if (consumer.isNoConsume()) - { - arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); - } - consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening _consumers.put(tag, consumer); try { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = - BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - - if (nowait) - { - protocolHandler.writeFrame(jmsConsume); - } - else - { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); - } + sendConsume(consumer,queueName,protocolHandler,nowait,messageSelector,tag); } catch (AMQException e) { @@ -2101,6 +2011,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException; + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { @@ -2117,9 +2030,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); long producerId = getNextProducerId(); - BasicMessageProducer producer = - new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + BasicMessageProducer producer = createMessageProducer(destination, mandatory, + immediate, waitUntilSent, producerId); registerProducer(producerId, producer); return producer; @@ -2127,6 +2039,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi }, _connection).execute(); } + public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent, long producerId); + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); @@ -2147,31 +2062,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - AMQFrame exchangeDeclare = - ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type - - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); - + sendExchangeDeclare(name, type, protocolHandler, nowait); return null; } }, _connection).execute(); } + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException; + + /** * Declares a queue for a JMS destination. * @@ -2208,24 +2114,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqd.setQueueName(protocolHandler.generateQueueName()); } - AMQFrame queueDeclare = - QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket - - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + sendQueueDeclare(amqd,protocolHandler); return amqd.getAMQQueueName(); } }, _connection).execute(); } + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException; + /** * Undeclares the specified queue. * @@ -2245,16 +2142,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public Object execute() throws AMQException, FailoverException { - AMQFrame queueDeleteFrame = - QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); - + sendQueueDelete(queueName); return null; } }, _connection).execute(); @@ -2265,22 +2153,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; + private long getNextProducerId() { return ++_nextProducerId; } - private AMQProtocolHandler getProtocolHandler() + protected AMQProtocolHandler getProtocolHandler() { return _connection.getProtocolHandler(); } - private byte getProtocolMajorVersion() + protected byte getProtocolMajorVersion() { return getProtocolHandler().getProtocolMajorVersion(); } - private byte getProtocolMinorVersion() + protected byte getProtocolMinorVersion() { return getProtocolHandler().getProtocolMinorVersion(); } @@ -2538,12 +2428,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } _suspended = suspend; - - AMQFrame channelFlowFrame = - ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), - !suspend); - - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + sendSuspendChannel(suspend); } catch (FailoverException e) { @@ -2552,6 +2437,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java new file mode 100644 index 0000000000..180a1e663c --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + + +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicAckBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ExchangeBoundBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.TxCommitBody; +import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.jms.Session; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQSession_0_8 extends AMQSession +{ + + /** Used for debugging. */ + private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + + /** + * Creates a new session on a connection. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param messageFactoryRegistry The message factory factory for the session. + * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + */ + AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + { + + super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); + } + + /** + * Creates a new session on a connection with the default message factory factory. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + */ + AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, + int defaultPrefetchLow) + { + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, + defaultPrefetchLow); + } + + public void acknowledgeMessage(long deliveryTag, boolean multiple) + { + final AMQFrame ackFrame = + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + + getProtocolHandler().writeFrame(ackFrame); + } + + public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName) throws AMQException, FailoverException + { + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + } + + public void sendClose(long timeout) throws AMQException, FailoverException + { + getProtocolHandler().closeSession(this); + + final AMQFrame frame = ChannelCloseBody.createAMQFrame + (getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText + + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + + public void sendCommit() throws AMQException, FailoverException + { + final AMQProtocolHandler handler = getProtocolHandler(); + + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxCommitOkBody.class); + } + + public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException, + FailoverException + { + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + } + + public void sendRecover() throws AMQException, FailoverException + { + if (isStrictAMQP()) + { + // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. + _connection.getProtocolHandler().writeFrame( + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + } + else + { + + _connection.getProtocolHandler().syncWrite( + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); + } + } + + public void rejectMessage(long deliveryTag, boolean requeue) + { + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); + + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } + } + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException + { + try + { + AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + exchangeName, // exchange + queueName, // queue + routingKey); // routingKey + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + + } + }, _connection).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.replyCode == 0); + } + catch (AMQException e) + { + throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); + } + } + + public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, + String messageSelector, AMQShortString tag) throws AMQException, FailoverException + { + + FieldTable arguments = FieldTableFactory.newFieldTable(); + if ((messageSelector != null) && !messageSelector.equals("")) + { + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } + + if (consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + + if (consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } + + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket + + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } + } + + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException + { + AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + { + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket + + protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + } + + public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException + { + AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + } + + public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException + { + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), !suspend); + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } + + public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, + final boolean noConsume, final boolean autoClose) + { + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal, + _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); + } + + + public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent, long producerId) + { + + return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, + this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index dfac0d45a8..96e0b30d23 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,37 +20,31 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.failover.FailoverException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -public class BasicMessageConsumer extends Closeable implements MessageConsumer +public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -69,10 +63,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - private AMQShortString _consumerTag; + protected AMQShortString _consumerTag; /** We need to know the channel id when constructing frames */ - private int _channelId; + protected int _channelId; /** * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors @@ -84,7 +78,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private final AMQSession _session; - private AMQProtocolHandler _protocolHandler; + protected AMQProtocolHandler _protocolHandler; /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ private FieldTable _rawSelectorFieldTable; @@ -482,29 +476,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait - - try - { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); - - if (_logger.isDebugEnabled()) - { - _logger.debug("CancelOk'd for consumer:" + debugIdentity()); - } - - } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } + sendCancel(); } else { @@ -528,6 +500,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + public abstract void sendCancel() throws JMSAMQException; + /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has * vetoed automatic resubscription. The caller must hold the failover mutex. diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java new file mode 100644 index 0000000000..b4f9c5de37 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.FieldTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BasicMessageConsumer_0_8 extends BasicMessageConsumer +{ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + { + super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, + protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, + acknowledgeMode, noConsume, autoClose); + } + + public void sendCancel() throws JMSAMQException + { + final AMQFrame cancelFrame = + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait + + try + { + _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("CancelOk'd for consumer:" + debugIdentity()); + } + + } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 3691e80234..df0feff201 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,23 +20,8 @@ */ package org.apache.qpid.client; -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ExchangeDeclareBody; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.UnsupportedEncodingException; +import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -51,10 +36,15 @@ import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; -import java.io.UnsupportedEncodingException; -import java.util.UUID; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.ContentBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer +public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { protected final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -63,7 +53,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j /** * If true, messages will not get a timestamp. */ - private boolean _disableTimestamps; + protected boolean _disableTimestamps; /** * Priority of messages created by this producer. @@ -95,14 +85,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private String _mimeType; - private AMQProtocolHandler _protocolHandler; + protected AMQProtocolHandler _protocolHandler; /** * True if this producer was created from a transacted session */ private boolean _transacted; - private int _channelId; + protected int _channelId; /** * This is an id generated by the session and is used to tie individual producers to the session. This means we @@ -115,7 +105,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j /** * The session used to create this producer */ - private AMQSession _session; + protected AMQSession _session; private final boolean _immediate; @@ -156,24 +146,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - private void declareDestination(AMQDestination destination) - { - // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame declare = - ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type - _protocolHandler.writeFrame(declare); - } + public abstract void declareDestination(AMQDestination destination); public void setDisableMessageID(boolean b) throws JMSException { @@ -485,81 +458,13 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = - BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket - - message.prepareForSending(); - ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); - - if (!_disableTimestamps) - { - final long currentTime = System.currentTimeMillis(); - contentHeaderProperties.setTimestamp(currentTime); - - if (timeToLive > 0) - { - contentHeaderProperties.setExpiration(currentTime + timeToLive); - } - else - { - contentHeaderProperties.setExpiration(0); - } - } - - contentHeaderProperties.setDeliveryMode((byte) deliveryMode); - contentHeaderProperties.setPriority((byte) priority); - - final int size = (payload != null) ? payload.limit() : 0; - final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); - final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; - - if (payload != null) - { - createContentBodies(payload, frames, 2, _channelId); - } - - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - // weight argument of zero indicates no child content headers, just bodies - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - _protocolHandler.writeFrame(compositeFrame, wait); - - if (message != origMessage) - { - _logger.debug("Updating original message"); - origMessage.setJMSPriority(message.getJMSPriority()); - origMessage.setJMSTimestamp(message.getJMSTimestamp()); - _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); - origMessage.setJMSExpiration(message.getJMSExpiration()); - origMessage.setJMSMessageID(message.getJMSMessageID()); - } + sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, + mandatory, immediate, wait); } + public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode, + int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException; + private void checkTemporaryDestination(AMQDestination destination) throws JMSException { if (destination instanceof TemporaryDestination) @@ -580,60 +485,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - /** - * Create content bodies. This will split a large message into numerous bodies depending on the negotiated - * maximum frame size. - * - * @param payload - * @param frames - * @param offset - * @param channelId @return the array of content bodies - */ - private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) - { - - if (frames.length == (offset + 1)) - { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); - } - else - { - - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - long remaining = payload.remaining(); - for (int i = offset; i < frames.length; i++) - { - payload.position((int) framePayloadMax * (i - offset)); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); - - remaining -= length; - } - } - - } - - private int calculateContentBodyFrameCount(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ((payload == null) || (payload.remaining() == 0)) - { - frameCount = 0; - } - else - { - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } - public void setMimeType(String mimeType) throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java new file mode 100644 index 0000000000..0e79069463 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -0,0 +1,199 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.CompositeAMQDataBlock; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ExchangeDeclareBody; + +public class BasicMessageProducer_0_8 extends BasicMessageProducer +{ + + BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, + boolean waitUntilSent) + { + super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent); + } + + public void declareDestination(AMQDestination destination) + { + // Declare the exchange + // Note that the durable and internal arguments are ignored since passive is set to false + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame declare = + ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + _session.getTicket(), // ticket + destination.getExchangeClass()); // type + _protocolHandler.writeFrame(declare); + } + + public void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message, + int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + { +// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame publishFrame = + BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + _session.getTicket()); // ticket + + message.prepareForSending(); + ByteBuffer payload = message.getData(); + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); + + if (!_disableTimestamps) + { + final long currentTime = System.currentTimeMillis(); + contentHeaderProperties.setTimestamp(currentTime); + + if (timeToLive > 0) + { + contentHeaderProperties.setExpiration(currentTime + timeToLive); + } + else + { + contentHeaderProperties.setExpiration(0); + } + } + + contentHeaderProperties.setDeliveryMode((byte) deliveryMode); + contentHeaderProperties.setPriority((byte) priority); + + final int size = (payload != null) ? payload.limit() : 0; + final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); + final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; + + if (payload != null) + { + createContentBodies(payload, frames, 2, _channelId); + } + + if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) + { + _logger.debug("Sending content body frames to " + destination); + } + + // weight argument of zero indicates no child content headers, just bodies + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + AMQFrame contentHeaderFrame = + ContentHeaderBody.createAMQFrame(_channelId, + BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size); + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending content header frame to " + destination); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + _protocolHandler.writeFrame(compositeFrame, wait); + + if (message != origMessage) + { + _logger.debug("Updating original message"); + origMessage.setJMSPriority(message.getJMSPriority()); + origMessage.setJMSTimestamp(message.getJMSTimestamp()); + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + origMessage.setJMSExpiration(message.getJMSExpiration()); + origMessage.setJMSMessageID(message.getJMSMessageID()); + } + } + + /** + * Create content bodies. This will split a large message into numerous bodies depending on the negotiated + * maximum frame size. + * + * @param payload + * @param frames + * @param offset + * @param channelId @return the array of content bodies + */ + private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) + { + + if (frames.length == (offset + 1)) + { + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); + } + else + { + + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + long remaining = payload.remaining(); + for (int i = offset; i < frames.length; i++) + { + payload.position((int) framePayloadMax * (i - offset)); + int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; + payload.limit(payload.position() + length); + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); + + remaining -= length; + } + } + + } + + private int calculateContentBodyFrameCount(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if ((payload == null) || (payload.remaining() == 0)) + { + frameCount = 0; + } + else + { + int dataLength = payload.remaining(); + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; + frameCount = (int) (dataLength / framePayloadMax) + lastFrame; + } + + return frameCount; + } + +} |
