summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-24 14:27:31 +0000
committerGordon Sim <gsim@apache.org>2007-07-24 14:27:31 +0000
commita6303894d7f9a24df4a691af3ce94647c033ebff (patch)
tree943b75df7528c9fbff6b3170c3c4b66758bf22ad /cpp/src/qpid/client
parent9f120205e0d7a0b2666b9fd21a5296db07e32fd8 (diff)
downloadqpid-python-a6303894d7f9a24df4a691af3ce94647c033ebff.tar.gz
Initial support for latest approved 0-10 xml (with some transitional hacks included).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp10
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp33
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.cpp431
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.h84
-rw-r--r--cpp/src/qpid/client/MethodBodyInstances.h34
5 files changed, 36 insertions, 556 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index a1aacdee4e..70cb473426 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -197,11 +197,6 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
msg.setRedelivered(deliver->getRedelivered());
return;
}
- case BasicReturnBody::METHOD_ID: {
- incoming.openReference(BASIC_REF);
- incoming.createMessage(BASIC_RETURN, BASIC_REF);
- return;
- }
case BasicConsumeOkBody::METHOD_ID: {
Mutex::ScopedLock l(lock);
BasicConsumeOkBody::shared_ptr consumeOk =
@@ -332,10 +327,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand
}
void BasicMessageChannel::setQos(){
- channel.sendAndReceive<BasicQosOkBody>(
- make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
+ channel.send(make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
+ channel.send(make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 816ff05e85..0033cbdbe4 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -27,7 +27,6 @@
#include "MethodBodyInstances.h"
#include "Connection.h"
#include "BasicMessageChannel.h"
-#include "MessageMessageChannel.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -39,12 +38,18 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+namespace qpid{
+namespace client{
+
+const std::string empty;
+
+}}
+
Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
switch (mode) {
case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
}
}
@@ -138,17 +143,14 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- sendAndReceiveSync<ExchangeDeclareOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args)));
+ send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
+ if (synch) synchWithServer();
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- sendAndReceiveSync<ExchangeDeleteOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
+ send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
+ if (synch) synchWithServer();
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -158,7 +160,7 @@ void Channel::declareQueue(Queue& queue, bool synch){
sendAndReceiveSync<QueueDeclareOkBody>(
synch,
make_shared_ptr(new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
+ version, 0, name, empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
if(synch) {
if(queue.getName().length() == 0)
@@ -177,17 +179,16 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- sendAndReceiveSync<QueueBindOkBody>(
- synch,
- make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
+ send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
+ if (synch) synchWithServer();
}
void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
+ send(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
+ send(make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::handleMethodInContext(
@@ -206,7 +207,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
}
try {
switch (method->amqpClassId()) {
- case MessageOkBody::CLASS_ID:
+ case MessageTransferBody::CLASS_ID:
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
diff --git a/cpp/src/qpid/client/MessageMessageChannel.cpp b/cpp/src/qpid/client/MessageMessageChannel.cpp
deleted file mode 100644
index 2a8f7a01c1..0000000000
--- a/cpp/src/qpid/client/MessageMessageChannel.cpp
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <boost/format.hpp>
-#include "MessageMessageChannel.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "qpid/framing/FieldTable.h"
-#include "Connection.h"
-#include "qpid/shared_ptr.h"
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace std;
-using namespace sys;
-using namespace framing;
-
-MessageMessageChannel::MessageMessageChannel(Channel& ch)
- : channel(ch), tagCount(0) {}
-
-string MessageMessageChannel::newTag() {
- Mutex::ScopedLock l(lock);
- return (boost::format("__tag%d")%++tagCount).str();
-}
-
-void MessageMessageChannel::consume(
- Queue& queue, std::string& tag, MessageListener* /*listener*/,
- AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
-{
- if (tag.empty())
- tag = newTag();
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageConsumeBody(
- channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
-
-// // FIXME aconway 2007-02-20: Race condition!
-// // We could receive the first message for the consumer
-// // before we create the consumer below.
-// // Move consumer creation to handler for MessageConsumeOkBody
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i != consumers.end())
-// THROW_QPID_ERROR(CLIENT_ERROR,
-// "Consumer already exists with tag="+tag);
-// Consumer& c = consumers[tag];
-// c.listener = listener;
-// c.ackMode = ackMode;
-// c.lastDeliveryTag = 0;
-// }
-}
-
-
-void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
- // FIXME aconway 2007-02-23:
-// Consumer c;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i == consumers.end())
-// return;
-// c = i->second;
-// consumers.erase(i);
-// }
-// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// channel.sendAndReceiveSync<MessageCancelOkBody>(
-// synch, new MessageCancelBody(channel.version, tag, !synch));
-}
-
-void MessageMessageChannel::close(){
- // FIXME aconway 2007-02-23:
-// ConsumerMap consumersCopy;
-// {
-// Mutex::ScopedLock l(lock);
-// consumersCopy = consumers;
-// consumers.clear();
-// }
-// for (ConsumerMap::iterator i=consumersCopy.begin();
-// i != consumersCopy.end(); ++i)
-// {
-// Consumer& c = i->second;
-// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
-// && c.lastDeliveryTag > 0)
-// {
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// }
-// }
-// incoming.shutdown();
-}
-
-void MessageMessageChannel::cancelAll(){
-}
-
-/** Destination ID for the current get.
- * Must not clash with a generated consumer ID.
- * TODO aconway 2007-03-06: support multiple outstanding gets?
- */
-const string getDestinationId("__get__");
-
-/**
- * A destination that provides a Correlator::Action to handle
- * MessageEmpty responses.
- */
-struct MessageGetDestination : public IncomingMessage::WaitableDestination
-{
- void response(shared_ptr<AMQResponseBody> response) {
- if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
- switch (response->amqpMethodId()) {
- case MessageOkBody::METHOD_ID:
- // Nothing to do, wait for transfer.
- return;
- case MessageEmptyBody::METHOD_ID:
- empty(); // Wake up waiter with empty queue.
- return;
- }
- }
- throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
- }
-
- Correlator::Action action() {
- return boost::bind(&MessageGetDestination::response, this, _1);
- }
-};
-
-bool MessageMessageChannel::get(
- Message& msg, const Queue& queue, AckMode ackMode)
-{
- Mutex::ScopedLock l(lock);
- std::string destName=newTag();
- MessageGetDestination dest;
- incoming.addDestination(destName, dest);
- channel.send(
- make_shared_ptr(
- new MessageGetBody(
- channel.version, 0, queue.getName(), destName, ackMode)),
- dest.action());
- return dest.wait(msg);
-}
-
-
-/** Convert a message to a transfer command. */
-MessageTransferBody::shared_ptr makeTransfer(
- ProtocolVersion version,
- const Message& msg, const string& destination,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- return MessageTransferBody::shared_ptr(
- new MessageTransferBody(
- version,
- 0, // FIXME aconway 2007-04-03: ticket.
- destination,
- msg.isRedelivered(),
- immediate,
- 0, // FIXME aconway 2007-02-23: ttl
- msg.getPriority(),
- msg.getTimestamp(),
- static_cast<uint8_t>(msg.getDeliveryMode()),
- 0, // FIXME aconway 2007-04-03: Expiration
- string(), // Exchange: for broker use only.
- routingKey,
- msg.getMessageId(),
- msg.getCorrelationId(),
- msg.getReplyTo(),
- msg.getContentType(),
- msg.getContentEncoding(),
- msg.getUserId(),
- msg.getAppId(),
- string(), // FIXME aconway 2007-04-03: TransactionId
- string(), //FIXME aconway 2007-04-03: SecurityToken
- msg.getHeaders(),
- Content(INLINE, msg.getData()),
- mandatory
- ));
-}
-
-// FIXME aconway 2007-04-05: Generated code should provide this.
-/**
- * Calculate the size of a frame containing the given body type
- * if all variable-lengths parts are empty.
- */
-template <class T> size_t overhead() {
- static AMQFrame frame(
- ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
- return frame.size();
-}
-
-void MessageMessageChannel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- MessageTransferBody::shared_ptr transfer = makeTransfer(
- channel.getVersion(),
- msg, exchange.getName(), routingKey, mandatory, immediate);
- // Frame itself uses 8 bytes.
- u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
- if (transfer->size() <= frameMax) {
- channel.sendAndReceive<MessageOkBody>(transfer);
- }
- else {
- std::string ref = newTag();
- std::string data = transfer->getBody().getValue();
- size_t chunk =
- channel.connection->getMaxFrameSize() -
- (overhead<MessageAppendBody>() + ref.size());
- // TODO aconway 2007-04-05: cast around lack of generated setters
- const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
- channel.send(
- make_shared_ptr(new MessageOpenBody(channel.version, ref)));
- channel.send(transfer);
- const char* p = data.data();
- const char* end = data.data()+data.size();
- while (p+chunk <= end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
- p += chunk;
- }
- if (p < end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
- }
- channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
- }
-}
-
-void copy(Message& msg, MessageTransferBody& transfer) {
- // FIXME aconway 2007-04-05: Verify all required fields
- // are copied.
- msg.setContentType(transfer.getContentType());
- msg.setContentEncoding(transfer.getContentEncoding());
- msg.setHeaders(transfer.getApplicationHeaders());
- msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
- msg.setPriority(transfer.getPriority());
- msg.setCorrelationId(transfer.getCorrelationId());
- msg.setReplyTo(transfer.getReplyTo());
- // FIXME aconway 2007-04-05: TTL/Expiration
- msg.setMessageId(transfer.getMessageId());
- msg.setTimestamp(transfer.getTimestamp());
- msg.setUserId(transfer.getUserId());
- msg.setAppId(transfer.getAppId());
- msg.setDestination(transfer.getDestination());
- msg.setRedelivered(transfer.getRedelivered());
- msg.setDeliveryTag(0); // No meaning in 0-9
- if (transfer.getBody().isInline())
- msg.setData(transfer.getBody().getValue());
-}
-
-void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case MessageAppendBody::METHOD_ID: {
- MessageAppendBody::shared_ptr append =
- shared_polymorphic_downcast<MessageAppendBody>(method);
- incoming.appendReference(append->getReference(), append->getBytes());
- break;
- }
- case MessageOpenBody::METHOD_ID: {
- MessageOpenBody::shared_ptr open =
- shared_polymorphic_downcast<MessageOpenBody>(method);
- incoming.openReference(open->getReference());
- break;
- }
-
- case MessageCloseBody::METHOD_ID: {
- MessageCloseBody::shared_ptr close =
- shared_polymorphic_downcast<MessageCloseBody>(method);
- incoming.closeReference(close->getReference());
- break;
- }
-
- case MessageTransferBody::METHOD_ID: {
- MessageTransferBody::shared_ptr transfer=
- shared_polymorphic_downcast<MessageTransferBody>(method);
- if (transfer->getBody().isInline()) {
- Message msg;
- copy(msg, *transfer);
- // Deliver it.
- incoming.getDestination(transfer->getDestination()).message(msg);
- }
- else {
- Message& msg=incoming.createMessage(
- transfer->getDestination(), transfer->getBody().getValue());
- copy(msg, *transfer);
- // Will be delivered when reference closes.
- }
- break;
- }
-
- case MessageEmptyBody::METHOD_ID:
- case MessageOkBody::METHOD_ID:
- // Nothing to do
- break;
-
- // FIXME aconway 2007-04-03: TODO
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
- case MessageOffsetBody::METHOD_ID:
- case MessageQosBody::METHOD_ID:
- case MessageRecoverBody::METHOD_ID:
- case MessageRejectBody::METHOD_ID:
- case MessageResumeBody::METHOD_ID:
- break;
- default:
- throw Channel::UnknownMethod();
- }
-}
-
-void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-// FIXME aconway 2007-02-23:
-// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
-// //record delivery tag:
-// consumer.lastDeliveryTag = msg.getDeliveryTag();
-
-// //allow registered listener to handle the message
-// consumer.listener->received(msg);
-
-// if(channel.isOpen()){
-// bool multiple(false);
-// switch(consumer.ackMode){
-// case LAZY_ACK:
-// multiple = true;
-// if(++(consumer.count) < channel.getPrefetch())
-// break;
-// //else drop-through
-// case AUTO_ACK:
-// consumer.lastDeliveryTag = 0;
-// channel.send(
-// new MessageAckBody(
-// channel.version, msg.getDeliveryTag(), multiple));
-// case NO_ACK: // Nothing to do
-// case CLIENT_ACK: // User code must ack.
-// break;
-// // TODO aconway 2007-02-22: Provide a way for user
-// // to ack!
-// }
-// }
-
-// //as it stands, transactionality is entirely orthogonal to ack
-// //mode, though the acks will not be processed by the broker under
-// //a transaction until it commits.
-// }
-
-
-void MessageMessageChannel::run() {
- // FIXME aconway 2007-02-23:
-// while(channel.isOpen()) {
-// try {
-// Message msg = incoming.waitDispatch();
-// if(msg.getMethod()->isA<MessageReturnBody>()) {
-// ReturnedMessageHandler* handler=0;
-// {
-// Mutex::ScopedLock l(lock);
-// handler=returnsHandler;
-// }
-// if(handler == 0) {
-// // TODO aconway 2007-02-20: proper logging.
-// QPID_LOG(warn, "No handler for message.");
-// }
-// else
-// handler->returned(msg);
-// }
-// else {
-// MessageDeliverBody::shared_ptr deliverBody =
-// boost::shared_polymorphic_downcast<MessageDeliverBody>(
-// msg.getMethod());
-// std::string tag = deliverBody->getConsumerTag();
-// Consumer consumer;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if(i == consumers.end())
-// THROW_QPID_ERROR(PROTOCOL_ERROR+504,
-// "Unknown consumer tag=" + tag);
-// consumer = i->second;
-// }
-// deliver(consumer, msg);
-// }
-// }
-// catch (const ShutdownException&) {
-// /* Orderly shutdown */
-// }
-// catch (const Exception& e) {
-// QPID_LOG(error, e.what());
-// }
-// }
-}
-
-void MessageMessageChannel::setReturnedMessageHandler(
- ReturnedMessageHandler* )
-{
- throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
-}
-
-void MessageMessageChannel::setQos(){
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(
- make_shared_ptr(new TxSelectBody(channel.version)));
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/MessageMessageChannel.h b/cpp/src/qpid/client/MessageMessageChannel.h
deleted file mode 100644
index 44b64b3d80..0000000000
--- a/cpp/src/qpid/client/MessageMessageChannel.h
+++ /dev/null
@@ -1,84 +0,0 @@
-#ifndef _client_MessageMessageChannel_h
-#define _client_MessageMessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageChannel.h"
-#include "IncomingMessage.h"
-#include "qpid/sys/Monitor.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace client {
-/**
- * Messaging implementation using AMQP 0-9 MessageMessageChannel class
- * to send and receiving messages.
- */
-class MessageMessageChannel : public MessageChannel
-{
- public:
- MessageMessageChannel(Channel& parent);
-
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- void cancel(const std::string& tag, bool synch = true);
-
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- void run();
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
-
- void handle(shared_ptr<framing::AMQHeaderBody>);
-
- void handle(shared_ptr<framing::AMQContentBody>);
-
- void setQos();
-
- void close();
-
- void cancelAll();
-
- private:
- typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
- Destinations;
-
- std::string newTag();
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- long tagCount;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_MessageMessageChannel_h*/
-
diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h
index 516ba6e4e3..eb4188663d 100644
--- a/cpp/src/qpid/client/MethodBodyInstances.h
+++ b/cpp/src/qpid/client/MethodBodyInstances.h
@@ -40,8 +40,8 @@ public:
const qpid::framing::BasicDeliverBody basic_deliver;
const qpid::framing::BasicGetEmptyBody basic_get_empty;
const qpid::framing::BasicGetOkBody basic_get_ok;
- const qpid::framing::BasicQosOkBody basic_qos_ok;
- const qpid::framing::BasicReturnBody basic_return;
+ //const qpid::framing::BasicQosOkBody basic_qos_ok;
+ //const qpid::framing::BasicReturnBody basic_return;
const qpid::framing::ChannelCloseBody channel_close;
const qpid::framing::ChannelCloseOkBody channel_close_ok;
const qpid::framing::ChannelFlowBody channel_flow;
@@ -52,14 +52,14 @@ public:
const qpid::framing::ConnectionRedirectBody connection_redirect;
const qpid::framing::ConnectionStartBody connection_start;
const qpid::framing::ConnectionTuneBody connection_tune;
- const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
- const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ //const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ //const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
const qpid::framing::QueueDeclareOkBody queue_declare_ok;
const qpid::framing::QueueDeleteOkBody queue_delete_ok;
- const qpid::framing::QueueBindOkBody queue_bind_ok;
- const qpid::framing::TxCommitOkBody tx_commit_ok;
- const qpid::framing::TxRollbackOkBody tx_rollback_ok;
- const qpid::framing::TxSelectOkBody tx_select_ok;
+ //const qpid::framing::QueueBindOkBody queue_bind_ok;
+ //const qpid::framing::TxCommitOkBody tx_commit_ok;
+ //const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ //const qpid::framing::TxSelectOkBody tx_select_ok;
MethodBodyInstances(uint8_t major, uint8_t minor) :
version(major, minor),
@@ -68,8 +68,8 @@ public:
basic_deliver(version),
basic_get_empty(version),
basic_get_ok(version),
- basic_qos_ok(version),
- basic_return(version),
+ //basic_qos_ok(version),
+ //basic_return(version),
channel_close(version),
channel_close_ok(version),
channel_flow(version),
@@ -80,14 +80,14 @@ public:
connection_redirect(version),
connection_start(version),
connection_tune(version),
- exchange_declare_ok(version),
- exchange_delete_ok(version),
+ //exchange_declare_ok(version),
+ //exchange_delete_ok(version),
queue_declare_ok(version),
- queue_delete_ok(version),
- queue_bind_ok(version),
- tx_commit_ok(version),
- tx_rollback_ok(version),
- tx_select_ok(version)
+ queue_delete_ok(version)//,
+ //queue_bind_ok(version),
+ //tx_commit_ok(version),
+ //tx_rollback_ok(version),
+ //tx_select_ok(version)
{}
};