summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
committerAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
commit26a723475dc6926bde883c8c7f983ee44d8deb01 (patch)
treed3be63007fb9799549310ee31cfd1aae589bfd96 /qpid/cpp/src/client
parent52a1b1fbcaf2935874f8ab7b85a06d09eed4a94c (diff)
downloadqpid-python-26a723475dc6926bde883c8c7f983ee44d8deb01.tar.gz
Moved src/ source code to src/qpid directory:
- allows rhm package to build consistently against checked-out or installed qpid. - consistent correspondence between source paths and C++ namespaces. - consistent use of #include <qpid/foo> in source and by users. - allows header files to split over multiple directories, e.g. separating generated code, separating public API from private files. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@528668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/client')
-rw-r--r--qpid/cpp/src/client/AckMode.h102
-rw-r--r--qpid/cpp/src/client/BasicMessageChannel.cpp345
-rw-r--r--qpid/cpp/src/client/BasicMessageChannel.h90
-rw-r--r--qpid/cpp/src/client/ClientAdapter.cpp70
-rw-r--r--qpid/cpp/src/client/ClientAdapter.h66
-rw-r--r--qpid/cpp/src/client/ClientChannel.cpp341
-rw-r--r--qpid/cpp/src/client/ClientChannel.h356
-rw-r--r--qpid/cpp/src/client/ClientConnection.cpp156
-rw-r--r--qpid/cpp/src/client/ClientExchange.cpp34
-rw-r--r--qpid/cpp/src/client/ClientExchange.h106
-rw-r--r--qpid/cpp/src/client/ClientMessage.h64
-rw-r--r--qpid/cpp/src/client/ClientQueue.cpp58
-rw-r--r--qpid/cpp/src/client/ClientQueue.h103
-rw-r--r--qpid/cpp/src/client/Connection.h179
-rw-r--r--qpid/cpp/src/client/Connector.cpp188
-rw-r--r--qpid/cpp/src/client/Connector.h98
-rw-r--r--qpid/cpp/src/client/IncomingMessage.cpp168
-rw-r--r--qpid/cpp/src/client/IncomingMessage.h136
-rw-r--r--qpid/cpp/src/client/MessageChannel.h94
-rw-r--r--qpid/cpp/src/client/MessageListener.cpp24
-rw-r--r--qpid/cpp/src/client/MessageListener.h49
-rw-r--r--qpid/cpp/src/client/MessageMessageChannel.cpp431
-rw-r--r--qpid/cpp/src/client/MessageMessageChannel.h82
-rw-r--r--qpid/cpp/src/client/MethodBodyInstances.h100
-rw-r--r--qpid/cpp/src/client/ResponseHandler.cpp79
-rw-r--r--qpid/cpp/src/client/ResponseHandler.h75
-rw-r--r--qpid/cpp/src/client/ReturnedMessageHandler.cpp24
-rw-r--r--qpid/cpp/src/client/ReturnedMessageHandler.h49
28 files changed, 0 insertions, 3667 deletions
diff --git a/qpid/cpp/src/client/AckMode.h b/qpid/cpp/src/client/AckMode.h
deleted file mode 100644
index 9ad5ef925c..0000000000
--- a/qpid/cpp/src/client/AckMode.h
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef _client_AckMode_h
-#define _client_AckMode_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace client {
-
-/**
- * The available acknowledgements modes.
- *
- * \ingroup clientapi
- */
-enum AckMode {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_AckMode_h*/
-#ifndef _client_AckMode_h
-#define _client_AckMode_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace client {
-
-/**
- * The available acknowledgements modes.
- *
- * \ingroup clientapi
- */
-enum AckMode {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_AckMode_h*/
diff --git a/qpid/cpp/src/client/BasicMessageChannel.cpp b/qpid/cpp/src/client/BasicMessageChannel.cpp
deleted file mode 100644
index c577c0a305..0000000000
--- a/qpid/cpp/src/client/BasicMessageChannel.cpp
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "BasicMessageChannel.h"
-#include "../framing/AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "../framing/FieldTable.h"
-#include "Connection.h"
-#include <queue>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/variant.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace std;
-using namespace sys;
-using namespace framing;
-using boost::format;
-
-namespace {
-
-// Destination name constants
-const std::string BASIC_GET("__basic_get__");
-const std::string BASIC_RETURN("__basic_return__");
-
-// Reference name constant
-const std::string BASIC_REF("__basic_reference__");
-}
-
-BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0)
-{
- incoming.addDestination(BASIC_RETURN, destDispatch);
-}
-
-void BasicMessageChannel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- {
- // Note we create a consumer even if tag="". In that case
- // It will be renamed when we handle BasicConsumeOkBody.
- //
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-
- // FIXME aconway 2007-03-23: get processed in both.
-
- // BasicConsumeOkBody is really processed in handle(), here
- // we just pick up the tag to return to the user.
- //
- // We can't process it here because messages for the consumer may
- // already be arriving.
- //
- BasicConsumeOkBody::shared_ptr ok =
- channel.sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- make_shared_ptr(new BasicConsumeBody(
- channel.version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable())));
- tag = ok->getConsumerTag();
-}
-
-
-void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
-}
-
-void BasicMessageChannel::close(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
- destGet.shutdown();
- destDispatch.shutdown();
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
- {
- Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
- {
- channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
- }
- }
-}
-
-
-bool BasicMessageChannel::get(
- Message& msg, const Queue& queue, AckMode ackMode)
-{
- // Prepare for incoming response
- incoming.addDestination(BASIC_GET, destGet);
- channel.send(
- new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- bool got = destGet.wait(msg);
- return got;
-}
-
-void BasicMessageChannel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- const string e = exchange.getName();
- string key = routingKey;
-
- // Make a header for the message
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(
- *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
- header->setContentSize(msg.getData().size());
-
- channel.send(
- new BasicPublishBody(
- channel.version, 0, e, key, mandatory, immediate));
- channel.send(header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- channel.send(new AMQContentBody(data));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- channel.send(new AMQContentBody(frag));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
-void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case BasicGetOkBody::METHOD_ID: {
- incoming.openReference(BASIC_REF);
- incoming.createMessage(BASIC_GET, BASIC_REF);
- return;
- }
- case BasicGetEmptyBody::METHOD_ID: {
- incoming.getDestination(BASIC_GET).empty();
- incoming.removeDestination(BASIC_GET);
- return;
- }
- case BasicDeliverBody::METHOD_ID: {
- BasicDeliverBody::shared_ptr deliver=
- boost::shared_polymorphic_downcast<BasicDeliverBody>(method);
- incoming.openReference(BASIC_REF);
- Message& msg = incoming.createMessage(
- deliver->getConsumerTag(), BASIC_REF);
- msg.setDestination(deliver->getConsumerTag());
- msg.setDeliveryTag(deliver->getDeliveryTag());
- msg.setRedelivered(deliver->getRedelivered());
- return;
- }
- case BasicReturnBody::METHOD_ID: {
- incoming.openReference(BASIC_REF);
- incoming.createMessage(BASIC_RETURN, BASIC_REF);
- return;
- }
- case BasicConsumeOkBody::METHOD_ID: {
- Mutex::ScopedLock l(lock);
- BasicConsumeOkBody::shared_ptr consumeOk =
- boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method);
- std::string tag = consumeOk->getConsumerTag();
- ConsumerMap::iterator i = consumers.find(std::string());
- if (i != consumers.end()) {
- // Need to rename the un-named consumer.
- if (consumers.find(tag) == consumers.end()) {
- consumers[tag] = i->second;
- consumers.erase(i);
- }
- else // Tag already exists.
- throw ChannelException(404, "Tag already exists: "+tag);
- }
- // FIXME aconway 2007-03-23: Integrate consumer & destination
- // maps.
- incoming.addDestination(tag, destDispatch);
- return;
- }
- }
- throw Channel::UnknownMethod();
-}
-
-void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) {
- BasicHeaderProperties* props =
- boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
- IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF);
- assert (ref.messages.size() == 1);
- ref.messages.front().BasicHeaderProperties::operator=(*props);
- incoming_size = header->getContentSize();
- if (incoming_size==0)
- incoming.closeReference(BASIC_REF);
-}
-
-void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){
- incoming.appendReference(BASIC_REF, content->getData());
- size_t size = incoming.getReference(BASIC_REF).data.size();
- if (size >= incoming_size) {
- incoming.closeReference(BASIC_REF);
- if (size > incoming_size)
- throw ChannelException(502, "Content exceeded declared size");
- }
-}
-
-void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(channel.isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < channel.getPrefetch())
- break;
- //else drop-through
- case AUTO_ACK:
- consumer.lastDeliveryTag = 0;
- channel.send(
- new BasicAckBody(
- channel.version,
- msg.getDeliveryTag(),
- multiple));
- case NO_ACK: // Nothing to do
- case CLIENT_ACK: // User code must ack.
- break;
- // TODO aconway 2007-02-22: Provide a way for user
- // to ack!
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-
-void BasicMessageChannel::run() {
- while(channel.isOpen()) {
- try {
- Message msg;
- bool gotMessge = destDispatch.wait(msg);
- if (gotMessge) {
- if(msg.getDestination() == BASIC_RETURN) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler != 0)
- handler->returned(msg);
- }
- else {
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(
- msg.getDestination());
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" +
- msg.getDestination());
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- }
- catch (const ShutdownException&) {
- /* Orderly shutdown */
- }
- catch (const Exception& e) {
- // FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::BasicMessageChannel::run() terminated by: "
- << e.toString() << endl;
- }
- }
-}
-
-void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
-}
-
-void BasicMessageChannel::setQos(){
- channel.sendAndReceive<BasicQosOkBody>(
- make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/BasicMessageChannel.h b/qpid/cpp/src/client/BasicMessageChannel.h
deleted file mode 100644
index 13e1cf1e00..0000000000
--- a/qpid/cpp/src/client/BasicMessageChannel.h
+++ /dev/null
@@ -1,90 +0,0 @@
-#ifndef _client_BasicMessageChannel_h
-#define _client_BasicMessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageChannel.h"
-#include "IncomingMessage.h"
-#include <boost/scoped_ptr.hpp>
-
-namespace qpid {
-namespace client {
-/**
- * Messaging implementation using AMQP 0-8 BasicMessageChannel class
- * to send and receiving messages.
- */
-class BasicMessageChannel : public MessageChannel
-{
- public:
- BasicMessageChannel(Channel& parent);
-
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- void cancel(const std::string& tag, bool synch = true);
-
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- void run();
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
-
- void handle(shared_ptr<framing::AMQHeaderBody>);
-
- void handle(shared_ptr<framing::AMQContentBody>);
-
- void setQos();
-
- void close();
-
- private:
-
- struct Consumer{
- MessageListener* listener;
- AckMode ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string, Consumer> ConsumerMap;
-
- void deliver(Consumer& consumer, Message& msg);
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- uint64_t incoming_size;
- ConsumerMap consumers ;
- ReturnedMessageHandler* returnsHandler;
- IncomingMessage::WaitableDestination destGet;
- IncomingMessage::WaitableDestination destDispatch;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_BasicMessageChannel_h*/
diff --git a/qpid/cpp/src/client/ClientAdapter.cpp b/qpid/cpp/src/client/ClientAdapter.cpp
deleted file mode 100644
index f5eb2f4536..0000000000
--- a/qpid/cpp/src/client/ClientAdapter.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "../gen/AMQP_ClientOperations.h"
-#include "ClientAdapter.h"
-#include "Connection.h"
-#include "../Exception.h"
-#include "../framing/AMQMethodBody.h"
-
-namespace qpid {
-namespace client {
-
-using namespace qpid;
-using namespace qpid::framing;
-
-typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
-void ClientAdapter::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- method->invoke(*clientOps, context);
- }catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.client->getConnection().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.client->getConnection().close(
- context, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
- channel->handleHeader(body);
-}
-
-void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) {
- channel->handleContent(body);
-}
-
-void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
- // TODO aconway 2007-01-17: Implement heartbeats.
-}
-
-
-
-}} // namespace qpid::client
-
diff --git a/qpid/cpp/src/client/ClientAdapter.h b/qpid/cpp/src/client/ClientAdapter.h
deleted file mode 100644
index ca029a793f..0000000000
--- a/qpid/cpp/src/client/ClientAdapter.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef _client_ClientAdapter_h
-#define _client_ClientAdapter_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "../framing/ChannelAdapter.h"
-#include "ClientChannel.h"
-
-namespace qpid {
-namespace client {
-
-class AMQMethodBody;
-class Connection;
-
-/**
- * Per-channel protocol adapter.
- *
- * Translates protocol bodies into calls on the core Channel,
- * Connection and Client objects.
- *
- * Owns a channel, has references to Connection and Client.
- */
-class ClientAdapter : public framing::ChannelAdapter
-{
- public:
- ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&);
- Channel& getChannel() { return *channel; }
-
- void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
-
- private:
- void handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const framing::MethodContext& context);
-
- class ClientOps;
-
- std::auto_ptr<Channel> channel;
- Connection& connection;
- Client& client;
- boost::shared_ptr<ClientOps> clientOps;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_ClientAdapter_h*/
diff --git a/qpid/cpp/src/client/ClientChannel.cpp b/qpid/cpp/src/client/ClientChannel.cpp
deleted file mode 100644
index 533b590010..0000000000
--- a/qpid/cpp/src/client/ClientChannel.cpp
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include "ClientChannel.h"
-#include "../sys/Monitor.h"
-#include "ClientMessage.h"
-#include "../QpidError.h"
-#include "MethodBodyInstances.h"
-#include "Connection.h"
-#include "BasicMessageChannel.h"
-#include "MessageMessageChannel.h"
-
-// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
-// handling of errors that should close the connection or the channel.
-// Make sure the user thread receives a connection in each case.
-//
-using namespace std;
-using namespace boost;
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
- connection(0), prefetch(_prefetch), transactional(_transactional)
-{
- switch (mode) {
- case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
- default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
- }
-}
-
-Channel::~Channel(){
- close();
-}
-
-void Channel::open(ChannelId id, Connection& con)
-{
- if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
- connection = &con;
- init(id, con, con.getVersion()); // ChannelAdapter initialization.
- string oob;
- if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
-}
-
-void Channel::protocolInit(
- const std::string& uid, const std::string& pwd, const std::string& vhost) {
- assert(connection);
- responses.expect();
- connection->connector->init(); // Send ProtocolInit block.
- ConnectionStartBody::shared_ptr connectionStart =
- responses.receive<ConnectionStartBody>();
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- ConnectionTuneBody::shared_ptr proposal =
- sendAndReceive<ConnectionTuneBody>(
- make_shared_ptr(new ConnectionStartOkBody(
- version, connectionStart->getRequestId(),
- props, mechanism,
- response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- send(new ConnectionTuneOkBody(
- version, proposal->getRequestId(),
- proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()));
-
- uint16_t heartbeat = proposal->getHeartbeat();
- connection->connector->setReadTimeout(heartbeat * 2);
- connection->connector->setWriteTimeout(heartbeat);
-
- // Send connection open.
- std::string capabilities;
- responses.expect();
- send(new ConnectionOpenBody(version, vhost, capabilities, true));
- //receive connection.open-ok (or redirect, but ignore that for now
- //esp. as using force=true).
- AMQMethodBody::shared_ptr openResponse = responses.receive();
- if(openResponse->isA<ConnectionOpenOkBody>()) {
- //ok
- }else if(openResponse->isA<ConnectionRedirectBody>()){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(
- shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse));
- cout << "Received redirection to " << redirect->getHost()
- << endl;
- } else {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open");
- }
-}
-
-bool Channel::isOpen() const { return connection; }
-
-void Channel::setQos() {
- messaging->setQos();
-}
-
-void Channel::setPrefetch(uint16_t _prefetch){
- prefetch = _prefetch;
- setQos();
-}
-
-void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
- FieldTable args;
- sendAndReceiveSync<ExchangeDeclareOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args)));
-}
-
-void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- sendAndReceiveSync<ExchangeDeleteOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
-}
-
-void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
- FieldTable args;
- QueueDeclareOkBody::shared_ptr response =
- sendAndReceiveSync<QueueDeclareOkBody>(
- synch,
- make_shared_ptr(new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
- if(synch) {
- if(queue.getName().length() == 0)
- queue.setName(response->getQueue());
- }
-}
-
-void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- sendAndReceiveSync<QueueDeleteOkBody>(
- synch,
- make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- sendAndReceiveSync<QueueBindOkBody>(
- synch,
- make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
-}
-
-void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
-}
-
-void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
-}
-
-void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr method, const MethodContext&)
-{
- // TODO aconway 2007-03-23: Special case for consume OK as it
- // is both an expected response and needs handling in this thread.
- // Need to review & reationalize the client-side processing model.
- if (method->isA<BasicConsumeOkBody>()) {
- messaging->handle(method);
- responses.signalResponse(method);
- return;
- }
- if(responses.isWaiting()) {
- responses.signalResponse(method);
- return;
- }
- try {
- switch (method->amqpClassId()) {
- case MessageOkBody::CLASS_ID:
- case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
- case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
- default: throw UnknownMethod();
- }
- }
- catch (const UnknownMethod&) {
- connection->close(
- 504, "Unknown method",
- method->amqpClassId(), method->amqpMethodId());
- }
- }
-
-void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
- switch (method->amqpMethodId()) {
- case ChannelCloseBody::METHOD_ID:
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
- return;
- case ChannelFlowBody::METHOD_ID:
- // FIXME aconway 2007-02-22: Not yet implemented.
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
- if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
- connection->close();
- return;
- }
- throw UnknownMethod();
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- messaging->handle(body);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- dispatcher = Thread(*messaging);
-}
-
-// Close called by local application.
-void Channel::close(
- uint16_t code, const std::string& text,
- ClassId classId, MethodId methodId)
-{
- if (isOpen()) {
- try {
- if (getId() != 0) {
- sendAndReceive<ChannelCloseOkBody>(
- make_shared_ptr(new ChannelCloseBody(
- version, code, text, classId, methodId)));
- }
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- } catch (...) {
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
- closeInternal();
- throw;
- }
- }
-}
-
-// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr) {
- assert(isOpen());
- closeInternal();
-}
-
-void Channel::closeInternal() {
- if (isOpen());
- {
- messaging->close();
- connection = 0;
- // A 0 response means we are closed.
- responses.signalResponse(AMQMethodBody::shared_ptr());
- }
- dispatcher.join();
-}
-
-AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
-{
- responses.expect();
- send(toSend);
- return responses.receive(c, m);
-}
-
-AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
-{
- if(sync)
- return sendAndReceive(body, c, m);
- else {
- send(body);
- return AMQMethodBody::shared_ptr();
- }
-}
-
-void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
- messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
- messaging->cancel(tag, synch);
-}
-
-bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- return messaging->get(msg, queue, ackMode);
-}
-
-void Channel::publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory, bool immediate) {
- messaging->publish(msg, exchange, routingKey, mandatory, immediate);
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
- messaging->setReturnedMessageHandler(handler);
-}
-
-void Channel::run() {
- messaging->run();
-}
-
diff --git a/qpid/cpp/src/client/ClientChannel.h b/qpid/cpp/src/client/ClientChannel.h
deleted file mode 100644
index 328fc23f68..0000000000
--- a/qpid/cpp/src/client/ClientChannel.h
+++ /dev/null
@@ -1,356 +0,0 @@
-#ifndef _client_ClientChannel_h
-#define _client_ClientChannel_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/scoped_ptr.hpp>
-#include "../framing/amqp_framing.h"
-#include "ClientExchange.h"
-#include "ClientMessage.h"
-#include "ClientQueue.h"
-#include "ResponseHandler.h"
-#include "../framing/ChannelAdapter.h"
-#include "../sys/Thread.h"
-#include "AckMode.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelCloseBody;
-class AMQMethodBody;
-}
-
-namespace client {
-
-class Connection;
-class MessageChannel;
-class MessageListener;
-class ReturnedMessageHandler;
-
-/**
- * Represents an AMQP channel, i.e. loosely a session of work. It
- * is through a channel that most of the AMQP 'methods' are
- * exposed.
- *
- * \ingroup clientapi
- */
-class Channel : public framing::ChannelAdapter
-{
- private:
- struct UnknownMethod {};
- typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
-
- sys::Mutex lock;
- boost::scoped_ptr<MessageChannel> messaging;
- Connection* connection;
- sys::Thread dispatcher;
- ResponseHandler responses;
-
- uint16_t prefetch;
- const bool transactional;
- framing::ProtocolVersion version;
-
- void handleHeader(framing::AMQHeaderBody::shared_ptr body);
- void handleContent(framing::AMQContentBody::shared_ptr body);
- void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
- void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method);
- void handleConnection(framing::AMQMethodBody::shared_ptr method);
-
- void setQos();
-
- void protocolInit(
- const std::string& uid, const std::string& pwd,
- const std::string& vhost);
-
- framing::AMQMethodBody::shared_ptr sendAndReceive(
- framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
-
- framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
- bool sync,
- framing::AMQMethodBody::shared_ptr,
- framing::ClassId, framing::MethodId);
-
- template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
- return boost::shared_polymorphic_downcast<BodyType>(
- sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
- }
-
- template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceiveSync(
- bool sync, framing::AMQMethodBody::shared_ptr body) {
- return boost::shared_polymorphic_downcast<BodyType>(
- sendAndReceiveSync(
- sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
- }
-
- void open(framing::ChannelId, Connection&);
- void closeInternal();
- void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
-
- // FIXME aconway 2007-02-23: Get rid of friendships.
- friend class Connection;
- friend class BasicMessageChannel; // for sendAndReceive.
- friend class MessageMessageChannel; // for sendAndReceive.
-
- public:
- enum InteropMode { AMQP_08, AMQP_09 };
-
- /**
- * Creates a channel object.
- *
- * @param transactional if true, the publishing and acknowledgement
- * of messages will be transactional and can be committed or
- * aborted in atomic units (@see commit(), @see rollback())
- *
- * @param prefetch specifies the number of unacknowledged
- * messages the channel is willing to have sent to it
- * asynchronously
- *
- * @param messageImpl Alternate messaging implementation class to
- * allow alternate protocol implementations of messaging
- * operations. Takes ownership.
- */
- Channel(
- bool transactional = false, u_int16_t prefetch = 500,
- InteropMode=AMQP_08);
-
- ~Channel();
-
- /**
- * Declares an exchange.
- *
- * In AMQP Exchanges are the destinations to which messages
- * are published. They have Queues bound to them and route
- * messages they receive to those queues. The routing rules
- * depend on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareExchange(Exchange& exchange, bool synch = true);
- /**
- * Deletes an exchange
- *
- * @param exchange an Exchange object representing the exchange to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteExchange(Exchange& exchange, bool synch = true);
- /**
- * Declares a Queue
- *
- * @param queue a Queue object representing the queue to declare
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void declareQueue(Queue& queue, bool synch = true);
- /**
- * Deletes a Queue
- *
- * @param queue a Queue object representing the queue to delete
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- /**
- * Binds a queue to an exchange. The exact semantics of this
- * (in particular how 'routing keys' and 'binding arguments'
- * are used) depends on the type of the exchange.
- *
- * @param exchange an Exchange object representing the
- * exchange to bind to
- *
- * @param queue a Queue object representing the queue to be
- * bound
- *
- * @param key the 'routing key' for the binding
- *
- * @param args the 'binding arguments' for the binding
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void bind(const Exchange& exchange, const Queue& queue,
- const std::string& key, const framing::FieldTable& args,
- bool synch = true);
-
- /**
- * For a transactional channel this will commit all
- * publications and acknowledgements since the last commit (or
- * the channel was opened if there has been no previous
- * commit). This will cause published messages to become
- * available to consumers and acknowledged messages to be
- * consumed and removed from the queues they were dispatched
- * from.
- *
- * Transactionailty of a channel is specified when the channel
- * object is created (@see Channel()).
- */
- void commit();
-
- /**
- * For a transactional channel, this will rollback any
- * publications or acknowledgements. It will be as if the
- * ppblished messages were never sent and the acknowledged
- * messages were never consumed.
- */
- void rollback();
-
- /**
- * Change the prefetch in use.
- */
- void setPrefetch(uint16_t prefetch);
-
- uint16_t getPrefetch() { return prefetch; }
-
- /**
- * Start message dispatching on a new thread
- */
- void start();
-
- /**
- * Close the channel with optional error information.
- * Closing a channel that is not open has no effect.
- */
- void close(
- framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
-
- /** True if the channel is transactional */
- bool isTransactional() { return transactional; }
-
- /** True if the channel is open */
- bool isOpen() const;
-
- /** Get the connection associated with this channel */
- Connection& getConnection() { return *connection; }
-
- /** Return the protocol version */
- framing::ProtocolVersion getVersion() const { return version ; }
-
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see AckMode
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(const std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * AckMode)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- /**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
- */
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- /**
- * Deliver messages from the broker to the appropriate MessageListener.
- */
- void run();
-
-
-};
-
-}}
-
-#endif /*!_client_ClientChannel_h*/
diff --git a/qpid/cpp/src/client/ClientConnection.cpp b/qpid/cpp/src/client/ClientConnection.cpp
deleted file mode 100644
index b053a45b0f..0000000000
--- a/qpid/cpp/src/client/ClientConnection.cpp
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <algorithm>
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-
-#include "Connection.h"
-#include "ClientChannel.h"
-#include "ClientMessage.h"
-#include "../QpidError.h"
-#include <iostream>
-#include <sstream>
-#include "MethodBodyInstances.h"
-#include <functional>
-
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-
-namespace qpid {
-namespace client {
-
-const std::string Connection::OK("OK");
-
-Connection::Connection(
- bool _debug, uint32_t _max_frame_size,
- framing::ProtocolVersion _version
-) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
- defaultConnector(version, _debug, _max_frame_size),
- isOpen(false), debug(_debug)
-{
- setConnector(defaultConnector);
-}
-
-Connection::~Connection(){}
-
-void Connection::setConnector(Connector& con)
-{
- connector = &con;
- connector->setInputHandler(this);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
-}
-
-void Connection::open(
- const std::string& host, int port,
- const std::string& uid, const std::string& pwd, const std::string& vhost)
-{
- if (isOpen)
- THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
- connector->connect(host, port);
- channels[0] = &channel0;
- channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, vhost);
- isOpen = true;
-}
-
-void Connection::shutdown() {
- close();
-}
-
-void Connection::close(
- ReplyCode code, const string& msg, ClassId classId, MethodId methodId
-)
-{
- if(isOpen) {
- // TODO aconway 2007-01-29: Exception handling - could end up
- // partly closed with threads left unjoined.
- isOpen = false;
- channel0.sendAndReceive<ConnectionCloseOkBody>(
- make_shared_ptr(new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId)));
-
- using boost::bind;
- for_each(channels.begin(), channels.end(),
- bind(&Channel::closeInternal,
- bind(&ChannelMap::value_type::second, _1)));
- channels.clear();
- connector->close();
- }
-}
-
-void Connection::openChannel(Channel& channel) {
- ChannelId id = ++channelIdCounter;
- assert (channels.find(id) == channels.end());
- assert(out);
- channels[id] = &channel;
- channel.open(id, *this);
-}
-
-void Connection::erase(ChannelId id) {
- channels.erase(id);
-}
-
-void Connection::received(AMQFrame* frame){
- // FIXME aconway 2007-01-25: Mutex
- ChannelId id = frame->getChannel();
- Channel* channel = channels[id];
- // FIXME aconway 2007-01-26: Exception thrown here is hanging the
- // client. Need to review use of exceptions.
- if (channel == 0)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR+504,
- (boost::format("Invalid channel number %g") % id).str());
- try{
- channel->handleBody(frame->getBody());
- }catch(const qpid::QpidError& e){
- channelException(
- *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
-}
-
-void Connection::send(AMQFrame* frame) {
- out->send(frame);
-}
-
-void Connection::channelException(
- Channel& channel, AMQMethodBody* method, const QpidError& e)
-{
- int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0)
- channel.close(code, msg);
- else
- channel.close(
- code, msg, method->amqpClassId(), method->amqpMethodId());
-}
-
-void Connection::idleIn(){
- connector->close();
-}
-
-void Connection::idleOut(){
- out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/ClientExchange.cpp b/qpid/cpp/src/client/ClientExchange.cpp
deleted file mode 100644
index d5914beea2..0000000000
--- a/qpid/cpp/src/client/ClientExchange.cpp
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ClientExchange.h"
-
-qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
-const std::string& qpid::client::Exchange::getName() const { return name; }
-const std::string& qpid::client::Exchange::getType() const { return type; }
-
-const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
-const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
-const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
-
-const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/qpid/cpp/src/client/ClientExchange.h b/qpid/cpp/src/client/ClientExchange.h
deleted file mode 100644
index a8ac21fa9b..0000000000
--- a/qpid/cpp/src/client/ClientExchange.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _Exchange_
-#define _Exchange_
-
-namespace qpid {
-namespace client {
-
- /**
- * A 'handle' used to represent an AMQP exchange in the Channel
- * methods. Exchanges are the destinations to which messages are
- * published.
- *
- * There are different types of exchange (the standard types are
- * available as static constants, see DIRECT_EXCHANGE,
- * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to
- * an exchange using Channel::bind() and messages published to
- * that exchange are then routed to the queue based on the details
- * of the binding and the type of exchange.
- *
- * There are some standard exchange instances that are predeclared
- * on all AMQP brokers. These are defined as static members
- * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and
- * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange
- * (member DEFAULT_EXCHANGE) which is nameless and of type
- * 'direct' and has every declared queue bound to it by queue
- * name.
- *
- * \ingroup clientapi
- */
- class Exchange{
- const std::string name;
- const std::string type;
-
- public:
- /**
- * A direct exchange routes messages published with routing
- * key X to any queue bound with key X (i.e. an exact match is
- * used).
- */
- static const std::string DIRECT_EXCHANGE;
- /**
- * A topic exchange treat the key with which a queue is bound
- * as a pattern and routes all messages whose routing keys
- * match that pattern to the bound queue. The routing key for
- * a message must consist of zero or more alpha-numeric words
- * delimited by dots. The pattern is of a similar form but *
- * can be used to match excatly one word and # can be used to
- * match zero or more words.
- */
- static const std::string TOPIC_EXCHANGE;
- /**
- * The headers exchange routes messages based on whether their
- * headers match the binding arguments specified when
- * binding. (see the AMQP spec for more details).
- */
- static const std::string HEADERS_EXCHANGE;
-
- /**
- * The 'default' exchange, nameless and of type 'direct'. Has
- * every declared queue bound to it by name.
- */
- static const Exchange DEFAULT_EXCHANGE;
- /**
- * The standard direct exchange, named amq.direct.
- */
- static const Exchange STANDARD_DIRECT_EXCHANGE;
- /**
- * The standard topic exchange, named amq.topic.
- */
- static const Exchange STANDARD_TOPIC_EXCHANGE;
- /**
- * The standard headers exchange, named amq.header.
- */
- static const Exchange STANDARD_HEADERS_EXCHANGE;
-
- Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
- const std::string& getName() const;
- const std::string& getType() const;
- };
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/src/client/ClientMessage.h b/qpid/cpp/src/client/ClientMessage.h
deleted file mode 100644
index 35aed6c734..0000000000
--- a/qpid/cpp/src/client/ClientMessage.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef _client_ClientMessage_h
-#define _client_ClientMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include "../framing/BasicHeaderProperties.h"
-
-namespace qpid {
-namespace client {
-
-/**
- * A representation of messages for sent or recived through the
- * client api.
- *
- * \ingroup clientapi
- */
-// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
-// basic header properties.
-class Message : public framing::BasicHeaderProperties {
- public:
- Message(const std::string& data_=std::string()) : data(data_) {}
-
- std::string getData() const { return data; }
- void setData(const std::string& _data) { data = _data; }
-
- std::string getDestination() const { return destination; }
- void setDestination(const std::string& dest) { destination = dest; }
-
- // TODO aconway 2007-03-22: only needed for Basic.deliver support.
- uint64_t getDeliveryTag() const { return deliveryTag; }
- void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
-
- bool isRedelivered() const { return redelivered; }
- void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
-
- private:
- std::string data;
- std::string destination;
- bool redelivered;
- uint64_t deliveryTag;
-};
-
-}}
-
-#endif /*!_client_ClientMessage_h*/
diff --git a/qpid/cpp/src/client/ClientQueue.cpp b/qpid/cpp/src/client/ClientQueue.cpp
deleted file mode 100644
index 613cf8d288..0000000000
--- a/qpid/cpp/src/client/ClientQueue.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ClientQueue.h"
-
-qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){}
-
-qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable)
- : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){}
-
-const std::string& qpid::client::Queue::getName() const{
- return name;
-}
-
-void qpid::client::Queue::setName(const std::string& _name){
- name = _name;
-}
-
-bool qpid::client::Queue::isAutoDelete() const{
- return autodelete;
-}
-
-bool qpid::client::Queue::isExclusive() const{
- return exclusive;
-}
-
-bool qpid::client::Queue::isDurable() const{
- return durable;
-}
-
-void qpid::client::Queue::setDurable(bool _durable){
- durable = _durable;
-}
-
-
-
-
diff --git a/qpid/cpp/src/client/ClientQueue.h b/qpid/cpp/src/client/ClientQueue.h
deleted file mode 100644
index b37a44b004..0000000000
--- a/qpid/cpp/src/client/ClientQueue.h
+++ /dev/null
@@ -1,103 +0,0 @@
-#ifndef _client_ClientQueue_h
-#define _client_ClientQueue_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-namespace qpid {
-namespace client {
-
- /**
- * A 'handle' used to represent an AMQP queue in the Channel
- * methods. Creating an instance of this class does not cause the
- * queue to be created on the broker. Rather, an instance of this
- * class should be passed to Channel::declareQueue() to ensure
- * that the queue exists or is created.
- *
- * Queues hold messages and allow clients to consume
- * (see Channel::consume()) or get (see Channel::get()) those messags. A
- * queue receives messages by being bound to one or more Exchange;
- * messages published to that exchange may then be routed to the
- * queue based on the details of the binding and the type of the
- * exchange (see Channel::bind()).
- *
- * Queues are identified by a name. They can be exclusive (in which
- * case they can only be used in the context of the connection
- * over which they were declared, and are deleted when then
- * connection closes), or they can be shared. Shared queues can be
- * auto deleted when they have no consumers.
- *
- * We use the term 'temporary queue' to refer to an exclusive
- * queue.
- *
- * \ingroup clientapi
- */
- class Queue{
- std::string name;
- const bool autodelete;
- const bool exclusive;
- bool durable;
-
- public:
-
- /**
- * Creates an unnamed, non-durable, temporary queue. A name
- * will be assigned to this queue instance by a call to
- * Channel::declareQueue().
- */
- Queue();
- /**
- * Creates a shared, non-durable, queue with a given name,
- * that will not be autodeleted.
- *
- * @param name the name of the queue
- */
- Queue(std::string name);
- /**
- * Creates a non-durable queue with a given name.
- *
- * @param name the name of the queue
- *
- * @param temp if true the queue will be a temporary queue, if
- * false it will be shared and not autodeleted.
- */
- Queue(std::string name, bool temp);
- /**
- * This constructor allows the autodelete, exclusive and
- * durable propeties to be explictly set. Note however that if
- * exclusive is true, autodelete has no meaning as exclusive
- * queues are always destroyed when the connection that
- * created them is closed.
- */
- Queue(std::string name, bool autodelete, bool exclusive, bool durable);
- const std::string& getName() const;
- void setName(const std::string&);
- bool isAutoDelete() const;
- bool isExclusive() const;
- bool isDurable() const;
- void setDurable(bool durable);
- };
-
-}
-}
-
-#endif /*!_client_ClientQueue_h*/
diff --git a/qpid/cpp/src/client/Connection.h b/qpid/cpp/src/client/Connection.h
deleted file mode 100644
index 5e0b413dac..0000000000
--- a/qpid/cpp/src/client/Connection.h
+++ /dev/null
@@ -1,179 +0,0 @@
-#ifndef _client_Connection_
-#define _client_Connection_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <map>
-#include <string>
-#include "../QpidError.h"
-#include "ClientChannel.h"
-#include "Connector.h"
-#include "../sys/ShutdownHandler.h"
-#include "../sys/TimeoutHandler.h"
-
-
-namespace qpid {
-
-/**
- * The client namespace contains all classes that make up a client
- * implementation of the AMQP protocol. The key classes that form
- * the basis of the client API to be used by applications are
- * Connection and Channel.
- */
-namespace client {
-
-/**
- * \internal provide access to selected private channel functions
- * for the Connection without making it a friend of the entire channel.
- */
-class ConnectionForChannel :
- public framing::InputHandler,
- public framing::OutputHandler,
- public sys::TimeoutHandler,
- public sys::ShutdownHandler
-
-{
- private:
- friend class Channel;
- virtual void erase(framing::ChannelId) = 0;
-};
-
-
-/**
- * \defgroup clientapi Application API for an AMQP client
- */
-
-/**
- * Represents a connection to an AMQP broker. All communication is
- * initiated by establishing a connection, then opening one or
- * more Channels over that connection.
- *
- * \ingroup clientapi
- */
-class Connection : public ConnectionForChannel
-{
- typedef std::map<framing::ChannelId, Channel*> ChannelMap;
-
- framing::ChannelId channelIdCounter;
- static const std::string OK;
-
- framing::ProtocolVersion version;
- const uint32_t max_frame_size;
- ChannelMap channels;
- Connector defaultConnector;
- Connector* connector;
- framing::OutputHandler* out;
- volatile bool isOpen;
- Channel channel0;
- bool debug;
-
- void erase(framing::ChannelId);
- void channelException(
- Channel&, framing::AMQMethodBody*, const QpidError&);
-
- // TODO aconway 2007-01-26: too many friendships, untagle these classes.
- friend class Channel;
-
- public:
- /**
- * Creates a connection object, but does not open the
- * connection.
- *
- * @param _version the version of the protocol to connect with
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional and defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional and defaults to 65536.
- */
- Connection(bool debug = false, uint32_t max_frame_size = 65536,
- framing::ProtocolVersion=framing::highestProtocolVersion);
- ~Connection();
-
- /**
- * Opens a connection to a broker.
- *
- * @param host the host on which the broker is running
- *
- * @param port the port on the which the broker is listening
- *
- * @param uid the userid to connect with
- *
- * @param pwd the password to connect with (currently SASL
- * PLAIN is the only authentication method supported so this
- * is sent in clear text)
- *
- * @param virtualhost the AMQP virtual host to use (virtual
- * hosts, where implemented(!), provide namespace partitioning
- * within a single broker).
- */
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest",
- const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
-
- /**
- * Close the connection with optional error information for the peer.
- *
- * Any further use of this connection (without reopening it) will
- * not succeed.
- */
- void close(framing::ReplyCode=200, const std::string& msg=OK,
- framing::ClassId = 0, framing::MethodId = 0);
-
- /**
- * Associate a Channel with this connection and open it for use.
- *
- * In AMQP channels are like multi-plexed 'sessions' of work over
- * a connection. Almost all the interaction with AMQP is done over
- * a channel.
- *
- * @param connection the connection object to be associated with
- * the channel. Call Channel::close() to close the channel.
- */
- void openChannel(Channel&);
-
-
- // TODO aconway 2007-01-26: can these be private?
- void send(framing::AMQFrame*);
- void received(framing::AMQFrame*);
- void idleOut();
- void idleIn();
- void shutdown();
-
- /**\internal used for testing */
- void setConnector(Connector& connector);
-
- /**
- * @return the maximum frame size in use on this connection
- */
- inline uint32_t getMaxFrameSize(){ return max_frame_size; }
-
- /** @return protocol version in use on this connection. */
- framing::ProtocolVersion getVersion() const { return version; }
-};
-
-}} // namespace qpid::client
-
-
-#endif
diff --git a/qpid/cpp/src/client/Connector.cpp b/qpid/cpp/src/client/Connector.cpp
deleted file mode 100644
index 566e58ec13..0000000000
--- a/qpid/cpp/src/client/Connector.cpp
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include "../QpidError.h"
-#include "../sys/Time.h"
-#include "Connector.h"
-
-namespace qpid {
-namespace client {
-
-using namespace qpid::sys;
-using namespace qpid::framing;
-using qpid::QpidError;
-
-Connector::Connector(
- ProtocolVersion ver, bool _debug, uint32_t buffer_size
-) : debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(ver),
- closed(true),
- lastIn(0), lastOut(0),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size)
-{ }
-
-Connector::~Connector(){ }
-
-void Connector::connect(const std::string& host, int port){
- socket = Socket::createTcp();
- socket.connect(host, port);
- closed = false;
- receiver = Thread(this);
-}
-
-void Connector::init(){
- ProtocolInitiation init(version);
- writeBlock(&init);
-}
-
-void Connector::close(){
- closed = true;
- socket.close();
- receiver.join();
-}
-
-void Connector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void Connector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* Connector::getOutputHandler(){
- return this;
-}
-
-void Connector::send(AMQFrame* f){
- std::auto_ptr<AMQFrame> frame(f);
- AMQBody::shared_ptr body = frame->getBody();
- writeBlock(frame.get());
- if(debug) std::cout << "SENT: " << *frame << std::endl;
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
- Mutex::ScopedLock l(writeLock);
- data->encode(outbuf);
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- size_t written = 0;
- while(written < available && !closed){
- ssize_t sent = socket.send(data + written, available-written);
- if(sent > 0) {
- lastOut = now() * TIME_MSEC;
- written += sent;
- }
- }
-}
-
-void Connector::handleClosed(){
- closed = true;
- socket.close();
- if(shutdownHandler) shutdownHandler->shutdown();
-}
-
-void Connector::checkIdle(ssize_t status){
- if(timeoutHandler){
- Time t = now() * TIME_MSEC;
- if(status == Socket::SOCKET_TIMEOUT) {
- if(idleIn && (t - lastIn > idleIn)){
- timeoutHandler->idleIn();
- }
- }
- else if(status == 0 || status == Socket::SOCKET_EOF) {
- handleClosed();
- }
- else {
- lastIn = t;
- }
- if(idleOut && (t - lastOut > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void Connector::setReadTimeout(uint16_t t){
- idleIn = t * 1000;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void Connector::setWriteTimeout(uint16_t t){
- idleOut = t * 1000;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void Connector::setSocketTimeout(){
- socket.setTimeout(timeout*TIME_MSEC);
-}
-
-void Connector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-void Connector::run(){
- try{
- while(!closed){
- ssize_t available = inbuf.available();
- if(available < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- ssize_t received = socket.recv(inbuf.start(), available);
- checkIdle(received);
-
- if(!closed && received > 0){
- inbuf.move(received);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame(version);
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: " << frame << std::endl;
- input->received(&frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
- } catch (const std::exception& e) {
- std::cout << e.what() << std::endl;
- handleClosed();
- }
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/Connector.h b/qpid/cpp/src/client/Connector.h
deleted file mode 100644
index 928bfa2d14..0000000000
--- a/qpid/cpp/src/client/Connector.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Connector_
-#define _Connector_
-
-
-#include "../framing/InputHandler.h"
-#include "../framing/OutputHandler.h"
-#include "../framing/InitiationHandler.h"
-#include "../framing/ProtocolInitiation.h"
-#include "../framing/ProtocolVersion.h"
-#include "../sys/ShutdownHandler.h"
-#include "../sys/TimeoutHandler.h"
-#include "../sys/Thread.h"
-#include "../sys/Monitor.h"
-#include "../sys/Socket.h"
-
-namespace qpid {
-
-namespace client {
-
-class Connector : public framing::OutputHandler,
- private sys::Runnable
-{
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
- framing::ProtocolVersion version;
-
- bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- uint32_t idleIn;
- uint32_t idleOut;
-
- sys::TimeoutHandler* timeoutHandler;
- sys::ShutdownHandler* shutdownHandler;
- framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
-
- framing::Buffer inbuf;
- framing::Buffer outbuf;
-
- sys::Mutex writeLock;
- sys::Thread receiver;
-
- sys::Socket socket;
-
- void checkIdle(ssize_t status);
- void writeBlock(framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
- void handleClosed();
-
- friend class Channel;
- public:
- Connector(framing::ProtocolVersion pVersion,
- bool debug = false, uint32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init();
- virtual void close();
- virtual void setInputHandler(framing::InputHandler* handler);
- virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(sys::ShutdownHandler* handler);
- virtual framing::OutputHandler* getOutputHandler();
- virtual void send(framing::AMQFrame* frame);
- virtual void setReadTimeout(uint16_t timeout);
- virtual void setWriteTimeout(uint16_t timeout);
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/src/client/IncomingMessage.cpp b/qpid/cpp/src/client/IncomingMessage.cpp
deleted file mode 100644
index eb5f2b6fae..0000000000
--- a/qpid/cpp/src/client/IncomingMessage.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "IncomingMessage.h"
-#include "../Exception.h"
-#include "ClientMessage.h"
-#include <boost/format.hpp>
-
-namespace qpid {
-namespace client {
-
-using boost::format;
-using sys::Mutex;
-
-IncomingMessage::Destination::~Destination() {}
-
-
-IncomingMessage::WaitableDestination::WaitableDestination()
- : shutdownFlag(false) {}
-
-void IncomingMessage::WaitableDestination::message(const Message& msg) {
- Mutex::ScopedLock l(monitor);
- queue.push(msg);
- monitor.notify();
-}
-
-void IncomingMessage::WaitableDestination::empty() {
- Mutex::ScopedLock l(monitor);
- queue.push(Empty());
- monitor.notify();
-}
-
-bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
- Mutex::ScopedLock l(monitor);
- while (queue.empty() && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- return false;
- Message* msg = boost::get<Message>(&queue.front());
- bool success = msg;
- if (success)
- msgOut=*msg;
- queue.pop();
- if (!queue.empty())
- monitor.notify(); // Wake another waiter.
- return success;
-}
-
-void IncomingMessage::WaitableDestination::shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
-}
-
-void IncomingMessage::openReference(const std::string& name) {
- Mutex::ScopedLock l(lock);
- if (references.find(name) != references.end())
- throw ConnectionException(
- 503, format("Attempt to open existing reference %s.") % name);
- references[name];
- return;
-}
-
-void IncomingMessage::appendReference(
- const std::string& name, const std::string& data)
-{
- Mutex::ScopedLock l(lock);
- getRefUnlocked(name).data += data;
-}
-
-Message& IncomingMessage::createMessage(
- const std::string& destination, const std::string& reference)
-{
- Mutex::ScopedLock l(lock);
- getDestUnlocked(destination); // Verify destination.
- Reference& ref = getRefUnlocked(reference);
- ref.messages.resize(ref.messages.size() +1);
- ref.messages.back().setDestination(destination);
- return ref.messages.back();
-}
-
-void IncomingMessage::closeReference(const std::string& name) {
- Reference refCopy;
- {
- Mutex::ScopedLock l(lock);
- refCopy = getRefUnlocked(name);
- references.erase(name);
- }
- for (std::vector<Message>::iterator i = refCopy.messages.begin();
- i != refCopy.messages.end();
- ++i)
- {
- i->setData(refCopy.data);
- // TODO aconway 2007-03-23: Thread safety,
- // can a destination be removed while we're doing this?
- getDestination(i->getDestination()).message(*i);
- }
-}
-
-
-void IncomingMessage::addDestination(std::string name, Destination& dest) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- destinations[name]=&dest;
- else if (i->second != &dest)
- throw ConnectionException(
- 503, format("Destination already exists: %s.") % name);
-}
-
-void IncomingMessage::removeDestination(std::string name) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- throw ConnectionException(
- 503, format("No such destination: %s.") % name);
- destinations.erase(i);
-}
-
-IncomingMessage::Destination& IncomingMessage::getDestination(
- const std::string& name) {
- return getDestUnlocked(name);
-}
-
-IncomingMessage::Reference& IncomingMessage::getReference(
- const std::string& name) {
- return getRefUnlocked(name);
-}
-
-IncomingMessage::Reference& IncomingMessage::getRefUnlocked(
- const std::string& name) {
- Mutex::ScopedLock l(lock);
- ReferenceMap::iterator i = references.find(name);
- if (i == references.end())
- throw ConnectionException(
- 503, format("No such reference: %s.") % name);
- return i->second;
-}
-
-IncomingMessage::Destination& IncomingMessage::getDestUnlocked(
- const std::string& name) {
- Mutex::ScopedLock l(lock);
- DestinationMap::iterator i = destinations.find(name);
- if (i == destinations.end())
- throw ConnectionException(
- 503, format("No such destination: %s.") % name);
- return *i->second;
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/IncomingMessage.h b/qpid/cpp/src/client/IncomingMessage.h
deleted file mode 100644
index bc650dfbe1..0000000000
--- a/qpid/cpp/src/client/IncomingMessage.h
+++ /dev/null
@@ -1,136 +0,0 @@
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "../sys/Monitor.h"
-#include <map>
-#include <queue>
-#include <vector>
-#include <boost/variant.hpp>
-
-namespace qpid {
-namespace client {
-
-class Message;
-
-/**
- * Manage incoming messages.
- *
- * Uses reference and destination concepts from 0-9 Messsage class.
- *
- * Basic messages use special destination and reference names to indicate
- * get-ok, return etc. messages.
- *
- */
-class IncomingMessage {
- public:
- /** Accumulate data associated with a set of messages. */
- struct Reference {
- std::string data;
- std::vector<Message> messages;
- };
-
- /** Interface to a destination for messages. */
- class Destination {
- public:
- virtual ~Destination();
-
- /** Pass a message to the destination */
- virtual void message(const Message&) = 0;
-
- /** Notify destination of queue-empty contition */
- virtual void empty() = 0;
- };
-
-
- /** A destination that a thread can wait on till a message arrives. */
- class WaitableDestination : public Destination
- {
- public:
- WaitableDestination();
- void message(const Message& msg);
- void empty();
- /** Wait till message() or empty() is called. True for message() */
- bool wait(Message& msgOut);
- void shutdown();
-
- private:
- struct Empty {};
- typedef boost::variant<Message,Empty> Item;
- sys::Monitor monitor;
- std::queue<Item> queue;
- bool shutdownFlag;
- };
-
-
-
- /** Add a reference. Throws if already open. */
- void openReference(const std::string& name);
-
- /** Get a reference. Throws if not already open. */
- void appendReference(const std::string& name,
- const std::string& data);
-
- /** Create a message to destination associated with reference
- *@exception if destination or reference non-existent.
- */
- Message& createMessage(const std::string& destination,
- const std::string& reference);
-
- /** Get a reference.
- *@exception if non-existent.
- */
- Reference& getReference(const std::string& name);
-
- /** Close a reference and deliver all its messages.
- * Throws if not open or a message has an invalid destination.
- */
- void closeReference(const std::string& name);
-
- /** Add a destination.
- *@exception if a different Destination is already registered
- * under name.
- */
- void addDestination(std::string name, Destination&);
-
- /** Remove a destination. Throws if does not exist */
- void removeDestination(std::string name);
-
- /** Get a destination. Throws if does not exist */
- Destination& getDestination(const std::string& name);
- private:
-
- typedef std::map<std::string, Reference> ReferenceMap;
- typedef std::map<std::string, Destination*> DestinationMap;
-
- Reference& getRefUnlocked(const std::string& name);
- Destination& getDestUnlocked(const std::string& name);
-
- mutable sys::Mutex lock;
- ReferenceMap references;
- DestinationMap destinations;
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/src/client/MessageChannel.h b/qpid/cpp/src/client/MessageChannel.h
deleted file mode 100644
index 2fa387b7f7..0000000000
--- a/qpid/cpp/src/client/MessageChannel.h
+++ /dev/null
@@ -1,94 +0,0 @@
-#ifndef _client_MessageChannel_h
-#define _client_MessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "../shared_ptr.h"
-#include "../sys/Runnable.h"
-#include "AckMode.h"
-
-namespace qpid {
-
-namespace framing {
-class AMQMethodBody;
-class AMQHeaderBody;
-class AMQContentBody;
-class FieldTable;
-}
-
-namespace client {
-
-class Channel;
-class Message;
-class Queue;
-class Exchange;
-class MessageListener;
-class ReturnedMessageHandler;
-
-/**
- * Abstract interface for messaging implementation for a channel.
- *
- *@see Channel for documentation.
- */
-class MessageChannel : public sys::Runnable
-{
- public:
- /**@see Channel::consume */
- virtual void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0) = 0;
-
- /**@see Channel::cancel */
- virtual void cancel(const std::string& tag, bool synch = true) = 0;
-
- /**@see Channel::get */
- virtual bool get(
- Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0;
-
- /**@see Channel::get */
- virtual void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false) = 0;
-
- /**@see Channel::setReturnedMessageHandler */
- virtual void setReturnedMessageHandler(
- ReturnedMessageHandler* handler) = 0;
-
- /** Handle an incoming method. */
- virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0;
-
- /** Handle an incoming header */
- virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0;
-
- /** Handle an incoming content */
- virtual void handle(shared_ptr<framing::AMQContentBody>) = 0;
-
- /** Send channel's QOS settings */
- virtual void setQos() = 0;
-
- /** Channel is closing */
- virtual void close() = 0;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_MessageChannel_h*/
diff --git a/qpid/cpp/src/client/MessageListener.cpp b/qpid/cpp/src/client/MessageListener.cpp
deleted file mode 100644
index 68ebedeb0d..0000000000
--- a/qpid/cpp/src/client/MessageListener.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "MessageListener.h"
-
-qpid::client::MessageListener::~MessageListener() {}
diff --git a/qpid/cpp/src/client/MessageListener.h b/qpid/cpp/src/client/MessageListener.h
deleted file mode 100644
index 501862a3ef..0000000000
--- a/qpid/cpp/src/client/MessageListener.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _MessageListener_
-#define _MessageListener_
-
-#include "ClientMessage.h"
-
-namespace qpid {
-namespace client {
-
- /**
- * An interface through which asynchronously delivered messages
- * can be received by an application.
- *
- * @see Channel::consume()
- *
- * \ingroup clientapi
- */
- class MessageListener{
- public:
- virtual ~MessageListener();
- virtual void received(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/src/client/MessageMessageChannel.cpp b/qpid/cpp/src/client/MessageMessageChannel.cpp
deleted file mode 100644
index 8d0fdc3189..0000000000
--- a/qpid/cpp/src/client/MessageMessageChannel.cpp
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <boost/format.hpp>
-#include "MessageMessageChannel.h"
-#include "../framing/AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "../framing/FieldTable.h"
-#include "Connection.h"
-#include "../shared_ptr.h"
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace std;
-using namespace sys;
-using namespace framing;
-
-MessageMessageChannel::MessageMessageChannel(Channel& ch)
- : channel(ch), tagCount(0) {}
-
-string MessageMessageChannel::newTag() {
- Mutex::ScopedLock l(lock);
- return (boost::format("__tag%d")%++tagCount).str();
-}
-
-void MessageMessageChannel::consume(
- Queue& queue, std::string& tag, MessageListener* /*listener*/,
- AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
-{
- if (tag.empty())
- tag = newTag();
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageConsumeBody(
- channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
-
-// // FIXME aconway 2007-02-20: Race condition!
-// // We could receive the first message for the consumer
-// // before we create the consumer below.
-// // Move consumer creation to handler for MessageConsumeOkBody
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i != consumers.end())
-// THROW_QPID_ERROR(CLIENT_ERROR,
-// "Consumer already exists with tag="+tag);
-// Consumer& c = consumers[tag];
-// c.listener = listener;
-// c.ackMode = ackMode;
-// c.lastDeliveryTag = 0;
-// }
-}
-
-
-void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
- // FIXME aconway 2007-02-23:
-// Consumer c;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i == consumers.end())
-// return;
-// c = i->second;
-// consumers.erase(i);
-// }
-// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// channel.sendAndReceiveSync<MessageCancelOkBody>(
-// synch, new MessageCancelBody(channel.version, tag, !synch));
-}
-
-void MessageMessageChannel::close(){
- // FIXME aconway 2007-02-23:
-// ConsumerMap consumersCopy;
-// {
-// Mutex::ScopedLock l(lock);
-// consumersCopy = consumers;
-// consumers.clear();
-// }
-// for (ConsumerMap::iterator i=consumersCopy.begin();
-// i != consumersCopy.end(); ++i)
-// {
-// Consumer& c = i->second;
-// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
-// && c.lastDeliveryTag > 0)
-// {
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// }
-// }
-// incoming.shutdown();
-}
-
-
-/** Destination ID for the current get.
- * Must not clash with a generated consumer ID.
- * TODO aconway 2007-03-06: support multiple outstanding gets?
- */
-const string getDestinationId("__get__");
-
-/**
- * A destination that provides a Correlator::Action to handle
- * MessageEmpty responses.
- */
-struct MessageGetDestination : public IncomingMessage::WaitableDestination
-{
- void response(shared_ptr<AMQResponseBody> response) {
- if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
- switch (response->amqpMethodId()) {
- case MessageOkBody::METHOD_ID:
- // Nothing to do, wait for transfer.
- return;
- case MessageEmptyBody::METHOD_ID:
- empty(); // Wake up waiter with empty queue.
- return;
- }
- }
- throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
- }
-
- Correlator::Action action() {
- return boost::bind(&MessageGetDestination::response, this, _1);
- }
-};
-
-bool MessageMessageChannel::get(
- Message& msg, const Queue& queue, AckMode ackMode)
-{
- Mutex::ScopedLock l(lock);
- std::string destName=newTag();
- MessageGetDestination dest;
- incoming.addDestination(destName, dest);
- channel.send(
- make_shared_ptr(
- new MessageGetBody(
- channel.version, 0, queue.getName(), destName, ackMode)),
- dest.action());
- return dest.wait(msg);
-}
-
-
-/** Convert a message to a transfer command. */
-MessageTransferBody::shared_ptr makeTransfer(
- ProtocolVersion version,
- const Message& msg, const string& destination,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- return MessageTransferBody::shared_ptr(
- new MessageTransferBody(
- version,
- 0, // FIXME aconway 2007-04-03: ticket.
- destination,
- msg.isRedelivered(),
- immediate,
- 0, // FIXME aconway 2007-02-23: ttl
- msg.getPriority(),
- msg.getTimestamp(),
- static_cast<uint8_t>(msg.getDeliveryMode()),
- 0, // FIXME aconway 2007-04-03: Expiration
- string(), // Exchange: for broker use only.
- routingKey,
- msg.getMessageId(),
- msg.getCorrelationId(),
- msg.getReplyTo(),
- msg.getContentType(),
- msg.getContentEncoding(),
- msg.getUserId(),
- msg.getAppId(),
- string(), // FIXME aconway 2007-04-03: TransactionId
- string(), //FIXME aconway 2007-04-03: SecurityToken
- msg.getHeaders(),
- Content(INLINE, msg.getData()),
- mandatory
- ));
-}
-
-// FIXME aconway 2007-04-05: Generated code should provide this.
-/**
- * Calculate the size of a frame containing the given body type
- * if all variable-lengths parts are empty.
- */
-template <class T> size_t overhead() {
- static AMQFrame frame(
- ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
- return frame.size();
-}
-
-void MessageMessageChannel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- MessageTransferBody::shared_ptr transfer = makeTransfer(
- channel.getVersion(),
- msg, exchange.getName(), routingKey, mandatory, immediate);
- // Frame itself uses 8 bytes.
- u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
- if (transfer->size() <= frameMax) {
- channel.sendAndReceive<MessageOkBody>(transfer);
- }
- else {
- std::string ref = newTag();
- std::string data = transfer->getBody().getValue();
- size_t chunk =
- channel.connection->getMaxFrameSize() -
- (overhead<MessageAppendBody>() + ref.size());
- // TODO aconway 2007-04-05: cast around lack of generated setters
- const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
- channel.send(
- make_shared_ptr(new MessageOpenBody(channel.version, ref)));
- channel.send(transfer);
- const char* p = data.data();
- const char* end = data.data()+data.size();
- while (p+chunk <= end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
- p += chunk;
- }
- if (p < end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
- }
- channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
- }
-}
-
-void copy(Message& msg, MessageTransferBody& transfer) {
- // FIXME aconway 2007-04-05: Verify all required fields
- // are copied.
- msg.setContentType(transfer.getContentType());
- msg.setContentEncoding(transfer.getContentEncoding());
- msg.setHeaders(transfer.getApplicationHeaders());
- msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
- msg.setPriority(transfer.getPriority());
- msg.setCorrelationId(transfer.getCorrelationId());
- msg.setReplyTo(transfer.getReplyTo());
- // FIXME aconway 2007-04-05: TTL/Expiration
- msg.setMessageId(transfer.getMessageId());
- msg.setTimestamp(transfer.getTimestamp());
- msg.setUserId(transfer.getUserId());
- msg.setAppId(transfer.getAppId());
- msg.setDestination(transfer.getDestination());
- msg.setRedelivered(transfer.getRedelivered());
- msg.setDeliveryTag(0); // No meaning in 0-9
- if (transfer.getBody().isInline())
- msg.setData(transfer.getBody().getValue());
-}
-
-void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case MessageAppendBody::METHOD_ID: {
- MessageAppendBody::shared_ptr append =
- shared_polymorphic_downcast<MessageAppendBody>(method);
- incoming.appendReference(append->getReference(), append->getBytes());
- break;
- }
- case MessageOpenBody::METHOD_ID: {
- MessageOpenBody::shared_ptr open =
- shared_polymorphic_downcast<MessageOpenBody>(method);
- incoming.openReference(open->getReference());
- break;
- }
-
- case MessageCloseBody::METHOD_ID: {
- MessageCloseBody::shared_ptr close =
- shared_polymorphic_downcast<MessageCloseBody>(method);
- incoming.closeReference(close->getReference());
- break;
- }
-
- case MessageTransferBody::METHOD_ID: {
- MessageTransferBody::shared_ptr transfer=
- shared_polymorphic_downcast<MessageTransferBody>(method);
- if (transfer->getBody().isInline()) {
- Message msg;
- copy(msg, *transfer);
- // Deliver it.
- incoming.getDestination(transfer->getDestination()).message(msg);
- }
- else {
- Message& msg=incoming.createMessage(
- transfer->getDestination(), transfer->getBody().getValue());
- copy(msg, *transfer);
- // Will be delivered when reference closes.
- }
- break;
- }
-
- case MessageEmptyBody::METHOD_ID:
- case MessageOkBody::METHOD_ID:
- // Nothing to do
- break;
-
- // FIXME aconway 2007-04-03: TODO
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
- case MessageOffsetBody::METHOD_ID:
- case MessageQosBody::METHOD_ID:
- case MessageRecoverBody::METHOD_ID:
- case MessageRejectBody::METHOD_ID:
- case MessageResumeBody::METHOD_ID:
- break;
- default:
- throw Channel::UnknownMethod();
- }
-}
-
-void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-// FIXME aconway 2007-02-23:
-// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
-// //record delivery tag:
-// consumer.lastDeliveryTag = msg.getDeliveryTag();
-
-// //allow registered listener to handle the message
-// consumer.listener->received(msg);
-
-// if(channel.isOpen()){
-// bool multiple(false);
-// switch(consumer.ackMode){
-// case LAZY_ACK:
-// multiple = true;
-// if(++(consumer.count) < channel.getPrefetch())
-// break;
-// //else drop-through
-// case AUTO_ACK:
-// consumer.lastDeliveryTag = 0;
-// channel.send(
-// new MessageAckBody(
-// channel.version, msg.getDeliveryTag(), multiple));
-// case NO_ACK: // Nothing to do
-// case CLIENT_ACK: // User code must ack.
-// break;
-// // TODO aconway 2007-02-22: Provide a way for user
-// // to ack!
-// }
-// }
-
-// //as it stands, transactionality is entirely orthogonal to ack
-// //mode, though the acks will not be processed by the broker under
-// //a transaction until it commits.
-// }
-
-
-void MessageMessageChannel::run() {
- // FIXME aconway 2007-02-23:
-// while(channel.isOpen()) {
-// try {
-// Message msg = incoming.waitDispatch();
-// if(msg.getMethod()->isA<MessageReturnBody>()) {
-// ReturnedMessageHandler* handler=0;
-// {
-// Mutex::ScopedLock l(lock);
-// handler=returnsHandler;
-// }
-// if(handler == 0) {
-// // TODO aconway 2007-02-20: proper logging.
-// cout << "Message returned: " << msg.getData() << endl;
-// }
-// else
-// handler->returned(msg);
-// }
-// else {
-// MessageDeliverBody::shared_ptr deliverBody =
-// boost::shared_polymorphic_downcast<MessageDeliverBody>(
-// msg.getMethod());
-// std::string tag = deliverBody->getConsumerTag();
-// Consumer consumer;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if(i == consumers.end())
-// THROW_QPID_ERROR(PROTOCOL_ERROR+504,
-// "Unknown consumer tag=" + tag);
-// consumer = i->second;
-// }
-// deliver(consumer, msg);
-// }
-// }
-// catch (const ShutdownException&) {
-// /* Orderly shutdown */
-// }
-// catch (const Exception& e) {
-// // FIXME aconway 2007-02-20: Report exception to user.
-// cout << "client::Message::run() terminated by: " << e.toString()
-// << "(" << typeid(e).name() << ")" << endl;
-// }
-// }
-}
-
-void MessageMessageChannel::setReturnedMessageHandler(
- ReturnedMessageHandler* )
-{
- throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
-}
-
-void MessageMessageChannel::setQos(){
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(
- make_shared_ptr(new TxSelectBody(channel.version)));
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/MessageMessageChannel.h b/qpid/cpp/src/client/MessageMessageChannel.h
deleted file mode 100644
index 4c4721ce90..0000000000
--- a/qpid/cpp/src/client/MessageMessageChannel.h
+++ /dev/null
@@ -1,82 +0,0 @@
-#ifndef _client_MessageMessageChannel_h
-#define _client_MessageMessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageChannel.h"
-#include "IncomingMessage.h"
-#include "../sys/Monitor.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace client {
-/**
- * Messaging implementation using AMQP 0-9 MessageMessageChannel class
- * to send and receiving messages.
- */
-class MessageMessageChannel : public MessageChannel
-{
- public:
- MessageMessageChannel(Channel& parent);
-
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- void cancel(const std::string& tag, bool synch = true);
-
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- void run();
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
-
- void handle(shared_ptr<framing::AMQHeaderBody>);
-
- void handle(shared_ptr<framing::AMQContentBody>);
-
- void setQos();
-
- void close();
-
- private:
- typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
- Destinations;
-
- std::string newTag();
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- long tagCount;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_MessageMessageChannel_h*/
-
diff --git a/qpid/cpp/src/client/MethodBodyInstances.h b/qpid/cpp/src/client/MethodBodyInstances.h
deleted file mode 100644
index 57b9bf73ce..0000000000
--- a/qpid/cpp/src/client/MethodBodyInstances.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "../framing/amqp_framing.h"
-
-#ifndef _MethodBodyInstances_h_
-#define _MethodBodyInstances_h_
-
-namespace qpid {
-namespace client {
-
-/**
- * A list of method body instances that can be used to compare against
- * incoming bodies.
- */
-class MethodBodyInstances
-{
-private:
- qpid::framing::ProtocolVersion version;
-public:
- const qpid::framing::BasicCancelOkBody basic_cancel_ok;
- const qpid::framing::BasicConsumeOkBody basic_consume_ok;
- const qpid::framing::BasicDeliverBody basic_deliver;
- const qpid::framing::BasicGetEmptyBody basic_get_empty;
- const qpid::framing::BasicGetOkBody basic_get_ok;
- const qpid::framing::BasicQosOkBody basic_qos_ok;
- const qpid::framing::BasicReturnBody basic_return;
- const qpid::framing::ChannelCloseBody channel_close;
- const qpid::framing::ChannelCloseOkBody channel_close_ok;
- const qpid::framing::ChannelFlowBody channel_flow;
- const qpid::framing::ChannelOpenOkBody channel_open_ok;
- const qpid::framing::ConnectionCloseBody connection_close;
- const qpid::framing::ConnectionCloseOkBody connection_close_ok;
- const qpid::framing::ConnectionOpenOkBody connection_open_ok;
- const qpid::framing::ConnectionRedirectBody connection_redirect;
- const qpid::framing::ConnectionStartBody connection_start;
- const qpid::framing::ConnectionTuneBody connection_tune;
- const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
- const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
- const qpid::framing::QueueDeclareOkBody queue_declare_ok;
- const qpid::framing::QueueDeleteOkBody queue_delete_ok;
- const qpid::framing::QueueBindOkBody queue_bind_ok;
- const qpid::framing::TxCommitOkBody tx_commit_ok;
- const qpid::framing::TxRollbackOkBody tx_rollback_ok;
- const qpid::framing::TxSelectOkBody tx_select_ok;
-
- MethodBodyInstances(uint8_t major, uint8_t minor) :
- version(major, minor),
- basic_cancel_ok(version),
- basic_consume_ok(version),
- basic_deliver(version),
- basic_get_empty(version),
- basic_get_ok(version),
- basic_qos_ok(version),
- basic_return(version),
- channel_close(version),
- channel_close_ok(version),
- channel_flow(version),
- channel_open_ok(version),
- connection_close(version),
- connection_close_ok(version),
- connection_open_ok(version),
- connection_redirect(version),
- connection_start(version),
- connection_tune(version),
- exchange_declare_ok(version),
- exchange_delete_ok(version),
- queue_declare_ok(version),
- queue_delete_ok(version),
- queue_bind_ok(version),
- tx_commit_ok(version),
- tx_rollback_ok(version),
- tx_select_ok(version)
- {}
-
-};
-
-static MethodBodyInstances method_bodies(8, 0);
-
-} // namespace client
-} // namespace qpid
-
-#endif
diff --git a/qpid/cpp/src/client/ResponseHandler.cpp b/qpid/cpp/src/client/ResponseHandler.cpp
deleted file mode 100644
index ca0129d587..0000000000
--- a/qpid/cpp/src/client/ResponseHandler.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "../QpidError.h"
-#include <boost/format.hpp>
-#include "ResponseHandler.h"
-#include "../framing/AMQMethodBody.h"
-
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace client {
-
-ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {}
-
-ResponseHandler::~ResponseHandler(){}
-
-bool ResponseHandler::isWaiting() {
- Monitor::ScopedLock l(monitor);
- return waiting;
-}
-
-void ResponseHandler::expect(){
- Monitor::ScopedLock l(monitor);
- waiting = true;
-}
-
-void ResponseHandler::signalResponse(MethodPtr _response)
-{
- Monitor::ScopedLock l(monitor);
- response = _response;
- if (!response)
- shutdownFlag=true;
- waiting = false;
- monitor.notify();
-}
-
-ResponseHandler::MethodPtr ResponseHandler::receive() {
- Monitor::ScopedLock l(monitor);
- while (!response && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
- MethodPtr result = response;
- response.reset();
- return result;
-}
-
-ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) {
- MethodPtr response = receive();
- if(c != response->amqpClassId() || m != response->amqpMethodId()) {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR,
- boost::format("Expected class:method %d:%d, got %d:%d")
- % c % m % response->amqpClassId() % response->amqpMethodId());
- }
- return response;
-}
-
-}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/ResponseHandler.h b/qpid/cpp/src/client/ResponseHandler.h
deleted file mode 100644
index 289a5dd994..0000000000
--- a/qpid/cpp/src/client/ResponseHandler.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "../shared_ptr.h"
-#include "../sys/Monitor.h"
-
-#ifndef _ResponseHandler_
-#define _ResponseHandler_
-
-namespace qpid {
-
-namespace framing {
-class AMQMethodBody;
-}
-
-namespace client {
-
-/**
- * Holds a response from the broker peer for the client.
- */
-class ResponseHandler{
- typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
- bool waiting;
- bool shutdownFlag;
- MethodPtr response;
- sys::Monitor monitor;
-
- public:
- ResponseHandler();
- ~ResponseHandler();
-
- /** Is a response expected? */
- bool isWaiting();
-
- /** Provide a response to the waiting thread */
- void signalResponse(MethodPtr response);
-
- /** Indicate a message is expected. */
- void expect();
-
- /** Wait for a response. */
- MethodPtr receive();
-
- /** Wait for a specific response. */
- MethodPtr receive(framing::ClassId, framing::MethodId);
-
- /** Template version of receive returns typed pointer. */
- template <class BodyType>
- shared_ptr<BodyType> receive() {
- return shared_polymorphic_downcast<BodyType>(
- receive(BodyType::CLASS_ID, BodyType::METHOD_ID));
- }
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/src/client/ReturnedMessageHandler.cpp b/qpid/cpp/src/client/ReturnedMessageHandler.cpp
deleted file mode 100644
index 35d0b5c0a8..0000000000
--- a/qpid/cpp/src/client/ReturnedMessageHandler.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReturnedMessageHandler.h"
-
-qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {}
diff --git a/qpid/cpp/src/client/ReturnedMessageHandler.h b/qpid/cpp/src/client/ReturnedMessageHandler.h
deleted file mode 100644
index 8b42fc0764..0000000000
--- a/qpid/cpp/src/client/ReturnedMessageHandler.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-
-#ifndef _ReturnedMessageHandler_
-#define _ReturnedMessageHandler_
-
-#include "ClientMessage.h"
-
-namespace qpid {
-namespace client {
-
- /**
- * An interface through which returned messages can be received by
- * an application.
- *
- * @see Channel::setReturnedMessageHandler()
- *
- * \ingroup clientapi
- */
- class ReturnedMessageHandler{
- public:
- virtual ~ReturnedMessageHandler();
- virtual void returned(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif