diff options
| -rwxr-xr-x | qpid/cpp/etc/qpidd-primary.in | 25 | ||||
| -rwxr-xr-x | qpid/cpp/etc/qpidd.in | 93 | ||||
| -rw-r--r-- | qpid/cpp/src/posix/QpiddBroker.cpp | 26 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Daemon.cpp | 2 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-ha | 12 |
7 files changed, 109 insertions, 59 deletions
diff --git a/qpid/cpp/etc/qpidd-primary.in b/qpid/cpp/etc/qpidd-primary.in index 3119ebac6e..e79d8cc09c 100755 --- a/qpid/cpp/etc/qpidd-primary.in +++ b/qpid/cpp/etc/qpidd-primary.in @@ -65,24 +65,35 @@ status() { fi } +# Ensure no concurrent start/stop of services. +lock() { + export QPID_HA_LOCK_HELD=1 # For calls to the qpidd script + exec 9< $QPID_INIT + flock 9 +} + start() { - $QPID_INIT start - echo -n $"Promoting qpid daemon to cluster primary: " - $QPID_HA promote - [ "$?" -eq 0 ] && success || failure + lock + $QPID_INIT start primary || return $? + echo -n $"Promoting to primary: " + err=$($QPID_HA promote 2>&1) + RETVAL=$? + [ $RETVAL = 0 ] && success || { echo -n "$err: "; failure; } + echo + return $RETVAL } stop() { - $QPID_INIT stop + $QPID_INIT stop primary } reload() { echo 1>&2 $"$0: reload not supported" - exit 3 + return 3 } restart() { - $QPID_INIT restart && start + stop && start } # See how we were called. diff --git a/qpid/cpp/etc/qpidd.in b/qpid/cpp/etc/qpidd.in index 09d87e565d..3d5f5424bd 100755 --- a/qpid/cpp/etc/qpidd.in +++ b/qpid/cpp/etc/qpidd.in @@ -40,6 +40,7 @@ lockfile=/var/lock/subsys/$prog pidfile=/var/run/qpidd.pid # The following variables can be overridden in @sysconfdir@/sysconfig/$prog +QPID_INIT=@initdir@/$prog QPID_BIN=@sbindir@/$prog QPID_DATA_DIR=/var/lib/qpidd QPID_CONFIG=@confdir@/qpidd.conf @@ -59,10 +60,13 @@ for f in $QPID_BIN; do done qpid_ping() { - # Only do ping test if qpid-ha is installed. - if test -x $QPID_HA; then - $QPID_HA $QPID_HA_OPTIONS ping - fi + test -x $QPID_HA || return 0 # Only if qpid-ha installed + $QPID_HA $QPID_HA_OPTIONS ping >/dev/null 2>&1 +} + +qpid_is_primary() { + # Only if qpid-ha is installed. + test -x $QPID_HA && $QPID_HA $QPID_HA_OPTIONS status --is-primary >/dev/null 2>&1 } RETVAL=0 @@ -76,67 +80,86 @@ fi do_status() { # Check PID file and ping for liveness - MESSAGE=$(status $prog) || { - RC=$? - echo $MESSAGE - return $RC - } - qpid_ping || { - return 1 + MESSAGE=$(status -p $pidfile $prog) && { + qpid_ping || return 1 } + RC=$? echo $MESSAGE - return 0 + return $RC +} + +FLOCK_FD=9 +# Ensure no concurrent start/stop of services. +lock() { + [ "$QPID_HA_LOCK_HELD" ] || { # Held by caller + exec 9< $QPID_INIT + flock $FLOCK_FD + } } start() { - echo -n $"Starting Qpid AMQP daemon: " - daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS - RETVAL=$? - echo - [ $RETVAL = 0 ] && touch $lockfile - if [ $RETVAL = 0 ]; then - touch $pidfile - chown qpidd.qpidd $pidfile - [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile - runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile" - fi - return $RETVAL + lock + echo -n $"Starting Qpid AMQP daemon: " + daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS --close-fd $FLOCK_FD + RETVAL=$? + echo + [ $RETVAL = 0 ] && touch $lockfile + if [ $RETVAL = 0 ]; then + touch $pidfile + chown qpidd.qpidd $pidfile + [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile + runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile" + fi + return $RETVAL } stop() { + lock + # Primary script does not stop backup brokers and vice versa. + if qpid_is_primary; then + [ "$1" != primary ] && SKIP="Not stopping Qpid daemon, primary" + else + [ "$1" = primary ] && SKIP="Not stopping Qpid daemon, not primary" + fi + if [ -n "$SKIP" ]; then + echo -n "$SKIP: " + success + RETVAL=0 + else echo -n $"Stopping Qpid AMQP daemon: " killproc -p ${pidfile} $prog RETVAL=$? - echo [ $RETVAL = 0 ] && rm -f ${lockfile} ${pidfile} + fi + echo + return $RETVAL } reload() { - echo 1>&2 $"$0: reload not supported" - exit 3 + echo 1>&2 $"$0: reload not supported" + return 3 } restart() { - stop - start + stop && start } # See how we were called. case "$1" in - start|stop|restart|reload) - $1 + start|stop|restart|reload) + $1 $2 ;; - status) + status) do_status RETVAL=$? ;; - force-reload) + force-reload) restart ;; - try-restart|condrestart) + try-restart|condrestart) [ -e $lockfile ] && restart || : ;; - *) + *) echo 1>&2 $"Usage: $0 {start|stop|status|restart|condrestart|try-restart|force-reload}" exit 2 esac diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index 831b2e0641..9228c2d18d 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -59,12 +59,14 @@ const std::string TCP = "tcp"; struct DaemonOptions : public qpid::Options { bool daemon; bool quit; + bool kill; bool check; + std::vector<int> closeFd; int wait; std::string piddir; std::string transport; - DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), check(false), wait(600), transport(TCP) + DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), kill(false), check(false), wait(600), transport(TCP) { char *home = ::getenv("HOME"); @@ -78,9 +80,11 @@ struct DaemonOptions : public qpid::Options { ("daemon,d", pure_switch(daemon), "Run as a daemon. Logs to syslog by default in this mode.") ("transport", optValue(transport, "TRANSPORT"), "The transport for which to return the port") ("pid-dir", optValue(piddir, "DIR"), "Directory where port-specific PID file is stored") + ("close-fd", optValue(closeFd, "FD"), "File descriptors that the daemon should close") ("wait,w", optValue(wait, "SECONDS"), "Sets the maximum wait time to initialize or shutdown the daemon. If the daemon fails to initialize/shutdown, prints an error and returns 1") ("check,c", pure_switch(check), "Prints the daemon's process ID to stdout and returns 0 if the daemon is running, otherwise returns 1") - ("quit,q", pure_switch(quit), "Tells the daemon to shut down"); + ("quit,q", pure_switch(quit), "Tells the daemon to shut down with an INT signal") + ("kill,k", pure_switch(kill), "Kill the daemon with a KILL signal."); } }; @@ -132,12 +136,15 @@ struct QpiddDaemon : public Daemon { /** Code for parent process */ void parent() { uint16_t port = wait(options->daemon.wait); - if (options->parent->broker.port == 0 - ) cout << port << endl; + if (options->parent->broker.port == 0) + cout << port << endl; } /** Code for forked child process */ void child() { + // Close extra FDs requested in options. + for (size_t i = 0; i < options->daemon.closeFd.size(); ++i) + ::close(options->daemon.closeFd[i]); boost::intrusive_ptr<Broker> brokerPtr(new Broker(options->parent->broker)); ScopedSetBroker ssb(brokerPtr); brokerPtr->accept(); @@ -157,21 +164,22 @@ int QpiddBroker::execute (QpiddOptions *options) { if (myOptions == 0) throw Exception("Internal error obtaining platform options"); - if (myOptions->daemon.check || myOptions->daemon.quit) { + if (myOptions->daemon.check || myOptions->daemon.quit || myOptions->daemon.kill) { pid_t pid; try { pid = Daemon::getPid(myOptions->daemon.piddir, options->broker.port); - } catch (const ErrnoException& e) { + } catch (const Exception& e) { // This is not a critical error, usually means broker is not running - QPID_LOG(notice, "Cannot stop broker: " << e.what()); + QPID_LOG(notice, "Broker is not running: " << e.what()); return 1; } if (pid < 0) return 1; if (myOptions->daemon.check) cout << pid << endl; - if (myOptions->daemon.quit) { - if (kill(pid, SIGINT) < 0) + if (myOptions->daemon.quit || myOptions->daemon.kill) { + int signal = myOptions->daemon.kill ? SIGKILL : SIGINT; + if (kill(pid, signal) < 0) throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno)); // Wait for the process to die before returning int retry=myOptions->daemon.wait*1000; // Try up to "--wait N" seconds, do retry every millisecond diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 3afaf43b81..6648ae706d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -236,8 +236,10 @@ Broker::Broker(const BrokerOptions& conf) : queueCleaner(queues, poller, timer.get()), recoveryInProgress(false), timestampRcvMsgs(conf.timestampRcvMsgs), + logPrefix(Msg() << "Broker " << sys::SystemInfo::getProcessId()), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { + QPID_LOG(notice, logPrefix << " initializing"); if (!dataDir.isEnabled()) { QPID_LOG (info, "No data directory - Disabling persistent configuration"); } @@ -388,6 +390,7 @@ Broker::Broker(const BrokerOptions& conf) : finalize(); throw; } + QPID_LOG(notice, logPrefix << " initialized"); } void Broker::declareStandardExchange(const std::string& name, const std::string& type) @@ -499,7 +502,7 @@ void Broker::setStore () { void Broker::run() { if (config.workerThreads > 0) { - QPID_LOG(notice, "Broker running"); + QPID_LOG(notice, logPrefix << " running"); Dispatcher d(poller); int numIOThreads = config.workerThreads; std::vector<Thread> t(numIOThreads-1); @@ -515,6 +518,7 @@ void Broker::run() { for (int i=0; i<numIOThreads-1; ++i) { t[i].join(); } + QPID_LOG(notice, logPrefix << " stopped"); } else { throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str()); } @@ -528,6 +532,7 @@ void Broker::shutdown() { } Broker::~Broker() { + QPID_LOG(notice, logPrefix << " shutting down"); if (mgmtObject != 0) mgmtObject->debugStats("destroying"); shutdown(); @@ -536,7 +541,7 @@ Broker::~Broker() { SaslAuthenticator::fini(); timer->stop(); managementAgent.reset(); - QPID_LOG(notice, "Shut down"); + QPID_LOG(notice, logPrefix << " shutdown complete"); } ManagementObject::shared_ptr Broker::GetManagementObject(void) const diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 25e4e9f0ef..46dbe5d5b5 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -160,6 +160,7 @@ class Broker : public sys::Runnable, public Plugin::Target, mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; bool timestampRcvMsgs; + std::string logPrefix; public: QPID_BROKER_EXTERN virtual ~Broker(); diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp index 281345bc95..5b6f898332 100644 --- a/qpid/cpp/src/qpid/broker/Daemon.cpp +++ b/qpid/cpp/src/qpid/broker/Daemon.cpp @@ -193,7 +193,7 @@ void Daemon::ready(uint16_t port) { // child */ int desired_write = sizeof(uint16_t); if ( desired_write > ::write(pipeFds[1], & port, desired_write) ) { - throw Exception("Error writing to parent." ); + throw ErrnoException("Error writing to parent" ); } QPID_LOG(debug, "Daemon ready on port: " << port); diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index 49f6a244c6..640463c09a 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -19,7 +19,7 @@ # under the License. # -import optparse, sys, time, os, re +import optparse, sys, time, os, re, math from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage from qpid.util import URL @@ -100,7 +100,7 @@ class Command: conn_options['client_properties'] = {'qpid.ha-admin' : 1} if opts.timeout: conn_options['timeout'] = opts.timeout - conn_options['heartbeat'] = int(opts.timeout) + conn_options['heartbeat'] = int(math.ceil(opts.timeout/2)) connection = Connection.establish(opts.broker, **conn_options) qmf_broker = self.connect_agent and BrokerAgent(connection) ha_broker = self.connect_agent and qmf_broker.getHaBroker() @@ -152,7 +152,7 @@ class PromoteCmd(Command): def __init__(self): Command.__init__(self, "promote","Promote a backup broker to primary. Note this command will not detect if another broker is already primary, and creating a second primary will make the cluster inconsistent. It is up to the caller (normally the cluster resource manager) to ensure there is only one primary.") def do_execute(self, qmf_broker, ha_broker, opts, args): - qmf_broker._method("promote", {}, HA_BROKER) + qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout) PromoteCmd() @@ -172,13 +172,15 @@ class StatusCmd(Command): def do_execute(self, qmf_broker, ha_broker, opts, args): if opts.is_primary: if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1) + return if opts.expect: if opts.expect != ha_broker.status: raise ExitStatus(1) + return + def status(hb, b=None, ex=None): if ex: print b, ex elif b: print b, hb.status else: print hb.status - self.all_brokers(ha_broker, opts, status) StatusCmd() @@ -187,7 +189,7 @@ class ReplicateCmd(Command): def __init__(self): Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"]) def do_execute(self, qmf_broker, ha_broker, opts, args): - qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER) + qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout) ReplicateCmd() class QueryCmd(Command): |
