summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-x qpid/cpp/etc/qpidd-primary.in 25
-rwxr-xr-x qpid/cpp/etc/qpidd.in 93
-rw-r--r-- qpid/cpp/src/posix/QpiddBroker.cpp 26
-rw-r--r-- qpid/cpp/src/qpid/broker/Broker.cpp 9
-rw-r--r-- qpid/cpp/src/qpid/broker/Broker.h 1
-rw-r--r-- qpid/cpp/src/qpid/broker/Daemon.cpp 2
-rwxr-xr-x qpid/tools/src/py/qpid-ha 12
7 files changed, 109 insertions, 59 deletions
diff --git a/qpid/cpp/etc/qpidd-primary.in b/qpid/cpp/etc/qpidd-primary.in
index 3119ebac6e..e79d8cc09c 100755
--- a/qpid/cpp/etc/qpidd-primary.in
+++ b/qpid/cpp/etc/qpidd-primary.in
@@ -65,24 +65,35 @@ status() {
fi
}
+# Ensure no concurrent start/stop of services.
+lock() {
+ export QPID_HA_LOCK_HELD=1 # For calls to the qpidd script
+ exec 9< $QPID_INIT
+ flock 9
+}
+
start() {
- $QPID_INIT start
- echo -n $"Promoting qpid daemon to cluster primary: "
- $QPID_HA promote
- [ "$?" -eq 0 ] && success || failure
+ lock
+ $QPID_INIT start primary || return $?
+ echo -n $"Promoting to primary: "
+ err=$($QPID_HA promote 2>&1)
+ RETVAL=$?
+ [ $RETVAL = 0 ] && success || { echo -n "$err: "; failure; }
+ echo
+ return $RETVAL
}
stop() {
- $QPID_INIT stop
+ $QPID_INIT stop primary
}
reload() {
echo 1>&2 $"$0: reload not supported"
- exit 3
+ return 3
}
restart() {
- $QPID_INIT restart && start
+ stop && start
}
# See how we were called.
diff --git a/qpid/cpp/etc/qpidd.in b/qpid/cpp/etc/qpidd.in
index 09d87e565d..3d5f5424bd 100755
--- a/qpid/cpp/etc/qpidd.in
+++ b/qpid/cpp/etc/qpidd.in
@@ -40,6 +40,7 @@ lockfile=/var/lock/subsys/$prog
pidfile=/var/run/qpidd.pid
# The following variables can be overridden in @sysconfdir@/sysconfig/$prog
+QPID_INIT=@initdir@/$prog
QPID_BIN=@sbindir@/$prog
QPID_DATA_DIR=/var/lib/qpidd
QPID_CONFIG=@confdir@/qpidd.conf
@@ -59,10 +60,13 @@ for f in $QPID_BIN; do
done
qpid_ping() {
- # Only do ping test if qpid-ha is installed.
- if test -x $QPID_HA; then
- $QPID_HA $QPID_HA_OPTIONS ping
- fi
+ test -x $QPID_HA || return 0 # Only if qpid-ha installed
+ $QPID_HA $QPID_HA_OPTIONS ping >/dev/null 2>&1
+}
+
+qpid_is_primary() {
+ # Only if qpid-ha is installed.
+ test -x $QPID_HA && $QPID_HA $QPID_HA_OPTIONS status --is-primary >/dev/null 2>&1
}
RETVAL=0
@@ -76,67 +80,86 @@ fi
do_status() {
# Check PID file and ping for liveness
- MESSAGE=$(status $prog) || {
- RC=$?
- echo $MESSAGE
- return $RC
- }
- qpid_ping || {
- return 1
+ MESSAGE=$(status -p $pidfile $prog) && {
+ qpid_ping || return 1
}
+ RC=$?
echo $MESSAGE
- return 0
+ return $RC
+}
+
+FLOCK_FD=9
+# Ensure no concurrent start/stop of services.
+lock() {
+ [ "$QPID_HA_LOCK_HELD" ] || { # Held by caller
+ exec 9< $QPID_INIT
+ flock $FLOCK_FD
+ }
}
start() {
- echo -n $"Starting Qpid AMQP daemon: "
- daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS
- RETVAL=$?
- echo
- [ $RETVAL = 0 ] && touch $lockfile
- if [ $RETVAL = 0 ]; then
- touch $pidfile
- chown qpidd.qpidd $pidfile
- [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
- runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile"
- fi
- return $RETVAL
+ lock
+ echo -n $"Starting Qpid AMQP daemon: "
+ daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --daemon $QPIDD_OPTIONS --close-fd $FLOCK_FD
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && touch $lockfile
+ if [ $RETVAL = 0 ]; then
+ touch $pidfile
+ chown qpidd.qpidd $pidfile
+ [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
+ runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile"
+ fi
+ return $RETVAL
}
stop() {
+ lock
+ # Primary script does not stop backup brokers and vice versa.
+ if qpid_is_primary; then
+ [ "$1" != primary ] && SKIP="Not stopping Qpid daemon, primary"
+ else
+ [ "$1" = primary ] && SKIP="Not stopping Qpid daemon, not primary"
+ fi
+ if [ -n "$SKIP" ]; then
+ echo -n "$SKIP: "
+ success
+ RETVAL=0
+ else
echo -n $"Stopping Qpid AMQP daemon: "
killproc -p ${pidfile} $prog
RETVAL=$?
- echo
[ $RETVAL = 0 ] && rm -f ${lockfile} ${pidfile}
+ fi
+ echo
+ return $RETVAL
}
reload() {
- echo 1>&2 $"$0: reload not supported"
- exit 3
+ echo 1>&2 $"$0: reload not supported"
+ return 3
}
restart() {
- stop
- start
+ stop && start
}
# See how we were called.
case "$1" in
- start|stop|restart|reload)
- $1
+ start|stop|restart|reload)
+ $1 $2
;;
- status)
+ status)
do_status
RETVAL=$?
;;
- force-reload)
+ force-reload)
restart
;;
- try-restart|condrestart)
+ try-restart|condrestart)
[ -e $lockfile ] && restart || :
;;
- *)
+ *)
echo 1>&2 $"Usage: $0 {start|stop|status|restart|condrestart|try-restart|force-reload}"
exit 2
esac
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 831b2e0641..9228c2d18d 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -59,12 +59,14 @@ const std::string TCP = "tcp";
struct DaemonOptions : public qpid::Options {
bool daemon;
bool quit;
+ bool kill;
bool check;
+ std::vector<int> closeFd;
int wait;
std::string piddir;
std::string transport;
- DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), check(false), wait(600), transport(TCP)
+ DaemonOptions() : qpid::Options("Daemon options"), daemon(false), quit(false), kill(false), check(false), wait(600), transport(TCP)
{
char *home = ::getenv("HOME");
@@ -78,9 +80,11 @@ struct DaemonOptions : public qpid::Options {
("daemon,d", pure_switch(daemon), "Run as a daemon. Logs to syslog by default in this mode.")
("transport", optValue(transport, "TRANSPORT"), "The transport for which to return the port")
("pid-dir", optValue(piddir, "DIR"), "Directory where port-specific PID file is stored")
+ ("close-fd", optValue(closeFd, "FD"), "File descriptors that the daemon should close")
("wait,w", optValue(wait, "SECONDS"), "Sets the maximum wait time to initialize or shutdown the daemon. If the daemon fails to initialize/shutdown, prints an error and returns 1")
("check,c", pure_switch(check), "Prints the daemon's process ID to stdout and returns 0 if the daemon is running, otherwise returns 1")
- ("quit,q", pure_switch(quit), "Tells the daemon to shut down");
+ ("quit,q", pure_switch(quit), "Tells the daemon to shut down with an INT signal")
+ ("kill,k", pure_switch(kill), "Kill the daemon with a KILL signal.");
}
};
@@ -132,12 +136,15 @@ struct QpiddDaemon : public Daemon {
/** Code for parent process */
void parent() {
uint16_t port = wait(options->daemon.wait);
- if (options->parent->broker.port == 0
- ) cout << port << endl;
+ if (options->parent->broker.port == 0)
+ cout << port << endl;
}
/** Code for forked child process */
void child() {
+ // Close extra FDs requested in options.
+ for (size_t i = 0; i < options->daemon.closeFd.size(); ++i)
+ ::close(options->daemon.closeFd[i]);
boost::intrusive_ptr<Broker> brokerPtr(new Broker(options->parent->broker));
ScopedSetBroker ssb(brokerPtr);
brokerPtr->accept();
@@ -157,21 +164,22 @@ int QpiddBroker::execute (QpiddOptions *options) {
if (myOptions == 0)
throw Exception("Internal error obtaining platform options");
- if (myOptions->daemon.check || myOptions->daemon.quit) {
+ if (myOptions->daemon.check || myOptions->daemon.quit || myOptions->daemon.kill) {
pid_t pid;
try {
pid = Daemon::getPid(myOptions->daemon.piddir, options->broker.port);
- } catch (const ErrnoException& e) {
+ } catch (const Exception& e) {
// This is not a critical error, usually means broker is not running
- QPID_LOG(notice, "Cannot stop broker: " << e.what());
+ QPID_LOG(notice, "Broker is not running: " << e.what());
return 1;
}
if (pid < 0)
return 1;
if (myOptions->daemon.check)
cout << pid << endl;
- if (myOptions->daemon.quit) {
- if (kill(pid, SIGINT) < 0)
+ if (myOptions->daemon.quit || myOptions->daemon.kill) {
+ int signal = myOptions->daemon.kill ? SIGKILL : SIGINT;
+ if (kill(pid, signal) < 0)
throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
// Wait for the process to die before returning
int retry=myOptions->daemon.wait*1000; // Try up to "--wait N" seconds, do retry every millisecond
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 3afaf43b81..6648ae706d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -236,8 +236,10 @@ Broker::Broker(const BrokerOptions& conf) :
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
timestampRcvMsgs(conf.timestampRcvMsgs),
+ logPrefix(Msg() << "Broker " << sys::SystemInfo::getProcessId()),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
+ QPID_LOG(notice, logPrefix << " initializing");
if (!dataDir.isEnabled()) {
QPID_LOG (info, "No data directory - Disabling persistent configuration");
}
@@ -388,6 +390,7 @@ Broker::Broker(const BrokerOptions& conf) :
finalize();
throw;
}
+ QPID_LOG(notice, logPrefix << " initialized");
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
@@ -499,7 +502,7 @@ void Broker::setStore () {
void Broker::run() {
if (config.workerThreads > 0) {
- QPID_LOG(notice, "Broker running");
+ QPID_LOG(notice, logPrefix << " running");
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
@@ -515,6 +518,7 @@ void Broker::run() {
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
}
+ QPID_LOG(notice, logPrefix << " stopped");
} else {
throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str());
}
@@ -528,6 +532,7 @@ void Broker::shutdown() {
}
Broker::~Broker() {
+ QPID_LOG(notice, logPrefix << " shutting down");
if (mgmtObject != 0)
mgmtObject->debugStats("destroying");
shutdown();
@@ -536,7 +541,7 @@ Broker::~Broker() {
SaslAuthenticator::fini();
timer->stop();
managementAgent.reset();
- QPID_LOG(notice, "Shut down");
+ QPID_LOG(notice, logPrefix << " shutdown complete");
}
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 25e4e9f0ef..46dbe5d5b5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -160,6 +160,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
bool timestampRcvMsgs;
+ std::string logPrefix;
public:
QPID_BROKER_EXTERN virtual ~Broker();
diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp
index 281345bc95..5b6f898332 100644
--- a/qpid/cpp/src/qpid/broker/Daemon.cpp
+++ b/qpid/cpp/src/qpid/broker/Daemon.cpp
@@ -193,7 +193,7 @@ void Daemon::ready(uint16_t port) { // child
*/
int desired_write = sizeof(uint16_t);
if ( desired_write > ::write(pipeFds[1], & port, desired_write) ) {
- throw Exception("Error writing to parent." );
+ throw ErrnoException("Error writing to parent" );
}
QPID_LOG(debug, "Daemon ready on port: " << port);
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index 49f6a244c6..640463c09a 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -19,7 +19,7 @@
# under the License.
#
-import optparse, sys, time, os, re
+import optparse, sys, time, os, re, math
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
from qpid.util import URL
@@ -100,7 +100,7 @@ class Command:
conn_options['client_properties'] = {'qpid.ha-admin' : 1}
if opts.timeout:
conn_options['timeout'] = opts.timeout
- conn_options['heartbeat'] = int(opts.timeout)
+ conn_options['heartbeat'] = int(math.ceil(opts.timeout/2))
connection = Connection.establish(opts.broker, **conn_options)
qmf_broker = self.connect_agent and BrokerAgent(connection)
ha_broker = self.connect_agent and qmf_broker.getHaBroker()
@@ -152,7 +152,7 @@ class PromoteCmd(Command):
def __init__(self):
Command.__init__(self, "promote","Promote a backup broker to primary. Note this command will not detect if another broker is already primary, and creating a second primary will make the cluster inconsistent. It is up to the caller (normally the cluster resource manager) to ensure there is only one primary.")
def do_execute(self, qmf_broker, ha_broker, opts, args):
- qmf_broker._method("promote", {}, HA_BROKER)
+ qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout)
PromoteCmd()
@@ -172,13 +172,15 @@ class StatusCmd(Command):
def do_execute(self, qmf_broker, ha_broker, opts, args):
if opts.is_primary:
if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1)
+ return
if opts.expect:
if opts.expect != ha_broker.status: raise ExitStatus(1)
+ return
+
def status(hb, b=None, ex=None):
if ex: print b, ex
elif b: print b, hb.status
else: print hb.status
-
self.all_brokers(ha_broker, opts, status)
StatusCmd()
@@ -187,7 +189,7 @@ class ReplicateCmd(Command):
def __init__(self):
Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
def do_execute(self, qmf_broker, ha_broker, opts, args):
- qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
+ qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout)
ReplicateCmd()
class QueryCmd(Command):