diff options
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 40f827f4cd..90e2e0c9dc 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -448,6 +448,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del in_process_copy = dx_message_copy(msg); handler = addr->handler; handler_context = addr->handler_context; + addr->deliveries_egress++; } // @@ -472,6 +473,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; + addr->deliveries_egress++; dx_link_activate(dest_link_ref->link->link); dest_link_ref = DEQ_NEXT(dest_link_ref); } @@ -542,6 +544,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (fanout == 1) re->delivery = delivery; + addr->deliveries_transit++; dx_link_activate(dest_link->link); } } @@ -739,9 +742,8 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); DEQ_ITEM_INIT(addr); - addr->handler = 0; - addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); @@ -1004,12 +1006,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) } -void dx_router_setup_agent(dx_dispatch_t *dx) +void dx_router_setup_late(dx_dispatch_t *dx) { + dx_router_agent_setup(dx->router); dx_router_python_setup(dx->router); dx_timer_schedule(dx->router->timer, 1000); - - // TODO } @@ -1046,9 +1047,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); DEQ_ITEM_INIT(addr); - addr->handler = 0; - addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); @@ -1088,6 +1088,7 @@ void dx_router_send(dx_dispatch_t *dx, // // Forward to all of the local links receiving this address. // + addr->deliveries_ingress++; dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); @@ -1099,11 +1100,14 @@ void dx_router_send(dx_dispatch_t *dx, DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); dx_link_activate(dest_link_ref->link->link); + addr->deliveries_egress++; + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. + // FIXME - use link-mask to avoid dups. // dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); dx_router_link_t *dest_link; @@ -1121,6 +1125,7 @@ void dx_router_send(dx_dispatch_t *dx, re->disposition = 0; DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); + addr->deliveries_transit++; } dest_node_ref = DEQ_NEXT(dest_node_ref); } |
