summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-09-26 21:14:59 +0000
committerTed Ross <tross@apache.org>2013-09-26 21:14:59 +0000
commit966b9314395d655f7fea8db90e47ae089d81ee6f (patch)
treec32473d2cecc4bc97e827ec74a8d10def4c67907 /qpid/extras/dispatch/src
parentec0e68b606c81f92c33b842384d7ca8a5152d91a (diff)
downloadqpid-python-966b9314395d655f7fea8db90e47ae089d81ee6f.tar.gz
QPID-5173
QPID-5045 QPID-5181 - Major refactor of the routing data structure in preparation for multi-router operation. - Fixed the CMake bug in QPID-5173 - Added Dynamic assignment of routable addresses for outbound links (QPID-5181) - Changed the indentation of the Python code from 2 spaces to 4 spaces. - Reduced the default log level to make the console less chatty. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526694 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/agent.c2
-rw-r--r--qpid/extras/dispatch/src/amqp.c30
-rw-r--r--qpid/extras/dispatch/src/bitmask.c124
-rw-r--r--qpid/extras/dispatch/src/message_private.h2
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c9
-rw-r--r--qpid/extras/dispatch/src/router_node.c472
-rw-r--r--qpid/extras/dispatch/src/router_private.h128
7 files changed, 553 insertions, 214 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 557410910c..2063b8fcc4 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(void *context)
}
-static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id)
{
dx_agent_t *agent = (dx_agent_t*) context;
dx_message_t *copy = dx_message_copy(msg);
diff --git a/qpid/extras/dispatch/src/amqp.c b/qpid/extras/dispatch/src/amqp.c
new file mode 100644
index 0000000000..6a8545b757
--- /dev/null
+++ b/qpid/extras/dispatch/src/amqp.c
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+
+const char * const DX_DA_INGRESS = "qdx.ingress";
+const char * const DX_DA_TRACE = "qdx.trace";
+const char * const DX_DA_TO = "qdx.to";
+
+const char * const DX_CAPABILITY_ROUTER = "qdx.router";
+
+const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1";
+const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2";
+
diff --git a/qpid/extras/dispatch/src/bitmask.c b/qpid/extras/dispatch/src/bitmask.c
new file mode 100644
index 0000000000..5341f8d83a
--- /dev/null
+++ b/qpid/extras/dispatch/src/bitmask.c
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/alloc.h>
+#include <assert.h>
+
+#define DX_BITMASK_LONGS 16
+#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64)
+
+struct dx_bitmask_t {
+ uint64_t array[DX_BITMASK_LONGS];
+ int first_set;
+};
+
+ALLOC_DECLARE(dx_bitmask_t);
+ALLOC_DEFINE(dx_bitmask_t);
+
+#define MASK_INDEX(num) (num / 64)
+#define MASK_ONEHOT(num) (1 << (num % 64))
+#define FIRST_NONE -1
+#define FIRST_UNKNOWN -2
+
+
+int dx_bitmask_width()
+{
+ return DX_BITMASK_BITS;
+}
+
+
+dx_bitmask_t *dx_bitmask(int initial)
+{
+ dx_bitmask_t *b = new_dx_bitmask_t();
+ if (initial)
+ dx_bitmask_set_all(b);
+ else
+ dx_bitmask_clear_all(b);
+ return b;
+}
+
+
+void dx_bitmask_free(dx_bitmask_t *b)
+{
+ free_dx_bitmask_t(b);
+}
+
+
+void dx_bitmask_set_all(dx_bitmask_t *b)
+{
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ b->array[i] = 0xFFFFFFFFFFFFFFFF;
+ b->first_set = 0;
+}
+
+
+void dx_bitmask_clear_all(dx_bitmask_t *b)
+{
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ b->array[i] = 0;
+ b->first_set = FIRST_NONE;
+}
+
+
+void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum)
+{
+ assert(bitnum < DX_BITMASK_BITS);
+ b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum);
+ if (b->first_set > bitnum)
+ b->first_set = bitnum;
+}
+
+
+void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum)
+{
+ assert(bitnum < DX_BITMASK_BITS);
+ b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum));
+ if (b->first_set == bitnum)
+ b->first_set = FIRST_UNKNOWN;
+}
+
+
+int dx_bitmask_value(dx_bitmask_t *b, int bitnum)
+{
+ return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0;
+}
+
+
+int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum)
+{
+ if (b->first_set == FIRST_UNKNOWN) {
+ b->first_set = FIRST_NONE;
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ if (b->array[i]) {
+ for (int j = 0; j < 64; j++)
+ if ((1 << j) & b->array[i]) {
+ b->first_set = i * 64 + j;
+ break;
+ }
+ break;
+ }
+ }
+
+ if (b->first_set == FIRST_NONE)
+ return 0;
+ *bitnum = b->first_set;
+ return 1;
+}
+
diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h
index 27b81bbb4c..c57cea5f0d 100644
--- a/qpid/extras/dispatch/src/message_private.h
+++ b/qpid/extras/dispatch/src/message_private.h
@@ -67,7 +67,7 @@ typedef struct {
sys_mutex_t *lock;
uint32_t ref_count; // The number of messages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT)
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index 0b0cc11025..2db8f583de 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -402,7 +402,7 @@ typedef struct {
} IoAdapter;
-static void dx_io_rx_handler(void *context, dx_message_t *msg)
+static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
{
IoAdapter *self = (IoAdapter*) context;
@@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg)
PyObject *pAP = dx_field_to_py(ap_map);
PyObject *pBody = dx_field_to_py(body_map);
- PyObject *pArgs = PyTuple_New(2);
+ PyObject *pArgs = PyTuple_New(3);
PyTuple_SetItem(pArgs, 0, pAP);
PyTuple_SetItem(pArgs, 1, pBody);
+ PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id));
PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
Py_DECREF(pArgs);
@@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject *self, PyObject *args)
field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
dx_compose_start_map(field);
- dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, DX_DA_INGRESS);
dx_compose_insert_string(field, dx_router_id(ioa->dx));
- dx_compose_insert_string(field, "qdx.trace");
+ dx_compose_insert_string(field, DX_DA_TRACE);
dx_compose_start_list(field);
dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index d2704c4bd5..dc63a45451 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -21,17 +21,18 @@
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
+#include <stdlib.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
+#include "router_private.h"
static char *module = "ROUTER";
static void dx_router_python_setup(dx_router_t *router);
static void dx_pyrouter_tick(dx_router_t *router);
-static char *router_address = "_local/qdxrouter";
-static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
/**
* Address Types and Processing:
@@ -48,86 +49,82 @@ static char *local_prefix = "_local/";
* <mobile> M<mobile> forward handler
*/
+ALLOC_DEFINE(dx_routed_event_t);
+ALLOC_DEFINE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_ref_t);
+ALLOC_DEFINE(dx_router_link_ref_t);
+ALLOC_DEFINE(dx_address_t);
-typedef struct dx_router_link_t dx_router_link_t;
-typedef struct dx_router_node_t dx_router_node_t;
+static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->link = link;
+ link->ref = ref;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
-typedef enum {
- DX_LINK_ENDPOINT, // A link to a connected endpoint
- DX_LINK_ROUTER, // A link to a peer router in the same area
- DX_LINK_AREA // A link to a peer router in a different area (area boundary)
-} dx_link_type_t;
+static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ if (link->ref) {
+ DEQ_REMOVE(*ref_list, link->ref);
+ free_dx_router_link_ref_t(link->ref);
+ link->ref = 0;
+ }
+}
-typedef struct dx_routed_event_t {
- DEQ_LINKS(struct dx_routed_event_t);
- dx_delivery_t *delivery;
- dx_message_t *message;
- bool settle;
- uint64_t disposition;
-} dx_routed_event_t;
-ALLOC_DECLARE(dx_routed_event_t);
-ALLOC_DEFINE(dx_routed_event_t);
-DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
-
-
-struct dx_router_link_t {
- DEQ_LINKS(dx_router_link_t);
- dx_direction_t link_direction;
- dx_link_type_t link_type;
- dx_address_t *owning_addr; // [ref] Address record that owns this link
- dx_link_t *link; // [own] Link pointer
- dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
- dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
- dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
- dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
-};
+/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+static void dx_router_check_addr_LH(dx_address_t *addr)
+{
+ // TODO
+}
-ALLOC_DECLARE(dx_router_link_t);
-ALLOC_DEFINE(dx_router_link_t);
-DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
-
-struct dx_router_node_t {
- DEQ_LINKS(dx_router_node_t);
- const char *id;
- dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
- dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
- // list of valid origins (pointers to router_node) - (bit masks?)
-};
-ALLOC_DECLARE(dx_router_node_t);
-ALLOC_DEFINE(dx_router_node_t);
-DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+/**
+ * Determine whether a terminus has router capability
+ */
+static int dx_router_terminus_is_router(pn_terminus_t *term)
+{
+ pn_data_t *cap = pn_terminus_capabilities(term);
+ if (cap && pn_data_type(cap) == PN_SYMBOL) {
+ pn_bytes_t sym = pn_data_get_symbol(cap);
+ if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
+ strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0)
+ return 1;
+ }
-struct dx_address_t {
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_list_t rlinks; // Locally-Connected Consumers
- dx_router_node_list_t rnodes; // Remotely-Connected Consumers
-};
+ return 0;
+}
-ALLOC_DECLARE(dx_address_t);
-ALLOC_DEFINE(dx_address_t);
+static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length)
+{
+ static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+ char discriminator[11];
+ long int rnd = random();
+ int idx;
+
+ for (idx = 0; idx < 10; idx++)
+ discriminator[idx] = table[(rnd >> (idx * 6)) & 63];
+ discriminator[idx] = '\0';
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_router_link_list_t in_links;
- dx_router_node_list_t routers;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+ snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
+}
+
+
+static int dx_router_find_mask_bit(dx_link_t *link)
+{
+ return 0; // TODO
+}
/**
@@ -191,7 +188,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
DEQ_REMOVE_HEAD(to_send);
//
- // Get a delivery for the send. This will be the current deliver on the link.
+ // Get a delivery for the send. This will be the current delivery on the link.
//
tag++;
delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
@@ -259,8 +256,8 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
dx_parsed_field_t *ingress = 0;
if (in_da) {
- trace = dx_parse_value_by_key(in_da, "qdx.trace");
- ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+ trace = dx_parse_value_by_key(in_da, DX_DA_TRACE);
+ ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS);
}
dx_compose_start_map(out_da);
@@ -269,7 +266,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
// If there is a trace field, append this router's ID to the trace.
//
if (trace && dx_parse_is_list(trace)) {
- dx_compose_insert_string(out_da, "qdx.trace");
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
dx_compose_start_list(out_da);
uint32_t idx = 0;
@@ -289,7 +286,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
//
- dx_compose_insert_string(out_da, "qdx.ingress");
+ dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
dx_field_iterator_t *iter = dx_parse_raw(ingress);
dx_compose_insert_string_iterator(out_da, iter);
@@ -380,7 +377,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- hash_retrieve(router->out_hash, iter, (void*) &addr);
+ hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
int is_local = dx_field_iterator_prefix(iter, local_prefix);
dx_field_iterator_free(iter);
@@ -415,33 +412,34 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
fanout++;
if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ dx_link_activate(dest_link_ref->link->link);
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
else
- dest_link = dest_node->peer_link;
+ dest_link = dest_node_ref->router->peer_link;
if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
@@ -457,7 +455,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_link_activate(dest_link->link);
}
- dest_node = DEQ_NEXT(dest_node);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -487,7 +485,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
// Invoke the in-process handler now that the lock is released.
//
if (handler)
- handler(handler_context, in_process_copy);
+ handler(handler_context, in_process_copy, rlink->mask_bit);
}
@@ -541,25 +539,28 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
dx_router_t *router = (dx_router_t*) context;
dx_router_link_t *rlink = new_dx_router_link_t();
pn_link_t *pn_link = dx_link_pn(link);
+ int is_router = dx_router_terminus_is_router(dx_link_remote_source(link));
DEQ_ITEM_INIT(rlink);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_INCOMING;
- rlink->link_type = DX_LINK_ENDPOINT;
rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
@@ -579,52 +580,90 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
{
dx_router_t *router = (dx_router_t*) context;
pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
+ int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
+ int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
- if (!r_tgt) {
+ //
+ // If this link is not a router link and it has no source address, we can't
+ // accept it.
+ //
+ if (r_src == 0 && !is_router && !is_dynamic) {
pn_link_close(pn_link);
return 0;
}
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- dx_router_link_t *rlink = new_dx_router_link_t();
-
- int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
-
+ //
+ // Create a router_link record for this link. Some of the fields will be
+ // modified in the different cases below.
+ //
+ dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
-
- dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+
+ if (is_router) {
+ //
+ // If this is a router link, put it in the router_address link-list.
+ //
+ dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ rlink->owning_addr = router->router_addr;
+
+ } else {
+ //
+ // If this is an endpoint link, check the source. If it is dynamic, we will
+ // assign it an ephemeral and routable address. If it has a non-dymanic
+ // address, that address needs to be set up in the address list.
+ //
+ dx_field_iterator_t *iter;
+ char temp_addr[1000];
+ dx_address_t *addr;
+
+ if (is_dynamic) {
+ dx_router_generate_temp_addr(router, temp_addr, 1000);
+ iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ pn_terminus_set_address(dx_link_source(link), temp_addr);
+ dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+ } else {
+ iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
+ dx_log(module, LOG_INFO, "Registered local address: %s", r_src);
+ }
+
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->addr_hash, iter, addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+ dx_field_iterator_free(iter);
+
+ rlink->owning_addr = addr;
+ dx_router_add_link_ref_LH(&addr->rlinks, rlink);
}
- dx_field_iterator_free(iter);
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
+ sys_mutex_unlock(router->lock);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -634,40 +673,37 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
- if (pn_link_is_sender(pn_link)) {
- DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
-
- if ((rlink->owning_addr->handler == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr == rlink->owning_addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
- }
- dx_field_iterator_free(iter);
- }
- }
- } else {
- DEQ_REMOVE(router->in_links, rlink);
- free_dx_router_link_t(rlink);
+
+ //
+ // If the link is outgoing, we must disassociate it from its address.
+ //
+ if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) {
+ dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
+ dx_router_check_addr_LH(rlink->owning_addr);
}
+ //
+ // If this is an incoming inter-router link, we must free the mask_bit.
+ //
+ if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING)
+ dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+
+ //
+ // Remove the link from the master list-of-links.
+ //
+ DEQ_REMOVE(router->links, rlink);
sys_mutex_unlock(router->lock);
+
+ // TODO - wrap the free to handle the recursive items
+ free_dx_router_link_t(rlink);
+
return 0;
}
@@ -683,24 +719,37 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
// Ignore otherwise
dx_router_t *router = (dx_router_t*) type_context;
- dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
dx_link_t *sender;
dx_link_t *receiver;
dx_router_link_t *rlink;
+ int mask_bit = 0;
+ size_t clen = strlen(DX_CAPABILITY_ROUTER);
+
+ //
+ // Allocate a mask bit to designate the pair of links connected to the neighbor router
+ //
+ sys_mutex_lock(router->lock);
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return;
+ }
//
- // Create an incoming link and put it in the in-links collection. The address
- // of the remote source of this link is '_local/qdxrouter'.
+ // Create an incoming link with router source capability
//
- receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
- pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
- pn_terminus_set_address(dx_link_target(receiver), router_address);
+ receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
-
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_INCOMING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_INCOMING;
rlink->owning_addr = 0;
rlink->link = receiver;
rlink->connected_link = 0;
@@ -709,53 +758,40 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(receiver, rlink);
-
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
- sys_mutex_unlock(router->lock);
+ DEQ_INSERT_TAIL(router->links, rlink);
//
- // Create an outgoing link with a local source of '_local/qdxrouter' and place
- // it in the routing table.
+ // Create an outgoing link with router target capability
//
- sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
- pn_terminus_set_address(dx_link_remote_target(sender), router_address);
- pn_terminus_set_address(dx_link_source(sender), router_address);
+ sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
-
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = router->router_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
- dx_link_set_context(sender, rlink);
-
- dx_address_t *addr;
-
- sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, aiter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, aiter, addr);
- }
+ //
+ // Add the new outgoing link to the router_address's list of links.
+ //
+ dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ dx_link_set_context(sender, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
pn_link_open(dx_link_pn(receiver));
pn_link_open(dx_link_pn(sender));
pn_link_flow(dx_link_pn(receiver), 1000);
- dx_field_iterator_free(aiter);
}
@@ -767,7 +803,6 @@ static void dx_router_timer_handler(void *context)
// Periodic processing.
//
dx_pyrouter_tick(router);
-
dx_timer_schedule(router->timer, 1000);
}
@@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router_node.type_context = router;
+ dx->router = router;
router->dx = dx;
router->router_area = area;
router->router_id = id;
router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
- DEQ_INIT(router->in_links);
+ DEQ_INIT(router->addrs);
+ router->addr_hash = hash(10, 32, 0);
+
+ DEQ_INIT(router->links);
DEQ_INIT(router->routers);
- DEQ_INIT(router->in_fifo);
- router->lock = sys_mutex();
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
- router->pyTick = 0;
+ router->neighbor_free_mask = dx_bitmask(1);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
+
+ //
+ // Create an address for all of the routers in the topology. It will be registered
+ // locally later in the initialization sequence.
+ //
+ router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_python_start();
dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id);
-
return router;
}
@@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
addr->handler = 0;
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+ if (handler)
+ dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address);
return addr;
}
@@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t *dx,
dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, address, (void*) &addr);
+ hash_retrieve(router->addr_hash, address, (void*) &addr);
if (addr) {
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ dx_link_activate(dest_link_ref->link->link);
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
else
- dest_link = dest_node->peer_link;
+ dest_link = dest_node_ref->router->peer_link;
if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
@@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t *dx,
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
}
- dest_node = DEQ_NEXT(dest_node);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
const char *address;
int is_reachable;
int is_neighbor;
+ int link_maskbit;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+ if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
+ &link_maskbit, &router_maskbit))
return 0;
// TODO
@@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_router_t *router)
PyObject* pName;
PyObject* pId;
PyObject* pArea;
+ PyObject* pMaxRouters;
PyObject* pModule;
PyObject* pClass;
PyObject* pArgs;
@@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_router_t *router)
//
// Constructor Arguments for RouterEngine
//
- pArgs = PyTuple_New(3);
+ pArgs = PyTuple_New(4);
// arg 0: adapter instance
PyTuple_SetItem(pArgs, 0, adapterInstance);
@@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_router_t *router)
pId = PyString_FromString(router->router_id);
PyTuple_SetItem(pArgs, 1, pId);
- // arg 2: area id
+ // arg 2: area_id
pArea = PyString_FromString(router->router_area);
PyTuple_SetItem(pArgs, 2, pArea);
+ // arg 3: max_routers
+ pMaxRouters = PyInt_FromLong((long) dx_bitmask_width());
+ PyTuple_SetItem(pArgs, 3, pMaxRouters);
+
//
// Instantiate the router
//
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
new file mode 100644
index 0000000000..8e481375ab
--- /dev/null
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -0,0 +1,128 @@
+#ifndef __router_private_h__
+#define __router_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
+typedef struct dx_router_ref_t dx_router_ref_t;
+typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+
+
+typedef enum {
+ DX_LINK_ENDPOINT, // A link to a connected endpoint
+ DX_LINK_ROUTER, // A link to a peer router in the same area
+ DX_LINK_AREA // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+ DEQ_LINKS(struct dx_routed_event_t);
+ dx_delivery_t *delivery;
+ dx_message_t *message;
+ bool settle;
+ uint64_t disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+ DEQ_LINKS(dx_router_link_t);
+ int mask_bit; // Unique mask bit if this is an inter-router link
+ dx_link_type_t link_type;
+ dx_direction_t link_direction;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_router_link_ref_t *ref; // Pointer to a containing reference object
+ dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
+ dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
+};
+
+ALLOC_DECLARE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
+
+struct dx_router_node_t {
+ DEQ_LINKS(dx_router_node_t);
+ const char *id;
+ int mask_bit;
+ dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
+ dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
+ dx_bitmask_t *valid_origins;
+};
+
+ALLOC_DECLARE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+
+struct dx_router_ref_t {
+ DEQ_LINKS(dx_router_ref_t);
+ dx_router_node_t *router;
+};
+
+ALLOC_DECLARE(dx_router_ref_t);
+DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t);
+
+
+struct dx_router_link_ref_t {
+ DEQ_LINKS(dx_router_link_ref_t);
+ dx_router_link_t *link;
+};
+
+ALLOC_DECLARE(dx_router_link_ref_t);
+DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
+
+
+struct dx_address_t {
+ DEQ_LINKS(dx_address_t);
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context; // In-Process Consumer context
+ dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
+ dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
+};
+
+ALLOC_DECLARE(dx_address_t);
+DEQ_DECLARE(dx_address_t, dx_address_list_t);
+
+
+struct dx_router_t {
+ dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
+ dx_node_t *node;
+
+ dx_address_list_t addrs;
+ hash_t *addr_hash;
+ dx_address_t *router_addr;
+
+ dx_router_link_list_t links;
+ dx_router_node_list_t routers;
+
+ dx_bitmask_t *neighbor_free_mask;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ uint64_t dtag;
+
+ PyObject *pyRouter;
+ PyObject *pyTick;
+};
+
+#endif