diff options
| author | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
| commit | 89aa36d093182e9e191c000504c174663932458f (patch) | |
| tree | 06d7e9a3feb4abdaab74b79c94e4352dfa40adaa /cpp/src/qpid/client/ClientChannel.h | |
| parent | 2290d4ed915f1202bcd6cd50b1a85f27f3eb6cd2 (diff) | |
| download | qpid-python-89aa36d093182e9e191c000504c174663932458f.tar.gz | |
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'.
* Support for asynchronous retrieval of response or completion status.
* Channel methods no longer included in execution layers command id count.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.h')
| -rw-r--r-- | cpp/src/qpid/client/ClientChannel.h | 97 |
1 files changed, 37 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index fc82fb41ff..4853603281 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -26,10 +26,12 @@ #include "ClientExchange.h" #include "ClientMessage.h" #include "ClientQueue.h" -#include "ResponseHandler.h" +#include "ChannelHandler.h" +#include "ExecutionHandler.h" +#include "FutureFactory.h" #include "qpid/Exception.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "AckMode.h" @@ -54,19 +56,23 @@ class ReturnedMessageHandler; * * \ingroup clientapi */ -class Channel : public framing::ChannelAdapter +class Channel : private sys::Runnable { private: struct UnknownMethod {}; typedef shared_ptr<framing::AMQMethodBody> MethodPtr; + + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + typedef std::map<std::string, Consumer> ConsumerMap; mutable sys::Mutex lock; - boost::scoped_ptr<MessageChannel> messaging; Connection* connection; sys::Thread dispatcher; - ResponseHandler responses; - sys::Monitor outgoingMonitor; - framing::Window outgoing; uint16_t prefetch; const bool transactional; @@ -78,32 +84,29 @@ class Channel : public framing::ChannelAdapter sys::Mutex stopLock; bool running; - void stop(); + ConsumerMap consumers; + ExecutionHandler executionHandler; + ChannelHandler channelHandler; + framing::ChannelId channelId; + BlockingQueue<ReceivedContent::shared_ptr> gets; + FutureFactory futures; - void handleHeader(framing::AMQHeaderBody::shared_ptr body); - void handleContent(framing::AMQContentBody::shared_ptr body); - void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); - void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt); - void handleConnection(framing::AMQMethodBody::shared_ptr method); - void handleExecution(framing::AMQMethodBody::shared_ptr method); + void stop(); void setQos(); - - void protocolInit( - const std::string& uid, const std::string& pwd, - const std::string& vhost); framing::AMQMethodBody::shared_ptr sendAndReceive( framing::AMQMethodBody::shared_ptr, - framing::ClassId, framing::MethodId); + framing::ClassId = 0, framing::MethodId = 0); framing::AMQMethodBody::shared_ptr sendAndReceiveSync( bool sync, framing::AMQMethodBody::shared_ptr, framing::ClassId, framing::MethodId); + void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body); + + template <class BodyType> boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( @@ -118,21 +121,16 @@ class Channel : public framing::ChannelAdapter sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } - void sendCommand(framing::AMQBody::shared_ptr body); - void open(framing::ChannelId, Connection&); void closeInternal(); - void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); - bool waitForCompletion(framing::SequenceNumber, sys::Duration); - + void peerClose(uint16_t, const std::string&); + // FIXME aconway 2007-02-23: Get rid of friendships. - friend class Connection; - friend class BasicMessageChannel; // for sendAndReceive. - friend class MessageMessageChannel; // for sendAndReceive. + friend class Connection; + friend class BasicMessageChannel; // for sendAndReceive. + friend class MessageMessageChannel; // for sendAndReceive. public: - enum InteropMode { AMQP_08, AMQP_09 }; - /** * Creates a channel object. * @@ -143,16 +141,10 @@ class Channel : public framing::ChannelAdapter * @param prefetch specifies the number of unacknowledged * messages the channel is willing to have sent to it * asynchronously - * - * @param messageImpl Alternate messaging implementation class to - * allow alternate protocol implementations of messaging - * operations. Takes ownership. */ - Channel( - bool transactional = false, u_int16_t prefetch = 500, - InteropMode=AMQP_08); + Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); + ~Channel(); /** * Declares an exchange. @@ -254,12 +246,10 @@ class Channel : public framing::ChannelAdapter void start(); /** - * Close the channel with optional error information. - * Closing a channel that is not open has no effect. + * Close the channel. Closing a channel that is not open has no + * effect. */ - void close( - framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); + void close(); /** True if the channel is transactional */ bool isTransactional() { return transactional; } @@ -301,7 +291,7 @@ class Channel : public framing::ChannelAdapter * is received from the broker */ void consume( - Queue& queue, std::string& tag, MessageListener* listener, + Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, const framing::FieldTable* fields = 0); @@ -353,22 +343,9 @@ class Channel : public framing::ChannelAdapter bool mandatory = false, bool immediate = false); /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - /** - * Deliver messages from the broker to the appropriate MessageListener. + * Deliver incoming messages to the appropriate MessageListener. */ void run(); - - /** - * TESTING ONLY FOR NOW! - */ - bool synchWithServer(sys::Duration timeout = 0); }; }} |
