summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
commitc70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch)
tree68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/extras/dispatch/src
parentfcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff)
downloadqpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/agent.c6
-rw-r--r--qpid/extras/dispatch/src/alloc.c49
-rw-r--r--qpid/extras/dispatch/src/alloc_private.h4
-rw-r--r--qpid/extras/dispatch/src/buffer.c4
-rw-r--r--qpid/extras/dispatch/src/compose.c28
-rw-r--r--qpid/extras/dispatch/src/compose_private.h25
-rw-r--r--qpid/extras/dispatch/src/config.c6
-rw-r--r--qpid/extras/dispatch/src/config_private.h4
-rw-r--r--qpid/extras/dispatch/src/container.c202
-rw-r--r--qpid/extras/dispatch/src/dispatch.c4
-rw-r--r--qpid/extras/dispatch/src/dispatch_private.h4
-rw-r--r--qpid/extras/dispatch/src/hash.c6
-rw-r--r--qpid/extras/dispatch/src/iovec.c4
-rw-r--r--qpid/extras/dispatch/src/iterator.c6
-rw-r--r--qpid/extras/dispatch/src/log.c4
-rw-r--r--qpid/extras/dispatch/src/log_private.h4
-rw-r--r--qpid/extras/dispatch/src/message.c241
-rw-r--r--qpid/extras/dispatch/src/message_private.h24
-rw-r--r--qpid/extras/dispatch/src/parse.c29
-rw-r--r--qpid/extras/dispatch/src/posix/threading.c7
-rw-r--r--qpid/extras/dispatch/src/py/config/__init__.py20
-rw-r--r--qpid/extras/dispatch/src/py/config/parser.py337
-rw-r--r--qpid/extras/dispatch/src/py/config/schema.py82
-rw-r--r--qpid/extras/dispatch/src/py/router/__init__.py20
-rw-r--r--qpid/extras/dispatch/src/py/router/adapter.py99
-rw-r--r--qpid/extras/dispatch/src/py/router/binding.py136
-rw-r--r--qpid/extras/dispatch/src/py/router/configuration.py47
-rw-r--r--qpid/extras/dispatch/src/py/router/data.py275
-rw-r--r--qpid/extras/dispatch/src/py/router/link.py140
-rw-r--r--qpid/extras/dispatch/src/py/router/mobile.py188
-rw-r--r--qpid/extras/dispatch/src/py/router/neighbor.py83
-rw-r--r--qpid/extras/dispatch/src/py/router/path.py202
-rw-r--r--qpid/extras/dispatch/src/py/router/router_engine.py255
-rw-r--r--qpid/extras/dispatch/src/py/router/routing.py56
-rw-r--r--qpid/extras/dispatch/src/py/stubs/__init__.py22
-rw-r--r--qpid/extras/dispatch/src/py/stubs/ioadapter.py27
-rw-r--r--qpid/extras/dispatch/src/py/stubs/logadapter.py33
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c95
-rw-r--r--qpid/extras/dispatch/src/router_node.c954
-rw-r--r--qpid/extras/dispatch/src/server.c138
-rw-r--r--qpid/extras/dispatch/src/server_private.h4
-rw-r--r--qpid/extras/dispatch/src/timer.c4
-rw-r--r--qpid/extras/dispatch/src/timer_private.h4
-rw-r--r--qpid/extras/dispatch/src/work_queue.c4
-rw-r--r--qpid/extras/dispatch/src/work_queue.h4
45 files changed, 1295 insertions, 2595 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 88b2cf2bcd..557410910c 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent);
- agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
+ agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
return agent;
}
diff --git a/qpid/extras/dispatch/src/alloc.c b/qpid/extras/dispatch/src/alloc.c
index 87b490fabe..bf10f03633 100644
--- a/qpid/extras/dispatch/src/alloc.c
+++ b/qpid/extras/dispatch/src/alloc.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,24 +24,34 @@
#include <memory.h>
#include <stdio.h>
-typedef struct item_t item_t;
+typedef struct dx_alloc_type_t dx_alloc_type_t;
+typedef struct dx_alloc_item_t dx_alloc_item_t;
-struct item_t {
- DEQ_LINKS(item_t);
+struct dx_alloc_type_t {
+ DEQ_LINKS(dx_alloc_type_t);
dx_alloc_type_desc_t *desc;
};
-DEQ_DECLARE(item_t, item_list_t);
+DEQ_DECLARE(dx_alloc_type_t, dx_alloc_type_list_t);
+
+
+struct dx_alloc_item_t {
+ DEQ_LINKS(dx_alloc_item_t);
+};
+
+DEQ_DECLARE(dx_alloc_item_t, dx_alloc_item_list_t);
+
struct dx_alloc_pool_t {
- item_list_t free_list;
+ dx_alloc_item_list_t free_list;
};
dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0};
dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0};
+#define BIG_THRESHOLD 256
-static sys_mutex_t *init_lock;
-static item_list_t type_list;
+static sys_mutex_t *init_lock;
+static dx_alloc_type_list_t type_list;
static void dx_alloc_init(dx_alloc_type_desc_t *desc)
{
@@ -56,7 +66,7 @@ static void dx_alloc_init(dx_alloc_type_desc_t *desc)
if (!desc->global_pool) {
if (desc->config == 0)
- desc->config = desc->total_size > 256 ?
+ desc->config = desc->total_size > BIG_THRESHOLD ?
&dx_alloc_default_config_big : &dx_alloc_default_config_small;
assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
@@ -66,12 +76,12 @@ static void dx_alloc_init(dx_alloc_type_desc_t *desc)
desc->lock = sys_mutex();
desc->stats = NEW(dx_alloc_stats_t);
memset(desc->stats, 0, sizeof(dx_alloc_stats_t));
- }
- item_t *type_item = NEW(item_t);
- DEQ_ITEM_INIT(type_item);
- type_item->desc = desc;
- DEQ_INSERT_TAIL(type_list, type_item);
+ dx_alloc_type_t *type_item = NEW(dx_alloc_type_t);
+ DEQ_ITEM_INIT(type_item);
+ type_item->desc = desc;
+ DEQ_INSERT_TAIL(type_list, type_item);
+ }
sys_mutex_unlock(init_lock);
}
@@ -103,7 +113,7 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool)
// list and return it. Since everything we've touched is thread-local,
// there is no need to acquire a lock.
//
- item_t *item = DEQ_HEAD(pool->free_list);
+ dx_alloc_item_t *item = DEQ_HEAD(pool->free_list);
if (item) {
DEQ_REMOVE_HEAD(pool->free_list);
return &item[1];
@@ -130,11 +140,10 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool)
// Allocate a full batch from the heap and put it on the thread list.
//
for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
- item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
+ item = (dx_alloc_item_t*) malloc(sizeof(dx_alloc_item_t) + desc->total_size);
if (item == 0)
break;
DEQ_ITEM_INIT(item);
- item->desc = desc;
DEQ_INSERT_TAIL(pool->free_list, item);
desc->stats->held_by_threads++;
desc->stats->total_alloc_from_heap++;
@@ -154,7 +163,7 @@ void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool)
void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p)
{
- item_t *item = ((item_t*) p) - 1;
+ dx_alloc_item_t *item = ((dx_alloc_item_t*) p) - 1;
int idx;
//
@@ -217,7 +226,7 @@ static void alloc_schema_handler(void *context, void *correlator)
static void alloc_query_handler(void* context, const char *id, void *cor)
{
- item_t *item = DEQ_HEAD(type_list);
+ dx_alloc_type_t *item = DEQ_HEAD(type_list);
while (item) {
dx_agent_value_string(cor, "name", item->desc->type_name);
diff --git a/qpid/extras/dispatch/src/alloc_private.h b/qpid/extras/dispatch/src/alloc_private.h
index c812c6a661..da773fef25 100644
--- a/qpid/extras/dispatch/src/alloc_private.h
+++ b/qpid/extras/dispatch/src/alloc_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/buffer.c b/qpid/extras/dispatch/src/buffer.c
index 015711afd9..a5e609b996 100644
--- a/qpid/extras/dispatch/src/buffer.c
+++ b/qpid/extras/dispatch/src/buffer.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c
index 0f298c8217..6eb2ab1c73 100644
--- a/qpid/extras/dispatch/src/compose.c
+++ b/qpid/extras/dispatch/src/compose.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,30 +21,10 @@
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/amqp.h>
-#include "message_private.h"
#include "compose_private.h"
#include <memory.h>
-typedef struct dx_composite_t {
- DEQ_LINKS(struct dx_composite_t);
- int isMap;
- uint32_t count;
- uint32_t length;
- dx_field_location_t length_location;
- dx_field_location_t count_location;
-} dx_composite_t;
-
-ALLOC_DECLARE(dx_composite_t);
ALLOC_DEFINE(dx_composite_t);
-DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
-
-
-struct dx_composed_field_t {
- dx_buffer_list_t buffers;
- dx_field_stack_t fieldStack;
-};
-
-ALLOC_DECLARE(dx_composed_field_t);
ALLOC_DEFINE(dx_composed_field_t);
@@ -197,7 +177,7 @@ static void dx_compose_end_composite(dx_composed_field_t *field)
//
dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack);
if (enclosing) {
- enclosing->length += 4 + comp->length;
+ enclosing->length += (comp->length - 4); // the length and count were already accounted for
enclosing->count++;
}
@@ -233,12 +213,14 @@ void dx_compose_free(dx_composed_field_t *field)
while (buf) {
DEQ_REMOVE_HEAD(field->buffers);
dx_free_buffer(buf);
+ buf = DEQ_HEAD(field->buffers);
}
dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
while (comp) {
DEQ_REMOVE_HEAD(field->fieldStack);
free_dx_composite_t(comp);
+ comp = DEQ_HEAD(field->fieldStack);
}
free_dx_composed_field_t(field);
diff --git a/qpid/extras/dispatch/src/compose_private.h b/qpid/extras/dispatch/src/compose_private.h
index 58eec6097b..6e72185f3c 100644
--- a/qpid/extras/dispatch/src/compose_private.h
+++ b/qpid/extras/dispatch/src/compose_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,28 @@
*/
#include <qpid/dispatch/compose.h>
+#include "message_private.h"
dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field);
+typedef struct dx_composite_t {
+ DEQ_LINKS(struct dx_composite_t);
+ int isMap;
+ uint32_t count;
+ uint32_t length;
+ dx_field_location_t length_location;
+ dx_field_location_t count_location;
+} dx_composite_t;
+
+ALLOC_DECLARE(dx_composite_t);
+DEQ_DECLARE(dx_composite_t, dx_field_stack_t);
+
+
+struct dx_composed_field_t {
+ dx_buffer_list_t buffers;
+ dx_field_stack_t fieldStack;
+};
+
+ALLOC_DECLARE(dx_composed_field_t);
+
#endif
diff --git a/qpid/extras/dispatch/src/config.c b/qpid/extras/dispatch/src/config.c
index d92a1e71dc..9390115251 100644
--- a/qpid/extras/dispatch/src/config.c
+++ b/qpid/extras/dispatch/src/config.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/log.h>
-#define PYTHON_MODULE "config"
+#define PYTHON_MODULE "qpid.dispatch.config"
static const char *log_module = "CONFIG";
diff --git a/qpid/extras/dispatch/src/config_private.h b/qpid/extras/dispatch/src/config_private.h
index 4919dfc4ee..f4ecc64e4c 100644
--- a/qpid/extras/dispatch/src/config_private.h
+++ b/qpid/extras/dispatch/src/config_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index b6cc92440d..cc46f3ce77 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t);
ALLOC_DEFINE(dx_node_t);
ALLOC_DEFINE(dx_link_item_t);
+
struct dx_link_t {
pn_link_t *pn_link;
void *context;
@@ -56,6 +57,19 @@ struct dx_link_t {
ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);
+
+struct dx_delivery_t {
+ pn_delivery_t *pn_delivery;
+ dx_delivery_t *peer;
+ void *context;
+ uint64_t disposition;
+ dx_link_t *link;
+};
+
+ALLOC_DECLARE(dx_delivery_t);
+ALLOC_DEFINE(dx_delivery_t);
+
+
typedef struct dxc_node_type_t {
DEQ_LINKS(struct dxc_node_type_t);
const dx_node_type_t *ntype;
@@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_link)
}
-static void process_receive(pn_delivery_t *delivery)
+static void do_receive(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
if (link) {
dx_node_t *node = link->node;
if (node) {
+ if (!delivery) {
+ delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+ }
+
node->ntype->rx_handler(node->context, link, delivery);
return;
}
@@ -198,34 +223,18 @@ static void process_receive(pn_delivery_t *delivery)
//
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
-}
-
-
-static void do_send(pn_delivery_t *delivery)
-{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
-
- if (link) {
- dx_node_t *node = link->node;
- if (node) {
- node->ntype->tx_handler(node->context, link, delivery);
- return;
- }
- }
-
- // TODO - Cancel the delivery
+ pn_delivery_update(pnd, PN_REJECTED);
+ pn_delivery_settle(pnd);
}
-static void do_updated(pn_delivery_t *delivery)
+static void do_updated(pn_delivery_t *pnd)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *pn_link = pn_delivery_link(pnd);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
- if (link) {
+ if (link && delivery) {
dx_node_t *node = link->node;
if (node)
node->ntype->disp_handler(node->context, link, delivery);
@@ -239,15 +248,15 @@ static int close_handler(void* unused, pn_connection_t *conn)
// Close all links, passing False as the 'closed' argument. These links are not
// being properly 'detached'. They are being orphaned.
//
- pn_link_t *pn_link = pn_link_head(conn, 0);
+ pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (pn_link) {
dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
dx_node_t *node = link->node;
- if (node)
+ if (node && link)
node->ntype->link_detach_handler(node->context, link, 0);
pn_link_close(pn_link);
free_dx_link_t(link);
- pn_link = pn_link_next(pn_link, 0);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
}
// teardown all sessions
@@ -305,9 +314,7 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
delivery = pn_work_head(conn);
while (delivery) {
if (pn_delivery_readable(delivery))
- process_receive(delivery);
- else if (pn_delivery_writable(delivery))
- do_send(delivery);
+ do_receive(delivery);
if (pn_delivery_updated(delivery)) {
do_updated(delivery);
@@ -318,15 +325,15 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
}
//
- // Step 2.5: Traverse all of the links on the connection looking for
- // outgoing links with non-zero credit. Call the attached node's
- // writable handler for such links.
+ // Step 2.5: Call the attached node's writable handler for all active links
+ // on the connection. Note that in Dispatch, links are considered
+ // bidirectional. Incoming and outgoing only pertains to deliveries and
+ // deliveries are a subset of the traffic that flows both directions on links.
//
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (pn_link) {
assert(pn_session_connection(pn_link_session(pn_link)) == conn);
- if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
- event_count += do_writable(pn_link);
+ event_count += do_writable(pn_link);
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
@@ -513,10 +520,10 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt)
}
-void dx_container_set_default_node_type(dx_dispatch_t *dx,
- const dx_node_type_t *nt,
- void *context,
- dx_dist_mode_t supported_dist)
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx,
+ const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
{
dx_container_t *container = dx->container;
@@ -530,6 +537,8 @@ void dx_container_set_default_node_type(dx_dispatch_t *dx,
container->default_node = 0;
dx_log(module, LOG_TRACE, "Default node removed");
}
+
+ return container->default_node;
}
@@ -699,3 +708,110 @@ void dx_link_close(dx_link_t *link)
}
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
+{
+ pn_link_t *pnl = dx_link_pn(link);
+
+ //
+ // If there is a current delivery on this outgoing link, something
+ // is wrong with the delivey algorithm. We assume that the current
+ // delivery ('pnd' below) is the one created by pn_delivery. If it is
+ // not, then my understanding of how proton works is incorrect.
+ //
+ assert(!pn_link_current(pnl));
+
+ pn_delivery(pnl, tag);
+ pn_delivery_t *pnd = pn_link_current(pnl);
+
+ if (!pnd)
+ return 0;
+
+ dx_delivery_t *delivery = new_dx_delivery_t();
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ pn_delivery_set_context(pnd, delivery);
+
+ return delivery;
+}
+
+
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition)
+{
+ if (delivery->pn_delivery) {
+ if (final_disposition > 0)
+ pn_delivery_update(delivery->pn_delivery, final_disposition);
+ pn_delivery_set_context(delivery->pn_delivery, 0);
+ pn_delivery_settle(delivery->pn_delivery);
+ }
+ if (delivery->peer)
+ delivery->peer->peer = 0;
+ free_dx_delivery_t(delivery);
+}
+
+
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer)
+{
+ delivery->peer = peer;
+}
+
+
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context)
+{
+ delivery->context = context;
+}
+
+
+void *dx_delivery_context(dx_delivery_t *delivery)
+{
+ return delivery->context;
+}
+
+
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery)
+{
+ return delivery->peer;
+}
+
+
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery)
+{
+ return delivery->pn_delivery;
+}
+
+
+void dx_delivery_settle(dx_delivery_t *delivery)
+{
+ if (delivery->pn_delivery) {
+ pn_delivery_settle(delivery->pn_delivery);
+ delivery->pn_delivery = 0;
+ }
+}
+
+
+bool dx_delivery_settled(dx_delivery_t *delivery)
+{
+ return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool dx_delivery_disp_changed(dx_delivery_t *delivery)
+{
+ return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t dx_delivery_disp(dx_delivery_t *delivery)
+{
+ delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+ return delivery->disposition;
+}
+
+
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery)
+{
+ return delivery->link;
+}
+
diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c
index 86398bb946..a1a659fd74 100644
--- a/qpid/extras/dispatch/src/dispatch.c
+++ b/qpid/extras/dispatch/src/dispatch.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/dispatch_private.h b/qpid/extras/dispatch/src/dispatch_private.h
index 71b9f85c0e..d96eb59afe 100644
--- a/qpid/extras/dispatch/src/dispatch_private.h
+++ b/qpid/extras/dispatch/src/dispatch_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index 19744366aa..1f7d8aa3f5 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ DEQ_DECLARE(hash_item_t, items_t);
typedef struct bucket_t {
- items_t items;
+ items_t items;
} bucket_t;
diff --git a/qpid/extras/dispatch/src/iovec.c b/qpid/extras/dispatch/src/iovec.c
index 6ff6874440..a4ce937333 100644
--- a/qpid/extras/dispatch/src/iovec.c
+++ b/qpid/extras/dispatch/src/iovec.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c
index eb302fe9ad..0239a083ca 100644
--- a/qpid/extras/dispatch/src/iterator.c
+++ b/qpid/extras/dispatch/src/iterator.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -367,7 +367,7 @@ dx_field_iterator_t *dx_field_iterator_sub(dx_field_iterator_t *iter, uint32_t l
void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length)
{
// TODO - Make this more efficient.
- for (uint8_t idx = 0; idx < length; idx++)
+ for (uint8_t idx = 0; idx < length && !dx_field_iterator_end(iter); idx++)
dx_field_iterator_octet(iter);
}
diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c
index 555d9ed7f3..281603255d 100644
--- a/qpid/extras/dispatch/src/log.c
+++ b/qpid/extras/dispatch/src/log.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/log_private.h b/qpid/extras/dispatch/src/log_private.h
index 731822ec6f..0b36bbc9a9 100644
--- a/qpid/extras/dispatch/src/log_private.h
+++ b/qpid/extras/dispatch/src/log_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c
index 6d75de76db..772c0d4e16 100644
--- a/qpid/extras/dispatch/src/message.c
+++ b/qpid/extras/dispatch/src/message.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,11 +25,35 @@
#include <string.h>
#include <stdio.h>
+static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
+static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
+static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
+static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
+static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
+static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
+static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
+static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
+static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
+static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
+static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
+static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
+static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
+static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
+static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
+static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
+static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
+static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
+static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
+static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
+static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
+static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
+
ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
ALLOC_DEFINE(dx_message_content_t);
+typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
-static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
dx_buffer_t *local_buffer = *buffer;
@@ -37,9 +61,13 @@ static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
while (consume > 0) {
if (consume < remaining) {
+ if (handler)
+ handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
+ if (handler)
+ handler(context, local_cursor, remaining);
consume -= remaining;
local_buffer = local_buffer->next;
if (local_buffer == 0){
@@ -59,7 +87,7 @@ static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
{
unsigned char result = **cursor;
- advance(cursor, buffer, 1);
+ advance(cursor, buffer, 1, 0, 0);
return result;
}
@@ -68,7 +96,10 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field
{
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
- int consume = 0;
+
+ int consume = 0;
+ size_t hdr_length = 1;
+
switch (tag & 0xF0) {
case 0x40 : consume = 0; break;
case 0x50 : consume = 1; break;
@@ -80,6 +111,7 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field
case 0xB0 :
case 0xD0 :
case 0xF0 :
+ hdr_length += 3;
consume |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 16;
@@ -91,19 +123,21 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field
case 0xA0 :
case 0xC0 :
case 0xE0 :
+ hdr_length++;
consume |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
if (field && !field->parsed) {
- field->buffer = *buffer;
- field->offset = *cursor - dx_buffer_base(*buffer);
- field->length = consume;
- field->parsed = 1;
+ field->buffer = *buffer;
+ field->offset = *cursor - dx_buffer_base(*buffer);
+ field->length = consume;
+ field->hdr_length = hdr_length;
+ field->parsed = 1;
}
- advance(cursor, buffer, consume);
+ advance(cursor, buffer, consume, 0, 0);
return 1;
}
@@ -210,10 +244,11 @@ static int dx_check_and_advance(dx_buffer_t **buffer,
//
// Pattern matched and tag is expected. Mark the beginning of the section.
//
- location->parsed = 1;
- location->buffer = test_buffer;
- location->offset = test_cursor - dx_buffer_base(test_buffer);
- location->length = 0;
+ location->parsed = 1;
+ location->buffer = test_buffer;
+ location->offset = test_cursor - dx_buffer_base(test_buffer);
+ location->length = 0;
+ location->hdr_length = pattern_length;
//
// Advance the pointers to consume the whole section.
@@ -249,7 +284,7 @@ static int dx_check_and_advance(dx_buffer_t **buffer,
location->length = pre_consume + consume;
if (consume)
- advance(&test_cursor, &test_buffer, consume);
+ advance(&test_cursor, &test_buffer, consume, 0, 0);
*cursor = test_cursor;
*buffer = test_buffer;
@@ -318,6 +353,11 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess
}
break;
+ case DX_FIELD_DELIVERY_ANNOTATION:
+ if (content->section_delivery_annotation.parsed)
+ return &content->section_delivery_annotation;
+ break;
+
case DX_FIELD_APPLICATION_PROPERTIES:
if (content->section_application_properties.parsed)
return &content->section_application_properties;
@@ -343,8 +383,7 @@ dx_message_t *dx_allocate_message()
return 0;
DEQ_ITEM_INIT(msg);
- msg->content = new_dx_message_content_t();
- msg->out_delivery = 0;
+ msg->content = new_dx_message_content_t();
if (msg->content == 0) {
free_dx_message_t((dx_message_t*) msg);
@@ -355,6 +394,7 @@ dx_message_t *dx_allocate_message()
msg->content->lock = sys_mutex();
msg->content->ref_count = 1;
msg->content->parse_depth = DX_DEPTH_NONE;
+ msg->content->parsed_delivery_annotations = 0;
return (dx_message_t*) msg;
}
@@ -371,14 +411,23 @@ void dx_free_message(dx_message_t *in_msg)
sys_mutex_unlock(content->lock);
if (rc == 0) {
- dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ if (content->parsed_delivery_annotations)
+ dx_parse_free(content->parsed_delivery_annotations);
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
dx_free_buffer(buf);
buf = DEQ_HEAD(content->buffers);
}
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+ dx_free_buffer(buf);
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ }
+
sys_mutex_free(content->lock);
free_dx_message_content_t(content);
}
@@ -397,8 +446,7 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg)
return 0;
DEQ_ITEM_INIT(copy);
- copy->content = content;
- copy->out_delivery = 0;
+ copy->content = content;
sys_mutex_lock(content->lock);
content->ref_count++;
@@ -408,55 +456,58 @@ dx_message_t *dx_message_copy(dx_message_t *in_msg)
}
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg)
{
- ((dx_message_pvt_t*) msg)->out_delivery = delivery;
-}
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ if (content->parsed_delivery_annotations)
+ return content->parsed_delivery_annotations;
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
-{
- return ((dx_message_pvt_t*) msg)->out_delivery;
-}
+ dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION);
+ if (da == 0)
+ return 0;
+ content->parsed_delivery_annotations = dx_parse(da);
+ if (content->parsed_delivery_annotations == 0 ||
+ !dx_parse_ok(content->parsed_delivery_annotations) ||
+ !dx_parse_is_map(content->parsed_delivery_annotations)) {
+ dx_field_iterator_free(da);
+ dx_parse_free(content->parsed_delivery_annotations);
+ return 0;
+ }
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- content->in_delivery = delivery;
+ return content->parsed_delivery_annotations;
}
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da)
{
- dx_message_content_t *content = MSG_CONTENT(msg);
- return content->in_delivery;
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_buffer_list_t *field_buffers = dx_compose_buffers(da);
+
+ assert(DEQ_SIZE(content->new_delivery_annotations) == 0);
+ content->new_delivery_annotations = *field_buffers;
+ DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}
-dx_message_t *dx_message_receive(pn_delivery_t *delivery)
+dx_message_t *dx_message_receive(dx_delivery_t *delivery)
{
- pn_link_t *link = pn_delivery_link(delivery);
- dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+ pn_delivery_t *pnd = dx_delivery_pn(delivery);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) dx_delivery_context(delivery);
+ pn_link_t *link = pn_delivery_link(pnd);
ssize_t rc;
dx_buffer_t *buf;
//
// If there is no message associated with the delivery, this is the first time
- // we've received anything on this delivery. Allocate a message descriptor and
+ // we've received anything on this delivery. Allocate a message descriptor and
// link it and the delivery together.
//
if (!msg) {
msg = (dx_message_pvt_t*) dx_allocate_message();
- pn_delivery_set_context(delivery, (void*) msg);
-
- //
- // Record the incoming delivery only if it is not settled. If it is
- // settled, it should not be recorded as no future operations on it are
- // permitted.
- //
- if (!pn_delivery_settled(delivery))
- msg->content->in_delivery = delivery;
+ dx_delivery_set_context(delivery, (void*) msg);
}
//
@@ -489,6 +540,7 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery)
DEQ_REMOVE_TAIL(msg->content->buffers);
dx_free_buffer(buf);
}
+ dx_delivery_set_context(delivery, 0);
return (dx_message_t*) msg;
}
@@ -520,14 +572,76 @@ dx_message_t *dx_message_receive(pn_delivery_t *delivery)
}
-void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+static void send_handler(void *context, const unsigned char *start, int length)
{
- dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
- dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+ pn_link_t *pnl = (pn_link_t*) context;
+ pn_link_send(pnl, (const char*) start, length);
+}
+
+
+void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
+ pn_link_t *pnl = dx_link_pn(link);
+
+ if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
+ //
+ // This is the case where the delivery annotations have been modified.
+ // The message send must be divided into sections: The existing header;
+ // the new delivery annotations; the rest of the existing message.
+ // Note that the original delivery annotations that are still in the
+ // buffer chain must not be sent.
+ //
+ // Start by making sure that we've parsed the message sections through
+ // the delivery annotations
+ //
+ if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS))
+ return;
+
+ //
+ // Send header if present
+ //
+ cursor = dx_buffer_base(buf);
+ if (content->section_message_header.length > 0) {
+ pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3);
+ buf = content->section_message_header.buffer;
+ cursor = content->section_message_header.offset + dx_buffer_base(buf);
+ advance(&cursor, &buf, content->section_message_header.length, send_handler, (void*) pnl);
+ }
+
+ //
+ // Send new delivery annotations
+ //
+ dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (da_buf) {
+ pn_link_send(pnl, (char*) dx_buffer_base(da_buf), dx_buffer_size(da_buf));
+ da_buf = DEQ_NEXT(da_buf);
+ }
+
+ //
+ // Skip over replaced delivery annotations
+ //
+ if (content->section_delivery_annotation.length > 0)
+ advance(&cursor, &buf,
+ content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length,
+ 0, 0);
+
+ //
+ // Send remaining partial buffer
+ //
+ if (buf) {
+ size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf));
+ advance(&cursor, &buf, len, send_handler, (void*) pnl);
+ }
+
+ // Fall through to process the remaining buffers normally
+ }
- // TODO - Handle cases where annotations have been added or modified
while (buf) {
- pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
@@ -557,29 +671,6 @@ static int dx_check_field_LH(dx_message_content_t *content,
static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t depth)
{
- static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
- static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
- static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
- static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
- static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
- static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
- static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
- static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
- static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
- static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
- static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
- static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
- static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
- static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
- static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
- static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
- static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
- static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
- static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
- static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
- static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
- static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
-
dx_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer)
diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h
index ca39a84660..27b81bbb4c 100644
--- a/qpid/extras/dispatch/src/message_private.h
+++ b/qpid/extras/dispatch/src/message_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,10 +45,11 @@
*/
typedef struct {
- dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
- size_t offset; // Offset in the buffer to the first octet
- size_t length; // Length of the field or zero if unneeded
- int parsed; // non-zero iff the buffer chain has been parsed to find this field
+ dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ size_t hdr_length; // Length of the field's header (not included in the length of the field)
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
} dx_field_location_t;
@@ -58,12 +59,15 @@ typedef struct {
// 1) Received message is held and forwarded unmodified - single buffer list
// 2) Received message is held and modified before forwarding - two buffer lists
// 3) Message is composed internally - single buffer list
+// TODO - provide a way to allocate a message without a lock for the link-routing case.
+// It's likely that link-routing will cause no contention for the message content.
+//
typedef struct {
sys_mutex_t *lock;
- uint32_t ref_count; // The number of qmessages referencing this
+ uint32_t ref_count; // The number of messages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
@@ -78,12 +82,12 @@ typedef struct {
dx_buffer_t *parse_buffer;
unsigned char *parse_cursor;
dx_message_depth_t parse_depth;
+ dx_parsed_field_t *parsed_delivery_annotations;
} dx_message_content_t;
typedef struct {
- DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t
+ DEQ_LINKS(dx_message_t); // Deque linkage that overlays the dx_message_t
dx_message_content_t *content;
- pn_delivery_t *out_delivery;
} dx_message_pvt_t;
ALLOC_DECLARE(dx_message_t);
diff --git a/qpid/extras/dispatch/src/parse.c b/qpid/extras/dispatch/src/parse.c
index 3b47ad216a..c5ecc4e498 100644
--- a/qpid/extras/dispatch/src/parse.c
+++ b/qpid/extras/dispatch/src/parse.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t);
ALLOC_DEFINE(dx_parsed_field_t);
-static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
{
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Tag";
- *tag = dx_field_iterator_octet(iter);
- *count = 0;
- *length = 0;
+ *tag = dx_field_iterator_octet(iter);
+ *count = 0;
+ *length = 0;
+ *clen = 0;
switch (*tag & 0xF0) {
case 0x40: *length = 0; break;
@@ -59,7 +60,7 @@ static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *le
*length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
*length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
// fall through to the next case
-
+
case 0xA0:
case 0xC0:
case 0xE0:
@@ -78,19 +79,24 @@ static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *le
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+ *clen = 3;
// fall through to the next case
-
+
case 0xC0:
case 0xE0:
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Count";
*count += (unsigned int) dx_field_iterator_octet(iter);
+ *clen += 1;
break;
}
if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1))
return "Odd Number of Elements in a Map";
+ if (*clen > *length)
+ return "Insufficient Length to Determine Count";
+
return 0;
}
@@ -108,13 +114,13 @@ static dx_parsed_field_t *dx_parse_internal(dx_field_iterator_t *iter, dx_parsed
uint32_t length;
uint32_t count;
+ uint32_t length_of_count;
- field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+ field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
if (!field->parse_error) {
field->raw_iter = dx_field_iterator_sub(iter, length);
- if (count == 0 && length > 0)
- dx_field_iterator_advance(iter, length);
+ dx_field_iterator_advance(iter, length - length_of_count);
for (uint32_t idx = 0; idx < count; idx++) {
dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
DEQ_INSERT_TAIL(field->children, child);
@@ -377,4 +383,3 @@ dx_parsed_field_t *dx_parse_value_by_key(dx_parsed_field_t *field, const char *k
return 0;
}
-
diff --git a/qpid/extras/dispatch/src/posix/threading.c b/qpid/extras/dispatch/src/posix/threading.c
index 8edce86cdc..1d7760ca88 100644
--- a/qpid/extras/dispatch/src/posix/threading.c
+++ b/qpid/extras/dispatch/src/posix/threading.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -103,7 +103,7 @@ void sys_cond_signal_all(sys_cond_t *cond)
struct sys_thread_t {
pthread_t thread;
-};
+};
sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
{
@@ -123,4 +123,3 @@ void sys_thread_join(sys_thread_t *thread)
{
pthread_join(thread->thread, 0);
}
-
diff --git a/qpid/extras/dispatch/src/py/config/__init__.py b/qpid/extras/dispatch/src/py/config/__init__.py
deleted file mode 100644
index ebaeea5203..0000000000
--- a/qpid/extras/dispatch/src/py/config/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from config.parser import DXConfig
diff --git a/qpid/extras/dispatch/src/py/config/parser.py b/qpid/extras/dispatch/src/py/config/parser.py
deleted file mode 100644
index 7619b91461..0000000000
--- a/qpid/extras/dispatch/src/py/config/parser.py
+++ /dev/null
@@ -1,337 +0,0 @@
-##
-## Licensed to the Apache Software Foundation (ASF) under one
-## or more contributor license agreements. See the NOTICE file
-## distributed with this work for additional information
-## regarding copyright ownership. The ASF licenses this file
-## to you under the Apache License, Version 2.0 (the
-## "License"); you may not use this file except in compliance
-## with the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing,
-## software distributed under the License is distributed on an
-## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-## KIND, either express or implied. See the License for the
-## specific language governing permissions and limitations
-## under the License
-##
-
-import json
-from schema import config_schema
-from dispatch import LogAdapter, LOG_TRACE, LOG_ERROR, LOG_INFO
-
-class Section:
- """
- """
-
- def __init__(self, name, kv_pairs, schema_section):
- self.name = name
- self.schema_section = schema_section
- self.values = schema_section.check_and_default(kv_pairs)
- self.index = schema_section.index_of(kv_pairs)
-
- def __repr__(self):
- return "%r" % self.values
-
-
-class ConfigMain:
- """
- """
-
- def __init__(self, schema):
- self.sections_by_name = {}
- self.sections_by_index = {}
- self.schema = schema
-
-
- def update(self, raw_config):
- for sec_map in raw_config:
- name = sec_map.keys()[0]
- kv = sec_map[name]
- schema_section = self.schema.sections[name]
- sec = Section(name, kv, schema_section)
- if name not in self.sections_by_name:
- self.sections_by_name[name] = []
- self.sections_by_name[name].append(sec)
- self.sections_by_index[sec.index] = sec
- self._expand_references()
-
-
- def item_count(self, name):
- if name in self.sections_by_name:
- return len(self.sections_by_name[name])
- return 0
-
-
- def get_value(self, name, idx, key):
- if name in self.sections_by_name:
- sec = self.sections_by_name[name]
- if idx <= len(sec):
- if key in sec[idx].values:
- return sec[idx].values[key]
- return None
-
-
- def _expand_references(self):
- for name, sec_list in self.sections_by_name.items():
- for sec in sec_list:
- for k,v in sec.values.items():
- if sec.schema_section.is_expandable(k):
- ref_name = "%s:%s" % (k, v)
- if ref_name in self.sections_by_index:
- ref_section = self.sections_by_index[ref_name]
- for ek,ev in ref_section.values.items():
- if ref_section.schema_section.expand_copy(ek):
- sec.values[ek] = ev
-
-
-
-
-SECTION_SINGLETON = 0
-SECTION_VALUES = 1
-
-VALUE_TYPE = 0
-VALUE_INDEX = 1
-VALUE_FLAGS = 2
-VALUE_DEFAULT = 3
-
-
-class SchemaSection:
- """
- """
-
- def __init__(self, name, section_tuple):
- self.name = name
- self.singleton = section_tuple[SECTION_SINGLETON]
- self.values = section_tuple[SECTION_VALUES]
- self.index_keys = []
- finding_index = True
- index_ord = 0
- while finding_index:
- finding_index = False
- for k,v in self.values.items():
- if v[VALUE_INDEX] == index_ord:
- self.index_keys.append(k)
- index_ord += 1
- finding_index = True
-
-
- def is_mandatory(self, key):
- return self.values[key][VALUE_FLAGS].find('M') >= 0
-
-
- def is_expandable(self, key):
- return self.values[key][VALUE_FLAGS].find('E') >= 0
-
-
- def expand_copy(self, key):
- return self.values[key][VALUE_FLAGS].find('S') >= 0
-
-
- def default_value(self, key):
- return self.values[key][VALUE_DEFAULT]
-
-
- def check_and_default(self, kv_map):
- copy = {}
- for k,v in self.values.items():
- if k not in kv_map:
- if self.is_mandatory(k):
- raise Exception("In section '%s', missing mandatory key '%s'" % (self.name, k))
- else:
- copy[k] = self.default_value(k)
- for k,v in kv_map.items():
- if k not in self.values:
- raise Exception("In section '%s', unknown key '%s'" % (self.name, k))
- copy[k] = v
- return copy
-
-
- def index_of(self, kv_map):
- result = self.name
- for key in self.index_keys:
- result += ':%s' % kv_map[key]
- if result == "":
- result = "SINGLE"
- return result
-
-
-class Schema:
- """
- """
-
- def __init__(self):
- self.sections = {}
- for k,v in config_schema.items():
- self.sections[k] = SchemaSection(k, v)
-
-
-
-class DXConfig:
- """
- Configuration File Parser for Qpid Dispatch
-
- Configuration files are made up of "sections" having the following form:
-
- section-name {
- key0: value0
- key1: value1
- ...
- keyN: valueN
- }
-
- Sections may be repeated (i.e. there may be multiple instances with the same section name).
- The keys must be unique within a section. Values can be of string or integer types. No
- quoting is necessary anywhere in the configuration file. Values may contain whitespace.
-
- Comment lines starting with the '#' character will be ignored.
-
- This parser converts the configuration file into a json string where the file is represented
- as a list of maps. Each map has one item, the key being the section name and the value being
- a nested map of keys and values from the file. This json string is parsed into a data
- structure that may then be queried.
-
- """
-
- def __init__(self, path):
- self.path = path
- self.raw_config = None
- self.config = None
- self.schema = Schema()
- self.log = LogAdapter('config.parser')
-
-
- def read_file(self):
- try:
- self.log.log(LOG_INFO, "Reading Configuration File: %s" % self.path)
- cfile = open(self.path)
- text = cfile.read()
- cfile.close()
-
- self.json_text = "[" + self._toJson(text) + "]"
- self.raw_config = json.loads(self.json_text);
- self._validate_raw_config()
- self._process_schema()
- except Exception, E:
- print "Exception in read_file: %r" % E
- raise
-
-
- def __repr__(self):
- return "%r" % self.config
-
-
- def _toJson(self, text):
- lines = text.split('\n')
- stripped = ""
- for line in lines:
- sline = line.strip()
-
- #
- # Ignore empty lines
- #
- if len(sline) == 0:
- continue
-
- #
- # Ignore comment lines
- #
- if sline.find('#') == 0:
- continue
-
- #
- # Convert section opens, closes, and colon-separated key:value lines into json
- #
- if sline[-1:] == '{':
- sline = '{"' + sline[:-1].strip() + '" : {'
- elif sline == '}':
- sline = '}},'
- else:
- colon = sline.find(':')
- if colon > 1:
- sline = '"' + sline[:colon] + '":"' + sline[colon+1:].strip() + '",'
- stripped += sline
-
- #
- # Remove the trailing commas in map entries
- #
- stripped = stripped.replace(",}", "}")
-
- #
- # Return the entire document minus the trailing comma
- #
- return stripped[:-1]
-
-
- def _validate_raw_config(self):
- """
- Ensure that the configuration is well-formed. Once this is validated,
- further functions can assume a well-formed data structure is in place.
- """
- if self.raw_config.__class__ != list:
- raise Exception("Invalid Config: Expected List at top level")
- for section in self.raw_config:
- if section.__class__ != dict:
- raise Exception("Invalid Config: List items must be maps")
- if len(section) != 1:
- raise Exception("Invalid Config: Map must have only one entry")
- for key,val in section.items():
- if key.__class__ != str and key.__class__ != unicode:
- raise Exception("Invalid Config: Key in map must be a string")
- if val.__class__ != dict:
- raise Exception("Invalid Config: Value in map must be a map")
- for k,v in val.items():
- if k.__class__ != str and k.__class__ != unicode:
- raise Exception("Invalid Config: Key in section must be a string")
- if v.__class__ != str and v.__class__ != unicode:
- raise Exception("Invalid Config: Value in section must be a string")
-
-
- def _process_schema(self):
- self.config = ConfigMain(self.schema)
- self.config.update(self.raw_config)
- self.raw_config = None
-
-
- def item_count(self, section):
- """
- Return the number of items in a section (i.e. the number if instances of a section-name).
- """
- result = self.config.item_count(section)
- return result
-
-
- def _value(self, section, idx, key):
- return self.config.get_value(section, idx, key)
-
-
- def value_string(self, section, idx, key):
- """
- Return the string value for the key in the idx'th item in the section.
- """
- value = self._value(section, idx, key)
- if value:
- return str(value)
- return None
-
-
- def value_int(self, section, idx, key):
- """
- Return the integer value for the key in the idx'th item in the section.
- """
- value = self._value(section, idx, key)
- return long(value)
-
-
- def value_bool(self, section, idx, key):
- """
- Return the boolean value for the key in the idx'th item in the section.
- """
- value = self._value(section, idx, key)
- if value:
- if str(value) != "no":
- return True
- return None
-
-
diff --git a/qpid/extras/dispatch/src/py/config/schema.py b/qpid/extras/dispatch/src/py/config/schema.py
deleted file mode 100644
index 545139f0df..0000000000
--- a/qpid/extras/dispatch/src/py/config/schema.py
+++ /dev/null
@@ -1,82 +0,0 @@
-##
-## Licensed to the Apache Software Foundation (ASF) under one
-## or more contributor license agreements. See the NOTICE file
-## distributed with this work for additional information
-## regarding copyright ownership. The ASF licenses this file
-## to you under the Apache License, Version 2.0 (the
-## "License"); you may not use this file except in compliance
-## with the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing,
-## software distributed under the License is distributed on an
-## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-## KIND, either express or implied. See the License for the
-## specific language governing permissions and limitations
-## under the License
-##
-
-#
-# config_schema =
-# { <section_name> :
-# (<singleton>,
-# {<key> : (<value-type>, <index>, <flags>, <default-value>)
-# )
-# }
-#
-# <section-name> = String name of a configuration section
-# <singleton> = False => There may be 0 or more sections with this name
-# True => There must be exactly one section with this name
-# <key> = String key of a section's key-value pair
-# <value-type> = Python type for the value
-# <index> = None => This value is not an index for multiple sections
-# >= 0 => Ordinal of this value in the section primary-key
-# <flags> = Set of characters:
-# M = Mandatory (no default value)
-# E = Expand referenced section into this record
-# S = During expansion, this key should be copied
-# <default-value> = If not mandatory and not specified, the value defaults to this
-# value
-#
-
-config_schema = {
- 'container' : (True, {
- 'worker-threads' : (int, None, "", 1),
- 'container-name' : (str, None, "", None)
- }),
- 'ssl-profile' : (False, {
- 'name' : (str, 0, "M"),
- 'cert-db' : (str, None, "S", None),
- 'cert-file' : (str, None, "S", None),
- 'key-file' : (str, None, "S", None),
- 'password-file' : (str, None, "S", None),
- 'password' : (str, None, "S", None)
- }),
- 'listener' : (False, {
- 'addr' : (str, 0, "M"),
- 'port' : (str, 1, "M"),
- 'label' : (str, None, "", None),
- 'sasl-mechanisms' : (str, None, "M"),
- 'ssl-profile' : (str, None, "E", None),
- 'require-peer-auth' : (bool, None, "", True),
- 'allow-unsecured' : (bool, None, "", False)
- }),
- 'connector' : (False, {
- 'addr' : (str, 0, "M"),
- 'port' : (str, 1, "M"),
- 'label' : (str, None, "", None),
- 'sasl-mechanisms' : (str, None, "M"),
- 'ssl-profile' : (str, None, "E", None),
- 'allow-redirect' : (bool, None, "", True)
- }),
- 'router' : (True, {
- 'router-id' : (str, None, "M"),
- 'area' : (str, None, "", None),
- 'hello-interval' : (int, None, "", 1),
- 'hello-max-age' : (int, None, "", 3),
- 'ra-interval' : (int, None, "", 30),
- 'remote-ls-max-age' : (int, None, "", 60),
- 'mobile-addr-max-age' : (int, None, "", 60)
- })}
-
diff --git a/qpid/extras/dispatch/src/py/router/__init__.py b/qpid/extras/dispatch/src/py/router/__init__.py
deleted file mode 100644
index bfc600d469..0000000000
--- a/qpid/extras/dispatch/src/py/router/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from router.router_engine import *
diff --git a/qpid/extras/dispatch/src/py/router/adapter.py b/qpid/extras/dispatch/src/py/router/adapter.py
deleted file mode 100644
index 76c5a3ea48..0000000000
--- a/qpid/extras/dispatch/src/py/router/adapter.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-ENTRY_OLD = 1
-ENTRY_CURRENT = 2
-ENTRY_NEW = 3
-
-class AdapterEngine(object):
- """
- This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop).
- Key binding lists are kept in disjoint key-classes that can come from different parts of the router
- (i.e. topological keys for inter-router communication and mobile keys for end users).
-
- For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the
- routing tables to be efficiently communicated to the adapter in the form of table deltas.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.key_classes = {} # map [key_class] => (addr-key, next-hop)
-
-
- def tick(self, now):
- """
- There is no periodic processing needed for this module.
- """
- pass
-
-
- def remote_routes_changed(self, key_class, new_table):
- old_table = []
- if key_class in self.key_classes:
- old_table = self.key_classes[key_class]
-
- # flag all of the old entries
- old_flags = {}
- for a,b in old_table:
- old_flags[(a,b)] = ENTRY_OLD
-
- # flag the new entries
- new_flags = {}
- for a,b in new_table:
- new_flags[(a,b)] = ENTRY_NEW
-
- # calculate the differences from old to new
- for a,b in new_table:
- if old_table.count((a,b)) > 0:
- old_flags[(a,b)] = ENTRY_CURRENT
- new_flags[(a,b)] = ENTRY_CURRENT
-
- # make to_add and to_delete lists
- to_add = []
- to_delete = []
- for (a,b),f in old_flags.items():
- if f == ENTRY_OLD:
- to_delete.append((a,b))
- for (a,b),f in new_flags.items():
- if f == ENTRY_NEW:
- to_add.append((a,b))
-
- # set the routing table to the new contents
- self.key_classes[key_class] = new_table
-
- # update the adapter's routing tables
- # Note: Do deletions before adds to avoid overlapping routes that may cause
- # messages to be duplicated. It's better to have gaps in the routing
- # tables momentarily because unroutable messages are stored for retry.
- for a,b in to_delete:
- self.container.router_adapter.remote_unbind(a, b)
- for a,b in to_add:
- self.container.router_adapter.remote_bind(a, b)
-
- self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class)
- for a,b in new_table:
- self.container.log(LOG_INFO, " %s => %s" % (a, b))
-
-
diff --git a/qpid/extras/dispatch/src/py/router/binding.py b/qpid/extras/dispatch/src/py/router/binding.py
deleted file mode 100644
index cd0641d12f..0000000000
--- a/qpid/extras/dispatch/src/py/router/binding.py
+++ /dev/null
@@ -1,136 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-
-class BindingEngine(object):
- """
- This module is responsible for responding to two different events:
- 1) The learning of new remote mobile addresses
- 2) The change of topology (i.e. different next-hops for remote routers)
- When these occur, this module converts the mobile routing table (address => router)
- to a next-hop routing table (address => next-hop), compresses the keys in case there
- are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.current_keys = {}
-
-
- def tick(self, now):
- pass
-
-
- def mobile_keys_changed(self, keys):
- self.current_keys = keys
- next_hop_keys = self._convert_ids_to_next_hops(keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def next_hops_changed(self):
- next_hop_keys = self._convert_ids_to_next_hops(self.current_keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def _convert_ids_to_next_hops(self, keys):
- next_hops = self.container.get_next_hops()
- new_keys = {}
- for _id, value in keys.items():
- if _id in next_hops:
- next_hop = next_hops[_id]
- if next_hop not in new_keys:
- new_keys[next_hop] = []
- new_keys[next_hop].extend(value)
- return new_keys
-
-
- def _compress_keys(self, keys):
- trees = {}
- for _id, key_list in keys.items():
- trees[_id] = TopicElementList()
- for key in key_list:
- trees[_id].add_key(key)
- routing_table = []
- for _id, tree in trees.items():
- tree_keys = tree.get_list()
- for tk in tree_keys:
- routing_table.append((tk, _id))
- return routing_table
-
-
-class TopicElementList(object):
- """
- """
- def __init__(self):
- self.elements = {} # map text => (terminal, sub-list)
-
- def __repr__(self):
- return "%r" % self.elements
-
- def add_key(self, key):
- self.add_tokens(key.split('.'))
-
- def add_tokens(self, tokens):
- first = tokens.pop(0)
- terminal = len(tokens) == 0
-
- if terminal and first == '#':
- ## Optimization #1A (A.B.C.D followed by A.B.#)
- self.elements = {'#':(True, TopicElementList())}
- return
-
- if '#' in self.elements:
- _t,_el = self.elements['#']
- if _t:
- ## Optimization #1B (A.B.# followed by A.B.C.D)
- return
-
- if first not in self.elements:
- self.elements[first] = (terminal, TopicElementList())
- else:
- _t,_el = self.elements[first]
- if terminal and not _t:
- self.elements[first] = (terminal, _el)
-
- if not terminal:
- _t,_el = self.elements[first]
- _el.add_tokens(tokens)
-
- def get_list(self):
- keys = []
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append(token)
- _el.build_list(token, keys)
- return keys
-
- def build_list(self, prefix, keys):
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append("%s.%s" % (prefix, token))
- _el.build_list("%s.%s" % (prefix, token), keys)
-
-
-
diff --git a/qpid/extras/dispatch/src/py/router/configuration.py b/qpid/extras/dispatch/src/py/router/configuration.py
deleted file mode 100644
index f87d2ee7d2..0000000000
--- a/qpid/extras/dispatch/src/py/router/configuration.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-class Configuration(object):
- """
- This module manages and holds the configuration and tuning parameters for a router.
- """
- def __init__(self, overrides={}):
- ##
- ## Load default values
- ##
- self.values = { 'hello_interval' : 1.0,
- 'hello_max_age' : 3.0,
- 'ra_interval' : 30.0,
- 'remote_ls_max_age' : 60.0,
- 'mobile_addr_max_age' : 60.0 }
-
- ##
- ## Apply supplied overrides
- ##
- for k, v in overrides.items():
- self.values[k] = v
-
- def __getattr__(self, key):
- if key in self.values:
- return self.values[key]
- raise KeyError
-
- def __repr__(self):
- return "%r" % self.values
-
diff --git a/qpid/extras/dispatch/src/py/router/data.py b/qpid/extras/dispatch/src/py/router/data.py
deleted file mode 100644
index 79d6a0d0fe..0000000000
--- a/qpid/extras/dispatch/src/py/router/data.py
+++ /dev/null
@@ -1,275 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-
-def getMandatory(data, key, cls=None):
- """
- Get the value mapped to the requested key. If it's not present, raise an exception.
- """
- if key in data:
- value = data[key]
- if cls and value.__class__ != cls:
- raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
- return value
- raise Exception("Mandatory protocol field missing: '%s'" % key)
-
-
-def getOptional(data, key, default=None, cls=None):
- """
- Get the value mapped to the requested key. If it's not present, return the default value.
- """
- if key in data:
- value = data[key]
- if cls and value.__class__ != cls:
- raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
- return value
- return default
-
-
-class LinkState(object):
- """
- The link-state of a single router. The link state consists of a list of neighbor routers reachable from
- the reporting router. The link-state-sequence number is incremented each time the link state changes.
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None):
- self.last_seen = 0
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.peers = getMandatory(body, 'peers', list)
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.peers = _peers
-
- def __repr__(self):
- return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'peers' : self.peers}
-
- def add_peer(self, _id):
- if self.peers.count(_id) == 0:
- self.peers.append(_id)
- return True
- return False
-
- def del_peer(self, _id):
- if self.peers.count(_id) > 0:
- self.peers.remove(_id)
- return True
- return False
-
- def bump_sequence(self):
- self.ls_seq += 1
-
-
-class MessageHELLO(object):
- """
- HELLO Message
- scope: neighbors only - HELLO messages travel at most one hop
- This message is used by directly connected routers to determine with whom they have
- bidirectional connectivity.
- """
- def __init__(self, body, _id=None, _area=None, _seen_peers=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.seen_peers = getMandatory(body, 'seen', list)
- else:
- self.id = _id
- self.area = _area
- self.seen_peers = _seen_peers
-
- def __repr__(self):
- return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers)
-
- def get_opcode(self):
- return 'HELLO'
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'seen' : self.seen_peers}
-
- def is_seen(self, _id):
- return self.seen_peers.count(_id) > 0
-
-
-class MessageRA(object):
- """
- Router Advertisement (RA) Message
- scope: all routers in the area and all designated routers
- This message is sent periodically to indicate the originating router's sequence numbers
- for link-state and mobile-address-state.
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.mobile_seq = getMandatory(body, 'mobile_seq', long)
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.mobile_seq = long(_mobile_seq)
-
- def get_opcode(self):
- return 'RA'
-
- def __repr__(self):
- return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \
- (self.id, self.area, self.ls_seq, self.mobile_seq)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'mobile_seq' : self.mobile_seq}
-
-
-class MessageLSU(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.ls = LinkState(getMandatory(body, 'ls', dict))
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.ls = _ls
-
- def get_opcode(self):
- return 'LSU'
-
- def __repr__(self):
- return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \
- (self.id, self.area, self.ls_seq, self.ls)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'ls' : self.ls.to_dict()}
-
-
-class MessageLSR(object):
- """
- """
- def __init__(self, body, _id=None, _area=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- else:
- self.id = _id
- self.area = _area
-
- def get_opcode(self):
- return 'LSR'
-
- def __repr__(self):
- return "LSR(id=%s area=%s)" % (self.id, self.area)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area}
-
-
-class MessageMAU(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.mobile_seq = getMandatory(body, 'mobile_seq', long)
- self.add_list = getOptional(body, 'add', None, list)
- self.del_list = getOptional(body, 'del', None, list)
- self.exist_list = getOptional(body, 'exist', None, list)
- else:
- self.id = _id
- self.area = _area
- self.mobile_seq = long(_seq)
- self.add_list = _add_list
- self.del_list = _del_list
- self.exist_list = _exist_list
-
- def get_opcode(self):
- return 'MAU'
-
- def __repr__(self):
- _add = ''
- _del = ''
- _exist = ''
- if self.add_list: _add = ' add=%r' % self.add_list
- if self.del_list: _del = ' del=%r' % self.del_list
- if self.exist_list: _exist = ' exist=%r' % self.exist_list
- return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \
- (self.id, self.area, self.mobile_seq, _add, _del, _exist)
-
- def to_dict(self):
- body = { 'id' : self.id,
- 'area' : self.area,
- 'mobile_seq' : self.mobile_seq }
- if self.add_list: body['add'] = self.add_list
- if self.del_list: body['del'] = self.del_list
- if self.exist_list: body['exist'] = self.exist_list
- return body
-
-
-class MessageMAR(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _have_seq=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.have_seq = getMandatory(body, 'have_seq', long)
- else:
- self.id = _id
- self.area = _area
- self.have_seq = long(_have_seq)
-
- def get_opcode(self):
- return 'MAR'
-
- def __repr__(self):
- return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'have_seq' : self.have_seq}
-
diff --git a/qpid/extras/dispatch/src/py/router/link.py b/qpid/extras/dispatch/src/py/router/link.py
deleted file mode 100644
index 1e06d161f6..0000000000
--- a/qpid/extras/dispatch/src/py/router/link.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from data import MessageRA, MessageLSU, MessageLSR
-from time import time
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-class LinkStateEngine(object):
- """
- This module is responsible for running the Link State protocol and maintaining the set
- of link states that are gathered from the domain. It notifies outbound when changes to
- the link-state-collection are detected.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.ra_interval = self.container.config.ra_interval
- self.remote_ls_max_age = self.container.config.remote_ls_max_age
- self.last_ra_time = 0
- self.collection = {}
- self.collection_changed = False
- self.mobile_seq = 0
- self.needed_lsrs = {}
-
-
- def tick(self, now):
- self._expire_ls(now)
- self._send_lsrs()
-
- if now - self.last_ra_time >= self.ra_interval:
- self.last_ra_time = now
- self._send_ra()
-
- if self.collection_changed:
- self.collection_changed = False
- self.container.log(LOG_INFO, "New Link-State Collection:")
- for a,b in self.collection.items():
- self.container.log(LOG_INFO, " %s => %r" % (a, b.peers))
- self.container.ls_collection_changed(self.collection)
-
-
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- ls.last_seen = now
- if ls.ls_seq < msg.ls_seq:
- self.needed_lsrs[(msg.area, msg.id)] = None
- else:
- self.needed_lsrs[(msg.area, msg.id)] = None
-
-
- def handle_lsu(self, msg, now):
- if msg.id == self.id:
- return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- if ls.ls_seq < msg.ls_seq:
- ls = msg.ls
- self.collection[msg.id] = ls
- self.collection_changed = True
- ls.last_seen = now
- else:
- ls = msg.ls
- self.collection[msg.id] = ls
- self.collection_changed = True
- ls.last_seen = now
- self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id)
- # Schedule LSRs for any routers referenced in this LS that we don't know about
- for _id in msg.ls.peers:
- if _id not in self.collection:
- self.needed_lsrs[(msg.area, _id)] = None
-
-
- def handle_lsr(self, msg, now):
- if msg.id == self.id:
- return
- if self.id not in self.collection:
- return
- my_ls = self.collection[self.id]
- self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
-
-
- def new_local_link_state(self, link_state):
- self.collection[self.id] = link_state
- self.collection_changed = True
- self._send_ra()
-
- def set_mobile_sequence(self, seq):
- self.mobile_seq = seq
-
-
- def get_collection(self):
- return self.collection
-
-
- def _expire_ls(self, now):
- to_delete = []
- for key, ls in self.collection.items():
- if key != self.id and now - ls.last_seen > self.remote_ls_max_age:
- to_delete.append(key)
- for key in to_delete:
- ls = self.collection.pop(key)
- self.collection_changed = True
- self.container.log(LOG_INFO, "Expired link-state from router: %s" % key)
-
-
- def _send_lsrs(self):
- for (_area, _id) in self.needed_lsrs.keys():
- self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area))
- self.needed_lsrs = {}
-
-
- def _send_ra(self):
- ls_seq = 0
- if self.id in self.collection:
- ls_seq = self.collection[self.id].ls_seq
- self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
diff --git a/qpid/extras/dispatch/src/py/router/mobile.py b/qpid/extras/dispatch/src/py/router/mobile.py
deleted file mode 100644
index acea759e37..0000000000
--- a/qpid/extras/dispatch/src/py/router/mobile.py
+++ /dev/null
@@ -1,188 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from data import MessageRA, MessageMAR, MessageMAU
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-class MobileAddressEngine(object):
- """
- This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
- It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
- Note that this routing table maps from the mobile address to the remote router where that address
- is directly bound.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
- self.mobile_seq = 0
- self.local_keys = []
- self.added_keys = []
- self.deleted_keys = []
- self.remote_lists = {} # map router_id => (sequence, list of keys)
- self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
- self.remote_changed = False
- self.needed_mars = {}
-
-
- def tick(self, now):
- self._expire_remotes(now)
- self._send_mars()
-
- ##
- ## If local keys have changed, collect the changes and send a MAU with the diffs
- ## Note: it is important that the differential-MAU be sent before a RA is sent
- ##
- if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
- self.mobile_seq += 1
- self.container.send('_topo.%s.all' % self.area,
- MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
- self.local_keys.extend(self.added_keys)
- for key in self.deleted_keys:
- self.local_keys.remove(key)
- self.added_keys = []
- self.deleted_keys = []
- self.container.mobile_sequence_changed(self.mobile_seq)
-
- ##
- ## If remotes have changed, start the process of updating local bindings
- ##
- if self.remote_changed:
- self.remote_changed = False
- self._update_remote_keys()
-
-
- def add_local_address(self, key):
- """
- """
- if self.local_keys.count(key) == 0:
- if self.added_keys.count(key) == 0:
- self.added_keys.append(key)
- else:
- if self.deleted_keys.count(key) > 0:
- self.deleted_keys.remove(key)
-
-
- def del_local_address(self, key):
- """
- """
- if self.local_keys.count(key) > 0:
- if self.deleted_keys.count(key) == 0:
- self.deleted_keys.append(key)
- else:
- if self.added_keys.count(key) > 0:
- self.added_keys.remove(key)
-
-
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
-
- if msg.mobile_seq == 0:
- return
-
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- self.remote_last_seen[msg.id] = now
- if _seq < msg.mobile_seq:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
- def handle_mau(self, msg, now):
- ##
- ## If the MAU is differential, we can only use it if its sequence is exactly one greater
- ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
- ##
- ## If the MAU is absolute, we can use it in all cases.
- ##
- if msg.id == self.id:
- return
-
- if msg.exist_list:
- ##
- ## Absolute MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq >= msg.mobile_seq: # ignore duplicates
- return
- self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
- self.remote_last_seen[msg.id] = now
- self.remote_changed = True
- else:
- ##
- ## Differential MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq == msg.mobile_seq: # ignore duplicates
- return
- self.remote_last_seen[msg.id] = now
- if _seq + 1 == msg.mobile_seq:
- ##
- ## This is one greater than our stored value, incorporate the deltas
- ##
- if msg.add_list and msg.add_list.__class__ == list:
- _list.extend(msg.add_list)
- if msg.del_list and msg.del_list.__class__ == list:
- for key in msg.del_list:
- _list.remove(key)
- self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.remote_changed = True
- else:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
- def handle_mar(self, msg, now):
- if msg.id == self.id:
- return
- if msg.have_seq < self.mobile_seq:
- self.container.send('_topo.%s.%s' % (msg.area, msg.id),
- MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
- def _update_remote_keys(self):
- keys = {}
- for _id,(seq,key_list) in self.remote_lists.items():
- keys[_id] = key_list
- self.container.mobile_keys_changed(keys)
-
-
- def _expire_remotes(self, now):
- for _id, t in self.remote_last_seen.items():
- if now - t > self.mobile_addr_max_age:
- self.remote_lists.pop(_id)
- self.remote_last_seen.pop(_id)
- self.remote_changed = True
-
-
- def _send_mars(self):
- for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
- self.needed_mars = {}
-
diff --git a/qpid/extras/dispatch/src/py/router/neighbor.py b/qpid/extras/dispatch/src/py/router/neighbor.py
deleted file mode 100644
index 55c6bab62f..0000000000
--- a/qpid/extras/dispatch/src/py/router/neighbor.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from data import LinkState, MessageHELLO
-from time import time
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-
-class NeighborEngine(object):
- """
- This module is responsible for maintaining this router's link-state. It runs the HELLO protocol
- with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
- link-state) changes.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.last_hello_time = 0.0
- self.hello_interval = container.config.hello_interval
- self.hello_max_age = container.config.hello_max_age
- self.hellos = {}
- self.link_state_changed = False
- self.link_state = LinkState(None, self.id, self.area, 0, [])
-
-
- def tick(self, now):
- self._expire_hellos(now)
-
- if now - self.last_hello_time >= self.hello_interval:
- self.last_hello_time = now
- self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
-
- if self.link_state_changed:
- self.link_state_changed = False
- self.link_state.bump_sequence()
- self.container.local_link_state_changed(self.link_state)
-
-
- def handle_hello(self, msg, now):
- if msg.id == self.id:
- return
- self.hellos[msg.id] = now
- if msg.is_seen(self.id):
- if self.link_state.add_peer(msg.id):
- self.link_state_changed = True
- self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id)
- ##
- ## TODO - Use this function to detect area boundaries
- ##
-
- def _expire_hellos(self, now):
- to_delete = []
- for key, last_seen in self.hellos.items():
- if now - last_seen > self.hello_max_age:
- to_delete.append(key)
- for key in to_delete:
- self.hellos.pop(key)
- if self.link_state.del_peer(key):
- self.link_state_changed = True
- self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
-
-
diff --git a/qpid/extras/dispatch/src/py/router/path.py b/qpid/extras/dispatch/src/py/router/path.py
deleted file mode 100644
index 3be6c40608..0000000000
--- a/qpid/extras/dispatch/src/py/router/path.py
+++ /dev/null
@@ -1,202 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-class PathEngine(object):
- """
- This module is responsible for computing the next-hop for every router/area in the domain
- based on the collection of link states that have been gathered.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.recalculate = False
- self.collection = None
-
-
- def tick(self, now_unused):
- if self.recalculate:
- self.recalculate = False
- self._calculate_routes()
-
-
- def ls_collection_changed(self, collection):
- self.recalculate = True
- self.collection = collection
-
-
- def _calculate_tree_from_root(self, root):
- ##
- ## Make a copy of the current collection of link-states that contains
- ## an empty link-state for nodes that are known-peers but are not in the
- ## collection currently. This is needed to establish routes to those nodes
- ## so we can trade link-state information with them.
- ##
- link_states = {}
- for _id, ls in self.collection.items():
- link_states[_id] = ls.peers
- for p in ls.peers:
- if p not in link_states:
- link_states[p] = []
-
- ##
- ## Setup Dijkstra's Algorithm
- ##
- cost = {}
- prev = {}
- for _id in link_states:
- cost[_id] = None # infinite
- prev[_id] = None # undefined
- cost[root] = 0 # no cost to the root node
- unresolved = NodeSet(cost)
-
- ##
- ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
- ##
- while not unresolved.empty():
- u = unresolved.lowest_cost()
- if cost[u] == None:
- # There are no more reachable nodes in unresolved
- break
- for v in link_states[u]:
- if unresolved.contains(v):
- alt = cost[u] + 1 # TODO - Use link cost instead of 1
- if cost[v] == None or alt < cost[v]:
- cost[v] = alt
- prev[v] = u
- unresolved.set_cost(v, alt)
-
- ##
- ## Remove unreachable nodes from the map. Note that this will also remove the
- ## root node (has no previous node) from the map.
- ##
- for u, val in prev.items():
- if not val:
- prev.pop(u)
-
- ##
- ## Return previous-node map. This is a map of all reachable, remote nodes to
- ## their predecessor node.
- ##
- return prev
-
-
- def _calculate_routes(self):
- ##
- ## Generate the shortest-path tree with the local node as root
- ##
- prev = self._calculate_tree_from_root(self.id)
- nodes = prev.keys()
-
- ##
- ## Distill the path tree into a map of next hops for each node
- ##
- next_hops = {}
- while len(nodes) > 0:
- u = nodes[0] # pick any destination
- path = [u]
- nodes.remove(u)
- v = prev[u]
- while v != self.id: # build a list of nodes in the path back to the root
- if v in nodes:
- path.append(v)
- nodes.remove(v)
- u = v
- v = prev[u]
- for w in path: # mark each node in the path as reachable via the next hop
- next_hops[w] = u
-
- ##
- ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
- ## for which the path from origin to dest passes through us. This is the set
- ## of valid origins for forwarding to the destination.
- ##
-
- self.container.next_hops_changed(next_hops)
-
-
-class NodeSet(object):
- """
- This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
- Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
- repeatable ordering.
- """
- def __init__(self, cost_map):
- self.nodes = []
- for _id, cost in cost_map.items():
- ##
- ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
- ## during this initialization.
- ##
- if cost == 0:
- self.nodes.insert(0, (_id, cost))
- else:
- ##
- ## There is no need to sort unreachable nodes by ID
- ##
- self.nodes.append((_id, cost))
-
-
- def __repr__(self):
- return self.nodes.__repr__()
-
-
- def empty(self):
- return len(self.nodes) == 0
-
-
- def contains(self, _id):
- for a, b in self.nodes:
- if a == _id:
- return True
- return False
-
-
- def lowest_cost(self):
- """
- Remove and return the lowest cost node ID.
- """
- _id, cost = self.nodes.pop(0)
- return _id
-
-
- def set_cost(self, _id, new_cost):
- """
- Set the cost for an ID in the NodeSet and re-insert the ID so that the list
- remains sorted in increasing cost order.
- """
- index = 0
- for i, c in self.nodes:
- if i == _id:
- break
- index += 1
- self.nodes.pop(index)
-
- index = 0
- for i, c in self.nodes:
- if c == None or new_cost < c or (new_cost == c and _id < i):
- break
- index += 1
-
- self.nodes.insert(index, (_id, new_cost))
diff --git a/qpid/extras/dispatch/src/py/router/router_engine.py b/qpid/extras/dispatch/src/py/router/router_engine.py
deleted file mode 100644
index 065204ad62..0000000000
--- a/qpid/extras/dispatch/src/py/router/router_engine.py
+++ /dev/null
@@ -1,255 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from time import time
-from uuid import uuid4
-
-from configuration import Configuration
-from data import *
-from neighbor import NeighborEngine
-from link import LinkStateEngine
-from path import PathEngine
-from mobile import MobileAddressEngine
-from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
-
-##
-## Import the Dispatch adapters from the environment. If they are not found
-## (i.e. we are in a test bench, etc.), load the stub versions.
-##
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-
-class RouterEngine:
- """
- """
-
- def __init__(self, router_adapter, router_id=None, area='area', config_override={}):
- """
- Initialize an instance of a router for a domain.
- """
- ##
- ## Record important information about this router instance
- ##
- self.domain = "domain"
- self.router_adapter = router_adapter
- self.log_adapter = LogAdapter("dispatch.router")
- self.io_adapter = IoAdapter(self, "qdxrouter")
-
- if router_id:
- self.id = router_id
- else:
- self.id = str(uuid4())
- self.area = area
- self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id))
-
- ##
- ## Setup configuration
- ##
- self.config = Configuration(config_override)
- self.log(LOG_INFO, "Config: %r" % self.config)
-
- ##
- ## Launch the sub-module engines
- ##
- self.neighbor_engine = NeighborEngine(self)
- self.link_state_engine = LinkStateEngine(self)
- self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self)
- self.routing_table_engine = RoutingTableEngine(self)
- self.binding_engine = BindingEngine(self)
- self.adapter_engine = AdapterEngine(self)
-
-
-
- ##========================================================================================
- ## Adapter Entry Points - invoked from the adapter
- ##========================================================================================
- def getId(self):
- """
- Return the router's ID
- """
- return self.id
-
-
- def addLocalAddress(self, key):
- """
- """
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.add_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
-
- def delLocalAddress(self, key):
- """
- """
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.del_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
-
-
- def handleTimerTick(self):
- """
- """
- try:
- now = time()
- self.neighbor_engine.tick(now)
- self.link_state_engine.tick(now)
- self.path_engine.tick(now)
- self.mobile_address_engine.tick(now)
- self.routing_table_engine.tick(now)
- self.binding_engine.tick(now)
- self.adapter_engine.tick(now)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
-
-
- def handleControlMessage(self, opcode, body):
- """
- """
- try:
- now = time()
- if opcode == 'HELLO':
- msg = MessageHELLO(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.neighbor_engine.handle_hello(msg, now)
-
- elif opcode == 'RA':
- msg = MessageRA(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_ra(msg, now)
- self.mobile_address_engine.handle_ra(msg, now)
-
- elif opcode == 'LSU':
- msg = MessageLSU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsu(msg, now)
-
- elif opcode == 'LSR':
- msg = MessageLSR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsr(msg, now)
-
- elif opcode == 'MAU':
- msg = MessageMAU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mau(msg, now)
-
- elif opcode == 'MAR':
- msg = MessageMAR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mar(msg, now)
-
- except Exception, e:
- self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
-
-
- def receive(self, message_properties, body):
- """
- This is the IoAdapter message-receive handler
- """
- try:
- self.handleControlMessage(message_properties['opcode'], body)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
- (message_properties, body, e))
-
- def getRouterData(self, kind):
- """
- """
- if kind == 'help':
- return { 'help' : "Get list of supported values for kind",
- 'link-state' : "This router's link state",
- 'link-state-set' : "The set of link states from known routers",
- 'next-hops' : "Next hops to each known router",
- 'topo-table' : "Topological routing table",
- 'mobile-table' : "Mobile key routing table"
- }
- if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
- if kind == 'next-hops' : return self.routing_table_engine.next_hops
- if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
- if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
- if kind == 'link-state-set' :
- copy = {}
- for _id,_ls in self.link_state_engine.collection.items():
- copy[_id] = _ls.to_dict()
- return copy
-
- return {'notice':'Use kind="help" to get a list of possibilities'}
-
-
- ##========================================================================================
- ## Adapter Calls - outbound calls to Dispatch
- ##========================================================================================
- def log(self, level, text):
- """
- Emit a log message to the host's event log
- """
- self.log_adapter.log(level, text)
-
-
- def send(self, dest, msg):
- """
- Send a control message to another router.
- """
- app_props = {'opcode' : msg.get_opcode() }
- self.io_adapter.send(dest, app_props, msg.to_dict())
- self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-
-
- ##========================================================================================
- ## Interconnect between the Sub-Modules
- ##========================================================================================
- def local_link_state_changed(self, link_state):
- self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
- self.link_state_engine.new_local_link_state(link_state)
-
- def ls_collection_changed(self, collection):
- self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
- self.path_engine.ls_collection_changed(collection)
-
- def next_hops_changed(self, next_hop_table):
- self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
- self.routing_table_engine.next_hops_changed(next_hop_table)
- self.binding_engine.next_hops_changed()
-
- def mobile_sequence_changed(self, mobile_seq):
- self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
- self.link_state_engine.set_mobile_sequence(mobile_seq)
-
- def mobile_keys_changed(self, keys):
- self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
- self.binding_engine.mobile_keys_changed(keys)
-
- def get_next_hops(self):
- return self.routing_table_engine.get_next_hops()
-
- def remote_routes_changed(self, key_class, routes):
- self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
- self.adapter_engine.remote_routes_changed(key_class, routes)
-
diff --git a/qpid/extras/dispatch/src/py/router/routing.py b/qpid/extras/dispatch/src/py/router/routing.py
deleted file mode 100644
index 0d4ceed955..0000000000
--- a/qpid/extras/dispatch/src/py/router/routing.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from stubs import *
-
-class RoutingTableEngine(object):
- """
- This module is responsible for converting the set of next hops to remote routers to a routing
- table in the "topological" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.next_hops = {}
-
-
- def tick(self, now):
- pass
-
-
- def next_hops_changed(self, next_hops):
- # Convert next_hops into routing table
- self.next_hops = next_hops
- new_table = []
- for _id, next_hop in next_hops.items():
- new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
- pair = ('_topo.%s.all' % (self.area), next_hop)
- if new_table.count(pair) == 0:
- new_table.append(pair)
-
- self.container.remote_routes_changed('topological', new_table)
-
-
- def get_next_hops(self):
- return self.next_hops
-
diff --git a/qpid/extras/dispatch/src/py/stubs/__init__.py b/qpid/extras/dispatch/src/py/stubs/__init__.py
deleted file mode 100644
index 98180eadee..0000000000
--- a/qpid/extras/dispatch/src/py/stubs/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from stubs.logadapter import *
-from stubs.ioadapter import *
-
diff --git a/qpid/extras/dispatch/src/py/stubs/ioadapter.py b/qpid/extras/dispatch/src/py/stubs/ioadapter.py
deleted file mode 100644
index 1e465f98c3..0000000000
--- a/qpid/extras/dispatch/src/py/stubs/ioadapter.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-class IoAdapter:
- def __init__(self, handler, address):
- self.handler = handler
- self.address = address
-
- def send(self, address, app_properties, body):
- print "IO: send(addr=%s props=%r body=%r" % (address, app_properties, body)
-
diff --git a/qpid/extras/dispatch/src/py/stubs/logadapter.py b/qpid/extras/dispatch/src/py/stubs/logadapter.py
deleted file mode 100644
index 3d717d3ed2..0000000000
--- a/qpid/extras/dispatch/src/py/stubs/logadapter.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-LOG_TRACE = 1
-LOG_DEBUG = 2
-LOG_INFO = 4
-LOG_NOTICE = 8
-LOG_WARNING = 16
-LOG_ERROR = 32
-LOG_CRITICAL = 64
-
-class LogAdapter:
- def __init__(self, mod_name):
- self.mod_name = mod_name
-
- def log(self, level, text):
- print "LOG: mod=%s level=%d text=%s" % (self.mod_name, level, text)
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index 6b5250a34a..0b0cc11025 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
typedef struct {
PyObject_HEAD
PyObject *handler;
+ PyObject *handler_rx_call;
dx_dispatch_t *dx;
dx_address_t *address;
} IoAdapter;
@@ -403,9 +404,65 @@ typedef struct {
static void dx_io_rx_handler(void *context, dx_message_t *msg)
{
- //IoAdapter *self = (IoAdapter*) context;
+ IoAdapter *self = (IoAdapter*) context;
+
+ //
+ // Parse the message through the body and exit if the message is not well formed.
+ //
+ if (!dx_message_check(msg, DX_DEPTH_BODY))
+ return;
+
+ //
+ // Get an iterator for the application-properties. Exit if the message has none.
+ //
+ dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+ if (ap == 0)
+ return;
+
+ //
+ // Try to get a map-view of the application-properties.
+ //
+ dx_parsed_field_t *ap_map = dx_parse(ap);
+ if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Get an iterator for the body. Exit if the message has none.
+ //
+ dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+ if (body == 0) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Try to get a map-view of the body.
+ //
+ dx_parsed_field_t *body_map = dx_parse(body);
+ if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+ dx_field_iterator_free(ap);
+ dx_field_iterator_free(body);
+ dx_parse_free(ap_map);
+ dx_parse_free(body_map);
+ return;
+ }
+
+ PyObject *pAP = dx_field_to_py(ap_map);
+ PyObject *pBody = dx_field_to_py(body_map);
- // TODO - Parse the incoming message and send it to the python handler.
+ PyObject *pArgs = PyTuple_New(2);
+ PyTuple_SetItem(pArgs, 0, pAP);
+ PyTuple_SetItem(pArgs, 1, pBody);
+
+ PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
@@ -415,9 +472,14 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
return -1;
+ self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+ if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+ return -1;
+
Py_INCREF(self->handler);
+ Py_INCREF(self->handler_rx_call);
self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+ self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
return 0;
}
@@ -426,24 +488,35 @@ static void IoAdapter_dealloc(IoAdapter* self)
{
dx_router_unregister_address(self->address);
Py_DECREF(self->handler);
+ Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
}
static PyObject* dx_python_send(PyObject *self, PyObject *args)
{
- IoAdapter *ioa = (IoAdapter*) self;
- const char *address;
- PyObject *app_properties;
- PyObject *body;
+ IoAdapter *ioa = (IoAdapter*) self;
+ dx_composed_field_t *field = 0;
+ const char *address;
+ PyObject *app_properties;
+ PyObject *body;
+
if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
return 0;
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+ field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+ dx_compose_start_map(field);
+
+ dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+ dx_compose_insert_string(field, "qdx.trace");
dx_compose_start_list(field);
- dx_compose_insert_bool(field, 0); // durable
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
+ dx_compose_end_map(field);
+
field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
dx_compose_start_list(field);
dx_compose_insert_null(field); // message-id
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 235c381d20..d2704c4bd5 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
+#include <stdbool.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
@@ -28,123 +29,290 @@ static char *module = "ROUTER";
static void dx_router_python_setup(dx_router_t *router);
static void dx_pyrouter_tick(dx_router_t *router);
-//static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *router_address = "_local/qdxrouter";
+static char *local_prefix = "_local/";
+//static char *topo_prefix = "_topo/";
/**
* Address Types and Processing:
*
- * Address Hash Key onReceive onEmit
- * =============================================================================
- * _local/<local> L<local> handler forward
- * _topo/<area>/<router>/<local> A<area> forward forward
- * _topo/<my-area>/<router>/<local> R<router> forward forward
- * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward
- * _topo/<area>/all/<local> A<area> forward forward
- * _topo/<my-area>/all/<local> L<local> forward+handler forward
- * _topo/all/all/<local> L<local> forward+handler forward
- * <mobile> M<mobile> forward+handler forward
+ * Address Hash Key onReceive
+ * ===================================================================
+ * _local/<local> L<local> handler
+ * _topo/<area>/<router>/<local> A<area> forward
+ * _topo/<my-area>/<router>/<local> R<router> forward
+ * _topo/<my-area>/<my-router>/<local> L<local> handler
+ * _topo/<area>/all/<local> A<area> forward
+ * _topo/<my-area>/all/<local> L<local> forward handler
+ * _topo/all/all/<local> L<local> forward handler
+ * <mobile> M<mobile> forward handler
*/
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_link_list_t in_links;
- dx_link_list_t out_links;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
-typedef struct {
- dx_link_t *link;
- dx_message_list_t out_fifo;
-} dx_router_link_t;
+
+typedef enum {
+ DX_LINK_ENDPOINT, // A link to a connected endpoint
+ DX_LINK_ROUTER, // A link to a peer router in the same area
+ DX_LINK_AREA // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+ DEQ_LINKS(struct dx_routed_event_t);
+ dx_delivery_t *delivery;
+ dx_message_t *message;
+ bool settle;
+ uint64_t disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+ALLOC_DEFINE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+ DEQ_LINKS(dx_router_link_t);
+ dx_direction_t link_direction;
+ dx_link_type_t link_type;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
+ dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
+};
ALLOC_DECLARE(dx_router_link_t);
ALLOC_DEFINE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
-
-typedef struct {
+struct dx_router_node_t {
+ DEQ_LINKS(dx_router_node_t);
const char *id;
- dx_router_link_t *next_hop;
+ dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
+ dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
// list of valid origins (pointers to router_node) - (bit masks?)
-} dx_router_node_t;
+};
ALLOC_DECLARE(dx_router_node_t);
ALLOC_DEFINE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
struct dx_address_t {
- int is_local;
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list
- dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context;
+ dx_router_link_list_t rlinks; // Locally-Connected Consumers
+ dx_router_node_list_t rnodes; // Remotely-Connected Consumers
};
ALLOC_DECLARE(dx_address_t);
ALLOC_DEFINE(dx_address_t);
+struct dx_router_t {
+ dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
+ dx_node_t *node;
+ dx_router_link_list_t in_links;
+ dx_router_node_list_t routers;
+ dx_message_list_t in_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ hash_t *out_hash;
+ uint64_t dtag;
+ PyObject *pyRouter;
+ PyObject *pyTick;
+};
+
+
/**
- * Outbound Delivery Handler
+ * Outgoing Link Writable Handler
*/
-static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static int router_writable_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- dx_message_t *msg;
- size_t size;
+ dx_router_t *router = (dx_router_t*) context;
+ dx_delivery_t *delivery;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ pn_link_t *pn_link = dx_link_pn(link);
+ uint64_t tag;
+ int link_credit = pn_link_credit(pn_link);
+ dx_routed_event_list_t to_send;
+ dx_routed_event_list_t events;
+ dx_routed_event_t *re;
+ size_t offer;
+ int event_count = 0;
+
+ DEQ_INIT(to_send);
+ DEQ_INIT(events);
sys_mutex_lock(router->lock);
- msg = DEQ_HEAD(rlink->out_fifo);
- if (!msg) {
- // TODO - Recind the delivery
- sys_mutex_unlock(router->lock);
- return;
+
+ //
+ // Pull the non-delivery events into a local list so they can be processed without
+ // the lock being held.
+ //
+ re = DEQ_HEAD(rlink->event_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->event_fifo);
+ DEQ_INSERT_TAIL(events, re);
+ re = DEQ_HEAD(rlink->event_fifo);
}
- DEQ_REMOVE_HEAD(rlink->out_fifo);
- size = (DEQ_SIZE(rlink->out_fifo));
+ //
+ // Under lock, move available deliveries from the msg_fifo to the local to_send
+ // list. Don't move more than we have credit to send.
+ //
+ if (link_credit > 0) {
+ tag = router->dtag;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->msg_fifo);
+ DEQ_INSERT_TAIL(to_send, re);
+ if (DEQ_SIZE(to_send) == link_credit)
+ break;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ }
+ router->dtag += DEQ_SIZE(to_send);
+ }
+
+ offer = DEQ_SIZE(rlink->msg_fifo);
sys_mutex_unlock(router->lock);
- dx_message_send(msg, pn_link);
+ //
+ // Deliver all the to_send messages downrange
+ //
+ re = DEQ_HEAD(to_send);
+ while (re) {
+ DEQ_REMOVE_HEAD(to_send);
+
+ //
+ // Get a delivery for the send. This will be the current deliver on the link.
+ //
+ tag++;
+ delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
+
+ //
+ // Send the message
+ //
+ dx_message_send(re->message, link);
+
+ //
+ // If there is an incoming delivery associated with this message, link it
+ // with the outgoing delivery. Otherwise, the message arrived pre-settled
+ // and should be sent presettled.
+ //
+ if (re->delivery) {
+ dx_delivery_set_peer(re->delivery, delivery);
+ dx_delivery_set_peer(delivery, re->delivery);
+ } else
+ dx_delivery_free(delivery, 0); // settle and free
+
+ pn_link_advance(pn_link);
+ event_count++;
+
+ dx_free_message(re->message);
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(to_send);
+ }
//
- // If there is no incoming delivery, it was pre-settled. In this case,
- // we must pre-settle the outgoing delivery as well.
+ // Process the non-delivery events.
//
- if (dx_message_in_delivery(msg)) {
- pn_delivery_set_context(delivery, (void*) msg);
- dx_message_set_out_delivery(msg, delivery);
- } else {
- pn_delivery_settle(delivery);
- dx_free_message(msg);
+ re = DEQ_HEAD(events);
+ while (re) {
+ DEQ_REMOVE_HEAD(events);
+
+ if (re->delivery) {
+ if (re->disposition) {
+ pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition);
+ event_count++;
+ }
+ if (re->settle) {
+ dx_delivery_free(re->delivery, 0);
+ event_count++;
+ }
+ }
+
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(events);
}
- pn_link_advance(pn_link);
- pn_link_offered(pn_link, size);
+ //
+ // Set the offer to the number of messages remaining to be sent.
+ //
+ pn_link_offered(pn_link, offer);
+ return event_count;
+}
+
+
+static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+{
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+
+ dx_parsed_field_t *trace = 0;
+ dx_parsed_field_t *ingress = 0;
+
+ if (in_da) {
+ trace = dx_parse_value_by_key(in_da, "qdx.trace");
+ ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+ }
+
+ dx_compose_start_map(out_da);
+
+ //
+ // If there is a trace field, append this router's ID to the trace.
+ //
+ if (trace && dx_parse_is_list(trace)) {
+ dx_compose_insert_string(out_da, "qdx.trace");
+ dx_compose_start_list(out_da);
+
+ uint32_t idx = 0;
+ dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+ while (trace_item) {
+ dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ dx_compose_insert_string_iterator(out_da, iter);
+ idx++;
+ trace_item = dx_parse_sub_value(trace, idx);
+ }
+
+ dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_end_list(out_da);
+ }
+
+ //
+ // If there is no ingress field, annotate the ingress as this router else
+ // keep the original field.
+ //
+ dx_compose_insert_string(out_da, "qdx.ingress");
+ if (ingress && dx_parse_is_scalar(ingress)) {
+ dx_field_iterator_t *iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, iter);
+ } else
+ dx_compose_insert_string(out_da, router->router_id);
+
+ dx_compose_end_map(out_da);
+
+ dx_message_set_delivery_annotations(msg, out_da);
+ dx_compose_free(out_da);
}
/**
* Inbound Delivery Handler
*/
-static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_message_t *msg;
- int valid_message = 0;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_message_t *msg;
+ int valid_message = 0;
//
// Receive the message into a local representation. If the returned message
@@ -158,20 +326,63 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
return;
//
- // Validate the message through the Properties section
+ // Consume the delivery and issue a replacement credit
//
- valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
-
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
+ sys_mutex_lock(router->lock);
+
+ //
+ // Handle the Link-Routing case. If this incoming link is associated with a connected
+ // link, simply deliver the message to the outgoing link. There is no need to validate
+ // the message in this case.
+ //
+ if (rlink->connected_link) {
+ dx_router_link_t *clink = rlink->connected_link;
+ dx_routed_event_t *re = new_dx_routed_event_t();
+
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = msg;
+ re->settle = false;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(clink->msg_fifo, re);
+
+ //
+ // If the incoming delivery is settled (pre-settled), don't link it into the routed
+ // event. If it's not settled, link it into the event for later handling.
+ //
+ if (dx_delivery_settled(delivery))
+ dx_delivery_free(delivery, 0);
+ else
+ re->delivery = delivery;
+
+ sys_mutex_unlock(router->lock);
+ dx_link_activate(clink->link);
+ return;
+ }
+
+ //
+ // We are performing Message-Routing, therefore we will need to validate the message
+ // through the Properties section so we can access the TO field.
+ //
+ dx_message_t *in_process_copy = 0;
+ dx_router_message_cb handler = 0;
+ void *handler_context = 0;
+
+ valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
dx_address_t *addr;
+ int fanout = 0;
+
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void*) &addr);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -179,108 +390,145 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
// To field is valid and contains a known destination. Handle the various
// cases for forwarding.
//
- // Forward to the in-process handler for this message if there is one.
- // Note: If the handler is going to queue the message for deferred processing,
- // it must copy the message. This function assumes that the handler
- // will process the message synchronously and be finished with it upon
- // completion.
- //
- if (addr->handler)
- addr->handler(addr->handler_context, msg);
//
- // Forward to the local link for the locally-connected consumer, if present.
- // TODO - Don't forward if this is a "_local" address.
+ // Interpret and update the delivery annotations of the message
//
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
- }
+ router_annotate_message(router, msg);
//
- // Forward to the next-hop for a remotely-connected consumer, if present.
- // Don't forward if this is a "_local" address.
+ // Forward to the in-process handler for this message if there is one. The
+ // actual invocation of the handler will occur later after we've released
+ // the lock.
//
- if (addr->rnode) {
- // TODO
+ if (addr->handler) {
+ in_process_copy = dx_message_copy(msg);
+ handler = addr->handler;
+ handler_context = addr->handler_context;
}
- } else {
//
- // To field contains an unknown address. Release the message.
+ // If the address form is local (i.e. is prefixed by _local), don't forward
+ // outside of the router process.
//
- pn_delivery_update(delivery, PN_RELEASED);
- pn_delivery_settle(delivery);
+ if (!is_local) {
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1 && !dx_delivery_settled(delivery))
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
+ }
+ }
}
- sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
- dx_free_message(msg);
-
//
- // If this was a pre-settled delivery, we must also locally settle it.
+ // In message-routing mode, the handling of the incoming delivery depends on the
+ // number of copies of the received message that were forwarded.
//
- if (pn_delivery_settled(delivery))
- pn_delivery_settle(delivery);
+ if (handler) {
+ dx_delivery_free(delivery, PN_ACCEPTED);
+ } else if (fanout == 0) {
+ dx_delivery_free(delivery, PN_RELEASED);
+ } else if (fanout > 1)
+ dx_delivery_free(delivery, PN_ACCEPTED);
}
} else {
//
// Message is invalid. Reject the message.
//
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
- pn_delivery_set_context(delivery, 0);
- dx_free_message(msg);
+ dx_delivery_free(delivery, PN_REJECTED);
}
+
+ sys_mutex_unlock(router->lock);
+ dx_free_message(msg);
+
+ //
+ // Invoke the in-process handler now that the lock is released.
+ //
+ if (handler)
+ handler(handler_context, in_process_copy);
}
/**
* Delivery Disposition Handler
*/
-static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_router_t *router = (dx_router_t*) context;
+ bool changed = dx_delivery_disp_changed(delivery);
+ uint64_t disp = dx_delivery_disp(delivery);
+ bool settled = dx_delivery_settled(delivery);
+ dx_delivery_t *peer = dx_delivery_peer(delivery);
- if (pn_link_is_sender(pn_link)) {
- uint64_t disp = pn_delivery_remote_state(delivery);
- dx_message_t *msg = pn_delivery_get_context(delivery);
- pn_delivery_t *activate = 0;
-
- if (msg) {
- assert(delivery == dx_message_out_delivery(msg));
- if (disp != 0) {
- activate = dx_message_in_delivery(msg);
- pn_delivery_update(activate, disp);
- // TODO - handling of the data accompanying RECEIVED/MODIFIED
- }
-
- if (pn_delivery_settled(delivery)) {
- //
- // Downstream delivery has been settled. Propagate the settlement
- // upstream.
- //
- activate = dx_message_in_delivery(msg);
- pn_delivery_settle(activate);
- pn_delivery_settle(delivery);
- dx_free_message(msg);
- }
+ if (peer) {
+ //
+ // The case where this delivery has a peer.
+ //
+ if (changed || settled) {
+ dx_link_t *peer_link = dx_delivery_link(peer);
+ dx_router_link_t *prl = (dx_router_link_t*) dx_link_get_context(peer_link);
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = peer;
+ re->message = 0;
+ re->settle = settled;
+ re->disposition = changed ? disp : 0;
- if (activate) {
- //
- // Activate the upstream/incoming link so that the settlement will
- // get pushed out.
- //
- dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
- dx_link_activate(act_link);
- }
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(prl->event_fifo, re);
+ sys_mutex_unlock(router->lock);
- return;
+ dx_link_activate(peer_link);
}
+
} else {
- // TODO - Handle disposition updates from upstream
+ //
+ // The no-peer case. Ignore status changes and echo settlement.
+ //
+ if (settled)
+ dx_delivery_free(delivery, 0);
}
}
@@ -290,25 +538,36 @@ static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *d
*/
static int router_incoming_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- dx_link_item_t *item = new_dx_link_item_t();
- pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = new_dx_router_link_t();
+ pn_link_t *pn_link = dx_link_pn(link);
- if (item) {
- DEQ_ITEM_INIT(item);
- item->link = link;
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ENDPOINT;
+ rlink->owning_addr = 0;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, item);
- sys_mutex_unlock(router->lock);
+ dx_link_set_context(link, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_flow(pn_link, 1000);
+ pn_link_open(pn_link);
+
+ //
+ // TODO - If the address has link-route semantics, create all associated
+ // links needed to go with this one.
+ //
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
- } else {
- pn_link_close(pn_link);
- }
return 0;
}
@@ -327,73 +586,45 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
return 0;
}
- dx_router_link_t *rlink = new_dx_router_link_t();
- rlink->link = link;
- DEQ_INIT(rlink->out_fifo);
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ dx_router_link_t *rlink = new_dx_router_link_t();
+
+ int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
dx_link_set_context(link, rlink);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
dx_address_t *addr;
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
- addr->is_local = 0;
addr->handler = 0;
addr->handler_context = 0;
- addr->rlink = 0;
- addr->rnode = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
hash_insert(router->out_hash, iter, addr);
}
dx_field_iterator_free(iter);
- if (addr->rlink == 0) {
- addr->rlink = rlink;
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
- return 0;
- }
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
- dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
- pn_link_close(pn_link);
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_open(pn_link);
sys_mutex_unlock(router->lock);
- return 0;
-}
-
-
-/**
- * Outgoing Link Writable Handler
- */
-static int router_writable_link_handler(void* context, dx_link_t *link)
-{
- dx_router_t *router = (dx_router_t*) context;
- int grant_delivery = 0;
- pn_delivery_t *delivery;
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- pn_link_t *pn_link = dx_link_pn(link);
- uint64_t tag;
-
- sys_mutex_lock(router->lock);
- if (DEQ_SIZE(rlink->out_fifo) > 0) {
- grant_delivery = 1;
- tag = router->dtag++;
- }
- sys_mutex_unlock(router->lock);
-
- if (grant_delivery) {
- pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
- delivery = pn_link_current(pn_link);
- if (delivery) {
- router_tx_handler(context, link, delivery);
- return 1;
- }
- }
-
+ dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -403,44 +634,37 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- dx_link_item_t *item;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- if (!r_tgt)
+ if (!rlink)
return 0;
sys_mutex_lock(router->lock);
if (pn_link_is_sender(pn_link)) {
- item = DEQ_HEAD(router->out_links);
-
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(addr->rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
+
+ if ((rlink->owning_addr->handler == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+ if (iter) {
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (addr == rlink->owning_addr) {
+ hash_remove(router->out_hash, iter);
+ free_dx_router_link_t(rlink);
+ free_dx_address_t(addr);
+ dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ }
+ dx_field_iterator_free(iter);
}
- dx_field_iterator_free(iter);
}
- }
- else
- item = DEQ_HEAD(router->in_links);
-
- while (item) {
- if (item->link == link) {
- if (pn_link_is_sender(pn_link))
- DEQ_REMOVE(router->out_links, item);
- else
- DEQ_REMOVE(router->in_links, item);
- free_dx_link_item_t(item);
- break;
- }
- item = item->next;
+ } else {
+ DEQ_REMOVE(router->in_links, rlink);
+ free_dx_router_link_t(rlink);
}
sys_mutex_unlock(router->lock);
@@ -455,6 +679,83 @@ static void router_inbound_open_handler(void *type_context, dx_connection_t *con
static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
{
+ // TODO - Make sure this connection is annotated as an inter-router transport.
+ // Ignore otherwise
+
+ dx_router_t *router = (dx_router_t*) type_context;
+ dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
+ dx_link_t *sender;
+ dx_link_t *receiver;
+ dx_router_link_t *rlink;
+
+ //
+ // Create an incoming link and put it in the in-links collection. The address
+ // of the remote source of this link is '_local/qdxrouter'.
+ //
+ receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
+ pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
+ pn_terminus_set_address(dx_link_target(receiver), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->owning_addr = 0;
+ rlink->link = receiver;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ dx_link_set_context(receiver, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ //
+ // Create an outgoing link with a local source of '_local/qdxrouter' and place
+ // it in the routing table.
+ //
+ sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
+ pn_terminus_set_address(dx_link_remote_target(sender), router_address);
+ pn_terminus_set_address(dx_link_source(sender), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->link = sender;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ dx_link_set_context(sender, rlink);
+
+ dx_address_t *addr;
+
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, aiter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, aiter, addr);
+ }
+
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_link_open(dx_link_pn(receiver));
+ pn_link_open(dx_link_pn(sender));
+ pn_link_flow(dx_link_pn(receiver), 1000);
+ dx_field_iterator_free(aiter);
}
@@ -473,7 +774,6 @@ static void dx_router_timer_handler(void *context)
static dx_node_type_t router_node = {"router", 0, 0,
router_rx_handler,
- router_tx_handler,
router_disp_handler,
router_incoming_link_handler,
router_outgoing_link_handler,
@@ -494,22 +794,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
}
dx_router_t *router = NEW(dx_router_t);
- dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
+ router_node.type_context = router;
+
+ router->dx = dx;
+ router->router_area = area;
+ router->router_id = id;
+ router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
DEQ_INIT(router->in_links);
- DEQ_INIT(router->out_links);
+ DEQ_INIT(router->routers);
DEQ_INIT(router->in_fifo);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
- router->dx = dx;
- router->lock = sys_mutex();
- router->router_area = area;
- router->router_id = id;
-
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
-
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -546,47 +847,52 @@ void dx_router_free(dx_router_t *router)
}
+const char *dx_router_id(const dx_dispatch_t *dx)
+{
+ dx_router_t *router = dx->router;
+ return router->router_id;
+}
+
+
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
- bool is_local,
const char *address,
dx_router_message_cb handler,
void *context)
{
- char addr[1000];
- dx_address_t *ad = new_dx_address_t();
+ char addr_string[1000];
+ dx_router_t *router = dx->router;
+ dx_address_t *addr;
dx_field_iterator_t *iter;
- int result;
- if (!ad)
- return 0;
+ strcpy(addr_string, "L"); // Local Hash-Key Space
+ strcat(addr_string, address);
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
- ad->is_local = is_local;
- ad->handler = handler;
- ad->handler_context = context;
- ad->rlink = 0;
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, iter, addr);
+ }
+ dx_field_iterator_free(iter);
- if (ad->is_local)
- strcpy(addr, "L"); // Local Hash-Key Space
- else
- strcpy(addr, "M"); // Mobile Hash-Key Space
+ addr->handler = handler;
+ addr->handler_context = context;
- strcat(addr, address);
- iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
- result = hash_insert(dx->router->out_hash, iter, ad);
- dx_field_iterator_free(iter);
- if (result != 0) {
- free_dx_address_t(ad);
- return 0;
- }
+ sys_mutex_unlock(router->lock);
dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
- return ad;
+ return addr;
}
void dx_router_unregister_address(dx_address_t *ad)
{
- free_dx_address_t(ad);
+ //free_dx_address_t(ad);
}
@@ -601,12 +907,43 @@ void dx_router_send(dx_dispatch_t *dx,
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, address, (void*) &addr);
if (addr) {
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
}
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -633,6 +970,24 @@ typedef struct {
} RouterAdapter;
+static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+{
+ //RouterAdapter *adapter = (RouterAdapter*) self;
+ //dx_router_t *router = adapter->router;
+ const char *address;
+ int is_reachable;
+ int is_neighbor;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+ return 0;
+
+ // TODO
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
@@ -666,8 +1021,9 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
static PyMethodDef RouterAdapter_methods[] = {
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
+ {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
+ {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
{0, 0, 0, 0}
};
@@ -747,7 +1103,7 @@ static void dx_router_python_setup(dx_router_t *router)
PyObject* pClass;
PyObject* pArgs;
- pName = PyString_FromString("router");
+ pName = PyString_FromString("qpid.dispatch.router");
pModule = PyImport_Import(pName);
Py_DECREF(pName);
if (!pModule) {
@@ -809,14 +1165,16 @@ static void dx_pyrouter_tick(dx_router_t *router)
PyObject *pArgs;
PyObject *pValue;
- pArgs = PyTuple_New(0);
- pValue = PyObject_CallObject(router->pyTick, pArgs);
- if (PyErr_Occurred()) {
- PyErr_Print();
- }
- Py_DECREF(pArgs);
- if (pValue) {
- Py_DECREF(pValue);
+ if (router->pyTick) {
+ pArgs = PyTuple_New(0);
+ pValue = PyObject_CallObject(router->pyTick, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
}
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index 52450b42b6..5420d3b776 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *dx_server, int id)
}
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
{
+ pn_driver_t *driver = dx_server->driver;
pn_listener_t *listener = pn_driver_listener(driver);
pn_connector_t *cxtr;
dx_connection_t *ctx;
while (listener) {
- dx_log(module, LOG_TRACE, "Accepting Connection");
cxtr = pn_listener_accept(listener);
+ dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
ctx = new_dx_connection_t();
ctx->state = CONN_STATE_OPENING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
- ctx->pn_conn = 0;
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
ctx->ufd = 0;
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, dx_server->container_name);
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
//
// Get a pointer to the transport so we can insert security components into it
//
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
// Call the handler that is appropriate for the connector's state.
//
switch (ctx->state) {
- case CONN_STATE_CONNECTING:
- if (!pn_connector_closed(cxtr)) {
- //ctx->state = CONN_STATE_SASL_CLIENT;
- assert(ctx->connector);
- ctx->connector->state = CXTR_STATE_OPEN;
- events = 1;
- } else {
+ case CONN_STATE_CONNECTING: {
+ if (pn_connector_closed(cxtr)) {
ctx->state = CONN_STATE_FAILED;
events = 0;
+ break;
}
- break;
-
- case CONN_STATE_OPENING:
- ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ const dx_server_config_t *config = ctx->connector->config;
+
+ //
+ // Set up SSL if appropriate
+ //
+ if (config->ssl_enabled) {
+ pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+ pn_ssl_domain_set_credentials(domain,
+ config->ssl_certificate_file,
+ config->ssl_private_key_file,
+ config->ssl_password);
+
+ if (config->ssl_require_peer_authentication)
+ pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+ pn_ssl_t *ssl = pn_ssl(tport);
+ pn_ssl_init(ssl, domain, 0);
+ pn_ssl_domain_free(domain);
+ }
- if (ctx->listener) {
- ce = DX_CONN_EVENT_LISTENER_OPEN;
- } else if (ctx->connector) {
- ce = DX_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
- } else
- assert(0);
+ //
+ // Set up SASL
+ //
+ pn_sasl_t *sasl = pn_sasl(tport);
+ pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+ pn_sasl_client(sasl);
- dx_server->conn_handler(dx_server->conn_handler_context,
- ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ ctx->state = CONN_STATE_OPENING;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
events = 1;
break;
+ }
+
+ case CONN_STATE_OPENING: {
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ pn_sasl_t *sasl = pn_sasl(tport);
+
+ if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(dx_server->conn_handler_context,
+ ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+ }
+ else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+ ctx->state = CONN_STATE_FAILED;
+ if (ctx->connector) {
+ const dx_server_config_t *config = ctx->connector->config;
+ dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ }
+ }
+ }
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
@@ -323,20 +372,35 @@ static void *thread_run(void *arg)
//
// Service pending timers.
//
- dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
- if (timer) {
- DEQ_REMOVE_HEAD(dx_server->pending_timers);
-
- //
- // Mark the timer as idle in case it reschedules itself.
- //
- dx_timer_idle_LH(timer);
+ if (DEQ_SIZE(dx_server->pending_timers) > 0) {
+ dx_timer_list_t local_list;
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+
+ DEQ_INIT(local_list);
+ while (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
+ DEQ_INSERT_TAIL(local_list, timer);
+ timer = DEQ_HEAD(dx_server->pending_timers);
+ }
//
- // Release the lock and invoke the connection handler.
+ // Release the lock and invoke the connection handlers.
//
sys_mutex_unlock(dx_server->lock);
- timer->handler(timer->context);
+
+ timer = DEQ_HEAD(local_list);
+ while (timer) {
+ DEQ_REMOVE_HEAD(local_list);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ dx_timer_idle_LH(timer);
+
+ timer->handler(timer->context);
+ timer = DEQ_HEAD(local_list);
+ }
+
pn_driver_wakeup(dx_server->driver);
continue;
}
@@ -411,7 +475,7 @@ static void *thread_run(void *arg)
//
// Process listeners (incoming connections).
//
- thread_process_listeners(dx_server->driver);
+ thread_process_listeners(dx_server);
//
// Traverse the list of connectors-needing-service from the proton driver.
diff --git a/qpid/extras/dispatch/src/server_private.h b/qpid/extras/dispatch/src/server_private.h
index 49ab4724d2..5782492f22 100644
--- a/qpid/extras/dispatch/src/server_private.h
+++ b/qpid/extras/dispatch/src/server_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/timer.c b/qpid/extras/dispatch/src/timer.c
index 0d5b301a6e..a7f36e149b 100644
--- a/qpid/extras/dispatch/src/timer.c
+++ b/qpid/extras/dispatch/src/timer.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h
index 905a8f5bd1..4e173b7ad4 100644
--- a/qpid/extras/dispatch/src/timer_private.h
+++ b/qpid/extras/dispatch/src/timer_private.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/work_queue.c b/qpid/extras/dispatch/src/work_queue.c
index 4b3c5d7fa5..93c1cda1a9 100644
--- a/qpid/extras/dispatch/src/work_queue.c
+++ b/qpid/extras/dispatch/src/work_queue.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/qpid/extras/dispatch/src/work_queue.h b/qpid/extras/dispatch/src/work_queue.h
index 597a484a9c..3ad5c21c81 100644
--- a/qpid/extras/dispatch/src/work_queue.h
+++ b/qpid/extras/dispatch/src/work_queue.h
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY