summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-04-26 16:34:33 +0000
committerTed Ross <tross@apache.org>2013-04-26 16:34:33 +0000
commit64ab8be9c34528ef71ca5c58ff075ed57a48c9e0 (patch)
tree82c66d38e2d710d780b22c71d5afd6e00297082e /qpid/extras/dispatch/src
parentcf474845241b0c206710dbd9c87fe2e752c512a0 (diff)
downloadqpid-python-64ab8be9c34528ef71ca5c58ff075ed57a48c9e0.tar.gz
NO-JIRA - Development update to Dispatch Router
- Began refactoring of the routing table to support in-process destinations and multi-hop paths. - Added API for the internal management agent. Began integrating the agent with the router module for communication. - Added field parsing to handle topological addresses. - Added tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476280 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/agent.c46
-rw-r--r--qpid/extras/dispatch/src/container.c83
-rw-r--r--qpid/extras/dispatch/src/dispatch.c27
-rw-r--r--qpid/extras/dispatch/src/hash.c68
-rw-r--r--qpid/extras/dispatch/src/iterator.c231
-rw-r--r--qpid/extras/dispatch/src/log.c53
-rw-r--r--qpid/extras/dispatch/src/message.c8
-rw-r--r--qpid/extras/dispatch/src/router_node.c190
-rw-r--r--qpid/extras/dispatch/src/server.c12
9 files changed, 564 insertions, 154 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 864d58fadc..aca9ab3560 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -18,15 +18,18 @@
*/
#include "dispatch_private.h"
+#include <qpid/dispatch/error.h>
#include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/message.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/router.h>
#include <string.h>
-
+#include <stdio.h>
typedef struct dx_agent_t {
dx_server_t *server;
@@ -35,6 +38,7 @@ typedef struct dx_agent_t {
dx_message_list_t out_fifo;
sys_mutex_t *lock;
dx_timer_t *timer;
+ dx_address_t *address;
} dx_agent_t;
@@ -46,12 +50,30 @@ struct dx_agent_class_t {
};
+typedef struct {
+ dx_agent_t *agent;
+ dx_message_t *response_msg;
+} dx_agent_request_t;
+
+ALLOC_DECLARE(dx_agent_request_t);
+ALLOC_DEFINE(dx_agent_request_t);
+
+
static void dx_agent_timer_handler(void *context)
{
// TODO - Process the in_fifo here
}
+static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+{
+ dx_agent_t *agent = (dx_agent_t*) context;
+ DEQ_INSERT_TAIL(agent->in_fifo, msg);
+ dx_timer_schedule(agent->timer, 0);
+ printf("dx_agent_rx_handler - inbound message\n");
+}
+
+
dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
@@ -59,8 +81,9 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
agent->class_hash = hash(6, 10, 1);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
- agent->lock = sys_mutex();
- agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+ agent->lock = sys_mutex();
+ agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+ agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
return agent;
}
@@ -68,6 +91,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
void dx_agent_free(dx_agent_t *agent)
{
+ dx_router_unregister_address(agent->address);
sys_mutex_free(agent->lock);
dx_timer_free(agent->timer);
hash_free(agent->class_hash);
@@ -110,42 +134,42 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx,
}
-void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value)
+void dx_agent_value_string(const void *correlator, const char *key, const char *value)
{
}
-void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key)
+void dx_agent_value_null(const void *correlator, const char *key)
{
}
-void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value)
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value)
{
}
-void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len)
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len)
{
}
-void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value)
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value)
{
}
-void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more)
+void dx_agent_value_complete(const void *correlator, bool more)
{
}
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 0b31ab8ab5..e65d0c4b63 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -30,6 +30,7 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/agent.h>
static char *module="CONTAINER";
@@ -63,34 +64,44 @@ typedef struct dxc_node_type_t {
} dxc_node_type_t;
DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t);
+static int DX_CONTAINER_CLASS_CONTAINER = 1;
+static int DX_CONTAINER_CLASS_NODE_TYPE = 2;
+static int DX_CONTAINER_CLASS_NODE = 3;
+
+typedef struct container_class_t {
+ dx_container_t *container;
+ int class_id;
+} container_class_t;
struct dx_container_t {
+ dx_dispatch_t *dx;
dx_server_t *server;
hash_t *node_type_map;
hash_t *node_map;
sys_mutex_t *lock;
dx_node_t *default_node;
dxc_node_type_list_t node_type_list;
+ dx_agent_class_t *class_container;
+ dx_agent_class_t *class_node_type;
+ dx_agent_class_t *class_node;
};
static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link)
{
sys_mutex_lock(container->lock);
- dx_node_t *node;
- int result;
+ dx_node_t *node = 0;
const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
dx_field_iterator_t *iter;
// TODO - Extract the name from the structured source
if (source) {
iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
- result = hash_retrieve(container->node_map, iter, (void*) &node);
+ hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
- } else
- result = -1;
+ }
sys_mutex_unlock(container->lock);
- if (result < 0) {
+ if (node == 0) {
if (container->default_node)
node = container->default_node;
else {
@@ -119,21 +130,19 @@ static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link)
static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link)
{
sys_mutex_lock(container->lock);
- dx_node_t *node;
- int result;
+ dx_node_t *node = 0;
const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
dx_field_iterator_t *iter;
// TODO - Extract the name from the structured target
if (target) {
iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
- result = hash_retrieve(container->node_map, iter, (void*) &node);
+ hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
- } else
- result = -1;
+ }
sys_mutex_unlock(container->lock);
- if (result < 0) {
+ if (node == 0) {
if (container->default_node)
node = container->default_node;
else {
@@ -404,10 +413,49 @@ static int handler(void *handler_context, void *conn_context, dx_conn_event_t ev
}
+static void container_schema_handler(void *context, const void *correlator)
+{
+}
+
+
+static void container_query_handler(void* context, const char *id, const void *correlator)
+{
+ container_class_t *cls = (container_class_t*) context;
+
+ if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) {
+ dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map));
+ dx_agent_value_uint(correlator, "node_count", hash_size(cls->container->node_map));
+ if (cls->container->default_node)
+ dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name);
+ else
+ dx_agent_value_null(correlator, "default_node_type");
+ dx_agent_value_complete(correlator, false);
+
+ } else if (cls->class_id == DX_CONTAINER_CLASS_NODE_TYPE) {
+
+ } else if (cls->class_id == DX_CONTAINER_CLASS_NODE) {
+
+ }
+}
+
+
+dx_agent_class_t *setup_class(dx_container_t *container, const char *fqname, int id)
+{
+ container_class_t *cls = NEW(container_class_t);
+ cls->container = container;
+ cls->class_id = id;
+
+ return dx_agent_register_class(container->dx, fqname, cls,
+ container_schema_handler,
+ container_query_handler);
+}
+
+
dx_container_t *dx_container(dx_dispatch_t *dx)
{
dx_container_t *container = NEW(dx_container_t);
+ container->dx = dx;
container->server = dx->server;
container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
@@ -422,6 +470,17 @@ dx_container_t *dx_container(dx_dispatch_t *dx)
}
+void dx_container_setup_agent(dx_dispatch_t *dx)
+{
+ dx->container->class_container =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container", DX_CONTAINER_CLASS_CONTAINER);
+ dx->container->class_node_type =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container.node_type", DX_CONTAINER_CLASS_NODE_TYPE);
+ dx->container->class_node =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container.node", DX_CONTAINER_CLASS_NODE);
+}
+
+
void dx_container_free(dx_container_t *container)
{
// TODO - Free the nodes
diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c
index 87eb535b8a..47a1a07330 100644
--- a/qpid/extras/dispatch/src/dispatch.c
+++ b/qpid/extras/dispatch/src/dispatch.c
@@ -24,27 +24,44 @@
/**
* Private Function Prototypes
*/
-dx_server_t *dx_server(int tc);
+dx_server_t *dx_server(int tc, const char *container_name);
+void dx_server_setup_agent(dx_dispatch_t *dx);
void dx_server_free(dx_server_t *server);
dx_container_t *dx_container(dx_dispatch_t *dx);
+void dx_container_setup_agent(dx_dispatch_t *dx);
void dx_container_free(dx_container_t *container);
-dx_router_t *dx_router(dx_dispatch_t *dx);
+dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
+void dx_router_setup_agent(dx_dispatch_t *dx);
void dx_router_free(dx_router_t *router);
dx_agent_t *dx_agent(dx_dispatch_t *dx);
void dx_agent_free(dx_agent_t *agent);
-dx_dispatch_t *dx_dispatch(int thread_count)
+dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name,
+ const char *router_area, const char *router_id)
{
dx_dispatch_t *dx = NEW(dx_dispatch_t);
dx_alloc_initialize();
- dx->server = dx_server(thread_count);
+ if (!container_name)
+ container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid
+
+ if (!router_area)
+ router_area = "area";
+
+ if (!router_id)
+ router_id = container_name;
+
+ dx->server = dx_server(thread_count, container_name);
dx->container = dx_container(dx);
- dx->router = dx_router(dx);
+ dx->router = dx_router(dx, router_area, router_id);
dx->agent = dx_agent(dx);
+ dx_server_setup_agent(dx);
+ dx_container_setup_agent(dx);
+ dx_router_setup_agent(dx);
+
return dx;
}
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index c54d5d6fcf..19744366aa 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -58,6 +58,7 @@ static unsigned long hash_function(dx_field_iterator_t *iter)
unsigned long hash = 5381;
int c;
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter)) {
c = (int) dx_field_iterator_octet(iter);
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
@@ -101,13 +102,11 @@ size_t hash_size(hash_t *h)
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error)
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
- *error = 0;
-
while (item) {
if (dx_field_iterator_equal(key, item->key))
break;
@@ -115,40 +114,44 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
}
if (item) {
- *error = -1;
- return 0;
+ *exists = 1;
+ return item;
}
item = new_hash_item_t();
- if (!item) {
- *error = -2;
+ if (!item)
return 0;
- }
DEQ_ITEM_INIT(item);
item->key = dx_field_iterator_copy(key);
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
+ *exists = 0;
return item;
}
-int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
{
- int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ int exists = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &exists);
- if (item)
- item->v.val = val;
- return error;
+ if (!item)
+ return DX_ERROR_ALLOC;
+
+ if (exists)
+ return DX_ERROR_ALREADY_EXISTS;
+
+ item->v.val = val;
+
+ return DX_ERROR_NONE;
}
-int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
{
- if (!h->is_const)
- return -3;
+ assert(h->is_const);
int error = 0;
hash_item_t *item = hash_internal_insert(h, key, &error);
@@ -174,32 +177,33 @@ static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
}
-int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
{
hash_item_t *item = hash_internal_retrieve(h, key);
- if (item) {
+ if (item)
*val = item->v.val;
- return 0;
- }
- return -1;
+ else
+ *val = 0;
+
+ return DX_ERROR_NONE;
}
-int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
{
- if (!h->is_const)
- return -3;
+ assert(h->is_const);
hash_item_t *item = hash_internal_retrieve(h, key);
- if (item) {
+ if (item)
*val = item->v.val_const;
- return 0;
- }
- return -1;
+ else
+ *val = 0;
+
+ return DX_ERROR_NONE;
}
-int hash_remove(hash_t *h, dx_field_iterator_t *key)
+dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
@@ -215,9 +219,9 @@ int hash_remove(hash_t *h, dx_field_iterator_t *key)
DEQ_REMOVE(h->buckets[idx].items, item);
free_hash_item_t(item);
h->size--;
- return 0;
+ return DX_ERROR_NONE;
}
- return -1;
+ return DX_ERROR_NOT_FOUND;
}
diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c
index 6ab67f948d..92a7a1f479 100644
--- a/qpid/extras/dispatch/src/iterator.c
+++ b/qpid/extras/dispatch/src/iterator.c
@@ -25,19 +25,25 @@
#include <string.h>
typedef enum {
-MODE_TO_END,
-MODE_TO_SLASH
+ MODE_TO_END,
+ MODE_TO_SLASH
} parse_mode_t;
+typedef struct {
+ dx_buffer_t *buffer;
+ unsigned char *cursor;
+ int length;
+} pointer_t;
+
struct dx_field_iterator_t {
- dx_buffer_t *start_buffer;
- unsigned char *start_cursor;
- int start_length;
- dx_buffer_t *buffer;
- unsigned char *cursor;
- int length;
+ pointer_t start_pointer;
+ pointer_t view_start_pointer;
+ pointer_t pointer;
dx_iterator_view_t view;
parse_mode_t mode;
+ unsigned char prefix;
+ int at_prefix;
+ int view_prefix;
};
@@ -46,28 +52,86 @@ ALLOC_DEFINE(dx_field_iterator_t);
typedef enum {
-STATE_START,
-STATE_SLASH_LEFT,
-STATE_SKIPPING_TO_NEXT_SLASH,
-STATE_SCANNING,
-STATE_COLON,
-STATE_COLON_SLASH,
-STATE_AT_NODE_ID
+ STATE_START,
+ STATE_SLASH_LEFT,
+ STATE_SKIPPING_TO_NEXT_SLASH,
+ STATE_SCANNING,
+ STATE_COLON,
+ STATE_COLON_SLASH,
+ STATE_AT_NODE_ID
} state_t;
+static char *my_area = "";
+static char *my_router = "";
+
+
+static void parse_address_view(dx_field_iterator_t *iter)
+{
+ //
+ // This function starts with an iterator view that is identical to
+ // ITER_VIEW_NO_HOST. We will now further refine the view in order
+ // to aid the router in looking up addresses.
+ //
+
+ if (dx_field_iterator_prefix(iter, "_")) {
+ if (dx_field_iterator_prefix(iter, "local/")) {
+ iter->prefix = 'L';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ return;
+ }
+
+ if (dx_field_iterator_prefix(iter, "topo/")) {
+ if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_area)) {
+ if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_router)) {
+ iter->prefix = 'L';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ return;
+ }
+
+ iter->prefix = 'R';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+
+ iter->prefix = 'A';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+ }
+
+ iter->prefix = 'M';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+}
+
+
static void view_initialize(dx_field_iterator_t *iter)
{
- if (iter->view == ITER_VIEW_ALL) {
- iter->mode = MODE_TO_END;
+ //
+ // The default behavior is for the view to *not* have a prefix.
+ // We'll add one if it's needed later.
+ //
+ iter->at_prefix = 0;
+ iter->view_prefix = 0;
+ iter->mode = MODE_TO_END;
+
+ if (iter->view == ITER_VIEW_ALL)
return;
- }
//
// Advance to the node-id.
//
- state_t state = STATE_START;
- unsigned int octet;
+ state_t state = STATE_START;
+ unsigned int octet;
+ pointer_t save_pointer = {0,0,0};
+
while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) {
octet = dx_field_iterator_octet(iter);
switch (state) {
@@ -96,17 +160,20 @@ static void view_initialize(dx_field_iterator_t *iter)
break;
case STATE_COLON :
- if (octet == '/')
+ if (octet == '/') {
state = STATE_COLON_SLASH;
- else
+ save_pointer = iter->pointer;
+ } else
state = STATE_SCANNING;
break;
case STATE_COLON_SLASH :
if (octet == '/')
state = STATE_SKIPPING_TO_NEXT_SLASH;
- else
- state = STATE_SCANNING;
+ else {
+ state = STATE_AT_NODE_ID;
+ iter->pointer = save_pointer;
+ }
break;
case STATE_AT_NODE_ID :
@@ -119,9 +186,7 @@ static void view_initialize(dx_field_iterator_t *iter)
// The address string was relative, not absolute. The node-id
// is at the beginning of the string.
//
- iter->buffer = iter->start_buffer;
- iter->cursor = iter->start_cursor;
- iter->length = iter->start_length;
+ iter->pointer = iter->start_pointer;
}
//
@@ -137,6 +202,12 @@ static void view_initialize(dx_field_iterator_t *iter)
return;
}
+ if (iter->view == ITER_VIEW_ADDRESS_HASH) {
+ iter->mode = MODE_TO_END;
+ parse_address_view(iter);
+ return;
+ }
+
if (iter->view == ITER_VIEW_NODE_SPECIFIC) {
iter->mode = MODE_TO_END;
while (!dx_field_iterator_end(iter)) {
@@ -149,17 +220,29 @@ static void view_initialize(dx_field_iterator_t *iter)
}
+void dx_field_iterator_set_address(const char *area, const char *router)
+{
+ my_area = (char*) malloc(strlen(area) + 2);
+ strcpy(my_area, area);
+ strcat(my_area, "/");
+
+ my_router = (char*) malloc(strlen(router) + 2);
+ strcpy(my_router, router);
+ strcat(my_router, "/");
+}
+
+
dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view)
{
dx_field_iterator_t *iter = new_dx_field_iterator_t();
if (!iter)
return 0;
- iter->start_buffer = 0;
- iter->start_cursor = (unsigned char*) text;
- iter->start_length = strlen(text);
+ iter->start_pointer.buffer = 0;
+ iter->start_pointer.cursor = (unsigned char*) text;
+ iter->start_pointer.length = strlen(text);
- dx_field_iterator_reset(iter, view);
+ dx_field_iterator_reset_view(iter, view);
return iter;
}
@@ -171,11 +254,11 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i
if (!iter)
return 0;
- iter->start_buffer = buffer;
- iter->start_cursor = dx_buffer_base(buffer) + offset;
- iter->start_length = length;
+ iter->start_pointer.buffer = buffer;
+ iter->start_pointer.cursor = dx_buffer_base(buffer) + offset;
+ iter->start_pointer.length = length;
- dx_field_iterator_reset(iter, view);
+ dx_field_iterator_reset_view(iter, view);
return iter;
}
@@ -187,40 +270,52 @@ void dx_field_iterator_free(dx_field_iterator_t *iter)
}
-void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view)
+void dx_field_iterator_reset(dx_field_iterator_t *iter)
+{
+ iter->pointer = iter->view_start_pointer;
+ iter->at_prefix = iter->view_prefix;
+}
+
+
+void dx_field_iterator_reset_view(dx_field_iterator_t *iter, dx_iterator_view_t view)
{
- iter->buffer = iter->start_buffer;
- iter->cursor = iter->start_cursor;
- iter->length = iter->start_length;
- iter->view = view;
+ iter->pointer = iter->start_pointer;
+ iter->view = view;
view_initialize(iter);
+
+ iter->view_start_pointer = iter->pointer;
}
unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
{
- if (iter->length == 0)
+ if (iter->at_prefix) {
+ iter->at_prefix = 0;
+ return iter->prefix;
+ }
+
+ if (iter->pointer.length == 0)
return (unsigned char) 0;
- unsigned char result = *(iter->cursor);
+ unsigned char result = *(iter->pointer.cursor);
- iter->cursor++;
- iter->length--;
+ iter->pointer.cursor++;
+ iter->pointer.length--;
- if (iter->length > 0) {
- if (iter->buffer) {
- if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) {
- iter->buffer = iter->buffer->next;
- if (iter->buffer == 0)
- iter->length = 0;
- iter->cursor = dx_buffer_base(iter->buffer);
+ if (iter->pointer.length > 0) {
+ if (iter->pointer.buffer) {
+ if (iter->pointer.cursor - dx_buffer_base(iter->pointer.buffer) == dx_buffer_size(iter->pointer.buffer)) {
+ iter->pointer.buffer = iter->pointer.buffer->next;
+ if (iter->pointer.buffer == 0)
+ iter->pointer.length = 0;
+ iter->pointer.cursor = dx_buffer_base(iter->pointer.buffer);
}
}
}
- if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/')
- iter->length = 0;
+ if (iter->pointer.length && iter->mode == MODE_TO_SLASH && *(iter->pointer.cursor) == '/')
+ iter->pointer.length = 0;
return result;
}
@@ -228,13 +323,13 @@ unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
int dx_field_iterator_end(dx_field_iterator_t *iter)
{
- return iter->length == 0;
+ return iter->pointer.length == 0;
}
-int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
+int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string)
{
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter) && *string) {
if (*string != dx_field_iterator_octet(iter))
return 0;
@@ -245,19 +340,39 @@ int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
}
+int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix)
+{
+ pointer_t save_pointer = iter->pointer;
+ unsigned char *c = (unsigned char*) prefix;
+
+ while(*c) {
+ if (*c != dx_field_iterator_octet(iter))
+ break;
+ c++;
+ }
+
+ if (*c) {
+ iter->pointer = save_pointer;
+ return 0;
+ }
+
+ return 1;
+}
+
+
unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
{
int length = 0;
int idx = 0;
unsigned char *copy;
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter)) {
dx_field_iterator_octet(iter);
length++;
}
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
copy = (unsigned char*) malloc(length + 1);
while (!dx_field_iterator_end(iter))
copy[idx++] = dx_field_iterator_octet(iter);
diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c
index d4ec534915..c6cffe0321 100644
--- a/qpid/extras/dispatch/src/log.c
+++ b/qpid/extras/dispatch/src/log.c
@@ -18,11 +18,36 @@
*/
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
+#include <sys/time.h>
-static int mask=LOG_INFO;
+#define TEXT_MAX 512
+#define LIST_MAX 1000
+
+typedef struct dx_log_entry_t dx_log_entry_t;
+
+struct dx_log_entry_t {
+ DEQ_LINKS(dx_log_entry_t);
+ const char *module;
+ int cls;
+ const char *file;
+ int line;
+ struct timeval tv;
+ char text[TEXT_MAX];
+};
+
+ALLOC_DECLARE(dx_log_entry_t);
+ALLOC_DEFINE(dx_log_entry_t);
+
+DEQ_DECLARE(dx_log_entry_t, dx_log_list_t);
+
+static int mask = LOG_INFO;
+static dx_log_list_t entries;
+static int list_init = 0;
static char *cls_prefix(int cls)
{
@@ -35,18 +60,36 @@ static char *cls_prefix(int cls)
return "";
}
-void dx_log(const char *module, int cls, const char *fmt, ...)
+void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...)
{
if (!(cls & mask))
return;
+ if (list_init == 0) {
+ list_init = 1;
+ DEQ_INIT(entries);
+ }
+
+ dx_log_entry_t *entry = new_dx_log_entry_t();
+ entry->module = module;
+ entry->cls = cls;
+ entry->file = file;
+ entry->line = line;
+ gettimeofday(&entry->tv, 0);
+
va_list ap;
- char line[128];
va_start(ap, fmt);
- vsnprintf(line, 127, fmt, ap);
+ vsnprintf(entry->text, TEXT_MAX, fmt, ap);
va_end(ap);
- fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line);
+ fprintf(stderr, "%s (%s) %s\n", module, cls_prefix(cls), entry->text);
+
+ DEQ_INSERT_TAIL(entries, entry);
+ if (DEQ_SIZE(entries) > LIST_MAX) {
+ entry = DEQ_HEAD(entries);
+ DEQ_REMOVE_HEAD(entries);
+ free_dx_log_entry_t(entry);
+ }
}
void dx_log_set_mask(int _mask)
diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c
index c914c5ca7b..fd08753f4d 100644
--- a/qpid/extras/dispatch/src/message.c
+++ b/qpid/extras/dispatch/src/message.c
@@ -1081,19 +1081,19 @@ void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t le
}
-void dx_message_insert_string(dx_message_t *msg, const char *start)
+void dx_message_insert_string(dx_message_t *msg, const char *str)
{
dx_message_content_t *content = MSG_CONTENT(msg);
- uint32_t len = strlen(start);
+ uint32_t len = strlen(str);
if (len < 256) {
dx_insert_8(content, 0xa1); // str8-utf8
dx_insert_8(content, (uint8_t) len);
- dx_insert(content, (const uint8_t*) start, len);
+ dx_insert(content, (const uint8_t*) str, len);
} else {
dx_insert_8(content, 0xb1); // str32-utf8
dx_insert_32(content, len);
- dx_insert(content, (const uint8_t*) start, len);
+ dx_insert(content, (const uint8_t*) str, len);
}
content->count++;
}
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 0513b08a6b..65756be215 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -18,13 +18,34 @@
*/
#include <stdio.h>
+#include <string.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
-static char *module="ROUTER_NODE";
+static char *module = "ROUTER";
+
+//static char *local_prefix = "_local/";
+//static char *topo_prefix = "_topo/";
+
+/**
+ * Address Types and Processing:
+ *
+ * Address Hash Compare onReceive onEmit
+ * =============================================================================
+ * _local/<local> L<local> handler forward
+ * _topo/<area>/<router>/<local> A<area> forward forward
+ * _topo/<my-area>/<router>/<local> R<router> forward forward
+ * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward
+ * _topo/<area>/all/<local> A<area> forward forward
+ * _topo/<my-area>/all/<local> L<local> forward+handler forward
+ * _topo/all/all/<local> L<local> forward+handler forward
+ * <mobile> M<mobile> forward+handler forward
+ */
struct dx_router_t {
dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
dx_node_t *node;
dx_link_list_t in_links;
dx_link_list_t out_links;
@@ -41,11 +62,32 @@ typedef struct {
dx_message_list_t out_fifo;
} dx_router_link_t;
-
ALLOC_DECLARE(dx_router_link_t);
ALLOC_DEFINE(dx_router_link_t);
+typedef struct {
+ const char *id;
+ dx_router_link_t *next_hop;
+ // list of valid origins (pointers to router_node) - (bit masks?)
+} dx_router_node_t;
+
+ALLOC_DECLARE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_node_t);
+
+
+struct dx_address_t {
+ int is_local;
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context;
+ dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list
+ dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list
+};
+
+ALLOC_DECLARE(dx_address_t);
+ALLOC_DEFINE(dx_address_t);
+
+
/**
* Outbound Delivery Handler
*/
@@ -119,22 +161,42 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
- dx_router_link_t *rlink;
+ dx_address_t *addr;
if (iter) {
- dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ hash_retrieve(router->out_hash, iter, (void*) &addr);
dx_field_iterator_free(iter);
- if (result == 0) {
+ if (addr) {
+ //
+ // To field is valid and contains a known destination. Handle the various
+ // cases for forwarding.
+ //
+ // Forward to the in-process handler for this message if there is one.
+ //
+ if (addr->handler)
+ addr->handler(addr->handler_context, msg);
+
//
- // To field is valid and contains a known destination. Enqueue on
- // the output fifo for the next-hop-to-destination.
+ // Forward to the local link for the locally-connected consumer, if present.
+ // TODO - Don't forward if this is a "_local" address.
//
- pn_link_t* pn_outlink = dx_link_pn(rlink->link);
- DEQ_INSERT_TAIL(rlink->out_fifo, msg);
- pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo));
- dx_link_activate(rlink->link);
+ if (addr->rlink) {
+ pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link);
+ DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg);
+ pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
+ dx_link_activate(addr->rlink->link);
+ }
+
+ //
+ // Forward to the next-hop for a remotely-connected consumer, if present.
+ // Don't forward if this is a "_local" address.
+ //
+ if (addr->rnode) {
+ // TODO
+ }
+
} else {
//
// To field contains an unknown address. Release the message.
@@ -242,17 +304,30 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
pn_link_t *pn_link = dx_link_pn(link);
const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- sys_mutex_lock(router->lock);
dx_router_link_t *rlink = new_dx_router_link_t();
rlink->link = link;
DEQ_INIT(rlink->out_fifo);
dx_link_set_context(link, rlink);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- int result = hash_insert(router->out_hash, iter, rlink);
+ dx_address_t *addr;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->is_local = 0;
+ addr->handler = 0;
+ addr->handler_context = 0;
+ addr->rlink = 0;
+ addr->rnode = 0;
+ hash_insert(router->out_hash, iter, addr);
+ }
dx_field_iterator_free(iter);
- if (result == 0) {
+ if (addr->rlink == 0) {
+ addr->rlink = rlink;
pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
pn_link_open(pn_link);
@@ -314,14 +389,14 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
if (pn_link_is_sender(pn_link)) {
item = DEQ_HEAD(router->out_links);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- dx_router_link_t *rlink;
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
if (iter) {
- int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
- if (result == 0) {
- dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (addr) {
hash_remove(router->out_hash, iter);
- free_dx_router_link_t(rlink);
+ free_dx_router_link_t(addr->rlink);
+ free_dx_address_t(addr);
dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
}
dx_field_iterator_free(iter);
@@ -383,7 +458,7 @@ static dx_node_type_t router_node = {"router", 0, 0,
static int type_registered = 0;
-dx_router_t *dx_router(dx_dispatch_t *dx)
+dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
{
if (!type_registered) {
type_registered = 1;
@@ -397,8 +472,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx)
DEQ_INIT(router->out_links);
DEQ_INIT(router->in_fifo);
- router->dx = dx;
- router->lock = sys_mutex();
+ router->dx = dx;
+ router->lock = sys_mutex();
+ router->router_area = area;
+ router->router_id = id;
router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
dx_timer_schedule(router->timer, 0); // Immediate
@@ -406,10 +483,22 @@ dx_router_t *dx_router(dx_dispatch_t *dx)
router->out_hash = hash(10, 32, 0);
router->dtag = 1;
+ //
+ // Inform the field iterator module of this router's id and area. The field iterator
+ // uses this to offload some of the address-processing load from the router.
+ //
+ dx_field_iterator_set_address(area, id);
+
return router;
}
+void dx_router_setup_agent(dx_dispatch_t *dx)
+{
+ // TODO
+}
+
+
void dx_router_free(dx_router_t *router)
{
dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH);
@@ -417,3 +506,54 @@ void dx_router_free(dx_router_t *router)
free(router);
}
+
+dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
+ bool is_local,
+ const char *address,
+ dx_router_message_cb handler,
+ void *context)
+{
+ char addr[1000];
+ dx_address_t *ad = new_dx_address_t();
+ dx_field_iterator_t *iter;
+ int result;
+
+ if (!ad)
+ return 0;
+
+ ad->is_local = is_local;
+ ad->handler = handler;
+ ad->handler_context = context;
+ ad->rlink = 0;
+
+ if (ad->is_local)
+ strcpy(addr, "L"); // Local Hash-Key Space
+ else
+ strcpy(addr, "M"); // Mobile Hash-Key Space
+
+ strcat(addr, address);
+ iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
+ result = hash_insert(dx->router->out_hash, iter, ad);
+ dx_field_iterator_free(iter);
+ if (result != 0) {
+ free_dx_address_t(ad);
+ return 0;
+ }
+
+ dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+ return ad;
+}
+
+
+void dx_router_unregister_address(dx_address_t *ad)
+{
+ free_dx_address_t(ad);
+}
+
+
+void dx_router_send(dx_dispatch_t *dx,
+ const char *address,
+ dx_message_t *msg)
+{
+}
+
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index a2d2d4980a..536af048d8 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -44,6 +44,7 @@ typedef struct dx_thread_t {
struct dx_server_t {
int thread_count;
+ const char *container_name;
pn_driver_t *driver;
dx_thread_start_cb_t start_handler;
dx_conn_handler_cb_t conn_handler;
@@ -201,7 +202,7 @@ static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
- pn_connection_set_container(conn, "dispatch"); // TODO - make unique
+ pn_connection_set_container(conn, dx_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
@@ -557,7 +558,7 @@ static void cxtr_try_open(void *context)
}
-dx_server_t *dx_server(int thread_count)
+dx_server_t *dx_server(int thread_count, const char *container_name)
{
int i;
@@ -566,6 +567,7 @@ dx_server_t *dx_server(int thread_count)
return 0;
dx_server->thread_count = thread_count;
+ dx_server->container_name = container_name;
dx_server->driver = pn_driver();
dx_server->start_handler = 0;
dx_server->conn_handler = 0;
@@ -596,6 +598,12 @@ dx_server_t *dx_server(int thread_count)
}
+void dx_server_setup_agent(dx_dispatch_t *dx)
+{
+ // TODO
+}
+
+
void dx_server_free(dx_server_t *dx_server)
{
int i;