summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-02-13 15:39:59 +0000
committerAlan Conway <aconway@apache.org>2008-02-13 15:39:59 +0000
commit4deb3b4fd425aecd74c8c00983a199dd139df858 (patch)
treef45f1d0b6b974ec1af44bfcd1e8f38df938f39a3 /cpp
parenteee5501a55c9fe6386df9f43739cebf6cd6f9356 (diff)
downloadqpid-python-4deb3b4fd425aecd74c8c00983a199dd139df858.tar.gz
Broker::connect - connect to URL, return ConnectionInputHandler.
M src/qpid/broker/Broker.cpp M src/qpid/broker/Broker.h M src/qpid/sys/Acceptor.h M src/qpid/sys/AsynchIOAcceptor.cpp AMQBody::match - test for matching frames. M src/qpid/framing/AMQBody.cpp M src/qpid/framing/AMQBody.h Url::throwIfEmpty() - test for empty URL. M src/qpid/Url.cpp M src/qpid/Url.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@627484 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/Url.cpp4
-rw-r--r--cpp/src/qpid/Url.h3
-rw-r--r--cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--cpp/src/qpid/broker/Broker.h15
-rw-r--r--cpp/src/qpid/framing/AMQBody.cpp37
-rw-r--r--cpp/src/qpid/framing/AMQBody.h3
-rw-r--r--cpp/src/qpid/sys/Acceptor.h5
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp9
8 files changed, 85 insertions, 14 deletions
diff --git a/cpp/src/qpid/Url.cpp b/cpp/src/qpid/Url.cpp
index d056edc683..aa53d5cbe2 100644
--- a/cpp/src/qpid/Url.cpp
+++ b/cpp/src/qpid/Url.cpp
@@ -160,6 +160,10 @@ void Url::parseNoThrow(const char* url) {
clear();
}
+void Url::throwIfEmpty() const {
+ throw InvalidUrl("URL contains no addresses");
+}
+
std::istream& operator>>(std::istream& is, Url& url) {
std::string s;
is >> s;
diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h
index 2e24ba948d..20f42db0ad 100644
--- a/cpp/src/qpid/Url.h
+++ b/cpp/src/qpid/Url.h
@@ -78,6 +78,9 @@ struct Url : public std::vector<Address> {
template<class T> Url& operator=(T s) { parse(s); return *this; }
+ /** Throw InvalidUrl if the URL does not contain any addresses. */
+ void throwIfEmpty() const;
+
/** Replace contents with parsed URL as defined in
* https://wiki.108.redhat.com/jira/browse/AMQP-95
*@exception InvalidUrl if the url is invalid.
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 117a93b571..0a0eb0a0df 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -40,6 +40,7 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/Url.h"
#include <boost/bind.hpp>
@@ -263,11 +264,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_ECHO :
status = Manageable::STATUS_OK;
break;
- case management::Broker::METHOD_CONNECT :
- connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+ case management::Broker::METHOD_CONNECT : {
+ management::ArgsBrokerConnect& hp=
+ dynamic_cast<management::ArgsBrokerConnect&>(args);
+ connect(hp.i_host, hp.i_port);
status = Manageable::STATUS_OK;
break;
-
+ }
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -277,9 +280,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
-void Broker::connect(management::ArgsBrokerConnect& args)
+sys::ConnectionInputHandler* Broker::connect(
+ const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* f)
+{
+ return getAcceptor().connect(host, port, f ? f : &factory);
+}
+
+sys::ConnectionInputHandler* Broker::connect(
+ const Url& url, sys::ConnectionInputHandlerFactory* f)
{
- getAcceptor().connect(args.i_host, args.i_port, &factory);
+ url.throwIfEmpty();
+ TcpAddress addr=boost::get<TcpAddress>(url[0]);
+ return connect(addr.host, addr.port, f);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 68dbf570f0..55bc7644a5 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -47,6 +47,9 @@
#include <vector>
namespace qpid {
+
+class Url;
+
namespace broker {
static const uint16_t DEFAULT_PORT=5672;
@@ -54,7 +57,8 @@ static const uint16_t DEFAULT_PORT=5672;
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable
+class Broker : public sys::Runnable, public Plugin::Target,
+ public management::Manageable
{
public:
@@ -111,6 +115,14 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const std::string& host, uint16_t port,
+ sys::ConnectionInputHandlerFactory* =0);
+ /** Create a connection to another broker. */
+ sys::ConnectionInputHandler*
+ connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+
private:
sys::Acceptor& getAcceptor() const;
@@ -129,7 +141,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
Vhost::shared_ptr vhostObject;
void declareStandardExchange(const std::string& name, const std::string& type);
- void connect(management::ArgsBrokerConnect& args);
};
}}
diff --git a/cpp/src/qpid/framing/AMQBody.cpp b/cpp/src/qpid/framing/AMQBody.cpp
index a64d224a86..b3eeae0615 100644
--- a/cpp/src/qpid/framing/AMQBody.cpp
+++ b/cpp/src/qpid/framing/AMQBody.cpp
@@ -19,15 +19,46 @@
*
*/
-#include "AMQBody.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
#include <iostream>
-std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body)
+namespace qpid {
+namespace framing {
+
+std::ostream& operator<<(std::ostream& out, const AMQBody& body)
{
body.print(out);
return out;
}
-qpid::framing::AMQBody::~AMQBody() {}
+AMQBody::~AMQBody() {}
+
+namespace {
+struct MatchBodies : public AMQBodyConstVisitor {
+ const AMQBody& body;
+ bool match;
+
+ MatchBodies(const AMQBody& b) : body(b), match(false) {}
+ virtual ~MatchBodies() {}
+
+ virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); }
+ virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); }
+ virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); }
+ virtual void visit(const AMQMethodBody& x) {
+ const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body);
+ match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId());
+ }
+};
+}
+bool AMQBody::match(const AMQBody& a, const AMQBody& b) {
+ MatchBodies matcher(a);
+ b.accept(matcher);
+ return matcher.match;
+}
+}} // namespace
diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h
index b05301bd05..f3bf65470c 100644
--- a/cpp/src/qpid/framing/AMQBody.h
+++ b/cpp/src/qpid/framing/AMQBody.h
@@ -59,6 +59,9 @@ class AMQBody
virtual AMQMethodBody* getMethod() { return 0; }
virtual const AMQMethodBody* getMethod() const { return 0; }
+
+ /** Match if same type and same class/method ID for methods */
+ static bool match(const AMQBody& , const AMQBody& );
};
std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 7ff03e0eeb..5eb1f1a500 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -29,6 +29,7 @@ namespace qpid {
namespace sys {
class ConnectionInputHandlerFactory;
+class ConnectionInputHandler;
class Acceptor : public qpid::SharedObject<Acceptor>
{
@@ -38,7 +39,9 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
- virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+ virtual ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory) = 0;
/** Note: this function is async-signal safe */
virtual void shutdown() = 0;
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 9fd32add72..0586eb9d36 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -54,7 +54,10 @@ class AsynchIOAcceptor : public Acceptor {
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
- void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
+ ConnectionInputHandler* connect(
+ const std::string& host, int16_t port,
+ ConnectionInputHandlerFactory* factory);
+
void shutdown();
uint16_t getPort() const;
@@ -188,7 +191,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
}
}
-void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -209,7 +212,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
-
+ return handler;
}