diff options
| author | Ted Ross <tross@apache.org> | 2013-10-15 21:05:50 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-10-15 21:05:50 +0000 |
| commit | 5748a0eabf88cf96f62c9220accb46797d4dcf4d (patch) | |
| tree | 4dc00f613a0df0973ede1b54f1df5b199c51fbf8 /qpid/extras/dispatch/src | |
| parent | 06eadbac0a814f1701fa8ccfc982c8e5f9b4eba7 (diff) | |
| download | qpid-python-5748a0eabf88cf96f62c9220accb46797d4dcf4d.tar.gz | |
QPID-5216
- Removed unneeded python router code
- Added propagation of subscribed global addresses
- Broke out address statistics to include to/from-container counts
- Trace no longer optional, broke down and added loop prevention
- Don't allow endpoint subscriptions to subscribe to local-class addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1532528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/router_agent.c | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 51 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_private.h | 9 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_pynode.c | 129 |
4 files changed, 162 insertions, 29 deletions
diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c index 0dee4841d5..d4b719732c 100644 --- a/qpid/extras/dispatch/src/router_agent.c +++ b/qpid/extras/dispatch/src/router_agent.c @@ -140,6 +140,8 @@ static void dx_router_query_address(dx_router_t *router, void *cor) dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress); dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress); dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit); + dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container); + dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container); addr = DEQ_NEXT(addr); dx_agent_value_complete(cor, addr != 0); } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 414d920428..84dd1c0fa7 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t * Depending on its policy, the address may be eligible for being closed out * (i.e. Logging its terminal statistics and freeing its resources). */ -static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr) +void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr) { if (addr == 0) return; @@ -314,7 +314,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) } -static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg) +static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop) { dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); @@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_mess // // If there is a trace field, append this router's ID to the trace. // + dx_compose_insert_string(out_da, DX_DA_TRACE); + dx_compose_start_list(out_da); if (trace) { - dx_compose_insert_string(out_da, DX_DA_TRACE); - dx_compose_start_list(out_da); - if (dx_parse_is_list(trace)) { uint32_t idx = 0; dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); while (trace_item) { dx_field_iterator_t *iter = dx_parse_raw(trace_item); + if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix)) + *drop = 1; + dx_field_iterator_reset(iter); dx_compose_insert_string_iterator(out_da, iter); idx++; trace_item = dx_parse_sub_value(trace, idx); } } - - dx_compose_insert_string(out_da, direct_prefix); - dx_compose_end_list(out_da); } + dx_compose_insert_string(out_da, direct_prefix); + dx_compose_end_list(out_da); + // // If there is no ingress field, annotate the ingress as this router else // keep the original field. @@ -475,25 +477,26 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Interpret and update the delivery annotations of the message. As a convenience, // this function returns the iterator to the ingress field (if it exists). // - dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg); + int drop = 0; + dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop); // // Forward to the in-process handler for this message if there is one. The // actual invocation of the handler will occur later after we've released // the lock. // - if (addr->handler) { + if (!drop && addr->handler) { in_process_copy = dx_message_copy(msg); handler = addr->handler; handler_context = addr->handler_context; - addr->deliveries_egress++; + addr->deliveries_to_container++; } // // If the address form is local (i.e. is prefixed by _local), don't forward // outside of the router process. // - if (!is_local) { + if (!drop && !is_local) { // // Forward to all of the local links receiving this address. // @@ -725,6 +728,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); + int propagate = 0; dx_field_iterator_t *iter = 0; if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { @@ -745,15 +749,15 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) // // If this is an endpoint link with a source address, make sure the address is - // appropriate for endpoint links. If it is not a local or mobile address, (i.e. - // a router or area address), it cannot be bound to an endpoint link. + // appropriate for endpoint links. If it is not mobile address, it cannot be + // bound to an endpoint link. // if(r_src && !is_router && !is_dynamic) { iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); unsigned char prefix = dx_field_iterator_octet(iter); dx_field_iterator_reset(iter); - if (prefix != 'L' && prefix != 'M') { + if (prefix != 'M') { dx_field_iterator_free(iter); pn_link_close(pn_link); dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src); @@ -819,15 +823,26 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(router->addrs, addr); } - dx_field_iterator_free(iter); rlink->owning_addr = addr; dx_router_add_link_ref_LH(&addr->rlinks, rlink); + + // + // If this is not a dynamic address and it is the first local subscription + // to the address, supply the address to the router module for propagation + // to other nodes. + // + propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1); } DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); + if (propagate) + dx_router_global_added(router, iter); + + if (iter) + dx_field_iterator_free(iter); pn_link_open(pn_link); return 0; } @@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *are router->dtag = 1; router->pyRouter = 0; router->pyTick = 0; + router->pyAdded = 0; + router->pyRemoved = 0; // // Create addresses for all of the routers in the topology. It will be registered @@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t *dx, // // Forward to all of the local links receiving this address. // - addr->deliveries_ingress++; + addr->deliveries_from_container++; dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h index 35664f8e29..9800189548 100644 --- a/qpid/extras/dispatch/src/router_private.h +++ b/qpid/extras/dispatch/src/router_private.h @@ -120,6 +120,8 @@ struct dx_address_t { uint64_t deliveries_ingress; uint64_t deliveries_egress; uint64_t deliveries_transit; + uint64_t deliveries_to_container; + uint64_t deliveries_from_container; }; ALLOC_DECLARE(dx_address_t); @@ -150,6 +152,8 @@ struct dx_router_t { PyObject *pyRouter; PyObject *pyTick; + PyObject *pyAdded; + PyObject *pyRemoved; dx_agent_class_t *class_router; dx_agent_class_t *class_link; @@ -158,11 +162,16 @@ struct dx_router_t { }; + +void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr); void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); +void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter); +void dx_router_global_removed(dx_router_t *router, const char *addr); + #endif diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c index 0638c06c3e..358fa50447 100644 --- a/qpid/extras/dispatch/src/router_pynode.c +++ b/qpid/extras/dispatch/src/router_pynode.c @@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args) static PyObject* dx_map_destination(PyObject *self, PyObject *args) { - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; - const char *addr; - int router_maskbit; + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + const char *addr_string; + int maskbit; + dx_address_t *addr; + dx_field_iterator_t *iter; - if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) + if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) return 0; - // TODO + if (maskbit >= dx_bitmask_width() || maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (router->routers_by_mask_bit[maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Router Not Found"); + return 0; + } + + iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); + DEQ_ITEM_INIT(addr); + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; + dx_router_add_node_ref_LH(&addr->rnodes, rnode); + + sys_mutex_unlock(router->lock); + + dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit); Py_INCREF(Py_None); return Py_None; @@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyObject *self, PyObject *args) static PyObject* dx_unmap_destination(PyObject *self, PyObject *args) { - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; - const char *addr; - int router_maskbit; + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + const char *addr_string; + int maskbit; + dx_address_t *addr; - if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) + if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) return 0; - // TODO + if (maskbit >= dx_bitmask_width() || maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (router->routers_by_mask_bit[maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Router Not Found"); + return 0; + } + + dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; + dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); + dx_field_iterator_free(iter); + + if (!addr) { + PyErr_SetString(PyExc_Exception, "Address Not Found"); + sys_mutex_unlock(router->lock); + return 0; + } + + dx_router_del_node_ref_LH(&addr->rnodes, rnode); + dx_router_check_addr_LH(router, addr); + sys_mutex_unlock(router->lock); + + dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit); Py_INCREF(Py_None); return Py_None; @@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t *router) dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); return; } + + router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded"); + if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method"); + return; + } + + router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved"); + if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method"); + return; + } } @@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *router) } } + +void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter) +{ + PyObject *pArgs; + PyObject *pValue; + + if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) { + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + char *address = (char*) dx_field_iterator_copy(iter); + + dx_python_lock(); + pArgs = PyTuple_New(1); + PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); + pValue = PyObject_CallObject(router->pyAdded, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } + dx_python_unlock(); + + free(address); + } +} + + +void dx_router_global_removed(dx_router_t *router, const char *addr) +{ +} + |
