diff options
| author | Ted Ross <tross@apache.org> | 2013-04-26 16:34:33 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-04-26 16:34:33 +0000 |
| commit | 64ab8be9c34528ef71ca5c58ff075ed57a48c9e0 (patch) | |
| tree | 82c66d38e2d710d780b22c71d5afd6e00297082e /qpid/extras/dispatch/src | |
| parent | cf474845241b0c206710dbd9c87fe2e752c512a0 (diff) | |
| download | qpid-python-64ab8be9c34528ef71ca5c58ff075ed57a48c9e0.tar.gz | |
NO-JIRA - Development update to Dispatch Router
- Began refactoring of the routing table to support in-process destinations and multi-hop paths.
- Added API for the internal management agent. Began integrating the agent with the router
module for communication.
- Added field parsing to handle topological addresses.
- Added tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476280 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 46 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/container.c | 83 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/dispatch.c | 27 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/hash.c | 68 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/iterator.c | 231 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/log.c | 53 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c | 8 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 190 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/server.c | 12 |
9 files changed, 564 insertions, 154 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 864d58fadc..aca9ab3560 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -18,15 +18,18 @@ */ #include "dispatch_private.h" +#include <qpid/dispatch/error.h> #include <qpid/dispatch/agent.h> +#include <qpid/dispatch/alloc.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/hash.h> #include <qpid/dispatch/container.h> #include <qpid/dispatch/message.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/timer.h> +#include <qpid/dispatch/router.h> #include <string.h> - +#include <stdio.h> typedef struct dx_agent_t { dx_server_t *server; @@ -35,6 +38,7 @@ typedef struct dx_agent_t { dx_message_list_t out_fifo; sys_mutex_t *lock; dx_timer_t *timer; + dx_address_t *address; } dx_agent_t; @@ -46,12 +50,30 @@ struct dx_agent_class_t { }; +typedef struct { + dx_agent_t *agent; + dx_message_t *response_msg; +} dx_agent_request_t; + +ALLOC_DECLARE(dx_agent_request_t); +ALLOC_DEFINE(dx_agent_request_t); + + static void dx_agent_timer_handler(void *context) { // TODO - Process the in_fifo here } +static void dx_agent_rx_handler(void *context, dx_message_t *msg) +{ + dx_agent_t *agent = (dx_agent_t*) context; + DEQ_INSERT_TAIL(agent->in_fifo, msg); + dx_timer_schedule(agent->timer, 0); + printf("dx_agent_rx_handler - inbound message\n"); +} + + dx_agent_t *dx_agent(dx_dispatch_t *dx) { dx_agent_t *agent = NEW(dx_agent_t); @@ -59,8 +81,9 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) agent->class_hash = hash(6, 10, 1); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); - agent->lock = sys_mutex(); - agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + agent->lock = sys_mutex(); + agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent); return agent; } @@ -68,6 +91,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) void dx_agent_free(dx_agent_t *agent) { + dx_router_unregister_address(agent->address); sys_mutex_free(agent->lock); dx_timer_free(agent->timer); hash_free(agent->class_hash); @@ -110,42 +134,42 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, } -void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value) +void dx_agent_value_string(const void *correlator, const char *key, const char *value) { } -void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key) +void dx_agent_value_null(const void *correlator, const char *key) { } -void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value) +void dx_agent_value_boolean(const void *correlator, const char *key, bool value) { } -void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len) +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) { } -void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value) +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) { } -void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more) +void dx_agent_value_complete(const void *correlator, bool more) { } diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index 0b31ab8ab5..e65d0c4b63 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -30,6 +30,7 @@ #include <qpid/dispatch/threading.h> #include <qpid/dispatch/iterator.h> #include <qpid/dispatch/log.h> +#include <qpid/dispatch/agent.h> static char *module="CONTAINER"; @@ -63,34 +64,44 @@ typedef struct dxc_node_type_t { } dxc_node_type_t; DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t); +static int DX_CONTAINER_CLASS_CONTAINER = 1; +static int DX_CONTAINER_CLASS_NODE_TYPE = 2; +static int DX_CONTAINER_CLASS_NODE = 3; + +typedef struct container_class_t { + dx_container_t *container; + int class_id; +} container_class_t; struct dx_container_t { + dx_dispatch_t *dx; dx_server_t *server; hash_t *node_type_map; hash_t *node_map; sys_mutex_t *lock; dx_node_t *default_node; dxc_node_type_list_t node_type_list; + dx_agent_class_t *class_container; + dx_agent_class_t *class_node_type; + dx_agent_class_t *class_node; }; static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) { sys_mutex_lock(container->lock); - dx_node_t *node; - int result; + dx_node_t *node = 0; const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); dx_field_iterator_t *iter; // TODO - Extract the name from the structured source if (source) { iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); - result = hash_retrieve(container->node_map, iter, (void*) &node); + hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); - } else - result = -1; + } sys_mutex_unlock(container->lock); - if (result < 0) { + if (node == 0) { if (container->default_node) node = container->default_node; else { @@ -119,21 +130,19 @@ static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link) { sys_mutex_lock(container->lock); - dx_node_t *node; - int result; + dx_node_t *node = 0; const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); dx_field_iterator_t *iter; // TODO - Extract the name from the structured target if (target) { iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); - result = hash_retrieve(container->node_map, iter, (void*) &node); + hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); - } else - result = -1; + } sys_mutex_unlock(container->lock); - if (result < 0) { + if (node == 0) { if (container->default_node) node = container->default_node; else { @@ -404,10 +413,49 @@ static int handler(void *handler_context, void *conn_context, dx_conn_event_t ev } +static void container_schema_handler(void *context, const void *correlator) +{ +} + + +static void container_query_handler(void* context, const char *id, const void *correlator) +{ + container_class_t *cls = (container_class_t*) context; + + if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) { + dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map)); + dx_agent_value_uint(correlator, "node_count", hash_size(cls->container->node_map)); + if (cls->container->default_node) + dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name); + else + dx_agent_value_null(correlator, "default_node_type"); + dx_agent_value_complete(correlator, false); + + } else if (cls->class_id == DX_CONTAINER_CLASS_NODE_TYPE) { + + } else if (cls->class_id == DX_CONTAINER_CLASS_NODE) { + + } +} + + +dx_agent_class_t *setup_class(dx_container_t *container, const char *fqname, int id) +{ + container_class_t *cls = NEW(container_class_t); + cls->container = container; + cls->class_id = id; + + return dx_agent_register_class(container->dx, fqname, cls, + container_schema_handler, + container_query_handler); +} + + dx_container_t *dx_container(dx_dispatch_t *dx) { dx_container_t *container = NEW(dx_container_t); + container->dx = dx; container->server = dx->server; container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 @@ -422,6 +470,17 @@ dx_container_t *dx_container(dx_dispatch_t *dx) } +void dx_container_setup_agent(dx_dispatch_t *dx) +{ + dx->container->class_container = + setup_class(dx->container, "org.apache.qpid.dispatch.container", DX_CONTAINER_CLASS_CONTAINER); + dx->container->class_node_type = + setup_class(dx->container, "org.apache.qpid.dispatch.container.node_type", DX_CONTAINER_CLASS_NODE_TYPE); + dx->container->class_node = + setup_class(dx->container, "org.apache.qpid.dispatch.container.node", DX_CONTAINER_CLASS_NODE); +} + + void dx_container_free(dx_container_t *container) { // TODO - Free the nodes diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c index 87eb535b8a..47a1a07330 100644 --- a/qpid/extras/dispatch/src/dispatch.c +++ b/qpid/extras/dispatch/src/dispatch.c @@ -24,27 +24,44 @@ /** * Private Function Prototypes */ -dx_server_t *dx_server(int tc); +dx_server_t *dx_server(int tc, const char *container_name); +void dx_server_setup_agent(dx_dispatch_t *dx); void dx_server_free(dx_server_t *server); dx_container_t *dx_container(dx_dispatch_t *dx); +void dx_container_setup_agent(dx_dispatch_t *dx); void dx_container_free(dx_container_t *container); -dx_router_t *dx_router(dx_dispatch_t *dx); +dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id); +void dx_router_setup_agent(dx_dispatch_t *dx); void dx_router_free(dx_router_t *router); dx_agent_t *dx_agent(dx_dispatch_t *dx); void dx_agent_free(dx_agent_t *agent); -dx_dispatch_t *dx_dispatch(int thread_count) +dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name, + const char *router_area, const char *router_id) { dx_dispatch_t *dx = NEW(dx_dispatch_t); dx_alloc_initialize(); - dx->server = dx_server(thread_count); + if (!container_name) + container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid + + if (!router_area) + router_area = "area"; + + if (!router_id) + router_id = container_name; + + dx->server = dx_server(thread_count, container_name); dx->container = dx_container(dx); - dx->router = dx_router(dx); + dx->router = dx_router(dx, router_area, router_id); dx->agent = dx_agent(dx); + dx_server_setup_agent(dx); + dx_container_setup_agent(dx); + dx_router_setup_agent(dx); + return dx; } diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c index c54d5d6fcf..19744366aa 100644 --- a/qpid/extras/dispatch/src/hash.c +++ b/qpid/extras/dispatch/src/hash.c @@ -58,6 +58,7 @@ static unsigned long hash_function(dx_field_iterator_t *iter) unsigned long hash = 5381; int c; + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter)) { c = (int) dx_field_iterator_octet(iter); hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ @@ -101,13 +102,11 @@ size_t hash_size(hash_t *h) } -static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error) +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); - *error = 0; - while (item) { if (dx_field_iterator_equal(key, item->key)) break; @@ -115,40 +114,44 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in } if (item) { - *error = -1; - return 0; + *exists = 1; + return item; } item = new_hash_item_t(); - if (!item) { - *error = -2; + if (!item) return 0; - } DEQ_ITEM_INIT(item); item->key = dx_field_iterator_copy(key); DEQ_INSERT_TAIL(h->buckets[idx].items, item); h->size++; + *exists = 0; return item; } -int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) +dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) { - int error = 0; - hash_item_t *item = hash_internal_insert(h, key, &error); + int exists = 0; + hash_item_t *item = hash_internal_insert(h, key, &exists); - if (item) - item->v.val = val; - return error; + if (!item) + return DX_ERROR_ALLOC; + + if (exists) + return DX_ERROR_ALREADY_EXISTS; + + item->v.val = val; + + return DX_ERROR_NONE; } -int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) +dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) { - if (!h->is_const) - return -3; + assert(h->is_const); int error = 0; hash_item_t *item = hash_internal_insert(h, key, &error); @@ -174,32 +177,33 @@ static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key) } -int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) +dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) { hash_item_t *item = hash_internal_retrieve(h, key); - if (item) { + if (item) *val = item->v.val; - return 0; - } - return -1; + else + *val = 0; + + return DX_ERROR_NONE; } -int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) +dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) { - if (!h->is_const) - return -3; + assert(h->is_const); hash_item_t *item = hash_internal_retrieve(h, key); - if (item) { + if (item) *val = item->v.val_const; - return 0; - } - return -1; + else + *val = 0; + + return DX_ERROR_NONE; } -int hash_remove(hash_t *h, dx_field_iterator_t *key) +dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); @@ -215,9 +219,9 @@ int hash_remove(hash_t *h, dx_field_iterator_t *key) DEQ_REMOVE(h->buckets[idx].items, item); free_hash_item_t(item); h->size--; - return 0; + return DX_ERROR_NONE; } - return -1; + return DX_ERROR_NOT_FOUND; } diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c index 6ab67f948d..92a7a1f479 100644 --- a/qpid/extras/dispatch/src/iterator.c +++ b/qpid/extras/dispatch/src/iterator.c @@ -25,19 +25,25 @@ #include <string.h> typedef enum { -MODE_TO_END, -MODE_TO_SLASH + MODE_TO_END, + MODE_TO_SLASH } parse_mode_t; +typedef struct { + dx_buffer_t *buffer; + unsigned char *cursor; + int length; +} pointer_t; + struct dx_field_iterator_t { - dx_buffer_t *start_buffer; - unsigned char *start_cursor; - int start_length; - dx_buffer_t *buffer; - unsigned char *cursor; - int length; + pointer_t start_pointer; + pointer_t view_start_pointer; + pointer_t pointer; dx_iterator_view_t view; parse_mode_t mode; + unsigned char prefix; + int at_prefix; + int view_prefix; }; @@ -46,28 +52,86 @@ ALLOC_DEFINE(dx_field_iterator_t); typedef enum { -STATE_START, -STATE_SLASH_LEFT, -STATE_SKIPPING_TO_NEXT_SLASH, -STATE_SCANNING, -STATE_COLON, -STATE_COLON_SLASH, -STATE_AT_NODE_ID + STATE_START, + STATE_SLASH_LEFT, + STATE_SKIPPING_TO_NEXT_SLASH, + STATE_SCANNING, + STATE_COLON, + STATE_COLON_SLASH, + STATE_AT_NODE_ID } state_t; +static char *my_area = ""; +static char *my_router = ""; + + +static void parse_address_view(dx_field_iterator_t *iter) +{ + // + // This function starts with an iterator view that is identical to + // ITER_VIEW_NO_HOST. We will now further refine the view in order + // to aid the router in looking up addresses. + // + + if (dx_field_iterator_prefix(iter, "_")) { + if (dx_field_iterator_prefix(iter, "local/")) { + iter->prefix = 'L'; + iter->at_prefix = 1; + iter->view_prefix = 1; + return; + } + + if (dx_field_iterator_prefix(iter, "topo/")) { + if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_area)) { + if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_router)) { + iter->prefix = 'L'; + iter->at_prefix = 1; + iter->view_prefix = 1; + return; + } + + iter->prefix = 'R'; + iter->at_prefix = 1; + iter->view_prefix = 1; + iter->mode = MODE_TO_SLASH; + return; + } + + iter->prefix = 'A'; + iter->at_prefix = 1; + iter->view_prefix = 1; + iter->mode = MODE_TO_SLASH; + return; + } + } + + iter->prefix = 'M'; + iter->at_prefix = 1; + iter->view_prefix = 1; +} + + static void view_initialize(dx_field_iterator_t *iter) { - if (iter->view == ITER_VIEW_ALL) { - iter->mode = MODE_TO_END; + // + // The default behavior is for the view to *not* have a prefix. + // We'll add one if it's needed later. + // + iter->at_prefix = 0; + iter->view_prefix = 0; + iter->mode = MODE_TO_END; + + if (iter->view == ITER_VIEW_ALL) return; - } // // Advance to the node-id. // - state_t state = STATE_START; - unsigned int octet; + state_t state = STATE_START; + unsigned int octet; + pointer_t save_pointer = {0,0,0}; + while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { octet = dx_field_iterator_octet(iter); switch (state) { @@ -96,17 +160,20 @@ static void view_initialize(dx_field_iterator_t *iter) break; case STATE_COLON : - if (octet == '/') + if (octet == '/') { state = STATE_COLON_SLASH; - else + save_pointer = iter->pointer; + } else state = STATE_SCANNING; break; case STATE_COLON_SLASH : if (octet == '/') state = STATE_SKIPPING_TO_NEXT_SLASH; - else - state = STATE_SCANNING; + else { + state = STATE_AT_NODE_ID; + iter->pointer = save_pointer; + } break; case STATE_AT_NODE_ID : @@ -119,9 +186,7 @@ static void view_initialize(dx_field_iterator_t *iter) // The address string was relative, not absolute. The node-id // is at the beginning of the string. // - iter->buffer = iter->start_buffer; - iter->cursor = iter->start_cursor; - iter->length = iter->start_length; + iter->pointer = iter->start_pointer; } // @@ -137,6 +202,12 @@ static void view_initialize(dx_field_iterator_t *iter) return; } + if (iter->view == ITER_VIEW_ADDRESS_HASH) { + iter->mode = MODE_TO_END; + parse_address_view(iter); + return; + } + if (iter->view == ITER_VIEW_NODE_SPECIFIC) { iter->mode = MODE_TO_END; while (!dx_field_iterator_end(iter)) { @@ -149,17 +220,29 @@ static void view_initialize(dx_field_iterator_t *iter) } +void dx_field_iterator_set_address(const char *area, const char *router) +{ + my_area = (char*) malloc(strlen(area) + 2); + strcpy(my_area, area); + strcat(my_area, "/"); + + my_router = (char*) malloc(strlen(router) + 2); + strcpy(my_router, router); + strcat(my_router, "/"); +} + + dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) { dx_field_iterator_t *iter = new_dx_field_iterator_t(); if (!iter) return 0; - iter->start_buffer = 0; - iter->start_cursor = (unsigned char*) text; - iter->start_length = strlen(text); + iter->start_pointer.buffer = 0; + iter->start_pointer.cursor = (unsigned char*) text; + iter->start_pointer.length = strlen(text); - dx_field_iterator_reset(iter, view); + dx_field_iterator_reset_view(iter, view); return iter; } @@ -171,11 +254,11 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i if (!iter) return 0; - iter->start_buffer = buffer; - iter->start_cursor = dx_buffer_base(buffer) + offset; - iter->start_length = length; + iter->start_pointer.buffer = buffer; + iter->start_pointer.cursor = dx_buffer_base(buffer) + offset; + iter->start_pointer.length = length; - dx_field_iterator_reset(iter, view); + dx_field_iterator_reset_view(iter, view); return iter; } @@ -187,40 +270,52 @@ void dx_field_iterator_free(dx_field_iterator_t *iter) } -void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view) +void dx_field_iterator_reset(dx_field_iterator_t *iter) +{ + iter->pointer = iter->view_start_pointer; + iter->at_prefix = iter->view_prefix; +} + + +void dx_field_iterator_reset_view(dx_field_iterator_t *iter, dx_iterator_view_t view) { - iter->buffer = iter->start_buffer; - iter->cursor = iter->start_cursor; - iter->length = iter->start_length; - iter->view = view; + iter->pointer = iter->start_pointer; + iter->view = view; view_initialize(iter); + + iter->view_start_pointer = iter->pointer; } unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) { - if (iter->length == 0) + if (iter->at_prefix) { + iter->at_prefix = 0; + return iter->prefix; + } + + if (iter->pointer.length == 0) return (unsigned char) 0; - unsigned char result = *(iter->cursor); + unsigned char result = *(iter->pointer.cursor); - iter->cursor++; - iter->length--; + iter->pointer.cursor++; + iter->pointer.length--; - if (iter->length > 0) { - if (iter->buffer) { - if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) { - iter->buffer = iter->buffer->next; - if (iter->buffer == 0) - iter->length = 0; - iter->cursor = dx_buffer_base(iter->buffer); + if (iter->pointer.length > 0) { + if (iter->pointer.buffer) { + if (iter->pointer.cursor - dx_buffer_base(iter->pointer.buffer) == dx_buffer_size(iter->pointer.buffer)) { + iter->pointer.buffer = iter->pointer.buffer->next; + if (iter->pointer.buffer == 0) + iter->pointer.length = 0; + iter->pointer.cursor = dx_buffer_base(iter->pointer.buffer); } } } - if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/') - iter->length = 0; + if (iter->pointer.length && iter->mode == MODE_TO_SLASH && *(iter->pointer.cursor) == '/') + iter->pointer.length = 0; return result; } @@ -228,13 +323,13 @@ unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) int dx_field_iterator_end(dx_field_iterator_t *iter) { - return iter->length == 0; + return iter->pointer.length == 0; } -int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) +int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string) { - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter) && *string) { if (*string != dx_field_iterator_octet(iter)) return 0; @@ -245,19 +340,39 @@ int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) } +int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix) +{ + pointer_t save_pointer = iter->pointer; + unsigned char *c = (unsigned char*) prefix; + + while(*c) { + if (*c != dx_field_iterator_octet(iter)) + break; + c++; + } + + if (*c) { + iter->pointer = save_pointer; + return 0; + } + + return 1; +} + + unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) { int length = 0; int idx = 0; unsigned char *copy; - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter)) { dx_field_iterator_octet(iter); length++; } - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); copy = (unsigned char*) malloc(length + 1); while (!dx_field_iterator_end(iter)) copy[idx++] = dx_field_iterator_octet(iter); diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c index d4ec534915..c6cffe0321 100644 --- a/qpid/extras/dispatch/src/log.c +++ b/qpid/extras/dispatch/src/log.c @@ -18,11 +18,36 @@ */ #include <qpid/dispatch/log.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> #include <stdarg.h> #include <stdio.h> #include <string.h> +#include <sys/time.h> -static int mask=LOG_INFO; +#define TEXT_MAX 512 +#define LIST_MAX 1000 + +typedef struct dx_log_entry_t dx_log_entry_t; + +struct dx_log_entry_t { + DEQ_LINKS(dx_log_entry_t); + const char *module; + int cls; + const char *file; + int line; + struct timeval tv; + char text[TEXT_MAX]; +}; + +ALLOC_DECLARE(dx_log_entry_t); +ALLOC_DEFINE(dx_log_entry_t); + +DEQ_DECLARE(dx_log_entry_t, dx_log_list_t); + +static int mask = LOG_INFO; +static dx_log_list_t entries; +static int list_init = 0; static char *cls_prefix(int cls) { @@ -35,18 +60,36 @@ static char *cls_prefix(int cls) return ""; } -void dx_log(const char *module, int cls, const char *fmt, ...) +void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...) { if (!(cls & mask)) return; + if (list_init == 0) { + list_init = 1; + DEQ_INIT(entries); + } + + dx_log_entry_t *entry = new_dx_log_entry_t(); + entry->module = module; + entry->cls = cls; + entry->file = file; + entry->line = line; + gettimeofday(&entry->tv, 0); + va_list ap; - char line[128]; va_start(ap, fmt); - vsnprintf(line, 127, fmt, ap); + vsnprintf(entry->text, TEXT_MAX, fmt, ap); va_end(ap); - fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line); + fprintf(stderr, "%s (%s) %s\n", module, cls_prefix(cls), entry->text); + + DEQ_INSERT_TAIL(entries, entry); + if (DEQ_SIZE(entries) > LIST_MAX) { + entry = DEQ_HEAD(entries); + DEQ_REMOVE_HEAD(entries); + free_dx_log_entry_t(entry); + } } void dx_log_set_mask(int _mask) diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index c914c5ca7b..fd08753f4d 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -1081,19 +1081,19 @@ void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t le } -void dx_message_insert_string(dx_message_t *msg, const char *start) +void dx_message_insert_string(dx_message_t *msg, const char *str) { dx_message_content_t *content = MSG_CONTENT(msg); - uint32_t len = strlen(start); + uint32_t len = strlen(str); if (len < 256) { dx_insert_8(content, 0xa1); // str8-utf8 dx_insert_8(content, (uint8_t) len); - dx_insert(content, (const uint8_t*) start, len); + dx_insert(content, (const uint8_t*) str, len); } else { dx_insert_8(content, 0xb1); // str32-utf8 dx_insert_32(content, len); - dx_insert(content, (const uint8_t*) start, len); + dx_insert(content, (const uint8_t*) str, len); } content->count++; } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 0513b08a6b..65756be215 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -18,13 +18,34 @@ */ #include <stdio.h> +#include <string.h> #include <qpid/dispatch.h> #include "dispatch_private.h" -static char *module="ROUTER_NODE"; +static char *module = "ROUTER"; + +//static char *local_prefix = "_local/"; +//static char *topo_prefix = "_topo/"; + +/** + * Address Types and Processing: + * + * Address Hash Compare onReceive onEmit + * ============================================================================= + * _local/<local> L<local> handler forward + * _topo/<area>/<router>/<local> A<area> forward forward + * _topo/<my-area>/<router>/<local> R<router> forward forward + * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward + * _topo/<area>/all/<local> A<area> forward forward + * _topo/<my-area>/all/<local> L<local> forward+handler forward + * _topo/all/all/<local> L<local> forward+handler forward + * <mobile> M<mobile> forward+handler forward + */ struct dx_router_t { dx_dispatch_t *dx; + const char *router_area; + const char *router_id; dx_node_t *node; dx_link_list_t in_links; dx_link_list_t out_links; @@ -41,11 +62,32 @@ typedef struct { dx_message_list_t out_fifo; } dx_router_link_t; - ALLOC_DECLARE(dx_router_link_t); ALLOC_DEFINE(dx_router_link_t); +typedef struct { + const char *id; + dx_router_link_t *next_hop; + // list of valid origins (pointers to router_node) - (bit masks?) +} dx_router_node_t; + +ALLOC_DECLARE(dx_router_node_t); +ALLOC_DEFINE(dx_router_node_t); + + +struct dx_address_t { + int is_local; + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; + dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list + dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list +}; + +ALLOC_DECLARE(dx_address_t); +ALLOC_DEFINE(dx_address_t); + + /** * Outbound Delivery Handler */ @@ -119,22 +161,42 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del if (valid_message) { dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); - dx_router_link_t *rlink; + dx_address_t *addr; if (iter) { - dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + hash_retrieve(router->out_hash, iter, (void*) &addr); dx_field_iterator_free(iter); - if (result == 0) { + if (addr) { + // + // To field is valid and contains a known destination. Handle the various + // cases for forwarding. + // + // Forward to the in-process handler for this message if there is one. + // + if (addr->handler) + addr->handler(addr->handler_context, msg); + // - // To field is valid and contains a known destination. Enqueue on - // the output fifo for the next-hop-to-destination. + // Forward to the local link for the locally-connected consumer, if present. + // TODO - Don't forward if this is a "_local" address. // - pn_link_t* pn_outlink = dx_link_pn(rlink->link); - DEQ_INSERT_TAIL(rlink->out_fifo, msg); - pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo)); - dx_link_activate(rlink->link); + if (addr->rlink) { + pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link); + DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg); + pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); + dx_link_activate(addr->rlink->link); + } + + // + // Forward to the next-hop for a remotely-connected consumer, if present. + // Don't forward if this is a "_local" address. + // + if (addr->rnode) { + // TODO + } + } else { // // To field contains an unknown address. Release the message. @@ -242,17 +304,30 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) pn_link_t *pn_link = dx_link_pn(link); const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - sys_mutex_lock(router->lock); dx_router_link_t *rlink = new_dx_router_link_t(); rlink->link = link; DEQ_INIT(rlink->out_fifo); dx_link_set_context(link, rlink); - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - int result = hash_insert(router->out_hash, iter, rlink); + dx_address_t *addr; + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->is_local = 0; + addr->handler = 0; + addr->handler_context = 0; + addr->rlink = 0; + addr->rnode = 0; + hash_insert(router->out_hash, iter, addr); + } dx_field_iterator_free(iter); - if (result == 0) { + if (addr->rlink == 0) { + addr->rlink = rlink; pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); @@ -314,14 +389,14 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed if (pn_link_is_sender(pn_link)) { item = DEQ_HEAD(router->out_links); - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink; + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + dx_address_t *addr; if (iter) { - int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); - if (result == 0) { - dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (addr) { hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); + free_dx_router_link_t(addr->rlink); + free_dx_address_t(addr); dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); } dx_field_iterator_free(iter); @@ -383,7 +458,7 @@ static dx_node_type_t router_node = {"router", 0, 0, static int type_registered = 0; -dx_router_t *dx_router(dx_dispatch_t *dx) +dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) { if (!type_registered) { type_registered = 1; @@ -397,8 +472,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx) DEQ_INIT(router->out_links); DEQ_INIT(router->in_fifo); - router->dx = dx; - router->lock = sys_mutex(); + router->dx = dx; + router->lock = sys_mutex(); + router->router_area = area; + router->router_id = id; router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); dx_timer_schedule(router->timer, 0); // Immediate @@ -406,10 +483,22 @@ dx_router_t *dx_router(dx_dispatch_t *dx) router->out_hash = hash(10, 32, 0); router->dtag = 1; + // + // Inform the field iterator module of this router's id and area. The field iterator + // uses this to offload some of the address-processing load from the router. + // + dx_field_iterator_set_address(area, id); + return router; } +void dx_router_setup_agent(dx_dispatch_t *dx) +{ + // TODO +} + + void dx_router_free(dx_router_t *router) { dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); @@ -417,3 +506,54 @@ void dx_router_free(dx_router_t *router) free(router); } + +dx_address_t *dx_router_register_address(dx_dispatch_t *dx, + bool is_local, + const char *address, + dx_router_message_cb handler, + void *context) +{ + char addr[1000]; + dx_address_t *ad = new_dx_address_t(); + dx_field_iterator_t *iter; + int result; + + if (!ad) + return 0; + + ad->is_local = is_local; + ad->handler = handler; + ad->handler_context = context; + ad->rlink = 0; + + if (ad->is_local) + strcpy(addr, "L"); // Local Hash-Key Space + else + strcpy(addr, "M"); // Mobile Hash-Key Space + + strcat(addr, address); + iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST); + result = hash_insert(dx->router->out_hash, iter, ad); + dx_field_iterator_free(iter); + if (result != 0) { + free_dx_address_t(ad); + return 0; + } + + dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + return ad; +} + + +void dx_router_unregister_address(dx_address_t *ad) +{ + free_dx_address_t(ad); +} + + +void dx_router_send(dx_dispatch_t *dx, + const char *address, + dx_message_t *msg) +{ +} + diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c index a2d2d4980a..536af048d8 100644 --- a/qpid/extras/dispatch/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -44,6 +44,7 @@ typedef struct dx_thread_t { struct dx_server_t { int thread_count; + const char *container_name; pn_driver_t *driver; dx_thread_start_cb_t start_handler; dx_conn_handler_cb_t conn_handler; @@ -201,7 +202,7 @@ static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); - pn_connection_set_container(conn, "dispatch"); // TODO - make unique + pn_connection_set_container(conn, dx_server->container_name); pn_connector_set_connection(cxtr, conn); pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; @@ -557,7 +558,7 @@ static void cxtr_try_open(void *context) } -dx_server_t *dx_server(int thread_count) +dx_server_t *dx_server(int thread_count, const char *container_name) { int i; @@ -566,6 +567,7 @@ dx_server_t *dx_server(int thread_count) return 0; dx_server->thread_count = thread_count; + dx_server->container_name = container_name; dx_server->driver = pn_driver(); dx_server->start_handler = 0; dx_server->conn_handler = 0; @@ -596,6 +598,12 @@ dx_server_t *dx_server(int thread_count) } +void dx_server_setup_agent(dx_dispatch_t *dx) +{ + // TODO +} + + void dx_server_free(dx_server_t *dx_server) { int i; |
