/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include "server_private.h" #include "timer_private.h" #include "alloc_private.h" #include "auth.h" #include "work_queue.h" #include #include #include static char *module="SERVER"; typedef struct dx_thread_t { int thread_id; volatile int running; volatile int canceled; int using_thread; sys_thread_t *thread; } dx_thread_t; typedef struct dx_server_t { int thread_count; pn_driver_t *driver; dx_thread_start_cb_t start_handler; dx_conn_handler_cb_t conn_handler; dx_signal_handler_cb_t signal_handler; dx_user_fd_handler_cb_t ufd_handler; void *start_context; void *conn_context; void *signal_context; sys_cond_t *cond; sys_mutex_t *lock; dx_thread_t **threads; work_queue_t *work_queue; dx_timer_list_t pending_timers; bool a_thread_is_waiting; int threads_active; int pause_requests; int threads_paused; int pause_next_sequence; int pause_now_serving; int pending_signal; } dx_server_t; ALLOC_DEFINE(dx_listener_t); ALLOC_DEFINE(dx_connector_t); ALLOC_DEFINE(dx_connection_t); ALLOC_DEFINE(dx_user_fd_t); /** * Singleton Concurrent Proton Driver object */ static dx_server_t *dx_server = 0; static void signal_handler(int signum) { dx_server->pending_signal = signum; sys_cond_signal_all(dx_server->cond); } static dx_thread_t *thread(int id) { dx_thread_t *thread = NEW(dx_thread_t); if (!thread) return 0; thread->thread_id = id; thread->running = 0; thread->canceled = 0; thread->using_thread = 0; return thread; } static void thread_process_listeners(pn_driver_t *driver) { pn_listener_t *listener = pn_driver_listener(driver); pn_connector_t *cxtr; dx_connection_t *ctx; while (listener) { dx_log(module, LOG_TRACE, "Accepting Connection"); cxtr = pn_listener_accept(listener); ctx = new_dx_connection_t(); ctx->state = CONN_STATE_SASL_SERVER; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_cxtr = cxtr; ctx->pn_conn = 0; ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; ctx->ufd = 0; pn_connector_set_context(cxtr, ctx); listener = pn_driver_listener(driver); } } static void handle_signals_LH(void) { int signum = dx_server->pending_signal; if (signum) { dx_server->pending_signal = 0; if (dx_server->signal_handler) { sys_mutex_unlock(dx_server->lock); dx_server->signal_handler(dx_server->signal_context, signum); sys_mutex_lock(dx_server->lock); } } } static void block_if_paused_LH(void) { if (dx_server->pause_requests > 0) { dx_server->threads_paused++; sys_cond_signal_all(dx_server->cond); while (dx_server->pause_requests > 0) sys_cond_wait(dx_server->cond, dx_server->lock); dx_server->threads_paused--; } } static void process_connector(pn_connector_t *cxtr) { dx_connection_t *ctx = pn_connector_context(cxtr); int events = 0; int auth_passes = 0; if (ctx->state == CONN_STATE_USER) { dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); return; } do { // // Step the engine for pre-handler processing // pn_connector_process(cxtr); // // Call the handler that is appropriate for the connector's state. // switch (ctx->state) { case CONN_STATE_CONNECTING: if (!pn_connector_closed(cxtr)) { ctx->state = CONN_STATE_SASL_CLIENT; assert(ctx->connector); ctx->connector->state = CXTR_STATE_OPEN; events = 1; } else { ctx->state = CONN_STATE_FAILED; events = 0; } break; case CONN_STATE_SASL_CLIENT: if (auth_passes == 0) { auth_client_handler(cxtr); events = 1; } else { auth_passes++; events = 0; } break; case CONN_STATE_SASL_SERVER: if (auth_passes == 0) { auth_server_handler(cxtr); events = 1; } else { auth_passes++; events = 0; } break; case CONN_STATE_OPENING: ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); pn_connection_set_container(conn, "dispatch"); // TODO - make unique pn_connector_set_connection(cxtr, conn); pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy if (ctx->listener) { ce = DX_CONN_EVENT_LISTENER_OPEN; } else if (ctx->connector) { ce = DX_CONN_EVENT_CONNECTOR_OPEN; ctx->connector->delay = 0; } else assert(0); dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); events = 1; break; case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { dx_server->conn_handler(ctx->context, DX_CONN_EVENT_CLOSE, (dx_connection_t*) pn_connector_context(cxtr)); events = 0; } else events = dx_server->conn_handler(ctx->context, DX_CONN_EVENT_PROCESS, (dx_connection_t*) pn_connector_context(cxtr)); break; default: break; } } while (events > 0); } // // TEMPORARY FUNCTION PROTOTYPES // void pn_driver_wait_1(pn_driver_t *d); int pn_driver_wait_2(pn_driver_t *d, int timeout); void pn_driver_wait_3(pn_driver_t *d); // // END TEMPORARY // static void *thread_run(void *arg) { dx_thread_t *thread = (dx_thread_t*) arg; pn_connector_t *work; pn_connection_t *conn; dx_connection_t *ctx; int error; int poll_result; int timer_holdoff = 0; if (!thread) return 0; thread->running = 1; if (thread->canceled) return 0; // // Invoke the start handler if the application supplied one. // This handler can be used to set NUMA or processor affinnity for the thread. // if (dx_server->start_handler) dx_server->start_handler(dx_server->start_context, thread->thread_id); // // Main Loop // while (thread->running) { sys_mutex_lock(dx_server->lock); // // Check for pending signals to process // handle_signals_LH(); if (!thread->running) { sys_mutex_unlock(dx_server->lock); break; } // // Check to see if the server is pausing. If so, block here. // block_if_paused_LH(); if (!thread->running) { sys_mutex_unlock(dx_server->lock); break; } // // Service pending timers. // dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); if (timer) { DEQ_REMOVE_HEAD(dx_server->pending_timers); // // Mark the timer as idle in case it reschedules itself. // dx_timer_idle_LH(timer); // // Release the lock and invoke the connection handler. // sys_mutex_unlock(dx_server->lock); timer->handler(timer->context); pn_driver_wakeup(dx_server->driver); continue; } // // Check the work queue for connectors scheduled for processing. // work = work_queue_get(dx_server->work_queue); if (!work) { // // There is no pending work to do // if (dx_server->a_thread_is_waiting) { // // Another thread is waiting on the proton driver, this thread must // wait on the condition variable until signaled. // sys_cond_wait(dx_server->cond, dx_server->lock); } else { // // This thread elects itself to wait on the proton driver. Set the // thread-is-waiting flag so other idle threads will not interfere. // dx_server->a_thread_is_waiting = true; // // Ask the timer module when its next timer is scheduled to fire. We'll // use this value in driver_wait as the timeout. If there are no scheduled // timers, the returned value will be -1. // long duration = dx_timer_next_duration_LH(); // // Invoke the proton driver's wait sequence. This is a bit of a hack for now // and will be improved in the future. The wait process is divided into three parts, // the first and third of which need to be non-reentrant, and the second of which // must be reentrant (and blocks). // pn_driver_wait_1(dx_server->driver); sys_mutex_unlock(dx_server->lock); do { error = 0; poll_result = pn_driver_wait_2(dx_server->driver, duration); if (poll_result == -1) error = pn_driver_errno(dx_server->driver); } while (error == PN_INTR); if (error) { dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); exit(-1); } sys_mutex_lock(dx_server->lock); pn_driver_wait_3(dx_server->driver); if (!thread->running) { sys_mutex_unlock(dx_server->lock); break; } // // Visit the timer module. // if (poll_result == 0 || ++timer_holdoff == 100) { struct timespec tv; clock_gettime(CLOCK_REALTIME, &tv); long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; dx_timer_visit_LH(milliseconds); timer_holdoff = 0; } // // Process listeners (incoming connections). // thread_process_listeners(dx_server->driver); // // Traverse the list of connectors-needing-service from the proton driver. // If the connector is not already in the work queue and it is not currently // being processed by another thread, put it in the work queue and signal the // condition variable. // work = pn_driver_connector(dx_server->driver); while (work) { ctx = pn_connector_context(work); if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { ctx->enqueued = 1; work_queue_put(dx_server->work_queue, work); sys_cond_signal(dx_server->cond); } work = pn_driver_connector(dx_server->driver); } // // Release our exclusive claim on pn_driver_wait. // dx_server->a_thread_is_waiting = false; } } // // If we were given a connector to work on from the work queue, mark it as // owned by this thread and as no longer enqueued. // if (work) { ctx = pn_connector_context(work); if (ctx->owner_thread == CONTEXT_NO_OWNER) { ctx->owner_thread = thread->thread_id; ctx->enqueued = 0; dx_server->threads_active++; } else { // // This connector is being processed by another thread, re-queue it. // work_queue_put(dx_server->work_queue, work); work = 0; } } sys_mutex_unlock(dx_server->lock); // // Process the connector that we now have exclusive access to. // if (work) { process_connector(work); // // Check to see if the connector was closed during processing // if (pn_connector_closed(work)) { // // Connector is closed. Free the context and the connector. // conn = pn_connector_connection(work); if (ctx->connector) { ctx->connector->ctx = 0; ctx->connector->state = CXTR_STATE_CONNECTING; dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); } sys_mutex_lock(dx_server->lock); free_dx_connection_t(ctx); pn_connector_free(work); if (conn) pn_connection_free(conn); dx_server->threads_active--; sys_mutex_unlock(dx_server->lock); } else { // // The connector lives on. Mark it as no longer owned by this thread. // sys_mutex_lock(dx_server->lock); ctx->owner_thread = CONTEXT_NO_OWNER; dx_server->threads_active--; sys_mutex_unlock(dx_server->lock); } // // Wake up the proton driver to force it to reconsider its set of FDs // in light of the processing that just occurred. // pn_driver_wakeup(dx_server->driver); } } return 0; } static void thread_start(dx_thread_t *thread) { if (!thread) return; thread->using_thread = 1; thread->thread = sys_thread(thread_run, (void*) thread); } static void thread_cancel(dx_thread_t *thread) { if (!thread) return; thread->running = 0; thread->canceled = 1; } static void thread_join(dx_thread_t *thread) { if (!thread) return; if (thread->using_thread) sys_thread_join(thread->thread); } static void thread_free(dx_thread_t *thread) { if (!thread) return; free(thread); } static void cxtr_try_open(void *context) { dx_connector_t *ct = (dx_connector_t*) context; if (ct->state != CXTR_STATE_CONNECTING) return; dx_connection_t *ctx = new_dx_connection_t(); ctx->state = CONN_STATE_CONNECTING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_conn = 0; ctx->listener = 0; ctx->connector = ct; ctx->context = ct->context; ctx->user_context = 0; ctx->ufd = 0; // // pn_connector is not thread safe // sys_mutex_lock(dx_server->lock); ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); sys_mutex_unlock(dx_server->lock); ct->ctx = ctx; ct->delay = 5000; dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); } void dx_server_initialize(int thread_count) { int i; if (dx_server) return; // TODO - Fail in a more dramatic way dx_alloc_initialize(); dx_server = NEW(dx_server_t); if (!dx_server) return; // TODO - Fail in a more dramatic way dx_server->thread_count = thread_count; dx_server->driver = pn_driver(); dx_server->start_handler = 0; dx_server->conn_handler = 0; dx_server->signal_handler = 0; dx_server->ufd_handler = 0; dx_server->start_context = 0; dx_server->signal_context = 0; dx_server->lock = sys_mutex(); dx_server->cond = sys_cond(); dx_timer_initialize(dx_server->lock); dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); for (i = 0; i < thread_count; i++) dx_server->threads[i] = thread(i); dx_server->work_queue = work_queue(); DEQ_INIT(dx_server->pending_timers); dx_server->a_thread_is_waiting = false; dx_server->threads_active = 0; dx_server->pause_requests = 0; dx_server->threads_paused = 0; dx_server->pause_next_sequence = 0; dx_server->pause_now_serving = 0; dx_server->pending_signal = 0; } void dx_server_finalize(void) { int i; if (!dx_server) return; for (i = 0; i < dx_server->thread_count; i++) thread_free(dx_server->threads[i]); work_queue_free(dx_server->work_queue); pn_driver_free(dx_server->driver); sys_mutex_free(dx_server->lock); sys_cond_free(dx_server->cond); free(dx_server); dx_server = 0; } void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) { dx_server->conn_handler = handler; } void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) { dx_server->signal_handler = handler; dx_server->signal_context = context; } void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) { dx_server->start_handler = handler; dx_server->start_context = context; } void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) { dx_server->ufd_handler = ufd_handler; } void dx_server_run(void) { int i; if (!dx_server) return; assert(dx_server->conn_handler); // Server can't run without a connection handler. for (i = 1; i < dx_server->thread_count; i++) thread_start(dx_server->threads[i]); dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); thread_run((void*) dx_server->threads[0]); for (i = 1; i < dx_server->thread_count; i++) thread_join(dx_server->threads[i]); dx_log(module, LOG_INFO, "Shut Down"); } void dx_server_stop(void) { int idx; sys_mutex_lock(dx_server->lock); for (idx = 0; idx < dx_server->thread_count; idx++) thread_cancel(dx_server->threads[idx]); sys_cond_signal_all(dx_server->cond); pn_driver_wakeup(dx_server->driver); sys_mutex_unlock(dx_server->lock); } void dx_server_signal(int signum) { signal(signum, signal_handler); } void dx_server_pause(void) { sys_mutex_lock(dx_server->lock); // // Bump the request count to stop all the threads. // dx_server->pause_requests++; int my_sequence = dx_server->pause_next_sequence++; // // Awaken all threads that are currently blocking. // sys_cond_signal_all(dx_server->cond); pn_driver_wakeup(dx_server->driver); // // Wait for the paused thread count plus the number of threads requesting a pause to equal // the total thread count. Also, don't exit the blocking loop until now_serving equals our // sequence number. This ensures that concurrent pausers don't run at the same time. // while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || (my_sequence != dx_server->pause_now_serving)) sys_cond_wait(dx_server->cond, dx_server->lock); sys_mutex_unlock(dx_server->lock); } void dx_server_resume(void) { sys_mutex_lock(dx_server->lock); dx_server->pause_requests--; dx_server->pause_now_serving++; sys_cond_signal_all(dx_server->cond); sys_mutex_unlock(dx_server->lock); } void dx_server_activate(dx_connection_t *ctx) { if (!ctx) return; pn_connector_t *ctor = ctx->pn_cxtr; if (!ctor) return; if (!pn_connector_closed(ctor)) pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE); } void dx_connection_set_context(dx_connection_t *conn, void *context) { conn->user_context = context; } void *dx_connection_get_context(dx_connection_t *conn) { return conn->user_context; } pn_connection_t *dx_connection_pn(dx_connection_t *conn) { return conn->pn_conn; } dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) { dx_listener_t *li = new_dx_listener_t(); if (!li) return 0; li->config = config; li->context = context; li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); if (!li->pn_listener) { dx_log(module, LOG_ERROR, "Driver Error %d (%s)", pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); free_dx_listener_t(li); return 0; } dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); return li; } void dx_server_listener_free(dx_listener_t* li) { pn_listener_free(li->pn_listener); free_dx_listener_t(li); } void dx_server_listener_close(dx_listener_t* li) { pn_listener_close(li->pn_listener); } dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) { dx_connector_t *ct = new_dx_connector_t(); if (!ct) return 0; ct->state = CXTR_STATE_CONNECTING; ct->config = config; ct->context = context; ct->ctx = 0; ct->timer = dx_timer(cxtr_try_open, (void*) ct); ct->delay = 0; dx_timer_schedule(ct->timer, ct->delay); return ct; } void dx_server_connector_free(dx_connector_t* ct) { // Don't free the proton connector. This will be done by the connector // processing/cleanup. if (ct->ctx) { pn_connector_close(ct->ctx->pn_cxtr); ct->ctx->connector = 0; } dx_timer_free(ct->timer); free_dx_connector_t(ct); } dx_user_fd_t *dx_user_fd(int fd, void *context) { dx_user_fd_t *ufd = new_dx_user_fd_t(); if (!ufd) return 0; dx_connection_t *ctx = new_dx_connection_t(); ctx->state = CONN_STATE_USER; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_conn = 0; ctx->listener = 0; ctx->connector = 0; ctx->context = 0; ctx->user_context = 0; ctx->ufd = ufd; ufd->context = context; ufd->fd = fd; ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); pn_driver_wakeup(dx_server->driver); return ufd; } void dx_user_fd_free(dx_user_fd_t *ufd) { pn_connector_close(ufd->pn_conn); free_dx_user_fd_t(ufd); } void dx_user_fd_activate_read(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); pn_driver_wakeup(dx_server->driver); } void dx_user_fd_activate_write(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); pn_driver_wakeup(dx_server->driver); } bool dx_user_fd_is_readable(dx_user_fd_t *ufd) { return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); } bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) { return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); } void dx_server_timer_pending_LH(dx_timer_t *timer) { DEQ_INSERT_TAIL(dx_server->pending_timers, timer); } void dx_server_timer_cancel_LH(dx_timer_t *timer) { DEQ_REMOVE(dx_server->pending_timers, timer); }