summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-15 21:05:50 +0000
committerTed Ross <tross@apache.org>2013-10-15 21:05:50 +0000
commit5748a0eabf88cf96f62c9220accb46797d4dcf4d (patch)
tree4dc00f613a0df0973ede1b54f1df5b199c51fbf8 /qpid/extras/dispatch/src
parent06eadbac0a814f1701fa8ccfc982c8e5f9b4eba7 (diff)
downloadqpid-python-5748a0eabf88cf96f62c9220accb46797d4dcf4d.tar.gz
QPID-5216
- Removed unneeded python router code - Added propagation of subscribed global addresses - Broke out address statistics to include to/from-container counts - Trace no longer optional, broke down and added loop prevention - Don't allow endpoint subscriptions to subscribe to local-class addresses git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1532528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/router_agent.c2
-rw-r--r--qpid/extras/dispatch/src/router_node.c51
-rw-r--r--qpid/extras/dispatch/src/router_private.h9
-rw-r--r--qpid/extras/dispatch/src/router_pynode.c129
4 files changed, 162 insertions, 29 deletions
diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c
index 0dee4841d5..d4b719732c 100644
--- a/qpid/extras/dispatch/src/router_agent.c
+++ b/qpid/extras/dispatch/src/router_agent.c
@@ -140,6 +140,8 @@ static void dx_router_query_address(dx_router_t *router, void *cor)
dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+ dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container);
+ dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container);
addr = DEQ_NEXT(addr);
dx_agent_value_complete(cor, addr != 0);
}
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 414d920428..84dd1c0fa7 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
{
if (addr == 0)
return;
@@ -314,7 +314,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop)
{
dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
@@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_mess
//
// If there is a trace field, append this router's ID to the trace.
//
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
+ dx_compose_start_list(out_da);
if (trace) {
- dx_compose_insert_string(out_da, DX_DA_TRACE);
- dx_compose_start_list(out_da);
-
if (dx_parse_is_list(trace)) {
uint32_t idx = 0;
dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
while (trace_item) {
dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix))
+ *drop = 1;
+ dx_field_iterator_reset(iter);
dx_compose_insert_string_iterator(out_da, iter);
idx++;
trace_item = dx_parse_sub_value(trace, idx);
}
}
-
- dx_compose_insert_string(out_da, direct_prefix);
- dx_compose_end_list(out_da);
}
+ dx_compose_insert_string(out_da, direct_prefix);
+ dx_compose_end_list(out_da);
+
//
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
@@ -475,25 +477,26 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
// Interpret and update the delivery annotations of the message. As a convenience,
// this function returns the iterator to the ingress field (if it exists).
//
- dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
+ int drop = 0;
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
//
// Forward to the in-process handler for this message if there is one. The
// actual invocation of the handler will occur later after we've released
// the lock.
//
- if (addr->handler) {
+ if (!drop && addr->handler) {
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
- addr->deliveries_egress++;
+ addr->deliveries_to_container++;
}
//
// If the address form is local (i.e. is prefixed by _local), don't forward
// outside of the router process.
//
- if (!is_local) {
+ if (!drop && !is_local) {
//
// Forward to all of the local links receiving this address.
//
@@ -725,6 +728,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
+ int propagate = 0;
dx_field_iterator_t *iter = 0;
if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
@@ -745,15 +749,15 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
//
// If this is an endpoint link with a source address, make sure the address is
- // appropriate for endpoint links. If it is not a local or mobile address, (i.e.
- // a router or area address), it cannot be bound to an endpoint link.
+ // appropriate for endpoint links. If it is not mobile address, it cannot be
+ // bound to an endpoint link.
//
if(r_src && !is_router && !is_dynamic) {
iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
unsigned char prefix = dx_field_iterator_octet(iter);
dx_field_iterator_reset(iter);
- if (prefix != 'L' && prefix != 'M') {
+ if (prefix != 'M') {
dx_field_iterator_free(iter);
pn_link_close(pn_link);
dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
@@ -819,15 +823,26 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
- dx_field_iterator_free(iter);
rlink->owning_addr = addr;
dx_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+ //
+ // If this is not a dynamic address and it is the first local subscription
+ // to the address, supply the address to the router module for propagation
+ // to other nodes.
+ //
+ propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
}
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
+ if (propagate)
+ dx_router_global_added(router, iter);
+
+ if (iter)
+ dx_field_iterator_free(iter);
pn_link_open(pn_link);
return 0;
}
@@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *are
router->dtag = 1;
router->pyRouter = 0;
router->pyTick = 0;
+ router->pyAdded = 0;
+ router->pyRemoved = 0;
//
// Create addresses for all of the routers in the topology. It will be registered
@@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t *dx,
//
// Forward to all of the local links receiving this address.
//
- addr->deliveries_ingress++;
+ addr->deliveries_from_container++;
dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
index 35664f8e29..9800189548 100644
--- a/qpid/extras/dispatch/src/router_private.h
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -120,6 +120,8 @@ struct dx_address_t {
uint64_t deliveries_ingress;
uint64_t deliveries_egress;
uint64_t deliveries_transit;
+ uint64_t deliveries_to_container;
+ uint64_t deliveries_from_container;
};
ALLOC_DECLARE(dx_address_t);
@@ -150,6 +152,8 @@ struct dx_router_t {
PyObject *pyRouter;
PyObject *pyTick;
+ PyObject *pyAdded;
+ PyObject *pyRemoved;
dx_agent_class_t *class_router;
dx_agent_class_t *class_link;
@@ -158,11 +162,16 @@ struct dx_router_t {
};
+
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr);
void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter);
+void dx_router_global_removed(dx_router_t *router, const char *addr);
+
#endif
diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c
index 0638c06c3e..358fa50447 100644
--- a/qpid/extras/dispatch/src/router_pynode.c
+++ b/qpid/extras/dispatch/src/router_pynode.c
@@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
+ dx_field_iterator_t *iter;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- // TODO
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
+ DEQ_ITEM_INIT(addr);
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+ dx_field_iterator_free(iter);
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ sys_mutex_unlock(router->lock);
+
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyObject *self, PyObject *args)
static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- // TODO
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ dx_field_iterator_free(iter);
+
+ if (!addr) {
+ PyErr_SetString(PyExc_Exception, "Address Not Found");
+ sys_mutex_unlock(router->lock);
+ return 0;
+ }
+
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ dx_router_check_addr_LH(router, addr);
+ sys_mutex_unlock(router->lock);
+
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t *router)
dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
return;
}
+
+ router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded");
+ if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
+ return;
+ }
+
+ router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved");
+ if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
+ return;
+ }
}
@@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *router)
}
}
+
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
+{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ char *address = (char*) dx_field_iterator_copy(iter);
+
+ dx_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
+ pValue = PyObject_CallObject(router->pyAdded, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+ dx_python_unlock();
+
+ free(address);
+ }
+}
+
+
+void dx_router_global_removed(dx_router_t *router, const char *addr)
+{
+}
+