summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/server.c
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-07-12 21:44:14 +0000
committerTed Ross <tross@apache.org>2013-07-12 21:44:14 +0000
commit5755e220f97b0595db84a3d547d622f5965a58bc (patch)
tree438841bcca4b8a42a2b1f99e7533f7a1da56e7cf /qpid/extras/dispatch/src/server.c
parent4af3b671aa91f9346648cc91eedf3b145881ea12 (diff)
downloadqpid-python-5755e220f97b0595db84a3d547d622f5965a58bc.tar.gz
QPID-4967 - Router code advances
o Fixed handling of SASL on outbound connections o Added Send and Receive message paths in and out of Python modules o Overhauled the route-table data structures - Multicasting is now supported (multiple sender links with the same address) - Support has been added for message-based routing semantics as well as link-based o Two Dispatch processes connected to each other will now discover each other as neighbors git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1502698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/server.c')
-rw-r--r--qpid/extras/dispatch/src/server.c99
1 files changed, 74 insertions, 25 deletions
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index 52450b42b6..a1b5bab641 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *dx_server, int id)
}
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
{
+ pn_driver_t *driver = dx_server->driver;
pn_listener_t *listener = pn_driver_listener(driver);
pn_connector_t *cxtr;
dx_connection_t *ctx;
while (listener) {
- dx_log(module, LOG_TRACE, "Accepting Connection");
cxtr = pn_listener_accept(listener);
+ dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
ctx = new_dx_connection_t();
ctx->state = CONN_STATE_OPENING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
- ctx->pn_conn = 0;
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
ctx->ufd = 0;
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, dx_server->container_name);
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
//
// Get a pointer to the transport so we can insert security components into it
//
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
// Call the handler that is appropriate for the connector's state.
//
switch (ctx->state) {
- case CONN_STATE_CONNECTING:
- if (!pn_connector_closed(cxtr)) {
- //ctx->state = CONN_STATE_SASL_CLIENT;
- assert(ctx->connector);
- ctx->connector->state = CXTR_STATE_OPEN;
- events = 1;
- } else {
+ case CONN_STATE_CONNECTING: {
+ if (pn_connector_closed(cxtr)) {
ctx->state = CONN_STATE_FAILED;
events = 0;
+ break;
}
- break;
-
- case CONN_STATE_OPENING:
- ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ const dx_server_config_t *config = ctx->connector->config;
- if (ctx->listener) {
- ce = DX_CONN_EVENT_LISTENER_OPEN;
- } else if (ctx->connector) {
- ce = DX_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
- } else
- assert(0);
+ //
+ // Set up SSL if appropriate
+ //
+ if (config->ssl_enabled) {
+ pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+ pn_ssl_domain_set_credentials(domain,
+ config->ssl_certificate_file,
+ config->ssl_private_key_file,
+ config->ssl_password);
+
+ if (config->ssl_require_peer_authentication)
+ pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+ pn_ssl_t *ssl = pn_ssl(tport);
+ pn_ssl_init(ssl, domain, 0);
+ pn_ssl_domain_free(domain);
+ }
+
+ //
+ // Set up SASL
+ //
+ pn_sasl_t *sasl = pn_sasl(tport);
+ pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+ pn_sasl_client(sasl);
- dx_server->conn_handler(dx_server->conn_handler_context,
- ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ ctx->state = CONN_STATE_OPENING;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
events = 1;
break;
+ }
+
+ case CONN_STATE_OPENING: {
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ pn_sasl_t *sasl = pn_sasl(tport);
+
+ if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(dx_server->conn_handler_context,
+ ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+ }
+ else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+ ctx->state = CONN_STATE_FAILED;
+ if (ctx->connector) {
+ const dx_server_config_t *config = ctx->connector->config;
+ dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ }
+ }
+ }
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
@@ -411,7 +460,7 @@ static void *thread_run(void *arg)
//
// Process listeners (incoming connections).
//
- thread_process_listeners(dx_server->driver);
+ thread_process_listeners(dx_server);
//
// Traverse the list of connectors-needing-service from the proton driver.