summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-10-19 17:15:46 +0000
committerGordon Sim <gsim@apache.org>2012-10-19 17:15:46 +0000
commitd672df2c5c286c14ea65256ea78314523f7fd1dc (patch)
treee788ad37059bf6fb1ba972a299effa316189c756 /qpid/cpp/src
parentec9fb822992d2a61f9c1b96a5a18427f2276f7ec (diff)
downloadqpid-python-d672df2c5c286c14ea65256ea78314523f7fd1dc.tar.gz
QPID-4368: Allow pluggable protocol implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1400177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/Makefile.am5
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h4
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.cpp69
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.h82
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h48
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/Connection.cpp27
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp71
-rw-r--r--qpid/cpp/src/qpid/messaging/ProtocolRegistry.h42
16 files changed, 373 insertions, 40 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index fb0db3e22e..10005f06e1 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1044,6 +1044,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/Message.cpp
qpid/messaging/MessageImpl.h
qpid/messaging/MessageImpl.cpp
+ qpid/messaging/ProtocolRegistry.cpp
qpid/messaging/Receiver.cpp
qpid/messaging/ReceiverImpl.h
qpid/messaging/Session.cpp
@@ -1118,6 +1119,7 @@ set (qpidbroker_SOURCES
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
qpid/broker/PriorityQueue.cpp
+ qpid/broker/Protocol.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 0130c62852..f2dcec5c99 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -608,6 +608,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageStoreModule.h \
qpid/broker/PriorityQueue.h \
qpid/broker/PriorityQueue.cpp \
+ qpid/broker/Protocol.h \
+ qpid/broker/Protocol.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
@@ -657,6 +659,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/RecoverableMessageImpl.h \
qpid/broker/RetryList.cpp \
qpid/broker/RetryList.h \
qpid/broker/SaslAuthenticator.cpp \
@@ -797,6 +800,8 @@ libqpidmessaging_la_SOURCES = \
qpid/messaging/MessageImpl.h \
qpid/messaging/MessageImpl.cpp \
qpid/messaging/PrivateImplRef.h \
+ qpid/messaging/ProtocolRegistry.h \
+ qpid/messaging/ProtocolRegistry.cpp \
qpid/messaging/Sender.cpp \
qpid/messaging/Receiver.cpp \
qpid/messaging/Session.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 784530b9ab..ab29356fef 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -293,7 +293,7 @@ Broker::Broker(const Broker::Options& conf) :
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
recoveryInProgress = true;
store->recover(recoverer);
recoveryInProgress = false;
@@ -737,7 +737,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -759,7 +759,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 11757657b8..68b9ee8b22 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -28,6 +28,7 @@
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
@@ -38,6 +39,7 @@
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
#include "qpid/broker/ConfigurationObservers.h"
+#include "qpid/sys/ConnectionCodec.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -185,6 +187,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
+ ProtocolRegistry protocolRegistry;
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
@@ -223,6 +226,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
+ ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index d1dd1fae6c..a3c278e2e9 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -158,7 +158,7 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
types::Variant::Map properties;
- amqp_0_10::translate(clientProperties, properties);
+ qpid::amqp_0_10::translate(clientProperties, properties);
mgmtObject->set_remoteProperties(properties);
if (!procName.empty())
mgmtObject->set_remoteProcessName(procName);
diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp
new file mode 100644
index 0000000000..90d4d7833f
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Protocol.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Protocol.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
+{
+ qpid::sys::ConnectionCodec* codec = 0;
+ for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) {
+ codec = i->second->create(v, o, id, s);
+ }
+ return codec;
+}
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m)
+{
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer;
+ const qpid::broker::amqp_0_10::MessageTransfer* ptr = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&m.getEncoding());
+ if (ptr) transfer = boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(ptr);
+ for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) {
+ transfer = i->second->translate(m);
+ }
+ return transfer;
+}
+boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)
+{
+ boost::shared_ptr<RecoverableMessage> msg;
+ for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) {
+ msg = i->second->recover(b);
+ }
+ return msg;
+}
+
+ProtocolRegistry::~ProtocolRegistry()
+{
+ for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
+ delete i->second;
+ }
+ protocols.clear();
+}
+void ProtocolRegistry::add(const std::string& key, Protocol* protocol)
+{
+ protocols[key] = protocol;
+ QPID_LOG(info, "Loaded protocol " << key);
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h
new file mode 100644
index 0000000000..2f268748fb
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Protocol.h
@@ -0,0 +1,82 @@
+#ifndef QPID_BROKER_PROTOCOL_H
+#define QPID_BROKER_PROTOCOL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class ConnectionCodec;
+class OutputControl;
+struct SecuritySettings;
+}
+namespace framing {
+class Buffer;
+class ProtocolVersion;
+}
+namespace broker {
+class Message;
+class RecoverableMessage;
+namespace amqp_0_10 {
+class MessageTransfer;
+}
+
+/**
+ * A simple abstraction allowing pluggable protocol(s)
+ * (versions). AMQP 0-10 is considered the default. Alternatives must
+ * provide a ConnectionCodec for encoding/decoding the protocol in
+ * full, a means of translating the native message format of that
+ * protocol into AMQP 0-10 and a means of recovering durable messages
+ * from disk.
+ */
+class Protocol
+{
+ public:
+ virtual ~Protocol() {}
+ virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0;
+ virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0;
+ virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0;
+
+ private:
+};
+
+class ProtocolRegistry : public Protocol
+{
+ public:
+ qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
+ boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+
+ ~ProtocolRegistry();
+ void add(const std::string&, Protocol*);
+ private:
+ //name may be useful for descriptive purposes or even for some
+ //limited manipulation of ordering
+ typedef std::map<std::string, Protocol*> Protocols;
+ Protocols protocols;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PROTOCOL_H*/
diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
new file mode 100644
index 0000000000..f3ead261c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
@@ -0,0 +1,48 @@
+#ifndef QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+#define QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoverableMessage.h"
+
+namespace qpid {
+namespace broker {
+class DtxBuffer;
+class Message;
+class Queue;
+
+class RecoverableMessageImpl : public RecoverableMessage
+{
+ Message msg;
+public:
+ RecoverableMessageImpl(const Message& _msg);
+ ~RecoverableMessageImpl() {};
+ void setPersistenceId(uint64_t id);
+ void setRedelivered();
+ bool loadContent(uint64_t available);
+ void decodeContent(framing::Buffer& buffer);
+ void recover(boost::shared_ptr<Queue> queue);
+ void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+ void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_RECOVERABLEMESSAGEIMPL_H*/
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index b04d7c34e0..6d831563e2 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -25,6 +25,8 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/RecoverableMessageImpl.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -38,26 +40,11 @@ namespace qpid {
namespace broker {
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr)
- : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
+ DtxManager& _dtxMgr, ProtocolRegistry& p)
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
-class RecoverableMessageImpl : public RecoverableMessage
-{
- Message msg;
-public:
- RecoverableMessageImpl(const Message& _msg);
- ~RecoverableMessageImpl() {};
- void setPersistenceId(uint64_t id);
- void setRedelivered();
- bool loadContent(uint64_t available);
- void decodeContent(framing::Buffer& buffer);
- void recover(Queue::shared_ptr queue);
- void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
- void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
-};
-
class RecoverableQueueImpl : public RecoverableQueue
{
Queue::shared_ptr queue;
@@ -131,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer&
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- //TODO: determine encoding/version actually used
- boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
- transfer->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+ framing::Buffer sniffer(buffer.getPointer(), buffer.available());
+ RecoverableMessage::shared_ptr m = protocols.recover(sniffer);
+ if (m) {
+ return m;
+ } else {
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+ }
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
index 1ad7892b13..60ca28092d 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -30,15 +30,17 @@
namespace qpid {
namespace broker {
+class ProtocolRegistry;
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
LinkRegistry& links;
DtxManager& dtxMgr;
+ ProtocolRegistry& protocols;
public:
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
- DtxManager& dtxMgr);
+ DtxManager& dtxMgr, ProtocolRegistry&);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 757f6efc59..e5657fd93e 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -31,7 +31,7 @@ namespace broker {
using framing::ProtocolVersion;
using qpid::sys::SecuritySettings;
-typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
+typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
typedef std::auto_ptr<Connection> ConnectionPtr;
typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
@@ -43,12 +43,14 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons
const SecuritySettings& external) {
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, false));
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
return sc.release();
+ } else {
+ return broker.getProtocolRegistry().create(v, out, id, external);
}
return 0;
}
@@ -58,7 +60,7 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
// used to create connections from one broker to another
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, true));
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index f3d97dc078..767305af81 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -300,7 +300,8 @@ Consumer(_name, type),
arguments(_arguments),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
- deliveryCount(0)
+ deliveryCount(0),
+ protocols(parent->getSession().getBroker().getProtocolRegistry())
{
if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
@@ -344,11 +345,11 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa
bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
allocateCredit(msg);
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
- consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 1d7ccbfa9c..be7c8d490a 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -56,6 +56,7 @@ namespace broker {
class Exchange;
class MessageStore;
+class ProtocolRegistry;
class SessionContext;
class SessionState;
@@ -97,6 +98,7 @@ class SemanticState : private boost::noncopyable {
const int syncFrequency;
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
+ ProtocolRegistry& protocols;
bool checkCredit(const Message& msg);
void allocateCredit(const Message& msg);
diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp
index bd90aa54a7..fde931038b 100644
--- a/qpid/cpp/src/qpid/messaging/Connection.cpp
+++ b/qpid/cpp/src/qpid/messaging/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
#include "qpid/messaging/Session.h"
#include "qpid/messaging/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/messaging/ProtocolRegistry.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/log/Statement.h"
@@ -40,22 +41,32 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this
Connection::~Connection() { PI::dtor(*this); }
Connection::Connection(const std::string& url, const std::string& o)
-{
+{
Variant::Map options;
AddressParser parser(o);
if (o.empty() || parser.parseMap(options)) {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
} else {
throw InvalidOptionString("Invalid option string: " + o);
}
}
Connection::Connection(const std::string& url, const Variant::Map& options)
{
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
}
Connection::Connection()
-{
+{
Variant::Map options;
std::string url = "amqp:tcp:127.0.0.1:5672";
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
@@ -67,12 +78,12 @@ bool Connection::isOpen() const { return impl->isOpen(); }
void Connection::close() { impl->close(); }
Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
Session Connection::createTransactionalSession(const std::string& name)
-{
+{
return impl->newSession(true, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
void Connection::setOption(const std::string& name, const Variant& value)
-{
+{
impl->setOption(name, value);
}
std::string Connection::getAuthenticatedUsername()
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
new file mode 100644
index 0000000000..4d91382eaa
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ProtocolRegistry.h"
+#include "qpid/Exception.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include <map>
+
+using qpid::types::Variant;
+
+namespace qpid {
+namespace messaging {
+namespace {
+typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+
+Registry& theRegistry()
+{
+ static Registry factories;
+ return factories;
+}
+
+bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out)
+{
+ bool matched = false;
+ for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) {
+ if (i->first == key) {
+ value = i->second;
+ matched = true;
+ } else {
+ out.insert(*i);
+ }
+ }
+ return matched;
+}
+}
+
+ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options)
+{
+ Variant name;
+ Variant::Map stripped;
+ if (extract("protocol", name, options, stripped)) {
+ Registry::const_iterator i = theRegistry().find(name.asString());
+ if (i != theRegistry().end()) return (i->second)(url, stripped);
+ else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
+ else throw qpid::Exception("Unsupported protocol: " + name.asString());
+ }
+ return 0;
+}
+void ProtocolRegistry::add(const std::string& name, Factory* factory)
+{
+ theRegistry()[name] = factory;
+}
+
+}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
new file mode 100644
index 0000000000..bcb62248a5
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
@@ -0,0 +1,42 @@
+#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H
+#define QPID_MESSAGING_PROTOCOLREGISTRY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace messaging {
+class ConnectionImpl;
+/**
+ * Registry for different implementations of the messaging API e.g AMQP 1.0
+ */
+class ProtocolRegistry
+{
+ public:
+ typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
+ static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
+ static void add(const std::string& name, Factory* factory);
+ private:
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/