diff options
| author | Ted Ross <tross@apache.org> | 2013-07-12 21:44:14 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-07-12 21:44:14 +0000 |
| commit | 5755e220f97b0595db84a3d547d622f5965a58bc (patch) | |
| tree | 438841bcca4b8a42a2b1f99e7533f7a1da56e7cf /qpid/extras/dispatch/src/server.c | |
| parent | 4af3b671aa91f9346648cc91eedf3b145881ea12 (diff) | |
| download | qpid-python-5755e220f97b0595db84a3d547d622f5965a58bc.tar.gz | |
QPID-4967 - Router code advances
o Fixed handling of SASL on outbound connections
o Added Send and Receive message paths in and out of Python modules
o Overhauled the route-table data structures
- Multicasting is now supported (multiple sender links with the same address)
- Support has been added for message-based routing semantics as well as link-based
o Two Dispatch processes connected to each other will now discover each other as neighbors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1502698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/server.c')
| -rw-r--r-- | qpid/extras/dispatch/src/server.c | 99 |
1 files changed, 74 insertions, 25 deletions
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c index 52450b42b6..a1b5bab641 100644 --- a/qpid/extras/dispatch/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *dx_server, int id) } -static void thread_process_listeners(pn_driver_t *driver) +static void thread_process_listeners(dx_server_t *dx_server) { + pn_driver_t *driver = dx_server->driver; pn_listener_t *listener = pn_driver_listener(driver); pn_connector_t *cxtr; dx_connection_t *ctx; while (listener) { - dx_log(module, LOG_TRACE, "Accepting Connection"); cxtr = pn_listener_accept(listener); + dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr)); ctx = new_dx_connection_t(); ctx->state = CONN_STATE_OPENING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_cxtr = cxtr; - ctx->pn_conn = 0; ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; ctx->ufd = 0; + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, dx_server->container_name); + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + // // Get a pointer to the transport so we can insert security components into it // @@ -201,20 +207,12 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) // Call the handler that is appropriate for the connector's state. // switch (ctx->state) { - case CONN_STATE_CONNECTING: - if (!pn_connector_closed(cxtr)) { - //ctx->state = CONN_STATE_SASL_CLIENT; - assert(ctx->connector); - ctx->connector->state = CXTR_STATE_OPEN; - events = 1; - } else { + case CONN_STATE_CONNECTING: { + if (pn_connector_closed(cxtr)) { ctx->state = CONN_STATE_FAILED; events = 0; + break; } - break; - - case CONN_STATE_OPENING: - ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); pn_connection_set_container(conn, dx_server->container_name); @@ -222,20 +220,71 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; - dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + pn_transport_t *tport = pn_connector_transport(cxtr); + const dx_server_config_t *config = ctx->connector->config; - if (ctx->listener) { - ce = DX_CONN_EVENT_LISTENER_OPEN; - } else if (ctx->connector) { - ce = DX_CONN_EVENT_CONNECTOR_OPEN; - ctx->connector->delay = 0; - } else - assert(0); + // + // Set up SSL if appropriate + // + if (config->ssl_enabled) { + pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); + pn_ssl_domain_set_credentials(domain, + config->ssl_certificate_file, + config->ssl_private_key_file, + config->ssl_password); + + if (config->ssl_require_peer_authentication) + pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db); + + pn_ssl_t *ssl = pn_ssl(tport); + pn_ssl_init(ssl, domain, 0); + pn_ssl_domain_free(domain); + } + + // + // Set up SASL + // + pn_sasl_t *sasl = pn_sasl(tport); + pn_sasl_mechanisms(sasl, config->sasl_mechanisms); + pn_sasl_client(sasl); - dx_server->conn_handler(dx_server->conn_handler_context, - ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + ctx->state = CONN_STATE_OPENING; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; events = 1; break; + } + + case CONN_STATE_OPENING: { + pn_transport_t *tport = pn_connector_transport(cxtr); + pn_sasl_t *sasl = pn_sasl(tport); + + if (pn_sasl_outcome(sasl) == PN_SASL_OK) { + ctx->state = CONN_STATE_OPERATIONAL; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(dx_server->conn_handler_context, + ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + } + else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) { + ctx->state = CONN_STATE_FAILED; + if (ctx->connector) { + const dx_server_config_t *config = ctx->connector->config; + dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); + } + } + } case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { @@ -411,7 +460,7 @@ static void *thread_run(void *arg) // // Process listeners (incoming connections). // - thread_process_listeners(dx_server->driver); + thread_process_listeners(dx_server); // // Traverse the list of connectors-needing-service from the proton driver. |
