diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/messaging | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/messaging')
41 files changed, 4486 insertions, 49 deletions
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index bd90aa54a7..fde931038b 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ #include "qpid/messaging/Session.h" #include "qpid/messaging/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" +#include "qpid/messaging/ProtocolRegistry.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/log/Statement.h" @@ -40,22 +41,32 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this Connection::~Connection() { PI::dtor(*this); } Connection::Connection(const std::string& url, const std::string& o) -{ +{ Variant::Map options; AddressParser parser(o); if (o.empty() || parser.parseMap(options)) { - PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + ConnectionImpl* impl = ProtocolRegistry::create(url, options); + if (impl) { + PI::ctor(*this, impl); + } else { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } } else { throw InvalidOptionString("Invalid option string: " + o); } } Connection::Connection(const std::string& url, const Variant::Map& options) { - PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + ConnectionImpl* impl = ProtocolRegistry::create(url, options); + if (impl) { + PI::ctor(*this, impl); + } else { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } } Connection::Connection() -{ +{ Variant::Map options; std::string url = "amqp:tcp:127.0.0.1:5672"; PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); @@ -67,12 +78,12 @@ bool Connection::isOpen() const { return impl->isOpen(); } void Connection::close() { impl->close(); } Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } Session Connection::createTransactionalSession(const std::string& name) -{ +{ return impl->newSession(true, name); } Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } void Connection::setOption(const std::string& name, const Variant& value) -{ +{ impl->setOption(name, value); } std::string Connection::getAuthenticatedUsername() diff --git a/cpp/src/qpid/messaging/ConnectionOptions.cpp b/cpp/src/qpid/messaging/ConnectionOptions.cpp new file mode 100644 index 0000000000..ecd5ba9693 --- /dev/null +++ b/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include <algorithm> +#include <limits> + +namespace qpid { +namespace messaging { + +namespace { +double FOREVER(std::numeric_limits<double>::max()); + +double timeValue(const qpid::types::Variant& value) { + if (types::isIntegerType(value.getType())) + return double(value.asInt64()); + return value.asDouble(); +} + +void merge(const std::string& value, std::vector<std::string>& list) { + if (std::find(list.begin(), list.end(), value) == list.end()) + list.push_back(value); +} + +void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to) +{ + for (qpid::types::Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) + merge(i->asString(), to); +} + +} + +ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options) + : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), + retries(0), reconnectOnLimitExceeded(true) +{ + for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { + set(i->first, i->second); + } +} + +void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& value) +{ + if (name == "reconnect") { + reconnect = value; + } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { + timeout = timeValue(value); + } else if (name == "reconnect-limit" || name == "reconnect_limit") { + limit = value; + } else if (name == "reconnect-interval" || name == "reconnect_interval") { + maxReconnectInterval = minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { + minReconnectInterval = timeValue(value); + } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { + maxReconnectInterval = timeValue(value); + } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { + replaceUrls = value.asBool(); + } else if (name == "reconnect-urls" || name == "reconnect_urls") { + if (replaceUrls) urls.clear(); + if (value.getType() == qpid::types::VAR_LIST) { + merge(value.asList(), urls); + } else { + merge(value.asString(), urls); + } + } else if (name == "username") { + username = value.asString(); + } else if (name == "password") { + password = value.asString(); + } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || + name == "sasl-mechanisms" || name == "sasl_mechanisms") { + mechanism = value.asString(); + } else if (name == "sasl-service" || name == "sasl_service") { + service = value.asString(); + } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { + minSsf = value; + } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { + maxSsf = value; + } else if (name == "heartbeat") { + heartbeat = value; + } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { + tcpNoDelay = value; + } else if (name == "locale") { + locale = value.asString(); + } else if (name == "max-channels" || name == "max_channels") { + maxChannels = value; + } else if (name == "max-frame-size" || name == "max_frame_size") { + maxFrameSize = value; + } else if (name == "bounds") { + bounds = value; + } else if (name == "transport") { + protocol = value.asString(); + } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { + sslCertName = value.asString(); + } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { + reconnectOnLimitExceeded = value; + } else { + throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); + } +} + +}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ConnectionOptions.h b/cpp/src/qpid/messaging/ConnectionOptions.h new file mode 100644 index 0000000000..6786fd4a64 --- /dev/null +++ b/cpp/src/qpid/messaging/ConnectionOptions.h @@ -0,0 +1,51 @@ +#ifndef QPID_MESSAGING_CONNECTIONOPTIONS_H +#define QPID_MESSAGING_CONNECTIONOPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/ConnectionSettings.h" +#include <map> +#include <vector> + +namespace qpid { +namespace types { +class Variant; +} +namespace messaging { + +struct ConnectionOptions : qpid::client::ConnectionSettings +{ + std::vector<std::string> urls; + bool replaceUrls; + bool reconnect; + double timeout; + int32_t limit; + double minReconnectInterval; + double maxReconnectInterval; + int32_t retries; + bool reconnectOnLimitExceeded; + + ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); + void set(const std::string& name, const qpid::types::Variant& value); +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_CONNECTIONOPTIONS_H*/ diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp index ef70c103e9..0f03bc8ca3 100644 --- a/cpp/src/qpid/messaging/Message.cpp +++ b/cpp/src/qpid/messaging/Message.cpp @@ -46,26 +46,26 @@ const std::string& Message::getSubject() const { return impl->getSubject(); } void Message::setContentType(const std::string& s) { impl->setContentType(s); } const std::string& Message::getContentType() const { return impl->getContentType(); } -void Message::setMessageId(const std::string& id) { impl->messageId = id; } -const std::string& Message::getMessageId() const { return impl->messageId; } +void Message::setMessageId(const std::string& id) { impl->setMessageId(id); } +const std::string& Message::getMessageId() const { return impl->getMessageId(); } -void Message::setUserId(const std::string& id) { impl->userId = id; } -const std::string& Message::getUserId() const { return impl->userId; } +void Message::setUserId(const std::string& id) { impl->setUserId(id); } +const std::string& Message::getUserId() const { return impl->getUserId(); } -void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; } -const std::string& Message::getCorrelationId() const { return impl->correlationId; } +void Message::setCorrelationId(const std::string& id) { impl->setCorrelationId(id); } +const std::string& Message::getCorrelationId() const { return impl->getCorrelationId(); } -uint8_t Message::getPriority() const { return impl->priority; } -void Message::setPriority(uint8_t priority) { impl->priority = priority; } +uint8_t Message::getPriority() const { return impl->getPriority(); } +void Message::setPriority(uint8_t priority) { impl->setPriority(priority); } -void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); } -Duration Message::getTtl() const { return Duration(impl->ttl); } +void Message::setTtl(Duration ttl) { impl->setTtl(ttl.getMilliseconds()); } +Duration Message::getTtl() const { return Duration(impl->getTtl()); } -void Message::setDurable(bool durable) { impl->durable = durable; } -bool Message::getDurable() const { return impl->durable; } +void Message::setDurable(bool durable) { impl->setDurable(durable); } +bool Message::getDurable() const { return impl->isDurable(); } -bool Message::getRedelivered() const { return impl->redelivered; } -void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; } +bool Message::getRedelivered() const { return impl->isRedelivered(); } +void Message::setRedelivered(bool redelivered) { impl->setRedelivered(redelivered); } const Variant::Map& Message::getProperties() const { return impl->getHeaders(); } Variant::Map& Message::getProperties() { return impl->getHeaders(); } diff --git a/cpp/src/qpid/messaging/MessageImpl.cpp b/cpp/src/qpid/messaging/MessageImpl.cpp index 0601800e46..fc9bc5dfa1 100644 --- a/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/cpp/src/qpid/messaging/MessageImpl.cpp @@ -45,28 +45,163 @@ MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), internalId(0) {} -void MessageImpl::setReplyTo(const Address& d) { replyTo = d; } -const Address& MessageImpl::getReplyTo() const { return replyTo; } +void MessageImpl::setReplyTo(const Address& d) +{ + replyTo = d; + updated(); +} +const Address& MessageImpl::getReplyTo() const +{ + if (!replyTo && encoded) encoded->getReplyTo(replyTo); + return replyTo; +} -void MessageImpl::setSubject(const std::string& s) { subject = s; } -const std::string& MessageImpl::getSubject() const { return subject; } +void MessageImpl::setSubject(const std::string& s) +{ + subject = s; + updated(); +} +const std::string& MessageImpl::getSubject() const +{ + if (!subject.size() && encoded) encoded->getSubject(subject); + return subject; +} -void MessageImpl::setContentType(const std::string& s) { contentType = s; } -const std::string& MessageImpl::getContentType() const { return contentType; } +void MessageImpl::setContentType(const std::string& s) +{ + contentType = s; + updated(); +} +const std::string& MessageImpl::getContentType() const +{ + if (!contentType.size() && encoded) encoded->getContentType(contentType); + return contentType; +} -const Variant::Map& MessageImpl::getHeaders() const { return headers; } -Variant::Map& MessageImpl::getHeaders() { return headers; } -void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) { headers[key] = val; } +void MessageImpl::setMessageId(const std::string& s) +{ + messageId = s; + updated(); +} +const std::string& MessageImpl::getMessageId() const +{ + if (!messageId.size() && encoded) encoded->getMessageId(messageId); + return messageId; +} +void MessageImpl::setUserId(const std::string& s) +{ + userId = s; + updated(); +} +const std::string& MessageImpl::getUserId() const +{ + if (!userId.size() && encoded) encoded->getUserId(userId); + return userId; +} +void MessageImpl::setCorrelationId(const std::string& s) +{ + correlationId = s; + updated(); +} +const std::string& MessageImpl::getCorrelationId() const +{ + if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId); + return correlationId; +} +void MessageImpl::setPriority(uint8_t p) +{ + priority = p; +} +uint8_t MessageImpl::getPriority() const +{ + return priority; +} +void MessageImpl::setTtl(uint64_t t) +{ + ttl = t; +} +uint64_t MessageImpl::getTtl() const +{ + return ttl; +} +void MessageImpl::setDurable(bool d) +{ + durable = d; +} +bool MessageImpl::isDurable() const +{ + return durable; +} +void MessageImpl::setRedelivered(bool b) +{ + redelivered = b; +} +bool MessageImpl::isRedelivered() const +{ + return redelivered; +} + +const Variant::Map& MessageImpl::getHeaders() const +{ + if (!headers.size() && encoded) encoded->populate(headers); + return headers; +} +Variant::Map& MessageImpl::getHeaders() { + if (!headers.size() && encoded) encoded->populate(headers); + updated(); + return headers; +} +void MessageImpl::setHeader(const std::string& key, const qpid::types::Variant& val) +{ + headers[key] = val; updated(); +} //should these methods be on MessageContent? -void MessageImpl::setBytes(const std::string& c) { bytes = c; } -void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); } -const std::string& MessageImpl::getBytes() const { return bytes; } -std::string& MessageImpl::getBytes() { return bytes; } +void MessageImpl::setBytes(const std::string& c) +{ + bytes = c; + updated(); +} +void MessageImpl::setBytes(const char* chars, size_t count) +{ + bytes.assign(chars, count); + updated(); +} +void MessageImpl::appendBytes(const char* chars, size_t count) +{ + bytes.append(chars, count); + updated(); +} +const std::string& MessageImpl::getBytes() const +{ + if (!bytes.size() && encoded) encoded->getBody(bytes); + return bytes; +} +std::string& MessageImpl::getBytes() +{ + if (!bytes.size() && encoded) encoded->getBody(bytes); + updated();//have to assume body may be edited, invalidating our message + return bytes; +} void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; } +void MessageImpl::updated() +{ + + if (!replyTo && encoded) encoded->getReplyTo(replyTo); + if (!subject.size() && encoded) encoded->getSubject(subject); + if (!contentType.size() && encoded) encoded->getContentType(contentType); + if (!messageId.size() && encoded) encoded->getMessageId(messageId); + if (!userId.size() && encoded) encoded->getUserId(userId); + if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId); + if (!headers.size() && encoded) encoded->populate(headers); + if (!bytes.size() && encoded) encoded->getBody(bytes); + + encoded.reset(); +} + MessageImpl& MessageImplAccess::get(Message& msg) { return *msg.impl; diff --git a/cpp/src/qpid/messaging/MessageImpl.h b/cpp/src/qpid/messaging/MessageImpl.h index 57df6b3fda..915c790153 100644 --- a/cpp/src/qpid/messaging/MessageImpl.h +++ b/cpp/src/qpid/messaging/MessageImpl.h @@ -24,52 +24,77 @@ #include "qpid/messaging/Address.h" #include "qpid/types/Variant.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/messaging/amqp/EncodedMessage.h" +#include <vector> +#include <boost/shared_ptr.hpp> namespace qpid { namespace messaging { -struct MessageImpl +class MessageImpl { - Address replyTo; - std::string subject; - std::string contentType; - std::string messageId; - std::string userId; - std::string correlationId; + private: + mutable Address replyTo; + mutable std::string subject; + mutable std::string contentType; + mutable std::string messageId; + mutable std::string userId; + mutable std::string correlationId; uint8_t priority; uint64_t ttl; bool durable; bool redelivered; - qpid::types::Variant::Map headers; + mutable qpid::types::Variant::Map headers; - std::string bytes; + mutable std::string bytes; + boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded; qpid::framing::SequenceNumber internalId; + void updated(); + public: MessageImpl(const std::string& c); MessageImpl(const char* chars, size_t count); void setReplyTo(const Address& d); const Address& getReplyTo() const; - + void setSubject(const std::string& s); const std::string& getSubject() const; - + void setContentType(const std::string& s); const std::string& getContentType() const; - + + void setMessageId(const std::string&); + const std::string& getMessageId() const; + void setUserId(const std::string& ); + const std::string& getUserId() const; + void setCorrelationId(const std::string& ); + const std::string& getCorrelationId() const; + void setPriority(uint8_t); + uint8_t getPriority() const; + void setTtl(uint64_t); + uint64_t getTtl() const; + void setDurable(bool); + bool isDurable() const; + void setRedelivered(bool); + bool isRedelivered() const; + + const qpid::types::Variant::Map& getHeaders() const; qpid::types::Variant::Map& getHeaders(); void setHeader(const std::string& key, const qpid::types::Variant& val); - + void setBytes(const std::string& bytes); void setBytes(const char* chars, size_t count); + void appendBytes(const char* chars, size_t count); const std::string& getBytes() const; std::string& getBytes(); void setInternalId(qpid::framing::SequenceNumber id); qpid::framing::SequenceNumber getInternalId(); - + void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; } + boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; } }; class Message; diff --git a/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/cpp/src/qpid/messaging/ProtocolRegistry.cpp new file mode 100644 index 0000000000..0232da8ae1 --- /dev/null +++ b/cpp/src/qpid/messaging/ProtocolRegistry.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ProtocolRegistry.h" +#include "qpid/Exception.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include "qpid/client/LoadPlugins.h" +#include <map> + +using qpid::types::Variant; + +namespace qpid { +namespace messaging { +namespace { +typedef std::map<std::string, ProtocolRegistry::Factory*> Registry; + +Registry& theRegistry() +{ + static Registry factories; + return factories; +} + +bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out) +{ + bool matched = false; + for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) { + if (i->first == key) { + value = i->second; + matched = true; + } else { + out.insert(*i); + } + } + return matched; +} +} + +ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options) +{ + qpid::client::theModuleLoader();//ensure modules are loaded + Variant name; + Variant::Map stripped; + if (extract("protocol", name, options, stripped)) { + Registry::const_iterator i = theRegistry().find(name.asString()); + if (i != theRegistry().end()) return (i->second)(url, stripped); + else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped); + else throw qpid::Exception("Unsupported protocol: " + name.asString()); + } + return 0; +} +void ProtocolRegistry::add(const std::string& name, Factory* factory) +{ + theRegistry()[name] = factory; +} + +}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ProtocolRegistry.h b/cpp/src/qpid/messaging/ProtocolRegistry.h new file mode 100644 index 0000000000..bcb62248a5 --- /dev/null +++ b/cpp/src/qpid/messaging/ProtocolRegistry.h @@ -0,0 +1,42 @@ +#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H +#define QPID_MESSAGING_PROTOCOLREGISTRY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" + +namespace qpid { +namespace messaging { +class ConnectionImpl; +/** + * Registry for different implementations of the messaging API e.g AMQP 1.0 + */ +class ProtocolRegistry +{ + public: + typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options); + static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options); + static void add(const std::string& name, Factory* factory); + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/ diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h index 57059bfd28..e450693d2c 100644 --- a/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/cpp/src/qpid/messaging/ReceiverImpl.h @@ -22,10 +22,12 @@ * */ #include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" namespace qpid { namespace messaging { +class Duration; class Message; class MessageListener; class Session; diff --git a/cpp/src/qpid/messaging/SenderImpl.h b/cpp/src/qpid/messaging/SenderImpl.h index a1ca02c72c..d978463fdb 100644 --- a/cpp/src/qpid/messaging/SenderImpl.h +++ b/cpp/src/qpid/messaging/SenderImpl.h @@ -22,6 +22,7 @@ * */ #include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" namespace qpid { namespace messaging { diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp new file mode 100644 index 0000000000..359660dce5 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/Address.h" +#include <vector> +#include <boost/assign.hpp> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +using qpid::types::Variant; + +namespace { +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); + +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); + +const std::string NODE("node"); +const std::string LINK("link"); + +const std::string TYPE("type"); +const std::string TOPIC("topic"); +const std::string QUEUE("queue"); + +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); + +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); + + +const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); +const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +bool bind(const Variant::Map& options, const std::string& name, std::string& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asString(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asMap(); + return true; + } +} + +bool bind(const Address& address, const std::string& name, std::string& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool bind(const Address& address, const std::string& name, Variant::Map& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool in(const std::string& value, const std::vector<std::string>& choices) +{ + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value == *i) return true; + } + return false; +} +} + +AddressHelper::AddressHelper(const Address& address) +{ + bind(address, CREATE, createPolicy); + bind(address, DELETE, deletePolicy); + bind(address, ASSERT, assertPolicy); + + bind(address, NODE, node); + bind(address, LINK, link); +} + +bool AddressHelper::createEnabled(CheckMode mode) const +{ + return enabled(createPolicy, mode); +} +bool AddressHelper::deleteEnabled(CheckMode mode) const +{ + return enabled(deletePolicy, mode); +} +bool AddressHelper::assertEnabled(CheckMode mode) const +{ + return enabled(assertPolicy, mode); +} +bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const +{ + bool result = false; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const +{ + return node; +} +const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const +{ + return link; +} + +void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +{ + pn_terminus_set_dynamic(terminus, true); + + //properties for dynamically created node: + pn_data_t* data = pn_terminus_properties(terminus); + if (node.size()) { + pn_data_put_map(data); + pn_data_enter(data); + } + for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) { + if (i->first == TYPE) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE)); + } else { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_string(data, convert(i->second.asString())); + } + } + if (node.size()) { + pn_data_exit(data); + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h new file mode 100644 index 0000000000..cd0aa1be9e --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -0,0 +1,57 @@ +#ifndef QPID_MESSAGING_AMQP_ADDRESSHELPER_H +#define QPID_MESSAGING_AMQP_ADDRESSHELPER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" + +struct pn_terminus_t; + +namespace qpid { +namespace messaging { +class Address; +namespace amqp { + +class AddressHelper +{ + public: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + AddressHelper(const Address& address); + bool createEnabled(CheckMode mode) const; + bool deleteEnabled(CheckMode mode) const; + bool assertEnabled(CheckMode mode) const; + + void setNodeProperties(pn_terminus_t*); + const qpid::types::Variant::Map& getNodeProperties() const; + const qpid::types::Variant::Map& getLinkProperties() const; + private: + std::string createPolicy; + std::string assertPolicy; + std::string deletePolicy; + qpid::types::Variant::Map node; + qpid::types::Variant::Map link; + + bool enabled(const std::string& policy, CheckMode mode) const; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_ADDRESSHELPER_H*/ diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp new file mode 100644 index 0000000000..b2a9b979b6 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -0,0 +1,612 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "DriverImpl.h" +#include "ReceiverContext.h" +#include "Sasl.h" +#include "SenderContext.h" +#include "SessionContext.h" +#include "Transport.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include <vector> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + + +ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o) + : qpid::messaging::ConnectionOptions(o), + url(u), + engine(pn_transport()), + connection(pn_connection()), + //note: disabled read/write of header as now handled by engine + writeHeader(false), + readHeader(false), + haveOutput(false), + state(DISCONNECTED), + codecSwitch(*this) +{ + if (pn_transport_bind(engine, connection)) { + //error + } + pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); +} + +ConnectionContext::~ConnectionContext() +{ + close(); + sessions.clear(); + pn_transport_free(engine); + pn_connection_free(connection); +} + +namespace { +const std::string COLON(":"); +} +void ConnectionContext::open() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + + for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) { + transport = driver->getTransport(i->protocol, *this); + std::stringstream port; + port << i->port; + id = i->host + COLON + port.str(); + if (useSasl()) { + sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host)); + } + state = CONNECTING; + try { + QPID_LOG(debug, id << " Connecting ..."); + transport->connect(i->host, port.str()); + } catch (const std::exception& e) { + QPID_LOG(info, id << " Error while connecting: " << e.what()); + } + while (state == CONNECTING) { + lock.wait(); + } + if (state == DISCONNECTED) { + QPID_LOG(debug, id << " Failed to connect"); + transport = boost::shared_ptr<Transport>(); + } else { + QPID_LOG(debug, id << " Connected"); + } + } + + if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url)); + + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated()) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + wait(); + } + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + pn_connection_open(connection); + wakeupDriver(); //want to write + while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { + wait(); + } + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); +} + +bool ConnectionContext::isOpen() const +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); +} + +void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_session_close(ssn->session); + //TODO: need to destroy session and remove context from map + wakeupDriver(); +} + +void ConnectionContext::close() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != CONNECTED) return; + if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + //wait for outstanding sends to settle + while (!i->second->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait();//wait until message has been confirmed + } + + + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { + pn_session_close(i->second->session); + } + } + pn_connection_close(connection); + wakeupDriver(); + //wait for close to be confirmed by peer? + while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + wait(); + } + sessions.clear(); + } + transport->close(); + while (state != DISCONNECTED) { + lock.wait(); + } +} + +bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (!lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + } + if (get(ssn, lnk, message, timeout)) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach? + wakeupDriver(); + } + return true; + } else { + { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + wait(); + } + if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { + pn_link_flow(lnk->receiver, lnk->capacity); + } + } + if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + return true; + } else { + return false; + } + } +} + +qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); + } else { + return qpid::sys::FAR_FUTURE; + } +} + +bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); + QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); + if (current) { + qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); + boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); + ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); + if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); + encoded->trim((size_t) read); + QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); + encoded->init(impl); + impl.setEncoded(encoded); + impl.setInternalId(ssn->record(current)); + pn_link_advance(lnk->receiver); + return true; + } else if (until > qpid::sys::now()) { + wait(); + } else { + return false; + } + } + return false; +} + +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (message) { + ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); + } else { + ssn->acknowledge(); + } + wakeupDriver(); +} + + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + lnk->configure(); + attach(ssn->session, (pn_link_t*) lnk->sender); + if (!pn_link_remote_target((pn_link_t*) lnk->sender)) { + std::string msg("No such target : "); + msg += lnk->getTarget(); + throw qpid::messaging::NotFound(msg); + } +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + lnk->configure(); + attach(ssn->session, lnk->receiver, lnk->capacity); + if (!pn_link_remote_source(lnk->receiver)) { + std::string msg("No such source : "); + msg += lnk->getSource(); + throw qpid::messaging::NotFound(msg); + } +} + +void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link)); + pn_link_open(link); + QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link)); + if (credit) pn_link_flow(link, credit); + wakeupDriver(); + while (pn_link_state(link) & PN_REMOTE_UNINIT) { + QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link)); + wait(); + } +} + +void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + SenderContext::Delivery* delivery(0); + while (!(delivery = snd->send(message))) { + QPID_LOG(debug, "Waiting for capacity..."); + wait();//wait for capacity + } + wakeupDriver(); + if (sync) { + while (!delivery->accepted()) { + QPID_LOG(debug, "Waiting for confirmation..."); + wait();//wait until message has been confirmed + } + } +} + +void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sender->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getCapacity(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getUnsettled(); +} + +void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + receiver->setCapacity(capacity); + pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); + wakeupDriver(); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getCapacity(); +} +uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getAvailable(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getUnsettled(); +} + +void ConnectionContext::activateOutput() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + wakeupDriver(); +} +/** + * Expects lock to be held by caller + */ +void ConnectionContext::wakeupDriver() +{ + switch (state) { + case CONNECTED: + haveOutput = true; + transport->activateOutput(); + QPID_LOG(debug, "wakeupDriver()"); + break; + case DISCONNECTED: + case CONNECTING: + QPID_LOG(error, "wakeupDriver() called while not connected"); + break; + } +} + +void ConnectionContext::wait() +{ + lock.wait(); + if (state == DISCONNECTED) { + throw qpid::messaging::TransportFailure("Disconnected"); + } + //check for any closed links, sessions or indeed the connection +} + +boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + boost::shared_ptr<SessionContext> s(new SessionContext(connection)); + s->session = pn_session(connection); + pn_session_open(s->session); + sessions[name] = s; + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { + wait(); + } + return s; + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); + } + +} +boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const +{ + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError(std::string("No such session") + name); + } else { + return i->second; + } +} + +void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) +{ + set(name, value); +} + +std::string ConnectionContext::getAuthenticatedUsername() +{ + return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); +} + +std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(trace, id << " decode(" << size << ")"); + if (readHeader) { + size_t decoded = readProtocolHeader(buffer, size); + if (decoded < size) { + decoded += decode(buffer + decoded, size - decoded); + } + return decoded; + } + + //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); + if (n > 0 || n == PN_EOS) { + //If engine returns EOS, have no way of knowing how many bytes + //it processed, but can assume none need to be reprocessed so + //consider them all read: + if (n == PN_EOS) n = size; + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) + pn_transport_tick(engine, 0); + lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on input: " << getError())); + } else { + return 0; + } + +} +std::size_t ConnectionContext::encode(char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(trace, id << " encode(" << size << ")"); + if (writeHeader) { + size_t encoded = writeProtocolHeader(buffer, size); + if (encoded < size) { + encoded += encode(buffer + encoded, size - encoded); + } + return encoded; + } + + ssize_t n = pn_transport_output(engine, buffer, size); + if (n > 0) { + QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) + haveOutput = true; + return n; + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on output: " << getError())); + } else if (n == PN_EOS) { + haveOutput = false; + return 0;//Is this right? + } else { + haveOutput = false; + return 0; + } +} +bool ConnectionContext::canEncode() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return haveOutput && state == CONNECTED; +} +void ConnectionContext::closed() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + state = DISCONNECTED; + lock.notifyAll(); +} +void ConnectionContext::opened() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + state = CONNECTED; + lock.notifyAll(); +} +bool ConnectionContext::isClosed() const +{ + return !isOpen(); +} +namespace { +qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP); +} + +std::string ConnectionContext::getError() +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror); + pn_error_t* terror = pn_transport_error(engine); + if (terror) text << "transport error " << pn_error_text(terror); + return text.str(); +} + +framing::ProtocolVersion ConnectionContext::getVersion() const +{ + return AMQP_1_0_PLAIN; +} + +std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + readHeader = false; + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + writeHeader = false; + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} +bool ConnectionContext::useSasl() +{ + return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None"); +} + +qpid::sys::Codec& ConnectionContext::getCodec() +{ + return codecSwitch; +} + +ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {} +std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t decoded = 0; + if (parent.sasl.get() && !parent.sasl->authenticated()) { + decoded = parent.sasl->decode(buffer, size); + if (!parent.sasl->authenticated()) return decoded; + } + if (decoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += parent.decode(buffer+decoded, size-decoded); + } + return decoded; +} +std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t encoded = 0; + if (parent.sasl.get() && parent.sasl->canEncode()) { + encoded += parent.sasl->encode(buffer, size); + if (!parent.sasl->authenticated()) return encoded; + } + if (encoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += parent.encode(buffer+encoded, size-encoded); + } + return encoded; +} +bool ConnectionContext::CodecSwitch::canEncode() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + if (parent.sasl.get()) { + if (parent.sasl->canEncode()) return true; + else if (!parent.sasl->authenticated()) return false; + else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode(); + } + return parent.canEncode(); +} + + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/cpp/src/qpid/messaging/amqp/ConnectionContext.h new file mode 100644 index 0000000000..3718184365 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -0,0 +1,150 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H +#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <map> +#include <memory> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/Url.h" +#include "qpid/messaging/ConnectionOptions.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Monitor.h" +#include "qpid/types/Variant.h" +#include "qpid/messaging/amqp/TransportContext.h" + +struct pn_connection_t; +struct pn_link_t; +struct pn_session_t; +struct pn_transport_t; + + +namespace qpid { +namespace framing { +class ProtocolVersion; +} +namespace messaging { +class Duration; +class Message; +namespace amqp { + +class DriverImpl; +class ReceiverContext; +class Sasl; +class SessionContext; +class SenderContext; +class Transport; + +/** + * + */ +class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messaging::ConnectionOptions, public TransportContext +{ + public: + ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options); + ~ConnectionContext(); + void open(); + bool isOpen() const; + void close(); + boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name); + boost::shared_ptr<SessionContext> getSession(const std::string& name) const; + void endSession(boost::shared_ptr<SessionContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + + void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<SenderContext>); + uint32_t getUnsettled(boost::shared_ptr<SenderContext>); + + void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); + uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); + uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); + uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>); + + + void activateOutput(); + qpid::sys::Codec& getCodec(); + //ConnectionCodec interface: + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + void closed(); + bool isClosed() const; + framing::ProtocolVersion getVersion() const; + //additionally, Transport needs: + void opened();//signal successful connection + + private: + typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; + qpid::Url url; + + boost::shared_ptr<DriverImpl> driver; + boost::shared_ptr<Transport> transport; + + pn_transport_t* engine; + pn_connection_t* connection; + SessionMap sessions; + mutable qpid::sys::Monitor lock; + bool writeHeader; + bool readHeader; + bool haveOutput; + std::string id; + enum { + DISCONNECTED, + CONNECTING, + CONNECTED + } state; + std::auto_ptr<Sasl> sasl; + class CodecSwitch : public qpid::sys::Codec + { + public: + CodecSwitch(ConnectionContext&); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + private: + ConnectionContext& parent; + }; + CodecSwitch codecSwitch; + + void wait(); + void wakeupDriver(); + void attach(pn_session_t*, pn_link_t*, int credit=0); + + std::size_t readProtocolHeader(const char* buffer, std::size_t size); + std::size_t writeProtocolHeader(char* buffer, std::size_t size); + std::string getError(); + bool useSasl(); +}; + +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp new file mode 100644 index 0000000000..0c4ec2bfcb --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionHandle.h" +#include "ConnectionContext.h" +#include "SessionHandle.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/ProtocolRegistry.h" + +namespace qpid { +namespace messaging { +namespace amqp { +// Static constructor which registers this implementation in the ProtocolRegistry +namespace { +ConnectionImpl* create(const std::string& u, const qpid::types::Variant::Map& o) +{ + return new ConnectionHandle(u, o); +} + +struct StaticInit +{ + StaticInit() + { + ProtocolRegistry::add("amqp1.0", &create); + }; +} init; +} + +ConnectionHandle::ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options) : connection(new ConnectionContext(url, options)) {} +ConnectionHandle::ConnectionHandle(boost::shared_ptr<ConnectionContext> c) : connection(c) {} + +void ConnectionHandle::open() +{ + connection->open(); +} + +bool ConnectionHandle::isOpen() const +{ + return connection->isOpen(); +} + +void ConnectionHandle::close() +{ + connection->close(); +} + +Session ConnectionHandle::newSession(bool transactional, const std::string& name) +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->newSession(transactional, name))); +} + +Session ConnectionHandle::getSession(const std::string& name) const +{ + return qpid::messaging::Session(new SessionHandle(connection, connection->getSession(name))); +} + +void ConnectionHandle::setOption(const std::string& name, const qpid::types::Variant& value) +{ + connection->setOption(name, value); +} + +std::string ConnectionHandle::getAuthenticatedUsername() +{ + return connection->getAuthenticatedUsername(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/cpp/src/qpid/messaging/amqp/ConnectionHandle.h new file mode 100644 index 0000000000..d1eb27f6de --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ConnectionHandle.h @@ -0,0 +1,58 @@ +#ifndef QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H +#define QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ConnectionImpl.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +/** + * Handles are directly referenced by applications; Contexts are + * referenced by Handles. This allows a graph structure that + * remains intact as long as the application references any part + * of it, but that can be automatically reclaimed if the whole + * graph becomes unreferenced. + */ +class ConnectionHandle : public qpid::messaging::ConnectionImpl +{ + public: + ConnectionHandle(const std::string& url, const qpid::types::Variant::Map& options); + ConnectionHandle(boost::shared_ptr<ConnectionContext>); + void open(); + bool isOpen() const; + void close(); + Session newSession(bool transactional, const std::string& name); + Session getSession(const std::string& name) const; + void setOption(const std::string& name, const qpid::types::Variant& value); + std::string getAuthenticatedUsername(); + private: + boost::shared_ptr<ConnectionContext> connection; +}; + +}}} // namespace qpid::messaging::amqp_1.0 + +#endif /*!QPID_MESSAGING_AMQP_CONNECTIONHANDLE_H*/ diff --git a/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp new file mode 100644 index 0000000000..16307b3c22 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/DriverImpl.cpp @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DriverImpl.h" +#include "Transport.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +DriverImpl::DriverImpl() : poller(new qpid::sys::Poller) +{ + start(); +} +DriverImpl::~DriverImpl() +{ + stop(); +} + +void DriverImpl::start() +{ + thread = qpid::sys::Thread(*poller); + QPID_LOG(debug, "Driver started"); +} + +void DriverImpl::stop() +{ + QPID_LOG(debug, "Driver stopped"); + poller->shutdown(); + thread.join(); +} + +boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection) +{ + boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller)); + if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol); + return t; +} + + +qpid::sys::Mutex DriverImpl::defaultLock; +boost::weak_ptr<DriverImpl> DriverImpl::theDefault; +boost::shared_ptr<DriverImpl> DriverImpl::getDefault() +{ + qpid::sys::Mutex::ScopedLock l(defaultLock); + boost::shared_ptr<DriverImpl> p = theDefault.lock(); + if (!p) { + p = boost::shared_ptr<DriverImpl>(new DriverImpl); + theDefault = p; + } + return p; +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/DriverImpl.h b/cpp/src/qpid/messaging/amqp/DriverImpl.h new file mode 100644 index 0000000000..354fa1ae35 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/DriverImpl.h @@ -0,0 +1,60 @@ +#ifndef QPID_MESSAGING_AMQP_DRIVERIMPL_H +#define QPID_MESSAGING_AMQP_DRIVERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Thread.h" +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> + +namespace qpid { +namespace sys { +class Poller; +} +namespace messaging { +namespace amqp { +class TransportContext; +class Transport; +/** + * + */ +class DriverImpl +{ + public: + DriverImpl(); + ~DriverImpl(); + + void start(); + void stop(); + + boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection); + + static boost::shared_ptr<DriverImpl> getDefault(); + private: + boost::shared_ptr<qpid::sys::Poller> poller; + qpid::sys::Thread thread; + static qpid::sys::Mutex defaultLock; + static boost::weak_ptr<DriverImpl> theDefault; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_DRIVERIMPL_H*/ diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp new file mode 100644 index 0000000000..54de3eae45 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/amqp/Decoder.h" +#include <boost/lexical_cast.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { + +using namespace qpid::amqp; + +EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0) +{ + init(); +} + +EncodedMessage::EncodedMessage() : size(0), data(0) +{ + init(); +} + +EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0) +{ + init(); +} + +void EncodedMessage::init() +{ + //init all CharSequence members + deliveryAnnotations.init(); + messageAnnotations.init(); + userId.init(); + to.init(); + subject.init(); + replyTo.init(); + contentType.init(); + contentEncoding.init(); + groupId.init(); + replyToGroupId.init(); + applicationProperties.init(); + body.init(); + footer.init(); +} + +EncodedMessage::~EncodedMessage() +{ + delete[] data; +} + +size_t EncodedMessage::getSize() const +{ + return size; +} +void EncodedMessage::trim(size_t t) +{ + size = t; +} +void EncodedMessage::resize(size_t s) +{ + delete[] data; + size = s; + data = new char[size]; +} + +char* EncodedMessage::getData() +{ + return data; +} +const char* EncodedMessage::getData() const +{ + return data; +} + +void EncodedMessage::init(qpid::messaging::MessageImpl& impl) +{ + //initial scan of raw data + qpid::amqp::Decoder decoder(data, size); + InitialScan reader(*this, impl); + decoder.read(reader); + bareMessage = reader.getBareMessage(); + if (bareMessage.data && !bareMessage.size) { + bareMessage.size = (data + size) - bareMessage.data; + } + +} +void EncodedMessage::populate(qpid::types::Variant::Map& map) const +{ + //decode application properties + if (applicationProperties) { + qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size); + decoder.readMap(map); + } + //add in 'x-amqp-' prefixed values + if (!!firstAcquirer) { + map["x-amqp-first-acquirer"] = firstAcquirer.get(); + } + if (!!deliveryCount) { + map["x-amqp-delivery-count"] = deliveryCount.get(); + } + if (to) { + map["x-amqp-delivery-count"] = to.str(); + } + if (!!absoluteExpiryTime) { + map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get(); + } + if (!!creationTime) { + map["x-amqp-creation-time"] = creationTime.get(); + } + if (groupId) { + map["x-amqp-group-id"] = groupId.str(); + } + if (!!groupSequence) { + map["x-amqp-qroup-sequence"] = groupSequence.get(); + } + if (replyToGroupId) { + map["x-amqp-reply-to-group-id"] = replyToGroupId.str(); + } + //add in any annotations + if (deliveryAnnotations) { + qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap(); + qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size); + decoder.readMap(annotations); + } + if (messageAnnotations) { + qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap(); + qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size); + decoder.readMap(annotations); + } +} +qpid::amqp::CharSequence EncodedMessage::getBareMessage() const +{ + return bareMessage; +} + +void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const +{ + a = qpid::messaging::Address(replyTo.str()); +} +void EncodedMessage::getSubject(std::string& s) const +{ + s.assign(subject.data, subject.size); +} +void EncodedMessage::getContentType(std::string& s) const +{ + s.assign(contentType.data, contentType.size); +} +void EncodedMessage::getUserId(std::string& s) const +{ + s.assign(userId.data, userId.size); +} +void EncodedMessage::getMessageId(std::string& s) const +{ + messageId.assign(s); +} +void EncodedMessage::getCorrelationId(std::string& s) const +{ + correlationId.assign(s); +} +void EncodedMessage::getBody(std::string& s) const +{ + s.assign(body.data, body.size); +} + +qpid::amqp::CharSequence EncodedMessage::getBody() const +{ + return body; +} + +bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const +{ + if (!durable) { + if (msg.isDurable()) return true; + } else { + if (durable.get() != msg.isDurable()) return true; + } + + if (!priority) { + if (msg.getPriority() != 4) return true; + } else { + if (priority.get() != msg.getPriority()) return true; + } + + if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) { + return true; + } + + //first-acquirer can't be changed via Message interface as yet + + if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) { + return true; + } + + return false; +} + + +EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m) +{ + //set up defaults as needed: + mi.setPriority(4); +} +//header: +void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; } +void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; } +void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; } +void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; } +void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i) +{ + mi.setRedelivered(i); + em.deliveryCount = i; +} + +//properties: +void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); } +void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); } +void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; } +void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; } +void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; } +void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;} +void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); } +void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); } +void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; } +void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; } +void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; } +void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; } +void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; } +void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; } +void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; } + +void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; } +void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; } +void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; } +void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&) +{ + //TODO: how to communicate the type, i.e. descriptor? + em.body = v; +} +void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {} +void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; } + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.h b/cpp/src/qpid/messaging/amqp/EncodedMessage.h new file mode 100644 index 0000000000..09a9d948d5 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.h @@ -0,0 +1,177 @@ +#ifndef QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H +#define QPID_MESSAGING_AMQP_ENCODEDMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/MessageId.h" +#include "qpid/amqp/MessageReader.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/types/Variant.h" +#include <boost/optional.hpp> + +namespace qpid { +namespace amqp { +struct Descriptor; +} +namespace messaging { +class Address; +class MessageImpl; +namespace amqp { + +/** + * Used to 'lazy-decode' an AMQP 1.0 message. + * + * There are four categories of data item: + * + * (i) simple, fixed width primitives - priority, ttl, durability, + * delivery count - for which lazy-decoding doesn't buy much. These + * are decoded unconditionally on an initial scan of the message. + * + * (ii) standard variable length string properties - subject, + * message-id, user-id etc - which require conversion to a std::string + * for returning to the application. By delaying the conversion of + * these to a std::string we can avoid allocation & copying until it + * is actually required. The initial scan of the message merely + * records the position of these strings within the raw message data. + * + * (iii) custom, application defined headers. These form a map, and + * again, delaying the creation of that map until it is actually + * required can be advantageous. The initial scan of the message merely + * records the position of this section within the raw message data. + * + * (iv) the body content. This may be retreived as a std::string, or + * as a char*. Avoiding conversion to the string until it is required + * is advantageous. The initial scan of the message merely records the + * position of this section within the raw message data. + * + * At present the Message class only explicitly exposes some of the + * standard property and headers defined by AMQP 1.0. The remainder + * will have to be accessed through the message 'headers' map, using + * the 'x-amqp-' prefix. + */ +class EncodedMessage +{ + public: + EncodedMessage(); + EncodedMessage(size_t); + EncodedMessage(const EncodedMessage&); + ~EncodedMessage(); + + + size_t getSize() const; + char* getData(); + const char* getData() const; + void trim(size_t); + void resize(size_t); + + void getReplyTo(qpid::messaging::Address&) const; + void getSubject(std::string&) const; + void getContentType(std::string&) const; + void getMessageId(std::string&) const; + void getUserId(std::string&) const; + void getCorrelationId(std::string&) const; + + void init(qpid::messaging::MessageImpl&); + void populate(qpid::types::Variant::Map&) const; + void getBody(std::string&) const; + qpid::amqp::CharSequence getBareMessage() const; + qpid::amqp::CharSequence getBody() const; + bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const; + private: + size_t size; + char* data; + + class InitialScan : public qpid::amqp::MessageReader + { + public: + InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m); + //header: + void onDurable(bool b); + void onPriority(uint8_t i); + void onTtl(uint32_t i); + void onFirstAcquirer(bool b); + void onDeliveryCount(uint32_t i); + //properties: + void onMessageId(uint64_t); + void onMessageId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onUserId(const qpid::amqp::CharSequence& v); + void onTo(const qpid::amqp::CharSequence& v); + void onSubject(const qpid::amqp::CharSequence& v); + void onReplyTo(const qpid::amqp::CharSequence& v); + void onCorrelationId(uint64_t); + void onCorrelationId(const qpid::amqp::CharSequence&, qpid::types::VariantType); + void onContentType(const qpid::amqp::CharSequence& v); + void onContentEncoding(const qpid::amqp::CharSequence& v); + void onAbsoluteExpiryTime(int64_t i); + void onCreationTime(int64_t); + void onGroupId(const qpid::amqp::CharSequence&); + void onGroupSequence(uint32_t); + void onReplyToGroupId(const qpid::amqp::CharSequence&); + + void onApplicationProperties(const qpid::amqp::CharSequence&); + void onDeliveryAnnotations(const qpid::amqp::CharSequence&); + void onMessageAnnotations(const qpid::amqp::CharSequence&); + void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&); + void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&); + void onFooter(const qpid::amqp::CharSequence&); + private: + EncodedMessage& em; + qpid::messaging::MessageImpl& mi; + }; + //header: + boost::optional<bool> durable; + boost::optional<uint8_t> priority; + boost::optional<uint32_t> ttl; + boost::optional<bool> firstAcquirer; + boost::optional<uint32_t> deliveryCount; + //annotations: + qpid::amqp::CharSequence deliveryAnnotations; + qpid::amqp::CharSequence messageAnnotations; + + qpid::amqp::CharSequence bareMessage;//properties, application-properties and content + //properties: + qpid::amqp::MessageId messageId; + qpid::amqp::CharSequence userId; + qpid::amqp::CharSequence to; + qpid::amqp::CharSequence subject; + qpid::amqp::CharSequence replyTo; + qpid::amqp::MessageId correlationId; + qpid::amqp::CharSequence contentType; + qpid::amqp::CharSequence contentEncoding; + boost::optional<int64_t> absoluteExpiryTime; + boost::optional<int64_t> creationTime; + qpid::amqp::CharSequence groupId; + boost::optional<uint32_t> groupSequence; + qpid::amqp::CharSequence replyToGroupId; + //application-properties: + qpid::amqp::CharSequence applicationProperties; + qpid::amqp::CharSequence body; + //footer: + qpid::amqp::CharSequence footer; + + void init(); + //not implemented: + EncodedMessage& operator=(const EncodedMessage&); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_ENCODEDMESSAGE_H*/ diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp new file mode 100644 index 0000000000..414793c7fd --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/ReceiverContext.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/amqp/descriptors.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) + : name(n), + address(a), + receiver(pn_receiver(session, name.c_str())), + capacity(0) {} +ReceiverContext::~ReceiverContext() +{ + pn_link_free(receiver); +} + +void ReceiverContext::setCapacity(uint32_t c) +{ + if (c != capacity) { + //stop + capacity = c; + //reissue credit + } +} + +uint32_t ReceiverContext::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverContext::getAvailable() +{ + uint32_t count(0); + for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { + ++count; + if (d == pn_link_current(receiver)) break; + } + return count; +} + +uint32_t ReceiverContext::getUnsettled() +{ + uint32_t count(0); + for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { + ++count; + } + return count; +} + +void ReceiverContext::close() +{ + +} + +const std::string& ReceiverContext::getName() const +{ + return name; +} + +const std::string& ReceiverContext::getSource() const +{ + return address.getName(); +} +namespace { +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} + +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} +} + +void ReceiverContext::configure() const +{ + configure(pn_link_source(receiver)); +} +void ReceiverContext::configure(pn_terminus_t* source) const +{ + pn_terminus_set_address(source, address.getName().c_str()); + //dynamic create: + AddressHelper helper(address); + if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) { + helper.setNodeProperties(source); + } + + if (!address.getSubject().empty()) { + //filter: + pn_data_t* filter = pn_terminus_filter(source); + pn_data_put_map(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert("subject")); + //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved + //pn_data_put_described(filter); + //pn_data_enter(filter); + //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); + pn_data_put_string(filter, convert(address.getSubject())); + //pn_data_exit(filter); + pn_data_exit(filter); + } +} + +bool ReceiverContext::isClosed() const +{ + return false;//TODO +} + + + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/cpp/src/qpid/messaging/amqp/ReceiverContext.h new file mode 100644 index 0000000000..34ecdda6be --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -0,0 +1,68 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H +#define QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Address.h" +#include <string> +#include "qpid/sys/IntegerTypes.h" + +struct pn_link_t; +struct pn_session_t; +struct pn_terminus_t; + +namespace qpid { +namespace messaging { + +class Duration; +class Message; + +namespace amqp { + +/** + * + */ +class ReceiverContext +{ + public: + ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); + ~ReceiverContext(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void attach(); + void close(); + const std::string& getName() const; + const std::string& getSource() const; + bool isClosed() const; + void configure() const; + private: + friend class ConnectionContext; + const std::string name; + const Address address; + pn_link_t* receiver; + uint32_t capacity; + void configure(pn_terminus_t*) const; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERCONTEXT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp new file mode 100644 index 0000000000..9bf64ebb8d --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReceiverHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "ReceiverContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +ReceiverHandle::ReceiverHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<ReceiverContext> r +) : connection(c), session(s), receiver(r) {} + + +bool ReceiverHandle::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->get(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::get(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!get(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +bool ReceiverHandle::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + return connection->fetch(session, receiver, message, timeout); +} + +qpid::messaging::Message ReceiverHandle::fetch(qpid::messaging::Duration timeout) +{ + qpid::messaging::Message result; + if (!fetch(result, timeout)) throw qpid::messaging::NoMessageAvailable(); + return result; +} + +void ReceiverHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(receiver, capacity); +} + +uint32_t ReceiverHandle::getCapacity() +{ + return connection->getCapacity(receiver); +} + +uint32_t ReceiverHandle::getAvailable() +{ + return connection->getAvailable(receiver); +} + +uint32_t ReceiverHandle::getUnsettled() +{ + return connection->getUnsettled(receiver); +} + +void ReceiverHandle::close() +{ + session->closeReceiver(getName()); +} + +const std::string& ReceiverHandle::getName() const +{ + return receiver->getName(); +} + +qpid::messaging::Session ReceiverHandle::getSession() const +{ + //create new SessionHandle instance; i.e. create new handle that shares the same context + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +bool ReceiverHandle::isClosed() const +{ + return receiver->isClosed(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/cpp/src/qpid/messaging/amqp/ReceiverHandle.h new file mode 100644 index 0000000000..a1a6f26025 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ReceiverHandle.h @@ -0,0 +1,63 @@ +#ifndef QPID_MESSAGING_AMQP_RECEIVERHANDLE_H +#define QPID_MESSAGING_AMQP_RECEIVERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/ReceiverImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class ReceiverContext; +/** + * + */ +class ReceiverHandle : public qpid::messaging::ReceiverImpl +{ + public: + ReceiverHandle(boost::shared_ptr<ConnectionContext>, + boost::shared_ptr<SessionContext>, + boost::shared_ptr<ReceiverContext> + ); + bool get(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message get(qpid::messaging::Duration timeout); + bool fetch(Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message fetch(qpid::messaging::Duration timeout); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getAvailable(); + uint32_t getUnsettled(); + void close(); + const std::string& getName() const; + qpid::messaging::Session getSession() const; + bool isClosed() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<ReceiverContext> receiver; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_RECEIVERHANDLE_H*/ diff --git a/cpp/src/qpid/messaging/amqp/Sasl.cpp b/cpp/src/qpid/messaging/amqp/Sasl.cpp new file mode 100644 index 0000000000..a8bae1adda --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/Sasl.cpp @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "qpid/messaging/amqp/Sasl.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/log/Statement.h" +#include "qpid/Sasl.h" +#include "qpid/SaslFactory.h" +#include "qpid/StringUtils.h" +#include <sstream> + +namespace qpid { +namespace messaging { +namespace amqp { + +Sasl::Sasl(const std::string& id, ConnectionContext& c, const std::string& hostname_) + : qpid::amqp::SaslClient(id), context(c), + sasl(qpid::SaslFactory::getInstance().create(c.username, c.password, c.service, hostname_, c.minSsf, c.maxSsf, false)), + hostname(hostname_), readHeader(true), writeHeader(true), haveOutput(false), state(NONE) {} + +std::size_t Sasl::decode(const char* buffer, std::size_t size) +{ + size_t decoded = 0; + if (readHeader) { + decoded += readProtocolHeader(buffer, size); + readHeader = !decoded; + } + if (state == NONE && decoded < size) { + decoded += read(buffer + decoded, size - decoded); + } + QPID_LOG(trace, id << " Sasl::decode(" << size << "): " << decoded); + return decoded; +} + +std::size_t Sasl::encode(char* buffer, std::size_t size) +{ + size_t encoded = 0; + if (writeHeader) { + encoded += writeProtocolHeader(buffer, size); + writeHeader = !encoded; + } + if (encoded < size) { + encoded += write(buffer + encoded, size - encoded); + } + haveOutput = (encoded == size); + QPID_LOG(trace, id << " Sasl::encode(" << size << "): " << encoded); + return encoded; +} + +bool Sasl::canEncode() +{ + QPID_LOG(trace, id << " Sasl::canEncode(): " << writeHeader << " || " << haveOutput); + return writeHeader || haveOutput; +} + +void Sasl::mechanisms(const std::string& offered) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-MECHANISMS(" << offered << ")"); + std::string response; + + std::string mechanisms; + if (context.mechanism.size()) { + std::vector<std::string> allowed = split(context.mechanism, " "); + std::vector<std::string> supported = split(offered, " "); + std::stringstream intersection; + for (std::vector<std::string>::const_iterator i = allowed.begin(); i != allowed.end(); ++i) { + if (std::find(supported.begin(), supported.end(), *i) != supported.end()) { + intersection << *i << " "; + } + } + mechanisms = intersection.str(); + } else { + mechanisms = offered; + } + + if (sasl->start(mechanisms, response)) { + init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0); + } else { + init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0); + } + haveOutput = true; + context.activateOutput(); +} +void Sasl::challenge(const std::string& challenge) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(" << challenge.size() << " bytes)"); + std::string r = sasl->step(challenge); + response(&r); + haveOutput = true; + context.activateOutput(); +} +namespace { +const std::string EMPTY; +} +void Sasl::challenge() +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(null)"); + std::string r = sasl->step(EMPTY); + response(&r); +} +void Sasl::outcome(uint8_t result, const std::string& extra) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ", " << extra << ")"); + outcome(result); +} +void Sasl::outcome(uint8_t result) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ")"); + if (result) state = FAILED; + else state = SUCCEEDED; + + securityLayer = sasl->getSecurityLayer(context.maxFrameSize); + if (securityLayer.get()) { + securityLayer->init(&context); + } + context.activateOutput(); +} + +qpid::sys::Codec* Sasl::getSecurityLayer() +{ + return securityLayer.get(); +} + +bool Sasl::authenticated() +{ + switch (state) { + case SUCCEEDED: return true; + case FAILED: throw qpid::messaging::UnauthorizedAccess("Failed to authenticate"); + case NONE: default: return false; + } +} + +std::string Sasl::getAuthenticatedUsername() +{ + return sasl->getUserId(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/Sasl.h b/cpp/src/qpid/messaging/amqp/Sasl.h new file mode 100644 index 0000000000..6657779fdc --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/Sasl.h @@ -0,0 +1,72 @@ +#ifndef QPID_MESSAGING_AMQP_SASL_H +#define QPID_MESSAGING_AMQP_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Codec.h" +#include "qpid/amqp/SaslClient.h" +#include <memory> + +namespace qpid { +class Sasl; +namespace sys { +class SecurityLayer; +} +namespace messaging { +class ConnectionOptions; +namespace amqp { +class ConnectionContext; + +/** + * + */ +class Sasl : public qpid::sys::Codec, qpid::amqp::SaslClient +{ + public: + Sasl(const std::string& id, ConnectionContext& context, const std::string& hostname); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + + bool authenticated(); + qpid::sys::Codec* getSecurityLayer(); + std::string getAuthenticatedUsername(); + private: + ConnectionContext& context; + std::auto_ptr<qpid::Sasl> sasl; + std::string hostname; + bool readHeader; + bool writeHeader; + bool haveOutput; + enum { + NONE, FAILED, SUCCEEDED + } state; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + + void mechanisms(const std::string&); + void challenge(const std::string&); + void challenge(); //null != empty string + void outcome(uint8_t result, const std::string&); + void outcome(uint8_t result); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SASL_H*/ diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp new file mode 100644 index 0000000000..96c4437b89 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -0,0 +1,363 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/SenderContext.h" +#include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} +#include <boost/shared_ptr.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) + : name(n), + address(a), + sender(pn_sender(session, n.c_str())), capacity(1000) {} + +SenderContext::~SenderContext() +{ + pn_link_free(sender); +} + +void SenderContext::close() +{ + +} + +void SenderContext::setCapacity(uint32_t c) +{ + if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); + capacity = c; +} + +uint32_t SenderContext::getCapacity() +{ + return capacity; +} + +uint32_t SenderContext::getUnsettled() +{ + return processUnsettled(); +} + +const std::string& SenderContext::getName() const +{ + return name; +} + +const std::string& SenderContext::getTarget() const +{ + return address.getName(); +} + +SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) +{ + if (processUnsettled() < capacity && pn_link_credit(sender)) { + deliveries.push_back(Delivery(nextId++)); + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address); + delivery.send(sender); + return &delivery; + } else { + return 0; + } +} + +uint32_t SenderContext::processUnsettled() +{ + //remove accepted messages from front of deque + while (!deliveries.empty() && deliveries.front().accepted()) { + deliveries.front().settle(); + deliveries.pop_front(); + } + return deliveries.size(); +} +namespace { +class HeaderAdapter : public qpid::amqp::MessageEncoder::Header +{ + public: + HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {} + virtual bool isDurable() const + { + return msg.isDurable(); + } + virtual uint8_t getPriority() const + { + return msg.getPriority(); + } + virtual bool hasTtl() const + { + return msg.getTtl(); + } + virtual uint32_t getTtl() const + { + return msg.getTtl(); + } + virtual bool isFirstAcquirer() const + { + return false; + } + virtual uint32_t getDeliveryCount() const + { + return msg.isRedelivered() ? 1 : 0; + } + private: + const qpid::messaging::MessageImpl& msg; +}; +const std::string EMPTY; + +class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties +{ + public: + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {} + bool hasMessageId() const + { + return getMessageId().size(); + } + std::string getMessageId() const + { + return msg.getMessageId(); + } + + bool hasUserId() const + { + return getUserId().size(); + } + + std::string getUserId() const + { + return msg.getUserId(); + } + + bool hasTo() const + { + return false;//not yet supported + } + + std::string getTo() const + { + return EMPTY;//not yet supported + } + + bool hasSubject() const + { + return subject.size() || getSubject().size(); + } + + std::string getSubject() const + { + return subject.size() ? subject : msg.getSubject(); + } + + bool hasReplyTo() const + { + return msg.getReplyTo(); + } + + std::string getReplyTo() const + { + return msg.getReplyTo().str(); + } + + bool hasCorrelationId() const + { + return getCorrelationId().size(); + } + + std::string getCorrelationId() const + { + return msg.getCorrelationId(); + } + + bool hasContentType() const + { + return getContentType().size(); + } + + std::string getContentType() const + { + return msg.getContentType(); + } + + bool hasContentEncoding() const + { + return false;//not yet supported + } + + std::string getContentEncoding() const + { + return EMPTY;//not yet supported + } + + bool hasAbsoluteExpiryTime() const + { + return false;//not yet supported + } + + int64_t getAbsoluteExpiryTime() const + { + return 0;//not yet supported + } + + bool hasCreationTime() const + { + return false;//not yet supported + } + + int64_t getCreationTime() const + { + return 0;//not yet supported + } + + bool hasGroupId() const + { + return false;//not yet supported + } + + std::string getGroupId() const + { + return EMPTY;//not yet supported + } + + bool hasGroupSequence() const + { + return false;//not yet supported + } + + uint32_t getGroupSequence() const + { + return 0;//not yet supported + } + + bool hasReplyToGroupId() const + { + return false;//not yet supported + } + + std::string getReplyToGroupId() const + { + return EMPTY;//not yet supported + } + private: + const qpid::messaging::MessageImpl& msg; + const std::string subject; +}; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + +} + +SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} + +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); + + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered + //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? + if (original->hasHeaderChanged(msg)) { + //since as yet have no annotations, just write the revised header then the rest of the message as received + encoded.resize(16/*max header size*/ + original->getBareMessage().size); + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + HeaderAdapter header(msg); + encoder.writeHeader(header); + ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); + } else { + //since as yet have no annotations, if the header hasn't + //changed and we still have the original bare message, can + //send the entire content as is + encoded.resize(original->getSize()); + ::memcpy(encoded.getData(), original->getData(), original->getSize()); + } + } else { + HeaderAdapter header(msg); + PropertiesAdapter properties(msg, address.getSubject()); + //compute size: + encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes())); + QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + //write header: + encoder.writeHeader(header); + //write delivery-annotations, write message-annotations (none yet supported) + //write properties + encoder.writeProperties(properties); + //write application-properties + encoder.writeApplicationProperties(msg.getHeaders()); + //write body + if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + if (encoder.getPosition() < encoded.getSize()) { + QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); + encoded.trim(encoder.getPosition()); + } + //write footer (no annotations yet supported) + } +} +void SenderContext::Delivery::send(pn_link_t* sender) +{ + pn_delivery_tag_t tag; + tag.size = sizeof(id); + tag.bytes = reinterpret_cast<const char*>(&id); + token = pn_delivery(sender, tag); + pn_link_send(sender, encoded.getData(), encoded.getSize()); + pn_link_advance(sender); +} + +bool SenderContext::Delivery::accepted() +{ + return pn_delivery_remote_state(token) == PN_ACCEPTED; +} +void SenderContext::Delivery::settle() +{ + pn_delivery_settle(token); +} +void SenderContext::configure() const +{ + configure(pn_link_target(sender)); +} +void SenderContext::configure(pn_terminus_t* target) const +{ + pn_terminus_set_address(target, address.getName().c_str()); + //dynamic create: + AddressHelper helper(address); + if (helper.createEnabled(AddressHelper::FOR_SENDER)) { + helper.setNodeProperties(target); + } +} + +bool SenderContext::settled() +{ + return processUnsettled() == 0; +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.h b/cpp/src/qpid/messaging/amqp/SenderContext.h new file mode 100644 index 0000000000..3595379e70 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -0,0 +1,90 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERCONTEXT_H +#define QPID_MESSAGING_AMQP_SENDERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <deque> +#include <string> +#include <vector> +#include "qpid/sys/IntegerTypes.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/amqp/EncodedMessage.h" + +struct pn_delivery_t; +struct pn_link_t; +struct pn_session_t; +struct pn_terminus_t; + +namespace qpid { +namespace messaging { + +class Message; +class MessageImpl; + +namespace amqp { +/** + * + */ +class SenderContext +{ + public: + class Delivery + { + public: + Delivery(int32_t id); + void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); + void send(pn_link_t*); + bool accepted(); + void settle(); + private: + int32_t id; + pn_delivery_t* token; + EncodedMessage encoded; + }; + + SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target); + ~SenderContext(); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + const std::string& getName() const; + const std::string& getTarget() const; + Delivery* send(const qpid::messaging::Message& message); + void configure() const; + bool settled(); + private: + friend class ConnectionContext; + typedef std::deque<Delivery> Deliveries; + + const std::string name; + const qpid::messaging::Address address; + pn_link_t* sender; + int32_t nextId; + Deliveries deliveries; + uint32_t capacity; + + uint32_t processUnsettled(); + void configure(pn_terminus_t*) const; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERCONTEXT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/cpp/src/qpid/messaging/amqp/SenderHandle.cpp new file mode 100644 index 0000000000..b7168e5b31 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SenderHandle.h" +#include "ConnectionContext.h" +#include "SessionContext.h" +#include "SessionHandle.h" +#include "SenderContext.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Session.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, + boost::shared_ptr<SessionContext> s, + boost::shared_ptr<SenderContext> sndr +) : connection(c), session(s), sender(sndr) {} + +void SenderHandle::send(const Message& message, bool sync) +{ + connection->send(sender, message, sync); +} + +void SenderHandle::close() +{ + session->closeSender(getName()); +} + +void SenderHandle::setCapacity(uint32_t capacity) +{ + connection->setCapacity(sender, capacity); +} + +uint32_t SenderHandle::getCapacity() +{ + return connection->getCapacity(sender); +} + +uint32_t SenderHandle::getUnsettled() +{ + return connection->getUnsettled(sender); +} + +const std::string& SenderHandle::getName() const +{ + return sender->getName(); +} + +qpid::messaging::Session SenderHandle::getSession() const +{ + return qpid::messaging::Session(new SessionHandle(connection, session)); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SenderHandle.h b/cpp/src/qpid/messaging/amqp/SenderHandle.h new file mode 100644 index 0000000000..3c6b666582 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SenderHandle.h @@ -0,0 +1,58 @@ +#ifndef QPID_MESSAGING_AMQP_SENDERHANDLE_H +#define QPID_MESSAGING_AMQP_SENDERHANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SenderImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +class SenderContext; +/** + * + */ +class SenderHandle : public qpid::messaging::SenderImpl +{ + public: + SenderHandle(boost::shared_ptr<ConnectionContext> connection, + boost::shared_ptr<SessionContext> session, + boost::shared_ptr<SenderContext> sender + ); + void send(const Message& message, bool sync); + void close(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t getUnsettled(); + const std::string& getName() const; + Session getSession() const; + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; + boost::shared_ptr<SenderContext> sender; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SENDERHANDLE_H*/ diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/cpp/src/qpid/messaging/amqp/SessionContext.cpp new file mode 100644 index 0000000000..9bdc658bc7 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionContext.h" +#include "SenderContext.h" +#include "ReceiverContext.h" +#include <boost/format.hpp> +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} +SessionContext::~SessionContext() +{ + senders.clear(); receivers.clear(); + pn_session_free(session); +} + +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); + senders[name] = s; + return s; +} + +boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); + receivers[name] = r; + return r; +} + +boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const +{ + SenderMap::const_iterator i = senders.find(name); + if (i == senders.end()) { + throw qpid::messaging::KeyError(std::string("No such sender") + name); + } else { + return i->second; + } +} + +boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const +{ + ReceiverMap::const_iterator i = receivers.find(name); + if (i == receivers.end()) { + throw qpid::messaging::KeyError(std::string("No such receiver") + name); + } else { + return i->second; + } +} + +void SessionContext::closeReceiver(const std::string&) +{ + +} + +void SessionContext::closeSender(const std::string&) +{ + +} + +boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) +{ + return boost::shared_ptr<ReceiverContext>(); +} + +uint32_t SessionContext::getReceivable() +{ + return 0;//TODO +} + +uint32_t SessionContext::getUnsettledAcks() +{ + return 0;//TODO +} + +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +{ + qpid::framing::SequenceNumber id = next++; + unacked[id] = delivery; + QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); + return id; +} + +void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) +{ + for (DeliveryMap::iterator i = begin; i != end; ++i) { + QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second);//TODO: different settlement modes? + } + unacked.erase(begin, end); +} + +void SessionContext::acknowledge() +{ + QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); + acknowledge(unacked.begin(), unacked.end()); +} + +void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) +{ + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + acknowledge(cumulative ? unacked.begin() : i, ++i); + } +} + +bool SessionContext::settled() +{ + bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + if (!i->second->settled()) result = false; + } + return result; +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.h b/cpp/src/qpid/messaging/amqp/SessionContext.h new file mode 100644 index 0000000000..eca30a0e97 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -0,0 +1,81 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONCONTEXT_H +#define QPID_MESSAGING_AMQP_SESSIONCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <map> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/IntegerTypes.h" +#include "qpid/framing/SequenceNumber.h" + +struct pn_connection_t; +struct pn_session_t; +struct pn_delivery_t; + +namespace qpid { +namespace messaging { + +class Address; +class Duration; + +namespace amqp { + +class ConnectionContext; +class SenderContext; +class ReceiverContext; +/** + * + */ +class SessionContext +{ + public: + SessionContext(pn_connection_t*); + ~SessionContext(); + boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address); + boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> getSender(const std::string& name) const; + boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; + void closeReceiver(const std::string&); + void closeSender(const std::string&); + boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + bool settled(); + private: + friend class ConnectionContext; + typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; + typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; + typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + pn_session_t* session; + SenderMap senders; + ReceiverMap receivers; + DeliveryMap unacked; + qpid::framing::SequenceNumber next; + + qpid::framing::SequenceNumber record(pn_delivery_t*); + void acknowledge(); + void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); + void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONCONTEXT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/cpp/src/qpid/messaging/amqp/SessionHandle.cpp new file mode 100644 index 0000000000..bf79771ca4 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionHandle.h" +#include "ConnectionContext.h" +#include "ConnectionHandle.h" +#include "ReceiverContext.h" +#include "ReceiverHandle.h" +#include "SenderContext.h" +#include "SenderHandle.h" +#include "SessionContext.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shared_ptr<SessionContext> s) : connection(c), session(s) {} + +void SessionHandle::commit() +{ + +} + +void SessionHandle::rollback() +{ + +} + +void SessionHandle::acknowledge(bool /*sync*/) +{ + connection->acknowledge(session, 0, false); +} + +void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative) +{ + //TODO: handle cumulative + connection->acknowledge(session, &msg, cumulative); +} + +void SessionHandle::reject(qpid::messaging::Message&) +{ + +} + +void SessionHandle::release(qpid::messaging::Message&) +{ + +} + +void SessionHandle::close() +{ + connection->endSession(session); +} + +void SessionHandle::sync(bool /*block*/) +{ + +} + +qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) +{ + boost::shared_ptr<SenderContext> sender = session->createSender(address); + connection->attach(session, sender); + return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); +} + +qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address) +{ + boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); + connection->attach(session, receiver); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); +} + +bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) +{ + boost::shared_ptr<ReceiverContext> r = session->nextReceiver(timeout); + if (r) { + //TODO: cache handles in this case to avoid frequent allocation + receiver = qpid::messaging::Receiver(new ReceiverHandle(connection, session, r)); + return true; + } else { + return false; + } +} + +qpid::messaging::Receiver SessionHandle::nextReceiver(Duration timeout) +{ + qpid::messaging::Receiver r; + if (nextReceiver(r, timeout)) return r; + else throw qpid::messaging::NoMessageAvailable(); +} + +uint32_t SessionHandle::getReceivable() +{ + return session->getReceivable(); +} + +uint32_t SessionHandle::getUnsettledAcks() +{ + return session->getUnsettledAcks(); +} + +Sender SessionHandle::getSender(const std::string& name) const +{ + return qpid::messaging::Sender(new SenderHandle(connection, session, session->getSender(name))); +} + +Receiver SessionHandle::getReceiver(const std::string& name) const +{ + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, session->getReceiver(name))); +} + +Connection SessionHandle::getConnection() const +{ + return qpid::messaging::Connection(new ConnectionHandle(connection)); +} + +void SessionHandle::checkError() +{ + +} + + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SessionHandle.h b/cpp/src/qpid/messaging/amqp/SessionHandle.h new file mode 100644 index 0000000000..5e843aaacc --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SessionHandle.h @@ -0,0 +1,64 @@ +#ifndef QPID_MESSAGING_AMQP_SESSIONIMPL_H +#define QPID_MESSAGING_AMQP_SESSIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "qpid/messaging/SessionImpl.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +class ConnectionContext; +class SessionContext; +/** + * + */ +class SessionHandle : public qpid::messaging::SessionImpl +{ + public: + SessionHandle(boost::shared_ptr<ConnectionContext>, boost::shared_ptr<SessionContext>); + void commit(); + void rollback(); + void acknowledge(bool sync); + void acknowledge(Message&, bool); + void reject(Message&); + void release(Message&); + void close(); + void sync(bool block); + qpid::messaging::Sender createSender(const Address& address); + qpid::messaging::Receiver createReceiver(const Address& address); + bool nextReceiver(Receiver& receiver, Duration timeout); + qpid::messaging::Receiver nextReceiver(Duration timeout); + uint32_t getReceivable(); + uint32_t getUnsettledAcks(); + qpid::messaging::Sender getSender(const std::string& name) const; + qpid::messaging::Receiver getReceiver(const std::string& name) const; + qpid::messaging::Connection getConnection() const; + void checkError(); + private: + boost::shared_ptr<ConnectionContext> connection; + boost::shared_ptr<SessionContext> session; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SESSIONIMPL_H*/ diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.cpp b/cpp/src/qpid/messaging/amqp/SslTransport.cpp new file mode 100644 index 0000000000..ea2375cb26 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SslTransport.cpp @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SslTransport.h" +#include "TransportContext.h" +#include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Poller.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +using namespace qpid::sys; +using namespace qpid::sys::ssl; + +namespace qpid { +namespace messaging { +namespace amqp { + +// Static constructor which registers connector here +namespace { +Transport* create(TransportContext& c, Poller::shared_ptr p) +{ + return new SslTransport(c, p); +} + +struct StaticInit +{ + StaticInit() + { + Transport::add("ssl", &create); + }; +} init; +} + + +SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {} + +void SslTransport::connect(const std::string& host, const std::string& port) +{ + assert(!connector); + assert(!aio); + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslTransport::connected, this, _1), + boost::bind(&SslTransport::failed, this, _3)); + + connector->start(poller); +} + +void SslTransport::failed(const std::string& msg) +{ + QPID_LOG(debug, "Failed to connect: " << msg); + socket.close(); + context.closed(); +} + +void SslTransport::connected(const Socket&) +{ + context.opened(); + aio = AsynchIO::create(socket, + boost::bind(&SslTransport::read, this, _1, _2), + boost::bind(&SslTransport::eof, this, _1), + boost::bind(&SslTransport::disconnected, this, _1), + boost::bind(&SslTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslTransport::write, this, _1)); + aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes + id = boost::str(boost::format("[%1%]") % socket.getFullAddress()); + aio->start(poller); +} + +void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer) +{ + int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount); + if (decoded < buffer->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buffer->dataStart += decoded; + buffer->dataCount -= decoded; + aio->unread(buffer); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buffer); + } +} + +void SslTransport::write(AsynchIO&) +{ + if (context.getCodec().canEncode()) { + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } + } + +} + +void SslTransport::close() +{ + QPID_LOG(debug, id << " SslTransport closing..."); + if (aio) + aio->queueWriteClose(); +} + +void SslTransport::eof(AsynchIO&) +{ + close(); +} + +void SslTransport::disconnected(AsynchIO&) +{ + close(); + socketClosed(*aio, socket); +} + +void SslTransport::socketClosed(AsynchIO&, const Socket&) +{ + if (aio) + aio->queueForDeletion(); + context.closed(); + QPID_LOG(debug, id << " Socket closed"); +} + +void SslTransport::abort() +{ + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&SslTransport::eof, this, _1)); + } +} + +void SslTransport::activateOutput() +{ + if (aio) aio->notifyPendingWrite(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.h b/cpp/src/qpid/messaging/amqp/SslTransport.h new file mode 100644 index 0000000000..f67ab95673 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SslTransport.h @@ -0,0 +1,74 @@ +#ifndef QPID_MESSAGING_AMQP_SSLTRANSPORT_H +#define QPID_MESSAGING_AMQP_SSLTRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/ssl/SslSocket.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class ConnectionCodec; +class Poller; +class AsynchConnector; +class AsynchIO; +class AsynchIOBufferBase; +} + +namespace messaging { +namespace amqp { +class TransportContext; + +class SslTransport : public Transport +{ + public: + SslTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller> p); + + void connect(const std::string& host, const std::string& port); + + void activateOutput(); + void abort(); + void close(); + + private: + qpid::sys::ssl::SslSocket socket; + TransportContext& context; + qpid::sys::AsynchConnector* connector; + qpid::sys::AsynchIO* aio; + boost::shared_ptr<qpid::sys::Poller> poller; + bool closed; + std::string id; + + void connected(const qpid::sys::Socket&); + void failed(const std::string& msg); + void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void write(qpid::sys::AsynchIO&); + void eof(qpid::sys::AsynchIO&); + void disconnected(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); + + friend class DriverImpl; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_SSLTRANSPORT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp new file mode 100644 index 0000000000..98022d634c --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/TcpTransport.cpp @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "TcpTransport.h" +#include "ConnectionContext.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/Poller.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +using namespace qpid::sys; + +namespace qpid { +namespace messaging { +namespace amqp { +// Static constructor which registers connector here +namespace { +Transport* create(TransportContext& c, Poller::shared_ptr p) +{ + return new TcpTransport(c, p); +} + +struct StaticInit +{ + StaticInit() + { + Transport::add("tcp", &create); + }; +} init; +} + +TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {} + +void TcpTransport::connect(const std::string& host, const std::string& port) +{ + assert(!connector); + assert(!aio); + connector = AsynchConnector::create( + *socket, + host, port, + boost::bind(&TcpTransport::connected, this, _1), + boost::bind(&TcpTransport::failed, this, _3)); + + connector->start(poller); +} + +void TcpTransport::failed(const std::string& msg) +{ + QPID_LOG(debug, "Failed to connect: " << msg); + connector = 0; + socket->close(); + context.closed(); +} + +void TcpTransport::connected(const Socket&) +{ + context.opened(); + connector = 0; + aio = AsynchIO::create(*socket, + boost::bind(&TcpTransport::read, this, _1, _2), + boost::bind(&TcpTransport::eof, this, _1), + boost::bind(&TcpTransport::disconnected, this, _1), + boost::bind(&TcpTransport::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&TcpTransport::write, this, _1)); + aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes + id = boost::str(boost::format("[%1%]") % socket->getFullAddress()); + aio->start(poller); +} + +void TcpTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer) +{ + int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount); + if (decoded < buffer->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buffer->dataStart += decoded; + buffer->dataCount -= decoded; + aio->unread(buffer); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buffer); + } +} + +void TcpTransport::write(AsynchIO&) +{ + if (context.getCodec().canEncode()) { + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } + } + +} + +void TcpTransport::close() +{ + QPID_LOG(debug, id << " TcpTransport closing..."); + if (aio) + aio->queueWriteClose(); +} + +void TcpTransport::eof(AsynchIO&) +{ + close(); +} + +void TcpTransport::disconnected(AsynchIO&) +{ + close(); + socketClosed(*aio, *socket); +} + +void TcpTransport::socketClosed(AsynchIO&, const Socket&) +{ + if (aio) + aio->queueForDeletion(); + context.closed(); + QPID_LOG(debug, id << " Socket closed"); +} + +void TcpTransport::abort() +{ + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1)); + } else if (connector) { + // We're still connecting + connector->stop(); + failed("Connection timedout"); + } +} + +void TcpTransport::activateOutput() +{ + if (aio) aio->notifyPendingWrite(); +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.h b/cpp/src/qpid/messaging/amqp/TcpTransport.h new file mode 100644 index 0000000000..8c1087abb3 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -0,0 +1,71 @@ +#ifndef QPID_MESSAGING_AMQP_TCPTRANSPORT_H +#define QPID_MESSAGING_AMQP_TCPTRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class ConnectionCodec; +class AsynchConnector; +class AsynchIO; +class AsynchIOBufferBase; +class Poller; +} +namespace messaging { +namespace amqp { +class TransportContext; + +class TcpTransport : public Transport +{ + public: + TcpTransport(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + + void connect(const std::string& host, const std::string& port); + + void activateOutput(); + void abort(); + void close(); + + private: + boost::scoped_ptr<qpid::sys::Socket> socket; + TransportContext& context; + qpid::sys::AsynchConnector* connector; + qpid::sys::AsynchIO* aio; + boost::shared_ptr<qpid::sys::Poller> poller; + std::string id; + + void connected(const qpid::sys::Socket&); + void failed(const std::string& msg); + void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void write(qpid::sys::AsynchIO&); + void eof(qpid::sys::AsynchIO&); + void disconnected(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TCPTRANSPORT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/Transport.cpp b/cpp/src/qpid/messaging/amqp/Transport.cpp new file mode 100644 index 0000000000..21f51046b1 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/Transport.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/Transport.h" +#include "qpid/messaging/amqp/TransportContext.h" +#include <map> +#include <string> + +namespace qpid { +namespace messaging { +namespace amqp { +namespace { +typedef std::map<std::string, Transport::Factory*> Registry; + +Registry& theRegistry() +{ + static Registry factories; + return factories; +} +} + +Transport* Transport::create(const std::string& name, TransportContext& context, boost::shared_ptr<qpid::sys::Poller> poller) +{ + Registry::const_iterator i = theRegistry().find(name); + if (i != theRegistry().end()) return (i->second)(context, poller); + else return 0; +} +void Transport::add(const std::string& name, Factory* factory) +{ + theRegistry()[name] = factory; +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/Transport.h b/cpp/src/qpid/messaging/amqp/Transport.h new file mode 100644 index 0000000000..ee021f645b --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/Transport.h @@ -0,0 +1,48 @@ +#ifndef QPID_MESSAGING_AMQP_TRANSPORT_H +#define QPID_MESSAGING_AMQP_TRANSPORT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/OutputControl.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace sys { +class Poller; +} +namespace messaging { +namespace amqp { +class TransportContext; + +class Transport : public qpid::sys::OutputControl +{ + public: + virtual ~Transport() {} + virtual void connect(const std::string& host, const std::string& port) = 0; + virtual void close() = 0; + + typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + static Transport* create(const std::string& name, TransportContext&, boost::shared_ptr<qpid::sys::Poller>); + static void add(const std::string& name, Factory* factory); +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TRANSPORT_H*/ diff --git a/cpp/src/qpid/messaging/amqp/TransportContext.h b/cpp/src/qpid/messaging/amqp/TransportContext.h new file mode 100644 index 0000000000..57192b5976 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/TransportContext.h @@ -0,0 +1,47 @@ +#ifndef QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H +#define QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace sys { +class Codec; +} +namespace messaging { +namespace amqp { + +/** + * Interface to be supplied by 'users' of Transport interface, in + * order to provide codec and handle callbaskc for opening and closing + * of connection. + */ +class TransportContext +{ + public: + virtual ~TransportContext() {} + virtual qpid::sys::Codec& getCodec() = 0; + virtual void closed() = 0; + virtual void opened() = 0; + private: +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_TRANSPORTCONTEXT_H*/ |