summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp45
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/log/Logger.cpp5
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h6
-rw-r--r--cpp/src/qpid/sys/Waitable.h3
5 files changed, 58 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 33364e48df..0cc150419f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -34,10 +34,15 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
+#include "qpid/log/posix/SinkOptions.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/Uuid.h"
@@ -51,6 +56,7 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/Address.h"
+#include "qpid/StringUtils.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
@@ -388,17 +394,21 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]");
-
switch (methodId)
{
case _qmf::Broker::METHOD_ECHO :
+ QPID_LOG (debug, "Broker::echo("
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+ << ", "
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
+ << ")");
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
+ QPID_LOG (debug, "Broker::connect()");
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
@@ -415,13 +425,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
+ QPID_LOG (debug, "Broker::queueMoveMessages()");
if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
+ case _qmf::Broker::METHOD_SETLOGLEVEL :
+ setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level);
+ QPID_LOG (debug, "Broker::setLogLevel()");
+ status = Manageable::STATUS_OK;
+ break;
+ case _qmf::Broker::METHOD_GETLOGLEVEL :
+ dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel();
+ QPID_LOG (debug, "Broker::getLogLevel()");
+ status = Manageable::STATUS_OK;
+ break;
default:
+ QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
@@ -429,6 +451,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
+void Broker::setLogLevel(const std::string& level)
+{
+ QPID_LOG(notice, "Changing log level to " << level);
+ std::vector<std::string> selectors;
+ split(selectors, level, ", ");
+ qpid::log::Logger::instance().reconfigure(selectors);
+}
+
+std::string Broker::getLogLevel()
+{
+ std::string level;
+ const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
+ for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
+ if (i != selectors.begin()) level += std::string(",");
+ level += *i;
+ }
+ return level;
+}
+
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 4f089a1fca..32e2c8ab6b 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -145,6 +145,8 @@ public:
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
+ void setLogLevel(const std::string& level);
+ std::string getLogLevel();
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 939e2502cc..2217cdddbd 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -146,6 +146,11 @@ void Logger::configure(const Options& opts) {
options.sinkOptions->setup(this);
}
+void Logger::reconfigure(const std::vector<std::string>& selectors) {
+ options.selectors = selectors;
+ select(Selector(options));
+}
+
void Logger::setPrefix(const std::string& p) { prefix = p; }
}} // namespace qpid::log
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index 210cb4ad82..ca6b529930 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -53,9 +53,13 @@ public:
Waitable::ScopedWait w(waitable);
if (timeout == TIME_INFINITE) {
while (queue.empty()) waitable.wait();
- } else {
+ } else if (timeout) {
AbsTime deadline(now(),timeout);
while (queue.empty() && deadline > now()) waitable.wait(deadline);
+ } else {
+ //ensure zero timeout pop does not miss the fact that
+ //queue is closed
+ waitable.checkException();
}
}
if (queue.empty()) return false;
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
index 7701b6f97d..8f6bd17049 100644
--- a/cpp/src/qpid/sys/Waitable.h
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -79,6 +79,9 @@ class Waitable : public Monitor {
/** True if the waitable has an exception */
bool hasException() const { return exception; }
+ /** Throws if the waitable has an exception */
+ void checkException() const { exception.raise(); }
+
/** Clear the exception if any */
void resetException() { exception.reset(); }