diff options
| author | Ted Ross <tross@apache.org> | 2013-09-27 17:31:48 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-09-27 17:31:48 +0000 |
| commit | 28f69d5c791b558b6336f6e4b092dd13e2ea4adb (patch) | |
| tree | f322b3c727b4cdc42f11edecdafb9d6ea568bd19 /extras | |
| parent | 7278dd8b4063da2473ae961cf3e50fa783d9b775 (diff) | |
| download | qpid-python-28f69d5c791b558b6336f6e4b092dd13e2ea4adb.tar.gz | |
QPID-5045 - Added connection-resident shared state for links to associate links on a connection.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1526988 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
| -rw-r--r-- | extras/dispatch/include/qpid/dispatch/container.h | 12 | ||||
| -rw-r--r-- | extras/dispatch/include/qpid/dispatch/server.h | 18 | ||||
| -rw-r--r-- | extras/dispatch/python/qpid/dispatch/router/router_engine.py | 2 | ||||
| -rw-r--r-- | extras/dispatch/src/container.c | 30 | ||||
| -rw-r--r-- | extras/dispatch/src/router_node.c | 34 | ||||
| -rw-r--r-- | extras/dispatch/src/router_private.h | 8 | ||||
| -rw-r--r-- | extras/dispatch/src/server.c | 16 | ||||
| -rw-r--r-- | extras/dispatch/src/server_private.h | 1 |
8 files changed, 115 insertions, 6 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h index a306895b50..62373f7e61 100644 --- a/extras/dispatch/include/qpid/dispatch/container.h +++ b/extras/dispatch/include/qpid/dispatch/container.h @@ -152,8 +152,20 @@ dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node); dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node); dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name); + +/** + * Context associated with the link for storing link-specific state. + */ void dx_link_set_context(dx_link_t *link, void *link_context); void *dx_link_get_context(dx_link_t *link); + +/** + * Link context associated with the link's connection for storing state shared across + * all links in a connection. + */ +void dx_link_set_conn_context(dx_link_t *link, void *link_context); +void *dx_link_get_conn_context(dx_link_t *link); + pn_link_t *dx_link_pn(dx_link_t *link); pn_terminus_t *dx_link_source(dx_link_t *link); pn_terminus_t *dx_link_target(dx_link_t *link); diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h index 7b2d4a2432..dcdc6ad73f 100644 --- a/extras/dispatch/include/qpid/dispatch/server.h +++ b/extras/dispatch/include/qpid/dispatch/server.h @@ -254,6 +254,24 @@ void *dx_connection_get_context(dx_connection_t *conn); /** + * \brief Set the link context for a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param context Link context to be stored with the connection. + */ +void dx_connection_set_link_context(dx_connection_t *conn, void *context); + + +/** + * \brief Get the link context from a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @return The link context stored with the connection. + */ +void *dx_connection_get_link_context(dx_connection_t *conn); + + +/** * \brief Activate a connection for output. * * This function is used to request that the server activate the indicated diff --git a/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/extras/dispatch/python/qpid/dispatch/router/router_engine.py index 82f51184a9..6fde822444 100644 --- a/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -281,4 +281,4 @@ class RouterEngine: def node_updated(self, address, reachable, neighbor, link_bit, router_bit): self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ (address, reachable, neighbor, link_bit, router_bit)) - self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit) + self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit) diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c index cc46f3ce77..3500e08406 100644 --- a/extras/dispatch/src/container.c +++ b/extras/dispatch/src/container.c @@ -651,6 +651,36 @@ void *dx_link_get_context(dx_link_t *link) } +void dx_link_set_conn_context(dx_link_t *link, void *context) +{ + pn_session_t *pn_sess = pn_link_session(link->pn_link); + if (!pn_sess) + return; + pn_connection_t *pn_conn = pn_session_connection(pn_sess); + if (!pn_conn) + return; + dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn); + if (!conn) + return; + dx_connection_set_link_context(conn, context); +} + + +void *dx_link_get_conn_context(dx_link_t *link) +{ + pn_session_t *pn_sess = pn_link_session(link->pn_link); + if (!pn_sess) + return 0; + pn_connection_t *pn_conn = pn_session_connection(pn_sess); + if (!pn_conn) + return 0; + dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn); + if (!conn) + return 0; + return dx_connection_get_link_context(conn); +} + + pn_link_t *dx_link_pn(dx_link_t *link) { return link->pn_link; diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c index 806363c5ce..240f6cdc41 100644 --- a/extras/dispatch/src/router_node.c +++ b/extras/dispatch/src/router_node.c @@ -55,6 +55,7 @@ ALLOC_DEFINE(dx_router_node_t); ALLOC_DEFINE(dx_router_ref_t); ALLOC_DEFINE(dx_router_link_ref_t); ALLOC_DEFINE(dx_address_t); +ALLOC_DEFINE(dx_router_conn_t); static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) @@ -95,6 +96,8 @@ static int dx_router_terminus_is_router(pn_terminus_t *term) { pn_data_t *cap = pn_terminus_capabilities(term); + pn_data_rewind(cap); + pn_data_next(cap); if (cap && pn_data_type(cap) == PN_SYMBOL) { pn_bytes_t sym = pn_data_get_symbol(cap); if (sym.size == strlen(DX_CAPABILITY_ROUTER) && @@ -121,9 +124,24 @@ static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size } -static int dx_router_find_mask_bit(dx_link_t *link) +static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link) { - return 0; // TODO + dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); + if (shared) + return shared->mask_bit; + + int mask_bit; + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return -1; + } + + shared = new_dx_router_conn_t(); + shared->mask_bit = mask_bit; + dx_link_set_conn_context(link, shared); + return mask_bit; } @@ -542,7 +560,6 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); DEQ_ITEM_INIT(rlink); - rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; @@ -556,6 +573,7 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); + rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); @@ -599,7 +617,6 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) // dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_OUTGOING; rlink->owning_addr = 0; @@ -615,6 +632,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); + rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; if (is_router) { // @@ -675,6 +693,12 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed { dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); + + if (shared) { + dx_link_set_conn_context(link, 0); + free_dx_router_conn_t(shared); + } if (!rlink) return 0; @@ -766,7 +790,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); // TODO - We don't want to have to cast away the constness of the literal string here! // See PROTON-429 - pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); + pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); diff --git a/extras/dispatch/src/router_private.h b/extras/dispatch/src/router_private.h index 8e481375ab..23c8b74a62 100644 --- a/extras/dispatch/src/router_private.h +++ b/extras/dispatch/src/router_private.h @@ -23,6 +23,7 @@ typedef struct dx_router_link_t dx_router_link_t; typedef struct dx_router_node_t dx_router_node_t; typedef struct dx_router_ref_t dx_router_ref_t; typedef struct dx_router_link_ref_t dx_router_link_ref_t; +typedef struct dx_router_conn_t dx_router_conn_t; typedef enum { @@ -91,6 +92,13 @@ ALLOC_DECLARE(dx_router_link_ref_t); DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t); +struct dx_router_conn_t { + int mask_bit; +}; + +ALLOC_DECLARE(dx_router_conn_t); + + struct dx_address_t { DEQ_LINKS(dx_address_t); dx_router_message_cb handler; // In-Process Consumer diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c index 5420d3b776..7424a45871 100644 --- a/extras/dispatch/src/server.c +++ b/extras/dispatch/src/server.c @@ -109,6 +109,8 @@ static void thread_process_listeners(dx_server_t *dx_server) ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; + ctx->user_context = 0; + ctx->link_context = 0; ctx->ufd = 0; pn_connection_t *conn = pn_connection(); @@ -630,6 +632,7 @@ static void cxtr_try_open(void *context) ctx->connector = ct; ctx->context = ct->context; ctx->user_context = 0; + ctx->link_context = 0; ctx->ufd = 0; // @@ -878,6 +881,18 @@ void *dx_connection_get_context(dx_connection_t *conn) } +void dx_connection_set_link_context(dx_connection_t *conn, void *context) +{ + conn->link_context = context; +} + + +void *dx_connection_get_link_context(dx_connection_t *conn) +{ + return conn->link_context; +} + + pn_connection_t *dx_connection_pn(dx_connection_t *conn) { return conn->pn_conn; @@ -976,6 +991,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context) ctx->connector = 0; ctx->context = 0; ctx->user_context = 0; + ctx->link_context = 0; ctx->ufd = ufd; ufd->context = context; diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h index 5782492f22..86a3ef98f1 100644 --- a/extras/dispatch/src/server_private.h +++ b/extras/dispatch/src/server_private.h @@ -78,6 +78,7 @@ struct dx_connection_t { dx_connector_t *connector; void *context; // Copy of context from listener or connector void *user_context; + void *link_context; // Context shared by this connection's links dx_user_fd_t *ufd; }; |
