summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
committerTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
commit28302f5604920d7cf7941481ff376f92cdd0e535 (patch)
tree3f7e2686d87cb713f7be84f46c5b4d7eb027c116 /qpid/extras/dispatch/src
parent20fbc65eba57a9526a39652cb1473a8551d3d97b (diff)
downloadqpid-python-28302f5604920d7cf7941481ff376f92cdd0e535.tar.gz
QPID-4967 - Work in progress on multi-router networks
- Added a feature to the hash table to allow referenced objects to hold a direct linkage back to the hash structure for fast deletion and access to the key. This allows the key to be stored in only one place and allows items to be removed without requiring a hash lookup on the key. - Completed the integration of the Python router and the C data structures that track remote routers (neighbor and multi-hop). - Allow multiple addresses in the ioAdapter from Python. - Added a separate address for the hello messages because the messaging pattern is different for these messages. - Added some content to the TODO file. - Added test configurations for a two-router network. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1529163 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/agent.c2
-rw-r--r--qpid/extras/dispatch/src/container.c4
-rw-r--r--qpid/extras/dispatch/src/hash.c59
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c23
-rw-r--r--qpid/extras/dispatch/src/router_node.c163
-rw-r--r--qpid/extras/dispatch/src/router_private.h14
-rw-r--r--qpid/extras/dispatch/src/router_pynode.c279
7 files changed, 460 insertions, 84 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 2063b8fcc4..3aad9f3427 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx,
cls->query_handler = query_handler;
dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
- int result = hash_insert_const(agent->class_hash, iter, cls);
+ int result = hash_insert_const(agent->class_hash, iter, cls, 0);
dx_field_iterator_free(iter);
if (result < 0)
assert(false);
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 3500e08406..123e2ffb60 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt)
nt_item->ntype = nt;
sys_mutex_lock(container->lock);
- result = hash_insert_const(container->node_type_map, iter, nt);
+ result = hash_insert_const(container->node_type_map, iter, nt, 0);
DEQ_INSERT_TAIL(container->node_type_list, nt_item);
sys_mutex_unlock(container->lock);
@@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_dispatch_t *dx,
if (name) {
dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
- result = hash_insert(container->node_map, iter, node);
+ result = hash_insert(container->node_map, iter, node, 0);
sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0) {
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index 1f7d8aa3f5..49f79925f6 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -52,6 +52,15 @@ struct hash_t {
};
+struct hash_handle_t {
+ bucket_t *bucket;
+ hash_item_t *item;
+};
+
+ALLOC_DECLARE(hash_handle_t);
+ALLOC_DEFINE(hash_handle_t);
+
+
// djb2 hash algorithm
static unsigned long hash_function(dx_field_iterator_t *iter)
{
@@ -102,7 +111,7 @@ size_t hash_size(hash_t *h)
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists, hash_handle_t **handle)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
@@ -115,6 +124,8 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
if (item) {
*exists = 1;
+ if (handle)
+ *handle = 0;
return item;
}
@@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
*exists = 0;
+
+ //
+ // If a pointer to a handle-pointer was supplied, create a handle for this item.
+ //
+ if (handle) {
+ *handle = new_hash_handle_t();
+ (*handle)->bucket = &h->buckets[idx];
+ (*handle)->item = item;
+ }
+
return item;
}
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle)
{
int exists = 0;
- hash_item_t *item = hash_internal_insert(h, key, &exists);
+ hash_item_t *item = hash_internal_insert(h, key, &exists, handle);
if (!item)
return DX_ERROR_ALLOC;
@@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
}
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle)
{
assert(h->is_const);
int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ hash_item_t *item = hash_internal_insert(h, key, &error, handle);
if (item)
item->v.val_const = val;
@@ -225,3 +246,31 @@ dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
return DX_ERROR_NOT_FOUND;
}
+
+void hash_handle_free(hash_handle_t *handle)
+{
+ if (handle)
+ free_hash_handle_t(handle);
+}
+
+
+const unsigned char *hash_key_by_handle(const hash_handle_t *handle)
+{
+ if (handle)
+ return handle->item->key;
+ return 0;
+}
+
+
+dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle)
+{
+ if (!handle)
+ return DX_ERROR_NOT_FOUND;
+ free(handle->item->key);
+ DEQ_REMOVE(handle->bucket->items, handle->item);
+ free_hash_item_t(handle->item);
+ h->size--;
+ free_hash_handle_t(handle);
+ return DX_ERROR_NONE;
+}
+
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index a3c41f86f5..c7c8f82dfc 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -398,7 +398,8 @@ typedef struct {
PyObject *handler;
PyObject *handler_rx_call;
dx_dispatch_t *dx;
- dx_address_t *address;
+ Py_ssize_t addr_count;
+ dx_address_t **addrs;
} IoAdapter;
@@ -469,25 +470,35 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
{
- const char *address;
- if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
+ PyObject *addrs;
+ if (!PyArg_ParseTuple(args, "OO", &self->handler, &addrs))
return -1;
self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
return -1;
+ if (!PyTuple_Check(addrs))
+ return -1;
+
Py_INCREF(self->handler);
Py_INCREF(self->handler_rx_call);
- self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
+ self->dx = dispatch;
+ self->addr_count = PyTuple_Size(addrs);
+ self->addrs = NEW_PTR_ARRAY(dx_address_t, self->addr_count);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ self->addrs[idx] = dx_router_register_address(self->dx,
+ PyString_AS_STRING(PyTuple_GetItem(addrs, idx)),
+ dx_io_rx_handler, self);
return 0;
}
static void IoAdapter_dealloc(IoAdapter* self)
{
- dx_router_unregister_address(self->address);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ dx_router_unregister_address(self->addrs[idx]);
+ free(self->addrs);
Py_DECREF(self->handler);
Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index fea3695620..35fb834029 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -28,8 +28,9 @@
static char *module = "ROUTER";
-static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
+static char *direct_prefix;
/**
* Address Types and Processing:
@@ -55,7 +56,7 @@ ALLOC_DEFINE(dx_address_t);
ALLOC_DEFINE(dx_router_conn_t);
-static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
DEQ_ITEM_INIT(ref);
@@ -65,7 +66,7 @@ static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
-static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
if (link->ref) {
DEQ_REMOVE(*ref_list, link->ref);
@@ -75,6 +76,31 @@ static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = new_dx_router_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->router = rnode;
+ rnode->ref_count++;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = DEQ_HEAD(*ref_list);
+ while (ref) {
+ if (ref->router == rnode) {
+ DEQ_REMOVE(*ref_list, ref);
+ free_dx_router_ref_t(ref);
+ rnode->ref_count--;
+ break;
+ }
+ ref = DEQ_NEXT(ref);
+ }
+}
+
+
/**
* Check an address to see if it no longer has any associated destinations.
* Depending on its policy, the address may be eligible for being closed out
@@ -262,10 +288,11 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
{
- dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
- dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_field_iterator_t *ingress_iter = 0;
dx_parsed_field_t *trace = 0;
dx_parsed_field_t *ingress = 0;
@@ -293,7 +320,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
trace_item = dx_parse_sub_value(trace, idx);
}
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_list(out_da);
}
@@ -303,15 +330,17 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
//
dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
- dx_field_iterator_t *iter = dx_parse_raw(ingress);
- dx_compose_insert_string_iterator(out_da, iter);
+ ingress_iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, ingress_iter);
} else
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_map(out_da);
dx_message_set_delivery_annotations(msg, out_da);
dx_compose_free(out_da);
+
+ return ingress_iter;
}
@@ -394,7 +423,8 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_direct = dx_field_iterator_prefix(iter, direct_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -404,9 +434,10 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
//
- // Interpret and update the delivery annotations of the message
+ // Interpret and update the delivery annotations of the message. As a convenience,
+ // this function returns the iterator to the ingress field (if it exists).
//
- router_annotate_message(router, msg);
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
//
// Forward to the in-process handler for this message if there is one. The
@@ -446,31 +477,54 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
}
//
- // Forward to the next-hops for remote destinations.
+ // If the address form is direct to this router node, don't relay it on
+ // to any other part of the network.
//
- dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- dx_router_link_t *dest_link;
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link) {
- dx_routed_event_t *re = new_dx_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = dx_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- fanout++;
- if (fanout == 1)
- re->delivery = delivery;
-
- dx_link_activate(dest_link->link);
+ if (!is_direct) {
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ //
+ int origin = -1;
+ if (ingress_iter) {
+ dx_address_t *origin_addr;
+ hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
}
- dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -633,10 +687,10 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
if (is_router) {
//
- // If this is a router link, put it in the router_address link-list.
+ // If this is a router link, put it in the hello_address link-list.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = router->router_addr;
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
+ rlink->owning_addr = router->hello_addr;
router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
} else {
@@ -667,7 +721,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -806,7 +860,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
rlink->link_direction = DX_OUTGOING;
- rlink->owning_addr = router->router_addr;
+ rlink->owning_addr = router->hello_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -814,9 +868,9 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
//
- // Add the new outgoing link to the router_address's list of links.
+ // Add the new outgoing link to the hello_address's list of links.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
//
// Index this link from the by-maskbit index so we can later find it quickly
@@ -867,6 +921,14 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_container_register_node_type(dx, &router_node);
}
+ size_t dplen = 9 + strlen(area) + strlen(id);
+ direct_prefix = (char*) malloc(dplen);
+ strcpy(direct_prefix, "_topo/");
+ strcat(direct_prefix, area);
+ strcat(direct_prefix, "/");
+ strcat(direct_prefix, id);
+ strcat(direct_prefix, "/");
+
dx_router_t *router = NEW(dx_router_t);
router_node.type_context = router;
@@ -883,8 +945,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
DEQ_INIT(router->routers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width());
- for (int idx = 0; idx < dx_bitmask_width(); idx++)
+ router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width());
+ for (int idx = 0; idx < dx_bitmask_width(); idx++) {
router->out_links_by_mask_bit[idx] = 0;
+ router->routers_by_mask_bit[idx] = 0;
+ }
router->neighbor_free_mask = dx_bitmask(1);
router->lock = sys_mutex();
@@ -894,10 +959,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router->pyTick = 0;
//
- // Create an address for all of the routers in the topology. It will be registered
+ // Create addresses for all of the routers in the topology. It will be registered
// locally later in the initialization sequence.
//
router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
+ router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -935,8 +1001,7 @@ void dx_router_free(dx_router_t *router)
const char *dx_router_id(const dx_dispatch_t *dx)
{
- dx_router_t *router = dx->router;
- return router->router_id;
+ return direct_prefix;
}
@@ -963,7 +1028,7 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
index 7f1f7486fb..a21e4f1d5c 100644
--- a/qpid/extras/dispatch/src/router_private.h
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -66,10 +66,11 @@ DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
struct dx_router_node_t {
DEQ_LINKS(dx_router_node_t);
- const char *id;
+ dx_address_t *owning_addr;
int mask_bit;
dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
+ uint32_t ref_count;
dx_bitmask_t *valid_origins;
};
@@ -107,6 +108,7 @@ struct dx_address_t {
void *handler_context; // In-Process Consumer context
dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
+ hash_handle_t *hash_handle; // Linkage back to the hash table entry
};
ALLOC_DECLARE(dx_address_t);
@@ -122,10 +124,12 @@ struct dx_router_t {
dx_address_list_t addrs;
hash_t *addr_hash;
dx_address_t *router_addr;
+ dx_address_t *hello_addr;
dx_router_link_list_t links;
dx_router_node_list_t routers;
dx_router_link_t **out_links_by_mask_bit;
+ dx_router_node_t **routers_by_mask_bit;
dx_bitmask_t *neighbor_free_mask;
sys_mutex_t *lock;
@@ -136,4 +140,12 @@ struct dx_router_t {
PyObject *pyTick;
};
+
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+
+
#endif
diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c
index 9930c5dde9..7ada6dad4b 100644
--- a/qpid/extras/dispatch/src/router_pynode.c
+++ b/qpid/extras/dispatch/src/router_pynode.c
@@ -34,34 +34,268 @@ typedef struct {
} RouterAdapter;
-static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1)
+ return "Link bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] != 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding router over already existing router";
+ }
+
+ if (link_maskbit >= 0 && router->out_links_by_mask_bit[link_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding neighbor router with invalid link reference";
+ }
+
+ //
+ // Hash lookup the address to ensure there isn't an existing router address.
+ //
+ dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ assert(addr == 0);
+
+ //
+ // Create an address record for this router and insert it in the hash table.
+ // This record will be found whenever a "foreign" topological address to this
+ // remote router is looked up.
+ //
+ addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+
+ //
+ // Create a router-node record to represent the remote router.
+ //
+ dx_router_node_t *rnode = new_dx_router_node_t();
+ DEQ_ITEM_INIT(rnode);
+ rnode->owning_addr = addr;
+ rnode->mask_bit = router_maskbit;
+ rnode->next_hop = 0;
+ rnode->peer_link = 0;
+ rnode->ref_count = 0;
+ rnode->valid_origins = dx_bitmask(0);
+
+ DEQ_INSERT_TAIL(router->routers, rnode);
+
+ //
+ // Link the router record to the address record.
+ //
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ //
+ // Link the router record to the router address record.
+ //
+ dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
+
+ //
+ // Add the router record to the mask-bit index.
+ //
+ router->routers_by_mask_bit[router_maskbit] = rnode;
+
+ //
+ // If this is a neighbor router, add the peer_link reference to the
+ // router record.
+ //
+ if (link_maskbit >= 0)
+ rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static char *dx_del_router(dx_router_t *router, int router_maskbit)
+{
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Deleting nonexistent router";
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ dx_address_t *oaddr = rnode->owning_addr;
+ assert(oaddr);
+
+ //
+ // Unlink the router node from the address record
+ //
+ dx_router_del_node_ref_LH(&oaddr->rnodes, rnode);
+
+ //
+ // While the router node has a non-zero reference count, look for addresses
+ // to unlink the node from.
+ //
+ dx_address_t *addr = DEQ_HEAD(router->addrs);
+ while (addr && rnode->ref_count > 0) {
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ addr = DEQ_NEXT(addr);
+ }
+ assert(rnode->ref_count == 0);
+
+ //
+ // Free the router node and the owning address records.
+ //
+ dx_bitmask_free(rnode->valid_origins);
+ free_dx_router_node_t(rnode);
+
+ hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
+ DEQ_REMOVE(router->addrs, oaddr);
+ hash_handle_free(oaddr->hash_handle);
+ router->routers_by_mask_bit[router_maskbit] = 0;
+ free_dx_address_t(oaddr);
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static PyObject* dx_add_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
const char *address;
- int is_reachable;
- int is_neighbor;
- int link_maskbit;
int router_maskbit;
- if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
- &link_maskbit, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
- // TODO
+ char *error = dx_add_router(router, address, router_maskbit, -1);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
Py_INCREF(Py_None);
return Py_None;
}
-static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
+static PyObject* dx_del_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_set_next_hop(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+ int next_hop_maskbit;
+
+ if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
+ return 0;
+
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop Not Found");
+ return 0;
+ }
+
+ if (router_maskbit != next_hop_maskbit) {
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *address;
+ int router_maskbit;
+ int link_maskbit;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit))
+ return 0;
+
+ char *error = dx_add_router(router, address, router_maskbit, link_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -71,13 +305,14 @@ static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
}
-static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
+static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -88,9 +323,13 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
static PyMethodDef RouterAdapter_methods[] = {
- {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
+ {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
+ {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
+ {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
+ {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
+ {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
{0, 0, 0, 0}
};