diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
| commit | 067f367d27bef7500410ea27c000d0ca275c748a (patch) | |
| tree | b20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client/ClientChannel.h | |
| parent | 20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff) | |
| download | qpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz | |
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class.
* cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.h')
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 194 |
1 files changed, 41 insertions, 153 deletions
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 9c422305b0..56fdd57d03 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -21,23 +21,15 @@ * under the License. * */ -#include <map> -#include <string> -#include <queue> -#include <boost/scoped_ptr.hpp> #include "sys/types.h" - #include <framing/amqp_framing.h> #include <ClientExchange.h> -#include <IncomingMessage.h> #include <ClientMessage.h> -#include <MessageListener.h> #include <ClientQueue.h> #include <ResponseHandler.h> -#include <ReturnedMessageHandler.h> -#include "Runnable.h" #include "ChannelAdapter.h" #include "Thread.h" +#include "Basic.h" namespace qpid { @@ -50,27 +42,6 @@ namespace client { class Connection; -/** - * The available acknowledgements modes - * - * \ingroup clientapi - */ -enum ack_modes { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; /** * Represents an AMQP channel, i.e. loosely a session of work. It @@ -79,41 +50,34 @@ enum ack_modes { * * \ingroup clientapi */ -class Channel : public framing::ChannelAdapter, - public sys::Runnable +class Channel : public framing::ChannelAdapter { - struct Consumer{ - MessageListener* listener; - int ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string, Consumer> ConsumerMap; + private: + // TODO aconway 2007-02-22: Remove friendship. + friend class Basic; + // FIXME aconway 2007-02-22: friend class Message; + struct UnknownMethod {}; + sys::Mutex lock; + Basic basic; Connection* connection; - sys::Thread dispatcher; - IncomingMessage incoming; + sys::Thread basicDispatcher; ResponseHandler responses; - ConsumerMap consumers; - ReturnedMessageHandler* returnsHandler; u_int16_t prefetch; const bool transactional; framing::ProtocolVersion version; - void retrieve(Message& msg); - void deliver(Consumer& consumer, Message& msg); - void handleHeader(framing::AMQHeaderBody::shared_ptr body); void handleContent(framing::AMQContentBody::shared_ptr body); void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, - const framing::MethodContext& method); + framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); + void handleChannel(framing::AMQMethodBody::shared_ptr method); + void handleConnection(framing::AMQMethodBody::shared_ptr method); + void setQos(); - void cancelAll(); void protocolInit( const std::string& uid, const std::string& pwd, @@ -148,18 +112,18 @@ class Channel : public framing::ChannelAdapter, public: /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously + * Creates a channel object. + * + * @param transactional if true, the publishing and acknowledgement + * of messages will be transactional and can be committed or + * aborted in atomic units (@see commit(), @see rollback()) + * + * @param prefetch specifies the number of unacknowledged + * messages the channel is willing to have sent to it + * asynchronously */ - Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); + Channel(bool transactional = false, u_int16_t prefetch = 500); + ~Channel(); /** * Declares an exchange. @@ -221,85 +185,16 @@ class Channel : public framing::ChannelAdapter, * @param synch if true this call will block until a response * is received from the broker */ - void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const framing::FieldTable& args, bool synch = true); - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see ack_modes - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); + void bind(const Exchange& exchange, const Queue& queue, + const std::string& key, const framing::FieldTable& args, + bool synch = true); + /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * ack_modes) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. + * Get a Basic object which provides functions to send and + * receive messages using the AMQP 0-8 Basic class methods. + *@see Basic */ - bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); + Basic& getBasic() { return basic; } /** * For a transactional channel this will commit all @@ -314,6 +209,7 @@ class Channel : public framing::ChannelAdapter, * object is created (@see Channel()). */ void commit(); + /** * For a transactional channel, this will rollback any * publications or acknowledgements. It will be as if the @@ -327,33 +223,25 @@ class Channel : public framing::ChannelAdapter, */ void setPrefetch(u_int16_t prefetch); + u_int16_t getPrefetch() { return prefetch; } + /** * Start message dispatching on a new thread */ void start(); - // TODO aconway 2007-01-26: Can it be private? - /** - * Dispatch messages on this channel in the calling thread. - */ - void run(); - /** * Close the channel with optional error information. * Closing a channel that is not open has no effect. */ void close( framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); + framing::ClassId = 0, framing::MethodId = 0); + /** True if the channel is transactional */ + bool isTransactional() { return transactional; } + + /** True if the channel is open */ bool isOpen() const; }; |
