diff options
| author | Alan Conway <aconway@apache.org> | 2008-02-13 15:39:59 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-02-13 15:39:59 +0000 |
| commit | 4deb3b4fd425aecd74c8c00983a199dd139df858 (patch) | |
| tree | f45f1d0b6b974ec1af44bfcd1e8f38df938f39a3 /cpp | |
| parent | eee5501a55c9fe6386df9f43739cebf6cd6f9356 (diff) | |
| download | qpid-python-4deb3b4fd425aecd74c8c00983a199dd139df858.tar.gz | |
Broker::connect - connect to URL, return ConnectionInputHandler.
M src/qpid/broker/Broker.cpp
M src/qpid/broker/Broker.h
M src/qpid/sys/Acceptor.h
M src/qpid/sys/AsynchIOAcceptor.cpp
AMQBody::match - test for matching frames.
M src/qpid/framing/AMQBody.cpp
M src/qpid/framing/AMQBody.h
Url::throwIfEmpty() - test for empty URL.
M src/qpid/Url.cpp
M src/qpid/Url.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@627484 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/Url.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/Url.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQBody.cpp | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQBody.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 9 |
8 files changed, 85 insertions, 14 deletions
diff --git a/cpp/src/qpid/Url.cpp b/cpp/src/qpid/Url.cpp index d056edc683..aa53d5cbe2 100644 --- a/cpp/src/qpid/Url.cpp +++ b/cpp/src/qpid/Url.cpp @@ -160,6 +160,10 @@ void Url::parseNoThrow(const char* url) { clear(); } +void Url::throwIfEmpty() const { + throw InvalidUrl("URL contains no addresses"); +} + std::istream& operator>>(std::istream& is, Url& url) { std::string s; is >> s; diff --git a/cpp/src/qpid/Url.h b/cpp/src/qpid/Url.h index 2e24ba948d..20f42db0ad 100644 --- a/cpp/src/qpid/Url.h +++ b/cpp/src/qpid/Url.h @@ -78,6 +78,9 @@ struct Url : public std::vector<Address> { template<class T> Url& operator=(T s) { parse(s); return *this; } + /** Throw InvalidUrl if the URL does not contain any addresses. */ + void throwIfEmpty() const; + /** Replace contents with parsed URL as defined in * https://wiki.108.redhat.com/jira/browse/AMQP-95 *@exception InvalidUrl if the url is invalid. diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 117a93b571..0a0eb0a0df 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -40,6 +40,7 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/Url.h" #include <boost/bind.hpp> @@ -263,11 +264,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_ECHO : status = Manageable::STATUS_OK; break; - case management::Broker::METHOD_CONNECT : - connect(dynamic_cast<management::ArgsBrokerConnect&>(args)); + case management::Broker::METHOD_CONNECT : { + management::ArgsBrokerConnect& hp= + dynamic_cast<management::ArgsBrokerConnect&>(args); + connect(hp.i_host, hp.i_port); status = Manageable::STATUS_OK; break; - + } case management::Broker::METHOD_JOINCLUSTER : case management::Broker::METHOD_LEAVECLUSTER : status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -277,9 +280,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } -void Broker::connect(management::ArgsBrokerConnect& args) +sys::ConnectionInputHandler* Broker::connect( + const std::string& host, uint16_t port, + sys::ConnectionInputHandlerFactory* f) +{ + return getAcceptor().connect(host, port, f ? f : &factory); +} + +sys::ConnectionInputHandler* Broker::connect( + const Url& url, sys::ConnectionInputHandlerFactory* f) { - getAcceptor().connect(args.i_host, args.i_port, &factory); + url.throwIfEmpty(); + TcpAddress addr=boost::get<TcpAddress>(url[0]); + return connect(addr.host, addr.port, f); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 68dbf570f0..55bc7644a5 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -47,6 +47,9 @@ #include <vector> namespace qpid { + +class Url; + namespace broker { static const uint16_t DEFAULT_PORT=5672; @@ -54,7 +57,8 @@ static const uint16_t DEFAULT_PORT=5672; /** * A broker instance. */ -class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable +class Broker : public sys::Runnable, public Plugin::Target, + public management::Manageable { public: @@ -111,6 +115,14 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); + /** Create a connection to another broker. */ + sys::ConnectionInputHandler* + connect(const std::string& host, uint16_t port, + sys::ConnectionInputHandlerFactory* =0); + /** Create a connection to another broker. */ + sys::ConnectionInputHandler* + connect(const Url& url, sys::ConnectionInputHandlerFactory* =0); + private: sys::Acceptor& getAcceptor() const; @@ -129,7 +141,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M Vhost::shared_ptr vhostObject; void declareStandardExchange(const std::string& name, const std::string& type); - void connect(management::ArgsBrokerConnect& args); }; }} diff --git a/cpp/src/qpid/framing/AMQBody.cpp b/cpp/src/qpid/framing/AMQBody.cpp index a64d224a86..b3eeae0615 100644 --- a/cpp/src/qpid/framing/AMQBody.cpp +++ b/cpp/src/qpid/framing/AMQBody.cpp @@ -19,15 +19,46 @@ * */ -#include "AMQBody.h" +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeartbeatBody.h" #include <iostream> -std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body) +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQBody& body) { body.print(out); return out; } -qpid::framing::AMQBody::~AMQBody() {} +AMQBody::~AMQBody() {} + +namespace { +struct MatchBodies : public AMQBodyConstVisitor { + const AMQBody& body; + bool match; + + MatchBodies(const AMQBody& b) : body(b), match(false) {} + virtual ~MatchBodies() {} + + virtual void visit(const AMQHeaderBody&) { match=dynamic_cast<const AMQHeaderBody*>(&body); } + virtual void visit(const AMQContentBody&) { match=dynamic_cast<const AMQContentBody*>(&body); } + virtual void visit(const AMQHeartbeatBody&) { match=dynamic_cast<const AMQHeartbeatBody*>(&body); } + virtual void visit(const AMQMethodBody& x) { + const AMQMethodBody* y=dynamic_cast<const AMQMethodBody*>(&body); + match = (y && y->amqpMethodId() == x.amqpMethodId() && y->amqpClassId() == x.amqpClassId()); + } +}; +} +bool AMQBody::match(const AMQBody& a, const AMQBody& b) { + MatchBodies matcher(a); + b.accept(matcher); + return matcher.match; +} +}} // namespace diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h index b05301bd05..f3bf65470c 100644 --- a/cpp/src/qpid/framing/AMQBody.h +++ b/cpp/src/qpid/framing/AMQBody.h @@ -59,6 +59,9 @@ class AMQBody virtual AMQMethodBody* getMethod() { return 0; } virtual const AMQMethodBody* getMethod() const { return 0; } + + /** Match if same type and same class/method ID for methods */ + static bool match(const AMQBody& , const AMQBody& ); }; std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 7ff03e0eeb..5eb1f1a500 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -29,6 +29,7 @@ namespace qpid { namespace sys { class ConnectionInputHandlerFactory; +class ConnectionInputHandler; class Acceptor : public qpid::SharedObject<Acceptor> { @@ -38,7 +39,9 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; virtual void run(ConnectionInputHandlerFactory* factory) = 0; - virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0; + virtual ConnectionInputHandler* connect( + const std::string& host, int16_t port, + ConnectionInputHandlerFactory* factory) = 0; /** Note: this function is async-signal safe */ virtual void shutdown() = 0; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 9fd32add72..0586eb9d36 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -54,7 +54,10 @@ class AsynchIOAcceptor : public Acceptor { AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); - void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory); + ConnectionInputHandler* connect( + const std::string& host, int16_t port, + ConnectionInputHandlerFactory* factory); + void shutdown(); uint16_t getPort() const; @@ -188,7 +191,7 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { } } -void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -209,7 +212,7 @@ void AsynchIOAcceptor::connect(const std::string& host, int16_t port, Connection aio->queueReadBuffer(new Buff); } aio->start(poller); - + return handler; } |
