summaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-09-27 17:31:48 +0000
committerTed Ross <tross@apache.org>2013-09-27 17:31:48 +0000
commit28f69d5c791b558b6336f6e4b092dd13e2ea4adb (patch)
treef322b3c727b4cdc42f11edecdafb9d6ea568bd19 /extras
parent7278dd8b4063da2473ae961cf3e50fa783d9b775 (diff)
downloadqpid-python-28f69d5c791b558b6336f6e4b092dd13e2ea4adb.tar.gz
QPID-5045 - Added connection-resident shared state for links to associate links on a connection.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1526988 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
-rw-r--r--extras/dispatch/include/qpid/dispatch/container.h12
-rw-r--r--extras/dispatch/include/qpid/dispatch/server.h18
-rw-r--r--extras/dispatch/python/qpid/dispatch/router/router_engine.py2
-rw-r--r--extras/dispatch/src/container.c30
-rw-r--r--extras/dispatch/src/router_node.c34
-rw-r--r--extras/dispatch/src/router_private.h8
-rw-r--r--extras/dispatch/src/server.c16
-rw-r--r--extras/dispatch/src/server_private.h1
8 files changed, 115 insertions, 6 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h
index a306895b50..62373f7e61 100644
--- a/extras/dispatch/include/qpid/dispatch/container.h
+++ b/extras/dispatch/include/qpid/dispatch/container.h
@@ -152,8 +152,20 @@ dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node);
dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node);
dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name);
+
+/**
+ * Context associated with the link for storing link-specific state.
+ */
void dx_link_set_context(dx_link_t *link, void *link_context);
void *dx_link_get_context(dx_link_t *link);
+
+/**
+ * Link context associated with the link's connection for storing state shared across
+ * all links in a connection.
+ */
+void dx_link_set_conn_context(dx_link_t *link, void *link_context);
+void *dx_link_get_conn_context(dx_link_t *link);
+
pn_link_t *dx_link_pn(dx_link_t *link);
pn_terminus_t *dx_link_source(dx_link_t *link);
pn_terminus_t *dx_link_target(dx_link_t *link);
diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h
index 7b2d4a2432..dcdc6ad73f 100644
--- a/extras/dispatch/include/qpid/dispatch/server.h
+++ b/extras/dispatch/include/qpid/dispatch/server.h
@@ -254,6 +254,24 @@ void *dx_connection_get_context(dx_connection_t *conn);
/**
+ * \brief Set the link context for a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @param context Link context to be stored with the connection.
+ */
+void dx_connection_set_link_context(dx_connection_t *conn, void *context);
+
+
+/**
+ * \brief Get the link context from a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The link context stored with the connection.
+ */
+void *dx_connection_get_link_context(dx_connection_t *conn);
+
+
+/**
* \brief Activate a connection for output.
*
* This function is used to request that the server activate the indicated
diff --git a/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index 82f51184a9..6fde822444 100644
--- a/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -281,4 +281,4 @@ class RouterEngine:
def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
(address, reachable, neighbor, link_bit, router_bit))
- self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit)
+ self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit)
diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c
index cc46f3ce77..3500e08406 100644
--- a/extras/dispatch/src/container.c
+++ b/extras/dispatch/src/container.c
@@ -651,6 +651,36 @@ void *dx_link_get_context(dx_link_t *link)
}
+void dx_link_set_conn_context(dx_link_t *link, void *context)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return;
+ dx_connection_set_link_context(conn, context);
+}
+
+
+void *dx_link_get_conn_context(dx_link_t *link)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return 0;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return 0;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return 0;
+ return dx_connection_get_link_context(conn);
+}
+
+
pn_link_t *dx_link_pn(dx_link_t *link)
{
return link->pn_link;
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
index 806363c5ce..240f6cdc41 100644
--- a/extras/dispatch/src/router_node.c
+++ b/extras/dispatch/src/router_node.c
@@ -55,6 +55,7 @@ ALLOC_DEFINE(dx_router_node_t);
ALLOC_DEFINE(dx_router_ref_t);
ALLOC_DEFINE(dx_router_link_ref_t);
ALLOC_DEFINE(dx_address_t);
+ALLOC_DEFINE(dx_router_conn_t);
static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
@@ -95,6 +96,8 @@ static int dx_router_terminus_is_router(pn_terminus_t *term)
{
pn_data_t *cap = pn_terminus_capabilities(term);
+ pn_data_rewind(cap);
+ pn_data_next(cap);
if (cap && pn_data_type(cap) == PN_SYMBOL) {
pn_bytes_t sym = pn_data_get_symbol(cap);
if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
@@ -121,9 +124,24 @@ static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size
}
-static int dx_router_find_mask_bit(dx_link_t *link)
+static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link)
{
- return 0; // TODO
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+ if (shared)
+ return shared->mask_bit;
+
+ int mask_bit;
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return -1;
+ }
+
+ shared = new_dx_router_conn_t();
+ shared->mask_bit = mask_bit;
+ dx_link_set_conn_context(link, shared);
+ return mask_bit;
}
@@ -542,7 +560,6 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
int is_router = dx_router_terminus_is_router(dx_link_remote_source(link));
DEQ_ITEM_INIT(rlink);
- rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_INCOMING;
rlink->owning_addr = 0;
@@ -556,6 +573,7 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
dx_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
@@ -599,7 +617,6 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
//
dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_OUTGOING;
rlink->owning_addr = 0;
@@ -615,6 +632,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
sys_mutex_lock(router->lock);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
if (is_router) {
//
@@ -675,6 +693,12 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
{
dx_router_t *router = (dx_router_t*) context;
dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+
+ if (shared) {
+ dx_link_set_conn_context(link, 0);
+ free_dx_router_conn_t(shared);
+ }
if (!rlink)
return 0;
@@ -766,7 +790,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
diff --git a/extras/dispatch/src/router_private.h b/extras/dispatch/src/router_private.h
index 8e481375ab..23c8b74a62 100644
--- a/extras/dispatch/src/router_private.h
+++ b/extras/dispatch/src/router_private.h
@@ -23,6 +23,7 @@ typedef struct dx_router_link_t dx_router_link_t;
typedef struct dx_router_node_t dx_router_node_t;
typedef struct dx_router_ref_t dx_router_ref_t;
typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+typedef struct dx_router_conn_t dx_router_conn_t;
typedef enum {
@@ -91,6 +92,13 @@ ALLOC_DECLARE(dx_router_link_ref_t);
DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
+struct dx_router_conn_t {
+ int mask_bit;
+};
+
+ALLOC_DECLARE(dx_router_conn_t);
+
+
struct dx_address_t {
DEQ_LINKS(dx_address_t);
dx_router_message_cb handler; // In-Process Consumer
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c
index 5420d3b776..7424a45871 100644
--- a/extras/dispatch/src/server.c
+++ b/extras/dispatch/src/server.c
@@ -109,6 +109,8 @@ static void thread_process_listeners(dx_server_t *dx_server)
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
+ ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
pn_connection_t *conn = pn_connection();
@@ -630,6 +632,7 @@ static void cxtr_try_open(void *context)
ctx->connector = ct;
ctx->context = ct->context;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
//
@@ -878,6 +881,18 @@ void *dx_connection_get_context(dx_connection_t *conn)
}
+void dx_connection_set_link_context(dx_connection_t *conn, void *context)
+{
+ conn->link_context = context;
+}
+
+
+void *dx_connection_get_link_context(dx_connection_t *conn)
+{
+ return conn->link_context;
+}
+
+
pn_connection_t *dx_connection_pn(dx_connection_t *conn)
{
return conn->pn_conn;
@@ -976,6 +991,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context)
ctx->connector = 0;
ctx->context = 0;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = ufd;
ufd->context = context;
diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h
index 5782492f22..86a3ef98f1 100644
--- a/extras/dispatch/src/server_private.h
+++ b/extras/dispatch/src/server_private.h
@@ -78,6 +78,7 @@ struct dx_connection_t {
dx_connector_t *connector;
void *context; // Copy of context from listener or connector
void *user_context;
+ void *link_context; // Context shared by this connection's links
dx_user_fd_t *ufd;
};