summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
committerAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
commit067f367d27bef7500410ea27c000d0ca275c748a (patch)
treeb20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client/ClientChannel.h
parent20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff)
downloadqpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class. * cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.h')
-rw-r--r--cpp/lib/client/ClientChannel.h194
1 files changed, 41 insertions, 153 deletions
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 9c422305b0..56fdd57d03 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -21,23 +21,15 @@
* under the License.
*
*/
-#include <map>
-#include <string>
-#include <queue>
-#include <boost/scoped_ptr.hpp>
#include "sys/types.h"
-
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
-#include <IncomingMessage.h>
#include <ClientMessage.h>
-#include <MessageListener.h>
#include <ClientQueue.h>
#include <ResponseHandler.h>
-#include <ReturnedMessageHandler.h>
-#include "Runnable.h"
#include "ChannelAdapter.h"
#include "Thread.h"
+#include "Basic.h"
namespace qpid {
@@ -50,27 +42,6 @@ namespace client {
class Connection;
-/**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
-enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
/**
* Represents an AMQP channel, i.e. loosely a session of work. It
@@ -79,41 +50,34 @@ enum ack_modes {
*
* \ingroup clientapi
*/
-class Channel : public framing::ChannelAdapter,
- public sys::Runnable
+class Channel : public framing::ChannelAdapter
{
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string, Consumer> ConsumerMap;
+ private:
+ // TODO aconway 2007-02-22: Remove friendship.
+ friend class Basic;
+ // FIXME aconway 2007-02-22: friend class Message;
+ struct UnknownMethod {};
+
sys::Mutex lock;
+ Basic basic;
Connection* connection;
- sys::Thread dispatcher;
- IncomingMessage incoming;
+ sys::Thread basicDispatcher;
ResponseHandler responses;
- ConsumerMap consumers;
- ReturnedMessageHandler* returnsHandler;
u_int16_t prefetch;
const bool transactional;
framing::ProtocolVersion version;
- void retrieve(Message& msg);
- void deliver(Consumer& consumer, Message& msg);
-
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
void handleContent(framing::AMQContentBody::shared_ptr body);
void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
-
void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr,
- const framing::MethodContext& method);
+ framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
+ void handleChannel(framing::AMQMethodBody::shared_ptr method);
+ void handleConnection(framing::AMQMethodBody::shared_ptr method);
+
void setQos();
- void cancelAll();
void protocolInit(
const std::string& uid, const std::string& pwd,
@@ -148,18 +112,18 @@ class Channel : public framing::ChannelAdapter,
public:
/**
- * Creates a channel object.
- *
- * @param transactional if true, the publishing and acknowledgement
- * of messages will be transactional and can be committed or
- * aborted in atomic units (@see commit(), @see rollback())
- *
- * @param prefetch specifies the number of unacknowledged
- * messages the channel is willing to have sent to it
- * asynchronously
+ * Creates a channel object.
+ *
+ * @param transactional if true, the publishing and acknowledgement
+ * of messages will be transactional and can be committed or
+ * aborted in atomic units (@see commit(), @see rollback())
+ *
+ * @param prefetch specifies the number of unacknowledged
+ * messages the channel is willing to have sent to it
+ * asynchronously
*/
- Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
+ ~Channel();
/**
* Declares an exchange.
@@ -221,85 +185,16 @@ class Channel : public framing::ChannelAdapter,
* @param synch if true this call will block until a response
* is received from the broker
*/
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(const std::string& tag, bool synch = true);
+ void bind(const Exchange& exchange, const Queue& queue,
+ const std::string& key, const framing::FieldTable& args,
+ bool synch = true);
+
/**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
+ * Get a Basic object which provides functions to send and
+ * receive messages using the AMQP 0-8 Basic class methods.
+ *@see Basic
*/
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
+ Basic& getBasic() { return basic; }
/**
* For a transactional channel this will commit all
@@ -314,6 +209,7 @@ class Channel : public framing::ChannelAdapter,
* object is created (@see Channel()).
*/
void commit();
+
/**
* For a transactional channel, this will rollback any
* publications or acknowledgements. It will be as if the
@@ -327,33 +223,25 @@ class Channel : public framing::ChannelAdapter,
*/
void setPrefetch(u_int16_t prefetch);
+ u_int16_t getPrefetch() { return prefetch; }
+
/**
* Start message dispatching on a new thread
*/
void start();
- // TODO aconway 2007-01-26: Can it be private?
- /**
- * Dispatch messages on this channel in the calling thread.
- */
- void run();
-
/**
* Close the channel with optional error information.
* Closing a channel that is not open has no effect.
*/
void close(
framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+ framing::ClassId = 0, framing::MethodId = 0);
+ /** True if the channel is transactional */
+ bool isTransactional() { return transactional; }
+
+ /** True if the channel is open */
bool isOpen() const;
};