summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/router_node.c
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r--qpid/extras/dispatch/src/router_node.c19
1 files changed, 12 insertions, 7 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 40f827f4cd..90e2e0c9dc 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -448,6 +448,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
+ addr->deliveries_egress++;
}
//
@@ -472,6 +473,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
+ addr->deliveries_egress++;
dx_link_activate(dest_link_ref->link->link);
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
@@ -542,6 +544,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (fanout == 1)
re->delivery = delivery;
+ addr->deliveries_transit++;
dx_link_activate(dest_link->link);
}
}
@@ -739,9 +742,8 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
DEQ_ITEM_INIT(addr);
- addr->handler = 0;
- addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1004,12 +1006,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
}
-void dx_router_setup_agent(dx_dispatch_t *dx)
+void dx_router_setup_late(dx_dispatch_t *dx)
{
+ dx_router_agent_setup(dx->router);
dx_router_python_setup(dx->router);
dx_timer_schedule(dx->router->timer, 1000);
-
- // TODO
}
@@ -1046,9 +1047,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
DEQ_ITEM_INIT(addr);
- addr->handler = 0;
- addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1088,6 +1088,7 @@ void dx_router_send(dx_dispatch_t *dx,
//
// Forward to all of the local links receiving this address.
//
+ addr->deliveries_ingress++;
dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
@@ -1099,11 +1100,14 @@ void dx_router_send(dx_dispatch_t *dx,
DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
dx_link_activate(dest_link_ref->link->link);
+ addr->deliveries_egress++;
+
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
+ // FIXME - use link-mask to avoid dups.
//
dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
dx_router_link_t *dest_link;
@@ -1121,6 +1125,7 @@ void dx_router_send(dx_dispatch_t *dx,
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
+ addr->deliveries_transit++;
}
dest_node_ref = DEQ_NEXT(dest_node_ref);
}