summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-02 18:09:48 +0000
committerGordon Sim <gsim@apache.org>2007-08-02 18:09:48 +0000
commit89aa36d093182e9e191c000504c174663932458f (patch)
tree06d7e9a3feb4abdaab74b79c94e4352dfa40adaa /cpp/src/qpid/client/ClientChannel.h
parent2290d4ed915f1202bcd6cd50b1a85f27f3eb6cd2 (diff)
downloadqpid-python-89aa36d093182e9e191c000504c174663932458f.tar.gz
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'. * Support for asynchronous retrieval of response or completion status. * Channel methods no longer included in execution layers command id count. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.h')
-rw-r--r--cpp/src/qpid/client/ClientChannel.h97
1 files changed, 37 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index fc82fb41ff..4853603281 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -26,10 +26,12 @@
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
-#include "ResponseHandler.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
#include "qpid/Exception.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -54,19 +56,23 @@ class ReturnedMessageHandler;
*
* \ingroup clientapi
*/
-class Channel : public framing::ChannelAdapter
+class Channel : private sys::Runnable
{
private:
struct UnknownMethod {};
typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+
+ struct Consumer{
+ MessageListener* listener;
+ AckMode ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<std::string, Consumer> ConsumerMap;
mutable sys::Mutex lock;
- boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
sys::Thread dispatcher;
- ResponseHandler responses;
- sys::Monitor outgoingMonitor;
- framing::Window outgoing;
uint16_t prefetch;
const bool transactional;
@@ -78,32 +84,29 @@ class Channel : public framing::ChannelAdapter
sys::Mutex stopLock;
bool running;
- void stop();
+ ConsumerMap consumers;
+ ExecutionHandler executionHandler;
+ ChannelHandler channelHandler;
+ framing::ChannelId channelId;
+ BlockingQueue<ReceivedContent::shared_ptr> gets;
+ FutureFactory futures;
- void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- void handleContent(framing::AMQContentBody::shared_ptr body);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
- void handleConnection(framing::AMQMethodBody::shared_ptr method);
- void handleExecution(framing::AMQMethodBody::shared_ptr method);
+ void stop();
void setQos();
-
- void protocolInit(
- const std::string& uid, const std::string& pwd,
- const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
+ framing::ClassId = 0, framing::MethodId = 0);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
framing::AMQMethodBody::shared_ptr,
framing::ClassId, framing::MethodId);
+ void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
+
+
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
@@ -118,21 +121,16 @@ class Channel : public framing::ChannelAdapter
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- void sendCommand(framing::AMQBody::shared_ptr body);
-
void open(framing::ChannelId, Connection&);
void closeInternal();
- void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
- bool waitForCompletion(framing::SequenceNumber, sys::Duration);
-
+ void peerClose(uint16_t, const std::string&);
+
// FIXME aconway 2007-02-23: Get rid of friendships.
- friend class Connection;
- friend class BasicMessageChannel; // for sendAndReceive.
- friend class MessageMessageChannel; // for sendAndReceive.
+ friend class Connection;
+ friend class BasicMessageChannel; // for sendAndReceive.
+ friend class MessageMessageChannel; // for sendAndReceive.
public:
- enum InteropMode { AMQP_08, AMQP_09 };
-
/**
* Creates a channel object.
*
@@ -143,16 +141,10 @@ class Channel : public framing::ChannelAdapter
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- *
- * @param messageImpl Alternate messaging implementation class to
- * allow alternate protocol implementations of messaging
- * operations. Takes ownership.
*/
- Channel(
- bool transactional = false, u_int16_t prefetch = 500,
- InteropMode=AMQP_08);
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
+ ~Channel();
/**
* Declares an exchange.
@@ -254,12 +246,10 @@ class Channel : public framing::ChannelAdapter
void start();
/**
- * Close the channel with optional error information.
- * Closing a channel that is not open has no effect.
+ * Close the channel. Closing a channel that is not open has no
+ * effect.
*/
- void close(
- framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
+ void close();
/** True if the channel is transactional */
bool isTransactional() { return transactional; }
@@ -301,7 +291,7 @@ class Channel : public framing::ChannelAdapter
* is received from the broker
*/
void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
+ Queue& queue, const std::string& tag, MessageListener* listener,
AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
const framing::FieldTable* fields = 0);
@@ -353,22 +343,9 @@ class Channel : public framing::ChannelAdapter
bool mandatory = false, bool immediate = false);
/**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- /**
- * Deliver messages from the broker to the appropriate MessageListener.
+ * Deliver incoming messages to the appropriate MessageListener.
*/
void run();
-
- /**
- * TESTING ONLY FOR NOW!
- */
- bool synchWithServer(sys::Duration timeout = 0);
};
}}