diff options
| author | Gordon Sim <gsim@apache.org> | 2012-10-19 17:15:46 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2012-10-19 17:15:46 +0000 |
| commit | d672df2c5c286c14ea65256ea78314523f7fd1dc (patch) | |
| tree | e788ad37059bf6fb1ba972a299effa316189c756 /qpid/cpp/src | |
| parent | ec9fb822992d2a61f9c1b96a5a18427f2276f7ec (diff) | |
| download | qpid-python-d672df2c5c286c14ea65256ea78314523f7fd1dc.tar.gz | |
QPID-4368: Allow pluggable protocol implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1400177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Protocol.cpp | 69 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Protocol.h | 82 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h | 48 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 34 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/Connection.cpp | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp | 71 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/ProtocolRegistry.h | 42 |
16 files changed, 373 insertions, 40 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index fb0db3e22e..10005f06e1 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1044,6 +1044,7 @@ set (qpidmessaging_SOURCES qpid/messaging/Message.cpp qpid/messaging/MessageImpl.h qpid/messaging/MessageImpl.cpp + qpid/messaging/ProtocolRegistry.cpp qpid/messaging/Receiver.cpp qpid/messaging/ReceiverImpl.h qpid/messaging/Session.cpp @@ -1118,6 +1119,7 @@ set (qpidbroker_SOURCES qpid/broker/MessageDeque.cpp qpid/broker/MessageMap.cpp qpid/broker/PriorityQueue.cpp + qpid/broker/Protocol.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 0130c62852..f2dcec5c99 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -608,6 +608,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageStoreModule.h \ qpid/broker/PriorityQueue.h \ qpid/broker/PriorityQueue.cpp \ + qpid/broker/Protocol.h \ + qpid/broker/Protocol.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ @@ -657,6 +659,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManager.h \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/RecoverableMessageImpl.h \ qpid/broker/RetryList.cpp \ qpid/broker/RetryList.h \ qpid/broker/SaslAuthenticator.cpp \ @@ -797,6 +800,8 @@ libqpidmessaging_la_SOURCES = \ qpid/messaging/MessageImpl.h \ qpid/messaging/MessageImpl.cpp \ qpid/messaging/PrivateImplRef.h \ + qpid/messaging/ProtocolRegistry.h \ + qpid/messaging/ProtocolRegistry.cpp \ qpid/messaging/Sender.cpp \ qpid/messaging/Receiver.cpp \ qpid/messaging/Session.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 784530b9ab..ab29356fef 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -293,7 +293,7 @@ Broker::Broker(const Broker::Options& conf) : // The cluster plug-in will setRecovery(false) on all but the first // broker to join a cluster. if (getRecovery()) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry); recoveryInProgress = true; store->recover(recoverer); recoveryInProgress = false; @@ -737,7 +737,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else extensions[i->first] = i->second; } framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); + qpid::amqp_0_10::translate(extensions, arguments); try { std::pair<boost::shared_ptr<Exchange>, bool> result = @@ -759,7 +759,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else extensions[i->first] = i->second; } framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); + qpid::amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 11757657b8..68b9ee8b22 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -28,6 +28,7 @@ #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/MessageStore.h" +#include "qpid/broker/Protocol.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/SessionManager.h" @@ -38,6 +39,7 @@ #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" #include "qpid/broker/ConfigurationObservers.h" +#include "qpid/sys/ConnectionCodec.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -185,6 +187,7 @@ class Broker : public sys::Runnable, public Plugin::Target, bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConsumerFactories consumerFactories; + ProtocolRegistry protocolRegistry; mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; @@ -223,6 +226,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } + ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; } void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index d1dd1fae6c..a3c278e2e9 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -158,7 +158,7 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID); types::Variant::Map properties; - amqp_0_10::translate(clientProperties, properties); + qpid::amqp_0_10::translate(clientProperties, properties); mgmtObject->set_remoteProperties(properties); if (!procName.empty()) mgmtObject->set_remoteProcessName(procName); diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp new file mode 100644 index 0000000000..90d4d7833f --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Protocol.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Protocol.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s) +{ + qpid::sys::ConnectionCodec* codec = 0; + for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) { + codec = i->second->create(v, o, id, s); + } + return codec; +} +boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m) +{ + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer; + const qpid::broker::amqp_0_10::MessageTransfer* ptr = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&m.getEncoding()); + if (ptr) transfer = boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(ptr); + for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) { + transfer = i->second->translate(m); + } + return transfer; +} +boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b) +{ + boost::shared_ptr<RecoverableMessage> msg; + for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) { + msg = i->second->recover(b); + } + return msg; +} + +ProtocolRegistry::~ProtocolRegistry() +{ + for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) { + delete i->second; + } + protocols.clear(); +} +void ProtocolRegistry::add(const std::string& key, Protocol* protocol) +{ + protocols[key] = protocol; + QPID_LOG(info, "Loaded protocol " << key); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h new file mode 100644 index 0000000000..2f268748fb --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Protocol.h @@ -0,0 +1,82 @@ +#ifndef QPID_BROKER_PROTOCOL_H +#define QPID_BROKER_PROTOCOL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <map> +#include <string> +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace sys { +class ConnectionCodec; +class OutputControl; +struct SecuritySettings; +} +namespace framing { +class Buffer; +class ProtocolVersion; +} +namespace broker { +class Message; +class RecoverableMessage; +namespace amqp_0_10 { +class MessageTransfer; +} + +/** + * A simple abstraction allowing pluggable protocol(s) + * (versions). AMQP 0-10 is considered the default. Alternatives must + * provide a ConnectionCodec for encoding/decoding the protocol in + * full, a means of translating the native message format of that + * protocol into AMQP 0-10 and a means of recovering durable messages + * from disk. + */ +class Protocol +{ + public: + virtual ~Protocol() {} + virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0; + virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0; + virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0; + + private: +}; + +class ProtocolRegistry : public Protocol +{ + public: + qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&); + boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); + + ~ProtocolRegistry(); + void add(const std::string&, Protocol*); + private: + //name may be useful for descriptive purposes or even for some + //limited manipulation of ordering + typedef std::map<std::string, Protocol*> Protocols; + Protocols protocols; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PROTOCOL_H*/ diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h new file mode 100644 index 0000000000..f3ead261c1 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h @@ -0,0 +1,48 @@ +#ifndef QPID_BROKER_RECOVERABLEMESSAGEIMPL_H +#define QPID_BROKER_RECOVERABLEMESSAGEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RecoverableMessage.h" + +namespace qpid { +namespace broker { +class DtxBuffer; +class Message; +class Queue; + +class RecoverableMessageImpl : public RecoverableMessage +{ + Message msg; +public: + RecoverableMessageImpl(const Message& _msg); + ~RecoverableMessageImpl() {}; + void setPersistenceId(uint64_t id); + void setRedelivered(); + bool loadContent(uint64_t available); + void decodeContent(framing::Buffer& buffer); + void recover(boost::shared_ptr<Queue> queue); + void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); + void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_RECOVERABLEMESSAGEIMPL_H*/ diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index b04d7c34e0..6d831563e2 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -25,6 +25,8 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/Protocol.h" +#include "qpid/broker/RecoverableMessageImpl.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -38,26 +40,11 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} + DtxManager& _dtxMgr, ProtocolRegistry& p) + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} -class RecoverableMessageImpl : public RecoverableMessage -{ - Message msg; -public: - RecoverableMessageImpl(const Message& _msg); - ~RecoverableMessageImpl() {}; - void setPersistenceId(uint64_t id); - void setRedelivered(); - bool loadContent(uint64_t available); - void decodeContent(framing::Buffer& buffer); - void recover(Queue::shared_ptr queue); - void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); - void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); -}; - class RecoverableQueueImpl : public RecoverableQueue { Queue::shared_ptr queue; @@ -131,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - //TODO: determine encoding/version actually used - boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); - transfer->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + framing::Buffer sniffer(buffer.getPointer(), buffer.available()); + RecoverableMessage::shared_ptr m = protocols.recover(sniffer); + if (m) { + return m; + } else { + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); + } } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h index 1ad7892b13..60ca28092d 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -30,15 +30,17 @@ namespace qpid { namespace broker { +class ProtocolRegistry; class RecoveryManagerImpl : public RecoveryManager{ QueueRegistry& queues; ExchangeRegistry& exchanges; LinkRegistry& links; DtxManager& dtxMgr; + ProtocolRegistry& protocols; public: RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links, - DtxManager& dtxMgr); + DtxManager& dtxMgr, ProtocolRegistry&); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 757f6efc59..e5657fd93e 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -31,7 +31,7 @@ namespace broker { using framing::ProtocolVersion; using qpid::sys::SecuritySettings; -typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr; +typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr; typedef std::auto_ptr<SecureConnection> SecureConnectionPtr; typedef std::auto_ptr<Connection> ConnectionPtr; typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; @@ -43,12 +43,14 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons const SecuritySettings& external) { if (v == ProtocolVersion(0, 10)) { SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new amqp_0_10::Connection(out, id, false)); + CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false)); ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false)); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); return sc.release(); + } else { + return broker.getProtocolRegistry().create(v, out, id, external); } return 0; } @@ -58,7 +60,7 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { // used to create connections from one broker to another SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new amqp_0_10::Connection(out, id, true)); + CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true)); ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true )); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index f3d97dc078..767305af81 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -300,7 +300,8 @@ Consumer(_name, type), arguments(_arguments), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0) + deliveryCount(0), + protocols(parent->getSession().getBroker().getProtocolRegistry()) { if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) { @@ -344,11 +345,11 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) { allocateCredit(msg); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(), - consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset - const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding()); record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(), ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE, diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 1d7ccbfa9c..be7c8d490a 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -56,6 +56,7 @@ namespace broker { class Exchange; class MessageStore; +class ProtocolRegistry; class SessionContext; class SessionState; @@ -97,6 +98,7 @@ class SemanticState : private boost::noncopyable { const int syncFrequency; int deliveryCount; qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; + ProtocolRegistry& protocols; bool checkCredit(const Message& msg); void allocateCredit(const Message& msg); diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp index bd90aa54a7..fde931038b 100644 --- a/qpid/cpp/src/qpid/messaging/Connection.cpp +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ #include "qpid/messaging/Session.h" #include "qpid/messaging/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" +#include "qpid/messaging/ProtocolRegistry.h" #include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/log/Statement.h" @@ -40,22 +41,32 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this Connection::~Connection() { PI::dtor(*this); } Connection::Connection(const std::string& url, const std::string& o) -{ +{ Variant::Map options; AddressParser parser(o); if (o.empty() || parser.parseMap(options)) { - PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + ConnectionImpl* impl = ProtocolRegistry::create(url, options); + if (impl) { + PI::ctor(*this, impl); + } else { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } } else { throw InvalidOptionString("Invalid option string: " + o); } } Connection::Connection(const std::string& url, const Variant::Map& options) { - PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + ConnectionImpl* impl = ProtocolRegistry::create(url, options); + if (impl) { + PI::ctor(*this, impl); + } else { + PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); + } } Connection::Connection() -{ +{ Variant::Map options; std::string url = "amqp:tcp:127.0.0.1:5672"; PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); @@ -67,12 +78,12 @@ bool Connection::isOpen() const { return impl->isOpen(); } void Connection::close() { impl->close(); } Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } Session Connection::createTransactionalSession(const std::string& name) -{ +{ return impl->newSession(true, name); } Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } void Connection::setOption(const std::string& name, const Variant& value) -{ +{ impl->setOption(name, value); } std::string Connection::getAuthenticatedUsername() diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp new file mode 100644 index 0000000000..4d91382eaa --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ProtocolRegistry.h" +#include "qpid/Exception.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" +#include <map> + +using qpid::types::Variant; + +namespace qpid { +namespace messaging { +namespace { +typedef std::map<std::string, ProtocolRegistry::Factory*> Registry; + +Registry& theRegistry() +{ + static Registry factories; + return factories; +} + +bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out) +{ + bool matched = false; + for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) { + if (i->first == key) { + value = i->second; + matched = true; + } else { + out.insert(*i); + } + } + return matched; +} +} + +ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options) +{ + Variant name; + Variant::Map stripped; + if (extract("protocol", name, options, stripped)) { + Registry::const_iterator i = theRegistry().find(name.asString()); + if (i != theRegistry().end()) return (i->second)(url, stripped); + else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped); + else throw qpid::Exception("Unsupported protocol: " + name.asString()); + } + return 0; +} +void ProtocolRegistry::add(const std::string& name, Factory* factory) +{ + theRegistry()[name] = factory; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h new file mode 100644 index 0000000000..bcb62248a5 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h @@ -0,0 +1,42 @@ +#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H +#define QPID_MESSAGING_PROTOCOLREGISTRY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" + +namespace qpid { +namespace messaging { +class ConnectionImpl; +/** + * Registry for different implementations of the messaging API e.g AMQP 1.0 + */ +class ProtocolRegistry +{ + public: + typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options); + static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options); + static void add(const std::string& name, Factory* factory); + private: +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/ |
