summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-07-31 13:55:11 +0000
committerAlan Conway <aconway@apache.org>2014-07-31 13:55:11 +0000
commitc9276b03da088b3f4d3f4b527f2e02703e2729eb (patch)
treeb3f0553221917ffeb27f9562d9df7a5d9f8000d2 /qpid/cpp
parent5b6f651d3f2c5b33fa510e120dc0e98f6a95409a (diff)
downloadqpid-python-c9276b03da088b3f4d3f4b527f2e02703e2729eb.tar.gz
QPID-5942: qpid HA cluster may end-up in joining state after HA primary is killed
There are two issues here, both related to the fact that rgmanager sees qpidd and qpidd-primary as two separate services. 1. The service start/stop scripts can be called concurrently. This can lead to running a qpidd process who's pid is not in the pidfile. rgmanager cannot detect or kill this qpidd and cannot start another qpidd because of the lock on the qpidd data directory. 2. rgmanager sees a primary failure as two failures: qpidd and qpidd-primary, and will then try to stop and start both services. The order of these actions is not defined and can lead to rgmanager killing a service it has just started. This patch makes two major changes to the init scripts: 1. Uses flock to lock the sensitive stop/start part of the scripts to ensure they are not executed concurrently. 2. On "stop" the scripts check if a running qpidd is primary or not. "qpidd stop" is a no-op if the running broker is primary, "qpidd-primary stop" is a no op if it is not. This ensures that a broker will be stopped by the same stream of service actions that started it. Minor changes in this patch: - better logging of broker start-up and shut-down sequence. - qpid-ha heartbeat use half of timeout option. - add missing timeouts in qpid-ha. Notes: This changes the behavior of 'clusvcadm -d <qpidd-service>' on the primary node. Previously this would have stopped the qpidd service on that node, killed the qpidd process and relocated the primary service. Now this will stop the qpidd service (as far as rgmanager is concerned) but will not kill qpidd or relocate the primary service. When the primary is relocated the qpidd service wil not be able to re-start on that node until it is re-enabled with 'clusvcadm -e'. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614895 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rwxr-xr-xqpid/cpp/etc/qpidd-primary.in25
-rwxr-xr-xqpid/cpp/etc/qpidd.in93
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Daemon.cpp2
6 files changed, 102 insertions, 54 deletions
diff --git a/qpid/cpp/etc/qpidd-primary.in b/qpid/cpp/etc/qpidd-primary.in
index 3119ebac6e..e79d8cc09c 100755
--- a/qpid/cpp/etc/qpidd-primary.in
+++ b/qpid/cpp/etc/qpidd-primary.in
@@ -65,24 +65,35 @@ status() {
fi
}
+# Ensure no concurrent start/stop of services.
+lock() {
+ export QPID_HA_LOCK_HELD=1 # For calls to the qpidd script
+ exec 9< $QPID_INIT
+ flock 9
+}
+
start() {
- $QPID_INIT start
- echo -n $"Promoting qpid daemon to cluster primary: "
- $QPID_HA promote
- [ "$?" -eq 0 ] && success || failure
+ lock
+ $QPID_INIT start primary || return $?
+ echo -n $"Promoting to primary: "
+ err=$($QPID_HA promote 2>&1)
+ RETVAL=$?
+ [ $RETVAL = 0 ] && success || { echo -n "$err: "; failure; }
+ echo
+ return $RETVAL
}
stop() {
- $QPID_INIT stop
+ $QPID_INIT stop primary
}
reload() {
echo 1>&2 $"$0: reload not supported"
- exit 3
+ return 3
}
restart() {
- $QPID_INIT restart && start
+ stop && start
}
# See how we were called.
diff --git a/qpid/cpp/etc/qpidd.in b/qpid/cpp/etc/qpidd.in
index 09d87e565d..3d5f5424bd 100755
--- a/qpid/cpp/etc/qpidd.in
+++ b/qpid/cpp/etc/qpidd.in
@@ -40,6 +40,7 @@ lockfile=/var/lock/subsys/$prog
pidfile=/var/run/qpidd.pid
# The following variables can be overridden in @sysconfdir@/sysconfig/$prog
+QPID_INIT=@initdir@/$prog
QPID_BIN=@sbindir@/$prog
QPID_DATA_DIR=/var/lib/qpidd
QPID_CONFIG=@confdir@/qpidd.conf
@@ -59,10 +60,13 @@ for f in $QPID_BIN; do
done
qpid_ping() {
- # Only do ping test if qpid-ha is installed.
- if test -x $QPID_HA; then
- $QPID_HA $QPID_HA_OPTIONS ping
- fi
+ test -x $QPID_HA || return 0 # Only if qpid-ha installed
+ $QPID_HA $QPID_HA_OPTIONS ping >/dev/null 2>&1
+}
+
+qpid_is_primary() {
+ # Only if qpid-ha is installed.
+ test -x $QPID_HA && $QPID_HA $QPID_HA_OPTIONS status --is-primary >/dev/null 2>&1
}
RETVAL=0
@@ -76,67 +80,86 @@ fi
do_status() {
# Check PID file and ping for liveness
- MESSAGE=$(status $prog) || {
- RC=$?
- echo $MESSAGE
- return $RC
- }
- qpid_ping || {
- return 1
+ MESSAGE=$(status -p $pidfile $prog) && {
+ qpid_ping || return 1
}
+ RC=$?
echo $MESSAGE
- return 0
+ return $RC
+}
+
+FLOCK_FD=9
+# Ensure no concurrent start/stop of services.
+lock() {
+ [ "$QPID_HA_LOCK_HELD" ] || { # Held by caller
+ exec 9< $QPID_INIT
+ flock $FLOCK_FD
+ }
}
start() {
- echo -n $"Starting Qpid AMQP daemon: "
- daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS
- RETVAL=$?
- echo
- [ $RETVAL = 0 ] && touch $lockfile
- if [ $RETVAL = 0 ]; then
- touch $pidfile
- chown qpidd.qpidd $pidfile
- [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
- runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile"
- fi
- return $RETVAL
+ lock
+ echo -n $"Starting Qpid AMQP daemon: "
+ daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS --close-fd $FLOCK_FD
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && touch $lockfile
+ if [ $RETVAL = 0 ]; then
+ touch $pidfile
+ chown qpidd.qpidd $pidfile
+ [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
+ runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile"
+ fi
+ return $RETVAL
}
stop() {
+ lock
+ # Primary script does not stop backup brokers and vice versa.
+ if qpid_is_primary; then
+ [ "$1" != primary ] && SKIP="Not stopping Qpid daemon, primary"
+ else
+ [ "$1" = primary ] && SKIP="Not stopping Qpid daemon, not primary"
+ fi
+ if [ -n "$SKIP" ]; then
+ echo -n "$SKIP: "
+ success
+ RETVAL=0
+ else
echo -n $"Stopping Qpid AMQP daemon: "
killproc -p ${pidfile} $prog
RETVAL=$?
- echo
[ $RETVAL = 0 ] && rm -f ${lockfile} ${pidfile}
+ fi
+ echo
+ return $RETVAL
}
reload() {
- echo 1>&2 $"$0: reload not supported"
- exit 3
+ echo 1>&2 $"$0: reload not supported"
+ return 3
}
restart() {
- stop
- start
+ stop && start
}
# See how we were called.
case "$1" in
- start|stop|restart|reload)
- $1
+ start|stop|restart|reload)
+ $1 $2
;;
- status)
+ status)
do_status
RETVAL=$?
;;
- force-reload)
+ force-reload)
restart
;;
- try-restart|condrestart)
+ try-restart|condrestart)
[ -e $lockfile ] && restart || :
;;
- *)
+ *)
echo 1>&2 $"Usage: $0 {start|stop|status|restart|condrestart|try-restart|force-reload}"
exit 2
esac
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 831b2e0641..9228c2d18d 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -59,12 +59,14 @@ const std::string TCP = "tcp";
struct DaemonOptions : public qpid::Options {
bool daemon;
bool quit;
+ bool kill;
bool check;
+ std::vector<int> closeFd;
int wait;
std::string piddir;
std::string transport;
- DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), check(false), wait(600), transport(TCP)
+ DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), kill(false), check(false), wait(600), transport(TCP)
{
char *home = ::getenv("HOME");
@@ -78,9 +80,11 @@ struct DaemonOptions : public qpid::Options {
("daemon,d", pure_switch(daemon), "Run as a daemon. Logs to syslog by default in this mode.")
("transport", optValue(transport, "TRANSPORT"), "The transport for which to return the port")
("pid-dir", optValue(piddir, "DIR"), "Directory where port-specific PID file is stored")
+ ("close-fd", optValue(closeFd, "FD"), "File descriptors that the daemon should close")
("wait,w", optValue(wait, "SECONDS"), "Sets the maximum wait time to initialize or shutdown the daemon. If the daemon fails to initialize/shutdown, prints an error and returns 1")
("check,c", pure_switch(check), "Prints the daemon's process ID to stdout and returns 0 if the daemon is running, otherwise returns 1")
- ("quit,q", pure_switch(quit), "Tells the daemon to shut down");
+ ("quit,q", pure_switch(quit), "Tells the daemon to shut down with an INT signal")
+ ("kill,k", pure_switch(kill), "Kill the daemon with a KILL signal.");
}
};
@@ -132,12 +136,15 @@ struct QpiddDaemon : public Daemon {
/** Code for parent process */
void parent() {
uint16_t port = wait(options->daemon.wait);
- if (options->parent->broker.port == 0
- ) cout << port << endl;
+ if (options->parent->broker.port == 0)
+ cout << port << endl;
}
/** Code for forked child process */
void child() {
+ // Close extra FDs requested in options.
+ for (size_t i = 0; i < options->daemon.closeFd.size(); ++i)
+ ::close(options->daemon.closeFd[i]);
boost::intrusive_ptr<Broker> brokerPtr(new Broker(options->parent->broker));
ScopedSetBroker ssb(brokerPtr);
brokerPtr->accept();
@@ -157,21 +164,22 @@ int QpiddBroker::execute (QpiddOptions *options) {
if (myOptions == 0)
throw Exception("Internal error obtaining platform options");
- if (myOptions->daemon.check || myOptions->daemon.quit) {
+ if (myOptions->daemon.check || myOptions->daemon.quit || myOptions->daemon.kill) {
pid_t pid;
try {
pid = Daemon::getPid(myOptions->daemon.piddir, options->broker.port);
- } catch (const ErrnoException& e) {
+ } catch (const Exception& e) {
// This is not a critical error, usually means broker is not running
- QPID_LOG(notice, "Cannot stop broker: " << e.what());
+ QPID_LOG(notice, "Broker is not running: " << e.what());
return 1;
}
if (pid < 0)
return 1;
if (myOptions->daemon.check)
cout << pid << endl;
- if (myOptions->daemon.quit) {
- if (kill(pid, SIGINT) < 0)
+ if (myOptions->daemon.quit || myOptions->daemon.kill) {
+ int signal = myOptions->daemon.kill ? SIGKILL : SIGINT;
+ if (kill(pid, signal) < 0)
throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
// Wait for the process to die before returning
int retry=myOptions->daemon.wait*1000; // Try up to "--wait N" seconds, do retry every millisecond
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 3afaf43b81..6648ae706d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -236,8 +236,10 @@ Broker::Broker(const BrokerOptions& conf) :
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
timestampRcvMsgs(conf.timestampRcvMsgs),
+ logPrefix(Msg() << "Broker " << sys::SystemInfo::getProcessId()),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
+ QPID_LOG(notice, logPrefix << " initializing");
if (!dataDir.isEnabled()) {
QPID_LOG (info, "No data directory - Disabling persistent configuration");
}
@@ -388,6 +390,7 @@ Broker::Broker(const BrokerOptions& conf) :
finalize();
throw;
}
+ QPID_LOG(notice, logPrefix << " initialized");
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
@@ -499,7 +502,7 @@ void Broker::setStore () {
void Broker::run() {
if (config.workerThreads > 0) {
- QPID_LOG(notice, "Broker running");
+ QPID_LOG(notice, logPrefix << " running");
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
@@ -515,6 +518,7 @@ void Broker::run() {
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
}
+ QPID_LOG(notice, logPrefix << " stopped");
} else {
throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str());
}
@@ -528,6 +532,7 @@ void Broker::shutdown() {
}
Broker::~Broker() {
+ QPID_LOG(notice, logPrefix << " shutting down");
if (mgmtObject != 0)
mgmtObject->debugStats("destroying");
shutdown();
@@ -536,7 +541,7 @@ Broker::~Broker() {
SaslAuthenticator::fini();
timer->stop();
managementAgent.reset();
- QPID_LOG(notice, "Shut down");
+ QPID_LOG(notice, logPrefix << " shutdown complete");
}
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 25e4e9f0ef..46dbe5d5b5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -160,6 +160,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
bool timestampRcvMsgs;
+ std::string logPrefix;
public:
QPID_BROKER_EXTERN virtual ~Broker();
diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp
index 281345bc95..5b6f898332 100644
--- a/qpid/cpp/src/qpid/broker/Daemon.cpp
+++ b/qpid/cpp/src/qpid/broker/Daemon.cpp
@@ -193,7 +193,7 @@ void Daemon::ready(uint16_t port) { // child
*/
int desired_write = sizeof(uint16_t);
if ( desired_write > ::write(pipeFds[1], & port, desired_write) ) {
- throw Exception("Error writing to parent." );
+ throw ErrnoException("Error writing to parent" );
}
QPID_LOG(debug, "Daemon ready on port: " << port);