diff options
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 472 |
1 files changed, 264 insertions, 208 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index d2704c4bd5..dc63a45451 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -21,17 +21,18 @@ #include <stdio.h> #include <string.h> #include <stdbool.h> +#include <stdlib.h> #include <qpid/dispatch.h> #include "dispatch_private.h" +#include "router_private.h" static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t *router); -static char *router_address = "_local/qdxrouter"; -static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; /** * Address Types and Processing: @@ -48,86 +49,82 @@ static char *local_prefix = "_local/"; * <mobile> M<mobile> forward handler */ +ALLOC_DEFINE(dx_routed_event_t); +ALLOC_DEFINE(dx_router_link_t); +ALLOC_DEFINE(dx_router_node_t); +ALLOC_DEFINE(dx_router_ref_t); +ALLOC_DEFINE(dx_router_link_ref_t); +ALLOC_DEFINE(dx_address_t); -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; +static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; +static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + if (link->ref) { + DEQ_REMOVE(*ref_list, link->ref); + free_dx_router_link_ref_t(link->ref); + link->ref = 0; + } +} -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; -ALLOC_DECLARE(dx_routed_event_t); -ALLOC_DEFINE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); - - -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - dx_direction_t link_direction; - dx_link_type_t link_type; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; +/** + * Check an address to see if it no longer has any associated destinations. + * Depending on its policy, the address may be eligible for being closed out + * (i.e. Logging its terminal statistics and freeing its resources). + */ +static void dx_router_check_addr_LH(dx_address_t *addr) +{ + // TODO +} -ALLOC_DECLARE(dx_router_link_t); -ALLOC_DEFINE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - const char *id; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - // list of valid origins (pointers to router_node) - (bit masks?) -}; -ALLOC_DECLARE(dx_router_node_t); -ALLOC_DEFINE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); +/** + * Determine whether a terminus has router capability + */ +static int dx_router_terminus_is_router(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (sym.size == strlen(DX_CAPABILITY_ROUTER) && + strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) + return 1; + } -struct dx_address_t { - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_list_t rlinks; // Locally-Connected Consumers - dx_router_node_list_t rnodes; // Remotely-Connected Consumers -}; + return 0; +} -ALLOC_DECLARE(dx_address_t); -ALLOC_DEFINE(dx_address_t); +static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[11]; + long int rnd = random(); + int idx; + + for (idx = 0; idx < 10; idx++) + discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; + discriminator[idx] = '\0'; -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_router_link_list_t in_links; - dx_router_node_list_t routers; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; + snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); +} + + +static int dx_router_find_mask_bit(dx_link_t *link) +{ + return 0; // TODO +} /** @@ -191,7 +188,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) DEQ_REMOVE_HEAD(to_send); // - // Get a delivery for the send. This will be the current deliver on the link. + // Get a delivery for the send. This will be the current delivery on the link. // tag++; delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); @@ -259,8 +256,8 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) dx_parsed_field_t *ingress = 0; if (in_da) { - trace = dx_parse_value_by_key(in_da, "qdx.trace"); - ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); + ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); } dx_compose_start_map(out_da); @@ -269,7 +266,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is a trace field, append this router's ID to the trace. // if (trace && dx_parse_is_list(trace)) { - dx_compose_insert_string(out_da, "qdx.trace"); + dx_compose_insert_string(out_da, DX_DA_TRACE); dx_compose_start_list(out_da); uint32_t idx = 0; @@ -289,7 +286,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is no ingress field, annotate the ingress as this router else // keep the original field. // - dx_compose_insert_string(out_da, "qdx.ingress"); + dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { dx_field_iterator_t *iter = dx_parse_raw(ingress); dx_compose_insert_string_iterator(out_da, iter); @@ -380,7 +377,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - hash_retrieve(router->out_hash, iter, (void*) &addr); + hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); @@ -415,33 +412,34 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); fanout++; if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -457,7 +455,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } } @@ -487,7 +485,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Invoke the in-process handler now that the lock is released. // if (handler) - handler(handler_context, in_process_copy); + handler(handler_context, in_process_copy, rlink->mask_bit); } @@ -541,25 +539,28 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = new_dx_router_link_t(); pn_link_t *pn_link = dx_link_pn(link); + int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); DEQ_ITEM_INIT(rlink); + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; - rlink->link_type = DX_LINK_ENDPOINT; rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); pn_link_flow(pn_link, 1000); pn_link_open(pn_link); @@ -579,52 +580,90 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) { dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); + int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); + int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); - if (!r_tgt) { + // + // If this link is not a router link and it has no source address, we can't + // accept it. + // + if (r_src == 0 && !is_router && !is_dynamic) { pn_link_close(pn_link); return 0; } - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink = new_dx_router_link_t(); - - int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); - + // + // Create a router_link record for this link. Some of the fields will be + // modified in the different cases below. + // + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); - - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + + if (is_router) { + // + // If this is a router link, put it in the router_address link-list. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); + rlink->owning_addr = router->router_addr; + + } else { + // + // If this is an endpoint link, check the source. If it is dynamic, we will + // assign it an ephemeral and routable address. If it has a non-dymanic + // address, that address needs to be set up in the address list. + // + dx_field_iterator_t *iter; + char temp_addr[1000]; + dx_address_t *addr; + + if (is_dynamic) { + dx_router_generate_temp_addr(router, temp_addr, 1000); + iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + pn_terminus_set_address(dx_link_source(link), temp_addr); + dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); + } else { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + dx_log(module, LOG_INFO, "Registered local address: %s", r_src); + } + + hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->addr_hash, iter, addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + rlink->owning_addr = addr; + dx_router_add_link_ref_LH(&addr->rlinks, rlink); } - dx_field_iterator_free(iter); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + DEQ_INSERT_TAIL(router->links, rlink); + sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -634,40 +673,37 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); if (!rlink) return 0; sys_mutex_lock(router->lock); - if (pn_link_is_sender(pn_link)) { - DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - - if ((rlink->owning_addr->handler == 0) && - (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && - (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr == rlink->owning_addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); - } - dx_field_iterator_free(iter); - } - } - } else { - DEQ_REMOVE(router->in_links, rlink); - free_dx_router_link_t(rlink); + + // + // If the link is outgoing, we must disassociate it from its address. + // + if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { + dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); + dx_router_check_addr_LH(rlink->owning_addr); } + // + // If this is an incoming inter-router link, we must free the mask_bit. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) + dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); + + // + // Remove the link from the master list-of-links. + // + DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + + // TODO - wrap the free to handle the recursive items + free_dx_router_link_t(rlink); + return 0; } @@ -683,24 +719,37 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co // Ignore otherwise dx_router_t *router = (dx_router_t*) type_context; - dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); dx_link_t *sender; dx_link_t *receiver; dx_router_link_t *rlink; + int mask_bit = 0; + size_t clen = strlen(DX_CAPABILITY_ROUTER); + + // + // Allocate a mask bit to designate the pair of links connected to the neighbor router + // + sys_mutex_lock(router->lock); + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + sys_mutex_unlock(router->lock); + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return; + } // - // Create an incoming link and put it in the in-links collection. The address - // of the remote source of this link is '_local/qdxrouter'. + // Create an incoming link with router source capability // - receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); - pn_terminus_set_address(dx_link_remote_source(receiver), router_address); - pn_terminus_set_address(dx_link_target(receiver), router_address); + receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_INCOMING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; rlink->link = receiver; rlink->connected_link = 0; @@ -709,53 +758,40 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co DEQ_INIT(rlink->msg_fifo); dx_link_set_context(receiver, rlink); - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); - sys_mutex_unlock(router->lock); + DEQ_INSERT_TAIL(router->links, rlink); // - // Create an outgoing link with a local source of '_local/qdxrouter' and place - // it in the routing table. + // Create an outgoing link with router target capability // - sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); - pn_terminus_set_address(dx_link_remote_target(sender), router_address); - pn_terminus_set_address(dx_link_source(sender), router_address); + sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = router->router_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); - dx_link_set_context(sender, rlink); - - dx_address_t *addr; - - sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, aiter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, aiter, addr); - } + // + // Add the new outgoing link to the router_address's list of links. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + dx_link_set_context(sender, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); pn_link_open(dx_link_pn(receiver)); pn_link_open(dx_link_pn(sender)); pn_link_flow(dx_link_pn(receiver), 1000); - dx_field_iterator_free(aiter); } @@ -767,7 +803,6 @@ static void dx_router_timer_handler(void *context) // Periodic processing. // dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); } @@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router_node.type_context = router; + dx->router = router; router->dx = dx; router->router_area = area; router->router_id = id; router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); + DEQ_INIT(router->addrs); + router->addr_hash = hash(10, 32, 0); + + DEQ_INIT(router->links); DEQ_INIT(router->routers); - DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->neighbor_free_mask = dx_bitmask(1); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; + + // + // Create an address for all of the routers in the topology. It will be registered + // locally later in the initialization sequence. + // + router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); // // Inform the field iterator module of this router's id and area. The field iterator @@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) dx_python_start(); dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); - return router; } @@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); + hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); addr->handler = 0; addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + if (handler) + dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); return addr; } @@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t *dx, dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, address, (void*) &addr); + hash_retrieve(router->addr_hash, address, (void*) &addr); if (addr) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t *dx, DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? @@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) const char *address; int is_reachable; int is_neighbor; + int link_maskbit; + int router_maskbit; - if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor, + &link_maskbit, &router_maskbit)) return 0; // TODO @@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_router_t *router) PyObject* pName; PyObject* pId; PyObject* pArea; + PyObject* pMaxRouters; PyObject* pModule; PyObject* pClass; PyObject* pArgs; @@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_router_t *router) // // Constructor Arguments for RouterEngine // - pArgs = PyTuple_New(3); + pArgs = PyTuple_New(4); // arg 0: adapter instance PyTuple_SetItem(pArgs, 0, adapterInstance); @@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_router_t *router) pId = PyString_FromString(router->router_id); PyTuple_SetItem(pArgs, 1, pId); - // arg 2: area id + // arg 2: area_id pArea = PyString_FromString(router->router_area); PyTuple_SetItem(pArgs, 2, pArea); + // arg 3: max_routers + pMaxRouters = PyInt_FromLong((long) dx_bitmask_width()); + PyTuple_SetItem(pArgs, 3, pMaxRouters); + // // Instantiate the router // |
