summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-09 19:52:44 +0000
committerGordon Sim <gsim@apache.org>2008-04-09 19:52:44 +0000
commit6cf06312f5f9d686a0af76f7c1c08732a7ae27cb (patch)
treed415498924b234d73645a9ae74f486085fe63f15 /cpp/src/qpid
parent363ed6d7e6a0986c49a9ae5d43954dfec08e7e8c (diff)
downloadqpid-python-6cf06312f5f9d686a0af76f7c1c08732a7ae27cb.tar.gz
Fixes and automated tests for federation function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@646505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp4
-rw-r--r--cpp/src/qpid/broker/Connection.cpp11
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp14
3 files changed, 19 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 566b9cc197..32819380de 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -49,8 +49,6 @@ void Bridge::create()
framing::AMQP_ServerProxy::Session session(channel);
session.open(0);
- //peer.getSession().open(0);
-
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
@@ -62,7 +60,7 @@ void Bridge::create()
string queue = "bridge_queue_";
queue += Uuid(true).str();
peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
- peer.getQueue().bind(0, queue, args.i_dest, args.i_key, FieldTable());
+ peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1e55087390..ef1100a2ec 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -70,7 +70,7 @@ class Connection::MgmtLink : public Connection::MgmtWrapper
Bridges cancelled;//holds list of bridges pending cancellation
Bridges active;//holds active bridges
uint channelCounter;
- sys::Mutex lock;
+ sys::Mutex linkLock;
void cancel(Bridge*);
@@ -88,7 +88,7 @@ public:
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
ConnectionState(out_, broker_),
adapter(*this),
- mgmtClosing(0),
+ mgmtClosing(false),
mgmtId(mgmtId_)
{
initMgmt();
@@ -164,6 +164,7 @@ bool Connection::doOutput()
try{
//process any pending mgmt commands:
if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
return outputTasks.doOutput();
@@ -203,8 +204,9 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
switch (methodId)
{
case management::Client::METHOD_CLOSE :
- mgmtClosing = 1;
+ mgmtClosing = true;
if (mgmtWrapper.get()) mgmtWrapper->closing();
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
case management::Link::METHOD_BRIDGE :
@@ -253,6 +255,7 @@ void Connection::MgmtLink::closing()
void Connection::MgmtLink::processPending()
{
+ Mutex::ScopedLock l(linkLock);
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -271,6 +274,7 @@ void Connection::MgmtLink::processPending()
void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
{
+ Mutex::ScopedLock l(linkLock);
created.push_back(new Bridge(channelCounter++, connection,
boost::bind(&MgmtLink::cancel, this, _1),
dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -278,6 +282,7 @@ void Connection::MgmtLink::process(Connection& connection, const management::Arg
void Connection::MgmtLink::cancel(Bridge* b)
{
+ Mutex::ScopedLock l(linkLock);
//need to take this out the active map and add it to the cancelled map
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
if (&(*i) == b) {
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
index f5a629248c..8901aa9f9d 100644
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ b/cpp/src/qpid/broker/PreviewConnection.cpp
@@ -70,7 +70,7 @@ class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
Bridges cancelled;//holds list of bridges pending cancellation
Bridges active;//holds active bridges
uint channelCounter;
- sys::Mutex lock;
+ sys::Mutex linkLock;
void cancel(Bridge*);
@@ -88,7 +88,7 @@ public:
PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
- mgmtClosing(0),
+ mgmtClosing(false),
mgmtId(mgmtId_)
{
Manageable* parent = broker.GetVhostObject ();
@@ -111,7 +111,7 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok
PreviewConnection::~PreviewConnection () {}
void PreviewConnection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
+ if (mgmtClosing)
close (403, "Closed by Management Request", 0, 0);
if (frame.getChannel() == 0) {
@@ -159,6 +159,8 @@ bool PreviewConnection::doOutput()
try{
//process any pending mgmt commands:
if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
+
//then do other output as needed:
return outputTasks.doOutput();
@@ -198,8 +200,9 @@ Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
switch (methodId)
{
case management::Client::METHOD_CLOSE :
- mgmtClosing = 1;
+ mgmtClosing = true;
if (mgmtWrapper.get()) mgmtWrapper->closing();
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
case management::Link::METHOD_BRIDGE :
@@ -248,6 +251,7 @@ void PreviewConnection::MgmtLink::closing()
void PreviewConnection::MgmtLink::processPending()
{
+ Mutex::ScopedLock l(linkLock);
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -266,6 +270,7 @@ void PreviewConnection::MgmtLink::processPending()
void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
{
+ Mutex::ScopedLock l(linkLock);
created.push_back(new Bridge(channelCounter++, connection,
boost::bind(&MgmtLink::cancel, this, _1),
dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -273,6 +278,7 @@ void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const m
void PreviewConnection::MgmtLink::cancel(Bridge* b)
{
+ Mutex::ScopedLock l(linkLock);
//need to take this out the active map and add it to the cancelled map
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
if (&(*i) == b) {