summaryrefslogtreecommitdiff
path: root/extras/dispatch
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-02-28 01:05:17 +0000
committerTed Ross <tross@apache.org>2013-02-28 01:05:17 +0000
commitff671a2adbbbcf9d2ba26e2157eaac3b5ef4e072 (patch)
tree9c184014d27d000e88d2a96face453497d0312de /extras/dispatch
parentee3af4d23df2ebcd7c13900c83e182f6b3e2625e (diff)
downloadqpid-python-ff671a2adbbbcf9d2ba26e2157eaac3b5ef4e072.tar.gz
NO-JIRA - Added non-blocking server-start for when the application want to keep its own
main thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1451069 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras/dispatch')
-rw-r--r--extras/dispatch/include/qpid/dispatch/server.h10
-rw-r--r--extras/dispatch/src/alloc.c4
-rw-r--r--extras/dispatch/src/server.c23
-rw-r--r--extras/dispatch/tests/server_test.c17
4 files changed, 49 insertions, 5 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h
index 635e1323dd..0fb746f4a8 100644
--- a/extras/dispatch/include/qpid/dispatch/server.h
+++ b/extras/dispatch/include/qpid/dispatch/server.h
@@ -68,7 +68,7 @@ void dx_server_set_start_handler(dx_thread_start_cb_t start_handler, void *conte
/**
- * \brief Run the server threads until completion.
+ * \brief Run the server threads until completion - The blocking version.
*
* Start the operation of the server, including launching all of the worker threads.
* This function does not return until after the server has been stopped. The thread
@@ -78,6 +78,14 @@ void dx_server_run(void);
/**
+ * \brief Start the server threads and return immediately.
+ *
+ * Start the operation of the server, including launching all of the worker threads.
+ */
+void dx_server_start(void);
+
+
+/**
* \brief Stop the server
*
* Stop the server and join all of its worker threads. This function may be called from any
diff --git a/extras/dispatch/src/alloc.c b/extras/dispatch/src/alloc.c
index 2b3b953aad..63769c48a8 100644
--- a/extras/dispatch/src/alloc.c
+++ b/extras/dispatch/src/alloc.c
@@ -50,8 +50,8 @@ static void dx_alloc_init(dx_alloc_type_desc_t *desc)
if (desc->additional_size)
desc->total_size += *desc->additional_size;
- dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d",
- desc->type_name, desc->type_size, desc->total_size);
+ //dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d",
+ // desc->type_name, desc->type_size, desc->total_size);
if (!desc->global_pool) {
if (desc->config == 0)
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c
index 0099393f60..e5e521b47e 100644
--- a/extras/dispatch/src/server.c
+++ b/extras/dispatch/src/server.c
@@ -30,6 +30,7 @@
#include <signal.h>
static char *module="SERVER";
+static __thread int server_thread = 0;
typedef struct dx_thread_t {
int thread_id;
@@ -271,6 +272,7 @@ static void *thread_run(void *arg)
if (!thread)
return 0;
+ server_thread = 1;
thread->running = 1;
if (thread->canceled)
@@ -670,6 +672,21 @@ void dx_server_run(void)
}
+void dx_server_start(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ assert(dx_server->conn_handler); // Server can't run without a connection handler.
+
+ for (i = 0; i < dx_server->thread_count; i++)
+ thread_start(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+}
+
+
void dx_server_stop(void)
{
int idx;
@@ -680,6 +697,12 @@ void dx_server_stop(void)
sys_cond_signal_all(dx_server->cond);
pn_driver_wakeup(dx_server->driver);
sys_mutex_unlock(dx_server->lock);
+
+ if (!server_thread) {
+ for (idx = 0; idx < dx_server->thread_count; idx++)
+ thread_join(dx_server->threads[idx]);
+ dx_log(module, LOG_INFO, "Shut Down");
+ }
}
diff --git a/extras/dispatch/tests/server_test.c b/extras/dispatch/tests/server_test.c
index adeab62af9..74b4f9f2ae 100644
--- a/extras/dispatch/tests/server_test.c
+++ b/extras/dispatch/tests/server_test.c
@@ -47,7 +47,7 @@ static dx_user_fd_t *ufd_write;
static dx_user_fd_t *ufd_read;
-static void thread_start(void *context, int thread_id)
+static void thread_start_handler(void *context, int thread_id)
{
sys_mutex_lock(test_lock);
if (context != expected_context && !stored_error[0])
@@ -134,7 +134,7 @@ static char* test_start_handler(void *context)
threads_seen[i] = 0;
dx_server_set_conn_handler(conn_handler);
- dx_server_set_start_handler(thread_start, expected_context);
+ dx_server_set_start_handler(thread_start_handler, expected_context);
dx_server_run();
dx_server_finalize();
@@ -147,6 +147,18 @@ static char* test_start_handler(void *context)
}
+static char *test_server_start(void *context)
+{
+ dx_server_initialize(THREAD_COUNT);
+ dx_server_set_conn_handler(conn_handler);
+ dx_server_start();
+ dx_server_stop();
+ dx_server_finalize();
+
+ return 0;
+}
+
+
static char* test_user_fd(void *context)
{
int res;
@@ -186,6 +198,7 @@ int server_tests(void)
test_lock = sys_mutex();
dx_log_set_mask(LOG_NONE);
+ TEST_CASE(test_server_start, 0);
TEST_CASE(test_start_handler, 0);
TEST_CASE(test_user_fd, 0);