diff options
| author | Gordon Sim <gsim@apache.org> | 2009-08-25 17:57:34 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-08-25 17:57:34 +0000 |
| commit | 082fa377137d1a73382a0c3f1ab22b5abe6cb485 (patch) | |
| tree | 27375051e0f05a91ff63f123b2b027916840221c /cpp/src/qpid/client | |
| parent | 28e1de98b115ebc834a1e232bfd630809689a59e (diff) | |
| download | qpid-python-082fa377137d1a73382a0c3f1ab22b5abe6cb485.tar.gz | |
QPID-664: Initial checkin of high level messaging api for c++
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@807731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
19 files changed, 2208 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index a617335370..8ead44a172 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -202,6 +202,16 @@ bool SessionImpl::isCompleteUpTo(const SequenceNumber& id) return f.result; } +framing::SequenceNumber SessionImpl::getCompleteUpTo() +{ + SequenceNumber firstIncomplete; + { + Lock l(state); + firstIncomplete = incompleteIn.front(); + } + return --firstIncomplete; +} + struct MarkCompleted { const SequenceNumber& id; diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 3659450236..49d268c44d 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -103,6 +103,7 @@ public: void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); + framing::SequenceNumber getCompleteUpTo(); void waitForCompletion(const framing::SequenceNumber& id); void sendCompletion(); void sendFlush(); diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp new file mode 100644 index 0000000000..6ff9c2397a --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -0,0 +1,464 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/ReplyTo.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::Exception; +using qpid::messaging::Address; +using qpid::messaging::Filter; +using qpid::messaging::Variant; +using qpid::framing::FieldTable; +using qpid::framing::ReplyTo; +using namespace qpid::framing::message; + + +namespace{ +const Variant EMPTY_VARIANT; +const FieldTable EMPTY_FIELD_TABLE; +const std::string EMPTY_STRING; + +//option names +const std::string BROWSE("browse"); +const std::string EXCLUSIVE("exclusive"); +const std::string MODE("mode"); +const std::string NAME("name"); +const std::string UNACKNOWLEDGED("unacknowledged"); + +const std::string QUEUE_ADDRESS("queue"); +const std::string TOPIC_ADDRESS("topic"); +const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+"); +const std::string DIVIDER("/"); + +const std::string SIMPLE_SUBSCRIPTION("simple"); +const std::string RELIABLE_SUBSCRIPTION("reliable"); +const std::string DURABLE_SUBSCRIPTION("durable"); +} + +class QueueSource : public MessageSource +{ + public: + QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, + bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + private: + const std::string name; + const AcceptMode acceptMode; + const AcquireMode acquireMode; + const bool exclusive; + const FieldTable options; +}; + +class Subscription : public MessageSource +{ + public: + enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE}; + + Subscription(const std::string& name, SubscriptionMode mode = SIMPLE, + const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE); + void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + void subscribe(qpid::client::AsyncSession& session, const std::string& destination); + void cancel(qpid::client::AsyncSession& session, const std::string& destination); + + static SubscriptionMode getMode(const std::string& mode); + private: + struct Binding + { + Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + + std::string exchange; + std::string key; + FieldTable options; + }; + + typedef std::vector<Binding> Bindings; + + const std::string name; + const bool autoDelete; + const bool durable; + const FieldTable queueOptions; + const FieldTable subscriptionOptions; + Bindings bindings; + std::string queue; +}; + +class Exchange : public MessageSink +{ + public: + Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, + bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, + const FieldTable& options = EMPTY_FIELD_TABLE); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: + const std::string name; + const std::string defaultSubject; + const bool passive; + const std::string type; + const bool durable; + const FieldTable options; +}; + +class QueueSink : public MessageSink +{ + public: + QueueSink(const std::string& name, bool passive=true, bool exclusive=false, + bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); + void declare(qpid::client::AsyncSession& session, const std::string& name); + void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void cancel(qpid::client::AsyncSession& session, const std::string& name); + private: + const std::string name; + const bool passive; + const bool exclusive; + const bool autoDelete; + const bool durable; + const FieldTable options; +}; + + +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject); + +const Variant& getOption(const std::string& key, const Variant::Map& options) +{ + Variant::Map::const_iterator i = options.find(key); + if (i == options.end()) return EMPTY_VARIANT; + else return i->second; +} + +std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, + const Address& address, + const Filter* filter, + const Variant::Map& options) +{ + //TODO: handle case where there exists a queue and an exchange of + //the same name (hence an unqualified address is ambiguous) + + //TODO: make sure specified address type gives sane error message + //if it does npt match the configuration on server + + if (isQueue(session, address)) { + //TODO: support auto-created queue as source, if requested by specific option + + AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED; + bool exclusive = getOption(EXCLUSIVE, options).asBool(); + FieldTable arguments; + //TODO: extract subscribe arguments from options (e.g. either + //filter out already processed keys and send the rest, or have + //a nested map) + + std::auto_ptr<MessageSource> source = + std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments)); + return source; + } else { + //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important) + std::auto_ptr<Subscription> bindings = + std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), + Subscription::getMode(getOption(MODE, options).asString()))); + + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value); + if (result.getNotFound()) { + throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + } else if (result.getType() == "topic") { + if (filter) { + if (filter->type != Filter::WILDCARD) { + throw qpid::framing::NotImplementedException( + QPID_MSG("Filters of type " << filter->type << " not supported by address " << address)); + + } + for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) { + bindings->add(address.value, *i, qpid::framing::FieldTable()); + } + } else { + //default is to receive all messages + bindings->add(address.value, "*", qpid::framing::FieldTable()); + } + } else if (result.getType() == "fanout") { + if (filter) { + throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address)); + } + bindings->add(address.value, address.value, qpid::framing::FieldTable()); + } else if (result.getType() == "direct") { + //TODO: ???? + } else { + //TODO: xml and headers exchanges + throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address)); + } + std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release()); + return source; + } +} + + +std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& /*options*/) +{ + std::auto_ptr<MessageSink> sink; + if (isQueue(session, address)) { + //TODO: support for auto-created queues as sink + sink = std::auto_ptr<MessageSink>(new QueueSink(address.value)); + } else { + std::string subject; + if (isTopic(session, address, subject)) { + //TODO: support for auto-created exchanges as sink + sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject)); + } else { + if (address.type.empty()) { + throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + } else { + throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type)); + } + } + } + return sink; +} + +QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) : + name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {} + +void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageSubscribe(arg::queue=name, + arg::destination=destination, + arg::acceptMode=acceptMode, + arg::acquireMode=acquireMode, + arg::exclusive=exclusive, + arg::arguments=options); +} + +void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageCancel(destination); +} + +Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions) + : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), + queueOptions(qOptions), subscriptionOptions(sOptions) {} + +void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options) +{ + bindings.push_back(Binding(exchange, key, options)); +} + +void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) +{ + if (name.empty()) { + //TODO: use same scheme as JMS client for subscription queue name generation? + queue = session.getId().getName() + destination; + } else { + queue = name; + } + session.queueDeclare(arg::queue=queue, arg::exclusive=true, + arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions); + for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { + session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options); + } + AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + session.messageSubscribe(arg::queue=queue, arg::destination=destination, + arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); +} + +void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) +{ + session.messageCancel(destination); + session.queueDelete(arg::queue=queue); +} + +Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): + exchange(e), key(k), options(o) {} + +Subscription::SubscriptionMode Subscription::getMode(const std::string& s) +{ + if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE; + else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE; + else if (s == DURABLE_SUBSCRIPTION) return DURABLE; + else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); +} + +void convert(qpid::messaging::Message& from, qpid::client::Message& to); + +Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, + bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : + name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {} + +void Exchange::declare(qpid::client::AsyncSession& session, const std::string&) +{ + //TODO: should this really by synchronous? want to get error if not valid... + if (passive) { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } else { + sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options); + } +} + +void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +{ + qpid::client::Message message; + convert(m, message); + if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { + message.getDeliveryProperties().setRoutingKey(defaultSubject); + } + session.messageTransfer(arg::destination=name, arg::content=message); +} + +void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} + +QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, + bool _autoDelete, bool _durable, const FieldTable& _options) : + name(_name), passive(_passive), exclusive(_exclusive), + autoDelete(_autoDelete), durable(_durable), options(_options) {} + +void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) +{ + //TODO: should this really by synchronous? + if (passive) { + sync(session).queueDeclare(arg::queue=name, arg::passive=true); + } else { + sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, + arg::autoDelete=autoDelete, arg::arguments=options); + } +} +void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +{ + qpid::client::Message message; + convert(m, message); + message.getDeliveryProperties().setRoutingKey(name); + session.messageTransfer(arg::content=message); +} + +void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} + +template <class T> void encode(qpid::messaging::Message& from) +{ + T codec; + from.encode(codec); + from.setContentType(T::contentType); +} + +void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp + +void convert(qpid::messaging::Message& from, qpid::client::Message& to) +{ + //TODO: need to avoid copying as much as possible + if (from.getContent().isList()) encode<ListCodec>(from); + if (from.getContent().isMap()) encode<MapCodec>(from); + to.setData(from.getBytes()); + to.getDeliveryProperties().setRoutingKey(from.getSubject()); + //TODO: set other delivery properties + to.getMessageProperties().setContentType(from.getContentType()); + const Address& address = from.getReplyTo(); + if (!address.value.empty()) { + to.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders()); + //TODO: set other message properties +} + +Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) +{ + if (rt.getExchange().empty()) { + if (rt.getRoutingKey().empty()) { + return Address();//empty address + } else { + return Address(rt.getRoutingKey(), QUEUE_ADDRESS); + } + } else { + if (rt.getRoutingKey().empty()) { + return Address(rt.getExchange(), TOPIC_ADDRESS); + } else { + return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT); + } + } +} + +qpid::framing::ReplyTo AddressResolution::convert(const Address& address) +{ + if (address.type == QUEUE_ADDRESS || address.type.empty()) { + return ReplyTo(EMPTY_STRING, address.value); + } else if (address.type == TOPIC_ADDRESS) { + return ReplyTo(address.value, EMPTY_STRING); + } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { + //need to split the value + string::size_type i = address.value.find(DIVIDER); + if (i != string::npos) { + std::string exchange = address.value.substr(0, i); + std::string routingKey; + if (i+1 < address.value.size()) { + routingKey = address.value.substr(i+1); + } + return ReplyTo(exchange, routingKey); + } else { + return ReplyTo(address.value, EMPTY_STRING); + } + } else { + QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type); + //treat as queue + return ReplyTo(EMPTY_STRING, address.value); + } +} + +bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) +{ + return address.type == QUEUE_ADDRESS || + (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value); +} + +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject) +{ + if (address.type.empty()) { + return !session.exchangeQuery(address.value).getNotFound(); + } else if (address.type == TOPIC_ADDRESS) { + return true; + } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { + string::size_type i = address.value.find(DIVIDER); + if (i != string::npos) { + std::string exchange = address.value.substr(0, i); + if (i+1 < address.value.size()) { + subject = address.value.substr(i+1); + } + } + return true; + } else { + return false; + } +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h new file mode 100644 index 0000000000..87758abe6d --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H +#define QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include "qpid/client/Session.h" + +namespace qpid { + +namespace framing{ +class ReplyTo; +} + +namespace messaging { +class Address; +class Filter; +} + +namespace client { +namespace amqp0_10 { + +class MessageSource; +class MessageSink; + +/** + * Maps from a generic Address and optional Filter to an AMQP 0-10 + * MessageSource which will then be used by a ReceiverImpl instance + * created for the address. + */ +class AddressResolution +{ + public: + std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::Variant::Map& options); + + std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& options); + + static qpid::messaging::Address convert(const qpid::framing::ReplyTo&); + static qpid::framing::ReplyTo convert(const qpid::messaging::Address&); + + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ADDRESSRESOLUTION_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/Codecs.cpp b/cpp/src/qpid/client/amqp0_10/Codecs.cpp new file mode 100644 index 0000000000..9aee3118fe --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/Codecs.cpp @@ -0,0 +1,299 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/messaging/Variant.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/List.h" +#include <algorithm> +#include <functional> + +using namespace qpid::framing; +using namespace qpid::messaging; + +namespace qpid { +namespace client { +namespace amqp0_10 { + +namespace { +const std::string iso885915("iso-8859-15"); +const std::string utf8("utf8"); +const std::string utf16("utf16"); +const std::string amqp0_10_binary("amqp0-10:binary"); +const std::string amqp0_10_bit("amqp0-10:bit"); +const std::string amqp0_10_datetime("amqp0-10:datetime"); +const std::string amqp0_10_struct("amqp0-10:struct"); +} + +template <class T, class U, class F> void convert(const T& from, U& to, F f) +{ + std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f); +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in); +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in); +Variant toVariant(boost::shared_ptr<FieldValue> in); +boost::shared_ptr<FieldValue> toFieldValue(const Variant& in); + +template <class T, class U, class F> void translate(boost::shared_ptr<FieldValue> in, U& u, F f) +{ + T t; + getEncodedValue<T>(in, t); + convert(t, u, f); +} + +template <class T, class U, class F> T* toFieldValueCollection(const U& u, F f) +{ + typename T::ValueType t; + convert(u, t, f); + return new T(t); +} + +FieldTableValue* toFieldTableValue(const Variant::Map& map) +{ + FieldTable ft; + convert(map, ft, &toFieldTableEntry); + return new FieldTableValue(ft); +} + +ListValue* toListValue(const Variant::List& list) +{ + List l; + convert(list, l, &toFieldValue); + return new ListValue(l); +} + +void setEncodingFor(Variant& out, uint8_t code) +{ + switch(code){ + case 0x80: + case 0x90: + case 0xa0: + out.setEncoding(amqp0_10_binary); + break; + case 0x84: + case 0x94: + out.setEncoding(iso885915); + break; + case 0x85: + case 0x95: + out.setEncoding(utf8); + break; + case 0x86: + case 0x96: + out.setEncoding(utf16); + break; + case 0xab: + out.setEncoding(amqp0_10_struct); + break; + default: + //do nothing + break; + } +} + +Variant toVariant(boost::shared_ptr<FieldValue> in) +{ + Variant out; + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x01: out.setEncoding(amqp0_10_binary); + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; + case 0x04: break; //TODO: iso-8859-15 char + case 0x08: out = in->getIntegerValue<bool, 1>(); break; + case 0x010: out.setEncoding(amqp0_10_binary); + case 0x011: out = in->getIntegerValue<int16_t, 2>(); break; + case 0x012: out = in->getIntegerValue<uint16_t, 2>(); break; + case 0x020: out.setEncoding(amqp0_10_binary); + case 0x021: out = in->getIntegerValue<int32_t, 4>(); break; + case 0x022: out = in->getIntegerValue<uint32_t, 4>(); break; + case 0x023: out = in->get<float>(); break; + case 0x027: break; //TODO: utf-32 char + case 0x030: out.setEncoding(amqp0_10_binary); + case 0x031: out = in->getIntegerValue<int64_t, 8>(); break; + case 0x038: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x032: out = in->getIntegerValue<uint64_t, 8>(); break; + case 0x033:out = in->get<double>(); break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: + case 0x84: + case 0x85: + case 0x86: + case 0x90: + case 0x94: + case 0x95: + case 0x96: + case 0xa0: + case 0xab: + setEncodingFor(out, in->getType()); + out = in->get<std::string>(); + break; + + case 0xa8: + out = Variant::Map(); + translate<FieldTable>(in, out.asMap(), &toVariantMapEntry); + break; + + case 0xa9: + out = Variant::List(); + translate<List>(in, out.asList(), &toVariant); + break; + case 0xaa: //convert amqp0-10 array into variant list + out = Variant::List(); + translate<Array>(in, out.asList(), &toVariant); + break; + + default: + //error? + break; + } + return out; +} + +boost::shared_ptr<FieldValue> toFieldValue(const Variant& in) +{ + boost::shared_ptr<FieldValue> out; + switch (in.getType()) { + case VOID: out = boost::shared_ptr<FieldValue>(new VoidValue()); break; + case BOOL: out = boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); break; + case UINT8: out = boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); break; + case UINT16: out = boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); break; + case UINT32: out = boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); break; + case UINT64: out = boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); break; + case INT8: out = boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); break; + case INT16: out = boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); break; + case INT32: out = boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); break; + case INT64: out = boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); break; + case FLOAT: out = boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); break; + case DOUBLE: out = boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); break; + //TODO: check encoding (and length?) when deciding what AMQP type to treat string as + case STRING: out = boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); break; + case MAP: + //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<FieldTableValue>(in.asMap(), &toFieldTableEntry)); + out = boost::shared_ptr<FieldValue>(toFieldTableValue(in.asMap())); + break; + case LIST: + //out = boost::shared_ptr<FieldValue>(toFieldValueCollection<ListValue>(in.asList(), &toFieldValue)); + out = boost::shared_ptr<FieldValue>(toListValue(in.asList())); + break; + } + return out; +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in) +{ + return Variant::Map::value_type(in.first, toVariant(in.second)); +} + +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in) +{ + return FieldTable::value_type(in.first, toFieldValue(in.second)); +} + +struct EncodeBuffer +{ + char* data; + Buffer buffer; + + EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {} + ~EncodeBuffer() { delete[] data; } + + template <class T> void encode(T& t) { t.encode(buffer); } + + void getData(std::string& s) { + s.assign(data, buffer.getSize()); + } +}; + +struct DecodeBuffer +{ + Buffer buffer; + + DecodeBuffer(const std::string& s) : buffer(const_cast<char*>(s.data()), s.size()) {} + + template <class T> void decode(T& t) { t.decode(buffer); } + +}; + +template <class T, class U, class F> void _encode(const U& value, std::string& data, F f) +{ + T t; + convert(value, t, f); + EncodeBuffer buffer(t.encodedSize()); + buffer.encode(t); + buffer.getData(data); +} + +template <class T, class U, class F> void _decode(const std::string& data, U& value, F f) +{ + T t; + DecodeBuffer buffer(data); + buffer.decode(t); + convert(t, value, f); +} + +void MapCodec::encode(const Variant& value, std::string& data) +{ + _encode<FieldTable>(value.asMap(), data, &toFieldTableEntry); +} + +void MapCodec::decode(const std::string& data, Variant& value) +{ + value = Variant::Map(); + _decode<FieldTable>(data, value.asMap(), &toVariantMapEntry); +} + +void ListCodec::encode(const Variant& value, std::string& data) +{ + _encode<List>(value.asList(), data, &toFieldValue); +} + +void ListCodec::decode(const std::string& data, Variant& value) +{ + value = Variant::List(); + _decode<List>(data, value.asList(), &toVariant); +} + +void translate(const Variant::Map& from, FieldTable& to) +{ + convert(from, to, &toFieldTableEntry); +} + +void translate(const FieldTable& from, Variant::Map& to) +{ + convert(from, to, &toVariantMapEntry); +} + +const std::string ListCodec::contentType("amqp0_10/list"); +const std::string MapCodec::contentType("amqp0_10/map"); + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp new file mode 100644 index 0000000000..52b623b65c --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "CompletionTracker.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::framing::SequenceNumber; + +void CompletionTracker::track(SequenceNumber command, void* token) +{ + tokens[command] = token; +} + +void CompletionTracker::completedTo(SequenceNumber command) +{ + Tokens::iterator i = tokens.lower_bound(command); + if (i != tokens.end()) { + lastCompleted = i->second; + tokens.erase(tokens.begin(), ++i); + } +} + +void* CompletionTracker::getLastCompletedToken() +{ + return lastCompleted; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/cpp/src/qpid/client/amqp0_10/CompletionTracker.h new file mode 100644 index 0000000000..6147c5682e --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/CompletionTracker.h @@ -0,0 +1,50 @@ +#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H +#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include <map> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Provides a mapping from command ids to application supplied + * 'tokens', and is used to determine when the sending or + * acknowledging of a specific message is complete. + */ +class CompletionTracker +{ + public: + void track(qpid::framing::SequenceNumber command, void* token); + void completedTo(qpid::framing::SequenceNumber command); + void* getLastCompletedToken(); + private: + typedef std::map<qpid::framing::SequenceNumber, void*> Tokens; + Tokens tokens; + void* lastCompleted; +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp new file mode 100644 index 0000000000..9f738731e2 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionImpl.h" +#include "SessionImpl.h" +#include "qpid/messaging/Session.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Variant; + +template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) +{ + Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + value = (T) i->second; + } +} + +void convert(const Variant::Map& from, ConnectionSettings& to) +{ + setIfFound(from, "username", to.username); + setIfFound(from, "password", to.password); + setIfFound(from, "sasl-mechanism", to.mechanism); + setIfFound(from, "sasl-service", to.service); + setIfFound(from, "sasl-min-ssf", to.minSsf); + setIfFound(from, "sasl-max-ssf", to.maxSsf); + + setIfFound(from, "heartbeat", to.heartbeat); + setIfFound(from, "tcp-nodelay", to.tcpNoDelay); + + setIfFound(from, "locale", to.locale); + setIfFound(from, "max-channels", to.maxChannels); + setIfFound(from, "max-frame-size", to.maxFrameSize); + setIfFound(from, "bounds", to.bounds); +} + +ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) +{ + QPID_LOG(debug, "Opening connection to " << url << " with " << options); + Url u(url); + ConnectionSettings settings; + convert(options, settings); + connection.open(u, settings); +} + +void ConnectionImpl::close() +{ + connection.close(); +} + +qpid::messaging::Session ConnectionImpl::newSession() +{ + qpid::messaging::Session impl(new SessionImpl(connection.newSession())); + return impl; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h new file mode 100644 index 0000000000..120a8ab9d8 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -0,0 +1,43 @@ +#ifndef QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H +#define QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/client/Connection.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class ConnectionImpl : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); + void close(); + qpid::messaging::Session newSession(); + private: + qpid::client::Connection connection; +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CONNECTIONIMPL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp new file mode 100644 index 0000000000..83e1b48bed --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -0,0 +1,241 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Variant.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/enum.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using namespace qpid::framing; +using namespace qpid::framing::message; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::messaging::Variant; + +namespace { +const std::string EMPTY_STRING; + + +struct GetNone : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer&) { return false; } +}; + +struct GetAny : IncomingMessages::Handler +{ + bool accept(IncomingMessages::MessageTransfer& transfer) + { + transfer.retrieve(0); + return true; + } +}; + +struct MatchAndTrack +{ + const std::string destination; + SequenceSet ids; + + MatchAndTrack(const std::string& d) : destination(d) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ids.add(command->getId()); + return true; + } else { + return false; + } + } +}; +} + +IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) : + session(s), + incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {} + +bool IncomingMessages::get(Handler& handler, Duration timeout) +{ + //search through received list for any transfer of interest: + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) + { + MessageTransfer transfer(*i, *this); + if (handler.accept(transfer)) { + received.erase(i); + return true; + } + } + //none found, check incoming: + return process(&handler, timeout); +} + +void IncomingMessages::accept() +{ + session.messageAccept(unaccepted); + unaccepted.clear(); +} + +void IncomingMessages::releaseAll() +{ + //first process any received messages... + while (!received.empty()) { + retrieve(received.front(), 0); + received.pop_front(); + } + //then pump out any available messages from incoming queue... + GetAny handler; + while (process(&handler, 0)); + //now release all messages + session.messageRelease(unaccepted); + unaccepted.clear(); +} + +void IncomingMessages::releasePending(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)); + + //now remove all messages for this destination from received list, recording their ids... + MatchAndTrack match(destination); + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i); + //now release those messages + session.messageRelease(match.ids); +} + +/** + * Get a frameset from session queue, waiting for up to the specified + * duration and returning true if this could be achieved, false + * otherwise. If a destination is supplied, only return a message for + * that destination. In this case messages from other destinations + * will be held on a received queue. + */ +bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + MessageTransfer transfer(content, *this); + if (handler && handler->accept(transfer)) { + QPID_LOG(debug, "Delivered " << *content->getMethod()); + return true; + } else { + //received message for another destination, keep for later + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + received.push_back(content); + } + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + +void populate(qpid::messaging::Message& message, FrameSet& command); + +/** + * Called when message is retrieved; records retrieval for subsequent + * acceptance, marks the command as completed and converts command to + * message if message is required + */ +void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message) +{ + if (message) { + populate(*message, *command); + } + const MessageTransferBody* transfer = command->as<MessageTransferBody>(); + if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { + unaccepted.add(command->getId()); + } + session.markCompleted(command->getId(), false, false); +} + +IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {} + +const std::string& IncomingMessages::MessageTransfer::getDestination() +{ + return content->as<MessageTransferBody>()->getDestination(); +} +void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message) +{ + parent.retrieve(content, message); +} + +void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp + +void populateHeaders(qpid::messaging::Message& message, + const DeliveryProperties* deliveryProperties, + const MessageProperties* messageProperties) +{ + if (deliveryProperties) { + message.setSubject(deliveryProperties->getRoutingKey()); + //TODO: convert other delivery properties + } + if (messageProperties) { + message.setContentType(messageProperties->getContentType()); + if (messageProperties->hasReplyTo()) { + message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo())); + } + translate(messageProperties->getApplicationHeaders(), message.getHeaders()); + //TODO: convert other message properties + } +} + +void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers) +{ + populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>()); +} + +void populate(qpid::messaging::Message& message, FrameSet& command) +{ + //need to be able to link the message back to the transfer it was delivered by + //e.g. for rejecting. TODO: hide this from API + uint32_t commandId = command.getId(); + message.setInternalId(reinterpret_cast<void*>(commandId)); + + command.getContent(message.getBytes()); + + populateHeaders(message, command.getHeaders()); + + //decode content if necessary + if (message.getContentType() == ListCodec::contentType) { + ListCodec codec; + message.decode(codec); + } else if (message.getContentType() == MapCodec::contentType) { + MapCodec codec; + message.decode(codec); + } +} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h new file mode 100644 index 0000000000..c4346fd7d7 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -0,0 +1,91 @@ +#ifndef QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H +#define QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/sys/BlockingQueue.h" +#include "qpid/sys/Time.h" + +namespace qpid { + +namespace framing{ +class FrameSet; +} + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +/** + * + */ +class IncomingMessages +{ + public: + typedef boost::shared_ptr<qpid::framing::FrameSet> FrameSetPtr; + class MessageTransfer + { + public: + const std::string& getDestination(); + void retrieve(qpid::messaging::Message* message); + private: + FrameSetPtr content; + IncomingMessages& parent; + + MessageTransfer(FrameSetPtr, IncomingMessages&); + friend class IncomingMessages; + }; + + struct Handler + { + virtual ~Handler() {} + virtual bool accept(MessageTransfer& transfer) = 0; + }; + + IncomingMessages(qpid::client::AsyncSession session); + bool get(Handler& handler, qpid::sys::Duration timeout); + //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); + //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout); + void accept(); + void releaseAll(); + void releasePending(const std::string& destination); + private: + typedef std::deque<FrameSetPtr> FrameSetQueue; + + qpid::client::AsyncSession session; + qpid::framing::SequenceSet unaccepted; + boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; + FrameSetQueue received; + + bool process(Handler*, qpid::sys::Duration); + void retrieve(FrameSetPtr, qpid::messaging::Message*); + +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_INCOMINGMESSAGES_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/MessageSink.h b/cpp/src/qpid/client/amqp0_10/MessageSink.h new file mode 100644 index 0000000000..19d5e4ef82 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/MessageSink.h @@ -0,0 +1,50 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESINK_H +#define QPID_CLIENT_AMQP0_10_MESSAGESINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { + +namespace messaging { +class Message; +} + +namespace client { +namespace amqp0_10 { + +/** + * + */ +class MessageSink +{ + public: + virtual ~MessageSink() {} + virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0; + virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0; + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESINK_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/MessageSource.h b/cpp/src/qpid/client/amqp0_10/MessageSource.h new file mode 100644 index 0000000000..74f2732f59 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/MessageSource.h @@ -0,0 +1,47 @@ +#ifndef QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H +#define QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/client/AsyncSession.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Abstraction behind which the AMQP 0-10 commands required to + * establish (and tear down) an incoming stream of messages from a + * given address are hidden. + */ +class MessageSource +{ + public: + virtual ~MessageSource() {} + virtual void subscribe(qpid::client::AsyncSession& session, const std::string& destination) = 0; + virtual void cancel(qpid::client::AsyncSession& session, const std::string& destination) = 0; + + private: +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_MESSAGESOURCE_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp new file mode 100644 index 0000000000..e6ed4bfc4e --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverImpl.h" +#include "MessageSource.h" +#include "SessionImpl.h" +#include "qpid/messaging/MessageListener.h" +#include "qpid/messaging/Receiver.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Receiver; + +void ReceiverImpl::received(qpid::messaging::Message&) +{ + //TODO: should this be configurable + if (capacity && --window <= capacity/2) { + session.sendCompletion(); + window = capacity; + } +} + +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + return parent.get(*this, message, timeout); +} + +qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + if (capacity == 0 && !cancelled) { + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); + } + + if (get(message, timeout)) { + return true; + } else { + if (!cancelled) { + sync(session).messageFlush(destination); + start();//reallocate credit + } + return get(message, 0); + } +} + +qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +void ReceiverImpl::cancel() +{ + if (!cancelled) { + //TODO: should syncronicity be an optional argument to this call? + source->cancel(session, destination); + //need to be sure cancel is complete and all incoming + //framesets are processed before removing the receiver + parent.receiverCancelled(destination); + cancelled = true; + } +} + +void ReceiverImpl::start() +{ + if (!cancelled) { + started = true; + session.messageSetFlowMode(destination, capacity > 0); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity); + session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); + window = capacity; + } +} + +void ReceiverImpl::stop() +{ + session.messageStop(destination); + started = false; +} + +void ReceiverImpl::subscribe() +{ + source->subscribe(session, destination); +} + +void ReceiverImpl::setSession(qpid::client::AsyncSession s) +{ + session = s; + if (!cancelled) { + subscribe(); + //if we were in started state before the session was changed, + //start again on this new session + //TODO: locking if receiver is to be threadsafe... + if (started) start(); + } +} + +void ReceiverImpl::setCapacity(uint32_t c) +{ + if (c != capacity) { + capacity = c; + if (!cancelled && started) { + stop(); + start(); + } + } +} + +void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } +qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } + +const std::string& ReceiverImpl::getName() const { return destination; } + +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) : + parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF), + capacity(0), started(false), cancelled(false), listener(0), window(0) {} + + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h new file mode 100644 index 0000000000..b549242d35 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H +#define QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ReceiverImpl.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/sys/Time.h" +#include <memory> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class MessageSource; +class SessionImpl; + +/** + * A receiver implementation based on an AMQP 0-10 subscription. + */ +class ReceiverImpl : public qpid::messaging::ReceiverImpl +{ + public: + + ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source); + + bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message get(qpid::sys::Duration timeout); + bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message fetch(qpid::sys::Duration timeout); + void cancel(); + void start(); + void stop(); + void subscribe(); + void setSession(qpid::client::AsyncSession s); + const std::string& getName() const; + void setCapacity(uint32_t); + void setListener(qpid::messaging::MessageListener* listener); + qpid::messaging::MessageListener* getListener(); + void received(qpid::messaging::Message& message); + private: + SessionImpl& parent; + const std::auto_ptr<MessageSource> source; + const std::string destination; + const uint32_t byteCredit; + + uint32_t capacity; + qpid::client::AsyncSession session; + bool started; + bool cancelled; + qpid::messaging::MessageListener* listener; + uint32_t window; +}; + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_RECEIVERIMPL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp new file mode 100644 index 0000000000..ac36eb1537 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderImpl.h" +#include "MessageSink.h" +#include "SessionImpl.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) : + parent(_parent), name(_name), sink(_sink) {} + +void SenderImpl::send(qpid::messaging::Message& m) +{ + sink->send(session, name, m); +} + +void SenderImpl::cancel() +{ + sink->cancel(session, name); + parent.senderCancelled(name); +} + +void SenderImpl::setSession(qpid::client::AsyncSession s) +{ + session = s; + sink->declare(session, name); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h new file mode 100644 index 0000000000..e737450ba1 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -0,0 +1,58 @@ +#ifndef QPID_CLIENT_AMQP0_10_SENDERIMPL_H +#define QPID_CLIENT_AMQP0_10_SENDERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Message.h" +#include "qpid/messaging/SenderImpl.h" +#include "qpid/client/AsyncSession.h" +#include <memory> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +class MessageSink; +class SessionImpl; + +/** + * + */ +class SenderImpl : public qpid::messaging::SenderImpl +{ + public: + SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink); + void send(qpid::messaging::Message&); + void cancel(); + void setSession(qpid::client::AsyncSession); + + private: + SessionImpl& parent; + const std::string name; + std::auto_ptr<MessageSink> sink; + + qpid::client::AsyncSession session; + std::string destination; + std::string routingKey; +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SENDERIMPL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp new file mode 100644 index 0000000000..647ace5f92 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/client/amqp0_10/ReceiverImpl.h" +#include "qpid/client/amqp0_10/SenderImpl.h" +#include "qpid/client/amqp0_10/MessageSource.h" +#include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/PrivateImplRef.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageListener.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Session.h" +#include "qpid/framing/reply_exceptions.h" +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> + +using qpid::messaging::Filter; +using qpid::messaging::Sender; +using qpid::messaging::Receiver; +using qpid::messaging::VariantMap; + +namespace qpid { +namespace client { +namespace amqp0_10 { + +SessionImpl::SessionImpl(qpid::client::Session s) : session(s), incoming(session) {} + + +void SessionImpl::commit() +{ + qpid::sys::Mutex::ScopedLock l(lock); + incoming.accept(); + session.txCommit(); +} + +void SessionImpl::rollback() +{ + qpid::sys::Mutex::ScopedLock l(lock); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop(); + //ensure that stop has been processed and all previously sent + //messages are available for release: + session.sync(); + incoming.releaseAll(); + session.txRollback(); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start(); +} + +void SessionImpl::acknowledge() +{ + qpid::sys::Mutex::ScopedLock l(lock); + incoming.accept(); +} + +void SessionImpl::reject(qpid::messaging::Message& m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + //TODO: how do I get the id of the original transfer command? think this through some more... + SequenceNumber id(reinterpret_cast<uint32_t>(m.getInternalId())); + SequenceSet set; + set.add(id); + session.messageReject(set); +} + +void SessionImpl::close() +{ + session.close(); +} + +void translate(const VariantMap& options, SubscriptionSettings& settings) +{ + //TODO: fill this out + VariantMap::const_iterator i = options.find("auto_acknowledge"); + if (i != options.end()) { + settings.autoAck = i->second.asInt32(); + } +} + +template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) +{ + return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t)); +} + +template <class T> void getFreeKey(std::string& key, T& map) +{ + std::string name = key; + int count = 1; + for (typename T::const_iterator i = map.find(name); i != map.end(); i = map.find(name)) { + name = (boost::format("%1%_%2%") % key % ++count).str(); + } + key = name; +} + +Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options) +{ + qpid::sys::Mutex::ScopedLock l(lock); + std::auto_ptr<MessageSink> sink = resolver.resolveSink(session, address, options); + std::string name = address; + getFreeKey(name, senders); + Sender sender(new SenderImpl(*this, name, sink)); + getImplPtr<Sender, SenderImpl>(sender)->setSession(session); + senders[name] = sender; + return sender; +} +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options) +{ + return addReceiver(address, 0, options); +} +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const Filter& filter, const VariantMap& options) +{ + return addReceiver(address, &filter, options); +} + +Receiver SessionImpl::addReceiver(const qpid::messaging::Address& address, const Filter* filter, const VariantMap& options) +{ + qpid::sys::Mutex::ScopedLock l(lock); + std::auto_ptr<MessageSource> source = resolver.resolveSource(session, address, filter, options); + std::string name = address; + getFreeKey(name, receivers); + Receiver receiver(new ReceiverImpl(*this, name, source)); + getImplPtr<Receiver, ReceiverImpl>(receiver)->setSession(session); + receivers[name] = receiver; + return receiver; +} + +qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName) +{ + std::string name = baseName + std::string("_") + session.getId().getName(); + session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); + return qpid::messaging::Address(name); +} + +SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) +{ + boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); + if (!impl) { + throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + } + return *impl; +} + +namespace { + +struct IncomingMessageHandler : IncomingMessages::Handler +{ + typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback; + Callback callback; + + IncomingMessageHandler(Callback c) : callback(c) {} + + bool accept(IncomingMessages::MessageTransfer& transfer) + { + return callback(transfer); + } +}; + +} + +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, + bool isDispatch, + IncomingMessages::MessageTransfer& transfer) +{ + if (receiver->getName() == transfer.getDestination()) { + transfer.retrieve(message); + if (isDispatch) { + qpid::sys::Mutex::ScopedUnlock u(lock); + qpid::messaging::MessageListener* listener = receiver->getListener(); + if (listener) listener->received(*message); + } + receiver->received(*message); + return true; + } else { + return false; + } +} + +bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer) +{ + Receivers::iterator i = receivers.find(transfer.getDestination()); + if (i == receivers.end()) { + QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); + return false; + } else { + boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second); + return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer); + } +} + +bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + return incoming.get(handler, timeout); +} + +bool SessionImpl::dispatch(qpid::sys::Duration timeout) +{ + qpid::messaging::Message message; + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); + return getIncoming(handler, timeout); +} + +bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); + return getIncoming(handler, timeout); +} + +bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); + return getIncoming(handler, timeout); +} + +qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + return result; +} + +void SessionImpl::receiverCancelled(const std::string& name) +{ + { + qpid::sys::Mutex::ScopedLock l(lock); + receivers.erase(name); + } + session.sync(); + incoming.releasePending(name); +} + +void SessionImpl::senderCancelled(const std::string& name) +{ + qpid::sys::Mutex::ScopedLock l(lock); + senders.erase(name); +} + +void SessionImpl::sync() +{ + session.sync(); +} + +void SessionImpl::flush() +{ + session.flush(); +} + +void* SessionImpl::getLastConfirmedSent() +{ + return 0; +} + +void* SessionImpl::getLastConfirmedAcknowledged() +{ + return 0; +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h new file mode 100644 index 0000000000..6926fb0235 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -0,0 +1,107 @@ +#ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H +#define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/Variant.h" +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/IncomingMessages.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { + +namespace messaging { +class Address; +class Filter; +class Message; +class Receiver; +class Sender; +class Session; +} + +namespace client { +namespace amqp0_10 { + +class ReceiverImpl; +class SenderImpl; + +/** + * Implementation of the protocol independent Session interface using + * AMQP 0-10. + */ +class SessionImpl : public qpid::messaging::SessionImpl +{ + public: + SessionImpl(qpid::client::Session); + void commit(); + void rollback(); + void acknowledge(); + void reject(qpid::messaging::Message&); + void close(); + void sync(); + void flush(); + qpid::messaging::Address createTempQueue(const std::string& baseName); + qpid::messaging::Sender createSender(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, + const qpid::messaging::Filter& filter, + const qpid::messaging::VariantMap& options); + + void* getLastConfirmedSent(); + void* getLastConfirmedAcknowledged(); + + bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); + qpid::messaging::Message fetch(qpid::sys::Duration timeout); + bool dispatch(qpid::sys::Duration timeout); + + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); + + void receiverCancelled(const std::string& name); + void senderCancelled(const std::string& name); + + static SessionImpl& convert(qpid::messaging::Session&); + + qpid::client::Session session; + private: + typedef std::map<std::string, qpid::messaging::Receiver> Receivers; + typedef std::map<std::string, qpid::messaging::Sender> Senders; + + qpid::sys::Mutex lock; + AddressResolution resolver; + IncomingMessages incoming; + Receivers receivers; + Senders senders; + + qpid::messaging::Receiver addReceiver(const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::VariantMap& options); + bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/ |
