diff options
| author | Ted Ross <tross@apache.org> | 2013-09-26 21:14:59 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-09-26 21:14:59 +0000 |
| commit | 966b9314395d655f7fea8db90e47ae089d81ee6f (patch) | |
| tree | c32473d2cecc4bc97e827ec74a8d10def4c67907 /qpid/extras/dispatch/src | |
| parent | ec0e68b606c81f92c33b842384d7ca8a5152d91a (diff) | |
| download | qpid-python-966b9314395d655f7fea8db90e47ae089d81ee6f.tar.gz | |
QPID-5173
QPID-5045
QPID-5181
- Major refactor of the routing data structure in preparation for multi-router operation.
- Fixed the CMake bug in QPID-5173
- Added Dynamic assignment of routable addresses for outbound links (QPID-5181)
- Changed the indentation of the Python code from 2 spaces to 4 spaces.
- Reduced the default log level to make the console less chatty.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526694 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/amqp.c | 30 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/bitmask.c | 124 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message_private.h | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/python_embedded.c | 9 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 472 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_private.h | 128 |
7 files changed, 553 insertions, 214 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 557410910c..2063b8fcc4 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(void *context) } -static void dx_agent_rx_handler(void *context, dx_message_t *msg) +static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id) { dx_agent_t *agent = (dx_agent_t*) context; dx_message_t *copy = dx_message_copy(msg); diff --git a/qpid/extras/dispatch/src/amqp.c b/qpid/extras/dispatch/src/amqp.c new file mode 100644 index 0000000000..6a8545b757 --- /dev/null +++ b/qpid/extras/dispatch/src/amqp.c @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/amqp.h> + +const char * const DX_DA_INGRESS = "qdx.ingress"; +const char * const DX_DA_TRACE = "qdx.trace"; +const char * const DX_DA_TO = "qdx.to"; + +const char * const DX_CAPABILITY_ROUTER = "qdx.router"; + +const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1"; +const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2"; + diff --git a/qpid/extras/dispatch/src/bitmask.c b/qpid/extras/dispatch/src/bitmask.c new file mode 100644 index 0000000000..5341f8d83a --- /dev/null +++ b/qpid/extras/dispatch/src/bitmask.c @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/bitmask.h> +#include <qpid/dispatch/alloc.h> +#include <assert.h> + +#define DX_BITMASK_LONGS 16 +#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64) + +struct dx_bitmask_t { + uint64_t array[DX_BITMASK_LONGS]; + int first_set; +}; + +ALLOC_DECLARE(dx_bitmask_t); +ALLOC_DEFINE(dx_bitmask_t); + +#define MASK_INDEX(num) (num / 64) +#define MASK_ONEHOT(num) (1 << (num % 64)) +#define FIRST_NONE -1 +#define FIRST_UNKNOWN -2 + + +int dx_bitmask_width() +{ + return DX_BITMASK_BITS; +} + + +dx_bitmask_t *dx_bitmask(int initial) +{ + dx_bitmask_t *b = new_dx_bitmask_t(); + if (initial) + dx_bitmask_set_all(b); + else + dx_bitmask_clear_all(b); + return b; +} + + +void dx_bitmask_free(dx_bitmask_t *b) +{ + free_dx_bitmask_t(b); +} + + +void dx_bitmask_set_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0xFFFFFFFFFFFFFFFF; + b->first_set = 0; +} + + +void dx_bitmask_clear_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0; + b->first_set = FIRST_NONE; +} + + +void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum); + if (b->first_set > bitnum) + b->first_set = bitnum; +} + + +void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum)); + if (b->first_set == bitnum) + b->first_set = FIRST_UNKNOWN; +} + + +int dx_bitmask_value(dx_bitmask_t *b, int bitnum) +{ + return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0; +} + + +int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum) +{ + if (b->first_set == FIRST_UNKNOWN) { + b->first_set = FIRST_NONE; + for (int i = 0; i < DX_BITMASK_LONGS; i++) + if (b->array[i]) { + for (int j = 0; j < 64; j++) + if ((1 << j) & b->array[i]) { + b->first_set = i * 64 + j; + break; + } + break; + } + } + + if (b->first_set == FIRST_NONE) + return 0; + *bitnum = b->first_set; + return 1; +} + diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index 27b81bbb4c..c57cea5f0d 100644 --- a/qpid/extras/dispatch/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -67,7 +67,7 @@ typedef struct { sys_mutex_t *lock; uint32_t ref_count; // The number of messages referencing this dx_buffer_list_t buffers; // The buffer chain containing the message - dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations + dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT) dx_field_location_t section_message_header; // The message header list dx_field_location_t section_delivery_annotation; // The delivery annotation map dx_field_location_t section_message_annotation; // The message annotation map diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c index 0b0cc11025..2db8f583de 100644 --- a/qpid/extras/dispatch/src/python_embedded.c +++ b/qpid/extras/dispatch/src/python_embedded.c @@ -402,7 +402,7 @@ typedef struct { } IoAdapter; -static void dx_io_rx_handler(void *context, dx_message_t *msg) +static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id) { IoAdapter *self = (IoAdapter*) context; @@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg) PyObject *pAP = dx_field_to_py(ap_map); PyObject *pBody = dx_field_to_py(body_map); - PyObject *pArgs = PyTuple_New(2); + PyObject *pArgs = PyTuple_New(3); PyTuple_SetItem(pArgs, 0, pAP); PyTuple_SetItem(pArgs, 1, pBody); + PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id)); PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); Py_DECREF(pArgs); @@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject *self, PyObject *args) field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field); dx_compose_start_map(field); - dx_compose_insert_string(field, "qdx.ingress"); + dx_compose_insert_string(field, DX_DA_INGRESS); dx_compose_insert_string(field, dx_router_id(ioa->dx)); - dx_compose_insert_string(field, "qdx.trace"); + dx_compose_insert_string(field, DX_DA_TRACE); dx_compose_start_list(field); dx_compose_insert_string(field, dx_router_id(ioa->dx)); dx_compose_end_list(field); diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index d2704c4bd5..dc63a45451 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -21,17 +21,18 @@ #include <stdio.h> #include <string.h> #include <stdbool.h> +#include <stdlib.h> #include <qpid/dispatch.h> #include "dispatch_private.h" +#include "router_private.h" static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t *router); -static char *router_address = "_local/qdxrouter"; -static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; /** * Address Types and Processing: @@ -48,86 +49,82 @@ static char *local_prefix = "_local/"; * <mobile> M<mobile> forward handler */ +ALLOC_DEFINE(dx_routed_event_t); +ALLOC_DEFINE(dx_router_link_t); +ALLOC_DEFINE(dx_router_node_t); +ALLOC_DEFINE(dx_router_ref_t); +ALLOC_DEFINE(dx_router_link_ref_t); +ALLOC_DEFINE(dx_address_t); -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; +static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; +static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + if (link->ref) { + DEQ_REMOVE(*ref_list, link->ref); + free_dx_router_link_ref_t(link->ref); + link->ref = 0; + } +} -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; -ALLOC_DECLARE(dx_routed_event_t); -ALLOC_DEFINE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); - - -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - dx_direction_t link_direction; - dx_link_type_t link_type; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; +/** + * Check an address to see if it no longer has any associated destinations. + * Depending on its policy, the address may be eligible for being closed out + * (i.e. Logging its terminal statistics and freeing its resources). + */ +static void dx_router_check_addr_LH(dx_address_t *addr) +{ + // TODO +} -ALLOC_DECLARE(dx_router_link_t); -ALLOC_DEFINE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - const char *id; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - // list of valid origins (pointers to router_node) - (bit masks?) -}; -ALLOC_DECLARE(dx_router_node_t); -ALLOC_DEFINE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); +/** + * Determine whether a terminus has router capability + */ +static int dx_router_terminus_is_router(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (sym.size == strlen(DX_CAPABILITY_ROUTER) && + strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) + return 1; + } -struct dx_address_t { - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_list_t rlinks; // Locally-Connected Consumers - dx_router_node_list_t rnodes; // Remotely-Connected Consumers -}; + return 0; +} -ALLOC_DECLARE(dx_address_t); -ALLOC_DEFINE(dx_address_t); +static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[11]; + long int rnd = random(); + int idx; + + for (idx = 0; idx < 10; idx++) + discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; + discriminator[idx] = '\0'; -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_router_link_list_t in_links; - dx_router_node_list_t routers; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; + snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); +} + + +static int dx_router_find_mask_bit(dx_link_t *link) +{ + return 0; // TODO +} /** @@ -191,7 +188,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) DEQ_REMOVE_HEAD(to_send); // - // Get a delivery for the send. This will be the current deliver on the link. + // Get a delivery for the send. This will be the current delivery on the link. // tag++; delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); @@ -259,8 +256,8 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) dx_parsed_field_t *ingress = 0; if (in_da) { - trace = dx_parse_value_by_key(in_da, "qdx.trace"); - ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); + ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); } dx_compose_start_map(out_da); @@ -269,7 +266,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is a trace field, append this router's ID to the trace. // if (trace && dx_parse_is_list(trace)) { - dx_compose_insert_string(out_da, "qdx.trace"); + dx_compose_insert_string(out_da, DX_DA_TRACE); dx_compose_start_list(out_da); uint32_t idx = 0; @@ -289,7 +286,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is no ingress field, annotate the ingress as this router else // keep the original field. // - dx_compose_insert_string(out_da, "qdx.ingress"); + dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { dx_field_iterator_t *iter = dx_parse_raw(ingress); dx_compose_insert_string_iterator(out_da, iter); @@ -380,7 +377,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - hash_retrieve(router->out_hash, iter, (void*) &addr); + hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); @@ -415,33 +412,34 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); fanout++; if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -457,7 +455,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } } @@ -487,7 +485,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Invoke the in-process handler now that the lock is released. // if (handler) - handler(handler_context, in_process_copy); + handler(handler_context, in_process_copy, rlink->mask_bit); } @@ -541,25 +539,28 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = new_dx_router_link_t(); pn_link_t *pn_link = dx_link_pn(link); + int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); DEQ_ITEM_INIT(rlink); + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; - rlink->link_type = DX_LINK_ENDPOINT; rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); pn_link_flow(pn_link, 1000); pn_link_open(pn_link); @@ -579,52 +580,90 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) { dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); + int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); + int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); - if (!r_tgt) { + // + // If this link is not a router link and it has no source address, we can't + // accept it. + // + if (r_src == 0 && !is_router && !is_dynamic) { pn_link_close(pn_link); return 0; } - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink = new_dx_router_link_t(); - - int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); - + // + // Create a router_link record for this link. Some of the fields will be + // modified in the different cases below. + // + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); - - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + + if (is_router) { + // + // If this is a router link, put it in the router_address link-list. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); + rlink->owning_addr = router->router_addr; + + } else { + // + // If this is an endpoint link, check the source. If it is dynamic, we will + // assign it an ephemeral and routable address. If it has a non-dymanic + // address, that address needs to be set up in the address list. + // + dx_field_iterator_t *iter; + char temp_addr[1000]; + dx_address_t *addr; + + if (is_dynamic) { + dx_router_generate_temp_addr(router, temp_addr, 1000); + iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + pn_terminus_set_address(dx_link_source(link), temp_addr); + dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); + } else { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + dx_log(module, LOG_INFO, "Registered local address: %s", r_src); + } + + hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->addr_hash, iter, addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + rlink->owning_addr = addr; + dx_router_add_link_ref_LH(&addr->rlinks, rlink); } - dx_field_iterator_free(iter); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + DEQ_INSERT_TAIL(router->links, rlink); + sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -634,40 +673,37 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); if (!rlink) return 0; sys_mutex_lock(router->lock); - if (pn_link_is_sender(pn_link)) { - DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - - if ((rlink->owning_addr->handler == 0) && - (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && - (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr == rlink->owning_addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); - } - dx_field_iterator_free(iter); - } - } - } else { - DEQ_REMOVE(router->in_links, rlink); - free_dx_router_link_t(rlink); + + // + // If the link is outgoing, we must disassociate it from its address. + // + if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { + dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); + dx_router_check_addr_LH(rlink->owning_addr); } + // + // If this is an incoming inter-router link, we must free the mask_bit. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) + dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); + + // + // Remove the link from the master list-of-links. + // + DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + + // TODO - wrap the free to handle the recursive items + free_dx_router_link_t(rlink); + return 0; } @@ -683,24 +719,37 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co // Ignore otherwise dx_router_t *router = (dx_router_t*) type_context; - dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); dx_link_t *sender; dx_link_t *receiver; dx_router_link_t *rlink; + int mask_bit = 0; + size_t clen = strlen(DX_CAPABILITY_ROUTER); + + // + // Allocate a mask bit to designate the pair of links connected to the neighbor router + // + sys_mutex_lock(router->lock); + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + sys_mutex_unlock(router->lock); + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return; + } // - // Create an incoming link and put it in the in-links collection. The address - // of the remote source of this link is '_local/qdxrouter'. + // Create an incoming link with router source capability // - receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); - pn_terminus_set_address(dx_link_remote_source(receiver), router_address); - pn_terminus_set_address(dx_link_target(receiver), router_address); + receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_INCOMING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; rlink->link = receiver; rlink->connected_link = 0; @@ -709,53 +758,40 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co DEQ_INIT(rlink->msg_fifo); dx_link_set_context(receiver, rlink); - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); - sys_mutex_unlock(router->lock); + DEQ_INSERT_TAIL(router->links, rlink); // - // Create an outgoing link with a local source of '_local/qdxrouter' and place - // it in the routing table. + // Create an outgoing link with router target capability // - sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); - pn_terminus_set_address(dx_link_remote_target(sender), router_address); - pn_terminus_set_address(dx_link_source(sender), router_address); + sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = router->router_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); - dx_link_set_context(sender, rlink); - - dx_address_t *addr; - - sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, aiter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, aiter, addr); - } + // + // Add the new outgoing link to the router_address's list of links. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + dx_link_set_context(sender, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); pn_link_open(dx_link_pn(receiver)); pn_link_open(dx_link_pn(sender)); pn_link_flow(dx_link_pn(receiver), 1000); - dx_field_iterator_free(aiter); } @@ -767,7 +803,6 @@ static void dx_router_timer_handler(void *context) // Periodic processing. // dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); } @@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router_node.type_context = router; + dx->router = router; router->dx = dx; router->router_area = area; router->router_id = id; router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); + DEQ_INIT(router->addrs); + router->addr_hash = hash(10, 32, 0); + + DEQ_INIT(router->links); DEQ_INIT(router->routers); - DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->neighbor_free_mask = dx_bitmask(1); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; + + // + // Create an address for all of the routers in the topology. It will be registered + // locally later in the initialization sequence. + // + router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); // // Inform the field iterator module of this router's id and area. The field iterator @@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) dx_python_start(); dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); - return router; } @@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); + hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); addr->handler = 0; addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + if (handler) + dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); return addr; } @@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t *dx, dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, address, (void*) &addr); + hash_retrieve(router->addr_hash, address, (void*) &addr); if (addr) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t *dx, DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? @@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) const char *address; int is_reachable; int is_neighbor; + int link_maskbit; + int router_maskbit; - if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor, + &link_maskbit, &router_maskbit)) return 0; // TODO @@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_router_t *router) PyObject* pName; PyObject* pId; PyObject* pArea; + PyObject* pMaxRouters; PyObject* pModule; PyObject* pClass; PyObject* pArgs; @@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_router_t *router) // // Constructor Arguments for RouterEngine // - pArgs = PyTuple_New(3); + pArgs = PyTuple_New(4); // arg 0: adapter instance PyTuple_SetItem(pArgs, 0, adapterInstance); @@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_router_t *router) pId = PyString_FromString(router->router_id); PyTuple_SetItem(pArgs, 1, pId); - // arg 2: area id + // arg 2: area_id pArea = PyString_FromString(router->router_area); PyTuple_SetItem(pArgs, 2, pArea); + // arg 3: max_routers + pMaxRouters = PyInt_FromLong((long) dx_bitmask_width()); + PyTuple_SetItem(pArgs, 3, pMaxRouters); + // // Instantiate the router // diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h new file mode 100644 index 0000000000..8e481375ab --- /dev/null +++ b/qpid/extras/dispatch/src/router_private.h @@ -0,0 +1,128 @@ +#ifndef __router_private_h__ +#define __router_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct dx_router_link_t dx_router_link_t; +typedef struct dx_router_node_t dx_router_node_t; +typedef struct dx_router_ref_t dx_router_ref_t; +typedef struct dx_router_link_ref_t dx_router_link_ref_t; + + +typedef enum { + DX_LINK_ENDPOINT, // A link to a connected endpoint + DX_LINK_ROUTER, // A link to a peer router in the same area + DX_LINK_AREA // A link to a peer router in a different area (area boundary) +} dx_link_type_t; + + +typedef struct dx_routed_event_t { + DEQ_LINKS(struct dx_routed_event_t); + dx_delivery_t *delivery; + dx_message_t *message; + bool settle; + uint64_t disposition; +} dx_routed_event_t; + +ALLOC_DECLARE(dx_routed_event_t); +DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); + + +struct dx_router_link_t { + DEQ_LINKS(dx_router_link_t); + int mask_bit; // Unique mask bit if this is an inter-router link + dx_link_type_t link_type; + dx_direction_t link_direction; + dx_address_t *owning_addr; // [ref] Address record that owns this link + dx_link_t *link; // [own] Link pointer + dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link + dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link + dx_router_link_ref_t *ref; // Pointer to a containing reference object + dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) + dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries +}; + +ALLOC_DECLARE(dx_router_link_t); +DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); + +struct dx_router_node_t { + DEQ_LINKS(dx_router_node_t); + const char *id; + int mask_bit; + dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node + dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node + dx_bitmask_t *valid_origins; +}; + +ALLOC_DECLARE(dx_router_node_t); +DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); + +struct dx_router_ref_t { + DEQ_LINKS(dx_router_ref_t); + dx_router_node_t *router; +}; + +ALLOC_DECLARE(dx_router_ref_t); +DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t); + + +struct dx_router_link_ref_t { + DEQ_LINKS(dx_router_link_ref_t); + dx_router_link_t *link; +}; + +ALLOC_DECLARE(dx_router_link_ref_t); +DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t); + + +struct dx_address_t { + DEQ_LINKS(dx_address_t); + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; // In-Process Consumer context + dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers + dx_router_ref_list_t rnodes; // Remotely-Connected Consumers +}; + +ALLOC_DECLARE(dx_address_t); +DEQ_DECLARE(dx_address_t, dx_address_list_t); + + +struct dx_router_t { + dx_dispatch_t *dx; + const char *router_area; + const char *router_id; + dx_node_t *node; + + dx_address_list_t addrs; + hash_t *addr_hash; + dx_address_t *router_addr; + + dx_router_link_list_t links; + dx_router_node_list_t routers; + + dx_bitmask_t *neighbor_free_mask; + sys_mutex_t *lock; + dx_timer_t *timer; + uint64_t dtag; + + PyObject *pyRouter; + PyObject *pyTick; +}; + +#endif |
