summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
commit5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch)
treef9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client/ClientChannel.h
parent5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff)
downloadqpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side. * Improved various exception messages with boost::format messages. * Removed unnecssary virtual inheritance. * Widespread: fixed incorrect non-const ProtocolVersion& parameters. * Client API: pass channels by reference, not pointer. * codegen: - MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template. - Various: fixed non-const ProtocolVersion& parameters. * cpp/bootstrap: Allow config arguments with -build. * cpp/gen/Makefile.am: Merged codegen fixes from trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.h')
-rw-r--r--cpp/lib/client/ClientChannel.h550
1 files changed, 296 insertions, 254 deletions
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index e7bab8b4ee..67274ddfc4 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -27,7 +27,6 @@
#include "sys/types.h"
#include <framing/amqp_framing.h>
-#include <Connection.h>
#include <ClientExchange.h>
#include <IncomingMessage.h>
#include <ClientMessage.h>
@@ -35,86 +34,126 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <ReturnedMessageHandler.h>
+#include "Runnable.h"
+#include "ChannelAdapter.h"
+#include "Thread.h"
namespace qpid {
+namespace framing {
+class ChannelCloseBody;
+}
+
namespace client {
- /**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
- enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
+
+class Connection;
+
+/**
+ * The available acknowledgements modes
+ *
+ * \ingroup clientapi
+ */
+enum ack_modes {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+/**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ *
+ * \ingroup clientapi
+ */
+class Channel : public framing::ChannelAdapter,
+ public sys::Runnable
+{
+ struct Consumer{
+ MessageListener* listener;
+ int ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
};
+ typedef std::map<std::string, Consumer> ConsumerMap;
+ static const std::string OK;
+
+ Connection* connection;
+ sys::Thread dispatcher;
+ IncomingMessage* incoming;
+ ResponseHandler responses;
+ std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+ IncomingMessage* retrieved;//holds response to basic.get
+ sys::Monitor dispatchMonitor;
+ sys::Monitor retrievalMonitor;
+ ConsumerMap consumers;
+ ReturnedMessageHandler* returnsHandler;
- /**
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- * \ingroup clientapi
- */
- class Channel : private virtual framing::BodyHandler,
- public virtual sys::Runnable
- {
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string,Consumer*>::iterator consumer_iterator;
+ u_int16_t prefetch;
+ const bool transactional;
+ framing::ProtocolVersion version;
+
+ void enqueue();
+ void retrieve(Message& msg);
+ IncomingMessage* dequeue();
+ void dispatch();
+ void deliver(Consumer& consumer, Message& msg);
+
+ void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+
+ void handleMethodInContext(
+ framing::AMQMethodBody::shared_ptr,
+ const framing::MethodContext& method);
+ void setQos();
+ void cancelAll();
- u_int16_t id;
- Connection* con;
- sys::Thread dispatcher;
- framing::OutputHandler* out;
- IncomingMessage* incoming;
- ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- sys::Monitor dispatchMonitor;
- sys::Monitor retrievalMonitor;
- std::map<std::string, Consumer*> consumers;
- ReturnedMessageHandler* returnsHandler;
- bool closed;
+ void protocolInit(
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost);
+
+ void sendAndReceive(
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
- u_int16_t prefetch;
- const bool transactional;
- framing::ProtocolVersion version;
+ void sendAndReceiveSync(
+ bool sync,
+ framing::AMQBody*, framing::ClassId, framing::MethodId);
- void enqueue();
- void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
- void stop();
- void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
- void deliver(Consumer* consumer, Message& msg);
- void setQos();
- void cancelAll();
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ return boost::shared_polymorphic_downcast<BodyType>(
+ responses.getResponse());
+ }
- virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleRequest(framing::AMQRequestBody::shared_ptr);
- void handleResponse(framing::AMQResponseBody::shared_ptr);
+ template <class BodyType> void sendAndReceiveSync(
+ bool sync, framing::AMQBody* body) {
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
+ }
- public:
- /**
+ void open(framing::ChannelId, Connection&);
+ void closeInternal();
+ void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+
+ friend class Connection;
+
+ public:
+
+ bool isOpen() const;
+
+ /**
* Creates a channel object.
*
* @param transactional if true, the publishing and acknowledgement
@@ -124,199 +163,202 @@ namespace client {
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- */
+ */
Channel(bool transactional = false, u_int16_t prefetch = 500);
~Channel();
- /**
- * Declares an exchange.
- *
- * In AMQP Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange
- *
- * @param exchange an Exchange object representing the exchange to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue
- *
- * @param queue a Queue object representing the queue to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue
- *
- * @param queue a Queue object representing the queue to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to
- *
- * @param queue a Queue object representing the queue to be
- * bound
- *
- * @param key the 'routing key' for the binding
- *
- * @param args the 'binding arguments' for the binding
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
+ /**
+ * Declares an exchange.
+ *
+ * In AMQP Exchanges are the destinations to which messages
+ * are published. They have Queues bound to them and route
+ * messages they receive to those queues. The routing rules
+ * depend on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Deletes an exchange
+ *
+ * @param exchange an Exchange object representing the exchange to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Declares a Queue
+ *
+ * @param queue a Queue object representing the queue to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareQueue(Queue& queue, bool synch = true);
+ /**
+ * Deletes a Queue
+ *
+ * @param queue a Queue object representing the queue to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ /**
+ * Binds a queue to an exchange. The exact semantics of this
+ * (in particular how 'routing keys' and 'binding arguments'
+ * are used) depends on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to bind to
+ *
+ * @param queue a Queue object representing the queue to be
+ * bound
+ *
+ * @param key the 'routing key' for the binding
+ *
+ * @param args the 'binding arguments' for the binding
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
+ const framing::FieldTable& args, bool synch = true);
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see ack_modes
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * ack_modes)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * Transactionailty of a channel is specified when the channel
- * object is created (@see Channel()).
- */
- void commit();
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * ppblished messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
+ /**
+ * For a transactional channel this will commit all
+ * publications and acknowledgements since the last commit (or
+ * the channel was opened if there has been no previous
+ * commit). This will cause published messages to become
+ * available to consumers and acknowledged messages to be
+ * consumed and removed from the queues they were dispatched
+ * from.
+ *
+ * Transactionailty of a channel is specified when the channel
+ * object is created (@see Channel()).
+ */
+ void commit();
+ /**
+ * For a transactional channel, this will rollback any
+ * publications or acknowledgements. It will be as if the
+ * ppblished messages were never sent and the acknowledged
+ * messages were never consumed.
+ */
+ void rollback();
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(u_int16_t prefetch);
+ /**
+ * Change the prefetch in use.
+ */
+ void setPrefetch(u_int16_t prefetch);
- /**
- * Start message dispatching on a new thread
- */
- void start();
- /**
- * Do message dispatching on this thread
- */
- void run();
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
- /**
- * Closes a channel, stopping any message dispatching.
- */
- void close();
+ // TODO aconway 2007-01-26: Can it be private?
+ /**
+ * Dispatch messages on this channel in the calling thread.
+ */
+ void run();
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+ /**
+ * Close the channel with optional error information.
+ * Closing a channel that is not open has no effect.
+ */
+ void close(
+ framing::ReplyCode = 200, const std::string& =OK,
+ framing::ClassId = 0, framing::MethodId = 0);
- friend class Connection;
- };
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+};
}
}