diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
| commit | c70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch) | |
| tree | 68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/extras/dispatch/src | |
| parent | fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff) | |
| download | qpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz | |
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
45 files changed, 1295 insertions, 2595 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 88b2cf2bcd..557410910c 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent); - agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent); + agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent); return agent; } diff --git a/qpid/extras/dispatch/src/alloc.c b/qpid/extras/dispatch/src/alloc.c index 87b490fabe..bf10f03633 100644 --- a/qpid/extras/dispatch/src/alloc.c +++ b/qpid/extras/dispatch/src/alloc.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,24 +24,34 @@ #include <memory.h> #include <stdio.h> -typedef struct item_t item_t; +typedef struct dx_alloc_type_t dx_alloc_type_t; +typedef struct dx_alloc_item_t dx_alloc_item_t; -struct item_t { - DEQ_LINKS(item_t); +struct dx_alloc_type_t { + DEQ_LINKS(dx_alloc_type_t); dx_alloc_type_desc_t *desc; }; -DEQ_DECLARE(item_t, item_list_t); +DEQ_DECLARE(dx_alloc_type_t, dx_alloc_type_list_t); + + +struct dx_alloc_item_t { + DEQ_LINKS(dx_alloc_item_t); +}; + +DEQ_DECLARE(dx_alloc_item_t, dx_alloc_item_list_t); + struct dx_alloc_pool_t { - item_list_t free_list; + dx_alloc_item_list_t free_list; }; dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0}; dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0}; +#define BIG_THRESHOLD 256 -static sys_mutex_t *init_lock; -static item_list_t type_list; +static sys_mutex_t *init_lock; +static dx_alloc_type_list_t type_list; static void dx_alloc_init(dx_alloc_type_desc_t *desc) { @@ -56,7 +66,7 @@ static void dx_alloc_init(dx_alloc_type_desc_t *desc) if (!desc->global_pool) { if (desc->config == 0) - desc->config = desc->total_size > 256 ? + desc->config = desc->total_size > BIG_THRESHOLD ? &dx_alloc_default_config_big : &dx_alloc_default_config_small; assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); @@ -66,12 +76,12 @@ static void dx_alloc_init(dx_alloc_type_desc_t *desc) desc->lock = sys_mutex(); desc->stats = NEW(dx_alloc_stats_t); memset(desc->stats, 0, sizeof(dx_alloc_stats_t)); - } - item_t *type_item = NEW(item_t); - DEQ_ITEM_INIT(type_item); - type_item->desc = desc; - DEQ_INSERT_TAIL(type_list, type_item); + dx_alloc_type_t *type_item = NEW(dx_alloc_type_t); + DEQ_ITEM_INIT(type_item); + type_item->desc = desc; + DEQ_INSERT_TAIL(type_list, type_item); + } sys_mutex_unlock(init_lock); } @@ -103,7 +113,7 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) // list and return it. Since everything we've touched is thread-local, // there is no need to acquire a lock. // - item_t *item = DEQ_HEAD(pool->free_list); + dx_alloc_item_t *item = DEQ_HEAD(pool->free_list); if (item) { DEQ_REMOVE_HEAD(pool->free_list); return &item[1]; @@ -130,11 +140,10 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) // Allocate a full batch from the heap and put it on the thread list. // for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { - item = (item_t*) malloc(sizeof(item_t) + desc->total_size); + item = (dx_alloc_item_t*) malloc(sizeof(dx_alloc_item_t) + desc->total_size); if (item == 0) break; DEQ_ITEM_INIT(item); - item->desc = desc; DEQ_INSERT_TAIL(pool->free_list, item); desc->stats->held_by_threads++; desc->stats->total_alloc_from_heap++; @@ -154,7 +163,7 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p) { - item_t *item = ((item_t*) p) - 1; + dx_alloc_item_t *item = ((dx_alloc_item_t*) p) - 1; int idx; // @@ -217,7 +226,7 @@ static void alloc_schema_handler(void *context, void *correlator) static void alloc_query_handler(void* context, const char *id, void *cor) { - item_t *item = DEQ_HEAD(type_list); + dx_alloc_type_t *item = DEQ_HEAD(type_list); while (item) { dx_agent_value_string(cor, "name", item->desc->type_name); diff --git a/qpid/extras/dispatch/src/alloc_private.h b/qpid/extras/dispatch/src/alloc_private.h index c812c6a661..da773fef25 100644 --- a/qpid/extras/dispatch/src/alloc_private.h +++ b/qpid/extras/dispatch/src/alloc_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/buffer.c b/qpid/extras/dispatch/src/buffer.c index 015711afd9..a5e609b996 100644 --- a/qpid/extras/dispatch/src/buffer.c +++ b/qpid/extras/dispatch/src/buffer.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c index 0f298c8217..6eb2ab1c73 100644 --- a/qpid/extras/dispatch/src/compose.c +++ b/qpid/extras/dispatch/src/compose.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,30 +21,10 @@ #include <qpid/dispatch/alloc.h> #include <qpid/dispatch/buffer.h> #include <qpid/dispatch/amqp.h> -#include "message_private.h" #include "compose_private.h" #include <memory.h> -typedef struct dx_composite_t { - DEQ_LINKS(struct dx_composite_t); - int isMap; - uint32_t count; - uint32_t length; - dx_field_location_t length_location; - dx_field_location_t count_location; -} dx_composite_t; - -ALLOC_DECLARE(dx_composite_t); ALLOC_DEFINE(dx_composite_t); -DEQ_DECLARE(dx_composite_t, dx_field_stack_t); - - -struct dx_composed_field_t { - dx_buffer_list_t buffers; - dx_field_stack_t fieldStack; -}; - -ALLOC_DECLARE(dx_composed_field_t); ALLOC_DEFINE(dx_composed_field_t); @@ -197,7 +177,7 @@ static void dx_compose_end_composite(dx_composed_field_t *field) // dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack); if (enclosing) { - enclosing->length += 4 + comp->length; + enclosing->length += (comp->length - 4); // the length and count were already accounted for enclosing->count++; } @@ -233,12 +213,14 @@ void dx_compose_free(dx_composed_field_t *field) while (buf) { DEQ_REMOVE_HEAD(field->buffers); dx_free_buffer(buf); + buf = DEQ_HEAD(field->buffers); } dx_composite_t *comp = DEQ_HEAD(field->fieldStack); while (comp) { DEQ_REMOVE_HEAD(field->fieldStack); free_dx_composite_t(comp); + comp = DEQ_HEAD(field->fieldStack); } free_dx_composed_field_t(field); diff --git a/qpid/extras/dispatch/src/compose_private.h b/qpid/extras/dispatch/src/compose_private.h index 58eec6097b..6e72185f3c 100644 --- a/qpid/extras/dispatch/src/compose_private.h +++ b/qpid/extras/dispatch/src/compose_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,7 +20,28 @@ */ #include <qpid/dispatch/compose.h> +#include "message_private.h" dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field); +typedef struct dx_composite_t { + DEQ_LINKS(struct dx_composite_t); + int isMap; + uint32_t count; + uint32_t length; + dx_field_location_t length_location; + dx_field_location_t count_location; +} dx_composite_t; + +ALLOC_DECLARE(dx_composite_t); +DEQ_DECLARE(dx_composite_t, dx_field_stack_t); + + +struct dx_composed_field_t { + dx_buffer_list_t buffers; + dx_field_stack_t fieldStack; +}; + +ALLOC_DECLARE(dx_composed_field_t); + #endif diff --git a/qpid/extras/dispatch/src/config.c b/qpid/extras/dispatch/src/config.c index d92a1e71dc..9390115251 100644 --- a/qpid/extras/dispatch/src/config.c +++ b/qpid/extras/dispatch/src/config.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,7 @@ #include <qpid/dispatch/alloc.h> #include <qpid/dispatch/log.h> -#define PYTHON_MODULE "config" +#define PYTHON_MODULE "qpid.dispatch.config" static const char *log_module = "CONFIG"; diff --git a/qpid/extras/dispatch/src/config_private.h b/qpid/extras/dispatch/src/config_private.h index 4919dfc4ee..f4ecc64e4c 100644 --- a/qpid/extras/dispatch/src/config_private.h +++ b/qpid/extras/dispatch/src/config_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index b6cc92440d..cc46f3ce77 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t); ALLOC_DEFINE(dx_node_t); ALLOC_DEFINE(dx_link_item_t); + struct dx_link_t { pn_link_t *pn_link; void *context; @@ -56,6 +57,19 @@ struct dx_link_t { ALLOC_DECLARE(dx_link_t); ALLOC_DEFINE(dx_link_t); + +struct dx_delivery_t { + pn_delivery_t *pn_delivery; + dx_delivery_t *peer; + void *context; + uint64_t disposition; + dx_link_t *link; +}; + +ALLOC_DECLARE(dx_delivery_t); +ALLOC_DEFINE(dx_delivery_t); + + typedef struct dxc_node_type_t { DEQ_LINKS(struct dxc_node_type_t); const dx_node_type_t *ntype; @@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_link) } -static void process_receive(pn_delivery_t *delivery) +static void do_receive(pn_delivery_t *pnd) { - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + pn_link_t *pn_link = pn_delivery_link(pnd); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd); if (link) { dx_node_t *node = link->node; if (node) { + if (!delivery) { + delivery = new_dx_delivery_t(); + delivery->pn_delivery = pnd; + delivery->peer = 0; + delivery->context = 0; + delivery->disposition = 0; + delivery->link = link; + pn_delivery_set_context(pnd, delivery); + } + node->ntype->rx_handler(node->context, link, delivery); return; } @@ -198,34 +223,18 @@ static void process_receive(pn_delivery_t *delivery) // pn_link_advance(pn_link); pn_link_flow(pn_link, 1); - pn_delivery_update(delivery, PN_REJECTED); - pn_delivery_settle(delivery); -} - - -static void do_send(pn_delivery_t *delivery) -{ - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - - if (link) { - dx_node_t *node = link->node; - if (node) { - node->ntype->tx_handler(node->context, link, delivery); - return; - } - } - - // TODO - Cancel the delivery + pn_delivery_update(pnd, PN_REJECTED); + pn_delivery_settle(pnd); } -static void do_updated(pn_delivery_t *delivery) +static void do_updated(pn_delivery_t *pnd) { - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + pn_link_t *pn_link = pn_delivery_link(pnd); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd); - if (link) { + if (link && delivery) { dx_node_t *node = link->node; if (node) node->ntype->disp_handler(node->context, link, delivery); @@ -239,15 +248,15 @@ static int close_handler(void* unused, pn_connection_t *conn) // Close all links, passing False as the 'closed' argument. These links are not // being properly 'detached'. They are being orphaned. // - pn_link_t *pn_link = pn_link_head(conn, 0); + pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE); while (pn_link) { dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); dx_node_t *node = link->node; - if (node) + if (node && link) node->ntype->link_detach_handler(node->context, link, 0); pn_link_close(pn_link); free_dx_link_t(link); - pn_link = pn_link_next(pn_link, 0); + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE); } // teardown all sessions @@ -305,9 +314,7 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio delivery = pn_work_head(conn); while (delivery) { if (pn_delivery_readable(delivery)) - process_receive(delivery); - else if (pn_delivery_writable(delivery)) - do_send(delivery); + do_receive(delivery); if (pn_delivery_updated(delivery)) { do_updated(delivery); @@ -318,15 +325,15 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio } // - // Step 2.5: Traverse all of the links on the connection looking for - // outgoing links with non-zero credit. Call the attached node's - // writable handler for such links. + // Step 2.5: Call the attached node's writable handler for all active links + // on the connection. Note that in Dispatch, links are considered + // bidirectional. Incoming and outgoing only pertains to deliveries and + // deliveries are a subset of the traffic that flows both directions on links. // pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); while (pn_link) { assert(pn_session_connection(pn_link_session(pn_link)) == conn); - if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0) - event_count += do_writable(pn_link); + event_count += do_writable(pn_link); pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } @@ -513,10 +520,10 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt) } -void dx_container_set_default_node_type(dx_dispatch_t *dx, - const dx_node_type_t *nt, - void *context, - dx_dist_mode_t supported_dist) +dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx, + const dx_node_type_t *nt, + void *context, + dx_dist_mode_t supported_dist) { dx_container_t *container = dx->container; @@ -530,6 +537,8 @@ void dx_container_set_default_node_type(dx_dispatch_t *dx, container->default_node = 0; dx_log(module, LOG_TRACE, "Default node removed"); } + + return container->default_node; } @@ -699,3 +708,110 @@ void dx_link_close(dx_link_t *link) } +dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag) +{ + pn_link_t *pnl = dx_link_pn(link); + + // + // If there is a current delivery on this outgoing link, something + // is wrong with the delivey algorithm. We assume that the current + // delivery ('pnd' below) is the one created by pn_delivery. If it is + // not, then my understanding of how proton works is incorrect. + // + assert(!pn_link_current(pnl)); + + pn_delivery(pnl, tag); + pn_delivery_t *pnd = pn_link_current(pnl); + + if (!pnd) + return 0; + + dx_delivery_t *delivery = new_dx_delivery_t(); + delivery->pn_delivery = pnd; + delivery->peer = 0; + delivery->context = 0; + delivery->disposition = 0; + delivery->link = link; + pn_delivery_set_context(pnd, delivery); + + return delivery; +} + + +void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition) +{ + if (delivery->pn_delivery) { + if (final_disposition > 0) + pn_delivery_update(delivery->pn_delivery, final_disposition); + pn_delivery_set_context(delivery->pn_delivery, 0); + pn_delivery_settle(delivery->pn_delivery); + } + if (delivery->peer) + delivery->peer->peer = 0; + free_dx_delivery_t(delivery); +} + + +void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer) +{ + delivery->peer = peer; +} + + +void dx_delivery_set_context(dx_delivery_t *delivery, void *context) +{ + delivery->context = context; +} + + +void *dx_delivery_context(dx_delivery_t *delivery) +{ + return delivery->context; +} + + +dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery) +{ + return delivery->peer; +} + + +pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery) +{ + return delivery->pn_delivery; +} + + +void dx_delivery_settle(dx_delivery_t *delivery) +{ + if (delivery->pn_delivery) { + pn_delivery_settle(delivery->pn_delivery); + delivery->pn_delivery = 0; + } +} + + +bool dx_delivery_settled(dx_delivery_t *delivery) +{ + return pn_delivery_settled(delivery->pn_delivery); +} + + +bool dx_delivery_disp_changed(dx_delivery_t *delivery) +{ + return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery); +} + + +uint64_t dx_delivery_disp(dx_delivery_t *delivery) +{ + delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery); + return delivery->disposition; +} + + +dx_link_t *dx_delivery_link(dx_delivery_t *delivery) +{ + return delivery->link; +} + diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c index 86398bb946..a1a659fd74 100644 --- a/qpid/extras/dispatch/src/dispatch.c +++ b/qpid/extras/dispatch/src/dispatch.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/dispatch_private.h b/qpid/extras/dispatch/src/dispatch_private.h index 71b9f85c0e..d96eb59afe 100644 --- a/qpid/extras/dispatch/src/dispatch_private.h +++ b/qpid/extras/dispatch/src/dispatch_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c index 19744366aa..1f7d8aa3f5 100644 --- a/qpid/extras/dispatch/src/hash.c +++ b/qpid/extras/dispatch/src/hash.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ DEQ_DECLARE(hash_item_t, items_t); typedef struct bucket_t { - items_t items; + items_t items; } bucket_t; diff --git a/qpid/extras/dispatch/src/iovec.c b/qpid/extras/dispatch/src/iovec.c index 6ff6874440..a4ce937333 100644 --- a/qpid/extras/dispatch/src/iovec.c +++ b/qpid/extras/dispatch/src/iovec.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c index eb302fe9ad..0239a083ca 100644 --- a/qpid/extras/dispatch/src/iterator.c +++ b/qpid/extras/dispatch/src/iterator.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -367,7 +367,7 @@ dx_field_iterator_t *dx_field_iterator_sub(dx_field_iterator_t *iter, uint32_t l void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length) { // TODO - Make this more efficient. - for (uint8_t idx = 0; idx < length; idx++) + for (uint8_t idx = 0; idx < length && !dx_field_iterator_end(iter); idx++) dx_field_iterator_octet(iter); } diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c index 555d9ed7f3..281603255d 100644 --- a/qpid/extras/dispatch/src/log.c +++ b/qpid/extras/dispatch/src/log.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/log_private.h b/qpid/extras/dispatch/src/log_private.h index 731822ec6f..0b36bbc9a9 100644 --- a/qpid/extras/dispatch/src/log_private.h +++ b/qpid/extras/dispatch/src/log_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index 6d75de76db..772c0d4e16 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,11 +25,35 @@ #include <string.h> #include <stdio.h> +static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"; +static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70"; +static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"; +static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71"; +static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"; +static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72"; +static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"; +static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73"; +static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"; +static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74"; +static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"; +static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75"; +static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"; +static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76"; +static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77"; +static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77"; +static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"; +static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78"; +static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0"; +static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1"; +static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0"; +static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"; + ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); ALLOC_DEFINE(dx_message_content_t); +typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length); -static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) +static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, buffer_process_t handler, void *context) { unsigned char *local_cursor = *cursor; dx_buffer_t *local_buffer = *buffer; @@ -37,9 +61,13 @@ static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); while (consume > 0) { if (consume < remaining) { + if (handler) + handler(context, local_cursor, consume); local_cursor += consume; consume = 0; } else { + if (handler) + handler(context, local_cursor, remaining); consume -= remaining; local_buffer = local_buffer->next; if (local_buffer == 0){ @@ -59,7 +87,7 @@ static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) { unsigned char result = **cursor; - advance(cursor, buffer, 1); + advance(cursor, buffer, 1, 0, 0); return result; } @@ -68,7 +96,10 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field { unsigned char tag = next_octet(cursor, buffer); if (!(*cursor)) return 0; - int consume = 0; + + int consume = 0; + size_t hdr_length = 1; + switch (tag & 0xF0) { case 0x40 : consume = 0; break; case 0x50 : consume = 1; break; @@ -80,6 +111,7 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field case 0xB0 : case 0xD0 : case 0xF0 : + hdr_length += 3; consume |= ((int) next_octet(cursor, buffer)) << 24; if (!(*cursor)) return 0; consume |= ((int) next_octet(cursor, buffer)) << 16; @@ -91,19 +123,21 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field case 0xA0 : case 0xC0 : case 0xE0 : + hdr_length++; consume |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; break; } if (field && !field->parsed) { - field->buffer = *buffer; - field->offset = *cursor - dx_buffer_base(*buffer); - field->length = consume; - field->parsed = 1; + field->buffer = *buffer; + field->offset = *cursor - dx_buffer_base(*buffer); + field->length = consume; + field->hdr_length = hdr_length; + field->parsed = 1; } - advance(cursor, buffer, consume); + advance(cursor, buffer, consume, 0, 0); return 1; } @@ -210,10 +244,11 @@ static int dx_check_and_advance(dx_buffer_t **buffer, // // Pattern matched and tag is expected. Mark the beginning of the section. // - location->parsed = 1; - location->buffer = test_buffer; - location->offset = test_cursor - dx_buffer_base(test_buffer); - location->length = 0; + location->parsed = 1; + location->buffer = test_buffer; + location->offset = test_cursor - dx_buffer_base(test_buffer); + location->length = 0; + location->hdr_length = pattern_length; // // Advance the pointers to consume the whole section. @@ -249,7 +284,7 @@ static int dx_check_and_advance(dx_buffer_t **buffer, location->length = pre_consume + consume; if (consume) - advance(&test_cursor, &test_buffer, consume); + advance(&test_cursor, &test_buffer, consume, 0, 0); *cursor = test_cursor; *buffer = test_buffer; @@ -318,6 +353,11 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess } break; + case DX_FIELD_DELIVERY_ANNOTATION: + if (content->section_delivery_annotation.parsed) + return &content->section_delivery_annotation; + break; + case DX_FIELD_APPLICATION_PROPERTIES: if (content->section_application_properties.parsed) return &content->section_application_properties; @@ -343,8 +383,7 @@ dx_message_t *dx_allocate_message() return 0; DEQ_ITEM_INIT(msg); - msg->content = new_dx_message_content_t(); - msg->out_delivery = 0; + msg->content = new_dx_message_content_t(); if (msg->content == 0) { free_dx_message_t((dx_message_t*) msg); @@ -355,6 +394,7 @@ dx_message_t *dx_allocate_message() msg->content->lock = sys_mutex(); msg->content->ref_count = 1; msg->content->parse_depth = DX_DEPTH_NONE; + msg->content->parsed_delivery_annotations = 0; return (dx_message_t*) msg; } @@ -371,14 +411,23 @@ void dx_free_message(dx_message_t *in_msg) sys_mutex_unlock(content->lock); if (rc == 0) { - dx_buffer_t *buf = DEQ_HEAD(content->buffers); + if (content->parsed_delivery_annotations) + dx_parse_free(content->parsed_delivery_annotations); + dx_buffer_t *buf = DEQ_HEAD(content->buffers); while (buf) { DEQ_REMOVE_HEAD(content->buffers); dx_free_buffer(buf); buf = DEQ_HEAD(content->buffers); } + buf = DEQ_HEAD(content->new_delivery_annotations); + while (buf) { + DEQ_REMOVE_HEAD(content->new_delivery_annotations); + dx_free_buffer(buf); + buf = DEQ_HEAD(content->new_delivery_annotations); + } + sys_mutex_free(content->lock); free_dx_message_content_t(content); } @@ -397,8 +446,7 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg) return 0; DEQ_ITEM_INIT(copy); - copy->content = content; - copy->out_delivery = 0; + copy->content = content; sys_mutex_lock(content->lock); content->ref_count++; @@ -408,55 +456,58 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg) } -void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) +dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg) { - ((dx_message_pvt_t*) msg)->out_delivery = delivery; -} + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + if (content->parsed_delivery_annotations) + return content->parsed_delivery_annotations; -pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) -{ - return ((dx_message_pvt_t*) msg)->out_delivery; -} + dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION); + if (da == 0) + return 0; + content->parsed_delivery_annotations = dx_parse(da); + if (content->parsed_delivery_annotations == 0 || + !dx_parse_ok(content->parsed_delivery_annotations) || + !dx_parse_is_map(content->parsed_delivery_annotations)) { + dx_field_iterator_free(da); + dx_parse_free(content->parsed_delivery_annotations); + return 0; + } -void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - content->in_delivery = delivery; + return content->parsed_delivery_annotations; } -pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) +void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da) { - dx_message_content_t *content = MSG_CONTENT(msg); - return content->in_delivery; + dx_message_content_t *content = MSG_CONTENT(msg); + dx_buffer_list_t *field_buffers = dx_compose_buffers(da); + + assert(DEQ_SIZE(content->new_delivery_annotations) == 0); + content->new_delivery_annotations = *field_buffers; + DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. } -dx_message_t *dx_message_receive(pn_delivery_t *delivery) +dx_message_t *dx_message_receive(dx_delivery_t *delivery) { - pn_link_t *link = pn_delivery_link(delivery); - dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); + pn_delivery_t *pnd = dx_delivery_pn(delivery); + dx_message_pvt_t *msg = (dx_message_pvt_t*) dx_delivery_context(delivery); + pn_link_t *link = pn_delivery_link(pnd); ssize_t rc; dx_buffer_t *buf; // // If there is no message associated with the delivery, this is the first time - // we've received anything on this delivery. Allocate a message descriptor and + // we've received anything on this delivery. Allocate a message descriptor and // link it and the delivery together. // if (!msg) { msg = (dx_message_pvt_t*) dx_allocate_message(); - pn_delivery_set_context(delivery, (void*) msg); - - // - // Record the incoming delivery only if it is not settled. If it is - // settled, it should not be recorded as no future operations on it are - // permitted. - // - if (!pn_delivery_settled(delivery)) - msg->content->in_delivery = delivery; + dx_delivery_set_context(delivery, (void*) msg); } // @@ -489,6 +540,7 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery) DEQ_REMOVE_TAIL(msg->content->buffers); dx_free_buffer(buf); } + dx_delivery_set_context(delivery, 0); return (dx_message_t*) msg; } @@ -520,14 +572,76 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery) } -void dx_message_send(dx_message_t *in_msg, pn_link_t *link) +static void send_handler(void *context, const unsigned char *start, int length) { - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); + pn_link_t *pnl = (pn_link_t*) context; + pn_link_send(pnl, (const char*) start, length); +} + + +void dx_message_send(dx_message_t *in_msg, dx_link_t *link) +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_buffer_t *buf = DEQ_HEAD(content->buffers); + unsigned char *cursor; + pn_link_t *pnl = dx_link_pn(link); + + if (DEQ_SIZE(content->new_delivery_annotations) > 0) { + // + // This is the case where the delivery annotations have been modified. + // The message send must be divided into sections: The existing header; + // the new delivery annotations; the rest of the existing message. + // Note that the original delivery annotations that are still in the + // buffer chain must not be sent. + // + // Start by making sure that we've parsed the message sections through + // the delivery annotations + // + if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS)) + return; + + // + // Send header if present + // + cursor = dx_buffer_base(buf); + if (content->section_message_header.length > 0) { + pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3); + buf = content->section_message_header.buffer; + cursor = content->section_message_header.offset + dx_buffer_base(buf); + advance(&cursor, &buf, content->section_message_header.length, send_handler, (void*) pnl); + } + + // + // Send new delivery annotations + // + dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations); + while (da_buf) { + pn_link_send(pnl, (char*) dx_buffer_base(da_buf), dx_buffer_size(da_buf)); + da_buf = DEQ_NEXT(da_buf); + } + + // + // Skip over replaced delivery annotations + // + if (content->section_delivery_annotation.length > 0) + advance(&cursor, &buf, + content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length, + 0, 0); + + // + // Send remaining partial buffer + // + if (buf) { + size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf)); + advance(&cursor, &buf, len, send_handler, (void*) pnl); + } + + // Fall through to process the remaining buffers normally + } - // TODO - Handle cases where annotations have been added or modified while (buf) { - pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); + pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); buf = DEQ_NEXT(buf); } } @@ -557,29 +671,6 @@ static int dx_check_field_LH(dx_message_content_t *content, static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t depth) { - static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"; - static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70"; - static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"; - static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71"; - static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"; - static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72"; - static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"; - static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73"; - static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"; - static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74"; - static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"; - static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75"; - static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"; - static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76"; - static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77"; - static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77"; - static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"; - static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78"; - static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0"; - static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1"; - static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0"; - static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"; - dx_buffer_t *buffer = DEQ_HEAD(content->buffers); if (!buffer) diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index ca39a84660..27b81bbb4c 100644 --- a/qpid/extras/dispatch/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,10 +45,11 @@ */ typedef struct { - dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present - size_t offset; // Offset in the buffer to the first octet - size_t length; // Length of the field or zero if unneeded - int parsed; // non-zero iff the buffer chain has been parsed to find this field + dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present + size_t offset; // Offset in the buffer to the first octet + size_t length; // Length of the field or zero if unneeded + size_t hdr_length; // Length of the field's header (not included in the length of the field) + int parsed; // non-zero iff the buffer chain has been parsed to find this field } dx_field_location_t; @@ -58,12 +59,15 @@ typedef struct { // 1) Received message is held and forwarded unmodified - single buffer list // 2) Received message is held and modified before forwarding - two buffer lists // 3) Message is composed internally - single buffer list +// TODO - provide a way to allocate a message without a lock for the link-routing case. +// It's likely that link-routing will cause no contention for the message content. +// typedef struct { sys_mutex_t *lock; - uint32_t ref_count; // The number of qmessages referencing this + uint32_t ref_count; // The number of messages referencing this dx_buffer_list_t buffers; // The buffer chain containing the message - pn_delivery_t *in_delivery; // The delivery on which the message arrived + dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations dx_field_location_t section_message_header; // The message header list dx_field_location_t section_delivery_annotation; // The delivery annotation map dx_field_location_t section_message_annotation; // The message annotation map @@ -78,12 +82,12 @@ typedef struct { dx_buffer_t *parse_buffer; unsigned char *parse_cursor; dx_message_depth_t parse_depth; + dx_parsed_field_t *parsed_delivery_annotations; } dx_message_content_t; typedef struct { - DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t + DEQ_LINKS(dx_message_t); // Deque linkage that overlays the dx_message_t dx_message_content_t *content; - pn_delivery_t *out_delivery; } dx_message_pvt_t; ALLOC_DECLARE(dx_message_t); diff --git a/qpid/extras/dispatch/src/parse.c b/qpid/extras/dispatch/src/parse.c index 3b47ad216a..c5ecc4e498 100644 --- a/qpid/extras/dispatch/src/parse.c +++ b/qpid/extras/dispatch/src/parse.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t); ALLOC_DEFINE(dx_parsed_field_t); -static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count) +static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen) { if (dx_field_iterator_end(iter)) return "Insufficient Data to Determine Tag"; - *tag = dx_field_iterator_octet(iter); - *count = 0; - *length = 0; + *tag = dx_field_iterator_octet(iter); + *count = 0; + *length = 0; + *clen = 0; switch (*tag & 0xF0) { case 0x40: *length = 0; break; @@ -59,7 +60,7 @@ static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *le *length += ((unsigned int) dx_field_iterator_octet(iter)) << 16; *length += ((unsigned int) dx_field_iterator_octet(iter)) << 8; // fall through to the next case - + case 0xA0: case 0xC0: case 0xE0: @@ -78,19 +79,24 @@ static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *le *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24; *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16; *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + *clen = 3; // fall through to the next case - + case 0xC0: case 0xE0: if (dx_field_iterator_end(iter)) return "Insufficient Data to Determine Count"; *count += (unsigned int) dx_field_iterator_octet(iter); + *clen += 1; break; } if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1)) return "Odd Number of Elements in a Map"; + if (*clen > *length) + return "Insufficient Length to Determine Count"; + return 0; } @@ -108,13 +114,13 @@ static dx_parsed_field_t *dx_parse_internal(dx_field_iterator_t *iter, dx_parsed uint32_t length; uint32_t count; + uint32_t length_of_count; - field->parse_error = get_type_info(iter, &field->tag, &length, &count); + field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count); if (!field->parse_error) { field->raw_iter = dx_field_iterator_sub(iter, length); - if (count == 0 && length > 0) - dx_field_iterator_advance(iter, length); + dx_field_iterator_advance(iter, length - length_of_count); for (uint32_t idx = 0; idx < count; idx++) { dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field); DEQ_INSERT_TAIL(field->children, child); @@ -377,4 +383,3 @@ dx_parsed_field_t *dx_parse_value_by_key(dx_parsed_field_t *field, const char *k return 0; } - diff --git a/qpid/extras/dispatch/src/posix/threading.c b/qpid/extras/dispatch/src/posix/threading.c index 8edce86cdc..1d7760ca88 100644 --- a/qpid/extras/dispatch/src/posix/threading.c +++ b/qpid/extras/dispatch/src/posix/threading.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -103,7 +103,7 @@ void sys_cond_signal_all(sys_cond_t *cond) struct sys_thread_t { pthread_t thread; -}; +}; sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg) { @@ -123,4 +123,3 @@ void sys_thread_join(sys_thread_t *thread) { pthread_join(thread->thread, 0); } - diff --git a/qpid/extras/dispatch/src/py/config/__init__.py b/qpid/extras/dispatch/src/py/config/__init__.py deleted file mode 100644 index ebaeea5203..0000000000 --- a/qpid/extras/dispatch/src/py/config/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from config.parser import DXConfig diff --git a/qpid/extras/dispatch/src/py/config/parser.py b/qpid/extras/dispatch/src/py/config/parser.py deleted file mode 100644 index 7619b91461..0000000000 --- a/qpid/extras/dispatch/src/py/config/parser.py +++ /dev/null @@ -1,337 +0,0 @@ -## -## Licensed to the Apache Software Foundation (ASF) under one -## or more contributor license agreements. See the NOTICE file -## distributed with this work for additional information -## regarding copyright ownership. The ASF licenses this file -## to you under the Apache License, Version 2.0 (the -## "License"); you may not use this file except in compliance -## with the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, -## software distributed under the License is distributed on an -## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -## KIND, either express or implied. See the License for the -## specific language governing permissions and limitations -## under the License -## - -import json -from schema import config_schema -from dispatch import LogAdapter, LOG_TRACE, LOG_ERROR, LOG_INFO - -class Section: - """ - """ - - def __init__(self, name, kv_pairs, schema_section): - self.name = name - self.schema_section = schema_section - self.values = schema_section.check_and_default(kv_pairs) - self.index = schema_section.index_of(kv_pairs) - - def __repr__(self): - return "%r" % self.values - - -class ConfigMain: - """ - """ - - def __init__(self, schema): - self.sections_by_name = {} - self.sections_by_index = {} - self.schema = schema - - - def update(self, raw_config): - for sec_map in raw_config: - name = sec_map.keys()[0] - kv = sec_map[name] - schema_section = self.schema.sections[name] - sec = Section(name, kv, schema_section) - if name not in self.sections_by_name: - self.sections_by_name[name] = [] - self.sections_by_name[name].append(sec) - self.sections_by_index[sec.index] = sec - self._expand_references() - - - def item_count(self, name): - if name in self.sections_by_name: - return len(self.sections_by_name[name]) - return 0 - - - def get_value(self, name, idx, key): - if name in self.sections_by_name: - sec = self.sections_by_name[name] - if idx <= len(sec): - if key in sec[idx].values: - return sec[idx].values[key] - return None - - - def _expand_references(self): - for name, sec_list in self.sections_by_name.items(): - for sec in sec_list: - for k,v in sec.values.items(): - if sec.schema_section.is_expandable(k): - ref_name = "%s:%s" % (k, v) - if ref_name in self.sections_by_index: - ref_section = self.sections_by_index[ref_name] - for ek,ev in ref_section.values.items(): - if ref_section.schema_section.expand_copy(ek): - sec.values[ek] = ev - - - - -SECTION_SINGLETON = 0 -SECTION_VALUES = 1 - -VALUE_TYPE = 0 -VALUE_INDEX = 1 -VALUE_FLAGS = 2 -VALUE_DEFAULT = 3 - - -class SchemaSection: - """ - """ - - def __init__(self, name, section_tuple): - self.name = name - self.singleton = section_tuple[SECTION_SINGLETON] - self.values = section_tuple[SECTION_VALUES] - self.index_keys = [] - finding_index = True - index_ord = 0 - while finding_index: - finding_index = False - for k,v in self.values.items(): - if v[VALUE_INDEX] == index_ord: - self.index_keys.append(k) - index_ord += 1 - finding_index = True - - - def is_mandatory(self, key): - return self.values[key][VALUE_FLAGS].find('M') >= 0 - - - def is_expandable(self, key): - return self.values[key][VALUE_FLAGS].find('E') >= 0 - - - def expand_copy(self, key): - return self.values[key][VALUE_FLAGS].find('S') >= 0 - - - def default_value(self, key): - return self.values[key][VALUE_DEFAULT] - - - def check_and_default(self, kv_map): - copy = {} - for k,v in self.values.items(): - if k not in kv_map: - if self.is_mandatory(k): - raise Exception("In section '%s', missing mandatory key '%s'" % (self.name, k)) - else: - copy[k] = self.default_value(k) - for k,v in kv_map.items(): - if k not in self.values: - raise Exception("In section '%s', unknown key '%s'" % (self.name, k)) - copy[k] = v - return copy - - - def index_of(self, kv_map): - result = self.name - for key in self.index_keys: - result += ':%s' % kv_map[key] - if result == "": - result = "SINGLE" - return result - - -class Schema: - """ - """ - - def __init__(self): - self.sections = {} - for k,v in config_schema.items(): - self.sections[k] = SchemaSection(k, v) - - - -class DXConfig: - """ - Configuration File Parser for Qpid Dispatch - - Configuration files are made up of "sections" having the following form: - - section-name { - key0: value0 - key1: value1 - ... - keyN: valueN - } - - Sections may be repeated (i.e. there may be multiple instances with the same section name). - The keys must be unique within a section. Values can be of string or integer types. No - quoting is necessary anywhere in the configuration file. Values may contain whitespace. - - Comment lines starting with the '#' character will be ignored. - - This parser converts the configuration file into a json string where the file is represented - as a list of maps. Each map has one item, the key being the section name and the value being - a nested map of keys and values from the file. This json string is parsed into a data - structure that may then be queried. - - """ - - def __init__(self, path): - self.path = path - self.raw_config = None - self.config = None - self.schema = Schema() - self.log = LogAdapter('config.parser') - - - def read_file(self): - try: - self.log.log(LOG_INFO, "Reading Configuration File: %s" % self.path) - cfile = open(self.path) - text = cfile.read() - cfile.close() - - self.json_text = "[" + self._toJson(text) + "]" - self.raw_config = json.loads(self.json_text); - self._validate_raw_config() - self._process_schema() - except Exception, E: - print "Exception in read_file: %r" % E - raise - - - def __repr__(self): - return "%r" % self.config - - - def _toJson(self, text): - lines = text.split('\n') - stripped = "" - for line in lines: - sline = line.strip() - - # - # Ignore empty lines - # - if len(sline) == 0: - continue - - # - # Ignore comment lines - # - if sline.find('#') == 0: - continue - - # - # Convert section opens, closes, and colon-separated key:value lines into json - # - if sline[-1:] == '{': - sline = '{"' + sline[:-1].strip() + '" : {' - elif sline == '}': - sline = '}},' - else: - colon = sline.find(':') - if colon > 1: - sline = '"' + sline[:colon] + '":"' + sline[colon+1:].strip() + '",' - stripped += sline - - # - # Remove the trailing commas in map entries - # - stripped = stripped.replace(",}", "}") - - # - # Return the entire document minus the trailing comma - # - return stripped[:-1] - - - def _validate_raw_config(self): - """ - Ensure that the configuration is well-formed. Once this is validated, - further functions can assume a well-formed data structure is in place. - """ - if self.raw_config.__class__ != list: - raise Exception("Invalid Config: Expected List at top level") - for section in self.raw_config: - if section.__class__ != dict: - raise Exception("Invalid Config: List items must be maps") - if len(section) != 1: - raise Exception("Invalid Config: Map must have only one entry") - for key,val in section.items(): - if key.__class__ != str and key.__class__ != unicode: - raise Exception("Invalid Config: Key in map must be a string") - if val.__class__ != dict: - raise Exception("Invalid Config: Value in map must be a map") - for k,v in val.items(): - if k.__class__ != str and k.__class__ != unicode: - raise Exception("Invalid Config: Key in section must be a string") - if v.__class__ != str and v.__class__ != unicode: - raise Exception("Invalid Config: Value in section must be a string") - - - def _process_schema(self): - self.config = ConfigMain(self.schema) - self.config.update(self.raw_config) - self.raw_config = None - - - def item_count(self, section): - """ - Return the number of items in a section (i.e. the number if instances of a section-name). - """ - result = self.config.item_count(section) - return result - - - def _value(self, section, idx, key): - return self.config.get_value(section, idx, key) - - - def value_string(self, section, idx, key): - """ - Return the string value for the key in the idx'th item in the section. - """ - value = self._value(section, idx, key) - if value: - return str(value) - return None - - - def value_int(self, section, idx, key): - """ - Return the integer value for the key in the idx'th item in the section. - """ - value = self._value(section, idx, key) - return long(value) - - - def value_bool(self, section, idx, key): - """ - Return the boolean value for the key in the idx'th item in the section. - """ - value = self._value(section, idx, key) - if value: - if str(value) != "no": - return True - return None - - diff --git a/qpid/extras/dispatch/src/py/config/schema.py b/qpid/extras/dispatch/src/py/config/schema.py deleted file mode 100644 index 545139f0df..0000000000 --- a/qpid/extras/dispatch/src/py/config/schema.py +++ /dev/null @@ -1,82 +0,0 @@ -## -## Licensed to the Apache Software Foundation (ASF) under one -## or more contributor license agreements. See the NOTICE file -## distributed with this work for additional information -## regarding copyright ownership. The ASF licenses this file -## to you under the Apache License, Version 2.0 (the -## "License"); you may not use this file except in compliance -## with the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, -## software distributed under the License is distributed on an -## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -## KIND, either express or implied. See the License for the -## specific language governing permissions and limitations -## under the License -## - -# -# config_schema = -# { <section_name> : -# (<singleton>, -# {<key> : (<value-type>, <index>, <flags>, <default-value>) -# ) -# } -# -# <section-name> = String name of a configuration section -# <singleton> = False => There may be 0 or more sections with this name -# True => There must be exactly one section with this name -# <key> = String key of a section's key-value pair -# <value-type> = Python type for the value -# <index> = None => This value is not an index for multiple sections -# >= 0 => Ordinal of this value in the section primary-key -# <flags> = Set of characters: -# M = Mandatory (no default value) -# E = Expand referenced section into this record -# S = During expansion, this key should be copied -# <default-value> = If not mandatory and not specified, the value defaults to this -# value -# - -config_schema = { - 'container' : (True, { - 'worker-threads' : (int, None, "", 1), - 'container-name' : (str, None, "", None) - }), - 'ssl-profile' : (False, { - 'name' : (str, 0, "M"), - 'cert-db' : (str, None, "S", None), - 'cert-file' : (str, None, "S", None), - 'key-file' : (str, None, "S", None), - 'password-file' : (str, None, "S", None), - 'password' : (str, None, "S", None) - }), - 'listener' : (False, { - 'addr' : (str, 0, "M"), - 'port' : (str, 1, "M"), - 'label' : (str, None, "", None), - 'sasl-mechanisms' : (str, None, "M"), - 'ssl-profile' : (str, None, "E", None), - 'require-peer-auth' : (bool, None, "", True), - 'allow-unsecured' : (bool, None, "", False) - }), - 'connector' : (False, { - 'addr' : (str, 0, "M"), - 'port' : (str, 1, "M"), - 'label' : (str, None, "", None), - 'sasl-mechanisms' : (str, None, "M"), - 'ssl-profile' : (str, None, "E", None), - 'allow-redirect' : (bool, None, "", True) - }), - 'router' : (True, { - 'router-id' : (str, None, "M"), - 'area' : (str, None, "", None), - 'hello-interval' : (int, None, "", 1), - 'hello-max-age' : (int, None, "", 3), - 'ra-interval' : (int, None, "", 30), - 'remote-ls-max-age' : (int, None, "", 60), - 'mobile-addr-max-age' : (int, None, "", 60) - })} - diff --git a/qpid/extras/dispatch/src/py/router/__init__.py b/qpid/extras/dispatch/src/py/router/__init__.py deleted file mode 100644 index bfc600d469..0000000000 --- a/qpid/extras/dispatch/src/py/router/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from router.router_engine import * diff --git a/qpid/extras/dispatch/src/py/router/adapter.py b/qpid/extras/dispatch/src/py/router/adapter.py deleted file mode 100644 index 76c5a3ea48..0000000000 --- a/qpid/extras/dispatch/src/py/router/adapter.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from stubs import * - -ENTRY_OLD = 1 -ENTRY_CURRENT = 2 -ENTRY_NEW = 3 - -class AdapterEngine(object): - """ - This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). - Key binding lists are kept in disjoint key-classes that can come from different parts of the router - (i.e. topological keys for inter-router communication and mobile keys for end users). - - For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the - routing tables to be efficiently communicated to the adapter in the form of table deltas. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.key_classes = {} # map [key_class] => (addr-key, next-hop) - - - def tick(self, now): - """ - There is no periodic processing needed for this module. - """ - pass - - - def remote_routes_changed(self, key_class, new_table): - old_table = [] - if key_class in self.key_classes: - old_table = self.key_classes[key_class] - - # flag all of the old entries - old_flags = {} - for a,b in old_table: - old_flags[(a,b)] = ENTRY_OLD - - # flag the new entries - new_flags = {} - for a,b in new_table: - new_flags[(a,b)] = ENTRY_NEW - - # calculate the differences from old to new - for a,b in new_table: - if old_table.count((a,b)) > 0: - old_flags[(a,b)] = ENTRY_CURRENT - new_flags[(a,b)] = ENTRY_CURRENT - - # make to_add and to_delete lists - to_add = [] - to_delete = [] - for (a,b),f in old_flags.items(): - if f == ENTRY_OLD: - to_delete.append((a,b)) - for (a,b),f in new_flags.items(): - if f == ENTRY_NEW: - to_add.append((a,b)) - - # set the routing table to the new contents - self.key_classes[key_class] = new_table - - # update the adapter's routing tables - # Note: Do deletions before adds to avoid overlapping routes that may cause - # messages to be duplicated. It's better to have gaps in the routing - # tables momentarily because unroutable messages are stored for retry. - for a,b in to_delete: - self.container.router_adapter.remote_unbind(a, b) - for a,b in to_add: - self.container.router_adapter.remote_bind(a, b) - - self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) - for a,b in new_table: - self.container.log(LOG_INFO, " %s => %s" % (a, b)) - - diff --git a/qpid/extras/dispatch/src/py/router/binding.py b/qpid/extras/dispatch/src/py/router/binding.py deleted file mode 100644 index cd0641d12f..0000000000 --- a/qpid/extras/dispatch/src/py/router/binding.py +++ /dev/null @@ -1,136 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from stubs import * - - -class BindingEngine(object): - """ - This module is responsible for responding to two different events: - 1) The learning of new remote mobile addresses - 2) The change of topology (i.e. different next-hops for remote routers) - When these occur, this module converts the mobile routing table (address => router) - to a next-hop routing table (address => next-hop), compresses the keys in case there - are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.current_keys = {} - - - def tick(self, now): - pass - - - def mobile_keys_changed(self, keys): - self.current_keys = keys - next_hop_keys = self._convert_ids_to_next_hops(keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def next_hops_changed(self): - next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def _convert_ids_to_next_hops(self, keys): - next_hops = self.container.get_next_hops() - new_keys = {} - for _id, value in keys.items(): - if _id in next_hops: - next_hop = next_hops[_id] - if next_hop not in new_keys: - new_keys[next_hop] = [] - new_keys[next_hop].extend(value) - return new_keys - - - def _compress_keys(self, keys): - trees = {} - for _id, key_list in keys.items(): - trees[_id] = TopicElementList() - for key in key_list: - trees[_id].add_key(key) - routing_table = [] - for _id, tree in trees.items(): - tree_keys = tree.get_list() - for tk in tree_keys: - routing_table.append((tk, _id)) - return routing_table - - -class TopicElementList(object): - """ - """ - def __init__(self): - self.elements = {} # map text => (terminal, sub-list) - - def __repr__(self): - return "%r" % self.elements - - def add_key(self, key): - self.add_tokens(key.split('.')) - - def add_tokens(self, tokens): - first = tokens.pop(0) - terminal = len(tokens) == 0 - - if terminal and first == '#': - ## Optimization #1A (A.B.C.D followed by A.B.#) - self.elements = {'#':(True, TopicElementList())} - return - - if '#' in self.elements: - _t,_el = self.elements['#'] - if _t: - ## Optimization #1B (A.B.# followed by A.B.C.D) - return - - if first not in self.elements: - self.elements[first] = (terminal, TopicElementList()) - else: - _t,_el = self.elements[first] - if terminal and not _t: - self.elements[first] = (terminal, _el) - - if not terminal: - _t,_el = self.elements[first] - _el.add_tokens(tokens) - - def get_list(self): - keys = [] - for token, (_t,_el) in self.elements.items(): - if _t: keys.append(token) - _el.build_list(token, keys) - return keys - - def build_list(self, prefix, keys): - for token, (_t,_el) in self.elements.items(): - if _t: keys.append("%s.%s" % (prefix, token)) - _el.build_list("%s.%s" % (prefix, token), keys) - - - diff --git a/qpid/extras/dispatch/src/py/router/configuration.py b/qpid/extras/dispatch/src/py/router/configuration.py deleted file mode 100644 index f87d2ee7d2..0000000000 --- a/qpid/extras/dispatch/src/py/router/configuration.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class Configuration(object): - """ - This module manages and holds the configuration and tuning parameters for a router. - """ - def __init__(self, overrides={}): - ## - ## Load default values - ## - self.values = { 'hello_interval' : 1.0, - 'hello_max_age' : 3.0, - 'ra_interval' : 30.0, - 'remote_ls_max_age' : 60.0, - 'mobile_addr_max_age' : 60.0 } - - ## - ## Apply supplied overrides - ## - for k, v in overrides.items(): - self.values[k] = v - - def __getattr__(self, key): - if key in self.values: - return self.values[key] - raise KeyError - - def __repr__(self): - return "%r" % self.values - diff --git a/qpid/extras/dispatch/src/py/router/data.py b/qpid/extras/dispatch/src/py/router/data.py deleted file mode 100644 index 79d6a0d0fe..0000000000 --- a/qpid/extras/dispatch/src/py/router/data.py +++ /dev/null @@ -1,275 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -try: - from dispatch import * -except ImportError: - from stubs import * - - -def getMandatory(data, key, cls=None): - """ - Get the value mapped to the requested key. If it's not present, raise an exception. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - raise Exception("Mandatory protocol field missing: '%s'" % key) - - -def getOptional(data, key, default=None, cls=None): - """ - Get the value mapped to the requested key. If it's not present, return the default value. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - return default - - -class LinkState(object): - """ - The link-state of a single router. The link state consists of a list of neighbor routers reachable from - the reporting router. The link-state-sequence number is incremented each time the link state changes. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): - self.last_seen = 0 - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.peers = getMandatory(body, 'peers', list) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.peers = _peers - - def __repr__(self): - return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'peers' : self.peers} - - def add_peer(self, _id): - if self.peers.count(_id) == 0: - self.peers.append(_id) - return True - return False - - def del_peer(self, _id): - if self.peers.count(_id) > 0: - self.peers.remove(_id) - return True - return False - - def bump_sequence(self): - self.ls_seq += 1 - - -class MessageHELLO(object): - """ - HELLO Message - scope: neighbors only - HELLO messages travel at most one hop - This message is used by directly connected routers to determine with whom they have - bidirectional connectivity. - """ - def __init__(self, body, _id=None, _area=None, _seen_peers=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.seen_peers = getMandatory(body, 'seen', list) - else: - self.id = _id - self.area = _area - self.seen_peers = _seen_peers - - def __repr__(self): - return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) - - def get_opcode(self): - return 'HELLO' - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'seen' : self.seen_peers} - - def is_seen(self, _id): - return self.seen_peers.count(_id) > 0 - - -class MessageRA(object): - """ - Router Advertisement (RA) Message - scope: all routers in the area and all designated routers - This message is sent periodically to indicate the originating router's sequence numbers - for link-state and mobile-address-state. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.mobile_seq = long(_mobile_seq) - - def get_opcode(self): - return 'RA' - - def __repr__(self): - return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ - (self.id, self.area, self.ls_seq, self.mobile_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'mobile_seq' : self.mobile_seq} - - -class MessageLSU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.ls = LinkState(getMandatory(body, 'ls', dict)) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.ls = _ls - - def get_opcode(self): - return 'LSU' - - def __repr__(self): - return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ - (self.id, self.area, self.ls_seq, self.ls) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'ls' : self.ls.to_dict()} - - -class MessageLSR(object): - """ - """ - def __init__(self, body, _id=None, _area=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - else: - self.id = _id - self.area = _area - - def get_opcode(self): - return 'LSR' - - def __repr__(self): - return "LSR(id=%s area=%s)" % (self.id, self.area) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area} - - -class MessageMAU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - self.add_list = getOptional(body, 'add', None, list) - self.del_list = getOptional(body, 'del', None, list) - self.exist_list = getOptional(body, 'exist', None, list) - else: - self.id = _id - self.area = _area - self.mobile_seq = long(_seq) - self.add_list = _add_list - self.del_list = _del_list - self.exist_list = _exist_list - - def get_opcode(self): - return 'MAU' - - def __repr__(self): - _add = '' - _del = '' - _exist = '' - if self.add_list: _add = ' add=%r' % self.add_list - if self.del_list: _del = ' del=%r' % self.del_list - if self.exist_list: _exist = ' exist=%r' % self.exist_list - return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ - (self.id, self.area, self.mobile_seq, _add, _del, _exist) - - def to_dict(self): - body = { 'id' : self.id, - 'area' : self.area, - 'mobile_seq' : self.mobile_seq } - if self.add_list: body['add'] = self.add_list - if self.del_list: body['del'] = self.del_list - if self.exist_list: body['exist'] = self.exist_list - return body - - -class MessageMAR(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _have_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.have_seq = getMandatory(body, 'have_seq', long) - else: - self.id = _id - self.area = _area - self.have_seq = long(_have_seq) - - def get_opcode(self): - return 'MAR' - - def __repr__(self): - return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'have_seq' : self.have_seq} - diff --git a/qpid/extras/dispatch/src/py/router/link.py b/qpid/extras/dispatch/src/py/router/link.py deleted file mode 100644 index 1e06d161f6..0000000000 --- a/qpid/extras/dispatch/src/py/router/link.py +++ /dev/null @@ -1,140 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from data import MessageRA, MessageLSU, MessageLSR -from time import time - -try: - from dispatch import * -except ImportError: - from stubs import * - -class LinkStateEngine(object): - """ - This module is responsible for running the Link State protocol and maintaining the set - of link states that are gathered from the domain. It notifies outbound when changes to - the link-state-collection are detected. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.ra_interval = self.container.config.ra_interval - self.remote_ls_max_age = self.container.config.remote_ls_max_age - self.last_ra_time = 0 - self.collection = {} - self.collection_changed = False - self.mobile_seq = 0 - self.needed_lsrs = {} - - - def tick(self, now): - self._expire_ls(now) - self._send_lsrs() - - if now - self.last_ra_time >= self.ra_interval: - self.last_ra_time = now - self._send_ra() - - if self.collection_changed: - self.collection_changed = False - self.container.log(LOG_INFO, "New Link-State Collection:") - for a,b in self.collection.items(): - self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) - self.container.ls_collection_changed(self.collection) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - ls.last_seen = now - if ls.ls_seq < msg.ls_seq: - self.needed_lsrs[(msg.area, msg.id)] = None - else: - self.needed_lsrs[(msg.area, msg.id)] = None - - - def handle_lsu(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - if ls.ls_seq < msg.ls_seq: - ls = msg.ls - self.collection[msg.id] = ls - self.collection_changed = True - ls.last_seen = now - else: - ls = msg.ls - self.collection[msg.id] = ls - self.collection_changed = True - ls.last_seen = now - self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) - # Schedule LSRs for any routers referenced in this LS that we don't know about - for _id in msg.ls.peers: - if _id not in self.collection: - self.needed_lsrs[(msg.area, _id)] = None - - - def handle_lsr(self, msg, now): - if msg.id == self.id: - return - if self.id not in self.collection: - return - my_ls = self.collection[self.id] - self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) - - - def new_local_link_state(self, link_state): - self.collection[self.id] = link_state - self.collection_changed = True - self._send_ra() - - def set_mobile_sequence(self, seq): - self.mobile_seq = seq - - - def get_collection(self): - return self.collection - - - def _expire_ls(self, now): - to_delete = [] - for key, ls in self.collection.items(): - if key != self.id and now - ls.last_seen > self.remote_ls_max_age: - to_delete.append(key) - for key in to_delete: - ls = self.collection.pop(key) - self.collection_changed = True - self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) - - - def _send_lsrs(self): - for (_area, _id) in self.needed_lsrs.keys(): - self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) - self.needed_lsrs = {} - - - def _send_ra(self): - ls_seq = 0 - if self.id in self.collection: - ls_seq = self.collection[self.id].ls_seq - self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/src/py/router/mobile.py b/qpid/extras/dispatch/src/py/router/mobile.py deleted file mode 100644 index acea759e37..0000000000 --- a/qpid/extras/dispatch/src/py/router/mobile.py +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from data import MessageRA, MessageMAR, MessageMAU - -try: - from dispatch import * -except ImportError: - from stubs import * - -class MobileAddressEngine(object): - """ - This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. - It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. - Note that this routing table maps from the mobile address to the remote router where that address - is directly bound. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.mobile_addr_max_age = self.container.config.mobile_addr_max_age - self.mobile_seq = 0 - self.local_keys = [] - self.added_keys = [] - self.deleted_keys = [] - self.remote_lists = {} # map router_id => (sequence, list of keys) - self.remote_last_seen = {} # map router_id => time of last seen advertizement/update - self.remote_changed = False - self.needed_mars = {} - - - def tick(self, now): - self._expire_remotes(now) - self._send_mars() - - ## - ## If local keys have changed, collect the changes and send a MAU with the diffs - ## Note: it is important that the differential-MAU be sent before a RA is sent - ## - if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: - self.mobile_seq += 1 - self.container.send('_topo.%s.all' % self.area, - MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) - self.local_keys.extend(self.added_keys) - for key in self.deleted_keys: - self.local_keys.remove(key) - self.added_keys = [] - self.deleted_keys = [] - self.container.mobile_sequence_changed(self.mobile_seq) - - ## - ## If remotes have changed, start the process of updating local bindings - ## - if self.remote_changed: - self.remote_changed = False - self._update_remote_keys() - - - def add_local_address(self, key): - """ - """ - if self.local_keys.count(key) == 0: - if self.added_keys.count(key) == 0: - self.added_keys.append(key) - else: - if self.deleted_keys.count(key) > 0: - self.deleted_keys.remove(key) - - - def del_local_address(self, key): - """ - """ - if self.local_keys.count(key) > 0: - if self.deleted_keys.count(key) == 0: - self.deleted_keys.append(key) - else: - if self.added_keys.count(key) > 0: - self.added_keys.remove(key) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - - if msg.mobile_seq == 0: - return - - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - self.remote_last_seen[msg.id] = now - if _seq < msg.mobile_seq: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - - - def handle_mau(self, msg, now): - ## - ## If the MAU is differential, we can only use it if its sequence is exactly one greater - ## than our stored sequence. If not, we will ignore the content and schedule a MAR. - ## - ## If the MAU is absolute, we can use it in all cases. - ## - if msg.id == self.id: - return - - if msg.exist_list: - ## - ## Absolute MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq >= msg.mobile_seq: # ignore duplicates - return - self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) - self.remote_last_seen[msg.id] = now - self.remote_changed = True - else: - ## - ## Differential MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq == msg.mobile_seq: # ignore duplicates - return - self.remote_last_seen[msg.id] = now - if _seq + 1 == msg.mobile_seq: - ## - ## This is one greater than our stored value, incorporate the deltas - ## - if msg.add_list and msg.add_list.__class__ == list: - _list.extend(msg.add_list) - if msg.del_list and msg.del_list.__class__ == list: - for key in msg.del_list: - _list.remove(key) - self.remote_lists[msg.id] = (msg.mobile_seq, _list) - self.remote_changed = True - else: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - - - def handle_mar(self, msg, now): - if msg.id == self.id: - return - if msg.have_seq < self.mobile_seq: - self.container.send('_topo.%s.%s' % (msg.area, msg.id), - MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) - - - def _update_remote_keys(self): - keys = {} - for _id,(seq,key_list) in self.remote_lists.items(): - keys[_id] = key_list - self.container.mobile_keys_changed(keys) - - - def _expire_remotes(self, now): - for _id, t in self.remote_last_seen.items(): - if now - t > self.mobile_addr_max_age: - self.remote_lists.pop(_id) - self.remote_last_seen.pop(_id) - self.remote_changed = True - - - def _send_mars(self): - for _id, _area, _seq in self.needed_mars.keys(): - self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) - self.needed_mars = {} - diff --git a/qpid/extras/dispatch/src/py/router/neighbor.py b/qpid/extras/dispatch/src/py/router/neighbor.py deleted file mode 100644 index 55c6bab62f..0000000000 --- a/qpid/extras/dispatch/src/py/router/neighbor.py +++ /dev/null @@ -1,83 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from data import LinkState, MessageHELLO -from time import time - -try: - from dispatch import * -except ImportError: - from stubs import * - - -class NeighborEngine(object): - """ - This module is responsible for maintaining this router's link-state. It runs the HELLO protocol - with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the - link-state) changes. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.last_hello_time = 0.0 - self.hello_interval = container.config.hello_interval - self.hello_max_age = container.config.hello_max_age - self.hellos = {} - self.link_state_changed = False - self.link_state = LinkState(None, self.id, self.area, 0, []) - - - def tick(self, now): - self._expire_hellos(now) - - if now - self.last_hello_time >= self.hello_interval: - self.last_hello_time = now - self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) - - if self.link_state_changed: - self.link_state_changed = False - self.link_state.bump_sequence() - self.container.local_link_state_changed(self.link_state) - - - def handle_hello(self, msg, now): - if msg.id == self.id: - return - self.hellos[msg.id] = now - if msg.is_seen(self.id): - if self.link_state.add_peer(msg.id): - self.link_state_changed = True - self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id) - ## - ## TODO - Use this function to detect area boundaries - ## - - def _expire_hellos(self, now): - to_delete = [] - for key, last_seen in self.hellos.items(): - if now - last_seen > self.hello_max_age: - to_delete.append(key) - for key in to_delete: - self.hellos.pop(key) - if self.link_state.del_peer(key): - self.link_state_changed = True - self.container.log(LOG_INFO, "Neighbor lost: %s" % key) - - diff --git a/qpid/extras/dispatch/src/py/router/path.py b/qpid/extras/dispatch/src/py/router/path.py deleted file mode 100644 index 3be6c40608..0000000000 --- a/qpid/extras/dispatch/src/py/router/path.py +++ /dev/null @@ -1,202 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from stubs import * - -class PathEngine(object): - """ - This module is responsible for computing the next-hop for every router/area in the domain - based on the collection of link states that have been gathered. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.recalculate = False - self.collection = None - - - def tick(self, now_unused): - if self.recalculate: - self.recalculate = False - self._calculate_routes() - - - def ls_collection_changed(self, collection): - self.recalculate = True - self.collection = collection - - - def _calculate_tree_from_root(self, root): - ## - ## Make a copy of the current collection of link-states that contains - ## an empty link-state for nodes that are known-peers but are not in the - ## collection currently. This is needed to establish routes to those nodes - ## so we can trade link-state information with them. - ## - link_states = {} - for _id, ls in self.collection.items(): - link_states[_id] = ls.peers - for p in ls.peers: - if p not in link_states: - link_states[p] = [] - - ## - ## Setup Dijkstra's Algorithm - ## - cost = {} - prev = {} - for _id in link_states: - cost[_id] = None # infinite - prev[_id] = None # undefined - cost[root] = 0 # no cost to the root node - unresolved = NodeSet(cost) - - ## - ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. - ## - while not unresolved.empty(): - u = unresolved.lowest_cost() - if cost[u] == None: - # There are no more reachable nodes in unresolved - break - for v in link_states[u]: - if unresolved.contains(v): - alt = cost[u] + 1 # TODO - Use link cost instead of 1 - if cost[v] == None or alt < cost[v]: - cost[v] = alt - prev[v] = u - unresolved.set_cost(v, alt) - - ## - ## Remove unreachable nodes from the map. Note that this will also remove the - ## root node (has no previous node) from the map. - ## - for u, val in prev.items(): - if not val: - prev.pop(u) - - ## - ## Return previous-node map. This is a map of all reachable, remote nodes to - ## their predecessor node. - ## - return prev - - - def _calculate_routes(self): - ## - ## Generate the shortest-path tree with the local node as root - ## - prev = self._calculate_tree_from_root(self.id) - nodes = prev.keys() - - ## - ## Distill the path tree into a map of next hops for each node - ## - next_hops = {} - while len(nodes) > 0: - u = nodes[0] # pick any destination - path = [u] - nodes.remove(u) - v = prev[u] - while v != self.id: # build a list of nodes in the path back to the root - if v in nodes: - path.append(v) - nodes.remove(v) - u = v - v = prev[u] - for w in path: # mark each node in the path as reachable via the next hop - next_hops[w] = u - - ## - ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest - ## for which the path from origin to dest passes through us. This is the set - ## of valid origins for forwarding to the destination. - ## - - self.container.next_hops_changed(next_hops) - - -class NodeSet(object): - """ - This data structure is an ordered list of node IDs, sorted in increasing order by their cost. - Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and - repeatable ordering. - """ - def __init__(self, cost_map): - self.nodes = [] - for _id, cost in cost_map.items(): - ## - ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) - ## during this initialization. - ## - if cost == 0: - self.nodes.insert(0, (_id, cost)) - else: - ## - ## There is no need to sort unreachable nodes by ID - ## - self.nodes.append((_id, cost)) - - - def __repr__(self): - return self.nodes.__repr__() - - - def empty(self): - return len(self.nodes) == 0 - - - def contains(self, _id): - for a, b in self.nodes: - if a == _id: - return True - return False - - - def lowest_cost(self): - """ - Remove and return the lowest cost node ID. - """ - _id, cost = self.nodes.pop(0) - return _id - - - def set_cost(self, _id, new_cost): - """ - Set the cost for an ID in the NodeSet and re-insert the ID so that the list - remains sorted in increasing cost order. - """ - index = 0 - for i, c in self.nodes: - if i == _id: - break - index += 1 - self.nodes.pop(index) - - index = 0 - for i, c in self.nodes: - if c == None or new_cost < c or (new_cost == c and _id < i): - break - index += 1 - - self.nodes.insert(index, (_id, new_cost)) diff --git a/qpid/extras/dispatch/src/py/router/router_engine.py b/qpid/extras/dispatch/src/py/router/router_engine.py deleted file mode 100644 index 065204ad62..0000000000 --- a/qpid/extras/dispatch/src/py/router/router_engine.py +++ /dev/null @@ -1,255 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from time import time -from uuid import uuid4 - -from configuration import Configuration -from data import * -from neighbor import NeighborEngine -from link import LinkStateEngine -from path import PathEngine -from mobile import MobileAddressEngine -from routing import RoutingTableEngine -from binding import BindingEngine -from adapter import AdapterEngine - -## -## Import the Dispatch adapters from the environment. If they are not found -## (i.e. we are in a test bench, etc.), load the stub versions. -## -try: - from dispatch import * -except ImportError: - from stubs import * - - -class RouterEngine: - """ - """ - - def __init__(self, router_adapter, router_id=None, area='area', config_override={}): - """ - Initialize an instance of a router for a domain. - """ - ## - ## Record important information about this router instance - ## - self.domain = "domain" - self.router_adapter = router_adapter - self.log_adapter = LogAdapter("dispatch.router") - self.io_adapter = IoAdapter(self, "qdxrouter") - - if router_id: - self.id = router_id - else: - self.id = str(uuid4()) - self.area = area - self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) - - ## - ## Setup configuration - ## - self.config = Configuration(config_override) - self.log(LOG_INFO, "Config: %r" % self.config) - - ## - ## Launch the sub-module engines - ## - self.neighbor_engine = NeighborEngine(self) - self.link_state_engine = LinkStateEngine(self) - self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self) - self.routing_table_engine = RoutingTableEngine(self) - self.binding_engine = BindingEngine(self) - self.adapter_engine = AdapterEngine(self) - - - - ##======================================================================================== - ## Adapter Entry Points - invoked from the adapter - ##======================================================================================== - def getId(self): - """ - Return the router's ID - """ - return self.id - - - def addLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.add_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) - - def delLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.del_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) - - - def handleTimerTick(self): - """ - """ - try: - now = time() - self.neighbor_engine.tick(now) - self.link_state_engine.tick(now) - self.path_engine.tick(now) - self.mobile_address_engine.tick(now) - self.routing_table_engine.tick(now) - self.binding_engine.tick(now) - self.adapter_engine.tick(now) - except Exception, e: - self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) - - - def handleControlMessage(self, opcode, body): - """ - """ - try: - now = time() - if opcode == 'HELLO': - msg = MessageHELLO(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.neighbor_engine.handle_hello(msg, now) - - elif opcode == 'RA': - msg = MessageRA(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_ra(msg, now) - self.mobile_address_engine.handle_ra(msg, now) - - elif opcode == 'LSU': - msg = MessageLSU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsu(msg, now) - - elif opcode == 'LSR': - msg = MessageLSR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsr(msg, now) - - elif opcode == 'MAU': - msg = MessageMAU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mau(msg, now) - - elif opcode == 'MAR': - msg = MessageMAR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mar(msg, now) - - except Exception, e: - self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) - - - def receive(self, message_properties, body): - """ - This is the IoAdapter message-receive handler - """ - try: - self.handleControlMessage(message_properties['opcode'], body) - except Exception, e: - self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % - (message_properties, body, e)) - - def getRouterData(self, kind): - """ - """ - if kind == 'help': - return { 'help' : "Get list of supported values for kind", - 'link-state' : "This router's link state", - 'link-state-set' : "The set of link states from known routers", - 'next-hops' : "Next hops to each known router", - 'topo-table' : "Topological routing table", - 'mobile-table' : "Mobile key routing table" - } - if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() - if kind == 'next-hops' : return self.routing_table_engine.next_hops - if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} - if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} - if kind == 'link-state-set' : - copy = {} - for _id,_ls in self.link_state_engine.collection.items(): - copy[_id] = _ls.to_dict() - return copy - - return {'notice':'Use kind="help" to get a list of possibilities'} - - - ##======================================================================================== - ## Adapter Calls - outbound calls to Dispatch - ##======================================================================================== - def log(self, level, text): - """ - Emit a log message to the host's event log - """ - self.log_adapter.log(level, text) - - - def send(self, dest, msg): - """ - Send a control message to another router. - """ - app_props = {'opcode' : msg.get_opcode() } - self.io_adapter.send(dest, app_props, msg.to_dict()) - self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) - - - ##======================================================================================== - ## Interconnect between the Sub-Modules - ##======================================================================================== - def local_link_state_changed(self, link_state): - self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) - self.link_state_engine.new_local_link_state(link_state) - - def ls_collection_changed(self, collection): - self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) - self.path_engine.ls_collection_changed(collection) - - def next_hops_changed(self, next_hop_table): - self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) - self.routing_table_engine.next_hops_changed(next_hop_table) - self.binding_engine.next_hops_changed() - - def mobile_sequence_changed(self, mobile_seq): - self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) - self.link_state_engine.set_mobile_sequence(mobile_seq) - - def mobile_keys_changed(self, keys): - self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) - self.binding_engine.mobile_keys_changed(keys) - - def get_next_hops(self): - return self.routing_table_engine.get_next_hops() - - def remote_routes_changed(self, key_class, routes): - self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) - self.adapter_engine.remote_routes_changed(key_class, routes) - diff --git a/qpid/extras/dispatch/src/py/router/routing.py b/qpid/extras/dispatch/src/py/router/routing.py deleted file mode 100644 index 0d4ceed955..0000000000 --- a/qpid/extras/dispatch/src/py/router/routing.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from stubs import * - -class RoutingTableEngine(object): - """ - This module is responsible for converting the set of next hops to remote routers to a routing - table in the "topological" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.next_hops = {} - - - def tick(self, now): - pass - - - def next_hops_changed(self, next_hops): - # Convert next_hops into routing table - self.next_hops = next_hops - new_table = [] - for _id, next_hop in next_hops.items(): - new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) - pair = ('_topo.%s.all' % (self.area), next_hop) - if new_table.count(pair) == 0: - new_table.append(pair) - - self.container.remote_routes_changed('topological', new_table) - - - def get_next_hops(self): - return self.next_hops - diff --git a/qpid/extras/dispatch/src/py/stubs/__init__.py b/qpid/extras/dispatch/src/py/stubs/__init__.py deleted file mode 100644 index 98180eadee..0000000000 --- a/qpid/extras/dispatch/src/py/stubs/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from stubs.logadapter import * -from stubs.ioadapter import * - diff --git a/qpid/extras/dispatch/src/py/stubs/ioadapter.py b/qpid/extras/dispatch/src/py/stubs/ioadapter.py deleted file mode 100644 index 1e465f98c3..0000000000 --- a/qpid/extras/dispatch/src/py/stubs/ioadapter.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class IoAdapter: - def __init__(self, handler, address): - self.handler = handler - self.address = address - - def send(self, address, app_properties, body): - print "IO: send(addr=%s props=%r body=%r" % (address, app_properties, body) - diff --git a/qpid/extras/dispatch/src/py/stubs/logadapter.py b/qpid/extras/dispatch/src/py/stubs/logadapter.py deleted file mode 100644 index 3d717d3ed2..0000000000 --- a/qpid/extras/dispatch/src/py/stubs/logadapter.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -LOG_TRACE = 1 -LOG_DEBUG = 2 -LOG_INFO = 4 -LOG_NOTICE = 8 -LOG_WARNING = 16 -LOG_ERROR = 32 -LOG_CRITICAL = 64 - -class LogAdapter: - def __init__(self, mod_name): - self.mod_name = mod_name - - def log(self, level, text): - print "LOG: mod=%s level=%d text=%s" % (self.mod_name, level, text) diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c index 6b5250a34a..0b0cc11025 100644 --- a/qpid/extras/dispatch/src/python_embedded.c +++ b/qpid/extras/dispatch/src/python_embedded.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = { typedef struct { PyObject_HEAD PyObject *handler; + PyObject *handler_rx_call; dx_dispatch_t *dx; dx_address_t *address; } IoAdapter; @@ -403,9 +404,65 @@ typedef struct { static void dx_io_rx_handler(void *context, dx_message_t *msg) { - //IoAdapter *self = (IoAdapter*) context; + IoAdapter *self = (IoAdapter*) context; + + // + // Parse the message through the body and exit if the message is not well formed. + // + if (!dx_message_check(msg, DX_DEPTH_BODY)) + return; + + // + // Get an iterator for the application-properties. Exit if the message has none. + // + dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES); + if (ap == 0) + return; + + // + // Try to get a map-view of the application-properties. + // + dx_parsed_field_t *ap_map = dx_parse(ap); + if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) { + dx_field_iterator_free(ap); + dx_parse_free(ap_map); + return; + } + + // + // Get an iterator for the body. Exit if the message has none. + // + dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY); + if (body == 0) { + dx_field_iterator_free(ap); + dx_parse_free(ap_map); + return; + } + + // + // Try to get a map-view of the body. + // + dx_parsed_field_t *body_map = dx_parse(body); + if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) { + dx_field_iterator_free(ap); + dx_field_iterator_free(body); + dx_parse_free(ap_map); + dx_parse_free(body_map); + return; + } + + PyObject *pAP = dx_field_to_py(ap_map); + PyObject *pBody = dx_field_to_py(body_map); - // TODO - Parse the incoming message and send it to the python handler. + PyObject *pArgs = PyTuple_New(2); + PyTuple_SetItem(pArgs, 0, pAP); + PyTuple_SetItem(pArgs, 1, pBody); + + PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } } @@ -415,9 +472,14 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) if (!PyArg_ParseTuple(args, "Os", &self->handler, &address)) return -1; + self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive"); + if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call)) + return -1; + Py_INCREF(self->handler); + Py_INCREF(self->handler_rx_call); self->dx = dispatch; - self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self); + self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self); return 0; } @@ -426,24 +488,35 @@ static void IoAdapter_dealloc(IoAdapter* self) { dx_router_unregister_address(self->address); Py_DECREF(self->handler); + Py_DECREF(self->handler_rx_call); self->ob_type->tp_free((PyObject*)self); } static PyObject* dx_python_send(PyObject *self, PyObject *args) { - IoAdapter *ioa = (IoAdapter*) self; - const char *address; - PyObject *app_properties; - PyObject *body; + IoAdapter *ioa = (IoAdapter*) self; + dx_composed_field_t *field = 0; + const char *address; + PyObject *app_properties; + PyObject *body; + if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body)) return 0; - dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0); + field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field); + dx_compose_start_map(field); + + dx_compose_insert_string(field, "qdx.ingress"); + dx_compose_insert_string(field, dx_router_id(ioa->dx)); + + dx_compose_insert_string(field, "qdx.trace"); dx_compose_start_list(field); - dx_compose_insert_bool(field, 0); // durable + dx_compose_insert_string(field, dx_router_id(ioa->dx)); dx_compose_end_list(field); + dx_compose_end_map(field); + field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); dx_compose_start_list(field); dx_compose_insert_null(field); // message-id diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 235c381d20..d2704c4bd5 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ #include <qpid/dispatch/python_embedded.h> #include <stdio.h> #include <string.h> +#include <stdbool.h> #include <qpid/dispatch.h> #include "dispatch_private.h" @@ -28,123 +29,290 @@ static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t *router); -//static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *router_address = "_local/qdxrouter"; +static char *local_prefix = "_local/"; +//static char *topo_prefix = "_topo/"; /** * Address Types and Processing: * - * Address Hash Key onReceive onEmit - * ============================================================================= - * _local/<local> L<local> handler forward - * _topo/<area>/<router>/<local> A<area> forward forward - * _topo/<my-area>/<router>/<local> R<router> forward forward - * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward - * _topo/<area>/all/<local> A<area> forward forward - * _topo/<my-area>/all/<local> L<local> forward+handler forward - * _topo/all/all/<local> L<local> forward+handler forward - * <mobile> M<mobile> forward+handler forward + * Address Hash Key onReceive + * =================================================================== + * _local/<local> L<local> handler + * _topo/<area>/<router>/<local> A<area> forward + * _topo/<my-area>/<router>/<local> R<router> forward + * _topo/<my-area>/<my-router>/<local> L<local> handler + * _topo/<area>/all/<local> A<area> forward + * _topo/<my-area>/all/<local> L<local> forward handler + * _topo/all/all/<local> L<local> forward handler + * <mobile> M<mobile> forward handler */ -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_link_list_t in_links; - dx_link_list_t out_links; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; +typedef struct dx_router_link_t dx_router_link_t; +typedef struct dx_router_node_t dx_router_node_t; -typedef struct { - dx_link_t *link; - dx_message_list_t out_fifo; -} dx_router_link_t; + +typedef enum { + DX_LINK_ENDPOINT, // A link to a connected endpoint + DX_LINK_ROUTER, // A link to a peer router in the same area + DX_LINK_AREA // A link to a peer router in a different area (area boundary) +} dx_link_type_t; + + +typedef struct dx_routed_event_t { + DEQ_LINKS(struct dx_routed_event_t); + dx_delivery_t *delivery; + dx_message_t *message; + bool settle; + uint64_t disposition; +} dx_routed_event_t; + +ALLOC_DECLARE(dx_routed_event_t); +ALLOC_DEFINE(dx_routed_event_t); +DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); + + +struct dx_router_link_t { + DEQ_LINKS(dx_router_link_t); + dx_direction_t link_direction; + dx_link_type_t link_type; + dx_address_t *owning_addr; // [ref] Address record that owns this link + dx_link_t *link; // [own] Link pointer + dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link + dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link + dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) + dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries +}; ALLOC_DECLARE(dx_router_link_t); ALLOC_DEFINE(dx_router_link_t); +DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -typedef struct { +struct dx_router_node_t { + DEQ_LINKS(dx_router_node_t); const char *id; - dx_router_link_t *next_hop; + dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node + dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node // list of valid origins (pointers to router_node) - (bit masks?) -} dx_router_node_t; +}; ALLOC_DECLARE(dx_router_node_t); ALLOC_DEFINE(dx_router_node_t); +DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); struct dx_address_t { - int is_local; - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list - dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; + dx_router_link_list_t rlinks; // Locally-Connected Consumers + dx_router_node_list_t rnodes; // Remotely-Connected Consumers }; ALLOC_DECLARE(dx_address_t); ALLOC_DEFINE(dx_address_t); +struct dx_router_t { + dx_dispatch_t *dx; + const char *router_area; + const char *router_id; + dx_node_t *node; + dx_router_link_list_t in_links; + dx_router_node_list_t routers; + dx_message_list_t in_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; + hash_t *out_hash; + uint64_t dtag; + PyObject *pyRouter; + PyObject *pyTick; +}; + + /** - * Outbound Delivery Handler + * Outgoing Link Writable Handler */ -static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +static int router_writable_link_handler(void* context, dx_link_t *link) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - dx_message_t *msg; - size_t size; + dx_router_t *router = (dx_router_t*) context; + dx_delivery_t *delivery; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + pn_link_t *pn_link = dx_link_pn(link); + uint64_t tag; + int link_credit = pn_link_credit(pn_link); + dx_routed_event_list_t to_send; + dx_routed_event_list_t events; + dx_routed_event_t *re; + size_t offer; + int event_count = 0; + + DEQ_INIT(to_send); + DEQ_INIT(events); sys_mutex_lock(router->lock); - msg = DEQ_HEAD(rlink->out_fifo); - if (!msg) { - // TODO - Recind the delivery - sys_mutex_unlock(router->lock); - return; + + // + // Pull the non-delivery events into a local list so they can be processed without + // the lock being held. + // + re = DEQ_HEAD(rlink->event_fifo); + while (re) { + DEQ_REMOVE_HEAD(rlink->event_fifo); + DEQ_INSERT_TAIL(events, re); + re = DEQ_HEAD(rlink->event_fifo); } - DEQ_REMOVE_HEAD(rlink->out_fifo); - size = (DEQ_SIZE(rlink->out_fifo)); + // + // Under lock, move available deliveries from the msg_fifo to the local to_send + // list. Don't move more than we have credit to send. + // + if (link_credit > 0) { + tag = router->dtag; + re = DEQ_HEAD(rlink->msg_fifo); + while (re) { + DEQ_REMOVE_HEAD(rlink->msg_fifo); + DEQ_INSERT_TAIL(to_send, re); + if (DEQ_SIZE(to_send) == link_credit) + break; + re = DEQ_HEAD(rlink->msg_fifo); + } + router->dtag += DEQ_SIZE(to_send); + } + + offer = DEQ_SIZE(rlink->msg_fifo); sys_mutex_unlock(router->lock); - dx_message_send(msg, pn_link); + // + // Deliver all the to_send messages downrange + // + re = DEQ_HEAD(to_send); + while (re) { + DEQ_REMOVE_HEAD(to_send); + + // + // Get a delivery for the send. This will be the current deliver on the link. + // + tag++; + delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); + + // + // Send the message + // + dx_message_send(re->message, link); + + // + // If there is an incoming delivery associated with this message, link it + // with the outgoing delivery. Otherwise, the message arrived pre-settled + // and should be sent presettled. + // + if (re->delivery) { + dx_delivery_set_peer(re->delivery, delivery); + dx_delivery_set_peer(delivery, re->delivery); + } else + dx_delivery_free(delivery, 0); // settle and free + + pn_link_advance(pn_link); + event_count++; + + dx_free_message(re->message); + free_dx_routed_event_t(re); + re = DEQ_HEAD(to_send); + } // - // If there is no incoming delivery, it was pre-settled. In this case, - // we must pre-settle the outgoing delivery as well. + // Process the non-delivery events. // - if (dx_message_in_delivery(msg)) { - pn_delivery_set_context(delivery, (void*) msg); - dx_message_set_out_delivery(msg, delivery); - } else { - pn_delivery_settle(delivery); - dx_free_message(msg); + re = DEQ_HEAD(events); + while (re) { + DEQ_REMOVE_HEAD(events); + + if (re->delivery) { + if (re->disposition) { + pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition); + event_count++; + } + if (re->settle) { + dx_delivery_free(re->delivery, 0); + event_count++; + } + } + + free_dx_routed_event_t(re); + re = DEQ_HEAD(events); } - pn_link_advance(pn_link); - pn_link_offered(pn_link, size); + // + // Set the offer to the number of messages remaining to be sent. + // + pn_link_offered(pn_link, offer); + return event_count; +} + + +static void router_annotate_message(dx_router_t *router, dx_message_t *msg) +{ + dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); + dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); + + dx_parsed_field_t *trace = 0; + dx_parsed_field_t *ingress = 0; + + if (in_da) { + trace = dx_parse_value_by_key(in_da, "qdx.trace"); + ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + } + + dx_compose_start_map(out_da); + + // + // If there is a trace field, append this router's ID to the trace. + // + if (trace && dx_parse_is_list(trace)) { + dx_compose_insert_string(out_da, "qdx.trace"); + dx_compose_start_list(out_da); + + uint32_t idx = 0; + dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); + while (trace_item) { + dx_field_iterator_t *iter = dx_parse_raw(trace_item); + dx_compose_insert_string_iterator(out_da, iter); + idx++; + trace_item = dx_parse_sub_value(trace, idx); + } + + dx_compose_insert_string(out_da, router->router_id); + dx_compose_end_list(out_da); + } + + // + // If there is no ingress field, annotate the ingress as this router else + // keep the original field. + // + dx_compose_insert_string(out_da, "qdx.ingress"); + if (ingress && dx_parse_is_scalar(ingress)) { + dx_field_iterator_t *iter = dx_parse_raw(ingress); + dx_compose_insert_string_iterator(out_da, iter); + } else + dx_compose_insert_string(out_da, router->router_id); + + dx_compose_end_map(out_da); + + dx_message_set_delivery_annotations(msg, out_da); + dx_compose_free(out_da); } /** * Inbound Delivery Handler */ -static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_message_t *msg; - int valid_message = 0; + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_message_t *msg; + int valid_message = 0; // // Receive the message into a local representation. If the returned message @@ -158,20 +326,63 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del return; // - // Validate the message through the Properties section + // Consume the delivery and issue a replacement credit // - valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); - pn_link_advance(pn_link); pn_link_flow(pn_link, 1); + sys_mutex_lock(router->lock); + + // + // Handle the Link-Routing case. If this incoming link is associated with a connected + // link, simply deliver the message to the outgoing link. There is no need to validate + // the message in this case. + // + if (rlink->connected_link) { + dx_router_link_t *clink = rlink->connected_link; + dx_routed_event_t *re = new_dx_routed_event_t(); + + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = msg; + re->settle = false; + re->disposition = 0; + DEQ_INSERT_TAIL(clink->msg_fifo, re); + + // + // If the incoming delivery is settled (pre-settled), don't link it into the routed + // event. If it's not settled, link it into the event for later handling. + // + if (dx_delivery_settled(delivery)) + dx_delivery_free(delivery, 0); + else + re->delivery = delivery; + + sys_mutex_unlock(router->lock); + dx_link_activate(clink->link); + return; + } + + // + // We are performing Message-Routing, therefore we will need to validate the message + // through the Properties section so we can access the TO field. + // + dx_message_t *in_process_copy = 0; + dx_router_message_cb handler = 0; + void *handler_context = 0; + + valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); + if (valid_message) { dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); dx_address_t *addr; + int fanout = 0; + if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, iter, (void*) &addr); + dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); + int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); if (addr) { @@ -179,108 +390,145 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del // To field is valid and contains a known destination. Handle the various // cases for forwarding. // - // Forward to the in-process handler for this message if there is one. - // Note: If the handler is going to queue the message for deferred processing, - // it must copy the message. This function assumes that the handler - // will process the message synchronously and be finished with it upon - // completion. - // - if (addr->handler) - addr->handler(addr->handler_context, msg); // - // Forward to the local link for the locally-connected consumer, if present. - // TODO - Don't forward if this is a "_local" address. + // Interpret and update the delivery annotations of the message // - if (addr->rlink) { - pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link); - dx_message_t *copy = dx_message_copy(msg); - DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); - pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); - dx_link_activate(addr->rlink->link); - } + router_annotate_message(router, msg); // - // Forward to the next-hop for a remotely-connected consumer, if present. - // Don't forward if this is a "_local" address. + // Forward to the in-process handler for this message if there is one. The + // actual invocation of the handler will occur later after we've released + // the lock. // - if (addr->rnode) { - // TODO + if (addr->handler) { + in_process_copy = dx_message_copy(msg); + handler = addr->handler; + handler_context = addr->handler_context; } - } else { // - // To field contains an unknown address. Release the message. + // If the address form is local (i.e. is prefixed by _local), don't forward + // outside of the router process. // - pn_delivery_update(delivery, PN_RELEASED); - pn_delivery_settle(delivery); + if (!is_local) { + // + // Forward to all of the local links receiving this address. + // + dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); + while (dest_link) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + + fanout++; + if (fanout == 1 && !dx_delivery_settled(delivery)) + re->delivery = delivery; + + dx_link_activate(dest_link->link); + dest_link = DEQ_NEXT(dest_link); + } + + // + // Forward to the next-hops for remote destinations. + // + dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); + while (dest_node) { + if (dest_node->next_hop) + dest_link = dest_node->next_hop->peer_link; + else + dest_link = dest_node->peer_link; + if (dest_link) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + + fanout++; + if (fanout == 1) + re->delivery = delivery; + + dx_link_activate(dest_link->link); + } + dest_node = DEQ_NEXT(dest_node); + } + } } - sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? - dx_free_message(msg); - // - // If this was a pre-settled delivery, we must also locally settle it. + // In message-routing mode, the handling of the incoming delivery depends on the + // number of copies of the received message that were forwarded. // - if (pn_delivery_settled(delivery)) - pn_delivery_settle(delivery); + if (handler) { + dx_delivery_free(delivery, PN_ACCEPTED); + } else if (fanout == 0) { + dx_delivery_free(delivery, PN_RELEASED); + } else if (fanout > 1) + dx_delivery_free(delivery, PN_ACCEPTED); } } else { // // Message is invalid. Reject the message. // - pn_delivery_update(delivery, PN_REJECTED); - pn_delivery_settle(delivery); - pn_delivery_set_context(delivery, 0); - dx_free_message(msg); + dx_delivery_free(delivery, PN_REJECTED); } + + sys_mutex_unlock(router->lock); + dx_free_message(msg); + + // + // Invoke the in-process handler now that the lock is released. + // + if (handler) + handler(handler_context, in_process_copy); } /** * Delivery Disposition Handler */ -static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery) { - pn_link_t *pn_link = pn_delivery_link(delivery); + dx_router_t *router = (dx_router_t*) context; + bool changed = dx_delivery_disp_changed(delivery); + uint64_t disp = dx_delivery_disp(delivery); + bool settled = dx_delivery_settled(delivery); + dx_delivery_t *peer = dx_delivery_peer(delivery); - if (pn_link_is_sender(pn_link)) { - uint64_t disp = pn_delivery_remote_state(delivery); - dx_message_t *msg = pn_delivery_get_context(delivery); - pn_delivery_t *activate = 0; - - if (msg) { - assert(delivery == dx_message_out_delivery(msg)); - if (disp != 0) { - activate = dx_message_in_delivery(msg); - pn_delivery_update(activate, disp); - // TODO - handling of the data accompanying RECEIVED/MODIFIED - } - - if (pn_delivery_settled(delivery)) { - // - // Downstream delivery has been settled. Propagate the settlement - // upstream. - // - activate = dx_message_in_delivery(msg); - pn_delivery_settle(activate); - pn_delivery_settle(delivery); - dx_free_message(msg); - } + if (peer) { + // + // The case where this delivery has a peer. + // + if (changed || settled) { + dx_link_t *peer_link = dx_delivery_link(peer); + dx_router_link_t *prl = (dx_router_link_t*) dx_link_get_context(peer_link); + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = peer; + re->message = 0; + re->settle = settled; + re->disposition = changed ? disp : 0; - if (activate) { - // - // Activate the upstream/incoming link so that the settlement will - // get pushed out. - // - dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate)); - dx_link_activate(act_link); - } + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(prl->event_fifo, re); + sys_mutex_unlock(router->lock); - return; + dx_link_activate(peer_link); } + } else { - // TODO - Handle disposition updates from upstream + // + // The no-peer case. Ignore status changes and echo settlement. + // + if (settled) + dx_delivery_free(delivery, 0); } } @@ -290,25 +538,36 @@ static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *d */ static int router_incoming_link_handler(void* context, dx_link_t *link) { - dx_router_t *router = (dx_router_t*) context; - dx_link_item_t *item = new_dx_link_item_t(); - pn_link_t *pn_link = dx_link_pn(link); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = new_dx_router_link_t(); + pn_link_t *pn_link = dx_link_pn(link); - if (item) { - DEQ_ITEM_INIT(item); - item->link = link; + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_INCOMING; + rlink->link_type = DX_LINK_ENDPOINT; + rlink->owning_addr = 0; + rlink->link = link; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->event_fifo); + DEQ_INIT(rlink->msg_fifo); - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, item); - sys_mutex_unlock(router->lock); + dx_link_set_context(link, rlink); + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, rlink); + sys_mutex_unlock(router->lock); + + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_flow(pn_link, 1000); + pn_link_open(pn_link); + + // + // TODO - If the address has link-route semantics, create all associated + // links needed to go with this one. + // - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); - pn_link_flow(pn_link, 1000); - pn_link_open(pn_link); - } else { - pn_link_close(pn_link); - } return 0; } @@ -327,73 +586,45 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) return 0; } - dx_router_link_t *rlink = new_dx_router_link_t(); - rlink->link = link; - DEQ_INIT(rlink->out_fifo); + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_router_link_t *rlink = new_dx_router_link_t(); + + int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_OUTGOING; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link = link; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->event_fifo); + DEQ_INIT(rlink->msg_fifo); + dx_link_set_context(link, rlink); + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); dx_address_t *addr; - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); - addr->is_local = 0; addr->handler = 0; addr->handler_context = 0; - addr->rlink = 0; - addr->rnode = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); hash_insert(router->out_hash, iter, addr); } dx_field_iterator_free(iter); - if (addr->rlink == 0) { - addr->rlink = rlink; - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); - pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); - return 0; - } + rlink->owning_addr = addr; + DEQ_INSERT_TAIL(addr->rlinks, rlink); - dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); - pn_link_close(pn_link); + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_open(pn_link); sys_mutex_unlock(router->lock); - return 0; -} - - -/** - * Outgoing Link Writable Handler - */ -static int router_writable_link_handler(void* context, dx_link_t *link) -{ - dx_router_t *router = (dx_router_t*) context; - int grant_delivery = 0; - pn_delivery_t *delivery; - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - pn_link_t *pn_link = dx_link_pn(link); - uint64_t tag; - - sys_mutex_lock(router->lock); - if (DEQ_SIZE(rlink->out_fifo) > 0) { - grant_delivery = 1; - tag = router->dtag++; - } - sys_mutex_unlock(router->lock); - - if (grant_delivery) { - pn_delivery(pn_link, pn_dtag((char*) &tag, 8)); - delivery = pn_link_current(pn_link); - if (delivery) { - router_tx_handler(context, link, delivery); - return 1; - } - } - + dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -403,44 +634,37 @@ static int router_writable_link_handler(void* context, dx_link_t *link) */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - dx_link_item_t *item; + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - if (!r_tgt) + if (!rlink) return 0; sys_mutex_lock(router->lock); if (pn_link_is_sender(pn_link)) { - item = DEQ_HEAD(router->out_links); - - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(addr->rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); + + if ((rlink->owning_addr->handler == 0) && + (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && + (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + dx_address_t *addr; + if (iter) { + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (addr == rlink->owning_addr) { + hash_remove(router->out_hash, iter); + free_dx_router_link_t(rlink); + free_dx_address_t(addr); + dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + } + dx_field_iterator_free(iter); } - dx_field_iterator_free(iter); } - } - else - item = DEQ_HEAD(router->in_links); - - while (item) { - if (item->link == link) { - if (pn_link_is_sender(pn_link)) - DEQ_REMOVE(router->out_links, item); - else - DEQ_REMOVE(router->in_links, item); - free_dx_link_item_t(item); - break; - } - item = item->next; + } else { + DEQ_REMOVE(router->in_links, rlink); + free_dx_router_link_t(rlink); } sys_mutex_unlock(router->lock); @@ -455,6 +679,83 @@ static void router_inbound_open_handler(void *type_context, dx_connection_t *con static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) { + // TODO - Make sure this connection is annotated as an inter-router transport. + // Ignore otherwise + + dx_router_t *router = (dx_router_t*) type_context; + dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); + dx_link_t *sender; + dx_link_t *receiver; + dx_router_link_t *rlink; + + // + // Create an incoming link and put it in the in-links collection. The address + // of the remote source of this link is '_local/qdxrouter'. + // + receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); + pn_terminus_set_address(dx_link_remote_source(receiver), router_address); + pn_terminus_set_address(dx_link_target(receiver), router_address); + + rlink = new_dx_router_link_t(); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_INCOMING; + rlink->link_type = DX_LINK_ROUTER; + rlink->owning_addr = 0; + rlink->link = receiver; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->event_fifo); + DEQ_INIT(rlink->msg_fifo); + + dx_link_set_context(receiver, rlink); + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, rlink); + sys_mutex_unlock(router->lock); + + // + // Create an outgoing link with a local source of '_local/qdxrouter' and place + // it in the routing table. + // + sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); + pn_terminus_set_address(dx_link_remote_target(sender), router_address); + pn_terminus_set_address(dx_link_source(sender), router_address); + + rlink = new_dx_router_link_t(); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_OUTGOING; + rlink->link_type = DX_LINK_ROUTER; + rlink->link = sender; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->event_fifo); + DEQ_INIT(rlink->msg_fifo); + + dx_link_set_context(sender, rlink); + + dx_address_t *addr; + + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, aiter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->out_hash, aiter, addr); + } + + rlink->owning_addr = addr; + DEQ_INSERT_TAIL(addr->rlinks, rlink); + sys_mutex_unlock(router->lock); + + pn_link_open(dx_link_pn(receiver)); + pn_link_open(dx_link_pn(sender)); + pn_link_flow(dx_link_pn(receiver), 1000); + dx_field_iterator_free(aiter); } @@ -473,7 +774,6 @@ static void dx_router_timer_handler(void *context) static dx_node_type_t router_node = {"router", 0, 0, router_rx_handler, - router_tx_handler, router_disp_handler, router_incoming_link_handler, router_outgoing_link_handler, @@ -494,22 +794,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) } dx_router_t *router = NEW(dx_router_t); - dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); + router_node.type_context = router; + + router->dx = dx; + router->router_area = area; + router->router_id = id; + router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); DEQ_INIT(router->in_links); - DEQ_INIT(router->out_links); + DEQ_INIT(router->routers); DEQ_INIT(router->in_fifo); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->out_hash = hash(10, 32, 0); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; - router->dx = dx; - router->lock = sys_mutex(); - router->router_area = area; - router->router_id = id; - - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; // // Inform the field iterator module of this router's id and area. The field iterator @@ -546,47 +847,52 @@ void dx_router_free(dx_router_t *router) } +const char *dx_router_id(const dx_dispatch_t *dx) +{ + dx_router_t *router = dx->router; + return router->router_id; +} + + dx_address_t *dx_router_register_address(dx_dispatch_t *dx, - bool is_local, const char *address, dx_router_message_cb handler, void *context) { - char addr[1000]; - dx_address_t *ad = new_dx_address_t(); + char addr_string[1000]; + dx_router_t *router = dx->router; + dx_address_t *addr; dx_field_iterator_t *iter; - int result; - if (!ad) - return 0; + strcpy(addr_string, "L"); // Local Hash-Key Space + strcat(addr_string, address); + iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); - ad->is_local = is_local; - ad->handler = handler; - ad->handler_context = context; - ad->rlink = 0; + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->out_hash, iter, addr); + } + dx_field_iterator_free(iter); - if (ad->is_local) - strcpy(addr, "L"); // Local Hash-Key Space - else - strcpy(addr, "M"); // Mobile Hash-Key Space + addr->handler = handler; + addr->handler_context = context; - strcat(addr, address); - iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST); - result = hash_insert(dx->router->out_hash, iter, ad); - dx_field_iterator_free(iter); - if (result != 0) { - free_dx_address_t(ad); - return 0; - } + sys_mutex_unlock(router->lock); dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); - return ad; + return addr; } void dx_router_unregister_address(dx_address_t *ad) { - free_dx_address_t(ad); + //free_dx_address_t(ad); } @@ -601,12 +907,43 @@ void dx_router_send(dx_dispatch_t *dx, sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, address, (void*) &addr); if (addr) { - if (addr->rlink) { - pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link); - dx_message_t *copy = dx_message_copy(msg); - DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); - pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); - dx_link_activate(addr->rlink->link); + // + // Forward to all of the local links receiving this address. + // + dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); + while (dest_link) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + + dx_link_activate(dest_link->link); + dest_link = DEQ_NEXT(dest_link); + } + + // + // Forward to the next-hops for remote destinations. + // + dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); + while (dest_node) { + if (dest_node->next_hop) + dest_link = dest_node->next_hop->peer_link; + else + dest_link = dest_node->peer_link; + if (dest_link) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + dx_link_activate(dest_link->link); + } + dest_node = DEQ_NEXT(dest_node); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? @@ -633,6 +970,24 @@ typedef struct { } RouterAdapter; +static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + //dx_router_t *router = adapter->router; + const char *address; + int is_reachable; + int is_neighbor; + + if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + static PyObject* dx_router_add_route(PyObject *self, PyObject *args) { //RouterAdapter *adapter = (RouterAdapter*) self; @@ -666,8 +1021,9 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args) static PyMethodDef RouterAdapter_methods[] = { - {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, - {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, + {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"}, + {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, + {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, {0, 0, 0, 0} }; @@ -747,7 +1103,7 @@ static void dx_router_python_setup(dx_router_t *router) PyObject* pClass; PyObject* pArgs; - pName = PyString_FromString("router"); + pName = PyString_FromString("qpid.dispatch.router"); pModule = PyImport_Import(pName); Py_DECREF(pName); if (!pModule) { @@ -809,14 +1165,16 @@ static void dx_pyrouter_tick(dx_router_t *router) PyObject *pArgs; PyObject *pValue; - pArgs = PyTuple_New(0); - pValue = PyObject_CallObject(router->pyTick, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); + if (router->pyTick) { + pArgs = PyTuple_New(0); + pValue = PyObject_CallObject(router->pyTick, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } } } diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c index 52450b42b6..5420d3b776 100644 --- a/qpid/extras/dispatch/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *dx_server, int id) } -static void thread_process_listeners(pn_driver_t *driver) +static void thread_process_listeners(dx_server_t *dx_server) { + pn_driver_t *driver = dx_server->driver; pn_listener_t *listener = pn_driver_listener(driver); pn_connector_t *cxtr; dx_connection_t *ctx; while (listener) { - dx_log(module, LOG_TRACE, "Accepting Connection"); cxtr = pn_listener_accept(listener); + dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr)); ctx = new_dx_connection_t(); ctx->state = CONN_STATE_OPENING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_cxtr = cxtr; - ctx->pn_conn = 0; ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; ctx->ufd = 0; + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, dx_server->container_name); + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + // // Get a pointer to the transport so we can insert security components into it // @@ -201,20 +207,12 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) // Call the handler that is appropriate for the connector's state. // switch (ctx->state) { - case CONN_STATE_CONNECTING: - if (!pn_connector_closed(cxtr)) { - //ctx->state = CONN_STATE_SASL_CLIENT; - assert(ctx->connector); - ctx->connector->state = CXTR_STATE_OPEN; - events = 1; - } else { + case CONN_STATE_CONNECTING: { + if (pn_connector_closed(cxtr)) { ctx->state = CONN_STATE_FAILED; events = 0; + break; } - break; - - case CONN_STATE_OPENING: - ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); pn_connection_set_container(conn, dx_server->container_name); @@ -222,20 +220,71 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; - dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + pn_transport_t *tport = pn_connector_transport(cxtr); + const dx_server_config_t *config = ctx->connector->config; + + // + // Set up SSL if appropriate + // + if (config->ssl_enabled) { + pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); + pn_ssl_domain_set_credentials(domain, + config->ssl_certificate_file, + config->ssl_private_key_file, + config->ssl_password); + + if (config->ssl_require_peer_authentication) + pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db); + + pn_ssl_t *ssl = pn_ssl(tport); + pn_ssl_init(ssl, domain, 0); + pn_ssl_domain_free(domain); + } - if (ctx->listener) { - ce = DX_CONN_EVENT_LISTENER_OPEN; - } else if (ctx->connector) { - ce = DX_CONN_EVENT_CONNECTOR_OPEN; - ctx->connector->delay = 0; - } else - assert(0); + // + // Set up SASL + // + pn_sasl_t *sasl = pn_sasl(tport); + pn_sasl_mechanisms(sasl, config->sasl_mechanisms); + pn_sasl_client(sasl); - dx_server->conn_handler(dx_server->conn_handler_context, - ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + ctx->state = CONN_STATE_OPENING; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; events = 1; break; + } + + case CONN_STATE_OPENING: { + pn_transport_t *tport = pn_connector_transport(cxtr); + pn_sasl_t *sasl = pn_sasl(tport); + + if (pn_sasl_outcome(sasl) == PN_SASL_OK) { + ctx->state = CONN_STATE_OPERATIONAL; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(dx_server->conn_handler_context, + ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + } + else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) { + ctx->state = CONN_STATE_FAILED; + if (ctx->connector) { + const dx_server_config_t *config = ctx->connector->config; + dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); + } + } + } case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { @@ -323,20 +372,35 @@ static void *thread_run(void *arg) // // Service pending timers. // - dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); - if (timer) { - DEQ_REMOVE_HEAD(dx_server->pending_timers); - - // - // Mark the timer as idle in case it reschedules itself. - // - dx_timer_idle_LH(timer); + if (DEQ_SIZE(dx_server->pending_timers) > 0) { + dx_timer_list_t local_list; + dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); + + DEQ_INIT(local_list); + while (timer) { + DEQ_REMOVE_HEAD(dx_server->pending_timers); + DEQ_INSERT_TAIL(local_list, timer); + timer = DEQ_HEAD(dx_server->pending_timers); + } // - // Release the lock and invoke the connection handler. + // Release the lock and invoke the connection handlers. // sys_mutex_unlock(dx_server->lock); - timer->handler(timer->context); + + timer = DEQ_HEAD(local_list); + while (timer) { + DEQ_REMOVE_HEAD(local_list); + + // + // Mark the timer as idle in case it reschedules itself. + // + dx_timer_idle_LH(timer); + + timer->handler(timer->context); + timer = DEQ_HEAD(local_list); + } + pn_driver_wakeup(dx_server->driver); continue; } @@ -411,7 +475,7 @@ static void *thread_run(void *arg) // // Process listeners (incoming connections). // - thread_process_listeners(dx_server->driver); + thread_process_listeners(dx_server); // // Traverse the list of connectors-needing-service from the proton driver. diff --git a/qpid/extras/dispatch/src/server_private.h b/qpid/extras/dispatch/src/server_private.h index 49ab4724d2..5782492f22 100644 --- a/qpid/extras/dispatch/src/server_private.h +++ b/qpid/extras/dispatch/src/server_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/timer.c b/qpid/extras/dispatch/src/timer.c index 0d5b301a6e..a7f36e149b 100644 --- a/qpid/extras/dispatch/src/timer.c +++ b/qpid/extras/dispatch/src/timer.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h index 905a8f5bd1..4e173b7ad4 100644 --- a/qpid/extras/dispatch/src/timer_private.h +++ b/qpid/extras/dispatch/src/timer_private.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/work_queue.c b/qpid/extras/dispatch/src/work_queue.c index 4b3c5d7fa5..93c1cda1a9 100644 --- a/qpid/extras/dispatch/src/work_queue.c +++ b/qpid/extras/dispatch/src/work_queue.c @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/qpid/extras/dispatch/src/work_queue.h b/qpid/extras/dispatch/src/work_queue.h index 597a484a9c..3ad5c21c81 100644 --- a/qpid/extras/dispatch/src/work_queue.h +++ b/qpid/extras/dispatch/src/work_queue.h @@ -8,9 +8,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
