diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /extras/dispatch/src/server.c | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras/dispatch/src/server.c')
-rw-r--r-- | extras/dispatch/src/server.c | 903 |
1 files changed, 903 insertions, 0 deletions
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c new file mode 100644 index 0000000000..0099393f60 --- /dev/null +++ b/extras/dispatch/src/server.c @@ -0,0 +1,903 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> +#include "server_private.h" +#include "timer_private.h" +#include "alloc_private.h" +#include "auth.h" +#include "work_queue.h" +#include <stdio.h> +#include <time.h> +#include <signal.h> + +static char *module="SERVER"; + +typedef struct dx_thread_t { + int thread_id; + volatile int running; + volatile int canceled; + int using_thread; + sys_thread_t *thread; +} dx_thread_t; + + +typedef struct dx_server_t { + int thread_count; + pn_driver_t *driver; + dx_thread_start_cb_t start_handler; + dx_conn_handler_cb_t conn_handler; + dx_signal_handler_cb_t signal_handler; + dx_user_fd_handler_cb_t ufd_handler; + void *start_context; + void *conn_context; + void *signal_context; + sys_cond_t *cond; + sys_mutex_t *lock; + dx_thread_t **threads; + work_queue_t *work_queue; + dx_timer_list_t pending_timers; + bool a_thread_is_waiting; + int threads_active; + int pause_requests; + int threads_paused; + int pause_next_sequence; + int pause_now_serving; + int pending_signal; +} dx_server_t; + + +ALLOC_DEFINE(dx_listener_t); +ALLOC_DEFINE(dx_connector_t); +ALLOC_DEFINE(dx_connection_t); +ALLOC_DEFINE(dx_user_fd_t); + + +/** + * Singleton Concurrent Proton Driver object + */ +static dx_server_t *dx_server = 0; + + +static void signal_handler(int signum) +{ + dx_server->pending_signal = signum; + sys_cond_signal_all(dx_server->cond); +} + + +static dx_thread_t *thread(int id) +{ + dx_thread_t *thread = NEW(dx_thread_t); + if (!thread) + return 0; + + thread->thread_id = id; + thread->running = 0; + thread->canceled = 0; + thread->using_thread = 0; + + return thread; +} + + +static void thread_process_listeners(pn_driver_t *driver) +{ + pn_listener_t *listener = pn_driver_listener(driver); + pn_connector_t *cxtr; + dx_connection_t *ctx; + + while (listener) { + dx_log(module, LOG_TRACE, "Accepting Connection"); + cxtr = pn_listener_accept(listener); + ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_SASL_SERVER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_cxtr = cxtr; + ctx->pn_conn = 0; + ctx->listener = (dx_listener_t*) pn_listener_context(listener); + ctx->connector = 0; + ctx->context = ctx->listener->context; + ctx->ufd = 0; + + pn_connector_set_context(cxtr, ctx); + listener = pn_driver_listener(driver); + } +} + + +static void handle_signals_LH(void) +{ + int signum = dx_server->pending_signal; + + if (signum) { + dx_server->pending_signal = 0; + if (dx_server->signal_handler) { + sys_mutex_unlock(dx_server->lock); + dx_server->signal_handler(dx_server->signal_context, signum); + sys_mutex_lock(dx_server->lock); + } + } +} + + +static void block_if_paused_LH(void) +{ + if (dx_server->pause_requests > 0) { + dx_server->threads_paused++; + sys_cond_signal_all(dx_server->cond); + while (dx_server->pause_requests > 0) + sys_cond_wait(dx_server->cond, dx_server->lock); + dx_server->threads_paused--; + } +} + + +static void process_connector(pn_connector_t *cxtr) +{ + dx_connection_t *ctx = pn_connector_context(cxtr); + int events = 0; + int auth_passes = 0; + + if (ctx->state == CONN_STATE_USER) { + dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); + return; + } + + do { + // + // Step the engine for pre-handler processing + // + pn_connector_process(cxtr); + + // + // Call the handler that is appropriate for the connector's state. + // + switch (ctx->state) { + case CONN_STATE_CONNECTING: + if (!pn_connector_closed(cxtr)) { + ctx->state = CONN_STATE_SASL_CLIENT; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; + events = 1; + } else { + ctx->state = CONN_STATE_FAILED; + events = 0; + } + break; + + case CONN_STATE_SASL_CLIENT: + if (auth_passes == 0) { + auth_client_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_SASL_SERVER: + if (auth_passes == 0) { + auth_server_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_OPENING: + ctx->state = CONN_STATE_OPERATIONAL; + + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, "dispatch"); // TODO - make unique + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + + case CONN_STATE_OPERATIONAL: + if (pn_connector_closed(cxtr)) { + dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_CLOSE, + (dx_connection_t*) pn_connector_context(cxtr)); + events = 0; + } + else + events = dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_PROCESS, + (dx_connection_t*) pn_connector_context(cxtr)); + break; + + default: + break; + } + } while (events > 0); +} + + +// +// TEMPORARY FUNCTION PROTOTYPES +// +void pn_driver_wait_1(pn_driver_t *d); +int pn_driver_wait_2(pn_driver_t *d, int timeout); +void pn_driver_wait_3(pn_driver_t *d); +// +// END TEMPORARY +// + +static void *thread_run(void *arg) +{ + dx_thread_t *thread = (dx_thread_t*) arg; + pn_connector_t *work; + pn_connection_t *conn; + dx_connection_t *ctx; + int error; + int poll_result; + int timer_holdoff = 0; + + if (!thread) + return 0; + + thread->running = 1; + + if (thread->canceled) + return 0; + + // + // Invoke the start handler if the application supplied one. + // This handler can be used to set NUMA or processor affinnity for the thread. + // + if (dx_server->start_handler) + dx_server->start_handler(dx_server->start_context, thread->thread_id); + + // + // Main Loop + // + while (thread->running) { + sys_mutex_lock(dx_server->lock); + + // + // Check for pending signals to process + // + handle_signals_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Check to see if the server is pausing. If so, block here. + // + block_if_paused_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Service pending timers. + // + dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); + if (timer) { + DEQ_REMOVE_HEAD(dx_server->pending_timers); + + // + // Mark the timer as idle in case it reschedules itself. + // + dx_timer_idle_LH(timer); + + // + // Release the lock and invoke the connection handler. + // + sys_mutex_unlock(dx_server->lock); + timer->handler(timer->context); + pn_driver_wakeup(dx_server->driver); + continue; + } + + // + // Check the work queue for connectors scheduled for processing. + // + work = work_queue_get(dx_server->work_queue); + if (!work) { + // + // There is no pending work to do + // + if (dx_server->a_thread_is_waiting) { + // + // Another thread is waiting on the proton driver, this thread must + // wait on the condition variable until signaled. + // + sys_cond_wait(dx_server->cond, dx_server->lock); + } else { + // + // This thread elects itself to wait on the proton driver. Set the + // thread-is-waiting flag so other idle threads will not interfere. + // + dx_server->a_thread_is_waiting = true; + + // + // Ask the timer module when its next timer is scheduled to fire. We'll + // use this value in driver_wait as the timeout. If there are no scheduled + // timers, the returned value will be -1. + // + long duration = dx_timer_next_duration_LH(); + + // + // Invoke the proton driver's wait sequence. This is a bit of a hack for now + // and will be improved in the future. The wait process is divided into three parts, + // the first and third of which need to be non-reentrant, and the second of which + // must be reentrant (and blocks). + // + pn_driver_wait_1(dx_server->driver); + sys_mutex_unlock(dx_server->lock); + + do { + error = 0; + poll_result = pn_driver_wait_2(dx_server->driver, duration); + if (poll_result == -1) + error = pn_driver_errno(dx_server->driver); + } while (error == PN_INTR); + if (error) { + dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); + exit(-1); + } + + sys_mutex_lock(dx_server->lock); + pn_driver_wait_3(dx_server->driver); + + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Visit the timer module. + // + if (poll_result == 0 || ++timer_holdoff == 100) { + struct timespec tv; + clock_gettime(CLOCK_REALTIME, &tv); + long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; + dx_timer_visit_LH(milliseconds); + timer_holdoff = 0; + } + + // + // Process listeners (incoming connections). + // + thread_process_listeners(dx_server->driver); + + // + // Traverse the list of connectors-needing-service from the proton driver. + // If the connector is not already in the work queue and it is not currently + // being processed by another thread, put it in the work queue and signal the + // condition variable. + // + work = pn_driver_connector(dx_server->driver); + while (work) { + ctx = pn_connector_context(work); + if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->enqueued = 1; + work_queue_put(dx_server->work_queue, work); + sys_cond_signal(dx_server->cond); + } + work = pn_driver_connector(dx_server->driver); + } + + // + // Release our exclusive claim on pn_driver_wait. + // + dx_server->a_thread_is_waiting = false; + } + } + + // + // If we were given a connector to work on from the work queue, mark it as + // owned by this thread and as no longer enqueued. + // + if (work) { + ctx = pn_connector_context(work); + if (ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->owner_thread = thread->thread_id; + ctx->enqueued = 0; + dx_server->threads_active++; + } else { + // + // This connector is being processed by another thread, re-queue it. + // + work_queue_put(dx_server->work_queue, work); + work = 0; + } + } + sys_mutex_unlock(dx_server->lock); + + // + // Process the connector that we now have exclusive access to. + // + if (work) { + process_connector(work); + + // + // Check to see if the connector was closed during processing + // + if (pn_connector_closed(work)) { + // + // Connector is closed. Free the context and the connector. + // + conn = pn_connector_connection(work); + if (ctx->connector) { + ctx->connector->ctx = 0; + ctx->connector->state = CXTR_STATE_CONNECTING; + dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); + } + sys_mutex_lock(dx_server->lock); + free_dx_connection_t(ctx); + pn_connector_free(work); + if (conn) + pn_connection_free(conn); + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } else { + // + // The connector lives on. Mark it as no longer owned by this thread. + // + sys_mutex_lock(dx_server->lock); + ctx->owner_thread = CONTEXT_NO_OWNER; + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } + + // + // Wake up the proton driver to force it to reconsider its set of FDs + // in light of the processing that just occurred. + // + pn_driver_wakeup(dx_server->driver); + } + } + + return 0; +} + + +static void thread_start(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->using_thread = 1; + thread->thread = sys_thread(thread_run, (void*) thread); +} + + +static void thread_cancel(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->running = 0; + thread->canceled = 1; +} + + +static void thread_join(dx_thread_t *thread) +{ + if (!thread) + return; + + if (thread->using_thread) + sys_thread_join(thread->thread); +} + + +static void thread_free(dx_thread_t *thread) +{ + if (!thread) + return; + + free(thread); +} + + +static void cxtr_try_open(void *context) +{ + dx_connector_t *ct = (dx_connector_t*) context; + if (ct->state != CXTR_STATE_CONNECTING) + return; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_CONNECTING; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = ct; + ctx->context = ct->context; + ctx->user_context = 0; + ctx->ufd = 0; + + // + // pn_connector is not thread safe + // + sys_mutex_lock(dx_server->lock); + ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); + sys_mutex_unlock(dx_server->lock); + + ct->ctx = ctx; + ct->delay = 5000; + dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); +} + + +void dx_server_initialize(int thread_count) +{ + int i; + + if (dx_server) + return; // TODO - Fail in a more dramatic way + + dx_alloc_initialize(); + dx_server = NEW(dx_server_t); + + if (!dx_server) + return; // TODO - Fail in a more dramatic way + + dx_server->thread_count = thread_count; + dx_server->driver = pn_driver(); + dx_server->start_handler = 0; + dx_server->conn_handler = 0; + dx_server->signal_handler = 0; + dx_server->ufd_handler = 0; + dx_server->start_context = 0; + dx_server->signal_context = 0; + dx_server->lock = sys_mutex(); + dx_server->cond = sys_cond(); + + dx_timer_initialize(dx_server->lock); + + dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); + for (i = 0; i < thread_count; i++) + dx_server->threads[i] = thread(i); + + dx_server->work_queue = work_queue(); + DEQ_INIT(dx_server->pending_timers); + dx_server->a_thread_is_waiting = false; + dx_server->threads_active = 0; + dx_server->pause_requests = 0; + dx_server->threads_paused = 0; + dx_server->pause_next_sequence = 0; + dx_server->pause_now_serving = 0; + dx_server->pending_signal = 0; +} + + +void dx_server_finalize(void) +{ + int i; + if (!dx_server) + return; + + for (i = 0; i < dx_server->thread_count; i++) + thread_free(dx_server->threads[i]); + + work_queue_free(dx_server->work_queue); + + pn_driver_free(dx_server->driver); + sys_mutex_free(dx_server->lock); + sys_cond_free(dx_server->cond); + free(dx_server); + dx_server = 0; +} + + +void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) +{ + dx_server->conn_handler = handler; +} + + +void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) +{ + dx_server->signal_handler = handler; + dx_server->signal_context = context; +} + + +void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) +{ + dx_server->start_handler = handler; + dx_server->start_context = context; +} + + +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) +{ + dx_server->ufd_handler = ufd_handler; +} + + +void dx_server_run(void) +{ + int i; + if (!dx_server) + return; + + assert(dx_server->conn_handler); // Server can't run without a connection handler. + + for (i = 1; i < dx_server->thread_count; i++) + thread_start(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); + + thread_run((void*) dx_server->threads[0]); + + for (i = 1; i < dx_server->thread_count; i++) + thread_join(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Shut Down"); +} + + +void dx_server_stop(void) +{ + int idx; + + sys_mutex_lock(dx_server->lock); + for (idx = 0; idx < dx_server->thread_count; idx++) + thread_cancel(dx_server->threads[idx]); + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_signal(int signum) +{ + signal(signum, signal_handler); +} + + +void dx_server_pause(void) +{ + sys_mutex_lock(dx_server->lock); + + // + // Bump the request count to stop all the threads. + // + dx_server->pause_requests++; + int my_sequence = dx_server->pause_next_sequence++; + + // + // Awaken all threads that are currently blocking. + // + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + + // + // Wait for the paused thread count plus the number of threads requesting a pause to equal + // the total thread count. Also, don't exit the blocking loop until now_serving equals our + // sequence number. This ensures that concurrent pausers don't run at the same time. + // + while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || + (my_sequence != dx_server->pause_now_serving)) + sys_cond_wait(dx_server->cond, dx_server->lock); + + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_resume(void) +{ + sys_mutex_lock(dx_server->lock); + dx_server->pause_requests--; + dx_server->pause_now_serving++; + sys_cond_signal_all(dx_server->cond); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_activate(dx_connection_t *ctx) +{ + if (!ctx) + return; + + pn_connector_t *ctor = ctx->pn_cxtr; + if (!ctor) + return; + + if (!pn_connector_closed(ctor)) + pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE); +} + + +void dx_connection_set_context(dx_connection_t *conn, void *context) +{ + conn->user_context = context; +} + + +void *dx_connection_get_context(dx_connection_t *conn) +{ + return conn->user_context; +} + + +pn_connection_t *dx_connection_pn(dx_connection_t *conn) +{ + return conn->pn_conn; +} + + +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) +{ + dx_listener_t *li = new_dx_listener_t(); + + if (!li) + return 0; + + li->config = config; + li->context = context; + li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); + + if (!li->pn_listener) { + dx_log(module, LOG_ERROR, "Driver Error %d (%s)", + pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); + free_dx_listener_t(li); + return 0; + } + dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); + + return li; +} + + +void dx_server_listener_free(dx_listener_t* li) +{ + pn_listener_free(li->pn_listener); + free_dx_listener_t(li); +} + + +void dx_server_listener_close(dx_listener_t* li) +{ + pn_listener_close(li->pn_listener); +} + + +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) +{ + dx_connector_t *ct = new_dx_connector_t(); + + if (!ct) + return 0; + + ct->state = CXTR_STATE_CONNECTING; + ct->config = config; + ct->context = context; + ct->ctx = 0; + ct->timer = dx_timer(cxtr_try_open, (void*) ct); + ct->delay = 0; + + dx_timer_schedule(ct->timer, ct->delay); + return ct; +} + + +void dx_server_connector_free(dx_connector_t* ct) +{ + // Don't free the proton connector. This will be done by the connector + // processing/cleanup. + + if (ct->ctx) { + pn_connector_close(ct->ctx->pn_cxtr); + ct->ctx->connector = 0; + } + + dx_timer_free(ct->timer); + free_dx_connector_t(ct); +} + + +dx_user_fd_t *dx_user_fd(int fd, void *context) +{ + dx_user_fd_t *ufd = new_dx_user_fd_t(); + + if (!ufd) + return 0; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_USER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = 0; + ctx->context = 0; + ctx->user_context = 0; + ctx->ufd = ufd; + + ufd->context = context; + ufd->fd = fd; + ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); + pn_driver_wakeup(dx_server->driver); + + return ufd; +} + + +void dx_user_fd_free(dx_user_fd_t *ufd) +{ + pn_connector_close(ufd->pn_conn); + free_dx_user_fd_t(ufd); +} + + +void dx_user_fd_activate_read(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); + pn_driver_wakeup(dx_server->driver); +} + + +void dx_user_fd_activate_write(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); + pn_driver_wakeup(dx_server->driver); +} + + +bool dx_user_fd_is_readable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); +} + + +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); +} + + +void dx_server_timer_pending_LH(dx_timer_t *timer) +{ + DEQ_INSERT_TAIL(dx_server->pending_timers, timer); +} + + +void dx_server_timer_cancel_LH(dx_timer_t *timer) +{ + DEQ_REMOVE(dx_server->pending_timers, timer); +} + |