summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp10
-rw-r--r--cpp/src/qpid/client/SessionImpl.h1
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp464
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.h68
-rw-r--r--cpp/src/qpid/client/amqp0_10/Codecs.cpp299
-rw-r--r--cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp48
-rw-r--r--cpp/src/qpid/client/amqp0_10/CompletionTracker.h50
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp79
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h43
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp241
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h91
-rw-r--r--cpp/src/qpid/client/amqp0_10/MessageSink.h50
-rw-r--r--cpp/src/qpid/client/amqp0_10/MessageSource.h47
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp146
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h76
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp49
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h58
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp281
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h107
19 files changed, 2208 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index a617335370..8ead44a172 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -202,6 +202,16 @@ bool SessionImpl::isCompleteUpTo(const SequenceNumber& id)
return f.result;
}
+framing::SequenceNumber SessionImpl::getCompleteUpTo()
+{
+ SequenceNumber firstIncomplete;
+ {
+ Lock l(state);
+ firstIncomplete = incompleteIn.front();
+ }
+ return --firstIncomplete;
+}
+
struct MarkCompleted
{
const SequenceNumber& id;
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 3659450236..49d268c44d 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -103,6 +103,7 @@ public:
void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
+ framing::SequenceNumber getCompleteUpTo();
void waitForCompletion(const framing::SequenceNumber& id);
void sendCompletion();
void sendFlush();
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
new file mode 100644
index 0000000000..6ff9c2397a
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/ReplyTo.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::Exception;
+using qpid::messaging::Address;
+using qpid::messaging::Filter;
+using qpid::messaging::Variant;
+using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
+using namespace qpid::framing::message;
+
+
+namespace{
+const Variant EMPTY_VARIANT;
+const FieldTable EMPTY_FIELD_TABLE;
+const std::string EMPTY_STRING;
+
+//option names
+const std::string BROWSE("browse");
+const std::string EXCLUSIVE("exclusive");
+const std::string MODE("mode");
+const std::string NAME("name");
+const std::string UNACKNOWLEDGED("unacknowledged");
+
+const std::string QUEUE_ADDRESS("queue");
+const std::string TOPIC_ADDRESS("topic");
+const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
+const std::string DIVIDER("/");
+
+const std::string SIMPLE_SUBSCRIPTION("simple");
+const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string DURABLE_SUBSCRIPTION("durable");
+}
+
+class QueueSource : public MessageSource
+{
+ public:
+ QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED,
+ bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+ private:
+ const std::string name;
+ const AcceptMode acceptMode;
+ const AcquireMode acquireMode;
+ const bool exclusive;
+ const FieldTable options;
+};
+
+class Subscription : public MessageSource
+{
+ public:
+ enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
+
+ Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
+ const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
+ void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+
+ static SubscriptionMode getMode(const std::string& mode);
+ private:
+ struct Binding
+ {
+ Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+
+ std::string exchange;
+ std::string key;
+ FieldTable options;
+ };
+
+ typedef std::vector<Binding> Bindings;
+
+ const std::string name;
+ const bool autoDelete;
+ const bool durable;
+ const FieldTable queueOptions;
+ const FieldTable subscriptionOptions;
+ Bindings bindings;
+ std::string queue;
+};
+
+class Exchange : public MessageSink
+{
+ public:
+ Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING,
+ bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
+ const FieldTable& options = EMPTY_FIELD_TABLE);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+ const std::string name;
+ const std::string defaultSubject;
+ const bool passive;
+ const std::string type;
+ const bool durable;
+ const FieldTable options;
+};
+
+class QueueSink : public MessageSink
+{
+ public:
+ QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
+ bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+ const std::string name;
+ const bool passive;
+ const bool exclusive;
+ const bool autoDelete;
+ const bool durable;
+ const FieldTable options;
+};
+
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+
+const Variant& getOption(const std::string& key, const Variant::Map& options)
+{
+ Variant::Map::const_iterator i = options.find(key);
+ if (i == options.end()) return EMPTY_VARIANT;
+ else return i->second;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+ const Address& address,
+ const Filter* filter,
+ const Variant::Map& options)
+{
+ //TODO: handle case where there exists a queue and an exchange of
+ //the same name (hence an unqualified address is ambiguous)
+
+ //TODO: make sure specified address type gives sane error message
+ //if it does npt match the configuration on server
+
+ if (isQueue(session, address)) {
+ //TODO: support auto-created queue as source, if requested by specific option
+
+ AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+ AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
+ bool exclusive = getOption(EXCLUSIVE, options).asBool();
+ FieldTable arguments;
+ //TODO: extract subscribe arguments from options (e.g. either
+ //filter out already processed keys and send the rest, or have
+ //a nested map)
+
+ std::auto_ptr<MessageSource> source =
+ std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+ return source;
+ } else {
+ //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
+ std::auto_ptr<Subscription> bindings =
+ std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(),
+ Subscription::getMode(getOption(MODE, options).asString())));
+
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
+ if (result.getNotFound()) {
+ throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+ } else if (result.getType() == "topic") {
+ if (filter) {
+ if (filter->type != Filter::WILDCARD) {
+ throw qpid::framing::NotImplementedException(
+ QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
+
+ }
+ for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
+ bindings->add(address.value, *i, qpid::framing::FieldTable());
+ }
+ } else {
+ //default is to receive all messages
+ bindings->add(address.value, "*", qpid::framing::FieldTable());
+ }
+ } else if (result.getType() == "fanout") {
+ if (filter) {
+ throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));
+ }
+ bindings->add(address.value, address.value, qpid::framing::FieldTable());
+ } else if (result.getType() == "direct") {
+ //TODO: ????
+ } else {
+ //TODO: xml and headers exchanges
+ throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));
+ }
+ std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+ return source;
+ }
+}
+
+
+std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Variant::Map& /*options*/)
+{
+ std::auto_ptr<MessageSink> sink;
+ if (isQueue(session, address)) {
+ //TODO: support for auto-created queues as sink
+ sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+ } else {
+ std::string subject;
+ if (isTopic(session, address, subject)) {
+ //TODO: support for auto-created exchanges as sink
+ sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+ } else {
+ if (address.type.empty()) {
+ throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+ } else {
+ throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+ }
+ }
+ }
+ return sink;
+}
+
+QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
+ name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+
+void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ session.messageSubscribe(arg::queue=name,
+ arg::destination=destination,
+ arg::acceptMode=acceptMode,
+ arg::acquireMode=acquireMode,
+ arg::exclusive=exclusive,
+ arg::arguments=options);
+}
+
+void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ session.messageCancel(destination);
+}
+
+Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
+ : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE),
+ queueOptions(qOptions), subscriptionOptions(sOptions) {}
+
+void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+{
+ bindings.push_back(Binding(exchange, key, options));
+}
+
+void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ if (name.empty()) {
+ //TODO: use same scheme as JMS client for subscription queue name generation?
+ queue = session.getId().getName() + destination;
+ } else {
+ queue = name;
+ }
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+ for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
+ }
+ AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+}
+
+void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ session.messageCancel(destination);
+ session.queueDelete(arg::queue=queue);
+}
+
+Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
+ exchange(e), key(k), options(o) {}
+
+Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
+{
+ if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
+ else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
+ else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
+ else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s));
+}
+
+void convert(qpid::messaging::Message& from, qpid::client::Message& to);
+
+Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject,
+ bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) :
+ name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
+
+void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ //TODO: should this really by synchronous? want to get error if not valid...
+ if (passive) {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } else {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
+ }
+}
+
+void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+ qpid::client::Message message;
+ convert(m, message);
+ if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
+ message.getDeliveryProperties().setRoutingKey(defaultSubject);
+ }
+ session.messageTransfer(arg::destination=name, arg::content=message);
+}
+
+void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+
+QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive,
+ bool _autoDelete, bool _durable, const FieldTable& _options) :
+ name(_name), passive(_passive), exclusive(_exclusive),
+ autoDelete(_autoDelete), durable(_durable), options(_options) {}
+
+void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ //TODO: should this really by synchronous?
+ if (passive) {
+ sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+ } else {
+ sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable,
+ arg::autoDelete=autoDelete, arg::arguments=options);
+ }
+}
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m)
+{
+ qpid::client::Message message;
+ convert(m, message);
+ message.getDeliveryProperties().setRoutingKey(name);
+ session.messageTransfer(arg::content=message);
+}
+
+void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
+
+template <class T> void encode(qpid::messaging::Message& from)
+{
+ T codec;
+ from.encode(codec);
+ from.setContentType(T::contentType);
+}
+
+void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+
+void convert(qpid::messaging::Message& from, qpid::client::Message& to)
+{
+ //TODO: need to avoid copying as much as possible
+ if (from.getContent().isList()) encode<ListCodec>(from);
+ if (from.getContent().isMap()) encode<MapCodec>(from);
+ to.setData(from.getBytes());
+ to.getDeliveryProperties().setRoutingKey(from.getSubject());
+ //TODO: set other delivery properties
+ to.getMessageProperties().setContentType(from.getContentType());
+ const Address& address = from.getReplyTo();
+ if (!address.value.empty()) {
+ to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
+ }
+ translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
+ //TODO: set other message properties
+}
+
+Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
+{
+ if (rt.getExchange().empty()) {
+ if (rt.getRoutingKey().empty()) {
+ return Address();//empty address
+ } else {
+ return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
+ }
+ } else {
+ if (rt.getRoutingKey().empty()) {
+ return Address(rt.getExchange(), TOPIC_ADDRESS);
+ } else {
+ return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
+ }
+ }
+}
+
+qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
+{
+ if (address.type == QUEUE_ADDRESS || address.type.empty()) {
+ return ReplyTo(EMPTY_STRING, address.value);
+ } else if (address.type == TOPIC_ADDRESS) {
+ return ReplyTo(address.value, EMPTY_STRING);
+ } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+ //need to split the value
+ string::size_type i = address.value.find(DIVIDER);
+ if (i != string::npos) {
+ std::string exchange = address.value.substr(0, i);
+ std::string routingKey;
+ if (i+1 < address.value.size()) {
+ routingKey = address.value.substr(i+1);
+ }
+ return ReplyTo(exchange, routingKey);
+ } else {
+ return ReplyTo(address.value, EMPTY_STRING);
+ }
+ } else {
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
+ //treat as queue
+ return ReplyTo(EMPTY_STRING, address.value);
+ }
+}
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+ return address.type == QUEUE_ADDRESS ||
+ (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+}
+
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+{
+ if (address.type.empty()) {
+ return !session.exchangeQuery(address.value).getNotFound();
+ } else if (address.type == TOPIC_ADDRESS) {
+ return true;
+ } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
+ string::size_type i = address.value.find(DIVIDER);
+ if (i != string::npos) {
+ std::string exchange = address.value.substr(0, i);
+ if (i+1 < address.value.size()) {
+ subject = address.value.substr(i+1);
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
new file mode 100644
index 0000000000..87758abe6d
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Session.h"
+
+namespace qpid {
+
+namespace framing{
+class ReplyTo;
+}
+
+namespace messaging {
+class Address;
+class Filter;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class MessageSink;
+
+/**
+ * Maps from a generic Address and optional Filter to an AMQP 0-10
+ * MessageSource which will then be used by a ReceiverImpl instance
+ * created for the address.
+ */
+class AddressResolution
+{
+ public:
+ std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Filter* filter,
+ const qpid::messaging::Variant::Map& options);
+
+ std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Variant::Map& options);
+
+ static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
+ static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
+
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/Codecs.cpp b/cpp/src/qpid/client/amqp0_10/Codecs.cpp
new file mode 100644
index 0000000000..9aee3118fe
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/Codecs.cpp
@@ -0,0 +1,299 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/List.h"
+#include <algorithm>
+#include <functional>
+
+using namespace qpid::framing;
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+namespace {
+const std::string iso885915("iso-8859-15");
+const std::string utf8("utf8");
+const std::string utf16("utf16");
+const std::string amqp0_10_binary("amqp0-10:binary");
+const std::string amqp0_10_bit("amqp0-10:bit");
+const std::string amqp0_10_datetime("amqp0-10:datetime");
+const std::string amqp0_10_struct("amqp0-10:struct");
+}
+
+template <class T, class U, class F> void convert(const T& from, U& to, F f)
+{
+ std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f);
+}
+
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in);
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in);
+Variant toVariant(boost::shared_ptr<FieldValue> in);
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in);
+
+template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f)
+{
+ T t;
+ getEncodedValue<T>(in, t);
+ convert(t, u, f);
+}
+
+template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f)
+{
+ typename T::ValueType t;
+ convert(u, t, f);
+ return new T(t);
+}
+
+FieldTableValue* toFieldTableValue(const Variant::Map& map)
+{
+ FieldTable ft;
+ convert(map, ft, &toFieldTableEntry);
+ return new FieldTableValue(ft);
+}
+
+ListValue* toListValue(const Variant::List& list)
+{
+ List l;
+ convert(list, l, &toFieldValue);
+ return new ListValue(l);
+}
+
+void setEncodingFor(Variant& out, uint8_t code)
+{
+ switch(code){
+ case 0x80:
+ case 0x90:
+ case 0xa0:
+ out.setEncoding(amqp0_10_binary);
+ break;
+ case 0x84:
+ case 0x94:
+ out.setEncoding(iso885915);
+ break;
+ case 0x85:
+ case 0x95:
+ out.setEncoding(utf8);
+ break;
+ case 0x86:
+ case 0x96:
+ out.setEncoding(utf16);
+ break;
+ case 0xab:
+ out.setEncoding(amqp0_10_struct);
+ break;
+ default:
+ //do nothing
+ break;
+ }
+}
+
+Variant toVariant(boost::shared_ptr<FieldValue> in)
+{
+ Variant out;
+ //based on AMQP 0-10 typecode, pick most appropriate variant type
+ switch (in->getType()) {
+ //Fixed Width types:
+ case 0x01: out.setEncoding(amqp0_10_binary);
+ case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
+ case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
+ case 0x04: break; //TODO: iso-8859-15 char
+ case 0x08: out = in->getIntegerValue<bool, 1>(); break;
+ case 0x010: out.setEncoding(amqp0_10_binary);
+ case 0x011: out = in->getIntegerValue<int16_t, 2>(); break;
+ case 0x012: out = in->getIntegerValue<uint16_t, 2>(); break;
+ case 0x020: out.setEncoding(amqp0_10_binary);
+ case 0x021: out = in->getIntegerValue<int32_t, 4>(); break;
+ case 0x022: out = in->getIntegerValue<uint32_t, 4>(); break;
+ case 0x023: out = in->get<float>(); break;
+ case 0x027: break; //TODO: utf-32 char
+ case 0x030: out.setEncoding(amqp0_10_binary);
+ case 0x031: out = in->getIntegerValue<int64_t, 8>(); break;
+ case 0x038: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+ case 0x032: out = in->getIntegerValue<uint64_t, 8>(); break;
+ case 0x033:out = in->get<double>(); break;
+
+ //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+ case 0xf0: break;//void, which is the default value for Variant
+ case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+ //Variable Width types:
+ //strings:
+ case 0x80:
+ case 0x84:
+ case 0x85:
+ case 0x86:
+ case 0x90:
+ case 0x94:
+ case 0x95:
+ case 0x96:
+ case 0xa0:
+ case 0xab:
+ setEncodingFor(out, in->getType());
+ out = in->get<std::string>();
+ break;
+
+ case 0xa8:
+ out = Variant::Map();
+ translate<FieldTable>(in, out.asMap(), &toVariantMapEntry);
+ break;
+
+ case 0xa9:
+ out = Variant::List();
+ translate<List>(in, out.asList(), &toVariant);
+ break;
+ case 0xaa: //convert amqp0-10 array into variant list
+ out = Variant::List();
+ translate<Array>(in, out.asList(), &toVariant);
+ break;
+
+ default:
+ //error?
+ break;
+ }
+ return out;
+}
+
+boost::shared_ptr<FieldValue> toFieldValue(const Variant& in)
+{
+ boost::shared_ptr<FieldValue> out;
+ switch (in.getType()) {
+ case VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break;
+ case BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break;
+ case UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break;
+ case UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break;
+ case UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break;
+ case UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break;
+ case INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break;
+ case INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break;
+ case INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break;
+ case INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break;
+ case FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break;
+ case DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break;
+ //TODO: check encoding (and length?) when deciding what AMQP type to treat string as
+ case STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break;
+ case MAP:
+ //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry));
+ out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap()));
+ break;
+ case LIST:
+ //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue));
+ out = boost::shared_ptr<FieldValue>(toListValue(in.asList()));
+ break;
+ }
+ return out;
+}
+
+Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in)
+{
+ return Variant::Map::value_type(in.first, toVariant(in.second));
+}
+
+FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in)
+{
+ return FieldTable::value_type(in.first, toFieldValue(in.second));
+}
+
+struct EncodeBuffer
+{
+ char* data;
+ Buffer buffer;
+
+ EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {}
+ ~EncodeBuffer() { delete[] data; }
+
+ template <class T> void encode(T& t) { t.encode(buffer); }
+
+ void getData(std::string& s) {
+ s.assign(data, buffer.getSize());
+ }
+};
+
+struct DecodeBuffer
+{
+ Buffer buffer;
+
+ DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {}
+
+ template <class T> void decode(T& t) { t.decode(buffer); }
+
+};
+
+template <class T, class U, class F> void _encode(const U& value, std::string& data, F f)
+{
+ T t;
+ convert(value, t, f);
+ EncodeBuffer buffer(t.encodedSize());
+ buffer.encode(t);
+ buffer.getData(data);
+}
+
+template <class T, class U, class F> void _decode(const std::string& data, U& value, F f)
+{
+ T t;
+ DecodeBuffer buffer(data);
+ buffer.decode(t);
+ convert(t, value, f);
+}
+
+void MapCodec::encode(const Variant& value, std::string& data)
+{
+ _encode<FieldTable>(value.asMap(), data, &toFieldTableEntry);
+}
+
+void MapCodec::decode(const std::string& data, Variant& value)
+{
+ value = Variant::Map();
+ _decode<FieldTable>(data, value.asMap(), &toVariantMapEntry);
+}
+
+void ListCodec::encode(const Variant& value, std::string& data)
+{
+ _encode<List>(value.asList(), data, &toFieldValue);
+}
+
+void ListCodec::decode(const std::string& data, Variant& value)
+{
+ value = Variant::List();
+ _decode<List>(data, value.asList(), &toVariant);
+}
+
+void translate(const Variant::Map& from, FieldTable& to)
+{
+ convert(from, to, &toFieldTableEntry);
+}
+
+void translate(const FieldTable& from, Variant::Map& to)
+{
+ convert(from, to, &toVariantMapEntry);
+}
+
+const std::string ListCodec::contentType("amqp0_10/list");
+const std::string MapCodec::contentType("amqp0_10/map");
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
new file mode 100644
index 0000000000..52b623b65c
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CompletionTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::framing::SequenceNumber;
+
+void CompletionTracker::track(SequenceNumber command, void* token)
+{
+ tokens[command] = token;
+}
+
+void CompletionTracker::completedTo(SequenceNumber command)
+{
+ Tokens::iterator i = tokens.lower_bound(command);
+ if (i != tokens.end()) {
+ lastCompleted = i->second;
+ tokens.erase(tokens.begin(), ++i);
+ }
+}
+
+void* CompletionTracker::getLastCompletedToken()
+{
+ return lastCompleted;
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
new file mode 100644
index 0000000000..6147c5682e
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Provides a mapping from command ids to application supplied
+ * 'tokens', and is used to determine when the sending or
+ * acknowledging of a specific message is complete.
+ */
+class CompletionTracker
+{
+ public:
+ void track(qpid::framing::SequenceNumber command, void* token);
+ void completedTo(qpid::framing::SequenceNumber command);
+ void* getLastCompletedToken();
+ private:
+ typedef std::map<qpid::framing::SequenceNumber, void*> Tokens;
+ Tokens tokens;
+ void* lastCompleted;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
new file mode 100644
index 0000000000..9f738731e2
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Variant;
+
+template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
+{
+ Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ value = (T) i->second;
+ }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+ setIfFound(from, "username", to.username);
+ setIfFound(from, "password", to.password);
+ setIfFound(from, "sasl-mechanism", to.mechanism);
+ setIfFound(from, "sasl-service", to.service);
+ setIfFound(from, "sasl-min-ssf", to.minSsf);
+ setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+ setIfFound(from, "heartbeat", to.heartbeat);
+ setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+ setIfFound(from, "locale", to.locale);
+ setIfFound(from, "max-channels", to.maxChannels);
+ setIfFound(from, "max-frame-size", to.maxFrameSize);
+ setIfFound(from, "bounds", to.bounds);
+}
+
+ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+{
+ QPID_LOG(debug, "Opening connection to " << url << " with " << options);
+ Url u(url);
+ ConnectionSettings settings;
+ convert(options, settings);
+ connection.open(u, settings);
+}
+
+void ConnectionImpl::close()
+{
+ connection.close();
+}
+
+qpid::messaging::Session ConnectionImpl::newSession()
+{
+ qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+ return impl;
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
new file mode 100644
index 0000000000..120a8ab9d8
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -0,0 +1,43 @@
+#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/ConnectionImpl.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Connection.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class ConnectionImpl : public qpid::messaging::ConnectionImpl
+{
+ public:
+ ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
+ void close();
+ qpid::messaging::Session newSession();
+ private:
+ qpid::client::Connection connection;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
new file mode 100644
index 0000000000..83e1b48bed
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using namespace qpid::framing;
+using namespace qpid::framing::message;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::messaging::Variant;
+
+namespace {
+const std::string EMPTY_STRING;
+
+
+struct GetNone : IncomingMessages::Handler
+{
+ bool accept(IncomingMessages::MessageTransfer&) { return false; }
+};
+
+struct GetAny : IncomingMessages::Handler
+{
+ bool accept(IncomingMessages::MessageTransfer& transfer)
+ {
+ transfer.retrieve(0);
+ return true;
+ }
+};
+
+struct MatchAndTrack
+{
+ const std::string destination;
+ SequenceSet ids;
+
+ MatchAndTrack(const std::string& d) : destination(d) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ids.add(command->getId());
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+}
+
+IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) :
+ session(s),
+ incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {}
+
+bool IncomingMessages::get(Handler& handler, Duration timeout)
+{
+ //search through received list for any transfer of interest:
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ {
+ MessageTransfer transfer(*i, *this);
+ if (handler.accept(transfer)) {
+ received.erase(i);
+ return true;
+ }
+ }
+ //none found, check incoming:
+ return process(&handler, timeout);
+}
+
+void IncomingMessages::accept()
+{
+ session.messageAccept(unaccepted);
+ unaccepted.clear();
+}
+
+void IncomingMessages::releaseAll()
+{
+ //first process any received messages...
+ while (!received.empty()) {
+ retrieve(received.front(), 0);
+ received.pop_front();
+ }
+ //then pump out any available messages from incoming queue...
+ GetAny handler;
+ while (process(&handler, 0));
+ //now release all messages
+ session.messageRelease(unaccepted);
+ unaccepted.clear();
+}
+
+void IncomingMessages::releasePending(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0));
+
+ //now remove all messages for this destination from received list, recording their ids...
+ MatchAndTrack match(destination);
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i);
+ //now release those messages
+ session.messageRelease(match.ids);
+}
+
+/**
+ * Get a frameset from session queue, waiting for up to the specified
+ * duration and returning true if this could be achieved, false
+ * otherwise. If a destination is supplied, only return a message for
+ * that destination. In this case messages from other destinations
+ * will be held on a received queue.
+ */
+bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ MessageTransfer transfer(content, *this);
+ if (handler && handler->accept(transfer)) {
+ QPID_LOG(debug, "Delivered " << *content->getMethod());
+ return true;
+ } else {
+ //received message for another destination, keep for later
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ }
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command);
+
+/**
+ * Called when message is retrieved; records retrieval for subsequent
+ * acceptance, marks the command as completed and converts command to
+ * message if message is required
+ */
+void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message)
+{
+ if (message) {
+ populate(*message, *command);
+ }
+ const MessageTransferBody* transfer = command->as<MessageTransferBody>();
+ if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
+ unaccepted.add(command->getId());
+ }
+ session.markCompleted(command->getId(), false, false);
+}
+
+IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {}
+
+const std::string& IncomingMessages::MessageTransfer::getDestination()
+{
+ return content->as<MessageTransferBody>()->getDestination();
+}
+void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message)
+{
+ parent.retrieve(content, message);
+}
+
+void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp
+
+void populateHeaders(qpid::messaging::Message& message,
+ const DeliveryProperties* deliveryProperties,
+ const MessageProperties* messageProperties)
+{
+ if (deliveryProperties) {
+ message.setSubject(deliveryProperties->getRoutingKey());
+ //TODO: convert other delivery properties
+ }
+ if (messageProperties) {
+ message.setContentType(messageProperties->getContentType());
+ if (messageProperties->hasReplyTo()) {
+ message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
+ }
+ translate(messageProperties->getApplicationHeaders(), message.getHeaders());
+ //TODO: convert other message properties
+ }
+}
+
+void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers)
+{
+ populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>());
+}
+
+void populate(qpid::messaging::Message& message, FrameSet& command)
+{
+ //need to be able to link the message back to the transfer it was delivered by
+ //e.g. for rejecting. TODO: hide this from API
+ uint32_t commandId = command.getId();
+ message.setInternalId(reinterpret_cast<void*>(commandId));
+
+ command.getContent(message.getBytes());
+
+ populateHeaders(message, command.getHeaders());
+
+ //decode content if necessary
+ if (message.getContentType() == ListCodec::contentType) {
+ ListCodec codec;
+ message.decode(codec);
+ } else if (message.getContentType() == MapCodec::contentType) {
+ MapCodec codec;
+ message.decode(codec);
+ }
+}
+
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
new file mode 100644
index 0000000000..c4346fd7d7
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -0,0 +1,91 @@
+#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/BlockingQueue.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+
+namespace framing{
+class FrameSet;
+}
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ *
+ */
+class IncomingMessages
+{
+ public:
+ typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr;
+ class MessageTransfer
+ {
+ public:
+ const std::string& getDestination();
+ void retrieve(qpid::messaging::Message* message);
+ private:
+ FrameSetPtr content;
+ IncomingMessages& parent;
+
+ MessageTransfer(FrameSetPtr, IncomingMessages&);
+ friend class IncomingMessages;
+ };
+
+ struct Handler
+ {
+ virtual ~Handler() {}
+ virtual bool accept(MessageTransfer& transfer) = 0;
+ };
+
+ IncomingMessages(qpid::client::AsyncSession session);
+ bool get(Handler& handler, qpid::sys::Duration timeout);
+ //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ void accept();
+ void releaseAll();
+ void releasePending(const std::string& destination);
+ private:
+ typedef std::deque<FrameSetPtr> FrameSetQueue;
+
+ qpid::client::AsyncSession session;
+ qpid::framing::SequenceSet unaccepted;
+ boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+ FrameSetQueue received;
+
+ bool process(Handler*, qpid::sys::Duration);
+ void retrieve(FrameSetPtr, qpid::messaging::Message*);
+
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/MessageSink.h b/cpp/src/qpid/client/amqp0_10/MessageSink.h
new file mode 100644
index 0000000000..19d5e4ef82
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/MessageSink.h
@@ -0,0 +1,50 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+/**
+ *
+ */
+class MessageSink
+{
+ public:
+ virtual ~MessageSink() {}
+ virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0;
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0;
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/MessageSource.h b/cpp/src/qpid/client/amqp0_10/MessageSource.h
new file mode 100644
index 0000000000..74f2732f59
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/MessageSource.h
@@ -0,0 +1,47 @@
+#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/client/AsyncSession.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Abstraction behind which the AMQP 0-10 commands required to
+ * establish (and tear down) an incoming stream of messages from a
+ * given address are hidden.
+ */
+class MessageSource
+{
+ public:
+ virtual ~MessageSource() {}
+ virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+ virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0;
+
+ private:
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
new file mode 100644
index 0000000000..e6ed4bfc4e
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverImpl.h"
+#include "MessageSource.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Receiver.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Receiver;
+
+void ReceiverImpl::received(qpid::messaging::Message&)
+{
+ //TODO: should this be configurable
+ if (capacity && --window <= capacity/2) {
+ session.sendCompletion();
+ window = capacity;
+ }
+}
+
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ return parent.get(*this, message, timeout);
+}
+
+qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
+ return result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ if (capacity == 0 && !cancelled) {
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+ }
+
+ if (get(message, timeout)) {
+ return true;
+ } else {
+ if (!cancelled) {
+ sync(session).messageFlush(destination);
+ start();//reallocate credit
+ }
+ return get(message, 0);
+ }
+}
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+ return result;
+}
+
+void ReceiverImpl::cancel()
+{
+ if (!cancelled) {
+ //TODO: should syncronicity be an optional argument to this call?
+ source->cancel(session, destination);
+ //need to be sure cancel is complete and all incoming
+ //framesets are processed before removing the receiver
+ parent.receiverCancelled(destination);
+ cancelled = true;
+ }
+}
+
+void ReceiverImpl::start()
+{
+ if (!cancelled) {
+ started = true;
+ session.messageSetFlowMode(destination, capacity > 0);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+ window = capacity;
+ }
+}
+
+void ReceiverImpl::stop()
+{
+ session.messageStop(destination);
+ started = false;
+}
+
+void ReceiverImpl::subscribe()
+{
+ source->subscribe(session, destination);
+}
+
+void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+{
+ session = s;
+ if (!cancelled) {
+ subscribe();
+ //if we were in started state before the session was changed,
+ //start again on this new session
+ //TODO: locking if receiver is to be threadsafe...
+ if (started) start();
+ }
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ if (c != capacity) {
+ capacity = c;
+ if (!cancelled && started) {
+ stop();
+ start();
+ }
+ }
+}
+
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) :
+ parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF),
+ capacity(0), started(false), cancelled(false), listener(0), window(0) {}
+
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
new file mode 100644
index 0000000000..b549242d35
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Time.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSource;
+class SessionImpl;
+
+/**
+ * A receiver implementation based on an AMQP 0-10 subscription.
+ */
+class ReceiverImpl : public qpid::messaging::ReceiverImpl
+{
+ public:
+
+ ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+
+ bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ qpid::messaging::Message get(qpid::sys::Duration timeout);
+ bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+ void cancel();
+ void start();
+ void stop();
+ void subscribe();
+ void setSession(qpid::client::AsyncSession s);
+ const std::string& getName() const;
+ void setCapacity(uint32_t);
+ void setListener(qpid::messaging::MessageListener* listener);
+ qpid::messaging::MessageListener* getListener();
+ void received(qpid::messaging::Message& message);
+ private:
+ SessionImpl& parent;
+ const std::auto_ptr<MessageSource> source;
+ const std::string destination;
+ const uint32_t byteCredit;
+
+ uint32_t capacity;
+ qpid::client::AsyncSession session;
+ bool started;
+ bool cancelled;
+ qpid::messaging::MessageListener* listener;
+ uint32_t window;
+};
+
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
new file mode 100644
index 0000000000..ac36eb1537
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SenderImpl.h"
+#include "MessageSink.h"
+#include "SessionImpl.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) :
+ parent(_parent), name(_name), sink(_sink) {}
+
+void SenderImpl::send(qpid::messaging::Message& m)
+{
+ sink->send(session, name, m);
+}
+
+void SenderImpl::cancel()
+{
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
+}
+
+void SenderImpl::setSession(qpid::client::AsyncSession s)
+{
+ session = s;
+ sink->declare(session, name);
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
new file mode 100644
index 0000000000..e737450ba1
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -0,0 +1,58 @@
+#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/SenderImpl.h"
+#include "qpid/client/AsyncSession.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+class MessageSink;
+class SessionImpl;
+
+/**
+ *
+ */
+class SenderImpl : public qpid::messaging::SenderImpl
+{
+ public:
+ SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink);
+ void send(qpid::messaging::Message&);
+ void cancel();
+ void setSession(qpid::client::AsyncSession);
+
+ private:
+ SessionImpl& parent;
+ const std::string name;
+ std::auto_ptr<MessageSink> sink;
+
+ qpid::client::AsyncSession session;
+ std::string destination;
+ std::string routingKey;
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
new file mode 100644
index 0000000000..647ace5f92
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/SessionImpl.h"
+#include "qpid/client/amqp0_10/ReceiverImpl.h"
+#include "qpid/client/amqp0_10/SenderImpl.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/PrivateImplRef.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/framing/reply_exceptions.h"
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+using qpid::messaging::Filter;
+using qpid::messaging::Sender;
+using qpid::messaging::Receiver;
+using qpid::messaging::VariantMap;
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+SessionImpl::SessionImpl(qpid::client::Session s) : session(s), incoming(session) {}
+
+
+void SessionImpl::commit()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ incoming.accept();
+ session.txCommit();
+}
+
+void SessionImpl::rollback()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop();
+ //ensure that stop has been processed and all previously sent
+ //messages are available for release:
+ session.sync();
+ incoming.releaseAll();
+ session.txRollback();
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start();
+}
+
+void SessionImpl::acknowledge()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ incoming.accept();
+}
+
+void SessionImpl::reject(qpid::messaging::Message& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ //TODO: how do I get the id of the original transfer command? think this through some more...
+ SequenceNumber id(reinterpret_cast<uint32_t>(m.getInternalId()));
+ SequenceSet set;
+ set.add(id);
+ session.messageReject(set);
+}
+
+void SessionImpl::close()
+{
+ session.close();
+}
+
+void translate(const VariantMap& options, SubscriptionSettings& settings)
+{
+ //TODO: fill this out
+ VariantMap::const_iterator i = options.find("auto_acknowledge");
+ if (i != options.end()) {
+ settings.autoAck = i->second.asInt32();
+ }
+}
+
+template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
+{
+ return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
+}
+
+template <class T> void getFreeKey(std::string& key, T& map)
+{
+ std::string name = key;
+ int count = 1;
+ for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) {
+ name = (boost::format("%1%_%2%") % key % ++count).str();
+ }
+ key = name;
+}
+
+Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ std::auto_ptr<MessageSink> sink = resolver.resolveSink(session, address, options);
+ std::string name = address;
+ getFreeKey(name, senders);
+ Sender sender(new SenderImpl(*this, name, sink));
+ getImplPtr<Sender, SenderImpl>(sender)->setSession(session);
+ senders[name] = sender;
+ return sender;
+}
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
+{
+ return addReceiver(address, 0, options);
+}
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const Filter& filter, const VariantMap& options)
+{
+ return addReceiver(address, &filter, options);
+}
+
+Receiver SessionImpl::addReceiver(const qpid::messaging::Address& address, const Filter* filter, const VariantMap& options)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ std::auto_ptr<MessageSource> source = resolver.resolveSource(session, address, filter, options);
+ std::string name = address;
+ getFreeKey(name, receivers);
+ Receiver receiver(new ReceiverImpl(*this, name, source));
+ getImplPtr<Receiver, ReceiverImpl>(receiver)->setSession(session);
+ receivers[name] = receiver;
+ return receiver;
+}
+
+qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName)
+{
+ std::string name = baseName + std::string("_") + session.getId().getName();
+ session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true);
+ return qpid::messaging::Address(name);
+}
+
+SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
+{
+ boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
+ if (!impl) {
+ throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
+ }
+ return *impl;
+}
+
+namespace {
+
+struct IncomingMessageHandler : IncomingMessages::Handler
+{
+ typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
+ Callback callback;
+
+ IncomingMessageHandler(Callback c) : callback(c) {}
+
+ bool accept(IncomingMessages::MessageTransfer& transfer)
+ {
+ return callback(transfer);
+ }
+};
+
+}
+
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
+ bool isDispatch,
+ IncomingMessages::MessageTransfer& transfer)
+{
+ if (receiver->getName() == transfer.getDestination()) {
+ transfer.retrieve(message);
+ if (isDispatch) {
+ qpid::sys::Mutex::ScopedUnlock u(lock);
+ qpid::messaging::MessageListener* listener = receiver->getListener();
+ if (listener) listener->received(*message);
+ }
+ receiver->received(*message);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer)
+{
+ Receivers::iterator i = receivers.find(transfer.getDestination());
+ if (i == receivers.end()) {
+ QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+ return false;
+ } else {
+ boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second);
+ return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer);
+ }
+}
+
+bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return incoming.get(handler, timeout);
+}
+
+bool SessionImpl::dispatch(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message message;
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1));
+ return getIncoming(handler, timeout);
+}
+
+bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1));
+ return getIncoming(handler, timeout);
+}
+
+bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1));
+ return getIncoming(handler, timeout);
+}
+
+qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+ return result;
+}
+
+void SessionImpl::receiverCancelled(const std::string& name)
+{
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ receivers.erase(name);
+ }
+ session.sync();
+ incoming.releasePending(name);
+}
+
+void SessionImpl::senderCancelled(const std::string& name)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ senders.erase(name);
+}
+
+void SessionImpl::sync()
+{
+ session.sync();
+}
+
+void SessionImpl::flush()
+{
+ session.flush();
+}
+
+void* SessionImpl::getLastConfirmedSent()
+{
+ return 0;
+}
+
+void* SessionImpl::getLastConfirmedAcknowledged()
+{
+ return 0;
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
new file mode 100644
index 0000000000..6926fb0235
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -0,0 +1,107 @@
+#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/SessionImpl.h"
+#include "qpid/messaging/Variant.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/client/amqp0_10/IncomingMessages.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+
+namespace messaging {
+class Address;
+class Filter;
+class Message;
+class Receiver;
+class Sender;
+class Session;
+}
+
+namespace client {
+namespace amqp0_10 {
+
+class ReceiverImpl;
+class SenderImpl;
+
+/**
+ * Implementation of the protocol independent Session interface using
+ * AMQP 0-10.
+ */
+class SessionImpl : public qpid::messaging::SessionImpl
+{
+ public:
+ SessionImpl(qpid::client::Session);
+ void commit();
+ void rollback();
+ void acknowledge();
+ void reject(qpid::messaging::Message&);
+ void close();
+ void sync();
+ void flush();
+ qpid::messaging::Address createTempQueue(const std::string& baseName);
+ qpid::messaging::Sender createSender(const qpid::messaging::Address& address,
+ const qpid::messaging::VariantMap& options);
+ qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
+ const qpid::messaging::VariantMap& options);
+ qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
+ const qpid::messaging::Filter& filter,
+ const qpid::messaging::VariantMap& options);
+
+ void* getLastConfirmedSent();
+ void* getLastConfirmedAcknowledged();
+
+ bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ qpid::messaging::Message fetch(qpid::sys::Duration timeout);
+ bool dispatch(qpid::sys::Duration timeout);
+
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+
+ void receiverCancelled(const std::string& name);
+ void senderCancelled(const std::string& name);
+
+ static SessionImpl& convert(qpid::messaging::Session&);
+
+ qpid::client::Session session;
+ private:
+ typedef std::map<std::string, qpid::messaging::Receiver> Receivers;
+ typedef std::map<std::string, qpid::messaging::Sender> Senders;
+
+ qpid::sys::Mutex lock;
+ AddressResolution resolver;
+ IncomingMessages incoming;
+ Receivers receivers;
+ Senders senders;
+
+ qpid::messaging::Receiver addReceiver(const qpid::messaging::Address& address,
+ const qpid::messaging::Filter* filter,
+ const qpid::messaging::VariantMap& options);
+ bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
+ bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
+ bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/