summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
committerAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
commitca3a7cd64822e874076bd23e9981af077eb47b03 (patch)
tree677b7d1a4940d10bbb7874a5138c9c2dd45429a7 /cpp/src/qpid/client
parentee865f87027fb559d8884cca3f672a8cbdd44ae0 (diff)
downloadqpid-python-ca3a7cd64822e874076bd23e9981af077eb47b03.tar.gz
Moved src/ source code to src/qpid directory:
- allows rhm package to build consistently against checked-out or installed qpid. - consistent correspondence between source paths and C++ namespaces. - consistent use of #include <qpid/foo> in source and by users. - allows header files to split over multiple directories, e.g. separating generated code, separating public API from private files. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@528668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/AckMode.h102
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp345
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.h90
-rw-r--r--cpp/src/qpid/client/ClientAdapter.cpp70
-rw-r--r--cpp/src/qpid/client/ClientAdapter.h66
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp341
-rw-r--r--cpp/src/qpid/client/ClientChannel.h356
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp156
-rw-r--r--cpp/src/qpid/client/ClientExchange.cpp34
-rw-r--r--cpp/src/qpid/client/ClientExchange.h106
-rw-r--r--cpp/src/qpid/client/ClientMessage.h64
-rw-r--r--cpp/src/qpid/client/ClientQueue.cpp58
-rw-r--r--cpp/src/qpid/client/ClientQueue.h103
-rw-r--r--cpp/src/qpid/client/Connection.h179
-rw-r--r--cpp/src/qpid/client/Connector.cpp188
-rw-r--r--cpp/src/qpid/client/Connector.h98
-rw-r--r--cpp/src/qpid/client/IncomingMessage.cpp168
-rw-r--r--cpp/src/qpid/client/IncomingMessage.h136
-rw-r--r--cpp/src/qpid/client/MessageChannel.h94
-rw-r--r--cpp/src/qpid/client/MessageListener.cpp24
-rw-r--r--cpp/src/qpid/client/MessageListener.h49
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.cpp431
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.h82
-rw-r--r--cpp/src/qpid/client/MethodBodyInstances.h100
-rw-r--r--cpp/src/qpid/client/ResponseHandler.cpp79
-rw-r--r--cpp/src/qpid/client/ResponseHandler.h75
-rw-r--r--cpp/src/qpid/client/ReturnedMessageHandler.cpp24
-rw-r--r--cpp/src/qpid/client/ReturnedMessageHandler.h49
28 files changed, 3667 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/AckMode.h b/cpp/src/qpid/client/AckMode.h
new file mode 100644
index 0000000000..9ad5ef925c
--- /dev/null
+++ b/cpp/src/qpid/client/AckMode.h
@@ -0,0 +1,102 @@
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ *
+ * \ingroup clientapi
+ */
+enum AckMode {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_AckMode_h*/
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ *
+ * \ingroup clientapi
+ */
+enum AckMode {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_AckMode_h*/
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
new file mode 100644
index 0000000000..96aaad05dc
--- /dev/null
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -0,0 +1,345 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "BasicMessageChannel.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "qpid/framing/FieldTable.h"
+#include "Connection.h"
+#include <queue>
+#include <iostream>
+#include <boost/format.hpp>
+#include <boost/variant.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace std;
+using namespace sys;
+using namespace framing;
+using boost::format;
+
+namespace {
+
+// Destination name constants
+const std::string BASIC_GET("__basic_get__");
+const std::string BASIC_RETURN("__basic_return__");
+
+// Reference name constant
+const std::string BASIC_REF("__basic_reference__");
+}
+
+BasicMessageChannel::BasicMessageChannel(Channel& ch)
+ : channel(ch), returnsHandler(0)
+{
+ incoming.addDestination(BASIC_RETURN, destDispatch);
+}
+
+void BasicMessageChannel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+ {
+ // Note we create a consumer even if tag="". In that case
+ // It will be renamed when we handle BasicConsumeOkBody.
+ //
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+
+ // FIXME aconway 2007-03-23: get processed in both.
+
+ // BasicConsumeOkBody is really processed in handle(), here
+ // we just pick up the tag to return to the user.
+ //
+ // We can't process it here because messages for the consumer may
+ // already be arriving.
+ //
+ BasicConsumeOkBody::shared_ptr ok =
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ make_shared_ptr(new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable())));
+ tag = ok->getConsumerTag();
+}
+
+
+void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.sendAndReceiveSync<BasicCancelOkBody>(
+ synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
+}
+
+void BasicMessageChannel::close(){
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ destGet.shutdown();
+ destDispatch.shutdown();
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
+ }
+}
+
+
+bool BasicMessageChannel::get(
+ Message& msg, const Queue& queue, AckMode ackMode)
+{
+ // Prepare for incoming response
+ incoming.addDestination(BASIC_GET, destGet);
+ channel.send(
+ new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ bool got = destGet.wait(msg);
+ return got;
+}
+
+void BasicMessageChannel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ const string e = exchange.getName();
+ string key = routingKey;
+
+ // Make a header for the message
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ BasicHeaderProperties::copy(
+ *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
+ header->setContentSize(msg.getData().size());
+
+ channel.send(
+ new BasicPublishBody(
+ channel.version, 0, e, key, mandatory, immediate));
+ channel.send(header);
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ channel.send(new AMQContentBody(data));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ channel.send(new AMQContentBody(frag));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+ assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+ switch(method->amqpMethodId()) {
+ case BasicGetOkBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_GET, BASIC_REF);
+ return;
+ }
+ case BasicGetEmptyBody::METHOD_ID: {
+ incoming.getDestination(BASIC_GET).empty();
+ incoming.removeDestination(BASIC_GET);
+ return;
+ }
+ case BasicDeliverBody::METHOD_ID: {
+ BasicDeliverBody::shared_ptr deliver=
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
+ incoming.openReference(BASIC_REF);
+ Message& msg = incoming.createMessage(
+ deliver->getConsumerTag(), BASIC_REF);
+ msg.setDestination(deliver->getConsumerTag());
+ msg.setDeliveryTag(deliver->getDeliveryTag());
+ msg.setRedelivered(deliver->getRedelivered());
+ return;
+ }
+ case BasicReturnBody::METHOD_ID: {
+ incoming.openReference(BASIC_REF);
+ incoming.createMessage(BASIC_RETURN, BASIC_REF);
+ return;
+ }
+ case BasicConsumeOkBody::METHOD_ID: {
+ Mutex::ScopedLock l(lock);
+ BasicConsumeOkBody::shared_ptr consumeOk =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
+ std::string tag = consumeOk->getConsumerTag();
+ ConsumerMap::iterator i = consumers.find(std::string());
+ if (i != consumers.end()) {
+ // Need to rename the un-named consumer.
+ if (consumers.find(tag) == consumers.end()) {
+ consumers[tag] = i->second;
+ consumers.erase(i);
+ }
+ else // Tag already exists.
+ throw ChannelException(404, "Tag already exists: "+tag);
+ }
+ // FIXME aconway 2007-03-23: Integrate consumer & destination
+ // maps.
+ incoming.addDestination(tag, destDispatch);
+ return;
+ }
+ }
+ throw Channel::UnknownMethod();
+}
+
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
+ BasicHeaderProperties* props =
+ boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+ IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
+ assert (ref.messages.size() == 1);
+ ref.messages.front().BasicHeaderProperties::operator=(*props);
+ incoming_size = header->getContentSize();
+ if (incoming_size==0)
+ incoming.closeReference(BASIC_REF);
+}
+
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
+ incoming.appendReference(BASIC_REF, content->getData());
+ size_t size = incoming.getReference(BASIC_REF).data.size();
+ if (size >= incoming_size) {
+ incoming.closeReference(BASIC_REF);
+ if (size > incoming_size)
+ throw ChannelException(502, "Content exceeded declared size");
+ }
+}
+
+void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
+ //record delivery tag:
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer.listener->received(msg);
+
+ if(channel.isOpen()){
+ bool multiple(false);
+ switch(consumer.ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer.count) < channel.getPrefetch())
+ break;
+ //else drop-through
+ case AUTO_ACK:
+ consumer.lastDeliveryTag = 0;
+ channel.send(
+ new BasicAckBody(
+ channel.version,
+ msg.getDeliveryTag(),
+ multiple));
+ case NO_ACK: // Nothing to do
+ case CLIENT_ACK: // User code must ack.
+ break;
+ // TODO aconway 2007-02-22: Provide a way for user
+ // to ack!
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+
+void BasicMessageChannel::run() {
+ while(channel.isOpen()) {
+ try {
+ Message msg;
+ bool gotMessge = destDispatch.wait(msg);
+ if (gotMessge) {
+ if(msg.getDestination() == BASIC_RETURN) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler != 0)
+ handler->returned(msg);
+ }
+ else {
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(
+ msg.getDestination());
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" +
+ msg.getDestination());
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
+ }
+ }
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::BasicMessageChannel::run() terminated by: "
+ << e.toString() << endl;
+ }
+ }
+}
+
+void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
+ returnsHandler = handler;
+}
+
+void BasicMessageChannel::setQos(){
+ channel.sendAndReceive<BasicQosOkBody>(
+ make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
+ if(channel.isTransactional())
+ channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/BasicMessageChannel.h b/cpp/src/qpid/client/BasicMessageChannel.h
new file mode 100644
index 0000000000..13e1cf1e00
--- /dev/null
+++ b/cpp/src/qpid/client/BasicMessageChannel.h
@@ -0,0 +1,90 @@
+#ifndef _client_BasicMessageChannel_h
+#define _client_BasicMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+#include <boost/scoped_ptr.hpp>
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation using AMQP 0-8 BasicMessageChannel class
+ * to send and receiving messages.
+ */
+class BasicMessageChannel : public MessageChannel
+{
+ public:
+ BasicMessageChannel(Channel& parent);
+
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ void cancel(const std::string& tag, bool synch = true);
+
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ void run();
+
+ void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+ void handle(shared_ptr<framing::AMQHeaderBody>);
+
+ void handle(shared_ptr<framing::AMQContentBody>);
+
+ void setQos();
+
+ void close();
+
+ private:
+
+ struct Consumer{
+ MessageListener* listener;
+ AckMode ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+ typedef std::map<std::string, Consumer> ConsumerMap;
+
+ void deliver(Consumer& consumer, Message& msg);
+
+ sys::Mutex lock;
+ Channel& channel;
+ IncomingMessage incoming;
+ uint64_t incoming_size;
+ ConsumerMap consumers ;
+ ReturnedMessageHandler* returnsHandler;
+ IncomingMessage::WaitableDestination destGet;
+ IncomingMessage::WaitableDestination destDispatch;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_BasicMessageChannel_h*/
diff --git a/cpp/src/qpid/client/ClientAdapter.cpp b/cpp/src/qpid/client/ClientAdapter.cpp
new file mode 100644
index 0000000000..ceabe344c9
--- /dev/null
+++ b/cpp/src/qpid/client/ClientAdapter.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "ClientAdapter.h"
+#include "Connection.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQMethodBody.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+void ClientAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*clientOps, context);
+ }catch(ChannelException& e){
+ connection.client->getChannel().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.client->getConnection().close(
+ context, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.client->getConnection().close(
+ context, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
+ channel->handleHeader(body);
+}
+
+void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) {
+ channel->handleContent(body);
+}
+
+void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
+ // TODO aconway 2007-01-17: Implement heartbeats.
+}
+
+
+
+}} // namespace qpid::client
+
diff --git a/cpp/src/qpid/client/ClientAdapter.h b/cpp/src/qpid/client/ClientAdapter.h
new file mode 100644
index 0000000000..4a6b76077f
--- /dev/null
+++ b/cpp/src/qpid/client/ClientAdapter.h
@@ -0,0 +1,66 @@
+#ifndef _client_ClientAdapter_h
+#define _client_ClientAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/ChannelAdapter.h"
+#include "ClientChannel.h"
+
+namespace qpid {
+namespace client {
+
+class AMQMethodBody;
+class Connection;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * Translates protocol bodies into calls on the core Channel,
+ * Connection and Client objects.
+ *
+ * Owns a channel, has references to Connection and Client.
+ */
+class ClientAdapter : public framing::ChannelAdapter
+{
+ public:
+ ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&);
+ Channel& getChannel() { return *channel; }
+
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+ private:
+ void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
+ class ClientOps;
+
+ std::auto_ptr<Channel> channel;
+ Connection& connection;
+ Client& client;
+ boost::shared_ptr<ClientOps> clientOps;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_ClientAdapter_h*/
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
new file mode 100644
index 0000000000..05f01227f6
--- /dev/null
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -0,0 +1,341 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "ClientChannel.h"
+#include "qpid/sys/Monitor.h"
+#include "ClientMessage.h"
+#include "qpid/QpidError.h"
+#include "MethodBodyInstances.h"
+#include "Connection.h"
+#include "BasicMessageChannel.h"
+#include "MessageMessageChannel.h"
+
+// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
+// handling of errors that should close the connection or the channel.
+// Make sure the user thread receives a connection in each case.
+//
+using namespace std;
+using namespace boost;
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+ connection(0), prefetch(_prefetch), transactional(_transactional)
+{
+ switch (mode) {
+ case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
+ case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
+ default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
+ }
+}
+
+Channel::~Channel(){
+ close();
+}
+
+void Channel::open(ChannelId id, Connection& con)
+{
+ if (isOpen())
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
+ connection = &con;
+ init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ string oob;
+ if (id != 0)
+ sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
+}
+
+void Channel::protocolInit(
+ const std::string& uid, const std::string& pwd, const std::string& vhost) {
+ assert(connection);
+ responses.expect();
+ connection->connector->init(); // Send ProtocolInit block.
+ ConnectionStartBody::shared_ptr connectionStart =
+ responses.receive<ConnectionStartBody>();
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ ConnectionTuneBody::shared_ptr proposal =
+ sendAndReceive<ConnectionTuneBody>(
+ make_shared_ptr(new ConnectionStartOkBody(
+ version, connectionStart->getRequestId(),
+ props, mechanism,
+ response, locale)));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ send(new ConnectionTuneOkBody(
+ version, proposal->getRequestId(),
+ proposal->getChannelMax(), connection->getMaxFrameSize(),
+ proposal->getHeartbeat()));
+
+ uint16_t heartbeat = proposal->getHeartbeat();
+ connection->connector->setReadTimeout(heartbeat * 2);
+ connection->connector->setWriteTimeout(heartbeat);
+
+ // Send connection open.
+ std::string capabilities;
+ responses.expect();
+ send(new ConnectionOpenBody(version, vhost, capabilities, true));
+ //receive connection.open-ok (or redirect, but ignore that for now
+ //esp. as using force=true).
+ AMQMethodBody::shared_ptr openResponse = responses.receive();
+ if(openResponse->isA<ConnectionOpenOkBody>()) {
+ //ok
+ }else if(openResponse->isA<ConnectionRedirectBody>()){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(
+ shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
+ cout << "Received redirection to " << redirect->getHost()
+ << endl;
+ } else {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
+ }
+}
+
+bool Channel::isOpen() const { return connection; }
+
+void Channel::setQos() {
+ messaging->setQos();
+}
+
+void Channel::setPrefetch(uint16_t _prefetch){
+ prefetch = _prefetch;
+ setQos();
+}
+
+void Channel::declareExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ string type = exchange.getType();
+ FieldTable args;
+ sendAndReceiveSync<ExchangeDeclareOkBody>(
+ synch,
+ make_shared_ptr(new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args)));
+}
+
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ sendAndReceiveSync<ExchangeDeleteOkBody>(
+ synch,
+ make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
+}
+
+void Channel::declareQueue(Queue& queue, bool synch){
+ string name = queue.getName();
+ FieldTable args;
+ QueueDeclareOkBody::shared_ptr response =
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ make_shared_ptr(new QueueDeclareBody(
+ version, 0, name, false/*passive*/, queue.isDurable(),
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
+ if(synch) {
+ if(queue.getName().length() == 0)
+ queue.setName(response->getQueue());
+ }
+}
+
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+ //ticket, queue, ifunused, ifempty, nowait
+ string name = queue.getName();
+ sendAndReceiveSync<QueueDeleteOkBody>(
+ synch,
+ make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
+}
+
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+ string e = exchange.getName();
+ string q = queue.getName();
+ sendAndReceiveSync<QueueBindOkBody>(
+ synch,
+ make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
+}
+
+void Channel::commit(){
+ sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
+}
+
+void Channel::rollback(){
+ sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
+}
+
+void Channel::handleMethodInContext(
+ AMQMethodBody::shared_ptr method, const MethodContext&)
+{
+ // TODO aconway 2007-03-23: Special case for consume OK as it
+ // is both an expected response and needs handling in this thread.
+ // Need to review & reationalize the client-side processing model.
+ if (method->isA<BasicConsumeOkBody>()) {
+ messaging->handle(method);
+ responses.signalResponse(method);
+ return;
+ }
+ if(responses.isWaiting()) {
+ responses.signalResponse(method);
+ return;
+ }
+ try {
+ switch (method->amqpClassId()) {
+ case MessageOkBody::CLASS_ID:
+ case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ default: throw UnknownMethod();
+ }
+ }
+ catch (const UnknownMethod&) {
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
+ }
+ }
+
+void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+ switch (method->amqpMethodId()) {
+ case ChannelCloseBody::METHOD_ID:
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
+ return;
+ case ChannelFlowBody::METHOD_ID:
+ // FIXME aconway 2007-02-22: Not yet implemented.
+ return;
+ }
+ throw UnknownMethod();
+}
+
+void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
+ connection->close();
+ return;
+ }
+ throw UnknownMethod();
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+ messaging->handle(body);
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+ messaging->handle(body);
+}
+
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+void Channel::start(){
+ dispatcher = Thread(*messaging);
+}
+
+// Close called by local application.
+void Channel::close(
+ uint16_t code, const std::string& text,
+ ClassId classId, MethodId methodId)
+{
+ if (isOpen()) {
+ try {
+ if (getId() != 0) {
+ sendAndReceive<ChannelCloseOkBody>(
+ make_shared_ptr(new ChannelCloseBody(
+ version, code, text, classId, methodId)));
+ }
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ closeInternal();
+ } catch (...) {
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ closeInternal();
+ throw;
+ }
+ }
+}
+
+// Channel closed by peer.
+void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+ assert(isOpen());
+ closeInternal();
+}
+
+void Channel::closeInternal() {
+ if (isOpen());
+ {
+ messaging->close();
+ connection = 0;
+ // A 0 response means we are closed.
+ responses.signalResponse(AMQMethodBody::shared_ptr());
+ }
+ dispatcher.join();
+}
+
+AMQMethodBody::shared_ptr Channel::sendAndReceive(
+ AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
+{
+ responses.expect();
+ send(toSend);
+ return responses.receive(c, m);
+}
+
+AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
+ bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
+{
+ if(sync)
+ return sendAndReceive(body, c, m);
+ else {
+ send(body);
+ return AMQMethodBody::shared_ptr();
+ }
+}
+
+void Channel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+ messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+}
+
+void Channel::cancel(const std::string& tag, bool synch) {
+ messaging->cancel(tag, synch);
+}
+
+bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+ return messaging->get(msg, queue, ackMode);
+}
+
+void Channel::publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory, bool immediate) {
+ messaging->publish(msg, exchange, routingKey, mandatory, immediate);
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
+ messaging->setReturnedMessageHandler(handler);
+}
+
+void Channel::run() {
+ messaging->run();
+}
+
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
new file mode 100644
index 0000000000..7f3074ea1b
--- /dev/null
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -0,0 +1,356 @@
+#ifndef _client_ClientChannel_h
+#define _client_ClientChannel_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/scoped_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "ClientExchange.h"
+#include "ClientMessage.h"
+#include "ClientQueue.h"
+#include "ResponseHandler.h"
+#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/sys/Thread.h"
+#include "AckMode.h"
+
+namespace qpid {
+
+namespace framing {
+class ChannelCloseBody;
+class AMQMethodBody;
+}
+
+namespace client {
+
+class Connection;
+class MessageChannel;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * Represents an AMQP channel, i.e. loosely a session of work. It
+ * is through a channel that most of the AMQP 'methods' are
+ * exposed.
+ *
+ * \ingroup clientapi
+ */
+class Channel : public framing::ChannelAdapter
+{
+ private:
+ struct UnknownMethod {};
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+
+ sys::Mutex lock;
+ boost::scoped_ptr<MessageChannel> messaging;
+ Connection* connection;
+ sys::Thread dispatcher;
+ ResponseHandler responses;
+
+ uint16_t prefetch;
+ const bool transactional;
+ framing::ProtocolVersion version;
+
+ void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+ void handleMethodInContext(
+ framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
+ void handleChannel(framing::AMQMethodBody::shared_ptr method);
+ void handleConnection(framing::AMQMethodBody::shared_ptr method);
+
+ void setQos();
+
+ void protocolInit(
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost);
+
+ framing::AMQMethodBody::shared_ptr sendAndReceive(
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
+
+ framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
+ bool sync,
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
+
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
+ return boost::shared_polymorphic_downcast<BodyType>(
+ sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
+ }
+
+ template <class BodyType>
+ boost::shared_ptr<BodyType> sendAndReceiveSync(
+ bool sync, framing::AMQMethodBody::shared_ptr body) {
+ return boost::shared_polymorphic_downcast<BodyType>(
+ sendAndReceiveSync(
+ sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
+ }
+
+ void open(framing::ChannelId, Connection&);
+ void closeInternal();
+ void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+
+ // FIXME aconway 2007-02-23: Get rid of friendships.
+ friend class Connection;
+ friend class BasicMessageChannel; // for sendAndReceive.
+ friend class MessageMessageChannel; // for sendAndReceive.
+
+ public:
+ enum InteropMode { AMQP_08, AMQP_09 };
+
+ /**
+ * Creates a channel object.
+ *
+ * @param transactional if true, the publishing and acknowledgement
+ * of messages will be transactional and can be committed or
+ * aborted in atomic units (@see commit(), @see rollback())
+ *
+ * @param prefetch specifies the number of unacknowledged
+ * messages the channel is willing to have sent to it
+ * asynchronously
+ *
+ * @param messageImpl Alternate messaging implementation class to
+ * allow alternate protocol implementations of messaging
+ * operations. Takes ownership.
+ */
+ Channel(
+ bool transactional = false, u_int16_t prefetch = 500,
+ InteropMode=AMQP_08);
+
+ ~Channel();
+
+ /**
+ * Declares an exchange.
+ *
+ * In AMQP Exchanges are the destinations to which messages
+ * are published. They have Queues bound to them and route
+ * messages they receive to those queues. The routing rules
+ * depend on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Deletes an exchange
+ *
+ * @param exchange an Exchange object representing the exchange to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteExchange(Exchange& exchange, bool synch = true);
+ /**
+ * Declares a Queue
+ *
+ * @param queue a Queue object representing the queue to declare
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void declareQueue(Queue& queue, bool synch = true);
+ /**
+ * Deletes a Queue
+ *
+ * @param queue a Queue object representing the queue to delete
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+ /**
+ * Binds a queue to an exchange. The exact semantics of this
+ * (in particular how 'routing keys' and 'binding arguments'
+ * are used) depends on the type of the exchange.
+ *
+ * @param exchange an Exchange object representing the
+ * exchange to bind to
+ *
+ * @param queue a Queue object representing the queue to be
+ * bound
+ *
+ * @param key the 'routing key' for the binding
+ *
+ * @param args the 'binding arguments' for the binding
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void bind(const Exchange& exchange, const Queue& queue,
+ const std::string& key, const framing::FieldTable& args,
+ bool synch = true);
+
+ /**
+ * For a transactional channel this will commit all
+ * publications and acknowledgements since the last commit (or
+ * the channel was opened if there has been no previous
+ * commit). This will cause published messages to become
+ * available to consumers and acknowledged messages to be
+ * consumed and removed from the queues they were dispatched
+ * from.
+ *
+ * Transactionailty of a channel is specified when the channel
+ * object is created (@see Channel()).
+ */
+ void commit();
+
+ /**
+ * For a transactional channel, this will rollback any
+ * publications or acknowledgements. It will be as if the
+ * ppblished messages were never sent and the acknowledged
+ * messages were never consumed.
+ */
+ void rollback();
+
+ /**
+ * Change the prefetch in use.
+ */
+ void setPrefetch(uint16_t prefetch);
+
+ uint16_t getPrefetch() { return prefetch; }
+
+ /**
+ * Start message dispatching on a new thread
+ */
+ void start();
+
+ /**
+ * Close the channel with optional error information.
+ * Closing a channel that is not open has no effect.
+ */
+ void close(
+ framing::ReplyCode = 200, const std::string& ="OK",
+ framing::ClassId = 0, framing::MethodId = 0);
+
+ /** True if the channel is transactional */
+ bool isTransactional() { return transactional; }
+
+ /** True if the channel is open */
+ bool isOpen() const;
+
+ /** Get the connection associated with this channel */
+ Connection& getConnection() { return *connection; }
+
+ /** Return the protocol version */
+ framing::ProtocolVersion getVersion() const { return version ; }
+
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see AckMode
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * AckMode)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ /**
+ * Deliver messages from the broker to the appropriate MessageListener.
+ */
+ void run();
+
+
+};
+
+}}
+
+#endif /*!_client_ClientChannel_h*/
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
new file mode 100644
index 0000000000..48616cf3d9
--- /dev/null
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <algorithm>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
+
+#include "Connection.h"
+#include "ClientChannel.h"
+#include "ClientMessage.h"
+#include "qpid/QpidError.h"
+#include <iostream>
+#include <sstream>
+#include "MethodBodyInstances.h"
+#include <functional>
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+namespace qpid {
+namespace client {
+
+const std::string Connection::OK("OK");
+
+Connection::Connection(
+ bool _debug, uint32_t _max_frame_size,
+ framing::ProtocolVersion _version
+) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+ defaultConnector(version, _debug, _max_frame_size),
+ isOpen(false), debug(_debug)
+{
+ setConnector(defaultConnector);
+}
+
+Connection::~Connection(){}
+
+void Connection::setConnector(Connector& con)
+{
+ connector = &con;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+}
+
+void Connection::open(
+ const std::string& host, int port,
+ const std::string& uid, const std::string& pwd, const std::string& vhost)
+{
+ if (isOpen)
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+ connector->connect(host, port);
+ channels[0] = &channel0;
+ channel0.open(0, *this);
+ channel0.protocolInit(uid, pwd, vhost);
+ isOpen = true;
+}
+
+void Connection::shutdown() {
+ close();
+}
+
+void Connection::close(
+ ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+ if(isOpen) {
+ // TODO aconway 2007-01-29: Exception handling - could end up
+ // partly closed with threads left unjoined.
+ isOpen = false;
+ channel0.sendAndReceive<ConnectionCloseOkBody>(
+ make_shared_ptr(new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId)));
+
+ using boost::bind;
+ for_each(channels.begin(), channels.end(),
+ bind(&Channel::closeInternal,
+ bind(&ChannelMap::value_type::second, _1)));
+ channels.clear();
+ connector->close();
+ }
+}
+
+void Connection::openChannel(Channel& channel) {
+ ChannelId id = ++channelIdCounter;
+ assert (channels.find(id) == channels.end());
+ assert(out);
+ channels[id] = &channel;
+ channel.open(id, *this);
+}
+
+void Connection::erase(ChannelId id) {
+ channels.erase(id);
+}
+
+void Connection::received(AMQFrame* frame){
+ // FIXME aconway 2007-01-25: Mutex
+ ChannelId id = frame->getChannel();
+ Channel* channel = channels[id];
+ // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+ // client. Need to review use of exceptions.
+ if (channel == 0)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR+504,
+ (boost::format("Invalid channel number %g") % id).str());
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(const qpid::QpidError& e){
+ channelException(
+ *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+}
+
+void Connection::send(AMQFrame* frame) {
+ out->send(frame);
+}
+
+void Connection::channelException(
+ Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+ int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0)
+ channel.close(code, msg);
+ else
+ channel.close(
+ code, msg, method->amqpClassId(), method->amqpMethodId());
+}
+
+void Connection::idleIn(){
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ClientExchange.cpp b/cpp/src/qpid/client/ClientExchange.cpp
new file mode 100644
index 0000000000..d5914beea2
--- /dev/null
+++ b/cpp/src/qpid/client/ClientExchange.cpp
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClientExchange.h"
+
+qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
+const std::string& qpid::client::Exchange::getName() const { return name; }
+const std::string& qpid::client::Exchange::getType() const { return type; }
+
+const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
+const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
+const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
+
+const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
+const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/cpp/src/qpid/client/ClientExchange.h b/cpp/src/qpid/client/ClientExchange.h
new file mode 100644
index 0000000000..a8ac21fa9b
--- /dev/null
+++ b/cpp/src/qpid/client/ClientExchange.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A 'handle' used to represent an AMQP exchange in the Channel
+ * methods. Exchanges are the destinations to which messages are
+ * published.
+ *
+ * There are different types of exchange (the standard types are
+ * available as static constants, see DIRECT_EXCHANGE,
+ * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to
+ * an exchange using Channel::bind() and messages published to
+ * that exchange are then routed to the queue based on the details
+ * of the binding and the type of exchange.
+ *
+ * There are some standard exchange instances that are predeclared
+ * on all AMQP brokers. These are defined as static members
+ * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and
+ * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange
+ * (member DEFAULT_EXCHANGE) which is nameless and of type
+ * 'direct' and has every declared queue bound to it by queue
+ * name.
+ *
+ * \ingroup clientapi
+ */
+ class Exchange{
+ const std::string name;
+ const std::string type;
+
+ public:
+ /**
+ * A direct exchange routes messages published with routing
+ * key X to any queue bound with key X (i.e. an exact match is
+ * used).
+ */
+ static const std::string DIRECT_EXCHANGE;
+ /**
+ * A topic exchange treat the key with which a queue is bound
+ * as a pattern and routes all messages whose routing keys
+ * match that pattern to the bound queue. The routing key for
+ * a message must consist of zero or more alpha-numeric words
+ * delimited by dots. The pattern is of a similar form but *
+ * can be used to match excatly one word and # can be used to
+ * match zero or more words.
+ */
+ static const std::string TOPIC_EXCHANGE;
+ /**
+ * The headers exchange routes messages based on whether their
+ * headers match the binding arguments specified when
+ * binding. (see the AMQP spec for more details).
+ */
+ static const std::string HEADERS_EXCHANGE;
+
+ /**
+ * The 'default' exchange, nameless and of type 'direct'. Has
+ * every declared queue bound to it by name.
+ */
+ static const Exchange DEFAULT_EXCHANGE;
+ /**
+ * The standard direct exchange, named amq.direct.
+ */
+ static const Exchange STANDARD_DIRECT_EXCHANGE;
+ /**
+ * The standard topic exchange, named amq.topic.
+ */
+ static const Exchange STANDARD_TOPIC_EXCHANGE;
+ /**
+ * The standard headers exchange, named amq.header.
+ */
+ static const Exchange STANDARD_HEADERS_EXCHANGE;
+
+ Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+ const std::string& getName() const;
+ const std::string& getType() const;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
new file mode 100644
index 0000000000..3de3a693b9
--- /dev/null
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -0,0 +1,64 @@
+#ifndef _client_ClientMessage_h
+#define _client_ClientMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/BasicHeaderProperties.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * A representation of messages for sent or recived through the
+ * client api.
+ *
+ * \ingroup clientapi
+ */
+// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
+// basic header properties.
+class Message : public framing::BasicHeaderProperties {
+ public:
+ Message(const std::string& data_=std::string()) : data(data_) {}
+
+ std::string getData() const { return data; }
+ void setData(const std::string& _data) { data = _data; }
+
+ std::string getDestination() const { return destination; }
+ void setDestination(const std::string& dest) { destination = dest; }
+
+ // TODO aconway 2007-03-22: only needed for Basic.deliver support.
+ uint64_t getDeliveryTag() const { return deliveryTag; }
+ void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
+
+ bool isRedelivered() const { return redelivered; }
+ void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+
+ private:
+ std::string data;
+ std::string destination;
+ bool redelivered;
+ uint64_t deliveryTag;
+};
+
+}}
+
+#endif /*!_client_ClientMessage_h*/
diff --git a/cpp/src/qpid/client/ClientQueue.cpp b/cpp/src/qpid/client/ClientQueue.cpp
new file mode 100644
index 0000000000..613cf8d288
--- /dev/null
+++ b/cpp/src/qpid/client/ClientQueue.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClientQueue.h"
+
+qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){}
+
+qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable)
+ : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){}
+
+const std::string& qpid::client::Queue::getName() const{
+ return name;
+}
+
+void qpid::client::Queue::setName(const std::string& _name){
+ name = _name;
+}
+
+bool qpid::client::Queue::isAutoDelete() const{
+ return autodelete;
+}
+
+bool qpid::client::Queue::isExclusive() const{
+ return exclusive;
+}
+
+bool qpid::client::Queue::isDurable() const{
+ return durable;
+}
+
+void qpid::client::Queue::setDurable(bool _durable){
+ durable = _durable;
+}
+
+
+
+
diff --git a/cpp/src/qpid/client/ClientQueue.h b/cpp/src/qpid/client/ClientQueue.h
new file mode 100644
index 0000000000..b37a44b004
--- /dev/null
+++ b/cpp/src/qpid/client/ClientQueue.h
@@ -0,0 +1,103 @@
+#ifndef _client_ClientQueue_h
+#define _client_ClientQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+namespace client {
+
+ /**
+ * A 'handle' used to represent an AMQP queue in the Channel
+ * methods. Creating an instance of this class does not cause the
+ * queue to be created on the broker. Rather, an instance of this
+ * class should be passed to Channel::declareQueue() to ensure
+ * that the queue exists or is created.
+ *
+ * Queues hold messages and allow clients to consume
+ * (see Channel::consume()) or get (see Channel::get()) those messags. A
+ * queue receives messages by being bound to one or more Exchange;
+ * messages published to that exchange may then be routed to the
+ * queue based on the details of the binding and the type of the
+ * exchange (see Channel::bind()).
+ *
+ * Queues are identified by a name. They can be exclusive (in which
+ * case they can only be used in the context of the connection
+ * over which they were declared, and are deleted when then
+ * connection closes), or they can be shared. Shared queues can be
+ * auto deleted when they have no consumers.
+ *
+ * We use the term 'temporary queue' to refer to an exclusive
+ * queue.
+ *
+ * \ingroup clientapi
+ */
+ class Queue{
+ std::string name;
+ const bool autodelete;
+ const bool exclusive;
+ bool durable;
+
+ public:
+
+ /**
+ * Creates an unnamed, non-durable, temporary queue. A name
+ * will be assigned to this queue instance by a call to
+ * Channel::declareQueue().
+ */
+ Queue();
+ /**
+ * Creates a shared, non-durable, queue with a given name,
+ * that will not be autodeleted.
+ *
+ * @param name the name of the queue
+ */
+ Queue(std::string name);
+ /**
+ * Creates a non-durable queue with a given name.
+ *
+ * @param name the name of the queue
+ *
+ * @param temp if true the queue will be a temporary queue, if
+ * false it will be shared and not autodeleted.
+ */
+ Queue(std::string name, bool temp);
+ /**
+ * This constructor allows the autodelete, exclusive and
+ * durable propeties to be explictly set. Note however that if
+ * exclusive is true, autodelete has no meaning as exclusive
+ * queues are always destroyed when the connection that
+ * created them is closed.
+ */
+ Queue(std::string name, bool autodelete, bool exclusive, bool durable);
+ const std::string& getName() const;
+ void setName(const std::string&);
+ bool isAutoDelete() const;
+ bool isExclusive() const;
+ bool isDurable() const;
+ void setDurable(bool durable);
+ };
+
+}
+}
+
+#endif /*!_client_ClientQueue_h*/
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
new file mode 100644
index 0000000000..071a1d9446
--- /dev/null
+++ b/cpp/src/qpid/client/Connection.h
@@ -0,0 +1,179 @@
+#ifndef _client_Connection_
+#define _client_Connection_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include "qpid/QpidError.h"
+#include "ClientChannel.h"
+#include "Connector.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+
+
+namespace qpid {
+
+/**
+ * The client namespace contains all classes that make up a client
+ * implementation of the AMQP protocol. The key classes that form
+ * the basis of the client API to be used by applications are
+ * Connection and Channel.
+ */
+namespace client {
+
+/**
+ * \internal provide access to selected private channel functions
+ * for the Connection without making it a friend of the entire channel.
+ */
+class ConnectionForChannel :
+ public framing::InputHandler,
+ public framing::OutputHandler,
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
+
+{
+ private:
+ friend class Channel;
+ virtual void erase(framing::ChannelId) = 0;
+};
+
+
+/**
+ * \defgroup clientapi Application API for an AMQP client
+ */
+
+/**
+ * Represents a connection to an AMQP broker. All communication is
+ * initiated by establishing a connection, then opening one or
+ * more Channels over that connection.
+ *
+ * \ingroup clientapi
+ */
+class Connection : public ConnectionForChannel
+{
+ typedef std::map<framing::ChannelId, Channel*> ChannelMap;
+
+ framing::ChannelId channelIdCounter;
+ static const std::string OK;
+
+ framing::ProtocolVersion version;
+ const uint32_t max_frame_size;
+ ChannelMap channels;
+ Connector defaultConnector;
+ Connector* connector;
+ framing::OutputHandler* out;
+ volatile bool isOpen;
+ Channel channel0;
+ bool debug;
+
+ void erase(framing::ChannelId);
+ void channelException(
+ Channel&, framing::AMQMethodBody*, const QpidError&);
+
+ // TODO aconway 2007-01-26: too many friendships, untagle these classes.
+ friend class Channel;
+
+ public:
+ /**
+ * Creates a connection object, but does not open the
+ * connection.
+ *
+ * @param _version the version of the protocol to connect with
+ *
+ * @param debug turns on tracing for the connection
+ * (i.e. prints details of the frames sent and received to std
+ * out). Optional and defaults to false.
+ *
+ * @param max_frame_size the maximum frame size that the
+ * client will accept. Optional and defaults to 65536.
+ */
+ Connection(bool debug = false, uint32_t max_frame_size = 65536,
+ framing::ProtocolVersion=framing::highestProtocolVersion);
+ ~Connection();
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param host the host on which the broker is running
+ *
+ * @param port the port on the which the broker is listening
+ *
+ * @param uid the userid to connect with
+ *
+ * @param pwd the password to connect with (currently SASL
+ * PLAIN is the only authentication method supported so this
+ * is sent in clear text)
+ *
+ * @param virtualhost the AMQP virtual host to use (virtual
+ * hosts, where implemented(!), provide namespace partitioning
+ * within a single broker).
+ */
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
+
+ /**
+ * Close the connection with optional error information for the peer.
+ *
+ * Any further use of this connection (without reopening it) will
+ * not succeed.
+ */
+ void close(framing::ReplyCode=200, const std::string& msg=OK,
+ framing::ClassId = 0, framing::MethodId = 0);
+
+ /**
+ * Associate a Channel with this connection and open it for use.
+ *
+ * In AMQP channels are like multi-plexed 'sessions' of work over
+ * a connection. Almost all the interaction with AMQP is done over
+ * a channel.
+ *
+ * @param connection the connection object to be associated with
+ * the channel. Call Channel::close() to close the channel.
+ */
+ void openChannel(Channel&);
+
+
+ // TODO aconway 2007-01-26: can these be private?
+ void send(framing::AMQFrame*);
+ void received(framing::AMQFrame*);
+ void idleOut();
+ void idleIn();
+ void shutdown();
+
+ /**\internal used for testing */
+ void setConnector(Connector& connector);
+
+ /**
+ * @return the maximum frame size in use on this connection
+ */
+ inline uint32_t getMaxFrameSize(){ return max_frame_size; }
+
+ /** @return protocol version in use on this connection. */
+ framing::ProtocolVersion getVersion() const { return version; }
+};
+
+}} // namespace qpid::client
+
+
+#endif
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
new file mode 100644
index 0000000000..9230050ff7
--- /dev/null
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/QpidError.h"
+#include "qpid/sys/Time.h"
+#include "Connector.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(
+ ProtocolVersion ver, bool _debug, uint32_t buffer_size
+) : debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ version(ver),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size)
+{ }
+
+Connector::~Connector(){ }
+
+void Connector::connect(const std::string& host, int port){
+ socket = Socket::createTcp();
+ socket.connect(host, port);
+ closed = false;
+ receiver = Thread(this);
+}
+
+void Connector::init(){
+ ProtocolInitiation init(version);
+ writeBlock(&init);
+}
+
+void Connector::close(){
+ closed = true;
+ socket.close();
+ receiver.join();
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* f){
+ std::auto_ptr<AMQFrame> frame(f);
+ AMQBody::shared_ptr body = frame->getBody();
+ writeBlock(frame.get());
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ Mutex::ScopedLock l(writeLock);
+ data->encode(outbuf);
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ size_t written = 0;
+ while(written < available && !closed){
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = now() * TIME_MSEC;
+ written += sent;
+ }
+ }
+}
+
+void Connector::handleClosed(){
+ closed = true;
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+}
+
+void Connector::checkIdle(ssize_t status){
+ if(timeoutHandler){
+ Time t = now() * TIME_MSEC;
+ if(status == Socket::SOCKET_TIMEOUT) {
+ if(idleIn && (t - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }
+ else if(status == 0 || status == Socket::SOCKET_EOF) {
+ handleClosed();
+ }
+ else {
+ lastIn = t;
+ }
+ if(idleOut && (t - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(uint16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(uint16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ socket.setTimeout(timeout*TIME_MSEC);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(!closed && received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame(version);
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ } catch (const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ handleClosed();
+ }
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
new file mode 100644
index 0000000000..c63a1ce8ac
--- /dev/null
+++ b/cpp/src/qpid/client/Connector.h
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Socket.h"
+
+namespace qpid {
+
+namespace client {
+
+class Connector : public framing::OutputHandler,
+ private sys::Runnable
+{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+ framing::ProtocolVersion version;
+
+ bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ uint32_t idleIn;
+ uint32_t idleOut;
+
+ sys::TimeoutHandler* timeoutHandler;
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ framing::Buffer inbuf;
+ framing::Buffer outbuf;
+
+ sys::Mutex writeLock;
+ sys::Thread receiver;
+
+ sys::Socket socket;
+
+ void checkIdle(ssize_t status);
+ void writeBlock(framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+ void handleClosed();
+
+ friend class Channel;
+ public:
+ Connector(framing::ProtocolVersion pVersion,
+ bool debug = false, uint32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init();
+ virtual void close();
+ virtual void setInputHandler(framing::InputHandler* handler);
+ virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+ virtual framing::OutputHandler* getOutputHandler();
+ virtual void send(framing::AMQFrame* frame);
+ virtual void setReadTimeout(uint16_t timeout);
+ virtual void setWriteTimeout(uint16_t timeout);
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/IncomingMessage.cpp b/cpp/src/qpid/client/IncomingMessage.cpp
new file mode 100644
index 0000000000..059e644464
--- /dev/null
+++ b/cpp/src/qpid/client/IncomingMessage.cpp
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "IncomingMessage.h"
+#include "qpid/Exception.h"
+#include "ClientMessage.h"
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+
+using boost::format;
+using sys::Mutex;
+
+IncomingMessage::Destination::~Destination() {}
+
+
+IncomingMessage::WaitableDestination::WaitableDestination()
+ : shutdownFlag(false) {}
+
+void IncomingMessage::WaitableDestination::message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+}
+
+void IncomingMessage::WaitableDestination::empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+}
+
+bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+}
+
+void IncomingMessage::WaitableDestination::shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+}
+
+void IncomingMessage::openReference(const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ if (references.find(name) != references.end())
+ throw ConnectionException(
+ 503, format("Attempt to open existing reference %s.") % name);
+ references[name];
+ return;
+}
+
+void IncomingMessage::appendReference(
+ const std::string& name, const std::string& data)
+{
+ Mutex::ScopedLock l(lock);
+ getRefUnlocked(name).data += data;
+}
+
+Message& IncomingMessage::createMessage(
+ const std::string& destination, const std::string& reference)
+{
+ Mutex::ScopedLock l(lock);
+ getDestUnlocked(destination); // Verify destination.
+ Reference& ref = getRefUnlocked(reference);
+ ref.messages.resize(ref.messages.size() +1);
+ ref.messages.back().setDestination(destination);
+ return ref.messages.back();
+}
+
+void IncomingMessage::closeReference(const std::string& name) {
+ Reference refCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ refCopy = getRefUnlocked(name);
+ references.erase(name);
+ }
+ for (std::vector<Message>::iterator i = refCopy.messages.begin();
+ i != refCopy.messages.end();
+ ++i)
+ {
+ i->setData(refCopy.data);
+ // TODO aconway 2007-03-23: Thread safety,
+ // can a destination be removed while we're doing this?
+ getDestination(i->getDestination()).message(*i);
+ }
+}
+
+
+void IncomingMessage::addDestination(std::string name, Destination& dest) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ destinations[name]=&dest;
+ else if (i->second != &dest)
+ throw ConnectionException(
+ 503, format("Destination already exists: %s.") % name);
+}
+
+void IncomingMessage::removeDestination(std::string name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ConnectionException(
+ 503, format("No such destination: %s.") % name);
+ destinations.erase(i);
+}
+
+IncomingMessage::Destination& IncomingMessage::getDestination(
+ const std::string& name) {
+ return getDestUnlocked(name);
+}
+
+IncomingMessage::Reference& IncomingMessage::getReference(
+ const std::string& name) {
+ return getRefUnlocked(name);
+}
+
+IncomingMessage::Reference& IncomingMessage::getRefUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ ReferenceMap::iterator i = references.find(name);
+ if (i == references.end())
+ throw ConnectionException(
+ 503, format("No such reference: %s.") % name);
+ return i->second;
+}
+
+IncomingMessage::Destination& IncomingMessage::getDestUnlocked(
+ const std::string& name) {
+ Mutex::ScopedLock l(lock);
+ DestinationMap::iterator i = destinations.find(name);
+ if (i == destinations.end())
+ throw ConnectionException(
+ 503, format("No such destination: %s.") % name);
+ return *i->second;
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/IncomingMessage.h b/cpp/src/qpid/client/IncomingMessage.h
new file mode 100644
index 0000000000..7aa8e33df2
--- /dev/null
+++ b/cpp/src/qpid/client/IncomingMessage.h
@@ -0,0 +1,136 @@
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Monitor.h"
+#include <map>
+#include <queue>
+#include <vector>
+#include <boost/variant.hpp>
+
+namespace qpid {
+namespace client {
+
+class Message;
+
+/**
+ * Manage incoming messages.
+ *
+ * Uses reference and destination concepts from 0-9 Messsage class.
+ *
+ * Basic messages use special destination and reference names to indicate
+ * get-ok, return etc. messages.
+ *
+ */
+class IncomingMessage {
+ public:
+ /** Accumulate data associated with a set of messages. */
+ struct Reference {
+ std::string data;
+ std::vector<Message> messages;
+ };
+
+ /** Interface to a destination for messages. */
+ class Destination {
+ public:
+ virtual ~Destination();
+
+ /** Pass a message to the destination */
+ virtual void message(const Message&) = 0;
+
+ /** Notify destination of queue-empty contition */
+ virtual void empty() = 0;
+ };
+
+
+ /** A destination that a thread can wait on till a message arrives. */
+ class WaitableDestination : public Destination
+ {
+ public:
+ WaitableDestination();
+ void message(const Message& msg);
+ void empty();
+ /** Wait till message() or empty() is called. True for message() */
+ bool wait(Message& msgOut);
+ void shutdown();
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+ };
+
+
+
+ /** Add a reference. Throws if already open. */
+ void openReference(const std::string& name);
+
+ /** Get a reference. Throws if not already open. */
+ void appendReference(const std::string& name,
+ const std::string& data);
+
+ /** Create a message to destination associated with reference
+ *@exception if destination or reference non-existent.
+ */
+ Message& createMessage(const std::string& destination,
+ const std::string& reference);
+
+ /** Get a reference.
+ *@exception if non-existent.
+ */
+ Reference& getReference(const std::string& name);
+
+ /** Close a reference and deliver all its messages.
+ * Throws if not open or a message has an invalid destination.
+ */
+ void closeReference(const std::string& name);
+
+ /** Add a destination.
+ *@exception if a different Destination is already registered
+ * under name.
+ */
+ void addDestination(std::string name, Destination&);
+
+ /** Remove a destination. Throws if does not exist */
+ void removeDestination(std::string name);
+
+ /** Get a destination. Throws if does not exist */
+ Destination& getDestination(const std::string& name);
+ private:
+
+ typedef std::map<std::string, Reference> ReferenceMap;
+ typedef std::map<std::string, Destination*> DestinationMap;
+
+ Reference& getRefUnlocked(const std::string& name);
+ Destination& getDestUnlocked(const std::string& name);
+
+ mutable sys::Mutex lock;
+ ReferenceMap references;
+ DestinationMap destinations;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/MessageChannel.h b/cpp/src/qpid/client/MessageChannel.h
new file mode 100644
index 0000000000..a830a47986
--- /dev/null
+++ b/cpp/src/qpid/client/MessageChannel.h
@@ -0,0 +1,94 @@
+#ifndef _client_MessageChannel_h
+#define _client_MessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/shared_ptr.h"
+#include "qpid/sys/Runnable.h"
+#include "AckMode.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class FieldTable;
+}
+
+namespace client {
+
+class Channel;
+class Message;
+class Queue;
+class Exchange;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * Abstract interface for messaging implementation for a channel.
+ *
+ *@see Channel for documentation.
+ */
+class MessageChannel : public sys::Runnable
+{
+ public:
+ /**@see Channel::consume */
+ virtual void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0) = 0;
+
+ /**@see Channel::cancel */
+ virtual void cancel(const std::string& tag, bool synch = true) = 0;
+
+ /**@see Channel::get */
+ virtual bool get(
+ Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0;
+
+ /**@see Channel::get */
+ virtual void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false) = 0;
+
+ /**@see Channel::setReturnedMessageHandler */
+ virtual void setReturnedMessageHandler(
+ ReturnedMessageHandler* handler) = 0;
+
+ /** Handle an incoming method. */
+ virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0;
+
+ /** Handle an incoming header */
+ virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0;
+
+ /** Handle an incoming content */
+ virtual void handle(shared_ptr<framing::AMQContentBody>) = 0;
+
+ /** Send channel's QOS settings */
+ virtual void setQos() = 0;
+
+ /** Channel is closing */
+ virtual void close() = 0;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_MessageChannel_h*/
diff --git a/cpp/src/qpid/client/MessageListener.cpp b/cpp/src/qpid/client/MessageListener.cpp
new file mode 100644
index 0000000000..68ebedeb0d
--- /dev/null
+++ b/cpp/src/qpid/client/MessageListener.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageListener.h"
+
+qpid::client::MessageListener::~MessageListener() {}
diff --git a/cpp/src/qpid/client/MessageListener.h b/cpp/src/qpid/client/MessageListener.h
new file mode 100644
index 0000000000..501862a3ef
--- /dev/null
+++ b/cpp/src/qpid/client/MessageListener.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include "ClientMessage.h"
+
+namespace qpid {
+namespace client {
+
+ /**
+ * An interface through which asynchronously delivered messages
+ * can be received by an application.
+ *
+ * @see Channel::consume()
+ *
+ * \ingroup clientapi
+ */
+ class MessageListener{
+ public:
+ virtual ~MessageListener();
+ virtual void received(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/client/MessageMessageChannel.cpp b/cpp/src/qpid/client/MessageMessageChannel.cpp
new file mode 100644
index 0000000000..6ba5e00153
--- /dev/null
+++ b/cpp/src/qpid/client/MessageMessageChannel.cpp
@@ -0,0 +1,431 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <boost/format.hpp>
+#include "MessageMessageChannel.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "qpid/framing/FieldTable.h"
+#include "Connection.h"
+#include "qpid/shared_ptr.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace std;
+using namespace sys;
+using namespace framing;
+
+MessageMessageChannel::MessageMessageChannel(Channel& ch)
+ : channel(ch), tagCount(0) {}
+
+string MessageMessageChannel::newTag() {
+ Mutex::ScopedLock l(lock);
+ return (boost::format("__tag%d")%++tagCount).str();
+}
+
+void MessageMessageChannel::consume(
+ Queue& queue, std::string& tag, MessageListener* /*listener*/,
+ AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
+{
+ if (tag.empty())
+ tag = newTag();
+ channel.sendAndReceive<MessageOkBody>(
+ make_shared_ptr(new MessageConsumeBody(
+ channel.getVersion(), 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
+
+// // FIXME aconway 2007-02-20: Race condition!
+// // We could receive the first message for the consumer
+// // before we create the consumer below.
+// // Move consumer creation to handler for MessageConsumeOkBody
+// {
+// Mutex::ScopedLock l(lock);
+// ConsumerMap::iterator i = consumers.find(tag);
+// if (i != consumers.end())
+// THROW_QPID_ERROR(CLIENT_ERROR,
+// "Consumer already exists with tag="+tag);
+// Consumer& c = consumers[tag];
+// c.listener = listener;
+// c.ackMode = ackMode;
+// c.lastDeliveryTag = 0;
+// }
+}
+
+
+void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
+ // FIXME aconway 2007-02-23:
+// Consumer c;
+// {
+// Mutex::ScopedLock l(lock);
+// ConsumerMap::iterator i = consumers.find(tag);
+// if (i == consumers.end())
+// return;
+// c = i->second;
+// consumers.erase(i);
+// }
+// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+// channel.sendAndReceiveSync<MessageCancelOkBody>(
+// synch, new MessageCancelBody(channel.version, tag, !synch));
+}
+
+void MessageMessageChannel::close(){
+ // FIXME aconway 2007-02-23:
+// ConsumerMap consumersCopy;
+// {
+// Mutex::ScopedLock l(lock);
+// consumersCopy = consumers;
+// consumers.clear();
+// }
+// for (ConsumerMap::iterator i=consumersCopy.begin();
+// i != consumersCopy.end(); ++i)
+// {
+// Consumer& c = i->second;
+// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+// && c.lastDeliveryTag > 0)
+// {
+// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+// }
+// }
+// incoming.shutdown();
+}
+
+
+/** Destination ID for the current get.
+ * Must not clash with a generated consumer ID.
+ * TODO aconway 2007-03-06: support multiple outstanding gets?
+ */
+const string getDestinationId("__get__");
+
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+ void response(shared_ptr<AMQResponseBody> response) {
+ if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+ switch (response->amqpMethodId()) {
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do, wait for transfer.
+ return;
+ case MessageEmptyBody::METHOD_ID:
+ empty(); // Wake up waiter with empty queue.
+ return;
+ }
+ }
+ throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+ }
+
+ Correlator::Action action() {
+ return boost::bind(&MessageGetDestination::response, this, _1);
+ }
+};
+
+bool MessageMessageChannel::get(
+ Message& msg, const Queue& queue, AckMode ackMode)
+{
+ Mutex::ScopedLock l(lock);
+ std::string destName=newTag();
+ MessageGetDestination dest;
+ incoming.addDestination(destName, dest);
+ channel.send(
+ make_shared_ptr(
+ new MessageGetBody(
+ channel.version, 0, queue.getName(), destName, ackMode)),
+ dest.action());
+ return dest.wait(msg);
+}
+
+
+/** Convert a message to a transfer command. */
+MessageTransferBody::shared_ptr makeTransfer(
+ ProtocolVersion version,
+ const Message& msg, const string& destination,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ return MessageTransferBody::shared_ptr(
+ new MessageTransferBody(
+ version,
+ 0, // FIXME aconway 2007-04-03: ticket.
+ destination,
+ msg.isRedelivered(),
+ immediate,
+ 0, // FIXME aconway 2007-02-23: ttl
+ msg.getPriority(),
+ msg.getTimestamp(),
+ static_cast<uint8_t>(msg.getDeliveryMode()),
+ 0, // FIXME aconway 2007-04-03: Expiration
+ string(), // Exchange: for broker use only.
+ routingKey,
+ msg.getMessageId(),
+ msg.getCorrelationId(),
+ msg.getReplyTo(),
+ msg.getContentType(),
+ msg.getContentEncoding(),
+ msg.getUserId(),
+ msg.getAppId(),
+ string(), // FIXME aconway 2007-04-03: TransactionId
+ string(), //FIXME aconway 2007-04-03: SecurityToken
+ msg.getHeaders(),
+ Content(INLINE, msg.getData()),
+ mandatory
+ ));
+}
+
+// FIXME aconway 2007-04-05: Generated code should provide this.
+/**
+ * Calculate the size of a frame containing the given body type
+ * if all variable-lengths parts are empty.
+ */
+template <class T> size_t overhead() {
+ static AMQFrame frame(
+ ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
+ return frame.size();
+}
+
+void MessageMessageChannel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ MessageTransferBody::shared_ptr transfer = makeTransfer(
+ channel.getVersion(),
+ msg, exchange.getName(), routingKey, mandatory, immediate);
+ // Frame itself uses 8 bytes.
+ u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
+ if (transfer->size() <= frameMax) {
+ channel.sendAndReceive<MessageOkBody>(transfer);
+ }
+ else {
+ std::string ref = newTag();
+ std::string data = transfer->getBody().getValue();
+ size_t chunk =
+ channel.connection->getMaxFrameSize() -
+ (overhead<MessageAppendBody>() + ref.size());
+ // TODO aconway 2007-04-05: cast around lack of generated setters
+ const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
+ channel.send(
+ make_shared_ptr(new MessageOpenBody(channel.version, ref)));
+ channel.send(transfer);
+ const char* p = data.data();
+ const char* end = data.data()+data.size();
+ while (p+chunk <= end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
+ p += chunk;
+ }
+ if (p < end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
+ }
+ channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
+ }
+}
+
+void copy(Message& msg, MessageTransferBody& transfer) {
+ // FIXME aconway 2007-04-05: Verify all required fields
+ // are copied.
+ msg.setContentType(transfer.getContentType());
+ msg.setContentEncoding(transfer.getContentEncoding());
+ msg.setHeaders(transfer.getApplicationHeaders());
+ msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+ msg.setPriority(transfer.getPriority());
+ msg.setCorrelationId(transfer.getCorrelationId());
+ msg.setReplyTo(transfer.getReplyTo());
+ // FIXME aconway 2007-04-05: TTL/Expiration
+ msg.setMessageId(transfer.getMessageId());
+ msg.setTimestamp(transfer.getTimestamp());
+ msg.setUserId(transfer.getUserId());
+ msg.setAppId(transfer.getAppId());
+ msg.setDestination(transfer.getDestination());
+ msg.setRedelivered(transfer.getRedelivered());
+ msg.setDeliveryTag(0); // No meaning in 0-9
+ if (transfer.getBody().isInline())
+ msg.setData(transfer.getBody().getValue());
+}
+
+void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+ assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
+ switch(method->amqpMethodId()) {
+ case MessageAppendBody::METHOD_ID: {
+ MessageAppendBody::shared_ptr append =
+ shared_polymorphic_downcast<MessageAppendBody>(method);
+ incoming.appendReference(append->getReference(), append->getBytes());
+ break;
+ }
+ case MessageOpenBody::METHOD_ID: {
+ MessageOpenBody::shared_ptr open =
+ shared_polymorphic_downcast<MessageOpenBody>(method);
+ incoming.openReference(open->getReference());
+ break;
+ }
+
+ case MessageCloseBody::METHOD_ID: {
+ MessageCloseBody::shared_ptr close =
+ shared_polymorphic_downcast<MessageCloseBody>(method);
+ incoming.closeReference(close->getReference());
+ break;
+ }
+
+ case MessageTransferBody::METHOD_ID: {
+ MessageTransferBody::shared_ptr transfer=
+ shared_polymorphic_downcast<MessageTransferBody>(method);
+ if (transfer->getBody().isInline()) {
+ Message msg;
+ copy(msg, *transfer);
+ // Deliver it.
+ incoming.getDestination(transfer->getDestination()).message(msg);
+ }
+ else {
+ Message& msg=incoming.createMessage(
+ transfer->getDestination(), transfer->getBody().getValue());
+ copy(msg, *transfer);
+ // Will be delivered when reference closes.
+ }
+ break;
+ }
+
+ case MessageEmptyBody::METHOD_ID:
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do
+ break;
+
+ // FIXME aconway 2007-04-03: TODO
+ case MessageCancelBody::METHOD_ID:
+ case MessageCheckpointBody::METHOD_ID:
+ case MessageOffsetBody::METHOD_ID:
+ case MessageQosBody::METHOD_ID:
+ case MessageRecoverBody::METHOD_ID:
+ case MessageRejectBody::METHOD_ID:
+ case MessageResumeBody::METHOD_ID:
+ break;
+ default:
+ throw Channel::UnknownMethod();
+ }
+}
+
+void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){
+ throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+
+void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){
+ throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+
+// FIXME aconway 2007-02-23:
+// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
+// //record delivery tag:
+// consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+// //allow registered listener to handle the message
+// consumer.listener->received(msg);
+
+// if(channel.isOpen()){
+// bool multiple(false);
+// switch(consumer.ackMode){
+// case LAZY_ACK:
+// multiple = true;
+// if(++(consumer.count) < channel.getPrefetch())
+// break;
+// //else drop-through
+// case AUTO_ACK:
+// consumer.lastDeliveryTag = 0;
+// channel.send(
+// new MessageAckBody(
+// channel.version, msg.getDeliveryTag(), multiple));
+// case NO_ACK: // Nothing to do
+// case CLIENT_ACK: // User code must ack.
+// break;
+// // TODO aconway 2007-02-22: Provide a way for user
+// // to ack!
+// }
+// }
+
+// //as it stands, transactionality is entirely orthogonal to ack
+// //mode, though the acks will not be processed by the broker under
+// //a transaction until it commits.
+// }
+
+
+void MessageMessageChannel::run() {
+ // FIXME aconway 2007-02-23:
+// while(channel.isOpen()) {
+// try {
+// Message msg = incoming.waitDispatch();
+// if(msg.getMethod()->isA<MessageReturnBody>()) {
+// ReturnedMessageHandler* handler=0;
+// {
+// Mutex::ScopedLock l(lock);
+// handler=returnsHandler;
+// }
+// if(handler == 0) {
+// // TODO aconway 2007-02-20: proper logging.
+// cout << "Message returned: " << msg.getData() << endl;
+// }
+// else
+// handler->returned(msg);
+// }
+// else {
+// MessageDeliverBody::shared_ptr deliverBody =
+// boost::shared_polymorphic_downcast<MessageDeliverBody>(
+// msg.getMethod());
+// std::string tag = deliverBody->getConsumerTag();
+// Consumer consumer;
+// {
+// Mutex::ScopedLock l(lock);
+// ConsumerMap::iterator i = consumers.find(tag);
+// if(i == consumers.end())
+// THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+// "Unknown consumer tag=" + tag);
+// consumer = i->second;
+// }
+// deliver(consumer, msg);
+// }
+// }
+// catch (const ShutdownException&) {
+// /* Orderly shutdown */
+// }
+// catch (const Exception& e) {
+// // FIXME aconway 2007-02-20: Report exception to user.
+// cout << "client::Message::run() terminated by: " << e.toString()
+// << "(" << typeid(e).name() << ")" << endl;
+// }
+// }
+}
+
+void MessageMessageChannel::setReturnedMessageHandler(
+ ReturnedMessageHandler* )
+{
+ throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
+}
+
+void MessageMessageChannel::setQos(){
+ channel.sendAndReceive<MessageOkBody>(
+ make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
+ if(channel.isTransactional())
+ channel.sendAndReceive<TxSelectOkBody>(
+ make_shared_ptr(new TxSelectBody(channel.version)));
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/MessageMessageChannel.h b/cpp/src/qpid/client/MessageMessageChannel.h
new file mode 100644
index 0000000000..12c4786b81
--- /dev/null
+++ b/cpp/src/qpid/client/MessageMessageChannel.h
@@ -0,0 +1,82 @@
+#ifndef _client_MessageMessageChannel_h
+#define _client_MessageMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+#include "qpid/sys/Monitor.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation using AMQP 0-9 MessageMessageChannel class
+ * to send and receiving messages.
+ */
+class MessageMessageChannel : public MessageChannel
+{
+ public:
+ MessageMessageChannel(Channel& parent);
+
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ void cancel(const std::string& tag, bool synch = true);
+
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ void run();
+
+ void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+ void handle(shared_ptr<framing::AMQHeaderBody>);
+
+ void handle(shared_ptr<framing::AMQContentBody>);
+
+ void setQos();
+
+ void close();
+
+ private:
+ typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
+ Destinations;
+
+ std::string newTag();
+
+ sys::Mutex lock;
+ Channel& channel;
+ IncomingMessage incoming;
+ long tagCount;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_MessageMessageChannel_h*/
+
diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h
new file mode 100644
index 0000000000..516ba6e4e3
--- /dev/null
+++ b/cpp/src/qpid/client/MethodBodyInstances.h
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/amqp_framing.h"
+
+#ifndef _MethodBodyInstances_h_
+#define _MethodBodyInstances_h_
+
+namespace qpid {
+namespace client {
+
+/**
+ * A list of method body instances that can be used to compare against
+ * incoming bodies.
+ */
+class MethodBodyInstances
+{
+private:
+ qpid::framing::ProtocolVersion version;
+public:
+ const qpid::framing::BasicCancelOkBody basic_cancel_ok;
+ const qpid::framing::BasicConsumeOkBody basic_consume_ok;
+ const qpid::framing::BasicDeliverBody basic_deliver;
+ const qpid::framing::BasicGetEmptyBody basic_get_empty;
+ const qpid::framing::BasicGetOkBody basic_get_ok;
+ const qpid::framing::BasicQosOkBody basic_qos_ok;
+ const qpid::framing::BasicReturnBody basic_return;
+ const qpid::framing::ChannelCloseBody channel_close;
+ const qpid::framing::ChannelCloseOkBody channel_close_ok;
+ const qpid::framing::ChannelFlowBody channel_flow;
+ const qpid::framing::ChannelOpenOkBody channel_open_ok;
+ const qpid::framing::ConnectionCloseBody connection_close;
+ const qpid::framing::ConnectionCloseOkBody connection_close_ok;
+ const qpid::framing::ConnectionOpenOkBody connection_open_ok;
+ const qpid::framing::ConnectionRedirectBody connection_redirect;
+ const qpid::framing::ConnectionStartBody connection_start;
+ const qpid::framing::ConnectionTuneBody connection_tune;
+ const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ const qpid::framing::QueueDeclareOkBody queue_declare_ok;
+ const qpid::framing::QueueDeleteOkBody queue_delete_ok;
+ const qpid::framing::QueueBindOkBody queue_bind_ok;
+ const qpid::framing::TxCommitOkBody tx_commit_ok;
+ const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ const qpid::framing::TxSelectOkBody tx_select_ok;
+
+ MethodBodyInstances(uint8_t major, uint8_t minor) :
+ version(major, minor),
+ basic_cancel_ok(version),
+ basic_consume_ok(version),
+ basic_deliver(version),
+ basic_get_empty(version),
+ basic_get_ok(version),
+ basic_qos_ok(version),
+ basic_return(version),
+ channel_close(version),
+ channel_close_ok(version),
+ channel_flow(version),
+ channel_open_ok(version),
+ connection_close(version),
+ connection_close_ok(version),
+ connection_open_ok(version),
+ connection_redirect(version),
+ connection_start(version),
+ connection_tune(version),
+ exchange_declare_ok(version),
+ exchange_delete_ok(version),
+ queue_declare_ok(version),
+ queue_delete_ok(version),
+ queue_bind_ok(version),
+ tx_commit_ok(version),
+ tx_rollback_ok(version),
+ tx_select_ok(version)
+ {}
+
+};
+
+static MethodBodyInstances method_bodies(8, 0);
+
+} // namespace client
+} // namespace qpid
+
+#endif
diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp
new file mode 100644
index 0000000000..10ad5c6fa2
--- /dev/null
+++ b/cpp/src/qpid/client/ResponseHandler.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/QpidError.h"
+#include <boost/format.hpp>
+#include "ResponseHandler.h"
+#include "qpid/framing/AMQMethodBody.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace client {
+
+ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {}
+
+ResponseHandler::~ResponseHandler(){}
+
+bool ResponseHandler::isWaiting() {
+ Monitor::ScopedLock l(monitor);
+ return waiting;
+}
+
+void ResponseHandler::expect(){
+ Monitor::ScopedLock l(monitor);
+ waiting = true;
+}
+
+void ResponseHandler::signalResponse(MethodPtr _response)
+{
+ Monitor::ScopedLock l(monitor);
+ response = _response;
+ if (!response)
+ shutdownFlag=true;
+ waiting = false;
+ monitor.notify();
+}
+
+ResponseHandler::MethodPtr ResponseHandler::receive() {
+ Monitor::ScopedLock l(monitor);
+ while (!response && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ MethodPtr result = response;
+ response.reset();
+ return result;
+}
+
+ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) {
+ MethodPtr response = receive();
+ if(c != response->amqpClassId() || m != response->amqpMethodId()) {
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR,
+ boost::format("Expected class:method %d:%d, got %d:%d")
+ % c % m % response->amqpClassId() % response->amqpMethodId());
+ }
+ return response;
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h
new file mode 100644
index 0000000000..e599e4c8d1
--- /dev/null
+++ b/cpp/src/qpid/client/ResponseHandler.h
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+#include "qpid/sys/Monitor.h"
+
+#ifndef _ResponseHandler_
+#define _ResponseHandler_
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+}
+
+namespace client {
+
+/**
+ * Holds a response from the broker peer for the client.
+ */
+class ResponseHandler{
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
+ bool waiting;
+ bool shutdownFlag;
+ MethodPtr response;
+ sys::Monitor monitor;
+
+ public:
+ ResponseHandler();
+ ~ResponseHandler();
+
+ /** Is a response expected? */
+ bool isWaiting();
+
+ /** Provide a response to the waiting thread */
+ void signalResponse(MethodPtr response);
+
+ /** Indicate a message is expected. */
+ void expect();
+
+ /** Wait for a response. */
+ MethodPtr receive();
+
+ /** Wait for a specific response. */
+ MethodPtr receive(framing::ClassId, framing::MethodId);
+
+ /** Template version of receive returns typed pointer. */
+ template <class BodyType>
+ shared_ptr<BodyType> receive() {
+ return shared_polymorphic_downcast<BodyType>(
+ receive(BodyType::CLASS_ID, BodyType::METHOD_ID));
+ }
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.cpp b/cpp/src/qpid/client/ReturnedMessageHandler.cpp
new file mode 100644
index 0000000000..35d0b5c0a8
--- /dev/null
+++ b/cpp/src/qpid/client/ReturnedMessageHandler.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReturnedMessageHandler.h"
+
+qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {}
diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.h b/cpp/src/qpid/client/ReturnedMessageHandler.h
new file mode 100644
index 0000000000..8b42fc0764
--- /dev/null
+++ b/cpp/src/qpid/client/ReturnedMessageHandler.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _ReturnedMessageHandler_
+#define _ReturnedMessageHandler_
+
+#include "ClientMessage.h"
+
+namespace qpid {
+namespace client {
+
+ /**
+ * An interface through which returned messages can be received by
+ * an application.
+ *
+ * @see Channel::setReturnedMessageHandler()
+ *
+ * \ingroup clientapi
+ */
+ class ReturnedMessageHandler{
+ public:
+ virtual ~ReturnedMessageHandler();
+ virtual void returned(Message& msg) = 0;
+ };
+
+}
+}
+
+
+#endif