From 00b761b3b6d80ee2bb3e538face881748efb2b09 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 25 Sep 2007 18:16:02 +0000 Subject: Renamed the following files for consistency: broker/BrokerExchange.cpp -> Exchange.cpp broker/BrokerExchange.h -> Exchange.h broker/BrokerQueue.cpp -> Queue.cpp broker/BrokerQueue.h -> Queue.h client/ClientChannel.cpp -> Channel.cpp client/ClientChannel.h -> Channel.h client/ClientConnection.cpp -> Connection.cpp client/ClientExchange.cpp -> Exchange.cpp client/ClientExchange.h -> Exchange.h client/ClientMessage.h -> Message.h client/ClientQueue.cpp -> Queue.cpp client/ClientQueue.h -> Queue.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@579340 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/Channel.cpp | 271 ++++++++++++++++++++++++++ cpp/src/qpid/client/Channel.h | 316 +++++++++++++++++++++++++++++++ cpp/src/qpid/client/ClientChannel.cpp | 271 -------------------------- cpp/src/qpid/client/ClientChannel.h | 316 ------------------------------- cpp/src/qpid/client/ClientConnection.cpp | 86 --------- cpp/src/qpid/client/ClientExchange.cpp | 34 ---- cpp/src/qpid/client/ClientExchange.h | 106 ----------- cpp/src/qpid/client/ClientMessage.h | 86 --------- cpp/src/qpid/client/ClientQueue.cpp | 58 ------ cpp/src/qpid/client/ClientQueue.h | 103 ---------- cpp/src/qpid/client/Connection.cpp | 86 +++++++++ cpp/src/qpid/client/Connection.h | 2 +- cpp/src/qpid/client/Dispatcher.cpp | 2 +- cpp/src/qpid/client/Exchange.cpp | 34 ++++ cpp/src/qpid/client/Exchange.h | 106 +++++++++++ cpp/src/qpid/client/Message.h | 86 +++++++++ cpp/src/qpid/client/MessageListener.h | 2 +- cpp/src/qpid/client/Queue.cpp | 58 ++++++ cpp/src/qpid/client/Queue.h | 103 ++++++++++ 19 files changed, 1063 insertions(+), 1063 deletions(-) create mode 100644 cpp/src/qpid/client/Channel.cpp create mode 100644 cpp/src/qpid/client/Channel.h delete mode 100644 cpp/src/qpid/client/ClientChannel.cpp delete mode 100644 cpp/src/qpid/client/ClientChannel.h delete mode 100644 cpp/src/qpid/client/ClientConnection.cpp delete mode 100644 cpp/src/qpid/client/ClientExchange.cpp delete mode 100644 cpp/src/qpid/client/ClientExchange.h delete mode 100644 cpp/src/qpid/client/ClientMessage.h delete mode 100644 cpp/src/qpid/client/ClientQueue.cpp delete mode 100644 cpp/src/qpid/client/ClientQueue.h create mode 100644 cpp/src/qpid/client/Connection.cpp create mode 100644 cpp/src/qpid/client/Exchange.cpp create mode 100644 cpp/src/qpid/client/Exchange.h create mode 100644 cpp/src/qpid/client/Message.h create mode 100644 cpp/src/qpid/client/Queue.cpp create mode 100644 cpp/src/qpid/client/Queue.h (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp new file mode 100644 index 0000000000..cef34630db --- /dev/null +++ b/cpp/src/qpid/client/Channel.cpp @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/log/Statement.h" +#include +#include +#include "Channel.h" +#include "qpid/sys/Monitor.h" +#include "Message.h" +#include "qpid/QpidError.h" +#include "Connection.h" +#include "Demux.h" +#include "FutureResponse.h" +#include "MessageListener.h" +#include "MessageQueue.h" +#include +#include +#include "qpid/framing/all_method_bodies.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// +using namespace std; +using namespace boost; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qpid{ +namespace client{ + +const std::string empty; + +class ScopedSync +{ + Session& session; + public: + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ~ScopedSync() { session.setSynchronous(false); } +}; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : + prefetch(_prefetch), transactional(_transactional), running(false), + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) +{ +} + +Channel::~Channel() +{ + join(); +} + +void Channel::open(const Session& s) +{ + Mutex::ScopedLock l(stopLock); + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + active = true; + session = s; + if(isTransactional()) { + session.txSelect(); + } +} + +bool Channel::isOpen() const { + Mutex::ScopedLock l(stopLock); + return active; +} + +void Channel::setPrefetch(uint32_t _prefetch){ + prefetch = _prefetch; +} + +void Channel::declareExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); +} + +void Channel::deleteExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); +} + +void Channel::declareQueue(Queue& _queue, bool synch){ + if (_queue.getName().empty()) { + stringstream uniqueName; + uniqueName << uniqueId << "-queue-" << ++nameCounter; + _queue.setName(uniqueName.str()); + } + + ScopedSync s(session, synch); + session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), + exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); + +} + +void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ + ScopedSync s(session, synch); + session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); +} + +void Channel::commit(){ + session.txCommit(); +} + +void Channel::rollback(){ + session.txRollback(); +} + +void Channel::consume( + Queue& _queue, const std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + + if (tag.empty()) { + throw Exception("A tag must be specified for a consumer."); + } + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.count = 0; + } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + ScopedSync s(session, synch); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); + if (!prefetch) { + session.messageFlowMode(tag, 0/*credit based*/); + } + + //allocate some credit: + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); +} + +void Channel::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + ScopedSync s(session, synch); + session.messageCancel(tag); +} + +bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { + string tag = "get-handler"; + ScopedDivert handler(tag, session.execution().getDemux()); + Demux::Queue& incoming = handler.getQueue(); + + session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, 1); + Completion status = session.messageFlush(tag); + status.sync(); + session.messageCancel(tag); + + if (incoming.empty()) { + return false; + } else { + msg.populate(*(incoming.pop())); + if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + return true; + } +} + +void Channel::publish(Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool /*?TODO-restore immediate?*/) { + + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer_(destination=exchange.getName(), content=msg); +} + +void Channel::close() +{ + session.close(); + { + Mutex::ScopedLock l(stopLock); + active = false; + } + stop(); +} + +void Channel::start(){ + running = true; + dispatcher = Thread(*this); +} + +void Channel::stop() { + gets.close(); + join(); +} + +void Channel::join() { + Mutex::ScopedLock l(stopLock); + if(running && dispatcher.id()) { + dispatcher.join(); + running = false; + } +} + +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + MessageListener* listener = i->second.listener; + listener->received(msg); + if (isOpen() && i->second.ackMode != CLIENT_ACK) { + bool send = i->second.ackMode == AUTO_ACK + || (prefetch && ++(i->second.count) > (prefetch / 2)); + if (send) i->second.count = 0; + session.execution().completed(content.getId(), true, send); + } + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + +void Channel::run() { + try { + while (true) { + FrameSet::shared_ptr content = session.get(); + //need to dispatch this to the relevant listener: + if (content->isA()) { + dispatch(*content, content->as()->getDestination()); + } else { + QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); + } + } + } catch (const QueueClosed&) {} +} + +}} + diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h new file mode 100644 index 0000000000..bf0b289077 --- /dev/null +++ b/cpp/src/qpid/client/Channel.h @@ -0,0 +1,316 @@ +#ifndef _client_Channel_h +#define _client_Channel_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/Uuid.h" +#include "Exchange.h" +#include "Message.h" +#include "Queue.h" +#include "ConnectionImpl.h" +#include "qpid/client/Session.h" +#include "qpid/Exception.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "AckMode.h" + +namespace qpid { + +namespace framing { +class ChannelCloseBody; +class AMQMethodBody; +} + +namespace client { + +class Connection; +class MessageChannel; +class MessageListener; +class ReturnedMessageHandler; + +/** + * Represents an AMQP channel, i.e. loosely a session of work. It + * is through a channel that most of the AMQP 'methods' are + * exposed. + * + * \ingroup clientapi + */ +class Channel : private sys::Runnable +{ + private: + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + uint32_t count; + }; + typedef std::map ConsumerMap; + + mutable sys::Mutex lock; + sys::Thread dispatcher; + + uint32_t prefetch; + const bool transactional; + framing::ProtocolVersion version; + + mutable sys::Mutex stopLock; + bool running; + + ConsumerMap consumers; + Session session; + framing::ChannelId channelId; + BlockingQueue gets; + framing::Uuid uniqueId; + uint32_t nameCounter; + bool active; + + void stop(); + + void open(const Session& session); + void closeInternal(); + void join(); + + void dispatch(framing::FrameSet& msg, const std::string& destination); + + // FIXME aconway 2007-02-23: Get rid of friendships. + friend class Connection; + + public: + /** + * Creates a channel object. + * + * @param transactional if true, the publishing and acknowledgement + * of messages will be transactional and can be committed or + * aborted in atomic units (@see commit(), @see rollback()) + * + * @param prefetch specifies the number of unacknowledged + * messages the channel is willing to have sent to it + * asynchronously + */ + Channel(bool transactional = false, u_int16_t prefetch = 0); + + ~Channel(); + + /** + * Declares an exchange. + * + * In AMQP Exchanges are the destinations to which messages + * are published. They have Queues bound to them and route + * messages they receive to those queues. The routing rules + * depend on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareExchange(Exchange& exchange, bool synch = true); + /** + * Deletes an exchange + * + * @param exchange an Exchange object representing the exchange to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteExchange(Exchange& exchange, bool synch = true); + /** + * Declares a Queue + * + * @param queue a Queue object representing the queue to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareQueue(Queue& queue, bool synch = true); + /** + * Deletes a Queue + * + * @param queue a Queue object representing the queue to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + /** + * Binds a queue to an exchange. The exact semantics of this + * (in particular how 'routing keys' and 'binding arguments' + * are used) depends on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to bind to + * + * @param queue a Queue object representing the queue to be + * bound + * + * @param key the 'routing key' for the binding + * + * @param args the 'binding arguments' for the binding + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void bind(const Exchange& exchange, const Queue& queue, + const std::string& key, + const framing::FieldTable& args=framing::FieldTable(), + bool synch = true); + + /** + * For a transactional channel this will commit all + * publications and acknowledgements since the last commit (or + * the channel was opened if there has been no previous + * commit). This will cause published messages to become + * available to consumers and acknowledged messages to be + * consumed and removed from the queues they were dispatched + * from. + * + * Transactionailty of a channel is specified when the channel + * object is created (@see Channel()). + */ + void commit(); + + /** + * For a transactional channel, this will rollback any + * publications or acknowledgements. It will be as if the + * ppblished messages were never sent and the acknowledged + * messages were never consumed. + */ + void rollback(); + + /** + * Change the prefetch in use. + */ + void setPrefetch(uint32_t prefetch); + + uint32_t getPrefetch() { return prefetch; } + + /** + * Start message dispatching on a new thread + */ + void start(); + + /** + * Close the channel. Closing a channel that is not open has no + * effect. + */ + void close(); + + /** True if the channel is transactional */ + bool isTransactional() { return transactional; } + + /** True if the channel is open */ + bool isOpen() const; + + /** Return the protocol version */ + framing::ProtocolVersion getVersion() const { return version ; } + + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see AckMode + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, const std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * AckMode) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + /** + * Deliver incoming messages to the appropriate MessageListener. + */ + void run(); +}; + +}} + +#endif /*!_client_Channel_h*/ diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp deleted file mode 100644 index f5362bf688..0000000000 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/log/Statement.h" -#include -#include -#include "ClientChannel.h" -#include "qpid/sys/Monitor.h" -#include "ClientMessage.h" -#include "qpid/QpidError.h" -#include "Connection.h" -#include "Demux.h" -#include "FutureResponse.h" -#include "MessageListener.h" -#include "MessageQueue.h" -#include -#include -#include "qpid/framing/all_method_bodies.h" - -// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent -// handling of errors that should close the connection or the channel. -// Make sure the user thread receives a connection in each case. -// -using namespace std; -using namespace boost; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid{ -namespace client{ - -const std::string empty; - -class ScopedSync -{ - Session& session; - public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } -}; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) -{ -} - -Channel::~Channel() -{ - join(); -} - -void Channel::open(const Session& s) -{ - Mutex::ScopedLock l(stopLock); - if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); - active = true; - session = s; - if(isTransactional()) { - session.txSelect(); - } -} - -bool Channel::isOpen() const { - Mutex::ScopedLock l(stopLock); - return active; -} - -void Channel::setPrefetch(uint32_t _prefetch){ - prefetch = _prefetch; -} - -void Channel::declareExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); -} - -void Channel::deleteExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); -} - -void Channel::declareQueue(Queue& _queue, bool synch){ - if (_queue.getName().empty()) { - stringstream uniqueName; - uniqueName << uniqueId << "-queue-" << ++nameCounter; - _queue.setName(uniqueName.str()); - } - - ScopedSync s(session, synch); - session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), - exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); - -} - -void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(session, synch); - session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); -} - -void Channel::commit(){ - session.txCommit(); -} - -void Channel::rollback(){ - session.txRollback(); -} - -void Channel::consume( - Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - - if (tag.empty()) { - throw Exception("A tag must be specified for a consumer."); - } - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.count = 0; - } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; - ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, - confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); - if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); - } - - //allocate some credit: - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); -} - -void Channel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - ScopedSync s(session, synch); - session.messageCancel(tag); -} - -bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { - string tag = "get-handler"; - ScopedDivert handler(tag, session.execution().getDemux()); - Demux::Queue& incoming = handler.getQueue(); - - session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); - session.messageCancel(tag); - - if (incoming.empty()) { - return false; - } else { - msg.populate(*(incoming.pop())); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); - return true; - } -} - -void Channel::publish(Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory, bool /*?TODO-restore immediate?*/) { - - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); - session.messageTransfer_(destination=exchange.getName(), content=msg); -} - -void Channel::close() -{ - session.close(); - { - Mutex::ScopedLock l(stopLock); - active = false; - } - stop(); -} - -void Channel::start(){ - running = true; - dispatcher = Thread(*this); -} - -void Channel::stop() { - gets.close(); - join(); -} - -void Channel::join() { - Mutex::ScopedLock l(stopLock); - if(running && dispatcher.id()) { - dispatcher.join(); - running = false; - } -} - -void Channel::dispatch(FrameSet& content, const std::string& destination) -{ - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { - Message msg; - msg.populate(content); - MessageListener* listener = i->second.listener; - listener->received(msg); - if (isOpen() && i->second.ackMode != CLIENT_ACK) { - bool send = i->second.ackMode == AUTO_ACK - || (prefetch && ++(i->second.count) > (prefetch / 2)); - if (send) i->second.count = 0; - session.execution().completed(content.getId(), true, send); - } - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); - } -} - -void Channel::run() { - try { - while (true) { - FrameSet::shared_ptr content = session.get(); - //need to dispatch this to the relevant listener: - if (content->isA()) { - dispatch(*content, content->as()->getDestination()); - } else { - QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); - } - } - } catch (const QueueClosed&) {} -} - -}} - diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h deleted file mode 100644 index 527f5d418f..0000000000 --- a/cpp/src/qpid/client/ClientChannel.h +++ /dev/null @@ -1,316 +0,0 @@ -#ifndef _client_ClientChannel_h -#define _client_ClientChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/Uuid.h" -#include "ClientExchange.h" -#include "ClientMessage.h" -#include "ClientQueue.h" -#include "ConnectionImpl.h" -#include "qpid/client/Session.h" -#include "qpid/Exception.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" -#include "AckMode.h" - -namespace qpid { - -namespace framing { -class ChannelCloseBody; -class AMQMethodBody; -} - -namespace client { - -class Connection; -class MessageChannel; -class MessageListener; -class ReturnedMessageHandler; - -/** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ -class Channel : private sys::Runnable -{ - private: - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - uint32_t count; - }; - typedef std::map ConsumerMap; - - mutable sys::Mutex lock; - sys::Thread dispatcher; - - uint32_t prefetch; - const bool transactional; - framing::ProtocolVersion version; - - mutable sys::Mutex stopLock; - bool running; - - ConsumerMap consumers; - Session session; - framing::ChannelId channelId; - BlockingQueue gets; - framing::Uuid uniqueId; - uint32_t nameCounter; - bool active; - - void stop(); - - void open(const Session& session); - void closeInternal(); - void join(); - - void dispatch(framing::FrameSet& msg, const std::string& destination); - - // FIXME aconway 2007-02-23: Get rid of friendships. - friend class Connection; - - public: - /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously - */ - Channel(bool transactional = false, u_int16_t prefetch = 0); - - ~Channel(); - - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, - const std::string& key, - const framing::FieldTable& args=framing::FieldTable(), - bool synch = true); - - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); - - /** - * Change the prefetch in use. - */ - void setPrefetch(uint32_t prefetch); - - uint32_t getPrefetch() { return prefetch; } - - /** - * Start message dispatching on a new thread - */ - void start(); - - /** - * Close the channel. Closing a channel that is not open has no - * effect. - */ - void close(); - - /** True if the channel is transactional */ - bool isTransactional() { return transactional; } - - /** True if the channel is open */ - bool isOpen() const; - - /** Return the protocol version */ - framing::ProtocolVersion getVersion() const { return version ; } - - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see AckMode - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, const std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * AckMode) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * Deliver incoming messages to the appropriate MessageListener. - */ - void run(); -}; - -}} - -#endif /*!_client_ClientChannel_h*/ diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp deleted file mode 100644 index 8c5f83f9f5..0000000000 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include - -#include "Connection.h" -#include "ClientChannel.h" -#include "ClientMessage.h" -#include "ScopedAssociation.h" -#include "qpid/log/Logger.h" -#include "qpid/log/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/QpidError.h" -#include -#include -#include - -using namespace qpid::framing; -using namespace qpid::sys; - - -namespace qpid { -namespace client { - -Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : - channelIdCounter(0), version(_version), - max_frame_size(_max_frame_size), - impl(new ConnectionImpl(boost::shared_ptr(new Connector(_version, _debug)))), - isOpen(false) {} - -Connection::Connection(boost::shared_ptr c) : - channelIdCounter(0), version(framing::highestProtocolVersion), - max_frame_size(65536), - impl(new ConnectionImpl(c)), - isOpen(false) {} - -Connection::~Connection(){} - -void Connection::open( - const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) -{ - if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - - impl->open(host, port, uid, pwd, vhost); - isOpen = true; -} - -void Connection::openChannel(Channel& channel) { - channel.open(newSession()); -} - -Session Connection::newSession() { - ChannelId id = ++channelIdCounter; - SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); - session->open(); - return Session(assoc); -} - -void Connection::close() -{ - impl->close(); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/ClientExchange.cpp b/cpp/src/qpid/client/ClientExchange.cpp deleted file mode 100644 index d5914beea2..0000000000 --- a/cpp/src/qpid/client/ClientExchange.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientExchange.h" - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/ClientExchange.h b/cpp/src/qpid/client/ClientExchange.h deleted file mode 100644 index a8ac21fa9b..0000000000 --- a/cpp/src/qpid/client/ClientExchange.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP exchange in the Channel - * methods. Exchanges are the destinations to which messages are - * published. - * - * There are different types of exchange (the standard types are - * available as static constants, see DIRECT_EXCHANGE, - * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to - * an exchange using Channel::bind() and messages published to - * that exchange are then routed to the queue based on the details - * of the binding and the type of exchange. - * - * There are some standard exchange instances that are predeclared - * on all AMQP brokers. These are defined as static members - * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and - * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange - * (member DEFAULT_EXCHANGE) which is nameless and of type - * 'direct' and has every declared queue bound to it by queue - * name. - * - * \ingroup clientapi - */ - class Exchange{ - const std::string name; - const std::string type; - - public: - /** - * A direct exchange routes messages published with routing - * key X to any queue bound with key X (i.e. an exact match is - * used). - */ - static const std::string DIRECT_EXCHANGE; - /** - * A topic exchange treat the key with which a queue is bound - * as a pattern and routes all messages whose routing keys - * match that pattern to the bound queue. The routing key for - * a message must consist of zero or more alpha-numeric words - * delimited by dots. The pattern is of a similar form but * - * can be used to match excatly one word and # can be used to - * match zero or more words. - */ - static const std::string TOPIC_EXCHANGE; - /** - * The headers exchange routes messages based on whether their - * headers match the binding arguments specified when - * binding. (see the AMQP spec for more details). - */ - static const std::string HEADERS_EXCHANGE; - - /** - * The 'default' exchange, nameless and of type 'direct'. Has - * every declared queue bound to it by name. - */ - static const Exchange DEFAULT_EXCHANGE; - /** - * The standard direct exchange, named amq.direct. - */ - static const Exchange STANDARD_DIRECT_EXCHANGE; - /** - * The standard topic exchange, named amq.topic. - */ - static const Exchange STANDARD_TOPIC_EXCHANGE; - /** - * The standard headers exchange, named amq.header. - */ - static const Exchange STANDARD_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h deleted file mode 100644 index a573e17940..0000000000 --- a/cpp/src/qpid/client/ClientMessage.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef _client_ClientMessage_h -#define _client_ClientMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include "qpid/client/Session.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/TransferContent.h" - -namespace qpid { -namespace client { - -/** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ -class Message : public framing::TransferContent -{ -public: - Message(const std::string& data_=std::string()) : TransferContent(data_) {} - - std::string getDestination() const - { - return method.getDestination(); - } - - bool isRedelivered() const - { - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); - } - - void setRedelivered(bool redelivered) - { - getDeliveryProperties().setRedelivered(redelivered); - } - - framing::FieldTable& getHeaders() - { - return getMessageProperties().getApplicationHeaders(); - } - - void acknowledge(Session& session, bool cumulative = true, bool send = true) const - { - session.execution().completed(id, cumulative, send); - } - - Message(const framing::FrameSet& frameset) : method(*frameset.as()), id(frameset.getId()) - { - populate(frameset); - } - - const framing::MessageTransferBody& getMethod() const - { - return method; - } - -private: - //method and id are only set for received messages: - const framing::MessageTransferBody method; - const framing::SequenceNumber id; -}; - -}} - -#endif /*!_client_ClientMessage_h*/ diff --git a/cpp/src/qpid/client/ClientQueue.cpp b/cpp/src/qpid/client/ClientQueue.cpp deleted file mode 100644 index 613cf8d288..0000000000 --- a/cpp/src/qpid/client/ClientQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientQueue.h" - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) - : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - -bool qpid::client::Queue::isDurable() const{ - return durable; -} - -void qpid::client::Queue::setDurable(bool _durable){ - durable = _durable; -} - - - - diff --git a/cpp/src/qpid/client/ClientQueue.h b/cpp/src/qpid/client/ClientQueue.h deleted file mode 100644 index b37a44b004..0000000000 --- a/cpp/src/qpid/client/ClientQueue.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef _client_ClientQueue_h -#define _client_ClientQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP queue in the Channel - * methods. Creating an instance of this class does not cause the - * queue to be created on the broker. Rather, an instance of this - * class should be passed to Channel::declareQueue() to ensure - * that the queue exists or is created. - * - * Queues hold messages and allow clients to consume - * (see Channel::consume()) or get (see Channel::get()) those messags. A - * queue receives messages by being bound to one or more Exchange; - * messages published to that exchange may then be routed to the - * queue based on the details of the binding and the type of the - * exchange (see Channel::bind()). - * - * Queues are identified by a name. They can be exclusive (in which - * case they can only be used in the context of the connection - * over which they were declared, and are deleted when then - * connection closes), or they can be shared. Shared queues can be - * auto deleted when they have no consumers. - * - * We use the term 'temporary queue' to refer to an exclusive - * queue. - * - * \ingroup clientapi - */ - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - bool durable; - - public: - - /** - * Creates an unnamed, non-durable, temporary queue. A name - * will be assigned to this queue instance by a call to - * Channel::declareQueue(). - */ - Queue(); - /** - * Creates a shared, non-durable, queue with a given name, - * that will not be autodeleted. - * - * @param name the name of the queue - */ - Queue(std::string name); - /** - * Creates a non-durable queue with a given name. - * - * @param name the name of the queue - * - * @param temp if true the queue will be a temporary queue, if - * false it will be shared and not autodeleted. - */ - Queue(std::string name, bool temp); - /** - * This constructor allows the autodelete, exclusive and - * durable propeties to be explictly set. Note however that if - * exclusive is true, autodelete has no meaning as exclusive - * queues are always destroyed when the connection that - * created them is closed. - */ - Queue(std::string name, bool autodelete, bool exclusive, bool durable); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - bool isDurable() const; - void setDurable(bool durable); - }; - -} -} - -#endif /*!_client_ClientQueue_h*/ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp new file mode 100644 index 0000000000..cef076527f --- /dev/null +++ b/cpp/src/qpid/client/Connection.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include + +#include "Connection.h" +#include "Channel.h" +#include "Message.h" +#include "ScopedAssociation.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" +#include "qpid/log/Statement.h" +#include "qpid/QpidError.h" +#include +#include +#include + +using namespace qpid::framing; +using namespace qpid::sys; + + +namespace qpid { +namespace client { + +Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version), + max_frame_size(_max_frame_size), + impl(new ConnectionImpl(boost::shared_ptr(new Connector(_version, _debug)))), + isOpen(false) {} + +Connection::Connection(boost::shared_ptr c) : + channelIdCounter(0), version(framing::highestProtocolVersion), + max_frame_size(65536), + impl(new ConnectionImpl(c)), + isOpen(false) {} + +Connection::~Connection(){} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + + impl->open(host, port, uid, pwd, vhost); + isOpen = true; +} + +void Connection::openChannel(Channel& channel) { + channel.open(newSession()); +} + +Session Connection::newSession() { + ChannelId id = ++channelIdCounter; + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); + session->open(); + return Session(assoc); +} + +void Connection::close() +{ + impl->close(); +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e309b5c63e..f5d6a387a9 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -24,7 +24,7 @@ #include #include #include "qpid/QpidError.h" -#include "ClientChannel.h" +#include "Channel.h" #include "ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 8f3ed8bcbe..fd437725ce 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "BlockingQueue.h" -#include "ClientMessage.h" +#include "Message.h" using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; diff --git a/cpp/src/qpid/client/Exchange.cpp b/cpp/src/qpid/client/Exchange.cpp new file mode 100644 index 0000000000..e7fbdeb47e --- /dev/null +++ b/cpp/src/qpid/client/Exchange.cpp @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Exchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h new file mode 100644 index 0000000000..a8ac21fa9b --- /dev/null +++ b/cpp/src/qpid/client/Exchange.h @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +#ifndef _Exchange_ +#define _Exchange_ + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP exchange in the Channel + * methods. Exchanges are the destinations to which messages are + * published. + * + * There are different types of exchange (the standard types are + * available as static constants, see DIRECT_EXCHANGE, + * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to + * an exchange using Channel::bind() and messages published to + * that exchange are then routed to the queue based on the details + * of the binding and the type of exchange. + * + * There are some standard exchange instances that are predeclared + * on all AMQP brokers. These are defined as static members + * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and + * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange + * (member DEFAULT_EXCHANGE) which is nameless and of type + * 'direct' and has every declared queue bound to it by queue + * name. + * + * \ingroup clientapi + */ + class Exchange{ + const std::string name; + const std::string type; + + public: + /** + * A direct exchange routes messages published with routing + * key X to any queue bound with key X (i.e. an exact match is + * used). + */ + static const std::string DIRECT_EXCHANGE; + /** + * A topic exchange treat the key with which a queue is bound + * as a pattern and routes all messages whose routing keys + * match that pattern to the bound queue. The routing key for + * a message must consist of zero or more alpha-numeric words + * delimited by dots. The pattern is of a similar form but * + * can be used to match excatly one word and # can be used to + * match zero or more words. + */ + static const std::string TOPIC_EXCHANGE; + /** + * The headers exchange routes messages based on whether their + * headers match the binding arguments specified when + * binding. (see the AMQP spec for more details). + */ + static const std::string HEADERS_EXCHANGE; + + /** + * The 'default' exchange, nameless and of type 'direct'. Has + * every declared queue bound to it by name. + */ + static const Exchange DEFAULT_EXCHANGE; + /** + * The standard direct exchange, named amq.direct. + */ + static const Exchange STANDARD_DIRECT_EXCHANGE; + /** + * The standard topic exchange, named amq.topic. + */ + static const Exchange STANDARD_TOPIC_EXCHANGE; + /** + * The standard headers exchange, named amq.header. + */ + static const Exchange STANDARD_HEADERS_EXCHANGE; + + Exchange(std::string name, std::string type = DIRECT_EXCHANGE); + const std::string& getName() const; + const std::string& getType() const; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h new file mode 100644 index 0000000000..dab7f3c8d8 --- /dev/null +++ b/cpp/src/qpid/client/Message.h @@ -0,0 +1,86 @@ +#ifndef _client_Message_h +#define _client_Message_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace client { + +/** + * A representation of messages for sent or recived through the + * client api. + * + * \ingroup clientapi + */ +class Message : public framing::TransferContent +{ +public: + Message(const std::string& data_=std::string()) : TransferContent(data_) {} + + std::string getDestination() const + { + return method.getDestination(); + } + + bool isRedelivered() const + { + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); + } + + void setRedelivered(bool redelivered) + { + getDeliveryProperties().setRedelivered(redelivered); + } + + framing::FieldTable& getHeaders() + { + return getMessageProperties().getApplicationHeaders(); + } + + void acknowledge(Session& session, bool cumulative = true, bool send = true) const + { + session.execution().completed(id, cumulative, send); + } + + Message(const framing::FrameSet& frameset) : method(*frameset.as()), id(frameset.getId()) + { + populate(frameset); + } + + const framing::MessageTransferBody& getMethod() const + { + return method; + } + +private: + //method and id are only set for received messages: + const framing::MessageTransferBody method; + const framing::SequenceNumber id; +}; + +}} + +#endif /*!_client_Message_h*/ diff --git a/cpp/src/qpid/client/MessageListener.h b/cpp/src/qpid/client/MessageListener.h index 501862a3ef..86e5dd63dc 100644 --- a/cpp/src/qpid/client/MessageListener.h +++ b/cpp/src/qpid/client/MessageListener.h @@ -23,7 +23,7 @@ #ifndef _MessageListener_ #define _MessageListener_ -#include "ClientMessage.h" +#include "Message.h" namespace qpid { namespace client { diff --git a/cpp/src/qpid/client/Queue.cpp b/cpp/src/qpid/client/Queue.cpp new file mode 100644 index 0000000000..1752a48a3a --- /dev/null +++ b/cpp/src/qpid/client/Queue.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Queue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) + : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& _name){ + name = _name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + +bool qpid::client::Queue::isDurable() const{ + return durable; +} + +void qpid::client::Queue::setDurable(bool _durable){ + durable = _durable; +} + + + + diff --git a/cpp/src/qpid/client/Queue.h b/cpp/src/qpid/client/Queue.h new file mode 100644 index 0000000000..9ab8c70b08 --- /dev/null +++ b/cpp/src/qpid/client/Queue.h @@ -0,0 +1,103 @@ +#ifndef _client_Queue_h +#define _client_Queue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP queue in the Channel + * methods. Creating an instance of this class does not cause the + * queue to be created on the broker. Rather, an instance of this + * class should be passed to Channel::declareQueue() to ensure + * that the queue exists or is created. + * + * Queues hold messages and allow clients to consume + * (see Channel::consume()) or get (see Channel::get()) those messags. A + * queue receives messages by being bound to one or more Exchange; + * messages published to that exchange may then be routed to the + * queue based on the details of the binding and the type of the + * exchange (see Channel::bind()). + * + * Queues are identified by a name. They can be exclusive (in which + * case they can only be used in the context of the connection + * over which they were declared, and are deleted when then + * connection closes), or they can be shared. Shared queues can be + * auto deleted when they have no consumers. + * + * We use the term 'temporary queue' to refer to an exclusive + * queue. + * + * \ingroup clientapi + */ + class Queue{ + std::string name; + const bool autodelete; + const bool exclusive; + bool durable; + + public: + + /** + * Creates an unnamed, non-durable, temporary queue. A name + * will be assigned to this queue instance by a call to + * Channel::declareQueue(). + */ + Queue(); + /** + * Creates a shared, non-durable, queue with a given name, + * that will not be autodeleted. + * + * @param name the name of the queue + */ + Queue(std::string name); + /** + * Creates a non-durable queue with a given name. + * + * @param name the name of the queue + * + * @param temp if true the queue will be a temporary queue, if + * false it will be shared and not autodeleted. + */ + Queue(std::string name, bool temp); + /** + * This constructor allows the autodelete, exclusive and + * durable propeties to be explictly set. Note however that if + * exclusive is true, autodelete has no meaning as exclusive + * queues are always destroyed when the connection that + * created them is closed. + */ + Queue(std::string name, bool autodelete, bool exclusive, bool durable); + const std::string& getName() const; + void setName(const std::string&); + bool isAutoDelete() const; + bool isExclusive() const; + bool isDurable() const; + void setDurable(bool durable); + }; + +} +} + +#endif /*!_client_Queue_h*/ -- cgit v1.2.1