diff options
| author | Ted Ross <tross@apache.org> | 2013-10-24 18:01:00 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-10-24 18:01:00 +0000 |
| commit | 61f7da33c6efd0cea9e3ccb9653edd41f6dadcb8 (patch) | |
| tree | d264b5378b40b95b14c504a1429b89ac49ecc087 /qpid/extras/dispatch/src | |
| parent | 94d8a5c36b058b76d3e61db7d2028e395b0f2b44 (diff) | |
| download | qpid-python-61f7da33c6efd0cea9e3ccb9653edd41f6dadcb8.tar.gz | |
QPID-5257 - Removed dispatch code from its old location
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1535460 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
33 files changed, 0 insertions, 10169 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c deleted file mode 100644 index 784c333ec4..0000000000 --- a/qpid/extras/dispatch/src/agent.c +++ /dev/null @@ -1,525 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "dispatch_private.h" -#include <qpid/dispatch/error.h> -#include <qpid/dispatch/agent.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/hash.h> -#include <qpid/dispatch/container.h> -#include <qpid/dispatch/message.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/timer.h> -#include <qpid/dispatch/router.h> -#include <qpid/dispatch/log.h> -#include <qpid/dispatch/compose.h> -#include <qpid/dispatch/parse.h> -#include <qpid/dispatch/amqp.h> -#include <string.h> -#include <stdio.h> - -struct dx_agent_class_t { - DEQ_LINKS(dx_agent_class_t); - dx_hash_handle_t *hash_handle; - void *context; - dx_agent_schema_cb_t schema_handler; - dx_agent_query_cb_t query_handler; // 0 iff class is an event. -}; - -DEQ_DECLARE(dx_agent_class_t, dx_agent_class_list_t); - - -struct dx_agent_t { - dx_dispatch_t *dx; - dx_hash_t *class_hash; - dx_agent_class_list_t class_list; - dx_message_list_t in_fifo; - dx_message_list_t out_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - dx_address_t *address; - dx_agent_class_t *container_class; -}; - - -typedef struct { - dx_agent_t *agent; - dx_composed_field_t *response; -} dx_agent_request_t; - - -static char *log_module = "AGENT"; - - -static dx_composed_field_t *dx_agent_setup_response(dx_field_iterator_t *reply_to) -{ - // - // Compose the header - // - dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0); - dx_compose_start_list(field); - dx_compose_insert_bool(field, 0); // durable - dx_compose_end_list(field); - - // - // Compose the Properties - // - field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); - dx_compose_start_list(field); - dx_compose_insert_null(field); // message-id - dx_compose_insert_null(field); // user-id - dx_compose_insert_string_iterator(field, reply_to); // to - dx_compose_insert_null(field); // subject - dx_compose_insert_null(field); // reply-to - dx_compose_insert_string(field, "1"); // correlation-id // TODO - fix - dx_compose_end_list(field); - - // - // Compose the Application Properties - // - field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field); - dx_compose_start_map(field); - dx_compose_insert_string(field, "status-code"); - dx_compose_insert_uint(field, 200); - - dx_compose_insert_string(field, "status-descriptor"); - dx_compose_insert_string(field, "OK"); - dx_compose_end_map(field); - - return field; -} - - -static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) -{ - dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type"); - if (cls == 0) - return; - - dx_field_iterator_t *cls_string = dx_parse_raw(cls); - const dx_agent_class_t *cls_record; - dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record); - if (cls_record == 0) - return; - - dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", dx_hash_key_by_handle(cls_record->hash_handle)); - - dx_composed_field_t *field = dx_agent_setup_response(reply_to); - - // - // Open the Body (AMQP Value) to be filled in by the handler. - // - field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); - dx_compose_start_list(field); - dx_compose_start_map(field); - - // - // The request record is allocated locally because the entire processing of the request - // will be done synchronously. - // - dx_agent_request_t request; - request.agent = agent; - request.response = field; - - cls_record->query_handler(cls_record->context, 0, &request); - - // - // The response is complete, close the list. - // - dx_compose_end_list(field); - - // - // Create a message and send it. - // - dx_message_t *msg = dx_message(); - dx_message_compose_2(msg, field); - dx_router_send(agent->dx, reply_to, msg); - - dx_message_free(msg); - dx_compose_free(field); -} - - -static void dx_agent_process_discover_types(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) -{ - dx_log(log_module, LOG_TRACE, "Received DISCOVER-TYPES request"); - - dx_composed_field_t *field = dx_agent_setup_response(reply_to); - - // - // Open the Body (AMQP Value) to be filled in by the handler. - // - field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); - dx_compose_start_map(field); - - // - // Put entries into the map for each known entity type - // - sys_mutex_lock(agent->lock); - dx_agent_class_t *cls = DEQ_HEAD(agent->class_list); - while (cls) { - dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle)); - dx_compose_insert_null(field); // TODO - https://tools.oasis-open.org/issues/browse/AMQP-87 - cls = DEQ_NEXT(cls); - } - sys_mutex_unlock(agent->lock); - dx_compose_end_map(field); - - // - // Create a message and send it. - // - dx_message_t *msg = dx_message(); - dx_message_compose_2(msg, field); - dx_router_send(agent->dx, reply_to, msg); - - dx_message_free(msg); - dx_compose_free(field); -} - - -static void dx_agent_process_discover_operations(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) -{ - dx_log(log_module, LOG_TRACE, "Received DISCOVER-OPERATIONS request"); - - dx_composed_field_t *field = dx_agent_setup_response(reply_to); - - // - // Open the Body (AMQP Value) to be filled in by the handler. - // - field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); - dx_compose_start_map(field); - - // - // Put entries into the map for each known entity type - // - sys_mutex_lock(agent->lock); - dx_agent_class_t *cls = DEQ_HEAD(agent->class_list); - while (cls) { - dx_compose_insert_string(field, (const char*) dx_hash_key_by_handle(cls->hash_handle)); - dx_compose_start_list(field); - dx_compose_insert_string(field, "READ"); - dx_compose_end_list(field); - cls = DEQ_NEXT(cls); - } - sys_mutex_unlock(agent->lock); - dx_compose_end_map(field); - - // - // Create a message and send it. - // - dx_message_t *msg = dx_message(); - dx_message_compose_2(msg, field); - dx_router_send(agent->dx, reply_to, msg); - - dx_message_free(msg); - dx_compose_free(field); -} - - -static void dx_agent_process_discover_nodes(dx_agent_t *agent, dx_parsed_field_t *map, dx_field_iterator_t *reply_to) -{ - dx_log(log_module, LOG_TRACE, "Received DISCOVER-MGMT-NODES request"); - - dx_composed_field_t *field = dx_agent_setup_response(reply_to); - - // - // Open the Body (AMQP Value) to be filled in by the handler. - // - field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); - - // - // Put entries into the list for each known management node - // - dx_compose_start_list(field); - dx_compose_insert_string(field, "amqp:/_local/$management"); - dx_router_build_node_list(agent->dx, field); - dx_compose_end_list(field); - - // - // Create a message and send it. - // - dx_message_t *msg = dx_message(); - dx_message_compose_2(msg, field); - dx_router_send(agent->dx, reply_to, msg); - - dx_message_free(msg); - dx_compose_free(field); -} - - -static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) -{ - // - // Parse the message through the body and exit if the message is not well formed. - // - if (!dx_message_check(msg, DX_DEPTH_BODY)) - return; - - // - // Get an iterator for the application-properties. Exit if the message has none. - // - dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES); - if (ap == 0) - return; - - // - // Get an iterator for the reply-to. Exit if not found. - // - dx_field_iterator_t *reply_to = dx_message_field_iterator(msg, DX_FIELD_REPLY_TO); - if (reply_to == 0) - return; - - // - // Try to get a map-view of the application-properties. - // - dx_parsed_field_t *map = dx_parse(ap); - if (map == 0) { - dx_field_iterator_free(ap); - return; - } - - // - // Exit if there was a parsing error. - // - if (!dx_parse_ok(map)) { - dx_log(log_module, LOG_TRACE, "Received unparsable App Properties: %s", dx_parse_error(map)); - dx_field_iterator_free(ap); - dx_parse_free(map); - return; - } - - // - // Exit if it is not a map. - // - if (!dx_parse_is_map(map)) { - dx_field_iterator_free(ap); - dx_parse_free(map); - return; - } - - // - // Get an iterator for the "operation" field in the map. Exit if the key is not found. - // - dx_parsed_field_t *operation = dx_parse_value_by_key(map, "operation"); - if (operation == 0) { - dx_parse_free(map); - dx_field_iterator_free(ap); - return; - } - - // - // Dispatch the operation to the appropriate handler - // - dx_field_iterator_t *operation_string = dx_parse_raw(operation); - if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET")) - dx_agent_process_get(agent, map, reply_to); - if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-TYPES")) - dx_agent_process_discover_types(agent, map, reply_to); - if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-OPERATIONS")) - dx_agent_process_discover_operations(agent, map, reply_to); - if (dx_field_iterator_equal(operation_string, (unsigned char*) "DISCOVER-MGMT-NODES")) - dx_agent_process_discover_nodes(agent, map, reply_to); - - dx_parse_free(map); - dx_field_iterator_free(ap); - dx_field_iterator_free(reply_to); -} - - -static void dx_agent_deferred_handler(void *context) -{ - dx_agent_t *agent = (dx_agent_t*) context; - dx_message_t *msg; - - do { - sys_mutex_lock(agent->lock); - msg = DEQ_HEAD(agent->in_fifo); - if (msg) - DEQ_REMOVE_HEAD(agent->in_fifo); - sys_mutex_unlock(agent->lock); - - if (msg) { - dx_agent_process_request(agent, msg); - dx_message_free(msg); - } - } while (msg); -} - - -static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id) -{ - dx_agent_t *agent = (dx_agent_t*) context; - dx_message_t *copy = dx_message_copy(msg); - - sys_mutex_lock(agent->lock); - DEQ_INSERT_TAIL(agent->in_fifo, copy); - sys_mutex_unlock(agent->lock); - - dx_timer_schedule(agent->timer, 0); -} - - -static dx_agent_class_t *dx_agent_register_class_LH(dx_agent_t *agent, - const char *fqname, - void *context, - dx_agent_schema_cb_t schema_handler, - dx_agent_query_cb_t query_handler) -{ - dx_agent_class_t *cls = NEW(dx_agent_class_t); - assert(cls); - DEQ_ITEM_INIT(cls); - cls->context = context; - cls->schema_handler = schema_handler; - cls->query_handler = query_handler; - - dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); - int result = dx_hash_insert_const(agent->class_hash, iter, cls, &cls->hash_handle); - dx_field_iterator_free(iter); - if (result < 0) - assert(false); - - DEQ_INSERT_TAIL(agent->class_list, cls); - - dx_log(log_module, LOG_INFO, "Manageable Entity Type (%s) %s", query_handler ? "object" : "event", fqname); - return cls; -} - - -dx_agent_t *dx_agent(dx_dispatch_t *dx) -{ - dx_agent_t *agent = NEW(dx_agent_t); - agent->dx = dx; - agent->class_hash = dx_hash(6, 10, 1); - DEQ_INIT(agent->class_list); - DEQ_INIT(agent->in_fifo); - DEQ_INIT(agent->out_fifo); - agent->lock = sys_mutex(); - agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent); - agent->address = dx_router_register_address(dx, "$management", dx_agent_rx_handler, agent); - - return agent; -} - - -void dx_agent_free(dx_agent_t *agent) -{ - dx_router_unregister_address(agent->address); - sys_mutex_free(agent->lock); - dx_timer_free(agent->timer); - dx_hash_free(agent->class_hash); - free(agent); -} - - -dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx, - const char *fqname, - void *context, - dx_agent_schema_cb_t schema_handler, - dx_agent_query_cb_t query_handler) -{ - dx_agent_t *agent = dx->agent; - dx_agent_class_t *cls; - - sys_mutex_lock(agent->lock); - cls = dx_agent_register_class_LH(agent, fqname, context, schema_handler, query_handler); - sys_mutex_unlock(agent->lock); - return cls; -} - - -dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, - const char *fqname, - void *context, - dx_agent_schema_cb_t schema_handler) -{ - return dx_agent_register_class(dx, fqname, context, schema_handler, 0); -} - - -void dx_agent_value_string(void *correlator, const char *key, const char *value) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_string(request->response, value); -} - - -void dx_agent_value_uint(void *correlator, const char *key, uint64_t value) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_uint(request->response, value); -} - - -void dx_agent_value_null(void *correlator, const char *key) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_null(request->response); -} - - -void dx_agent_value_boolean(void *correlator, const char *key, bool value) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_bool(request->response, value); -} - - -void dx_agent_value_binary(void *correlator, const char *key, const uint8_t *value, size_t len) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_binary(request->response, value, len); -} - - -void dx_agent_value_uuid(void *correlator, const char *key, const uint8_t *value) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_uuid(request->response, value); -} - - -void dx_agent_value_timestamp(void *correlator, const char *key, uint64_t value) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_insert_string(request->response, key); - dx_compose_insert_timestamp(request->response, value); -} - - -void dx_agent_value_complete(void *correlator, bool more) -{ - dx_agent_request_t *request = (dx_agent_request_t*) correlator; - dx_compose_end_map(request->response); - if (more) - dx_compose_start_map(request->response); -} - - -void *dx_agent_raise_event(dx_dispatch_t *dx, dx_agent_class_t *event) -{ - return 0; -} - diff --git a/qpid/extras/dispatch/src/alloc.c b/qpid/extras/dispatch/src/alloc.c deleted file mode 100644 index bf10f03633..0000000000 --- a/qpid/extras/dispatch/src/alloc.c +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/log.h> -#include <qpid/dispatch/agent.h> -#include <memory.h> -#include <stdio.h> - -typedef struct dx_alloc_type_t dx_alloc_type_t; -typedef struct dx_alloc_item_t dx_alloc_item_t; - -struct dx_alloc_type_t { - DEQ_LINKS(dx_alloc_type_t); - dx_alloc_type_desc_t *desc; -}; - -DEQ_DECLARE(dx_alloc_type_t, dx_alloc_type_list_t); - - -struct dx_alloc_item_t { - DEQ_LINKS(dx_alloc_item_t); -}; - -DEQ_DECLARE(dx_alloc_item_t, dx_alloc_item_list_t); - - -struct dx_alloc_pool_t { - dx_alloc_item_list_t free_list; -}; - -dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0}; -dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0}; -#define BIG_THRESHOLD 256 - -static sys_mutex_t *init_lock; -static dx_alloc_type_list_t type_list; - -static void dx_alloc_init(dx_alloc_type_desc_t *desc) -{ - sys_mutex_lock(init_lock); - - desc->total_size = desc->type_size; - if (desc->additional_size) - desc->total_size += *desc->additional_size; - - //dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", - // desc->type_name, desc->type_size, desc->total_size); - - if (!desc->global_pool) { - if (desc->config == 0) - desc->config = desc->total_size > BIG_THRESHOLD ? - &dx_alloc_default_config_big : &dx_alloc_default_config_small; - - assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); - - desc->global_pool = NEW(dx_alloc_pool_t); - DEQ_INIT(desc->global_pool->free_list); - desc->lock = sys_mutex(); - desc->stats = NEW(dx_alloc_stats_t); - memset(desc->stats, 0, sizeof(dx_alloc_stats_t)); - - dx_alloc_type_t *type_item = NEW(dx_alloc_type_t); - DEQ_ITEM_INIT(type_item); - type_item->desc = desc; - DEQ_INSERT_TAIL(type_list, type_item); - } - - sys_mutex_unlock(init_lock); -} - - -void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) -{ - int idx; - - // - // If the descriptor is not initialized, set it up now. - // - if (!desc->global_pool) - dx_alloc_init(desc); - - // - // If this is the thread's first pass through here, allocate the - // thread-local pool for this type. - // - if (*tpool == 0) { - *tpool = NEW(dx_alloc_pool_t); - DEQ_INIT((*tpool)->free_list); - } - - dx_alloc_pool_t *pool = *tpool; - - // - // Fast case: If there's an item on the local free list, take it off the - // list and return it. Since everything we've touched is thread-local, - // there is no need to acquire a lock. - // - dx_alloc_item_t *item = DEQ_HEAD(pool->free_list); - if (item) { - DEQ_REMOVE_HEAD(pool->free_list); - return &item[1]; - } - - // - // The local free list is empty, we need to either rebalance a batch - // of items from the global list or go to the heap to get new memory. - // - sys_mutex_lock(desc->lock); - if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) { - // - // Rebalance a full batch from the global free list to the thread list. - // - desc->stats->batches_rebalanced_to_threads++; - desc->stats->held_by_threads += desc->config->transfer_batch_size; - for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { - item = DEQ_HEAD(desc->global_pool->free_list); - DEQ_REMOVE_HEAD(desc->global_pool->free_list); - DEQ_INSERT_TAIL(pool->free_list, item); - } - } else { - // - // Allocate a full batch from the heap and put it on the thread list. - // - for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { - item = (dx_alloc_item_t*) malloc(sizeof(dx_alloc_item_t) + desc->total_size); - if (item == 0) - break; - DEQ_ITEM_INIT(item); - DEQ_INSERT_TAIL(pool->free_list, item); - desc->stats->held_by_threads++; - desc->stats->total_alloc_from_heap++; - } - } - sys_mutex_unlock(desc->lock); - - item = DEQ_HEAD(pool->free_list); - if (item) { - DEQ_REMOVE_HEAD(pool->free_list); - return &item[1]; - } - - return 0; -} - - -void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p) -{ - dx_alloc_item_t *item = ((dx_alloc_item_t*) p) - 1; - int idx; - - // - // If this is the thread's first pass through here, allocate the - // thread-local pool for this type. - // - if (*tpool == 0) { - *tpool = NEW(dx_alloc_pool_t); - DEQ_INIT((*tpool)->free_list); - } - - dx_alloc_pool_t *pool = *tpool; - - DEQ_INSERT_TAIL(pool->free_list, item); - - if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max) - return; - - // - // We've exceeded the maximum size of the local free list. A batch must be - // rebalanced back to the global list. - // - sys_mutex_lock(desc->lock); - desc->stats->batches_rebalanced_to_global++; - desc->stats->held_by_threads -= desc->config->transfer_batch_size; - for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { - item = DEQ_HEAD(pool->free_list); - DEQ_REMOVE_HEAD(pool->free_list); - DEQ_INSERT_TAIL(desc->global_pool->free_list, item); - } - - // - // If there's a global_free_list size limit, remove items until the limit is - // not exceeded. - // - if (desc->config->global_free_list_max != 0) { - while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) { - item = DEQ_HEAD(desc->global_pool->free_list); - DEQ_REMOVE_HEAD(desc->global_pool->free_list); - free(item); - desc->stats->total_free_to_heap++; - } - } - - sys_mutex_unlock(desc->lock); -} - - -void dx_alloc_initialize(void) -{ - init_lock = sys_mutex(); - DEQ_INIT(type_list); -} - - -static void alloc_schema_handler(void *context, void *correlator) -{ -} - - -static void alloc_query_handler(void* context, const char *id, void *cor) -{ - dx_alloc_type_t *item = DEQ_HEAD(type_list); - - while (item) { - dx_agent_value_string(cor, "name", item->desc->type_name); - dx_agent_value_uint(cor, "type_size", item->desc->total_size); - dx_agent_value_uint(cor, "transfer_batch_size", item->desc->config->transfer_batch_size); - dx_agent_value_uint(cor, "local_free_list_max", item->desc->config->local_free_list_max); - dx_agent_value_uint(cor, "global_free_list_max", item->desc->config->global_free_list_max); - dx_agent_value_uint(cor, "total_alloc_from_heap", item->desc->stats->total_alloc_from_heap); - dx_agent_value_uint(cor, "total_free_to_heap", item->desc->stats->total_free_to_heap); - dx_agent_value_uint(cor, "held_by_threads", item->desc->stats->held_by_threads); - dx_agent_value_uint(cor, "batches_rebalanced_to_threads", item->desc->stats->batches_rebalanced_to_threads); - dx_agent_value_uint(cor, "batches_rebalanced_to_global", item->desc->stats->batches_rebalanced_to_global); - - item = DEQ_NEXT(item); - dx_agent_value_complete(cor, item != 0); - } -} - - -void dx_alloc_setup_agent(dx_dispatch_t *dx) -{ - dx_agent_register_class(dx, "org.apache.qpid.dispatch.allocator", 0, alloc_schema_handler, alloc_query_handler); -} - diff --git a/qpid/extras/dispatch/src/alloc_private.h b/qpid/extras/dispatch/src/alloc_private.h deleted file mode 100644 index da773fef25..0000000000 --- a/qpid/extras/dispatch/src/alloc_private.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef __dispatch_alloc_private_h__ -#define __dispatch_alloc_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/dispatch.h> - -void dx_alloc_initialize(void); -void dx_alloc_setup_agent(dx_dispatch_t *dx); - -#endif diff --git a/qpid/extras/dispatch/src/amqp.c b/qpid/extras/dispatch/src/amqp.c deleted file mode 100644 index 6a8545b757..0000000000 --- a/qpid/extras/dispatch/src/amqp.c +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/amqp.h> - -const char * const DX_DA_INGRESS = "qdx.ingress"; -const char * const DX_DA_TRACE = "qdx.trace"; -const char * const DX_DA_TO = "qdx.to"; - -const char * const DX_CAPABILITY_ROUTER = "qdx.router"; - -const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1"; -const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2"; - diff --git a/qpid/extras/dispatch/src/bitmask.c b/qpid/extras/dispatch/src/bitmask.c deleted file mode 100644 index 88ba69dde1..0000000000 --- a/qpid/extras/dispatch/src/bitmask.c +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/bitmask.h> -#include <qpid/dispatch/alloc.h> -#include <assert.h> - -#define DX_BITMASK_LONGS 16 -#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64) - -struct dx_bitmask_t { - uint64_t array[DX_BITMASK_LONGS]; - int first_set; -}; - -ALLOC_DECLARE(dx_bitmask_t); -ALLOC_DEFINE(dx_bitmask_t); - -#define MASK_INDEX(num) (num / 64) -#define MASK_ONEHOT(num) (((uint64_t) 1) << (num % 64)) -#define FIRST_NONE -1 -#define FIRST_UNKNOWN -2 - - -int dx_bitmask_width() -{ - return DX_BITMASK_BITS; -} - - -dx_bitmask_t *dx_bitmask(int initial) -{ - dx_bitmask_t *b = new_dx_bitmask_t(); - if (initial) - dx_bitmask_set_all(b); - else - dx_bitmask_clear_all(b); - return b; -} - - -void dx_bitmask_free(dx_bitmask_t *b) -{ - free_dx_bitmask_t(b); -} - - -void dx_bitmask_set_all(dx_bitmask_t *b) -{ - for (int i = 0; i < DX_BITMASK_LONGS; i++) - b->array[i] = 0xFFFFFFFFFFFFFFFF; - b->first_set = 0; -} - - -void dx_bitmask_clear_all(dx_bitmask_t *b) -{ - for (int i = 0; i < DX_BITMASK_LONGS; i++) - b->array[i] = 0; - b->first_set = FIRST_NONE; -} - - -void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum) -{ - assert(bitnum < DX_BITMASK_BITS); - b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum); - if (b->first_set > bitnum || b->first_set < 0) - b->first_set = bitnum; -} - - -void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum) -{ - assert(bitnum < DX_BITMASK_BITS); - b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum)); - if (b->first_set == bitnum) - b->first_set = FIRST_UNKNOWN; -} - - -int dx_bitmask_value(dx_bitmask_t *b, int bitnum) -{ - return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0; -} - - -int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum) -{ - if (b->first_set == FIRST_UNKNOWN) { - b->first_set = FIRST_NONE; - for (int i = 0; i < DX_BITMASK_LONGS; i++) - if (b->array[i]) { - for (int j = 0; j < 64; j++) - if ((((uint64_t) 1) << j) & b->array[i]) { - b->first_set = i * 64 + j; - break; - } - break; - } - } - - if (b->first_set == FIRST_NONE) - return 0; - *bitnum = b->first_set; - return 1; -} - diff --git a/qpid/extras/dispatch/src/buffer.c b/qpid/extras/dispatch/src/buffer.c deleted file mode 100644 index d0bbd13cb3..0000000000 --- a/qpid/extras/dispatch/src/buffer.c +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/buffer.h> -#include <qpid/dispatch/alloc.h> - -static size_t buffer_size = 512; -static int size_locked = 0; - -ALLOC_DECLARE(dx_buffer_t); -ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0); - - -void dx_buffer_set_size(size_t size) -{ - assert(!size_locked); - buffer_size = size; -} - - -dx_buffer_t *dx_buffer(void) -{ - size_locked = 1; - dx_buffer_t *buf = new_dx_buffer_t(); - - DEQ_ITEM_INIT(buf); - buf->size = 0; - return buf; -} - - -void dx_buffer_free(dx_buffer_t *buf) -{ - free_dx_buffer_t(buf); -} - - -unsigned char *dx_buffer_base(dx_buffer_t *buf) -{ - return (unsigned char*) &buf[1]; -} - - -unsigned char *dx_buffer_cursor(dx_buffer_t *buf) -{ - return ((unsigned char*) &buf[1]) + buf->size; -} - - -size_t dx_buffer_capacity(dx_buffer_t *buf) -{ - return buffer_size - buf->size; -} - - -size_t dx_buffer_size(dx_buffer_t *buf) -{ - return buf->size; -} - - -void dx_buffer_insert(dx_buffer_t *buf, size_t len) -{ - buf->size += len; - assert(buf->size <= buffer_size); -} - diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c deleted file mode 100644 index 66b2336e06..0000000000 --- a/qpid/extras/dispatch/src/compose.c +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/buffer.h> -#include <qpid/dispatch/amqp.h> -#include "compose_private.h" -#include <memory.h> - -ALLOC_DEFINE(dx_composite_t); -ALLOC_DEFINE(dx_composed_field_t); - - -static void bump_count(dx_composed_field_t *field) -{ - dx_composite_t *comp = DEQ_HEAD(field->fieldStack); - if (comp) - comp->count++; -} - - -static void dx_insert(dx_composed_field_t *field, const uint8_t *seq, size_t len) -{ - dx_buffer_t *buf = DEQ_TAIL(field->buffers); - dx_composite_t *comp = DEQ_HEAD(field->fieldStack); - - while (len > 0) { - if (buf == 0 || dx_buffer_capacity(buf) == 0) { - buf = dx_buffer(); - if (buf == 0) - return; - DEQ_INSERT_TAIL(field->buffers, buf); - } - - size_t to_copy = dx_buffer_capacity(buf); - if (to_copy > len) - to_copy = len; - memcpy(dx_buffer_cursor(buf), seq, to_copy); - dx_buffer_insert(buf, to_copy); - len -= to_copy; - seq += to_copy; - if (comp) - comp->length += to_copy; - } -} - - -static void dx_insert_8(dx_composed_field_t *field, uint8_t value) -{ - dx_insert(field, &value, 1); -} - - -static void dx_insert_32(dx_composed_field_t *field, uint32_t value) -{ - uint8_t buf[4]; - buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); - buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); - buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); - buf[3] = (uint8_t) (value & 0x000000FF); - dx_insert(field, buf, 4); -} - - -static void dx_insert_64(dx_composed_field_t *field, uint64_t value) -{ - uint8_t buf[8]; - buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); - buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); - buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); - buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); - buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); - buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); - buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); - buf[7] = (uint8_t) (value & 0x00000000000000FFL); - dx_insert(field, buf, 8); -} - - -static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) -{ - while (*buf) { - if (*cursor >= dx_buffer_size(*buf)) { - *buf = (*buf)->next; - *cursor = 0; - } else { - dx_buffer_base(*buf)[*cursor] = value; - (*cursor)++; - return; - } - } -} - - -static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) -{ - dx_buffer_t *buf = field->buffer; - size_t cursor = field->offset; - - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 16)); - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 8)); - dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); -} - - -static void dx_compose_start_composite(dx_composed_field_t *field, int isMap) -{ - if (isMap) - dx_insert_8(field, DX_AMQP_MAP32); - else - dx_insert_8(field, DX_AMQP_LIST32); - - // - // Push a composite descriptor on the field stack - // - dx_composite_t *comp = new_dx_composite_t(); - DEQ_ITEM_INIT(comp); - comp->isMap = isMap; - - // - // Mark the current location to later overwrite the length - // - comp->length_location.buffer = DEQ_TAIL(field->buffers); - comp->length_location.offset = dx_buffer_size(comp->length_location.buffer); - comp->length_location.length = 4; - comp->length_location.parsed = 1; - - dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4); - - // - // Mark the current location to later overwrite the count - // - comp->count_location.buffer = DEQ_TAIL(field->buffers); - comp->count_location.offset = dx_buffer_size(comp->count_location.buffer); - comp->count_location.length = 4; - comp->count_location.parsed = 1; - - dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4); - - comp->length = 4; // Include the length of the count field - comp->count = 0; - - DEQ_INSERT_HEAD(field->fieldStack, comp); -} - - -static void dx_compose_end_composite(dx_composed_field_t *field) -{ - dx_composite_t *comp = DEQ_HEAD(field->fieldStack); - assert(comp); - - dx_overwrite_32(&comp->length_location, comp->length); - dx_overwrite_32(&comp->count_location, comp->count); - - DEQ_REMOVE_HEAD(field->fieldStack); - - // - // If there is an enclosing composite, update its length and count - // - dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack); - if (enclosing) { - enclosing->length += (comp->length - 4); // the length and count were already accounted for - enclosing->count++; - } - - free_dx_composite_t(comp); -} - - -dx_composed_field_t *dx_compose(uint64_t performative, dx_composed_field_t *extend) -{ - dx_composed_field_t *field = extend; - - if (field) { - assert(DEQ_SIZE(field->fieldStack) == 0); - } else { - field = new_dx_composed_field_t(); - if (!field) - return 0; - - DEQ_INIT(field->buffers); - DEQ_INIT(field->fieldStack); - } - - dx_insert_8(field, 0x00); - dx_compose_insert_ulong(field, performative); - - return field; -} - - -void dx_compose_free(dx_composed_field_t *field) -{ - dx_buffer_t *buf = DEQ_HEAD(field->buffers); - while (buf) { - DEQ_REMOVE_HEAD(field->buffers); - dx_buffer_free(buf); - buf = DEQ_HEAD(field->buffers); - } - - dx_composite_t *comp = DEQ_HEAD(field->fieldStack); - while (comp) { - DEQ_REMOVE_HEAD(field->fieldStack); - free_dx_composite_t(comp); - comp = DEQ_HEAD(field->fieldStack); - } - - free_dx_composed_field_t(field); -} - - -void dx_compose_start_list(dx_composed_field_t *field) -{ - dx_compose_start_composite(field, 0); -} - - -void dx_compose_end_list(dx_composed_field_t *field) -{ - dx_compose_end_composite(field); -} - - -void dx_compose_start_map(dx_composed_field_t *field) -{ - dx_compose_start_composite(field, 1); -} - - -void dx_compose_end_map(dx_composed_field_t *field) -{ - dx_compose_end_composite(field); -} - - -void dx_compose_insert_null(dx_composed_field_t *field) -{ - dx_insert_8(field, DX_AMQP_NULL); - bump_count(field); -} - - -void dx_compose_insert_bool(dx_composed_field_t *field, int value) -{ - dx_insert_8(field, value ? DX_AMQP_TRUE : DX_AMQP_FALSE); - bump_count(field); -} - - -void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value) -{ - if (value == 0) { - dx_insert_8(field, DX_AMQP_UINT0); - } else if (value < 256) { - dx_insert_8(field, DX_AMQP_SMALLUINT); - dx_insert_8(field, (uint8_t) value); - } else { - dx_insert_8(field, DX_AMQP_UINT); - dx_insert_32(field, value); - } - bump_count(field); -} - - -void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value) -{ - if (value == 0) { - dx_insert_8(field, DX_AMQP_ULONG0); - } else if (value < 256) { - dx_insert_8(field, DX_AMQP_SMALLULONG); - dx_insert_8(field, (uint8_t) value); - } else { - dx_insert_8(field, DX_AMQP_ULONG); - dx_insert_64(field, value); - } - bump_count(field); -} - - -void dx_compose_insert_int(dx_composed_field_t *field, int32_t value) -{ - if (value >= -128 && value <= 127) { - dx_insert_8(field, DX_AMQP_SMALLINT); - dx_insert_8(field, (uint8_t) value); - } else { - dx_insert_8(field, DX_AMQP_INT); - dx_insert_32(field, (uint32_t) value); - } - bump_count(field); -} - - -void dx_compose_insert_long(dx_composed_field_t *field, int64_t value) -{ - if (value >= -128 && value <= 127) { - dx_insert_8(field, DX_AMQP_SMALLLONG); - dx_insert_8(field, (uint8_t) value); - } else { - dx_insert_8(field, DX_AMQP_LONG); - dx_insert_64(field, (uint64_t) value); - } - bump_count(field); -} - - -void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value) -{ - dx_insert_8(field, DX_AMQP_TIMESTAMP); - dx_insert_64(field, value); - bump_count(field); -} - - -void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value) -{ - dx_insert_8(field, DX_AMQP_UUID); - dx_insert(field, value, 16); - bump_count(field); -} - - -void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len) -{ - if (len < 256) { - dx_insert_8(field, DX_AMQP_VBIN8); - dx_insert_8(field, (uint8_t) len); - } else { - dx_insert_8(field, DX_AMQP_VBIN32); - dx_insert_32(field, len); - } - dx_insert(field, value, len); - bump_count(field); -} - - -void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list_t *buffers) -{ - dx_buffer_t *buf = DEQ_HEAD(*buffers); - uint32_t len = 0; - - // - // Calculate the size of the binary field to be appended. - // - while (buf) { - len += dx_buffer_size(buf); - buf = DEQ_NEXT(buf); - } - - // - // Supply the appropriate binary tag for the length. - // - if (len < 256) { - dx_insert_8(field, DX_AMQP_VBIN8); - dx_insert_8(field, (uint8_t) len); - } else { - dx_insert_8(field, DX_AMQP_VBIN32); - dx_insert_32(field, len); - } - - // - // Move the supplied buffers to the tail of the field's buffer list. - // - buf = DEQ_HEAD(*buffers); - while (buf) { - DEQ_REMOVE_HEAD(*buffers); - DEQ_INSERT_TAIL(field->buffers, buf); - buf = DEQ_HEAD(*buffers); - } -} - - -void dx_compose_insert_string(dx_composed_field_t *field, const char *value) -{ - uint32_t len = strlen(value); - - if (len < 256) { - dx_insert_8(field, DX_AMQP_STR8_UTF8); - dx_insert_8(field, (uint8_t) len); - } else { - dx_insert_8(field, DX_AMQP_STR32_UTF8); - dx_insert_32(field, len); - } - dx_insert(field, (const uint8_t*) value, len); - bump_count(field); -} - - -void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter) -{ - uint32_t len = 0; - - while (!dx_field_iterator_end(iter)) { - dx_field_iterator_octet(iter); - len++; - } - - dx_field_iterator_reset(iter); - - if (len < 256) { - dx_insert_8(field, DX_AMQP_STR8_UTF8); - dx_insert_8(field, (uint8_t) len); - } else { - dx_insert_8(field, DX_AMQP_STR32_UTF8); - dx_insert_32(field, len); - } - - while (!dx_field_iterator_end(iter)) { - uint8_t octet = dx_field_iterator_octet(iter); - dx_insert_8(field, octet); - } - - bump_count(field); -} - - -void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value) -{ - uint32_t len = strlen(value); - - if (len < 256) { - dx_insert_8(field, DX_AMQP_SYM8); - dx_insert_8(field, (uint8_t) len); - } else { - dx_insert_8(field, DX_AMQP_SYM32); - dx_insert_32(field, len); - } - dx_insert(field, (const uint8_t*) value, len); - bump_count(field); -} - - -dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field) -{ - return &field->buffers; -} - diff --git a/qpid/extras/dispatch/src/compose_private.h b/qpid/extras/dispatch/src/compose_private.h deleted file mode 100644 index 6e72185f3c..0000000000 --- a/qpid/extras/dispatch/src/compose_private.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef __compose_private_h__ -#define __compose_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/compose.h> -#include "message_private.h" - -dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field); - -typedef struct dx_composite_t { - DEQ_LINKS(struct dx_composite_t); - int isMap; - uint32_t count; - uint32_t length; - dx_field_location_t length_location; - dx_field_location_t count_location; -} dx_composite_t; - -ALLOC_DECLARE(dx_composite_t); -DEQ_DECLARE(dx_composite_t, dx_field_stack_t); - - -struct dx_composed_field_t { - dx_buffer_list_t buffers; - dx_field_stack_t fieldStack; -}; - -ALLOC_DECLARE(dx_composed_field_t); - -#endif diff --git a/qpid/extras/dispatch/src/config.c b/qpid/extras/dispatch/src/config.c deleted file mode 100644 index 9390115251..0000000000 --- a/qpid/extras/dispatch/src/config.c +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include "config_private.h" -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/log.h> - -#define PYTHON_MODULE "qpid.dispatch.config" - -static const char *log_module = "CONFIG"; - -struct dx_config_t { - PyObject *pModule; - PyObject *pClass; - PyObject *pObject; -}; - -ALLOC_DECLARE(dx_config_t); -ALLOC_DEFINE(dx_config_t); - -void dx_config_initialize() -{ - dx_python_start(); -} - - -void dx_config_finalize() -{ - dx_python_stop(); -} - - -dx_config_t *dx_config(const char *filename) -{ - dx_config_t *config = new_dx_config_t(); - - // - // Load the Python configuration module and get a reference to the config class. - // - PyObject *pName = PyString_FromString(PYTHON_MODULE); - config->pModule = PyImport_Import(pName); - Py_DECREF(pName); - - if (!config->pModule) { - PyErr_Print(); - free_dx_config_t(config); - dx_log(log_module, LOG_ERROR, "Unable to load configuration module: %s", PYTHON_MODULE); - return 0; - } - - config->pClass = PyObject_GetAttrString(config->pModule, "DXConfig"); - if (!config->pClass || !PyClass_Check(config->pClass)) { - PyErr_Print(); - Py_DECREF(config->pModule); - free_dx_config_t(config); - dx_log(log_module, LOG_ERROR, "Problem with configuration module: Missing DXConfig class"); - return 0; - } - - // - // Instantiate the DXConfig class, passing in the configuration file name. - // - PyObject *pArgs = PyTuple_New(1); - PyObject *fname = PyString_FromString(filename); - PyTuple_SetItem(pArgs, 0, fname); - config->pObject = PyInstance_New(config->pClass, pArgs, 0); - Py_DECREF(pArgs); - - if (config->pObject == 0) { - PyErr_Print(); - Py_DECREF(config->pModule); - free_dx_config_t(config); - dx_log(log_module, LOG_ERROR, "Configuration file '%s' could not be read", filename); - return 0; - } - - return config; -} - - -void dx_config_read(dx_config_t *config) -{ - PyObject *pMethod; - PyObject *pArgs; - PyObject *pResult; - - if (!config) - return; - - pMethod = PyObject_GetAttrString(config->pObject, "read_file"); - if (!pMethod || !PyCallable_Check(pMethod)) { - dx_log(log_module, LOG_ERROR, "Problem with configuration module: No callable 'item_count'"); - if (pMethod) { - Py_DECREF(pMethod); - } - return; - } - - pArgs = PyTuple_New(0); - pResult = PyObject_CallObject(pMethod, pArgs); - Py_DECREF(pArgs); - if (pResult) { - Py_DECREF(pResult); - } else { - PyErr_Print(); - } - Py_DECREF(pMethod); -} - - -void dx_config_free(dx_config_t *config) -{ - if (config) { - Py_DECREF(config->pClass); - Py_DECREF(config->pModule); - free_dx_config_t(config); - } -} - - -int dx_config_item_count(const dx_config_t *config, const char *section) -{ - PyObject *pSection; - PyObject *pMethod; - PyObject *pArgs; - PyObject *pResult; - int result = 0; - - if (!config) - return 0; - - pMethod = PyObject_GetAttrString(config->pObject, "item_count"); - if (!pMethod || !PyCallable_Check(pMethod)) { - dx_log(log_module, LOG_ERROR, "Problem with configuration module: No callable 'item_count'"); - if (pMethod) { - Py_DECREF(pMethod); - } - return 0; - } - - pSection = PyString_FromString(section); - pArgs = PyTuple_New(1); - PyTuple_SetItem(pArgs, 0, pSection); - pResult = PyObject_CallObject(pMethod, pArgs); - Py_DECREF(pArgs); - if (pResult && PyInt_Check(pResult)) - result = (int) PyInt_AsLong(pResult); - if (pResult) { - Py_DECREF(pResult); - } - Py_DECREF(pMethod); - - return result; -} - - -static PyObject *item_value(const dx_config_t *config, const char *section, int index, const char* key, const char* method) -{ - PyObject *pSection; - PyObject *pIndex; - PyObject *pKey; - PyObject *pMethod; - PyObject *pArgs; - PyObject *pResult; - - if (!config) - return 0; - - pMethod = PyObject_GetAttrString(config->pObject, method); - if (!pMethod || !PyCallable_Check(pMethod)) { - dx_log(log_module, LOG_ERROR, "Problem with configuration module: No callable '%s'", method); - if (pMethod) { - Py_DECREF(pMethod); - } - return 0; - } - - pSection = PyString_FromString(section); - pIndex = PyInt_FromLong((long) index); - pKey = PyString_FromString(key); - pArgs = PyTuple_New(3); - PyTuple_SetItem(pArgs, 0, pSection); - PyTuple_SetItem(pArgs, 1, pIndex); - PyTuple_SetItem(pArgs, 2, pKey); - pResult = PyObject_CallObject(pMethod, pArgs); - Py_DECREF(pArgs); - Py_DECREF(pMethod); - - return pResult; -} - - -const char *dx_config_item_value_string(const dx_config_t *config, const char *section, int index, const char* key) -{ - PyObject *pResult = item_value(config, section, index, key, "value_string"); - char *value = 0; - - if (pResult && PyString_Check(pResult)) { - Py_ssize_t size = PyString_Size(pResult); - value = (char*) malloc(size + 1); - strncpy(value, PyString_AsString(pResult), size + 1); - } - - if (pResult) { - Py_DECREF(pResult); - } - - return value; -} - - -uint32_t dx_config_item_value_int(const dx_config_t *config, const char *section, int index, const char* key) -{ - PyObject *pResult = item_value(config, section, index, key, "value_int"); - uint32_t value = 0; - - if (pResult && PyLong_Check(pResult)) - value = (uint32_t) PyLong_AsLong(pResult); - - if (pResult) { - Py_DECREF(pResult); - } - - return value; -} - - -int dx_config_item_value_bool(const dx_config_t *config, const char *section, int index, const char* key) -{ - PyObject *pResult = item_value(config, section, index, key, "value_bool"); - int value = 0; - - if (pResult && pResult != Py_None) - value = 1; - - if (pResult) { - Py_DECREF(pResult); - } - - return value; -} - - diff --git a/qpid/extras/dispatch/src/config_private.h b/qpid/extras/dispatch/src/config_private.h deleted file mode 100644 index f4ecc64e4c..0000000000 --- a/qpid/extras/dispatch/src/config_private.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef __config_private_h__ -#define __config_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/config.h> - -void dx_config_initialize(); -void dx_config_finalize(); -dx_config_t *dx_config(const char *filename); -void dx_config_read(dx_config_t *config); -void dx_config_free(dx_config_t *config); - -#endif diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c deleted file mode 100644 index 3ae24d81b0..0000000000 --- a/qpid/extras/dispatch/src/container.c +++ /dev/null @@ -1,868 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <stdio.h> -#include <string.h> -#include "dispatch_private.h" -#include <qpid/dispatch/container.h> -#include <qpid/dispatch/server.h> -#include <qpid/dispatch/message.h> -#include <proton/engine.h> -#include <proton/message.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/hash.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/iterator.h> -#include <qpid/dispatch/log.h> -#include <qpid/dispatch/agent.h> - -static char *module="CONTAINER"; - -struct dx_node_t { - dx_container_t *container; - const dx_node_type_t *ntype; - char *name; - void *context; - dx_dist_mode_t supported_dist; - dx_lifetime_policy_t life_policy; -}; - -ALLOC_DECLARE(dx_node_t); -ALLOC_DEFINE(dx_node_t); -ALLOC_DEFINE(dx_link_item_t); - - -struct dx_link_t { - pn_link_t *pn_link; - void *context; - dx_node_t *node; -}; - -ALLOC_DECLARE(dx_link_t); -ALLOC_DEFINE(dx_link_t); - - -struct dx_delivery_t { - pn_delivery_t *pn_delivery; - dx_delivery_t *peer; - void *context; - uint64_t disposition; - dx_link_t *link; -}; - -ALLOC_DECLARE(dx_delivery_t); -ALLOC_DEFINE(dx_delivery_t); - - -typedef struct dxc_node_type_t { - DEQ_LINKS(struct dxc_node_type_t); - const dx_node_type_t *ntype; -} dxc_node_type_t; -DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t); - -static int DX_CONTAINER_CLASS_CONTAINER = 1; -static int DX_CONTAINER_CLASS_NODE_TYPE = 2; -static int DX_CONTAINER_CLASS_NODE = 3; - -typedef struct container_class_t { - dx_container_t *container; - int class_id; -} container_class_t; - -struct dx_container_t { - dx_dispatch_t *dx; - dx_server_t *server; - dx_hash_t *node_type_map; - dx_hash_t *node_map; - sys_mutex_t *lock; - dx_node_t *default_node; - dxc_node_type_list_t node_type_list; - dx_agent_class_t *class_container; - dx_agent_class_t *class_node_type; - dx_agent_class_t *class_node; -}; - -static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) -{ - sys_mutex_lock(container->lock); - dx_node_t *node = 0; - const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); - dx_field_iterator_t *iter; - // TODO - Extract the name from the structured source - - if (source) { - iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); - dx_hash_retrieve(container->node_map, iter, (void*) &node); - dx_field_iterator_free(iter); - } - sys_mutex_unlock(container->lock); - - if (node == 0) { - if (container->default_node) - node = container->default_node; - else { - // Reject the link - // TODO - When the API allows, add an error message for "no available node" - pn_link_close(pn_link); - return; - } - } - - dx_link_t *link = new_dx_link_t(); - if (!link) { - pn_link_close(pn_link); - return; - } - - link->pn_link = pn_link; - link->context = 0; - link->node = node; - - pn_link_set_context(pn_link, link); - node->ntype->outgoing_handler(node->context, link); -} - - -static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link) -{ - sys_mutex_lock(container->lock); - dx_node_t *node = 0; - const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); - dx_field_iterator_t *iter; - // TODO - Extract the name from the structured target - - if (target) { - iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); - dx_hash_retrieve(container->node_map, iter, (void*) &node); - dx_field_iterator_free(iter); - } - sys_mutex_unlock(container->lock); - - if (node == 0) { - if (container->default_node) - node = container->default_node; - else { - // Reject the link - // TODO - When the API allows, add an error message for "no available node" - pn_link_close(pn_link); - return; - } - } - - dx_link_t *link = new_dx_link_t(); - if (!link) { - pn_link_close(pn_link); - return; - } - - link->pn_link = pn_link; - link->context = 0; - link->node = node; - - pn_link_set_context(pn_link, link); - node->ntype->incoming_handler(node->context, link); -} - - -static int do_writable(pn_link_t *pn_link) -{ - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - if (!link) - return 0; - - dx_node_t *node = link->node; - if (!node) - return 0; - - return node->ntype->writable_handler(node->context, link); -} - - -static void do_receive(pn_delivery_t *pnd) -{ - pn_link_t *pn_link = pn_delivery_link(pnd); - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd); - - if (link) { - dx_node_t *node = link->node; - if (node) { - if (!delivery) { - delivery = new_dx_delivery_t(); - delivery->pn_delivery = pnd; - delivery->peer = 0; - delivery->context = 0; - delivery->disposition = 0; - delivery->link = link; - pn_delivery_set_context(pnd, delivery); - } - - node->ntype->rx_handler(node->context, link, delivery); - return; - } - } - - // - // Reject the delivery if we couldn't find a node to handle it - // - pn_link_advance(pn_link); - pn_link_flow(pn_link, 1); - pn_delivery_update(pnd, PN_REJECTED); - pn_delivery_settle(pnd); -} - - -static void do_updated(pn_delivery_t *pnd) -{ - pn_link_t *pn_link = pn_delivery_link(pnd); - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd); - - if (link && delivery) { - dx_node_t *node = link->node; - if (node) - node->ntype->disp_handler(node->context, link, delivery); - } -} - - -static int close_handler(void* unused, pn_connection_t *conn) -{ - // - // Close all links, passing False as the 'closed' argument. These links are not - // being properly 'detached'. They are being orphaned. - // - pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE); - while (pn_link) { - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - dx_node_t *node = link->node; - if (node && link) - node->ntype->link_detach_handler(node->context, link, 0); - pn_link_close(pn_link); - free_dx_link_t(link); - pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE); - } - - // teardown all sessions - pn_session_t *ssn = pn_session_head(conn, 0); - while (ssn) { - pn_session_close(ssn); - ssn = pn_session_next(ssn, 0); - } - - // teardown the connection - pn_connection_close(conn); - return 0; -} - - -static int process_handler(dx_container_t *container, void* unused, pn_connection_t *conn) -{ - pn_session_t *ssn; - pn_link_t *pn_link; - pn_delivery_t *delivery; - int event_count = 0; - - // Step 1: setup the engine's connection, and any sessions and links - // that may be pending. - - // initialize the connection if it's new - if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { - pn_connection_open(conn); - event_count++; - } - - // open all pending sessions - ssn = pn_session_head(conn, PN_LOCAL_UNINIT); - while (ssn) { - pn_session_open(ssn); - ssn = pn_session_next(ssn, PN_LOCAL_UNINIT); - event_count++; - } - - // configure and open any pending links - pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); - while (pn_link) { - if (pn_link_is_sender(pn_link)) - setup_outgoing_link(container, pn_link); - else - setup_incoming_link(container, pn_link); - pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); - event_count++; - } - - - // Step 2: Now drain all the pending deliveries from the connection's - // work queue and process them - - delivery = pn_work_head(conn); - while (delivery) { - if (pn_delivery_readable(delivery)) - do_receive(delivery); - - if (pn_delivery_updated(delivery)) { - do_updated(delivery); - pn_delivery_clear(delivery); - } - delivery = pn_work_next(delivery); - event_count++; - } - - // - // Step 2.5: Call the attached node's writable handler for all active links - // on the connection. Note that in Dispatch, links are considered - // bidirectional. Incoming and outgoing only pertains to deliveries and - // deliveries are a subset of the traffic that flows both directions on links. - // - pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); - while (pn_link) { - assert(pn_session_connection(pn_link_session(pn_link)) == conn); - event_count += do_writable(pn_link); - pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); - } - - // Step 3: Clean up any links or sessions that have been closed by the - // remote. If the connection has been closed remotely, clean that up - // also. - - // teardown any terminating links - pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - while (pn_link) { - dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); - dx_node_t *node = link->node; - if (node) - node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message - pn_link_close(pn_link); - pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - event_count++; - } - - // teardown any terminating sessions - ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - while (ssn) { - pn_session_close(ssn); - ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - event_count++; - } - - // teardown the connection if it's terminating - if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { - pn_connection_close(conn); - event_count++; - } - - return event_count; -} - - -static void open_handler(dx_container_t *container, dx_connection_t *conn, dx_direction_t dir) -{ - const dx_node_type_t *nt; - - // - // Note the locking structure in this function. Generally this would be unsafe, but since - // this particular list is only ever appended to and never has items inserted or deleted, - // this usage is safe in this case. - // - sys_mutex_lock(container->lock); - dxc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list); - sys_mutex_unlock(container->lock); - - pn_connection_open(dx_connection_pn(conn)); - - while (nt_item) { - nt = nt_item->ntype; - if (dir == DX_INCOMING) { - if (nt->inbound_conn_open_handler) - nt->inbound_conn_open_handler(nt->type_context, conn); - } else { - if (nt->outbound_conn_open_handler) - nt->outbound_conn_open_handler(nt->type_context, conn); - } - - sys_mutex_lock(container->lock); - nt_item = DEQ_NEXT(nt_item); - sys_mutex_unlock(container->lock); - } -} - - -static int handler(void *handler_context, void *conn_context, dx_conn_event_t event, dx_connection_t *dx_conn) -{ - dx_container_t *container = (dx_container_t*) handler_context; - pn_connection_t *conn = dx_connection_pn(dx_conn); - - switch (event) { - case DX_CONN_EVENT_LISTENER_OPEN: open_handler(container, dx_conn, DX_INCOMING); break; - case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, dx_conn, DX_OUTGOING); break; - case DX_CONN_EVENT_CLOSE: return close_handler(conn_context, conn); - case DX_CONN_EVENT_PROCESS: return process_handler(container, conn_context, conn); - } - - return 0; -} - - -static void container_schema_handler(void *context, void *correlator) -{ -} - - -static void container_query_handler(void* context, const char *id, void *correlator) -{ - container_class_t *cls = (container_class_t*) context; - - if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) { - dx_agent_value_uint(correlator, "node_type_count", dx_hash_size(cls->container->node_type_map)); - dx_agent_value_uint(correlator, "node_count", dx_hash_size(cls->container->node_map)); - if (cls->container->default_node) - dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name); - else - dx_agent_value_null(correlator, "default_node_type"); - dx_agent_value_complete(correlator, false); - - } else if (cls->class_id == DX_CONTAINER_CLASS_NODE_TYPE) { - - } else if (cls->class_id == DX_CONTAINER_CLASS_NODE) { - - } -} - - -dx_agent_class_t *setup_class(dx_container_t *container, const char *fqname, int id) -{ - container_class_t *cls = NEW(container_class_t); - cls->container = container; - cls->class_id = id; - - return dx_agent_register_class(container->dx, fqname, cls, - container_schema_handler, - container_query_handler); -} - - -dx_container_t *dx_container(dx_dispatch_t *dx) -{ - dx_container_t *container = NEW(dx_container_t); - - container->dx = dx; - container->server = dx->server; - container->node_type_map = dx_hash(6, 4, 1); // 64 buckets, item batches of 4 - container->node_map = dx_hash(10, 32, 0); // 1K buckets, item batches of 32 - container->lock = sys_mutex(); - container->default_node = 0; - DEQ_INIT(container->node_type_list); - - dx_log(module, LOG_TRACE, "Container Initializing"); - dx_server_set_conn_handler(dx, handler, container); - - return container; -} - - -void dx_container_setup_agent(dx_dispatch_t *dx) -{ - dx->container->class_container = - setup_class(dx->container, "org.apache.qpid.dispatch.container", DX_CONTAINER_CLASS_CONTAINER); - dx->container->class_node_type = - setup_class(dx->container, "org.apache.qpid.dispatch.container.node_type", DX_CONTAINER_CLASS_NODE_TYPE); - dx->container->class_node = - setup_class(dx->container, "org.apache.qpid.dispatch.container.node", DX_CONTAINER_CLASS_NODE); -} - - -void dx_container_free(dx_container_t *container) -{ - // TODO - Free the nodes - // TODO - Free the node types - sys_mutex_free(container->lock); - free(container); -} - - -int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt) -{ - dx_container_t *container = dx->container; - - int result; - dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); - dxc_node_type_t *nt_item = NEW(dxc_node_type_t); - DEQ_ITEM_INIT(nt_item); - nt_item->ntype = nt; - - sys_mutex_lock(container->lock); - result = dx_hash_insert_const(container->node_type_map, iter, nt, 0); - DEQ_INSERT_TAIL(container->node_type_list, nt_item); - sys_mutex_unlock(container->lock); - - dx_field_iterator_free(iter); - if (result < 0) - return result; - dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); - - return 0; -} - - -dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx, - const dx_node_type_t *nt, - void *context, - dx_dist_mode_t supported_dist) -{ - dx_container_t *container = dx->container; - - if (container->default_node) - dx_container_destroy_node(container->default_node); - - if (nt) { - container->default_node = dx_container_create_node(dx, nt, 0, context, supported_dist, DX_LIFE_PERMANENT); - dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); - } else { - container->default_node = 0; - dx_log(module, LOG_TRACE, "Default node removed"); - } - - return container->default_node; -} - - -dx_node_t *dx_container_create_node(dx_dispatch_t *dx, - const dx_node_type_t *nt, - const char *name, - void *context, - dx_dist_mode_t supported_dist, - dx_lifetime_policy_t life_policy) -{ - dx_container_t *container = dx->container; - int result; - dx_node_t *node = new_dx_node_t(); - if (!node) - return 0; - - node->container = container; - node->ntype = nt; - node->name = 0; - node->context = context; - node->supported_dist = supported_dist; - node->life_policy = life_policy; - - if (name) { - dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); - sys_mutex_lock(container->lock); - result = dx_hash_insert(container->node_map, iter, node, 0); - sys_mutex_unlock(container->lock); - dx_field_iterator_free(iter); - if (result < 0) { - free_dx_node_t(node); - return 0; - } - - node->name = (char*) malloc(strlen(name) + 1); - strcpy(node->name, name); - } - - if (name) - dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); - - return node; -} - - -void dx_container_destroy_node(dx_node_t *node) -{ - dx_container_t *container = node->container; - - if (node->name) { - dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); - sys_mutex_lock(container->lock); - dx_hash_remove(container->node_map, iter); - sys_mutex_unlock(container->lock); - dx_field_iterator_free(iter); - free(node->name); - } - - free_dx_node_t(node); -} - - -void dx_container_node_set_context(dx_node_t *node, void *node_context) -{ - node->context = node_context; -} - - -dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node) -{ - return node->supported_dist; -} - - -dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node) -{ - return node->life_policy; -} - - -dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name) -{ - pn_session_t *sess = pn_session(dx_connection_pn(conn)); - dx_link_t *link = new_dx_link_t(); - - if (dir == DX_OUTGOING) - link->pn_link = pn_sender(sess, name); - else - link->pn_link = pn_receiver(sess, name); - link->context = node->context; - link->node = node; - - pn_link_set_context(link->pn_link, link); - - pn_session_open(sess); - - return link; -} - - -void dx_link_set_context(dx_link_t *link, void *context) -{ - link->context = context; -} - - -void *dx_link_get_context(dx_link_t *link) -{ - return link->context; -} - - -void dx_link_set_conn_context(dx_link_t *link, void *context) -{ - pn_session_t *pn_sess = pn_link_session(link->pn_link); - if (!pn_sess) - return; - pn_connection_t *pn_conn = pn_session_connection(pn_sess); - if (!pn_conn) - return; - dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn); - if (!conn) - return; - dx_connection_set_link_context(conn, context); -} - - -void *dx_link_get_conn_context(dx_link_t *link) -{ - pn_session_t *pn_sess = pn_link_session(link->pn_link); - if (!pn_sess) - return 0; - pn_connection_t *pn_conn = pn_session_connection(pn_sess); - if (!pn_conn) - return 0; - dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn); - if (!conn) - return 0; - return dx_connection_get_link_context(conn); -} - - -pn_link_t *dx_link_pn(dx_link_t *link) -{ - return link->pn_link; -} - - -dx_connection_t *dx_link_connection(dx_link_t *link) -{ - if (!link || !link->pn_link) - return 0; - - pn_session_t *sess = pn_link_session(link->pn_link); - if (!sess) - return 0; - - pn_connection_t *conn = pn_session_connection(sess); - if (!conn) - return 0; - - dx_connection_t *ctx = pn_connection_get_context(conn); - if (!ctx) - return 0; - - return ctx; -} - - -pn_terminus_t *dx_link_source(dx_link_t *link) -{ - return pn_link_source(link->pn_link); -} - - -pn_terminus_t *dx_link_target(dx_link_t *link) -{ - return pn_link_target(link->pn_link); -} - - -pn_terminus_t *dx_link_remote_source(dx_link_t *link) -{ - return pn_link_remote_source(link->pn_link); -} - - -pn_terminus_t *dx_link_remote_target(dx_link_t *link) -{ - return pn_link_remote_target(link->pn_link); -} - - -void dx_link_activate(dx_link_t *link) -{ - if (!link || !link->pn_link) - return; - - pn_session_t *sess = pn_link_session(link->pn_link); - if (!sess) - return; - - pn_connection_t *conn = pn_session_connection(sess); - if (!conn) - return; - - dx_connection_t *ctx = pn_connection_get_context(conn); - if (!ctx) - return; - - dx_server_activate(ctx); -} - - -void dx_link_close(dx_link_t *link) -{ - pn_link_close(link->pn_link); -} - - -dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag) -{ - pn_link_t *pnl = dx_link_pn(link); - - // - // If there is a current delivery on this outgoing link, something - // is wrong with the delivey algorithm. We assume that the current - // delivery ('pnd' below) is the one created by pn_delivery. If it is - // not, then my understanding of how proton works is incorrect. - // - assert(!pn_link_current(pnl)); - - pn_delivery(pnl, tag); - pn_delivery_t *pnd = pn_link_current(pnl); - - if (!pnd) - return 0; - - dx_delivery_t *delivery = new_dx_delivery_t(); - delivery->pn_delivery = pnd; - delivery->peer = 0; - delivery->context = 0; - delivery->disposition = 0; - delivery->link = link; - pn_delivery_set_context(pnd, delivery); - - return delivery; -} - - -void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition) -{ - if (delivery->pn_delivery) { - if (final_disposition > 0) - pn_delivery_update(delivery->pn_delivery, final_disposition); - pn_delivery_set_context(delivery->pn_delivery, 0); - pn_delivery_settle(delivery->pn_delivery); - } - if (delivery->peer) - delivery->peer->peer = 0; - free_dx_delivery_t(delivery); -} - - -void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer) -{ - delivery->peer = peer; -} - - -void dx_delivery_set_context(dx_delivery_t *delivery, void *context) -{ - delivery->context = context; -} - - -void *dx_delivery_context(dx_delivery_t *delivery) -{ - return delivery->context; -} - - -dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery) -{ - return delivery->peer; -} - - -pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery) -{ - return delivery->pn_delivery; -} - - -void dx_delivery_settle(dx_delivery_t *delivery) -{ - if (delivery->pn_delivery) { - pn_delivery_settle(delivery->pn_delivery); - delivery->pn_delivery = 0; - } -} - - -bool dx_delivery_settled(dx_delivery_t *delivery) -{ - return pn_delivery_settled(delivery->pn_delivery); -} - - -bool dx_delivery_disp_changed(dx_delivery_t *delivery) -{ - return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery); -} - - -uint64_t dx_delivery_disp(dx_delivery_t *delivery) -{ - delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery); - return delivery->disposition; -} - - -dx_link_t *dx_delivery_link(dx_delivery_t *delivery) -{ - return delivery->link; -} - diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c deleted file mode 100644 index 46c256a603..0000000000 --- a/qpid/extras/dispatch/src/dispatch.c +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include <qpid/dispatch.h> -#include <qpid/dispatch/server.h> -#include <qpid/dispatch/ctools.h> -#include "dispatch_private.h" -#include "alloc_private.h" -#include "log_private.h" -#include "router_private.h" - -/** - * Private Function Prototypes - */ -dx_server_t *dx_server(int tc, const char *container_name); -void dx_server_setup_agent(dx_dispatch_t *dx); -void dx_server_free(dx_server_t *server); -dx_container_t *dx_container(dx_dispatch_t *dx); -void dx_container_setup_agent(dx_dispatch_t *dx); -void dx_container_free(dx_container_t *container); -dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id); -void dx_router_setup_late(dx_dispatch_t *dx); -void dx_router_free(dx_router_t *router); -dx_agent_t *dx_agent(dx_dispatch_t *dx); -void dx_agent_free(dx_agent_t *agent); - -ALLOC_DEFINE(dx_config_listener_t); -ALLOC_DEFINE(dx_config_connector_t); - -static const char *CONF_CONTAINER = "container"; -static const char *CONF_ROUTER = "router"; -static const char *CONF_LISTENER = "listener"; -static const char *CONF_CONNECTOR = "connector"; - - -dx_dispatch_t *dx_dispatch(const char *config_path) -{ - dx_dispatch_t *dx = NEW(dx_dispatch_t); - - int thread_count = 0; - const char *container_name = 0; - const char *router_mode_str = 0; - const char *router_area = "0"; - const char *router_id = 0; - - dx_router_mode_t router_mode = DX_ROUTER_MODE_STANDALONE; - - DEQ_INIT(dx->config_listeners); - DEQ_INIT(dx->config_connectors); - - dx_python_initialize(dx); - dx_log_initialize(); - dx_alloc_initialize(); - - dx_config_initialize(); - dx->config = dx_config(config_path); - dx_config_read(dx->config); - - if (dx->config) { - int count = dx_config_item_count(dx->config, CONF_CONTAINER); - if (count == 1) { - thread_count = dx_config_item_value_int(dx->config, CONF_CONTAINER, 0, "worker-threads"); - container_name = dx_config_item_value_string(dx->config, CONF_CONTAINER, 0, "container-name"); - } - - count = dx_config_item_count(dx->config, CONF_ROUTER); - if (count == 1) { - router_mode_str = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "mode"); - //router_area = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "0"); - router_id = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "router-id"); - } - } - - if (thread_count == 0) - thread_count = 1; - - if (!container_name) - container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid - - if (router_mode_str && strcmp(router_mode_str, "interior") == 0) - router_mode = DX_ROUTER_MODE_INTERIOR; - - if (router_mode_str && strcmp(router_mode_str, "edge") == 0) - router_mode = DX_ROUTER_MODE_EDGE; - - if (!router_area) - router_area = "area"; - - if (!router_id) - router_id = container_name; - - dx->server = dx_server(thread_count, container_name); - dx->container = dx_container(dx); - dx->router = dx_router(dx, router_mode, router_area, router_id); - dx->agent = dx_agent(dx); - - dx_alloc_setup_agent(dx); - dx_server_setup_agent(dx); - dx_container_setup_agent(dx); - dx_router_setup_late(dx); - - return dx; -} - - -void dx_dispatch_free(dx_dispatch_t *dx) -{ - dx_config_free(dx->config); - dx_config_finalize(); - dx_agent_free(dx->agent); - dx_router_free(dx->router); - dx_container_free(dx->container); - dx_server_free(dx->server); - dx_log_finalize(); - dx_python_finalize(); -} - - -static void load_server_config(dx_dispatch_t *dx, dx_server_config_t *config, const char *section, int i) -{ - config->host = dx_config_item_value_string(dx->config, section, i, "addr"); - config->port = dx_config_item_value_string(dx->config, section, i, "port"); - config->role = dx_config_item_value_string(dx->config, section, i, "role"); - config->sasl_mechanisms = - dx_config_item_value_string(dx->config, section, i, "sasl-mechanisms"); - config->ssl_enabled = - dx_config_item_value_bool(dx->config, section, i, "ssl-profile"); - if (config->ssl_enabled) { - config->ssl_server = 1; - config->ssl_allow_unsecured_client = - dx_config_item_value_bool(dx->config, section, i, "allow-unsecured"); - config->ssl_certificate_file = - dx_config_item_value_string(dx->config, section, i, "cert-file"); - config->ssl_private_key_file = - dx_config_item_value_string(dx->config, section, i, "key-file"); - config->ssl_password = - dx_config_item_value_string(dx->config, section, i, "password"); - config->ssl_trusted_certificate_db = - dx_config_item_value_string(dx->config, section, i, "cert-db"); - config->ssl_require_peer_authentication = - dx_config_item_value_bool(dx->config, section, i, "require-peer-auth"); - } -} - - -static void configure_listeners(dx_dispatch_t *dx) -{ - int count; - - if (!dx->config) - return; - - count = dx_config_item_count(dx->config, CONF_LISTENER); - for (int i = 0; i < count; i++) { - dx_config_listener_t *cl = new_dx_config_listener_t(); - load_server_config(dx, &cl->configuration, CONF_LISTENER, i); - - printf("\nListener : %s:%s\n", cl->configuration.host, cl->configuration.port); - printf(" SASL: %s\n", cl->configuration.sasl_mechanisms); - printf(" SSL: %d\n", cl->configuration.ssl_enabled); - if (cl->configuration.ssl_enabled) { - printf(" unsec: %d\n", cl->configuration.ssl_allow_unsecured_client); - printf(" cert-file: %s\n", cl->configuration.ssl_certificate_file); - printf(" key-file: %s\n", cl->configuration.ssl_private_key_file); - printf(" cert-db: %s\n", cl->configuration.ssl_trusted_certificate_db); - printf(" peer-auth: %d\n", cl->configuration.ssl_require_peer_authentication); - } - - cl->listener = dx_server_listen(dx, &cl->configuration, cl); - DEQ_ITEM_INIT(cl); - DEQ_INSERT_TAIL(dx->config_listeners, cl); - } -} - - -static void configure_connectors(dx_dispatch_t *dx) -{ - int count; - - if (!dx->config) - return; - - count = dx_config_item_count(dx->config, CONF_CONNECTOR); - for (int i = 0; i < count; i++) { - dx_config_connector_t *cc = new_dx_config_connector_t(); - load_server_config(dx, &cc->configuration, CONF_CONNECTOR, i); - - printf("\nConnector : %s:%s\n", cc->configuration.host, cc->configuration.port); - printf(" SASL: %s\n", cc->configuration.sasl_mechanisms); - printf(" SSL: %d\n", cc->configuration.ssl_enabled); - if (cc->configuration.ssl_enabled) { - printf(" cert-file: %s\n", cc->configuration.ssl_certificate_file); - printf(" key-file: %s\n", cc->configuration.ssl_private_key_file); - printf(" cert-db: %s\n", cc->configuration.ssl_trusted_certificate_db); - printf(" peer-auth: %d\n", cc->configuration.ssl_require_peer_authentication); - } - - cc->connector = dx_server_connect(dx, &cc->configuration, cc); - DEQ_ITEM_INIT(cc); - DEQ_INSERT_TAIL(dx->config_connectors, cc); - } -} - - -void dx_dispatch_configure(dx_dispatch_t *dx) -{ - configure_listeners(dx); - configure_connectors(dx); -} - diff --git a/qpid/extras/dispatch/src/dispatch_private.h b/qpid/extras/dispatch/src/dispatch_private.h deleted file mode 100644 index d96eb59afe..0000000000 --- a/qpid/extras/dispatch/src/dispatch_private.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef __dispatch_private_h__ -#define __dispatch_private_h__ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "server_private.h" -#include "config_private.h" -#include <qpid/dispatch/ctools.h> - -typedef struct dx_container_t dx_container_t; -typedef struct dx_router_t dx_router_t; -typedef struct dx_agent_t dx_agent_t; - -typedef struct dx_config_listener_t { - DEQ_LINKS(struct dx_config_listener_t); - dx_listener_t *listener; - dx_server_config_t configuration; -} dx_config_listener_t; - -DEQ_DECLARE(dx_config_listener_t, dx_config_listener_list_t); -ALLOC_DECLARE(dx_config_listener_t); - - -typedef struct dx_config_connector_t { - DEQ_LINKS(struct dx_config_connector_t); - dx_connector_t *connector; - dx_server_config_t configuration; -} dx_config_connector_t; - -DEQ_DECLARE(dx_config_connector_t, dx_config_connector_list_t); -ALLOC_DECLARE(dx_config_connector_t); - -struct dx_dispatch_t { - dx_server_t *server; - dx_container_t *container; - dx_router_t *router; - dx_agent_t *agent; - dx_config_t *config; - - dx_config_listener_list_t config_listeners; - dx_config_connector_list_t config_connectors; -}; - -#endif - diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c deleted file mode 100644 index 0cb32acd05..0000000000 --- a/qpid/extras/dispatch/src/hash.c +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/hash.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/alloc.h> -#include <stdio.h> -#include <string.h> - -typedef struct dx_hash_item_t { - DEQ_LINKS(struct dx_hash_item_t); - unsigned char *key; - union { - void *val; - const void *val_const; - } v; -} dx_hash_item_t; - -ALLOC_DECLARE(dx_hash_item_t); -ALLOC_DEFINE(dx_hash_item_t); -DEQ_DECLARE(dx_hash_item_t, items_t); - - -typedef struct bucket_t { - items_t items; -} bucket_t; - - -struct dx_hash_t { - bucket_t *buckets; - unsigned int bucket_count; - unsigned int bucket_mask; - int batch_size; - size_t size; - int is_const; -}; - - -struct dx_hash_handle_t { - bucket_t *bucket; - dx_hash_item_t *item; -}; - -ALLOC_DECLARE(dx_hash_handle_t); -ALLOC_DEFINE(dx_hash_handle_t); - - -// djb2 hash algorithm -static unsigned long dx_hash_function(dx_field_iterator_t *iter) -{ - unsigned long hash = 5381; - int c; - - dx_field_iterator_reset(iter); - while (!dx_field_iterator_end(iter)) { - c = (int) dx_field_iterator_octet(iter); - hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ - } - - return hash; -} - - -dx_hash_t *dx_hash(int bucket_exponent, int batch_size, int value_is_const) -{ - int i; - dx_hash_t *h = NEW(dx_hash_t); - - if (!h) - return 0; - - h->bucket_count = 1 << bucket_exponent; - h->bucket_mask = h->bucket_count - 1; - h->batch_size = batch_size; - h->size = 0; - h->is_const = value_is_const; - h->buckets = NEW_ARRAY(bucket_t, h->bucket_count); - for (i = 0; i < h->bucket_count; i++) { - DEQ_INIT(h->buckets[i].items); - } - - return h; -} - - -void dx_hash_free(dx_hash_t *h) -{ - // TODO - Implement this -} - - -size_t dx_hash_size(dx_hash_t *h) -{ - return h ? h->size : 0; -} - - -static dx_hash_item_t *dx_hash_internal_insert(dx_hash_t *h, dx_field_iterator_t *key, int *exists, dx_hash_handle_t **handle) -{ - unsigned long idx = dx_hash_function(key) & h->bucket_mask; - dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); - - while (item) { - if (dx_field_iterator_equal(key, item->key)) - break; - item = item->next; - } - - if (item) { - *exists = 1; - if (handle) - *handle = 0; - return item; - } - - item = new_dx_hash_item_t(); - if (!item) - return 0; - - DEQ_ITEM_INIT(item); - item->key = dx_field_iterator_copy(key); - - DEQ_INSERT_TAIL(h->buckets[idx].items, item); - h->size++; - *exists = 0; - - // - // If a pointer to a handle-pointer was supplied, create a handle for this item. - // - if (handle) { - *handle = new_dx_hash_handle_t(); - (*handle)->bucket = &h->buckets[idx]; - (*handle)->item = item; - } - - return item; -} - - -dx_error_t dx_hash_insert(dx_hash_t *h, dx_field_iterator_t *key, void *val, dx_hash_handle_t **handle) -{ - int exists = 0; - dx_hash_item_t *item = dx_hash_internal_insert(h, key, &exists, handle); - - if (!item) - return DX_ERROR_ALLOC; - - if (exists) - return DX_ERROR_ALREADY_EXISTS; - - item->v.val = val; - - return DX_ERROR_NONE; -} - - -dx_error_t dx_hash_insert_const(dx_hash_t *h, dx_field_iterator_t *key, const void *val, dx_hash_handle_t **handle) -{ - assert(h->is_const); - - int error = 0; - dx_hash_item_t *item = dx_hash_internal_insert(h, key, &error, handle); - - if (item) - item->v.val_const = val; - return error; -} - - -static dx_hash_item_t *dx_hash_internal_retrieve(dx_hash_t *h, dx_field_iterator_t *key) -{ - unsigned long idx = dx_hash_function(key) & h->bucket_mask; - dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); - - while (item) { - if (dx_field_iterator_equal(key, item->key)) - break; - item = item->next; - } - - return item; -} - - -dx_error_t dx_hash_retrieve(dx_hash_t *h, dx_field_iterator_t *key, void **val) -{ - dx_hash_item_t *item = dx_hash_internal_retrieve(h, key); - if (item) - *val = item->v.val; - else - *val = 0; - - return DX_ERROR_NONE; -} - - -dx_error_t dx_hash_retrieve_const(dx_hash_t *h, dx_field_iterator_t *key, const void **val) -{ - assert(h->is_const); - - dx_hash_item_t *item = dx_hash_internal_retrieve(h, key); - if (item) - *val = item->v.val_const; - else - *val = 0; - - return DX_ERROR_NONE; -} - - -dx_error_t dx_hash_remove(dx_hash_t *h, dx_field_iterator_t *key) -{ - unsigned long idx = dx_hash_function(key) & h->bucket_mask; - dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); - - while (item) { - if (dx_field_iterator_equal(key, item->key)) - break; - item = item->next; - } - - if (item) { - free(item->key); - DEQ_REMOVE(h->buckets[idx].items, item); - free_dx_hash_item_t(item); - h->size--; - return DX_ERROR_NONE; - } - - return DX_ERROR_NOT_FOUND; -} - - -void dx_hash_handle_free(dx_hash_handle_t *handle) -{ - if (handle) - free_dx_hash_handle_t(handle); -} - - -const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle) -{ - if (handle) - return handle->item->key; - return 0; -} - - -dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle) -{ - unsigned char *key = 0; - dx_error_t error = dx_hash_remove_by_handle2(h, handle, &key); - if (key) - free(key); - return error; -} - - -dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key) -{ - if (!handle) - return DX_ERROR_NOT_FOUND; - *key = handle->item->key; - DEQ_REMOVE(handle->bucket->items, handle->item); - free_dx_hash_item_t(handle->item); - h->size--; - return DX_ERROR_NONE; -} - diff --git a/qpid/extras/dispatch/src/iovec.c b/qpid/extras/dispatch/src/iovec.c deleted file mode 100644 index a4ce937333..0000000000 --- a/qpid/extras/dispatch/src/iovec.c +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/iovec.h> -#include <qpid/dispatch/alloc.h> -#include <string.h> - -#define DX_IOVEC_MAX 64 - -struct dx_iovec_t { - struct iovec iov_array[DX_IOVEC_MAX]; - struct iovec *iov; - int iov_count; -}; - - -ALLOC_DECLARE(dx_iovec_t); -ALLOC_DEFINE(dx_iovec_t); - - -dx_iovec_t *dx_iovec(int vector_count) -{ - dx_iovec_t *iov = new_dx_iovec_t(); - if (!iov) - return 0; - - memset(iov, 0, sizeof(dx_iovec_t)); - - iov->iov_count = vector_count; - if (vector_count > DX_IOVEC_MAX) - iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count); - else - iov->iov = &iov->iov_array[0]; - - return iov; -} - - -void dx_iovec_free(dx_iovec_t *iov) -{ - if (!iov) - return; - - if (iov->iov && iov->iov != &iov->iov_array[0]) - free(iov->iov); - - free_dx_iovec_t(iov); -} - - -struct iovec *dx_iovec_array(dx_iovec_t *iov) -{ - if (!iov) - return 0; - return iov->iov; -} - - -int dx_iovec_count(dx_iovec_t *iov) -{ - if (!iov) - return 0; - return iov->iov_count; -} - diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c deleted file mode 100644 index 7898e04cc9..0000000000 --- a/qpid/extras/dispatch/src/iterator.c +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/iterator.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/log.h> -#include "message_private.h" -#include <stdio.h> -#include <string.h> - -//static const char *log_module = "FIELD"; - -typedef enum { - MODE_TO_END, - MODE_TO_SLASH -} parse_mode_t; - -typedef struct { - dx_buffer_t *buffer; - unsigned char *cursor; - int length; -} pointer_t; - -struct dx_field_iterator_t { - pointer_t start_pointer; - pointer_t view_start_pointer; - pointer_t pointer; - dx_iterator_view_t view; - parse_mode_t mode; - unsigned char prefix; - int at_prefix; - int view_prefix; -}; - -ALLOC_DECLARE(dx_field_iterator_t); -ALLOC_DEFINE(dx_field_iterator_t); - - -typedef enum { - STATE_START, - STATE_SLASH_LEFT, - STATE_SKIPPING_TO_NEXT_SLASH, - STATE_SCANNING, - STATE_COLON, - STATE_COLON_SLASH, - STATE_AT_NODE_ID -} state_t; - - -static char *my_area = ""; -static char *my_router = ""; - - -static void parse_address_view(dx_field_iterator_t *iter) -{ - // - // This function starts with an iterator view that is identical to - // ITER_VIEW_NO_HOST. We will now further refine the view in order - // to aid the router in looking up addresses. - // - - if (dx_field_iterator_prefix(iter, "_")) { - if (dx_field_iterator_prefix(iter, "local/")) { - iter->prefix = 'L'; - iter->at_prefix = 1; - iter->view_prefix = 1; - return; - } - - if (dx_field_iterator_prefix(iter, "topo/")) { - if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_area)) { - if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_router)) { - iter->prefix = 'L'; - iter->at_prefix = 1; - iter->view_prefix = 1; - return; - } - - iter->prefix = 'R'; - iter->at_prefix = 1; - iter->view_prefix = 1; - iter->mode = MODE_TO_SLASH; - return; - } - - iter->prefix = 'A'; - iter->at_prefix = 1; - iter->view_prefix = 1; - iter->mode = MODE_TO_SLASH; - return; - } - } - - iter->prefix = 'M'; - iter->at_prefix = 1; - iter->view_prefix = 1; -} - - -static void parse_node_view(dx_field_iterator_t *iter) -{ - // - // This function starts with an iterator view that is identical to - // ITER_VIEW_NO_HOST. We will now further refine the view in order - // to aid the router in looking up nodes. - // - - if (dx_field_iterator_prefix(iter, my_area)) { - iter->prefix = 'R'; - iter->at_prefix = 1; - iter->view_prefix = 1; - iter->mode = MODE_TO_END; - return; - } - - iter->prefix = 'A'; - iter->at_prefix = 1; - iter->view_prefix = 1; - iter->mode = MODE_TO_SLASH; -} - - -static void view_initialize(dx_field_iterator_t *iter) -{ - // - // The default behavior is for the view to *not* have a prefix. - // We'll add one if it's needed later. - // - iter->at_prefix = 0; - iter->view_prefix = 0; - iter->mode = MODE_TO_END; - - if (iter->view == ITER_VIEW_ALL) - return; - - // - // Advance to the node-id. - // - state_t state = STATE_START; - unsigned int octet; - pointer_t save_pointer = {0,0,0}; - - while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { - octet = dx_field_iterator_octet(iter); - switch (state) { - case STATE_START : - if (octet == '/') - state = STATE_SLASH_LEFT; - else - state = STATE_SCANNING; - break; - - case STATE_SLASH_LEFT : - if (octet == '/') - state = STATE_SKIPPING_TO_NEXT_SLASH; - else - state = STATE_AT_NODE_ID; - break; - - case STATE_SKIPPING_TO_NEXT_SLASH : - if (octet == '/') - state = STATE_AT_NODE_ID; - break; - - case STATE_SCANNING : - if (octet == ':') - state = STATE_COLON; - break; - - case STATE_COLON : - if (octet == '/') { - state = STATE_COLON_SLASH; - save_pointer = iter->pointer; - } else - state = STATE_SCANNING; - break; - - case STATE_COLON_SLASH : - if (octet == '/') - state = STATE_SKIPPING_TO_NEXT_SLASH; - else { - state = STATE_AT_NODE_ID; - iter->pointer = save_pointer; - } - break; - - case STATE_AT_NODE_ID : - break; - } - } - - if (state != STATE_AT_NODE_ID) { - // - // The address string was relative, not absolute. The node-id - // is at the beginning of the string. - // - iter->pointer = iter->start_pointer; - } - - // - // Cursor is now on the first octet of the node-id - // - if (iter->view == ITER_VIEW_NODE_ID) { - iter->mode = MODE_TO_SLASH; - return; - } - - if (iter->view == ITER_VIEW_NO_HOST) { - iter->mode = MODE_TO_END; - return; - } - - if (iter->view == ITER_VIEW_ADDRESS_HASH) { - iter->mode = MODE_TO_END; - parse_address_view(iter); - return; - } - - if (iter->view == ITER_VIEW_NODE_HASH) { - iter->mode = MODE_TO_END; - parse_node_view(iter); - return; - } - - if (iter->view == ITER_VIEW_NODE_SPECIFIC) { - iter->mode = MODE_TO_END; - while (!dx_field_iterator_end(iter)) { - octet = dx_field_iterator_octet(iter); - if (octet == '/') - break; - } - return; - } -} - - -void dx_field_iterator_set_address(const char *area, const char *router) -{ - my_area = (char*) malloc(strlen(area) + 2); - strcpy(my_area, area); - strcat(my_area, "/"); - - my_router = (char*) malloc(strlen(router) + 2); - strcpy(my_router, router); - strcat(my_router, "/"); -} - - -dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) -{ - dx_field_iterator_t *iter = new_dx_field_iterator_t(); - if (!iter) - return 0; - - iter->start_pointer.buffer = 0; - iter->start_pointer.cursor = (unsigned char*) text; - iter->start_pointer.length = strlen(text); - - dx_field_iterator_reset_view(iter, view); - - return iter; -} - - -dx_field_iterator_t* dx_field_iterator_binary(const char *text, int length, dx_iterator_view_t view) -{ - dx_field_iterator_t *iter = new_dx_field_iterator_t(); - if (!iter) - return 0; - - iter->start_pointer.buffer = 0; - iter->start_pointer.cursor = (unsigned char*) text; - iter->start_pointer.length = length; - - dx_field_iterator_reset_view(iter, view); - - return iter; -} - - -dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view) -{ - dx_field_iterator_t *iter = new_dx_field_iterator_t(); - if (!iter) - return 0; - - iter->start_pointer.buffer = buffer; - iter->start_pointer.cursor = dx_buffer_base(buffer) + offset; - iter->start_pointer.length = length; - - dx_field_iterator_reset_view(iter, view); - - return iter; -} - - -void dx_field_iterator_free(dx_field_iterator_t *iter) -{ - free_dx_field_iterator_t(iter); -} - - -void dx_field_iterator_reset(dx_field_iterator_t *iter) -{ - iter->pointer = iter->view_start_pointer; - iter->at_prefix = iter->view_prefix; -} - - -void dx_field_iterator_reset_view(dx_field_iterator_t *iter, dx_iterator_view_t view) -{ - iter->pointer = iter->start_pointer; - iter->view = view; - - view_initialize(iter); - - iter->view_start_pointer = iter->pointer; -} - - -unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) -{ - if (iter->at_prefix) { - iter->at_prefix = 0; - return iter->prefix; - } - - if (iter->pointer.length == 0) - return (unsigned char) 0; - - unsigned char result = *(iter->pointer.cursor); - - iter->pointer.cursor++; - iter->pointer.length--; - - if (iter->pointer.length > 0) { - if (iter->pointer.buffer) { - if (iter->pointer.cursor - dx_buffer_base(iter->pointer.buffer) == dx_buffer_size(iter->pointer.buffer)) { - iter->pointer.buffer = iter->pointer.buffer->next; - if (iter->pointer.buffer == 0) - iter->pointer.length = 0; - iter->pointer.cursor = dx_buffer_base(iter->pointer.buffer); - } - } - } - - if (iter->pointer.length && iter->mode == MODE_TO_SLASH && *(iter->pointer.cursor) == '/') - iter->pointer.length = 0; - - return result; -} - - -int dx_field_iterator_end(dx_field_iterator_t *iter) -{ - return iter->pointer.length == 0; -} - - -dx_field_iterator_t *dx_field_iterator_sub(dx_field_iterator_t *iter, uint32_t length) -{ - dx_field_iterator_t *sub = new_dx_field_iterator_t(); - if (!sub) - return 0; - - sub->start_pointer = iter->pointer; - sub->start_pointer.length = length; - sub->view_start_pointer = sub->start_pointer; - sub->pointer = sub->start_pointer; - sub->view = iter->view; - sub->mode = iter->mode; - sub->at_prefix = 0; - sub->view_prefix = 0; - - return sub; -} - - -void dx_field_iterator_advance(dx_field_iterator_t *iter, uint32_t length) -{ - // TODO - Make this more efficient. - for (uint8_t idx = 0; idx < length && !dx_field_iterator_end(iter); idx++) - dx_field_iterator_octet(iter); -} - - -uint32_t dx_field_iterator_remaining(dx_field_iterator_t *iter) -{ - return iter->pointer.length; -} - - -int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string) -{ - dx_field_iterator_reset(iter); - while (!dx_field_iterator_end(iter) && *string) { - if (*string != dx_field_iterator_octet(iter)) - return 0; - string++; - } - - return (dx_field_iterator_end(iter) && (*string == 0)); -} - - -int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix) -{ - pointer_t save_pointer = iter->pointer; - unsigned char *c = (unsigned char*) prefix; - - while(*c) { - if (*c != dx_field_iterator_octet(iter)) - break; - c++; - } - - if (*c) { - iter->pointer = save_pointer; - return 0; - } - - return 1; -} - - -unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) -{ - int length = 0; - int idx = 0; - unsigned char *copy; - - dx_field_iterator_reset(iter); - while (!dx_field_iterator_end(iter)) { - dx_field_iterator_octet(iter); - length++; - } - - dx_field_iterator_reset(iter); - copy = (unsigned char*) malloc(length + 1); - while (!dx_field_iterator_end(iter)) - copy[idx++] = dx_field_iterator_octet(iter); - copy[idx] = '\0'; - - return copy; -} - - -dx_iovec_t *dx_field_iterator_iovec(const dx_field_iterator_t *iter) -{ - assert(!iter->view_prefix); // Not supported for views with a prefix - - // - // Count the number of buffers this field straddles - // - pointer_t pointer = iter->view_start_pointer; - int bufcnt = 1; - dx_buffer_t *buf = pointer.buffer; - size_t bufsize = dx_buffer_size(buf) - (pointer.cursor - dx_buffer_base(pointer.buffer)); - ssize_t remaining = pointer.length - bufsize; - - while (remaining > 0) { - bufcnt++; - buf = buf->next; - if (!buf) - return 0; - remaining -= dx_buffer_size(buf); - } - - // - // Allocate an iovec object big enough to hold the number of buffers - // - dx_iovec_t *iov = dx_iovec(bufcnt); - if (!iov) - return 0; - - // - // Build out the io vectors with pointers to the segments of the field in buffers - // - bufcnt = 0; - buf = pointer.buffer; - bufsize = dx_buffer_size(buf) - (pointer.cursor - dx_buffer_base(pointer.buffer)); - void *base = pointer.cursor; - remaining = pointer.length; - - while (remaining > 0) { - if (bufsize > remaining) - bufsize = remaining; - dx_iovec_array(iov)[bufcnt].iov_base = base; - dx_iovec_array(iov)[bufcnt].iov_len = bufsize; - bufcnt++; - remaining -= bufsize; - if (remaining > 0) { - buf = buf->next; - base = dx_buffer_base(buf); - bufsize = dx_buffer_size(buf); - } - } - - return iov; -} - - diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c deleted file mode 100644 index 281603255d..0000000000 --- a/qpid/extras/dispatch/src/log.c +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "log_private.h" -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/threading.h> -#include <stdarg.h> -#include <stdio.h> -#include <string.h> -#include <sys/time.h> - -#define TEXT_MAX 512 -#define LIST_MAX 1000 - -typedef struct dx_log_entry_t dx_log_entry_t; - -struct dx_log_entry_t { - DEQ_LINKS(dx_log_entry_t); - const char *module; - int cls; - const char *file; - int line; - struct timeval tv; - char text[TEXT_MAX]; -}; - -ALLOC_DECLARE(dx_log_entry_t); -ALLOC_DEFINE(dx_log_entry_t); - -DEQ_DECLARE(dx_log_entry_t, dx_log_list_t); - -static int mask = LOG_INFO; -static dx_log_list_t entries; -static sys_mutex_t *log_lock = 0; - - -static const char *cls_prefix(int cls) -{ - switch (cls) { - case LOG_TRACE : return "TRACE"; - case LOG_DEBUG : return "DEBUG"; - case LOG_INFO : return "INFO"; - case LOG_NOTICE : return "NOTICE"; - case LOG_WARNING : return "WARNING"; - case LOG_ERROR : return "ERROR"; - case LOG_CRITICAL : return "CRITICAL"; - } - - return ""; -} - -void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...) -{ - if (!(cls & mask)) - return; - - dx_log_entry_t *entry = new_dx_log_entry_t(); - DEQ_ITEM_INIT(entry); - entry->module = module; - entry->cls = cls; - entry->file = file; - entry->line = line; - gettimeofday(&entry->tv, 0); - - va_list ap; - - va_start(ap, fmt); - vsnprintf(entry->text, TEXT_MAX, fmt, ap); - va_end(ap); - fprintf(stderr, "%s (%s) %s\n", module, cls_prefix(cls), entry->text); - - sys_mutex_lock(log_lock); - DEQ_INSERT_TAIL(entries, entry); - if (DEQ_SIZE(entries) > LIST_MAX) { - entry = DEQ_HEAD(entries); - DEQ_REMOVE_HEAD(entries); - free_dx_log_entry_t(entry); - } - sys_mutex_unlock(log_lock); -} - -void dx_log_set_mask(int _mask) -{ - mask = _mask; -} - - -void dx_log_initialize(void) -{ - DEQ_INIT(entries); - log_lock = sys_mutex(); -} - - -void dx_log_finalize(void) -{ -} - - diff --git a/qpid/extras/dispatch/src/log_private.h b/qpid/extras/dispatch/src/log_private.h deleted file mode 100644 index 0b36bbc9a9..0000000000 --- a/qpid/extras/dispatch/src/log_private.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef __log_private_h__ -#define __log_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/log.h> - -void dx_log_initialize(void); -void dx_log_finalize(void); - -#endif diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c deleted file mode 100644 index 86ffcefcee..0000000000 --- a/qpid/extras/dispatch/src/message.c +++ /dev/null @@ -1,879 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/amqp.h> -#include <qpid/dispatch/threading.h> -#include "message_private.h" -#include "compose_private.h" -#include <string.h> -#include <stdio.h> - -static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"; -static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70"; -static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"; -static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71"; -static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"; -static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72"; -static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"; -static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73"; -static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"; -static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74"; -static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"; -static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75"; -static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"; -static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76"; -static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77"; -static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77"; -static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"; -static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78"; -static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0"; -static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1"; -static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0"; -static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"; - -ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); -ALLOC_DEFINE(dx_message_content_t); - -typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length); - -static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, buffer_process_t handler, void *context) -{ - unsigned char *local_cursor = *cursor; - dx_buffer_t *local_buffer = *buffer; - - int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); - while (consume > 0) { - if (consume < remaining) { - if (handler) - handler(context, local_cursor, consume); - local_cursor += consume; - consume = 0; - } else { - if (handler) - handler(context, local_cursor, remaining); - consume -= remaining; - local_buffer = local_buffer->next; - if (local_buffer == 0){ - local_cursor = 0; - break; - } - local_cursor = dx_buffer_base(local_buffer); - remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); - } - } - - *cursor = local_cursor; - *buffer = local_buffer; -} - - -static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) -{ - unsigned char result = **cursor; - advance(cursor, buffer, 1, 0, 0); - return result; -} - - -static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) -{ - unsigned char tag = next_octet(cursor, buffer); - if (!(*cursor)) return 0; - - int consume = 0; - size_t hdr_length = 1; - - switch (tag & 0xF0) { - case 0x40 : consume = 0; break; - case 0x50 : consume = 1; break; - case 0x60 : consume = 2; break; - case 0x70 : consume = 4; break; - case 0x80 : consume = 8; break; - case 0x90 : consume = 16; break; - - case 0xB0 : - case 0xD0 : - case 0xF0 : - hdr_length += 3; - consume |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - consume |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - consume |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; - // Fall through to the next case... - - case 0xA0 : - case 0xC0 : - case 0xE0 : - hdr_length++; - consume |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - break; - } - - if (field && !field->parsed) { - field->buffer = *buffer; - field->offset = *cursor - dx_buffer_base(*buffer); - field->length = consume; - field->hdr_length = hdr_length; - field->parsed = 1; - } - - advance(cursor, buffer, consume, 0, 0); - return 1; -} - - -static int start_list(unsigned char **cursor, dx_buffer_t **buffer) -{ - unsigned char tag = next_octet(cursor, buffer); - if (!(*cursor)) return 0; - int length = 0; - int count = 0; - - switch (tag) { - case 0x45 : // list0 - break; - case 0xd0 : // list32 - length |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - length |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - length |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; - length |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - - count |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - count |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - count |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; - count |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - - break; - - case 0xc0 : // list8 - length |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - - count |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - break; - } - - return count; -} - - -// -// Check the buffer chain, starting at cursor to see if it matches the pattern. -// If the pattern matches, check the next tag to see if it's in the set of expected -// tags. If not, return zero. If so, set the location descriptor to the good -// tag and advance the cursor (and buffer, if needed) to the end of the matched section. -// -// If there is no match, don't advance the cursor. -// -// Return 0 if the pattern matches but the following tag is unexpected -// Return 0 if the pattern matches and the location already has a pointer (duplicate section) -// Return 1 if the pattern matches and we've advanced the cursor/buffer -// Return 1 if the pattern does not match -// -static int dx_check_and_advance(dx_buffer_t **buffer, - unsigned char **cursor, - const unsigned char *pattern, - int pattern_length, - const unsigned char *expected_tags, - dx_field_location_t *location) -{ - dx_buffer_t *test_buffer = *buffer; - unsigned char *test_cursor = *cursor; - - if (!test_cursor) - return 1; // no match - - unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); - int idx = 0; - - while (idx < pattern_length && *test_cursor == pattern[idx]) { - idx++; - test_cursor++; - if (test_cursor == end_of_buffer) { - test_buffer = test_buffer->next; - if (test_buffer == 0) - return 1; // Pattern didn't match - test_cursor = dx_buffer_base(test_buffer); - end_of_buffer = test_cursor + dx_buffer_size(test_buffer); - } - } - - if (idx < pattern_length) - return 1; // Pattern didn't match - - // - // Pattern matched, check the tag - // - while (*expected_tags && *test_cursor != *expected_tags) - expected_tags++; - if (*expected_tags == 0) - return 0; // Unexpected tag - - if (location->parsed) - return 0; // Duplicate section - - // - // Pattern matched and tag is expected. Mark the beginning of the section. - // - location->parsed = 1; - location->buffer = test_buffer; - location->offset = test_cursor - dx_buffer_base(test_buffer); - location->length = 0; - location->hdr_length = pattern_length; - - // - // Advance the pointers to consume the whole section. - // - int pre_consume = 1; // Count the already extracted tag - int consume = 0; - unsigned char tag = next_octet(&test_cursor, &test_buffer); - if (!test_cursor) return 0; - switch (tag) { - case 0x45 : // list0 - break; - - case 0xd0 : // list32 - case 0xd1 : // map32 - case 0xb0 : // vbin32 - pre_consume += 3; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; - if (!test_cursor) return 0; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; - if (!test_cursor) return 0; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; - if (!test_cursor) return 0; - // Fall through to the next case... - - case 0xc0 : // list8 - case 0xc1 : // map8 - case 0xa0 : // vbin8 - pre_consume += 1; - consume |= (int) next_octet(&test_cursor, &test_buffer); - if (!test_cursor) return 0; - break; - } - - location->length = pre_consume + consume; - if (consume) - advance(&test_cursor, &test_buffer, consume, 0, 0); - - *cursor = test_cursor; - *buffer = test_buffer; - return 1; -} - - -static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - - switch (field) { - case DX_FIELD_TO: - while (1) { - if (content->field_to.parsed) - return &content->field_to; - - if (content->section_message_properties.parsed == 0) - break; - - dx_buffer_t *buffer = content->section_message_properties.buffer; - unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; - - int count = start_list(&cursor, &buffer); - int result; - - if (count < 3) - break; - - result = traverse_field(&cursor, &buffer, 0); // message_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_to); // to - if (!result) return 0; - } - break; - - case DX_FIELD_REPLY_TO: - while (1) { - if (content->field_reply_to.parsed) - return &content->field_reply_to; - - if (content->section_message_properties.parsed == 0) - break; - - dx_buffer_t *buffer = content->section_message_properties.buffer; - unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; - - int count = start_list(&cursor, &buffer); - int result; - - if (count < 5) - break; - - result = traverse_field(&cursor, &buffer, 0); // message_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_to); // to - if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // subject - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to - if (!result) return 0; - } - break; - - case DX_FIELD_DELIVERY_ANNOTATION: - if (content->section_delivery_annotation.parsed) - return &content->section_delivery_annotation; - break; - - case DX_FIELD_APPLICATION_PROPERTIES: - if (content->section_application_properties.parsed) - return &content->section_application_properties; - break; - - case DX_FIELD_BODY: - if (content->section_body.parsed) - return &content->section_body; - break; - - default: - break; - } - - return 0; -} - - -dx_message_t *dx_message() -{ - dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); - if (!msg) - return 0; - - DEQ_ITEM_INIT(msg); - msg->content = new_dx_message_content_t(); - - if (msg->content == 0) { - free_dx_message_t((dx_message_t*) msg); - return 0; - } - - memset(msg->content, 0, sizeof(dx_message_content_t)); - msg->content->lock = sys_mutex(); - msg->content->ref_count = 1; - msg->content->parse_depth = DX_DEPTH_NONE; - msg->content->parsed_delivery_annotations = 0; - - return (dx_message_t*) msg; -} - - -void dx_message_free(dx_message_t *in_msg) -{ - uint32_t rc; - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_message_content_t *content = msg->content; - - sys_mutex_lock(content->lock); - rc = --content->ref_count; - sys_mutex_unlock(content->lock); - - if (rc == 0) { - if (content->parsed_delivery_annotations) - dx_parse_free(content->parsed_delivery_annotations); - - dx_buffer_t *buf = DEQ_HEAD(content->buffers); - while (buf) { - DEQ_REMOVE_HEAD(content->buffers); - dx_buffer_free(buf); - buf = DEQ_HEAD(content->buffers); - } - - buf = DEQ_HEAD(content->new_delivery_annotations); - while (buf) { - DEQ_REMOVE_HEAD(content->new_delivery_annotations); - dx_buffer_free(buf); - buf = DEQ_HEAD(content->new_delivery_annotations); - } - - sys_mutex_free(content->lock); - free_dx_message_content_t(content); - } - - free_dx_message_t((dx_message_t*) msg); -} - - -dx_message_t *dx_message_copy(dx_message_t *in_msg) -{ - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_message_content_t *content = msg->content; - dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); - - if (!copy) - return 0; - - DEQ_ITEM_INIT(copy); - copy->content = content; - - sys_mutex_lock(content->lock); - content->ref_count++; - sys_mutex_unlock(content->lock); - - return (dx_message_t*) copy; -} - - -dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg) -{ - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_message_content_t *content = msg->content; - - if (content->parsed_delivery_annotations) - return content->parsed_delivery_annotations; - - dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION); - if (da == 0) - return 0; - - content->parsed_delivery_annotations = dx_parse(da); - if (content->parsed_delivery_annotations == 0 || - !dx_parse_ok(content->parsed_delivery_annotations) || - !dx_parse_is_map(content->parsed_delivery_annotations)) { - dx_field_iterator_free(da); - dx_parse_free(content->parsed_delivery_annotations); - content->parsed_delivery_annotations = 0; - return 0; - } - - dx_field_iterator_free(da); - return content->parsed_delivery_annotations; -} - - -void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_buffer_list_t *field_buffers = dx_compose_buffers(da); - - assert(DEQ_SIZE(content->new_delivery_annotations) == 0); - content->new_delivery_annotations = *field_buffers; - DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. -} - - -dx_message_t *dx_message_receive(dx_delivery_t *delivery) -{ - pn_delivery_t *pnd = dx_delivery_pn(delivery); - dx_message_pvt_t *msg = (dx_message_pvt_t*) dx_delivery_context(delivery); - pn_link_t *link = pn_delivery_link(pnd); - ssize_t rc; - dx_buffer_t *buf; - - // - // If there is no message associated with the delivery, this is the first time - // we've received anything on this delivery. Allocate a message descriptor and - // link it and the delivery together. - // - if (!msg) { - msg = (dx_message_pvt_t*) dx_message(); - dx_delivery_set_context(delivery, (void*) msg); - } - - // - // Get a reference to the tail buffer on the message. This is the buffer into which - // we will store incoming message data. If there is no buffer in the message, allocate - // an empty one and add it to the message. - // - buf = DEQ_TAIL(msg->content->buffers); - if (!buf) { - buf = dx_buffer(); - DEQ_INSERT_TAIL(msg->content->buffers, buf); - } - - while (1) { - // - // Try to receive enough data to fill the remaining space in the tail buffer. - // - rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); - - // - // If we receive PN_EOS, we have come to the end of the message. - // - if (rc == PN_EOS) { - // - // If the last buffer in the list is empty, remove it and free it. This - // will only happen if the size of the message content is an exact multiple - // of the buffer size. - // - if (dx_buffer_size(buf) == 0) { - DEQ_REMOVE_TAIL(msg->content->buffers); - dx_buffer_free(buf); - } - dx_delivery_set_context(delivery, 0); - return (dx_message_t*) msg; - } - - if (rc > 0) { - // - // We have received a positive number of bytes for the message. Advance - // the cursor in the buffer. - // - dx_buffer_insert(buf, rc); - - // - // If the buffer is full, allocate a new empty buffer and append it to the - // tail of the message's list. - // - if (dx_buffer_capacity(buf) == 0) { - buf = dx_buffer(); - DEQ_INSERT_TAIL(msg->content->buffers, buf); - } - } else - // - // We received zero bytes, and no PN_EOS. This means that we've received - // all of the data available up to this point, but it does not constitute - // the entire message. We'll be back later to finish it up. - // - break; - } - - return 0; -} - - -static void send_handler(void *context, const unsigned char *start, int length) -{ - pn_link_t *pnl = (pn_link_t*) context; - pn_link_send(pnl, (const char*) start, length); -} - - -void dx_message_send(dx_message_t *in_msg, dx_link_t *link) -{ - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_message_content_t *content = msg->content; - dx_buffer_t *buf = DEQ_HEAD(content->buffers); - unsigned char *cursor; - pn_link_t *pnl = dx_link_pn(link); - - if (DEQ_SIZE(content->new_delivery_annotations) > 0) { - // - // This is the case where the delivery annotations have been modified. - // The message send must be divided into sections: The existing header; - // the new delivery annotations; the rest of the existing message. - // Note that the original delivery annotations that are still in the - // buffer chain must not be sent. - // - // Start by making sure that we've parsed the message sections through - // the delivery annotations - // - if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS)) - return; - - // - // Send header if present - // - cursor = dx_buffer_base(buf); - if (content->section_message_header.length > 0) { - pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3); - buf = content->section_message_header.buffer; - cursor = content->section_message_header.offset + dx_buffer_base(buf); - advance(&cursor, &buf, content->section_message_header.length, send_handler, (void*) pnl); - } - - // - // Send new delivery annotations - // - dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations); - while (da_buf) { - pn_link_send(pnl, (char*) dx_buffer_base(da_buf), dx_buffer_size(da_buf)); - da_buf = DEQ_NEXT(da_buf); - } - - // - // Skip over replaced delivery annotations - // - if (content->section_delivery_annotation.length > 0) - advance(&cursor, &buf, - content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length, - 0, 0); - - // - // Send remaining partial buffer - // - if (buf) { - size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf)); - advance(&cursor, &buf, len, send_handler, (void*) pnl); - } - - // Fall through to process the remaining buffers normally - } - - while (buf) { - pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); - buf = DEQ_NEXT(buf); - } -} - - -static int dx_check_field_LH(dx_message_content_t *content, - dx_message_depth_t depth, - const unsigned char *long_pattern, - const unsigned char *short_pattern, - const unsigned char *expected_tags, - dx_field_location_t *location, - int more) -{ -#define LONG 10 -#define SHORT 3 - if (depth > content->parse_depth) { - if (0 == dx_check_and_advance(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location)) - return 0; - if (0 == dx_check_and_advance(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location)) - return 0; - if (!more) - content->parse_depth = depth; - } - return 1; -} - - -static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t depth) -{ - dx_buffer_t *buffer = DEQ_HEAD(content->buffers); - - if (!buffer) - return 0; // Invalid - No data in the message - - if (depth <= content->parse_depth) - return 1; // We've already parsed at least this deep - - if (content->parse_buffer == 0) { - content->parse_buffer = buffer; - content->parse_cursor = dx_buffer_base(content->parse_buffer); - } - - if (depth == DX_DEPTH_NONE) - return 1; - - // - // MESSAGE HEADER - // - if (0 == dx_check_field_LH(content, DX_DEPTH_HEADER, - MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header, 0)) - return 0; - if (depth == DX_DEPTH_HEADER) - return 1; - - // - // DELIVERY ANNOTATION - // - if (0 == dx_check_field_LH(content, DX_DEPTH_DELIVERY_ANNOTATIONS, - DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation, 0)) - return 0; - if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) - return 1; - - // - // MESSAGE ANNOTATION - // - if (0 == dx_check_field_LH(content, DX_DEPTH_MESSAGE_ANNOTATIONS, - MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation, 0)) - return 0; - if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) - return 1; - - // - // PROPERTIES - // - if (0 == dx_check_field_LH(content, DX_DEPTH_PROPERTIES, - PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties, 0)) - return 0; - if (depth == DX_DEPTH_PROPERTIES) - return 1; - - // - // APPLICATION PROPERTIES - // - if (0 == dx_check_field_LH(content, DX_DEPTH_APPLICATION_PROPERTIES, - APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties, 0)) - return 0; - if (depth == DX_DEPTH_APPLICATION_PROPERTIES) - return 1; - - // - // BODY - // Note that this function expects a limited set of types in a VALUE section. This is - // not a problem for messages passing through Dispatch because through-only messages won't - // be parsed to BODY-depth. - // - if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, - BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body, 1)) - return 0; - if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, - BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body, 1)) - return 0; - if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, - BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, &content->section_body, 0)) - return 0; - if (depth == DX_DEPTH_BODY) - return 1; - - // - // FOOTER - // - if (0 == dx_check_field_LH(content, DX_DEPTH_ALL, - FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer, 0)) - return 0; - - return 1; -} - - -int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) -{ - dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; - dx_message_content_t *content = msg->content; - int result; - - sys_mutex_lock(content->lock); - result = dx_message_check_LH(content, depth); - sys_mutex_unlock(content->lock); - - return result; -} - - -dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) -{ - dx_field_location_t *loc = dx_message_field_location(msg, field); - if (!loc) - return 0; - - return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); -} - - -ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field) -{ - dx_field_location_t *loc = dx_message_field_location(msg, field); - if (!loc) - return -1; - - return loc->length; -} - - -ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, void *buffer) -{ - dx_field_location_t *loc = dx_message_field_location(msg, field); - if (!loc) - return -1; - - dx_buffer_t *buf = loc->buffer; - size_t bufsize = dx_buffer_size(buf) - loc->offset; - void *base = dx_buffer_base(buf) + loc->offset; - size_t remaining = loc->length; - - while (remaining > 0) { - if (bufsize > remaining) - bufsize = remaining; - memcpy(buffer, base, bufsize); - buffer += bufsize; - remaining -= bufsize; - if (remaining > 0) { - buf = buf->next; - base = dx_buffer_base(buf); - bufsize = dx_buffer_size(buf); - } - } - - return loc->length; -} - - -void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) -{ - dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0); - dx_message_content_t *content = MSG_CONTENT(msg); - - dx_compose_start_list(field); - dx_compose_insert_bool(field, 0); // durable - //dx_compose_insert_null(field); // priority - //dx_compose_insert_null(field); // ttl - //dx_compose_insert_boolean(field, 0); // first-acquirer - //dx_compose_insert_uint(field, 0); // delivery-count - dx_compose_end_list(field); - - field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); - dx_compose_start_list(field); - dx_compose_insert_null(field); // message-id - dx_compose_insert_null(field); // user-id - dx_compose_insert_string(field, to); // to - //dx_compose_insert_null(field); // subject - //dx_compose_insert_null(field); // reply-to - //dx_compose_insert_null(field); // correlation-id - //dx_compose_insert_null(field); // content-type - //dx_compose_insert_null(field); // content-encoding - //dx_compose_insert_timestamp(field, 0); // absolute-expiry-time - //dx_compose_insert_timestamp(field, 0); // creation-time - //dx_compose_insert_null(field); // group-id - //dx_compose_insert_uint(field, 0); // group-sequence - //dx_compose_insert_null(field); // reply-to-group-id - dx_compose_end_list(field); - - if (buffers) { - field = dx_compose(DX_PERFORMATIVE_BODY_DATA, field); - dx_compose_insert_binary_buffers(field, buffers); - } - - dx_buffer_list_t *field_buffers = dx_compose_buffers(field); - content->buffers = *field_buffers; - DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. - - dx_compose_free(field); -} - - -void dx_message_compose_2(dx_message_t *msg, dx_composed_field_t *field) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_buffer_list_t *field_buffers = dx_compose_buffers(field); - - content->buffers = *field_buffers; - DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. -} - diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h deleted file mode 100644 index c57cea5f0d..0000000000 --- a/qpid/extras/dispatch/src/message_private.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef __message_private_h__ -#define __message_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/message.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/threading.h> - -/** - * Architecture of the message module: - * - * +--------------+ +----------------------+ - * | | | | - * | dx_message_t |----------->| dx_message_content_t | - * | | +----->| | - * +--------------+ | +----------------------+ - * | | - * +--------------+ | | +-------------+ +-------------+ +-------------+ - * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/ - * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+ - * | | - * +--------------+ - * - * The message module provides chained-fixed-sized-buffer storage of message content with multiple - * references. If a message is received and is to be queued for multiple destinations, there is only - * one copy of the message content in memory but multiple lightweight references to the content. - * - */ - -typedef struct { - dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present - size_t offset; // Offset in the buffer to the first octet - size_t length; // Length of the field or zero if unneeded - size_t hdr_length; // Length of the field's header (not included in the length of the field) - int parsed; // non-zero iff the buffer chain has been parsed to find this field -} dx_field_location_t; - - -// TODO - consider using pointers to dx_field_location_t below to save memory -// TODO - we need a second buffer list for modified annotations and header -// There are three message scenarios: -// 1) Received message is held and forwarded unmodified - single buffer list -// 2) Received message is held and modified before forwarding - two buffer lists -// 3) Message is composed internally - single buffer list -// TODO - provide a way to allocate a message without a lock for the link-routing case. -// It's likely that link-routing will cause no contention for the message content. -// - -typedef struct { - sys_mutex_t *lock; - uint32_t ref_count; // The number of messages referencing this - dx_buffer_list_t buffers; // The buffer chain containing the message - dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT) - dx_field_location_t section_message_header; // The message header list - dx_field_location_t section_delivery_annotation; // The delivery annotation map - dx_field_location_t section_message_annotation; // The message annotation map - dx_field_location_t section_message_properties; // The message properties list - dx_field_location_t section_application_properties; // The application properties list - dx_field_location_t section_body; // The message body: Data - dx_field_location_t section_footer; // The footer - dx_field_location_t field_user_id; // The string value of the user-id - dx_field_location_t field_to; // The string value of the to field - dx_field_location_t field_reply_to; // The string value of the reply_to field - dx_field_location_t body; // The body of the message - dx_buffer_t *parse_buffer; - unsigned char *parse_cursor; - dx_message_depth_t parse_depth; - dx_parsed_field_t *parsed_delivery_annotations; -} dx_message_content_t; - -typedef struct { - DEQ_LINKS(dx_message_t); // Deque linkage that overlays the dx_message_t - dx_message_content_t *content; -} dx_message_pvt_t; - -ALLOC_DECLARE(dx_message_t); -ALLOC_DECLARE(dx_message_content_t); - -#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content) - -#endif diff --git a/qpid/extras/dispatch/src/parse.c b/qpid/extras/dispatch/src/parse.c deleted file mode 100644 index c5ecc4e498..0000000000 --- a/qpid/extras/dispatch/src/parse.c +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/parse.h> -#include <qpid/dispatch/amqp.h> - -DEQ_DECLARE(dx_parsed_field_t, dx_parsed_field_list_t); - -struct dx_parsed_field_t { - DEQ_LINKS(dx_parsed_field_t); - dx_parsed_field_t *parent; - dx_parsed_field_list_t children; - uint8_t tag; - dx_field_iterator_t *raw_iter; - const char *parse_error; -}; - -ALLOC_DECLARE(dx_parsed_field_t); -ALLOC_DEFINE(dx_parsed_field_t); - - -static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen) -{ - if (dx_field_iterator_end(iter)) - return "Insufficient Data to Determine Tag"; - *tag = dx_field_iterator_octet(iter); - *count = 0; - *length = 0; - *clen = 0; - - switch (*tag & 0xF0) { - case 0x40: *length = 0; break; - case 0x50: *length = 1; break; - case 0x60: *length = 2; break; - case 0x70: *length = 4; break; - case 0x80: *length = 8; break; - case 0x90: *length = 16; break; - case 0xB0: - case 0xD0: - case 0xF0: - *length += ((unsigned int) dx_field_iterator_octet(iter)) << 24; - *length += ((unsigned int) dx_field_iterator_octet(iter)) << 16; - *length += ((unsigned int) dx_field_iterator_octet(iter)) << 8; - // fall through to the next case - - case 0xA0: - case 0xC0: - case 0xE0: - if (dx_field_iterator_end(iter)) - return "Insufficient Data to Determine Length"; - *length += (unsigned int) dx_field_iterator_octet(iter); - break; - - default: - return "Invalid Tag - No Length Information"; - } - - switch (*tag & 0xF0) { - case 0xD0: - case 0xF0: - *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24; - *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16; - *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8; - *clen = 3; - // fall through to the next case - - case 0xC0: - case 0xE0: - if (dx_field_iterator_end(iter)) - return "Insufficient Data to Determine Count"; - *count += (unsigned int) dx_field_iterator_octet(iter); - *clen += 1; - break; - } - - if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1)) - return "Odd Number of Elements in a Map"; - - if (*clen > *length) - return "Insufficient Length to Determine Count"; - - return 0; -} - - -static dx_parsed_field_t *dx_parse_internal(dx_field_iterator_t *iter, dx_parsed_field_t *p) -{ - dx_parsed_field_t *field = new_dx_parsed_field_t(); - if (!field) - return 0; - - DEQ_ITEM_INIT(field); - DEQ_INIT(field->children); - field->parent = p; - field->raw_iter = 0; - - uint32_t length; - uint32_t count; - uint32_t length_of_count; - - field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count); - - if (!field->parse_error) { - field->raw_iter = dx_field_iterator_sub(iter, length); - dx_field_iterator_advance(iter, length - length_of_count); - for (uint32_t idx = 0; idx < count; idx++) { - dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field); - DEQ_INSERT_TAIL(field->children, child); - if (!dx_parse_ok(child)) { - field->parse_error = child->parse_error; - break; - } - } - } - - return field; -} - - -dx_parsed_field_t *dx_parse(dx_field_iterator_t *iter) -{ - return dx_parse_internal(iter, 0); -} - - -void dx_parse_free(dx_parsed_field_t *field) -{ - if (!field) - return; - - assert(field->parent == 0); - if (field->raw_iter) - dx_field_iterator_free(field->raw_iter); - - dx_parsed_field_t *sub_field = DEQ_HEAD(field->children); - while (sub_field) { - dx_parsed_field_t *next = DEQ_NEXT(sub_field); - DEQ_REMOVE_HEAD(field->children); - sub_field->parent = 0; - dx_parse_free(sub_field); - sub_field = next; - } - - free_dx_parsed_field_t(field); -} - - -int dx_parse_ok(dx_parsed_field_t *field) -{ - return field->parse_error == 0; -} - - -const char *dx_parse_error(dx_parsed_field_t *field) -{ - return field->parse_error; -} - - -uint8_t dx_parse_tag(dx_parsed_field_t *field) -{ - return field->tag; -} - - -dx_field_iterator_t *dx_parse_raw(dx_parsed_field_t *field) -{ - return field->raw_iter; -} - - -uint32_t dx_parse_as_uint(dx_parsed_field_t *field) -{ - uint32_t result = 0; - - dx_field_iterator_reset(field->raw_iter); - - switch (field->tag) { - case DX_AMQP_UINT: - result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 24; - result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 16; - - case DX_AMQP_USHORT: - result |= ((uint32_t) dx_field_iterator_octet(field->raw_iter)) << 8; - // Fall Through... - - case DX_AMQP_UBYTE: - case DX_AMQP_SMALLUINT: - case DX_AMQP_BOOLEAN: - result |= (uint32_t) dx_field_iterator_octet(field->raw_iter); - break; - - case DX_AMQP_TRUE: - result = 1; - break; - } - - return result; -} - - -uint64_t dx_parse_as_ulong(dx_parsed_field_t *field) -{ - uint64_t result = 0; - - dx_field_iterator_reset(field->raw_iter); - - switch (field->tag) { - case DX_AMQP_ULONG: - case DX_AMQP_TIMESTAMP: - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 56; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 48; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 40; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 32; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 24; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 16; - result |= ((uint64_t) dx_field_iterator_octet(field->raw_iter)) << 8; - // Fall Through... - - case DX_AMQP_SMALLULONG: - result |= (uint64_t) dx_field_iterator_octet(field->raw_iter); - // Fall Through... - - case DX_AMQP_ULONG0: - break; - } - - return result; -} - - -int32_t dx_parse_as_int(dx_parsed_field_t *field) -{ - int32_t result = 0; - - dx_field_iterator_reset(field->raw_iter); - - switch (field->tag) { - case DX_AMQP_INT: - result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 24; - result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 16; - - case DX_AMQP_SHORT: - result |= ((int32_t) dx_field_iterator_octet(field->raw_iter)) << 8; - // Fall Through... - - case DX_AMQP_BYTE: - case DX_AMQP_SMALLINT: - case DX_AMQP_BOOLEAN: - result |= (int32_t) dx_field_iterator_octet(field->raw_iter); - break; - - case DX_AMQP_TRUE: - result = 1; - break; - } - - return result; -} - - -int64_t dx_parse_as_long(dx_parsed_field_t *field) -{ - int64_t result = 0; - - dx_field_iterator_reset(field->raw_iter); - - switch (field->tag) { - case DX_AMQP_LONG: - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 56; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 48; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 40; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 32; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 24; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 16; - result |= ((int64_t) dx_field_iterator_octet(field->raw_iter)) << 8; - // Fall Through... - - case DX_AMQP_SMALLLONG: - result |= (uint64_t) dx_field_iterator_octet(field->raw_iter); - break; - } - - return result; -} - - -uint32_t dx_parse_sub_count(dx_parsed_field_t *field) -{ - uint32_t count = DEQ_SIZE(field->children); - - if (field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32) - count = count >> 1; - - return count; -} - - -dx_parsed_field_t *dx_parse_sub_key(dx_parsed_field_t *field, uint32_t idx) -{ - if (field->tag != DX_AMQP_MAP8 && field->tag != DX_AMQP_MAP32) - return 0; - - idx = idx << 1; - dx_parsed_field_t *key = DEQ_HEAD(field->children); - while (idx && key) { - idx--; - key = DEQ_NEXT(key); - } - - return key; -} - - -dx_parsed_field_t *dx_parse_sub_value(dx_parsed_field_t *field, uint32_t idx) -{ - if (field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32) - idx = (idx << 1) + 1; - - dx_parsed_field_t *key = DEQ_HEAD(field->children); - while (idx && key) { - idx--; - key = DEQ_NEXT(key); - } - - return key; -} - - -int dx_parse_is_map(dx_parsed_field_t *field) -{ - return field->tag == DX_AMQP_MAP8 || field->tag == DX_AMQP_MAP32; -} - - -int dx_parse_is_list(dx_parsed_field_t *field) -{ - return field->tag == DX_AMQP_LIST8 || field->tag == DX_AMQP_LIST32; -} - - -int dx_parse_is_scalar(dx_parsed_field_t *field) -{ - return DEQ_SIZE(field->children) == 0; -} - - -dx_parsed_field_t *dx_parse_value_by_key(dx_parsed_field_t *field, const char *key) -{ - uint32_t count = dx_parse_sub_count(field); - - for (uint32_t idx = 0; idx < count; idx++) { - dx_parsed_field_t *sub = dx_parse_sub_key(field, idx); - if (!sub) - return 0; - - dx_field_iterator_t *iter = dx_parse_raw(sub); - if (!iter) - return 0; - - if (dx_field_iterator_equal(iter, (const unsigned char*) key)) { - return dx_parse_sub_value(field, idx); - } - } - - return 0; -} diff --git a/qpid/extras/dispatch/src/posix/threading.c b/qpid/extras/dispatch/src/posix/threading.c deleted file mode 100644 index 1d7760ca88..0000000000 --- a/qpid/extras/dispatch/src/posix/threading.c +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/ctools.h> -#include <stdio.h> -#include <pthread.h> - -struct sys_mutex_t { - pthread_mutex_t mutex; - int acquired; -}; - -sys_mutex_t *sys_mutex(void) -{ - sys_mutex_t *mutex = NEW(sys_mutex_t); - pthread_mutex_init(&(mutex->mutex), 0); - mutex->acquired = 0; - return mutex; -} - - -void sys_mutex_free(sys_mutex_t *mutex) -{ - assert(!mutex->acquired); - pthread_mutex_destroy(&(mutex->mutex)); - free(mutex); -} - - -void sys_mutex_lock(sys_mutex_t *mutex) -{ - pthread_mutex_lock(&(mutex->mutex)); - assert(!mutex->acquired); - mutex->acquired++; -} - - -void sys_mutex_unlock(sys_mutex_t *mutex) -{ - mutex->acquired--; - assert(!mutex->acquired); - pthread_mutex_unlock(&(mutex->mutex)); -} - - -struct sys_cond_t { - pthread_cond_t cond; -}; - - -sys_cond_t *sys_cond(void) -{ - sys_cond_t *cond = NEW(sys_cond_t); - pthread_cond_init(&(cond->cond), 0); - return cond; -} - - -void sys_cond_free(sys_cond_t *cond) -{ - pthread_cond_destroy(&(cond->cond)); - free(cond); -} - - -void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex) -{ - assert(held_mutex->acquired); - held_mutex->acquired--; - pthread_cond_wait(&(cond->cond), &(held_mutex->mutex)); - held_mutex->acquired++; -} - - -void sys_cond_signal(sys_cond_t *cond) -{ - pthread_cond_signal(&(cond->cond)); -} - - -void sys_cond_signal_all(sys_cond_t *cond) -{ - pthread_cond_broadcast(&(cond->cond)); -} - - -struct sys_thread_t { - pthread_t thread; -}; - -sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg) -{ - sys_thread_t *thread = NEW(sys_thread_t); - pthread_create(&(thread->thread), 0, run_function, arg); - return thread; -} - - -void sys_thread_free(sys_thread_t *thread) -{ - free(thread); -} - - -void sys_thread_join(sys_thread_t *thread) -{ - pthread_join(thread->thread, 0); -} diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c deleted file mode 100644 index a6e35018c6..0000000000 --- a/qpid/extras/dispatch/src/python_embedded.c +++ /dev/null @@ -1,678 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/log.h> -#include <qpid/dispatch/amqp.h> -#include <qpid/dispatch/alloc.h> -#include <qpid/dispatch/router.h> - - -//=============================================================================== -// Control Functions -//=============================================================================== - -static dx_dispatch_t *dispatch = 0; -static uint32_t ref_count = 0; -static sys_mutex_t *lock = 0; -static char *log_module = "PYTHON"; -static PyObject *dispatch_module = 0; - -static void dx_python_setup(); - - -void dx_python_initialize(dx_dispatch_t *dx) -{ - dispatch = dx; - lock = sys_mutex(); -} - - -void dx_python_finalize() -{ - assert(ref_count == 0); - sys_mutex_free(lock); -} - - -void dx_python_start() -{ - sys_mutex_lock(lock); - if (ref_count == 0) { - Py_Initialize(); - dx_python_setup(); - dx_log(log_module, LOG_TRACE, "Embedded Python Interpreter Initialized"); - } - ref_count++; - sys_mutex_unlock(lock); -} - - -void dx_python_stop() -{ - sys_mutex_lock(lock); - ref_count--; - if (ref_count == 0) { - Py_DECREF(dispatch_module); - dispatch_module = 0; - Py_Finalize(); - dx_log(log_module, LOG_TRACE, "Embedded Python Interpreter Shut Down"); - } - sys_mutex_unlock(lock); -} - - -PyObject *dx_python_module() -{ - assert(dispatch_module); - return dispatch_module; -} - - -//=============================================================================== -// Data Conversion Functions -//=============================================================================== - -static PyObject *parsed_to_py_string(dx_parsed_field_t *field) -{ - switch (dx_parse_tag(field)) { - case DX_AMQP_VBIN8: - case DX_AMQP_VBIN32: - case DX_AMQP_STR8_UTF8: - case DX_AMQP_STR32_UTF8: - case DX_AMQP_SYM8: - case DX_AMQP_SYM32: - break; - default: - return Py_None; - } - -#define SHORT_BUF 1024 - uint8_t short_buf[SHORT_BUF]; - PyObject *result; - dx_field_iterator_t *raw = dx_parse_raw(field); - dx_field_iterator_reset(raw); - uint32_t length = dx_field_iterator_remaining(raw); - uint8_t *buffer = short_buf; - uint8_t *ptr; - int alloc = 0; - - if (length > SHORT_BUF) { - alloc = 1; - buffer = (uint8_t*) malloc(length); - } - - ptr = buffer; - while (!dx_field_iterator_end(raw)) - *(ptr++) = dx_field_iterator_octet(raw); - result = PyString_FromStringAndSize((char*) buffer, ptr - buffer); - if (alloc) - free(buffer); - - return result; -} - - -void dx_py_to_composed(PyObject *value, dx_composed_field_t *field) -{ - if (PyBool_Check(value)) - dx_compose_insert_bool(field, PyInt_AS_LONG(value) ? 1 : 0); - - //else if (PyFloat_Check(value)) - // dx_compose_insert_double(field, PyFloat_AS_DOUBLE(value)); - - else if (PyInt_Check(value)) - dx_compose_insert_long(field, (int64_t) PyInt_AS_LONG(value)); - - else if (PyLong_Check(value)) - dx_compose_insert_long(field, (int64_t) PyLong_AsLongLong(value)); - - else if (PyString_Check(value)) - dx_compose_insert_string(field, PyString_AS_STRING(value)); - - else if (PyDict_Check(value)) { - Py_ssize_t iter = 0; - PyObject *key; - PyObject *val; - dx_compose_start_map(field); - while (PyDict_Next(value, &iter, &key, &val)) { - dx_py_to_composed(key, field); - dx_py_to_composed(val, field); - } - dx_compose_end_map(field); - } - - else if (PyList_Check(value)) { - Py_ssize_t count = PyList_Size(value); - dx_compose_start_list(field); - for (Py_ssize_t idx = 0; idx < count; idx++) { - PyObject *item = PyList_GetItem(value, idx); - dx_py_to_composed(item, field); - } - dx_compose_end_list(field); - } - - else if (PyTuple_Check(value)) { - Py_ssize_t count = PyTuple_Size(value); - dx_compose_start_list(field); - for (Py_ssize_t idx = 0; idx < count; idx++) { - PyObject *item = PyTuple_GetItem(value, idx); - dx_py_to_composed(item, field); - } - dx_compose_end_list(field); - } -} - - -PyObject *dx_field_to_py(dx_parsed_field_t *field) -{ - PyObject *result = Py_None; - uint8_t tag = dx_parse_tag(field); - - switch (tag) { - case DX_AMQP_NULL: - result = Py_None; - break; - - case DX_AMQP_BOOLEAN: - case DX_AMQP_TRUE: - case DX_AMQP_FALSE: - result = dx_parse_as_uint(field) ? Py_True : Py_False; - break; - - case DX_AMQP_UBYTE: - case DX_AMQP_USHORT: - case DX_AMQP_UINT: - case DX_AMQP_SMALLUINT: - case DX_AMQP_UINT0: - result = PyInt_FromLong((long) dx_parse_as_uint(field)); - break; - - case DX_AMQP_ULONG: - case DX_AMQP_SMALLULONG: - case DX_AMQP_ULONG0: - case DX_AMQP_TIMESTAMP: - result = PyLong_FromUnsignedLongLong((unsigned PY_LONG_LONG) dx_parse_as_ulong(field)); - break; - - case DX_AMQP_BYTE: - case DX_AMQP_SHORT: - case DX_AMQP_INT: - case DX_AMQP_SMALLINT: - result = PyInt_FromLong((long) dx_parse_as_int(field)); - break; - - case DX_AMQP_LONG: - case DX_AMQP_SMALLLONG: - result = PyLong_FromUnsignedLongLong((unsigned PY_LONG_LONG) dx_parse_as_long(field)); - break; - - case DX_AMQP_FLOAT: - case DX_AMQP_DOUBLE: - case DX_AMQP_DECIMAL32: - case DX_AMQP_DECIMAL64: - case DX_AMQP_DECIMAL128: - case DX_AMQP_UTF32: - case DX_AMQP_UUID: - break; - - case DX_AMQP_VBIN8: - case DX_AMQP_VBIN32: - case DX_AMQP_STR8_UTF8: - case DX_AMQP_STR32_UTF8: - case DX_AMQP_SYM8: - case DX_AMQP_SYM32: - result = parsed_to_py_string(field); - break; - - case DX_AMQP_LIST0: - case DX_AMQP_LIST8: - case DX_AMQP_LIST32: { - uint32_t count = dx_parse_sub_count(field); - result = PyList_New(count); - for (uint32_t idx = 0; idx < count; idx++) { - dx_parsed_field_t *sub = dx_parse_sub_value(field, idx); - PyObject *pysub = dx_field_to_py(sub); - if (pysub == 0) - return 0; - PyList_SetItem(result, idx, pysub); - } - break; - } - case DX_AMQP_MAP8: - case DX_AMQP_MAP32: { - uint32_t count = dx_parse_sub_count(field); - result = PyDict_New(); - for (uint32_t idx = 0; idx < count; idx++) { - dx_parsed_field_t *key = dx_parse_sub_key(field, idx); - dx_parsed_field_t *val = dx_parse_sub_value(field, idx); - PyObject *pykey = parsed_to_py_string(key); - PyObject *pyval = dx_field_to_py(val); - if (pyval == 0) - return 0; - PyDict_SetItem(result, pykey, pyval); - Py_DECREF(pykey); - Py_DECREF(pyval); - } - break; - } - case DX_AMQP_ARRAY8: - case DX_AMQP_ARRAY32: - break; - } - - return result; -} - - -//=============================================================================== -// Logging Object -//=============================================================================== - -typedef struct { - PyObject_HEAD - PyObject *module_name; -} LogAdapter; - - -static int LogAdapter_init(LogAdapter *self, PyObject *args, PyObject *kwds) -{ - const char *text; - if (!PyArg_ParseTuple(args, "s", &text)) - return -1; - - self->module_name = PyString_FromString(text); - return 0; -} - - -static void LogAdapter_dealloc(LogAdapter* self) -{ - Py_XDECREF(self->module_name); - self->ob_type->tp_free((PyObject*)self); -} - - -static PyObject* dx_python_log(PyObject *self, PyObject *args) -{ - int level; - const char* text; - - if (!PyArg_ParseTuple(args, "is", &level, &text)) - return 0; - - LogAdapter *self_ptr = (LogAdapter*) self; - char *logmod = PyString_AS_STRING(self_ptr->module_name); - - dx_log(logmod, level, text); - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyMethodDef LogAdapter_methods[] = { - {"log", dx_python_log, METH_VARARGS, "Emit a Log Line"}, - {0, 0, 0, 0} -}; - -static PyMethodDef empty_methods[] = { - {0, 0, 0, 0} -}; - -static PyTypeObject LogAdapterType = { - PyObject_HEAD_INIT(0) - 0, /* ob_size*/ - "dispatch.LogAdapter", /* tp_name*/ - sizeof(LogAdapter), /* tp_basicsize*/ - 0, /* tp_itemsize*/ - (destructor)LogAdapter_dealloc, /* tp_dealloc*/ - 0, /* tp_print*/ - 0, /* tp_getattr*/ - 0, /* tp_setattr*/ - 0, /* tp_compare*/ - 0, /* tp_repr*/ - 0, /* tp_as_number*/ - 0, /* tp_as_sequence*/ - 0, /* tp_as_mapping*/ - 0, /* tp_hash */ - 0, /* tp_call*/ - 0, /* tp_str*/ - 0, /* tp_getattro*/ - 0, /* tp_setattro*/ - 0, /* tp_as_buffer*/ - Py_TPFLAGS_DEFAULT, /* tp_flags*/ - "Dispatch Log Adapter", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - LogAdapter_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc)LogAdapter_init, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0 /* tp_version_tag */ -}; - - -//=============================================================================== -// Message IO Object -//=============================================================================== - -typedef struct { - PyObject_HEAD - PyObject *handler; - PyObject *handler_rx_call; - dx_dispatch_t *dx; - Py_ssize_t addr_count; - dx_address_t **addrs; -} IoAdapter; - - -static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id) -{ - IoAdapter *self = (IoAdapter*) context; - - // - // Parse the message through the body and exit if the message is not well formed. - // - if (!dx_message_check(msg, DX_DEPTH_BODY)) - return; - - // - // Get an iterator for the application-properties. Exit if the message has none. - // - dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES); - if (ap == 0) - return; - - // - // Try to get a map-view of the application-properties. - // - dx_parsed_field_t *ap_map = dx_parse(ap); - if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) { - dx_field_iterator_free(ap); - dx_parse_free(ap_map); - return; - } - - // - // Get an iterator for the body. Exit if the message has none. - // - dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY); - if (body == 0) { - dx_field_iterator_free(ap); - dx_parse_free(ap_map); - return; - } - - // - // Try to get a map-view of the body. - // - dx_parsed_field_t *body_map = dx_parse(body); - if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) { - dx_field_iterator_free(ap); - dx_field_iterator_free(body); - dx_parse_free(ap_map); - dx_parse_free(body_map); - return; - } - - sys_mutex_lock(lock); - PyObject *pAP = dx_field_to_py(ap_map); - PyObject *pBody = dx_field_to_py(body_map); - - PyObject *pArgs = PyTuple_New(3); - PyTuple_SetItem(pArgs, 0, pAP); - PyTuple_SetItem(pArgs, 1, pBody); - PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id)); - - PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); - } - sys_mutex_unlock(lock); - - dx_field_iterator_free(ap); - dx_field_iterator_free(body); - dx_parse_free(ap_map); - dx_parse_free(body_map); -} - - -static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) -{ - PyObject *addrs; - if (!PyArg_ParseTuple(args, "OO", &self->handler, &addrs)) - return -1; - - self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive"); - if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call)) - return -1; - - if (!PyTuple_Check(addrs)) - return -1; - - Py_INCREF(self->handler); - Py_INCREF(self->handler_rx_call); - self->dx = dispatch; - self->addr_count = PyTuple_Size(addrs); - self->addrs = NEW_PTR_ARRAY(dx_address_t, self->addr_count); - for (Py_ssize_t idx = 0; idx < self->addr_count; idx++) - self->addrs[idx] = dx_router_register_address(self->dx, - PyString_AS_STRING(PyTuple_GetItem(addrs, idx)), - dx_io_rx_handler, self); - return 0; -} - - -static void IoAdapter_dealloc(IoAdapter* self) -{ - for (Py_ssize_t idx = 0; idx < self->addr_count; idx++) - dx_router_unregister_address(self->addrs[idx]); - free(self->addrs); - Py_DECREF(self->handler); - Py_DECREF(self->handler_rx_call); - self->ob_type->tp_free((PyObject*)self); -} - - -static PyObject* dx_python_send(PyObject *self, PyObject *args) -{ - IoAdapter *ioa = (IoAdapter*) self; - dx_composed_field_t *field = 0; - const char *address; - PyObject *app_properties; - PyObject *body; - - if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body)) - return 0; - - field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field); - dx_compose_start_map(field); - - dx_compose_insert_string(field, DX_DA_INGRESS); - dx_compose_insert_string(field, dx_router_id(ioa->dx)); - - dx_compose_insert_string(field, DX_DA_TRACE); - dx_compose_start_list(field); - dx_compose_insert_string(field, dx_router_id(ioa->dx)); - dx_compose_end_list(field); - - dx_compose_end_map(field); - - field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); - dx_compose_start_list(field); - dx_compose_insert_null(field); // message-id - dx_compose_insert_null(field); // user-id - dx_compose_insert_string(field, address); // to - dx_compose_end_list(field); - - field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field); - dx_py_to_composed(app_properties, field); - - field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); - dx_py_to_composed(body, field); - - dx_message_t *msg = dx_message(); - dx_message_compose_2(msg, field); - dx_router_send2(ioa->dx, address, msg); - dx_message_free(msg); - dx_compose_free(field); - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyMethodDef IoAdapter_methods[] = { - {"send", dx_python_send, METH_VARARGS, "Send a Message"}, - {0, 0, 0, 0} -}; - - -static PyTypeObject IoAdapterType = { - PyObject_HEAD_INIT(0) - 0, /* ob_size*/ - "dispatch.IoAdapter", /* tp_name*/ - sizeof(IoAdapter), /* tp_basicsize*/ - 0, /* tp_itemsize*/ - (destructor)IoAdapter_dealloc, /* tp_dealloc*/ - 0, /* tp_print*/ - 0, /* tp_getattr*/ - 0, /* tp_setattr*/ - 0, /* tp_compare*/ - 0, /* tp_repr*/ - 0, /* tp_as_number*/ - 0, /* tp_as_sequence*/ - 0, /* tp_as_mapping*/ - 0, /* tp_hash */ - 0, /* tp_call*/ - 0, /* tp_str*/ - 0, /* tp_getattro*/ - 0, /* tp_setattro*/ - 0, /* tp_as_buffer*/ - Py_TPFLAGS_DEFAULT, /* tp_flags*/ - "Dispatch IO Adapter", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - IoAdapter_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc)IoAdapter_init, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0 /* tp_version_tag */ -}; - - -//=============================================================================== -// Initialization of Modules and Types -//=============================================================================== - -static void dx_register_log_constant(PyObject *module, const char *name, uint32_t value) -{ - PyObject *const_object = PyInt_FromLong((long) value); - Py_INCREF(const_object); - PyModule_AddObject(module, name, const_object); -} - - -static void dx_python_setup() -{ - LogAdapterType.tp_new = PyType_GenericNew; - IoAdapterType.tp_new = PyType_GenericNew; - if ((PyType_Ready(&LogAdapterType) < 0) || (PyType_Ready(&IoAdapterType) < 0)) { - PyErr_Print(); - dx_log(log_module, LOG_ERROR, "Unable to initialize Adapters"); - assert(0); - } else { - PyObject *m = Py_InitModule3("dispatch", empty_methods, "Dispatch Adapter Module"); - - // - // Add LogAdapter - // - PyTypeObject *laType = &LogAdapterType; - Py_INCREF(laType); - PyModule_AddObject(m, "LogAdapter", (PyObject*) &LogAdapterType); - - dx_register_log_constant(m, "LOG_TRACE", LOG_TRACE); - dx_register_log_constant(m, "LOG_DEBUG", LOG_DEBUG); - dx_register_log_constant(m, "LOG_INFO", LOG_INFO); - dx_register_log_constant(m, "LOG_NOTICE", LOG_NOTICE); - dx_register_log_constant(m, "LOG_WARNING", LOG_WARNING); - dx_register_log_constant(m, "LOG_ERROR", LOG_ERROR); - dx_register_log_constant(m, "LOG_CRITICAL", LOG_CRITICAL); - - // - PyTypeObject *ioaType = &IoAdapterType; - Py_INCREF(ioaType); - PyModule_AddObject(m, "IoAdapter", (PyObject*) &IoAdapterType); - - Py_INCREF(m); - dispatch_module = m; - } -} - -void dx_python_lock() -{ - sys_mutex_lock(lock); -} - -void dx_python_unlock() -{ - sys_mutex_unlock(lock); -} - diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c deleted file mode 100644 index 4a70dc4ede..0000000000 --- a/qpid/extras/dispatch/src/router_agent.c +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include <stdio.h> -#include <string.h> -#include <stdbool.h> -#include <stdlib.h> -#include <qpid/dispatch.h> -#include <qpid/dispatch/agent.h> -#include "dispatch_private.h" -#include "router_private.h" - -//static char *module = "router.agent"; - -#define DX_ROUTER_CLASS_ROUTER 1 -#define DX_ROUTER_CLASS_LINK 2 -#define DX_ROUTER_CLASS_NODE 3 -#define DX_ROUTER_CLASS_ADDRESS 4 - -typedef struct dx_router_class_t { - dx_router_t *router; - int class_id; -} dx_router_class_t; - - -static void dx_router_schema_handler(void *context, void *correlator) -{ -} - - -static const char *dx_router_addr_text(dx_address_t *addr) -{ - if (addr) { - const unsigned char *text = dx_hash_key_by_handle(addr->hash_handle); - if (text) - return (const char*) text; - } - return 0; -} - - -static void dx_router_query_router(dx_router_t *router, void *cor) -{ - dx_agent_value_string(cor, "area", router->router_area); - dx_agent_value_string(cor, "router_id", router->router_id); - - sys_mutex_lock(router->lock); - dx_agent_value_uint(cor, "addr_count", DEQ_SIZE(router->addrs)); - dx_agent_value_uint(cor, "link_count", DEQ_SIZE(router->links)); - dx_agent_value_uint(cor, "node_count", DEQ_SIZE(router->routers)); - sys_mutex_unlock(router->lock); - - dx_agent_value_complete(cor, 0); -} - - -static void dx_router_query_link(dx_router_t *router, void *cor) -{ - sys_mutex_lock(router->lock); - dx_router_link_t *link = DEQ_HEAD(router->links); - const char *link_type = "?"; - const char *link_dir; - - while (link) { - dx_agent_value_uint(cor, "index", link->mask_bit); - switch (link->link_type) { - case DX_LINK_ENDPOINT: link_type = "endpoint"; break; - case DX_LINK_ROUTER: link_type = "inter-router"; break; - case DX_LINK_AREA: link_type = "inter-area"; break; - } - dx_agent_value_string(cor, "link-type", link_type); - - if (link->link_direction == DX_INCOMING) - link_dir = "in"; - else - link_dir = "out"; - dx_agent_value_string(cor, "link-dir", link_dir); - - const char *text = dx_router_addr_text(link->owning_addr); - if (text) - dx_agent_value_string(cor, "owning-addr", text); - else - dx_agent_value_null(cor, "owning-addr"); - - link = DEQ_NEXT(link); - dx_agent_value_complete(cor, link != 0); - } - sys_mutex_unlock(router->lock); -} - - -static void dx_router_query_node(dx_router_t *router, void *cor) -{ - sys_mutex_lock(router->lock); - dx_router_node_t *node = DEQ_HEAD(router->routers); - while (node) { - dx_agent_value_uint(cor, "index", node->mask_bit); - dx_agent_value_string(cor, "addr", dx_router_addr_text(node->owning_addr)); - if (node->next_hop) - dx_agent_value_uint(cor, "next-hop", node->next_hop->mask_bit); - else - dx_agent_value_null(cor, "next-hop"); - if (node->peer_link) - dx_agent_value_uint(cor, "router-link", node->peer_link->mask_bit); - else - dx_agent_value_null(cor, "router-link"); - node = DEQ_NEXT(node); - dx_agent_value_complete(cor, node != 0); - } - sys_mutex_unlock(router->lock); -} - - -static void dx_router_query_address(dx_router_t *router, void *cor) -{ - sys_mutex_lock(router->lock); - dx_address_t *addr = DEQ_HEAD(router->addrs); - while (addr) { - dx_agent_value_string(cor, "addr", dx_router_addr_text(addr)); - dx_agent_value_boolean(cor, "in-process", addr->handler != 0); - dx_agent_value_uint(cor, "subscriber-count", DEQ_SIZE(addr->rlinks)); - dx_agent_value_uint(cor, "remote-count", DEQ_SIZE(addr->rnodes)); - dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress); - dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress); - dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit); - dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container); - dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container); - addr = DEQ_NEXT(addr); - dx_agent_value_complete(cor, addr != 0); - } - sys_mutex_unlock(router->lock); -} - - -static void dx_router_query_handler(void* context, const char *id, void *correlator) -{ - dx_router_class_t *cls = (dx_router_class_t*) context; - switch (cls->class_id) { - case DX_ROUTER_CLASS_ROUTER: dx_router_query_router(cls->router, correlator); break; - case DX_ROUTER_CLASS_LINK: dx_router_query_link(cls->router, correlator); break; - case DX_ROUTER_CLASS_NODE: dx_router_query_node(cls->router, correlator); break; - case DX_ROUTER_CLASS_ADDRESS: dx_router_query_address(cls->router, correlator); break; - } -} - - -static dx_agent_class_t *dx_router_setup_class(dx_router_t *router, const char *fqname, int id) -{ - dx_router_class_t *cls = NEW(dx_router_class_t); - cls->router = router; - cls->class_id = id; - - return dx_agent_register_class(router->dx, fqname, cls, - dx_router_schema_handler, - dx_router_query_handler); -} - - -void dx_router_agent_setup(dx_router_t *router) -{ - router->class_router = - dx_router_setup_class(router, "org.apache.qpid.dispatch.router", DX_ROUTER_CLASS_ROUTER); - router->class_link = - dx_router_setup_class(router, "org.apache.qpid.dispatch.router.link", DX_ROUTER_CLASS_LINK); - router->class_node = - dx_router_setup_class(router, "org.apache.qpid.dispatch.router.node", DX_ROUTER_CLASS_NODE); - router->class_address = - dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS); -} - - -void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field) -{ - dx_router_t *router = dx->router; - char temp[1000]; // FIXME - - sys_mutex_lock(router->lock); - dx_router_node_t *rnode = DEQ_HEAD(router->routers); - while (rnode) { - strcpy(temp, "amqp:/_topo/"); - strcat(temp, router->router_area); - strcat(temp, "/"); - const unsigned char* addr = dx_hash_key_by_handle(rnode->owning_addr->hash_handle); - strcat(temp, &((char*) addr)[1]); - strcat(temp, "/$management"); - dx_compose_insert_string(field, temp); - rnode = DEQ_NEXT(rnode); - } - sys_mutex_unlock(router->lock); -} - diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c deleted file mode 100644 index a810462f1b..0000000000 --- a/qpid/extras/dispatch/src/router_node.c +++ /dev/null @@ -1,1323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include <stdio.h> -#include <string.h> -#include <stdbool.h> -#include <stdlib.h> -#include <qpid/dispatch.h> -#include "dispatch_private.h" -#include "router_private.h" - -static char *module = "ROUTER"; - -static char *router_role = "inter-router"; -static char *local_prefix = "_local/"; -static char *topo_prefix = "_topo/"; -static char *direct_prefix; -static char *node_id; - -/** - * Address Types and Processing: - * - * Address Hash Key onReceive - * =================================================================== - * _local/<local> L<local> handler - * _topo/<area>/<router>/<local> A<area> forward - * _topo/<my-area>/<router>/<local> R<router> forward - * _topo/<my-area>/<my-router>/<local> L<local> handler - * _topo/<area>/all/<local> A<area> forward - * _topo/<my-area>/all/<local> L<local> forward handler - * _topo/all/all/<local> L<local> forward handler - * <mobile> M<mobile> forward handler - */ - -ALLOC_DEFINE(dx_routed_event_t); -ALLOC_DEFINE(dx_router_link_t); -ALLOC_DEFINE(dx_router_node_t); -ALLOC_DEFINE(dx_router_ref_t); -ALLOC_DEFINE(dx_router_link_ref_t); -ALLOC_DEFINE(dx_address_t); -ALLOC_DEFINE(dx_router_conn_t); - - -void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) -{ - dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); - DEQ_ITEM_INIT(ref); - ref->link = link; - link->ref = ref; - DEQ_INSERT_TAIL(*ref_list, ref); -} - - -void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) -{ - if (link->ref) { - DEQ_REMOVE(*ref_list, link->ref); - free_dx_router_link_ref_t(link->ref); - link->ref = 0; - } -} - - -void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) -{ - dx_router_ref_t *ref = new_dx_router_ref_t(); - DEQ_ITEM_INIT(ref); - ref->router = rnode; - rnode->ref_count++; - DEQ_INSERT_TAIL(*ref_list, ref); -} - - -void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) -{ - dx_router_ref_t *ref = DEQ_HEAD(*ref_list); - while (ref) { - if (ref->router == rnode) { - DEQ_REMOVE(*ref_list, ref); - free_dx_router_ref_t(ref); - rnode->ref_count--; - break; - } - ref = DEQ_NEXT(ref); - } -} - - -/** - * Check an address to see if it no longer has any associated destinations. - * Depending on its policy, the address may be eligible for being closed out - * (i.e. Logging its terminal statistics and freeing its resources). - */ -void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local) -{ - if (addr == 0) - return; - - unsigned char *key = 0; - int to_delete = 0; - int no_more_locals = 0; - - sys_mutex_lock(router->lock); - - // - // If the address has no handlers or destinations, it should be deleted. - // - if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0) - to_delete = 1; - - // - // If we have just removed a local linkage and it was the last local linkage, - // we need to notify the router module that there is no longer a local - // presence of this address. - // - if (was_local && DEQ_SIZE(addr->rlinks) == 0) - no_more_locals = 1; - - if (to_delete) { - // - // Delete the address but grab the hash key so we can use it outside the - // critical section. - // - dx_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key); - DEQ_REMOVE(router->addrs, addr); - dx_hash_handle_free(addr->hash_handle); - free_dx_address_t(addr); - } - - // - // If we're not deleting but there are no more locals, get a copy of the hash key. - // - if (!to_delete && no_more_locals) { - const unsigned char *key_const = dx_hash_key_by_handle(addr->hash_handle); - key = (unsigned char*) malloc(strlen((const char*) key_const) + 1); - strcpy((char*) key, (const char*) key_const); - } - - sys_mutex_unlock(router->lock); - - // - // If the address is mobile-class and it was just removed from a local link, - // tell the router module that it is no longer attached locally. - // - if (no_more_locals && key && key[0] == 'M') - dx_router_mobile_removed(router, (const char*) key); - - // - // Free the key that was not freed by the hash table. - // - if (key) - free(key); -} - - -/** - * Determine whether a connection is configured in the inter-router role. - */ -static int dx_router_connection_is_inter_router(const dx_connection_t *conn) -{ - if (!conn) - return 0; - - const dx_server_config_t *cf = dx_connection_config(conn); - if (cf && strcmp(cf->role, router_role) == 0) - return 1; - - return 0; -} - - -/** - * Determine whether a terminus has router capability - */ -static int dx_router_terminus_is_router(pn_terminus_t *term) -{ - pn_data_t *cap = pn_terminus_capabilities(term); - - pn_data_rewind(cap); - pn_data_next(cap); - if (cap && pn_data_type(cap) == PN_SYMBOL) { - pn_bytes_t sym = pn_data_get_symbol(cap); - if (sym.size == strlen(DX_CAPABILITY_ROUTER) && - strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) - return 1; - } - - return 0; -} - - -static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) -{ - static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; - char discriminator[11]; - long int rnd = random(); - int idx; - - for (idx = 0; idx < 6; idx++) - discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; - discriminator[idx] = '\0'; - - snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); -} - - -static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link) -{ - dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); - if (shared) - return shared->mask_bit; - - int mask_bit; - if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { - dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); - } else { - dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); - return -1; - } - - shared = new_dx_router_conn_t(); - shared->mask_bit = mask_bit; - dx_link_set_conn_context(link, shared); - return mask_bit; -} - - -/** - * Outgoing Link Writable Handler - */ -static int router_writable_link_handler(void* context, dx_link_t *link) -{ - dx_router_t *router = (dx_router_t*) context; - dx_delivery_t *delivery; - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - pn_link_t *pn_link = dx_link_pn(link); - uint64_t tag; - int link_credit = pn_link_credit(pn_link); - dx_routed_event_list_t to_send; - dx_routed_event_list_t events; - dx_routed_event_t *re; - size_t offer; - int event_count = 0; - - DEQ_INIT(to_send); - DEQ_INIT(events); - - sys_mutex_lock(router->lock); - - // - // Pull the non-delivery events into a local list so they can be processed without - // the lock being held. - // - re = DEQ_HEAD(rlink->event_fifo); - while (re) { - DEQ_REMOVE_HEAD(rlink->event_fifo); - DEQ_INSERT_TAIL(events, re); - re = DEQ_HEAD(rlink->event_fifo); - } - - // - // Under lock, move available deliveries from the msg_fifo to the local to_send - // list. Don't move more than we have credit to send. - // - if (link_credit > 0) { - tag = router->dtag; - re = DEQ_HEAD(rlink->msg_fifo); - while (re) { - DEQ_REMOVE_HEAD(rlink->msg_fifo); - DEQ_INSERT_TAIL(to_send, re); - if (DEQ_SIZE(to_send) == link_credit) - break; - re = DEQ_HEAD(rlink->msg_fifo); - } - router->dtag += DEQ_SIZE(to_send); - } - - offer = DEQ_SIZE(rlink->msg_fifo); - sys_mutex_unlock(router->lock); - - // - // Deliver all the to_send messages downrange - // - re = DEQ_HEAD(to_send); - while (re) { - DEQ_REMOVE_HEAD(to_send); - - // - // Get a delivery for the send. This will be the current delivery on the link. - // - tag++; - delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); - - // - // Send the message - // - dx_message_send(re->message, link); - - // - // If there is an incoming delivery associated with this message, link it - // with the outgoing delivery. Otherwise, the message arrived pre-settled - // and should be sent presettled. - // - if (re->delivery) { - dx_delivery_set_peer(re->delivery, delivery); - dx_delivery_set_peer(delivery, re->delivery); - } else - dx_delivery_free(delivery, 0); // settle and free - - pn_link_advance(pn_link); - event_count++; - - dx_message_free(re->message); - free_dx_routed_event_t(re); - re = DEQ_HEAD(to_send); - } - - // - // Process the non-delivery events. - // - re = DEQ_HEAD(events); - while (re) { - DEQ_REMOVE_HEAD(events); - - if (re->delivery) { - if (re->disposition) { - pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition); - event_count++; - } - if (re->settle) { - dx_delivery_free(re->delivery, 0); - event_count++; - } - } - - free_dx_routed_event_t(re); - re = DEQ_HEAD(events); - } - - // - // Set the offer to the number of messages remaining to be sent. - // - pn_link_offered(pn_link, offer); - return event_count; -} - - -static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop) -{ - dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); - dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); - dx_field_iterator_t *ingress_iter = 0; - - dx_parsed_field_t *trace = 0; - dx_parsed_field_t *ingress = 0; - - if (in_da) { - trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); - ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); - } - - dx_compose_start_map(out_da); - - // - // If there is a trace field, append this router's ID to the trace. - // - dx_compose_insert_string(out_da, DX_DA_TRACE); - dx_compose_start_list(out_da); - if (trace) { - if (dx_parse_is_list(trace)) { - uint32_t idx = 0; - dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); - while (trace_item) { - dx_field_iterator_t *iter = dx_parse_raw(trace_item); - if (dx_field_iterator_equal(iter, (unsigned char*) node_id)) - *drop = 1; - dx_field_iterator_reset(iter); - dx_compose_insert_string_iterator(out_da, iter); - idx++; - trace_item = dx_parse_sub_value(trace, idx); - } - } - } - - dx_compose_insert_string(out_da, node_id); - dx_compose_end_list(out_da); - - // - // If there is no ingress field, annotate the ingress as this router else - // keep the original field. - // - dx_compose_insert_string(out_da, DX_DA_INGRESS); - if (ingress && dx_parse_is_scalar(ingress)) { - ingress_iter = dx_parse_raw(ingress); - dx_compose_insert_string_iterator(out_da, ingress_iter); - } else - dx_compose_insert_string(out_da, node_id); - - dx_compose_end_map(out_da); - - dx_message_set_delivery_annotations(msg, out_da); - dx_compose_free(out_da); - - // - // Return the iterator to the ingress field _if_ it was present. - // If we added the ingress, return NULL. - // - return ingress_iter; -} - - -/** - * Inbound Delivery Handler - */ -static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery) -{ - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - dx_message_t *msg; - int valid_message = 0; - - // - // Receive the message into a local representation. If the returned message - // pointer is NULL, we have not yet received a complete message. - // - msg = dx_message_receive(delivery); - if (!msg) - return; - - // - // Consume the delivery and issue a replacement credit - // - pn_link_advance(pn_link); - pn_link_flow(pn_link, 1); - - sys_mutex_lock(router->lock); - - // - // Handle the Link-Routing case. If this incoming link is associated with a connected - // link, simply deliver the message to the outgoing link. There is no need to validate - // the message in this case. - // - if (rlink->connected_link) { - dx_router_link_t *clink = rlink->connected_link; - dx_routed_event_t *re = new_dx_routed_event_t(); - - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = msg; - re->settle = false; - re->disposition = 0; - DEQ_INSERT_TAIL(clink->msg_fifo, re); - - // - // If the incoming delivery is settled (pre-settled), don't link it into the routed - // event. If it's not settled, link it into the event for later handling. - // - if (dx_delivery_settled(delivery)) - dx_delivery_free(delivery, 0); - else - re->delivery = delivery; - - sys_mutex_unlock(router->lock); - dx_link_activate(clink->link); - return; - } - - // - // We are performing Message-Routing, therefore we will need to validate the message - // through the Properties section so we can access the TO field. - // - dx_message_t *in_process_copy = 0; - dx_router_message_cb handler = 0; - void *handler_context = 0; - - valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); - - if (valid_message) { - dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); - dx_address_t *addr; - int fanout = 0; - - if (iter) { - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - - // - // Note: This function is going to need to be refactored so we can put an - // asynchronous address lookup here. In the event there is a translation - // of the address (via namespace), it will have to be done here after - // obtaining the iterator and before doing the hash lookup. - // - // Note that this lookup is only done for global/mobile class addresses. - // - - dx_hash_retrieve(router->addr_hash, iter, (void*) &addr); - dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); - int is_local = dx_field_iterator_prefix(iter, local_prefix); - int is_direct = dx_field_iterator_prefix(iter, direct_prefix); - dx_field_iterator_free(iter); - - if (addr) { - // - // If the incoming link is an endpoint link, count this as an ingress delivery. - // - if (rlink->link_type == DX_LINK_ENDPOINT) - addr->deliveries_ingress++; - - // - // To field is valid and contains a known destination. Handle the various - // cases for forwarding. - // - - // - // Interpret and update the delivery annotations of the message. As a convenience, - // this function returns the iterator to the ingress field (if it exists). - // - int drop = 0; - dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop); - - // - // Forward to the in-process handler for this message if there is one. The - // actual invocation of the handler will occur later after we've released - // the lock. - // - if (!drop && addr->handler) { - in_process_copy = dx_message_copy(msg); - handler = addr->handler; - handler_context = addr->handler_context; - addr->deliveries_to_container++; - } - - // - // If the address form is local (i.e. is prefixed by _local), don't forward - // outside of the router process. - // - if (!drop && !is_local) { - // - // Forward to all of the local links receiving this address. - // - dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); - while (dest_link_ref) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - - fanout++; - if (fanout == 1 && !dx_delivery_settled(delivery)) - re->delivery = delivery; - - addr->deliveries_egress++; - dx_link_activate(dest_link_ref->link->link); - dest_link_ref = DEQ_NEXT(dest_link_ref); - } - - // - // If the address form is direct to this router node, don't relay it on - // to any other part of the network. - // - if (!is_direct) { - // - // Get the mask bit associated with the ingress router for the message. - // This will be compared against the "valid_origin" masks for each - // candidate destination router. - // - int origin = -1; - if (ingress_iter) { - dx_field_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH); - dx_address_t *origin_addr; - dx_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr); - if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) { - dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes); - origin = rref->router->mask_bit; - } - } else - origin = 0; - - // - // Forward to the next-hops for remote destinations. - // - if (origin >= 0) { - dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); - dx_router_link_t *dest_link; - dx_bitmask_t *link_set = dx_bitmask(0); - - // - // Loop over the target nodes for this address. Build a set of outgoing links - // for which there are valid targets. We do this to avoid sending more than one - // message down a given link. It's possible that there are multiple destinations - // for this address that are all reachable over the same link. In this case, we - // will send only one copy of the message over the link and allow a downstream - // router to fan the message out. - // - while (dest_node_ref) { - if (dest_node_ref->router->next_hop) - dest_link = dest_node_ref->router->next_hop->peer_link; - else - dest_link = dest_node_ref->router->peer_link; - if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) - dx_bitmask_set_bit(link_set, dest_link->mask_bit); - dest_node_ref = DEQ_NEXT(dest_node_ref); - } - - // - // Send a copy of the message outbound on each identified link. - // - int link_bit; - while (dx_bitmask_first_set(link_set, &link_bit)) { - dx_bitmask_clear_bit(link_set, link_bit); - dest_link = router->out_links_by_mask_bit[link_bit]; - if (dest_link) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); - - fanout++; - if (fanout == 1 && !dx_delivery_settled(delivery)) - re->delivery = delivery; - - addr->deliveries_transit++; - dx_link_activate(dest_link->link); - } - } - - dx_bitmask_free(link_set); - } - } - } - } - - // - // In message-routing mode, the handling of the incoming delivery depends on the - // number of copies of the received message that were forwarded. - // - if (handler) { - dx_delivery_free(delivery, PN_ACCEPTED); - } else if (fanout == 0) { - dx_delivery_free(delivery, PN_RELEASED); - } - } - } else { - // - // Message is invalid. Reject the message. - // - dx_delivery_free(delivery, PN_REJECTED); - } - - sys_mutex_unlock(router->lock); - dx_message_free(msg); - - // - // Invoke the in-process handler now that the lock is released. - // - if (handler) { - handler(handler_context, in_process_copy, rlink->mask_bit); - dx_message_free(in_process_copy); - } -} - - -/** - * Delivery Disposition Handler - */ -static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery) -{ - dx_router_t *router = (dx_router_t*) context; - bool changed = dx_delivery_disp_changed(delivery); - uint64_t disp = dx_delivery_disp(delivery); - bool settled = dx_delivery_settled(delivery); - dx_delivery_t *peer = dx_delivery_peer(delivery); - - if (peer) { - // - // The case where this delivery has a peer. - // - if (changed || settled) { - dx_link_t *peer_link = dx_delivery_link(peer); - dx_router_link_t *prl = (dx_router_link_t*) dx_link_get_context(peer_link); - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = peer; - re->message = 0; - re->settle = settled; - re->disposition = changed ? disp : 0; - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(prl->event_fifo, re); - sys_mutex_unlock(router->lock); - - dx_link_activate(peer_link); - } - - } else { - // - // The no-peer case. Ignore status changes and echo settlement. - // - if (settled) - dx_delivery_free(delivery, 0); - } -} - - -/** - * New Incoming Link Handler - */ -static int router_incoming_link_handler(void* context, dx_link_t *link) -{ - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); - - if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { - dx_log(module, LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection"); - pn_link_close(pn_link); - return 0; - } - - dx_router_link_t *rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; - rlink->link_direction = DX_INCOMING; - rlink->owning_addr = 0; - rlink->link = link; - rlink->connected_link = 0; - rlink->peer_link = 0; - rlink->ref = 0; - DEQ_INIT(rlink->event_fifo); - DEQ_INIT(rlink->msg_fifo); - - dx_link_set_context(link, rlink); - - sys_mutex_lock(router->lock); - rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; - DEQ_INSERT_TAIL(router->links, rlink); - sys_mutex_unlock(router->lock); - - pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); - pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); - pn_link_flow(pn_link, 1000); - pn_link_open(pn_link); - - // - // TODO - If the address has link-route semantics, create all associated - // links needed to go with this one. - // - - return 0; -} - - -/** - * New Outgoing Link Handler - */ -static int router_outgoing_link_handler(void* context, dx_link_t *link) -{ - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); - int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); - int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); - int propagate = 0; - dx_field_iterator_t *iter = 0; - - if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { - dx_log(module, LOG_WARNING, "Outgoing link claims router capability but is not on an inter-router connection"); - pn_link_close(pn_link); - return 0; - } - - // - // If this link is not a router link and it has no source address, we can't - // accept it. - // - if (r_src == 0 && !is_router && !is_dynamic) { - pn_link_close(pn_link); - return 0; - } - - - // - // If this is an endpoint link with a source address, make sure the address is - // appropriate for endpoint links. If it is not mobile address, it cannot be - // bound to an endpoint link. - // - if(r_src && !is_router && !is_dynamic) { - iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); - unsigned char prefix = dx_field_iterator_octet(iter); - dx_field_iterator_reset(iter); - - if (prefix != 'M') { - dx_field_iterator_free(iter); - pn_link_close(pn_link); - dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src); - return 0; - } - } - - // - // Create a router_link record for this link. Some of the fields will be - // modified in the different cases below. - // - dx_router_link_t *rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; - rlink->link_direction = DX_OUTGOING; - rlink->owning_addr = 0; - rlink->link = link; - rlink->connected_link = 0; - rlink->peer_link = 0; - rlink->ref = 0; - DEQ_INIT(rlink->event_fifo); - DEQ_INIT(rlink->msg_fifo); - - dx_link_set_context(link, rlink); - pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); - pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); - - sys_mutex_lock(router->lock); - rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; - - if (is_router) { - // - // If this is a router link, put it in the hello_address link-list. - // - dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); - rlink->owning_addr = router->hello_addr; - router->out_links_by_mask_bit[rlink->mask_bit] = rlink; - - } else { - // - // If this is an endpoint link, check the source. If it is dynamic, we will - // assign it an ephemeral and routable address. If it has a non-dymanic - // address, that address needs to be set up in the address list. - // - char temp_addr[1000]; // FIXME - dx_address_t *addr; - - if (is_dynamic) { - dx_router_generate_temp_addr(router, temp_addr, 1000); - iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); - pn_terminus_set_address(dx_link_source(link), temp_addr); - dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); - } else - dx_log(module, LOG_INFO, "Registered local address: %s", r_src); - - dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - memset(addr, 0, sizeof(dx_address_t)); - DEQ_ITEM_INIT(addr); - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); - DEQ_INSERT_TAIL(router->addrs, addr); - } - - rlink->owning_addr = addr; - dx_router_add_link_ref_LH(&addr->rlinks, rlink); - - // - // If this is not a dynamic address and it is the first local subscription - // to the address, supply the address to the router module for propagation - // to other nodes. - // - propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1); - } - - DEQ_INSERT_TAIL(router->links, rlink); - sys_mutex_unlock(router->lock); - - if (propagate) - dx_router_mobile_added(router, iter); - - if (iter) - dx_field_iterator_free(iter); - pn_link_open(pn_link); - return 0; -} - - -/** - * Link Detached Handler - */ -static int router_link_detach_handler(void* context, dx_link_t *link, int closed) -{ - dx_router_t *router = (dx_router_t*) context; - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); - dx_address_t *oaddr = 0; - - if (shared) { - dx_link_set_conn_context(link, 0); - free_dx_router_conn_t(shared); - } - - if (!rlink) - return 0; - - sys_mutex_lock(router->lock); - - // - // If the link is outgoing, we must disassociate it from its address. - // - if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { - dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); - oaddr = rlink->owning_addr; - } - - // - // If this is an outgoing inter-router link, we must remove the by-mask-bit - // index reference to this link. - // - if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_OUTGOING) { - if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink) - router->out_links_by_mask_bit[rlink->mask_bit] = 0; - else - dx_log(module, LOG_CRITICAL, "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit); - } - - // - // If this is an incoming inter-router link, we must free the mask_bit. - // - if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) - dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); - - // - // Remove the link from the master list-of-links. - // - DEQ_REMOVE(router->links, rlink); - sys_mutex_unlock(router->lock); - - // - // Check to see if the owning address should be deleted - // - dx_router_check_addr(router, oaddr, 1); - - // TODO - wrap the free to handle the recursive items - free_dx_router_link_t(rlink); - - return 0; -} - - -static void router_inbound_open_handler(void *type_context, dx_connection_t *conn) -{ -} - - -static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) -{ - // - // Check the configured role of this connection. If it is not the inter-router - // role, ignore it. - // - if (!dx_router_connection_is_inter_router(conn)) { - dx_log(module, LOG_WARNING, "Outbound connection set up without inter-router role"); - return; - } - - dx_router_t *router = (dx_router_t*) type_context; - dx_link_t *sender; - dx_link_t *receiver; - dx_router_link_t *rlink; - int mask_bit = 0; - size_t clen = strlen(DX_CAPABILITY_ROUTER); - - // - // Allocate a mask bit to designate the pair of links connected to the neighbor router - // - sys_mutex_lock(router->lock); - if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { - dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); - } else { - sys_mutex_unlock(router->lock); - dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); - return; - } - - // - // Create an incoming link with router source capability - // - receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); - // TODO - We don't want to have to cast away the constness of the literal string here! - // See PROTON-429 - pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); - - rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->mask_bit = mask_bit; - rlink->link_type = DX_LINK_ROUTER; - rlink->link_direction = DX_INCOMING; - rlink->owning_addr = 0; - rlink->link = receiver; - rlink->connected_link = 0; - rlink->peer_link = 0; - DEQ_INIT(rlink->event_fifo); - DEQ_INIT(rlink->msg_fifo); - - dx_link_set_context(receiver, rlink); - DEQ_INSERT_TAIL(router->links, rlink); - - // - // Create an outgoing link with router target capability - // - sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); - // TODO - We don't want to have to cast away the constness of the literal string here! - // See PROTON-429 - pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); - - rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->mask_bit = mask_bit; - rlink->link_type = DX_LINK_ROUTER; - rlink->link_direction = DX_OUTGOING; - rlink->owning_addr = router->hello_addr; - rlink->link = sender; - rlink->connected_link = 0; - rlink->peer_link = 0; - DEQ_INIT(rlink->event_fifo); - DEQ_INIT(rlink->msg_fifo); - - // - // Add the new outgoing link to the hello_address's list of links. - // - dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); - - // - // Index this link from the by-maskbit index so we can later find it quickly - // when provided with the mask bit. - // - router->out_links_by_mask_bit[mask_bit] = rlink; - - dx_link_set_context(sender, rlink); - DEQ_INSERT_TAIL(router->links, rlink); - sys_mutex_unlock(router->lock); - - pn_link_open(dx_link_pn(receiver)); - pn_link_open(dx_link_pn(sender)); - pn_link_flow(dx_link_pn(receiver), 1000); -} - - -static void dx_router_timer_handler(void *context) -{ - dx_router_t *router = (dx_router_t*) context; - - // - // Periodic processing. - // - dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); -} - - -static dx_node_type_t router_node = {"router", 0, 0, - router_rx_handler, - router_disp_handler, - router_incoming_link_handler, - router_outgoing_link_handler, - router_writable_link_handler, - router_link_detach_handler, - 0, // node_created_handler - 0, // node_destroyed_handler - router_inbound_open_handler, - router_outbound_open_handler }; -static int type_registered = 0; - - -dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id) -{ - if (!type_registered) { - type_registered = 1; - dx_container_register_node_type(dx, &router_node); - } - - size_t dplen = 9 + strlen(area) + strlen(id); - direct_prefix = (char*) malloc(dplen); - strcpy(direct_prefix, "_topo/"); - strcat(direct_prefix, area); - strcat(direct_prefix, "/"); - strcat(direct_prefix, id); - strcat(direct_prefix, "/"); - - node_id = (char*) malloc(dplen); - strcpy(node_id, area); - strcat(node_id, "/"); - strcat(node_id, id); - - dx_router_t *router = NEW(dx_router_t); - - router_node.type_context = router; - - dx->router = router; - router->dx = dx; - router->router_mode = mode; - router->router_area = area; - router->router_id = id; - router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->addrs); - router->addr_hash = dx_hash(10, 32, 0); - - DEQ_INIT(router->links); - DEQ_INIT(router->routers); - - router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width()); - router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width()); - for (int idx = 0; idx < dx_bitmask_width(); idx++) { - router->out_links_by_mask_bit[idx] = 0; - router->routers_by_mask_bit[idx] = 0; - } - - router->neighbor_free_mask = dx_bitmask(1); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; - router->pyAdded = 0; - router->pyRemoved = 0; - - // - // Create addresses for all of the routers in the topology. It will be registered - // locally later in the initialization sequence. - // - if (router->router_mode == DX_ROUTER_MODE_INTERIOR) { - router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); - router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0); - } - - // - // Inform the field iterator module of this router's id and area. The field iterator - // uses this to offload some of the address-processing load from the router. - // - dx_field_iterator_set_address(area, id); - - // - // Set up the usage of the embedded python router module. - // - dx_python_start(); - - switch (router->router_mode) { - case DX_ROUTER_MODE_STANDALONE: dx_log(module, LOG_INFO, "Router started in Standalone mode"); break; - case DX_ROUTER_MODE_INTERIOR: dx_log(module, LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break; - case DX_ROUTER_MODE_EDGE: dx_log(module, LOG_INFO, "Router started in Edge mode"); break; - } - - return router; -} - - -void dx_router_setup_late(dx_dispatch_t *dx) -{ - dx_router_agent_setup(dx->router); - dx_router_python_setup(dx->router); - dx_timer_schedule(dx->router->timer, 1000); -} - - -void dx_router_free(dx_router_t *router) -{ - dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); - sys_mutex_free(router->lock); - free(router); - dx_python_stop(); -} - - -const char *dx_router_id(const dx_dispatch_t *dx) -{ - return node_id; -} - - -dx_address_t *dx_router_register_address(dx_dispatch_t *dx, - const char *address, - dx_router_message_cb handler, - void *context) -{ - char addr_string[1000]; - dx_router_t *router = dx->router; - dx_address_t *addr; - dx_field_iterator_t *iter; - - strcpy(addr_string, "L"); // Local Hash-Key Space - strcat(addr_string, address); - iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); - - sys_mutex_lock(router->lock); - dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - memset(addr, 0, sizeof(dx_address_t)); - DEQ_ITEM_INIT(addr); - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); - DEQ_ITEM_INIT(addr); - DEQ_INSERT_TAIL(router->addrs, addr); - } - dx_field_iterator_free(iter); - - addr->handler = handler; - addr->handler_context = context; - - sys_mutex_unlock(router->lock); - - if (handler) - dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); - return addr; -} - - -void dx_router_unregister_address(dx_address_t *ad) -{ - //free_dx_address_t(ad); -} - - -void dx_router_send(dx_dispatch_t *dx, - dx_field_iterator_t *address, - dx_message_t *msg) -{ - dx_router_t *router = dx->router; - dx_address_t *addr; - - dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); - sys_mutex_lock(router->lock); - dx_hash_retrieve(router->addr_hash, address, (void*) &addr); - if (addr) { - // - // Forward to all of the local links receiving this address. - // - addr->deliveries_from_container++; - dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); - while (dest_link_ref) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - - dx_link_activate(dest_link_ref->link->link); - addr->deliveries_egress++; - - dest_link_ref = DEQ_NEXT(dest_link_ref); - } - - // - // Forward to the next-hops for remote destinations. - // - dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); - dx_router_link_t *dest_link; - dx_bitmask_t *link_set = dx_bitmask(0); - - while (dest_node_ref) { - if (dest_node_ref->router->next_hop) - dest_link = dest_node_ref->router->next_hop->peer_link; - else - dest_link = dest_node_ref->router->peer_link; - if (dest_link) - dx_bitmask_set_bit(link_set, dest_link->mask_bit); - dest_node_ref = DEQ_NEXT(dest_node_ref); - } - - int link_bit; - while (dx_bitmask_first_set(link_set, &link_bit)) { - dx_bitmask_clear_bit(link_set, link_bit); - dest_link = router->out_links_by_mask_bit[link_bit]; - if (dest_link) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); - dx_link_activate(dest_link->link); - addr->deliveries_transit++; - } - } - - dx_bitmask_free(link_set); - } - sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? -} - - -void dx_router_send2(dx_dispatch_t *dx, - const char *address, - dx_message_t *msg) -{ - dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH); - dx_router_send(dx, iter, msg); - dx_field_iterator_free(iter); -} - diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h deleted file mode 100644 index 066188b3f7..0000000000 --- a/qpid/extras/dispatch/src/router_private.h +++ /dev/null @@ -1,177 +0,0 @@ -#ifndef __router_private_h__ -#define __router_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; -typedef struct dx_router_ref_t dx_router_ref_t; -typedef struct dx_router_link_ref_t dx_router_link_ref_t; -typedef struct dx_router_conn_t dx_router_conn_t; - -void dx_router_python_setup(dx_router_t *router); -void dx_pyrouter_tick(dx_router_t *router); -void dx_router_agent_setup(dx_router_t *router); - -typedef enum { - DX_ROUTER_MODE_STANDALONE, // Standalone router. No routing protocol participation - DX_ROUTER_MODE_INTERIOR, // Interior router. Full participation in routing protocol. - DX_ROUTER_MODE_EDGE // Edge router. No routing protocol participation, access via other protocols. -} dx_router_mode_t; - -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; - - -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; - -ALLOC_DECLARE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); - - -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - int mask_bit; // Unique mask bit if this is an inter-router link - dx_link_type_t link_type; - dx_direction_t link_direction; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_router_link_ref_t *ref; // Pointer to a containing reference object - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; - -ALLOC_DECLARE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - dx_address_t *owning_addr; - int mask_bit; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - uint32_t ref_count; - dx_bitmask_t *valid_origins; -}; - -ALLOC_DECLARE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); - -struct dx_router_ref_t { - DEQ_LINKS(dx_router_ref_t); - dx_router_node_t *router; -}; - -ALLOC_DECLARE(dx_router_ref_t); -DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t); - - -struct dx_router_link_ref_t { - DEQ_LINKS(dx_router_link_ref_t); - dx_router_link_t *link; -}; - -ALLOC_DECLARE(dx_router_link_ref_t); -DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t); - - -struct dx_router_conn_t { - int mask_bit; -}; - -ALLOC_DECLARE(dx_router_conn_t); - - -struct dx_address_t { - DEQ_LINKS(dx_address_t); - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; // In-Process Consumer context - dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers - dx_router_ref_list_t rnodes; // Remotely-Connected Consumers - dx_hash_handle_t *hash_handle; // Linkage back to the hash table entry - - uint64_t deliveries_ingress; - uint64_t deliveries_egress; - uint64_t deliveries_transit; - uint64_t deliveries_to_container; - uint64_t deliveries_from_container; -}; - -ALLOC_DECLARE(dx_address_t); -DEQ_DECLARE(dx_address_t, dx_address_list_t); - - -struct dx_router_t { - dx_dispatch_t *dx; - dx_router_mode_t router_mode; - const char *router_area; - const char *router_id; - dx_node_t *node; - - dx_address_list_t addrs; - dx_hash_t *addr_hash; - dx_address_t *router_addr; - dx_address_t *hello_addr; - - dx_router_link_list_t links; - dx_router_node_list_t routers; - dx_router_link_t **out_links_by_mask_bit; - dx_router_node_t **routers_by_mask_bit; - - dx_bitmask_t *neighbor_free_mask; - sys_mutex_t *lock; - dx_timer_t *timer; - uint64_t dtag; - - PyObject *pyRouter; - PyObject *pyTick; - PyObject *pyAdded; - PyObject *pyRemoved; - - dx_agent_class_t *class_router; - dx_agent_class_t *class_link; - dx_agent_class_t *class_node; - dx_agent_class_t *class_address; -}; - - - -void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local); -void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); -void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); - -void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); -void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); - -void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter); -void dx_router_mobile_removed(dx_router_t *router, const char *addr); - - -#endif diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c deleted file mode 100644 index 82b08c0785..0000000000 --- a/qpid/extras/dispatch/src/router_pynode.c +++ /dev/null @@ -1,682 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/python_embedded.h> -#include <stdio.h> -#include <string.h> -#include <stdbool.h> -#include <stdlib.h> -#include <qpid/dispatch.h> -#include "dispatch_private.h" -#include "router_private.h" - -static char *module = "router.pynode"; - -typedef struct { - PyObject_HEAD - dx_router_t *router; -} RouterAdapter; - - -static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit) -{ - if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) - return "Router bit mask out of range"; - - if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1) - return "Link bit mask out of range"; - - sys_mutex_lock(router->lock); - if (router->routers_by_mask_bit[router_maskbit] != 0) { - sys_mutex_unlock(router->lock); - return "Adding router over already existing router"; - } - - if (link_maskbit >= 0 && router->out_links_by_mask_bit[link_maskbit] == 0) { - sys_mutex_unlock(router->lock); - return "Adding neighbor router with invalid link reference"; - } - - // - // Hash lookup the address to ensure there isn't an existing router address. - // - dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - - dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); - assert(addr == 0); - - // - // Create an address record for this router and insert it in the hash table. - // This record will be found whenever a "foreign" topological address to this - // remote router is looked up. - // - addr = new_dx_address_t(); - memset(addr, 0, sizeof(dx_address_t)); - DEQ_ITEM_INIT(addr); - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); - DEQ_INSERT_TAIL(router->addrs, addr); - dx_field_iterator_free(iter); - - // - // Create a router-node record to represent the remote router. - // - dx_router_node_t *rnode = new_dx_router_node_t(); - DEQ_ITEM_INIT(rnode); - rnode->owning_addr = addr; - rnode->mask_bit = router_maskbit; - rnode->next_hop = 0; - rnode->peer_link = 0; - rnode->ref_count = 0; - rnode->valid_origins = dx_bitmask(0); - - DEQ_INSERT_TAIL(router->routers, rnode); - - // - // Link the router record to the address record. - // - dx_router_add_node_ref_LH(&addr->rnodes, rnode); - - // - // Link the router record to the router address record. - // - dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode); - - // - // Add the router record to the mask-bit index. - // - router->routers_by_mask_bit[router_maskbit] = rnode; - - // - // If this is a neighbor router, add the peer_link reference to the - // router record. - // - if (link_maskbit >= 0) - rnode->peer_link = router->out_links_by_mask_bit[link_maskbit]; - - sys_mutex_unlock(router->lock); - return 0; -} - - -static char *dx_del_router(dx_router_t *router, int router_maskbit) -{ - if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) - return "Router bit mask out of range"; - - sys_mutex_lock(router->lock); - if (router->routers_by_mask_bit[router_maskbit] == 0) { - sys_mutex_unlock(router->lock); - return "Deleting nonexistent router"; - } - - dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit]; - dx_address_t *oaddr = rnode->owning_addr; - assert(oaddr); - - // - // Unlink the router node from the address record - // - dx_router_del_node_ref_LH(&oaddr->rnodes, rnode); - - // - // While the router node has a non-zero reference count, look for addresses - // to unlink the node from. - // - dx_address_t *addr = DEQ_HEAD(router->addrs); - while (addr && rnode->ref_count > 0) { - dx_router_del_node_ref_LH(&addr->rnodes, rnode); - addr = DEQ_NEXT(addr); - } - assert(rnode->ref_count == 0); - - // - // Free the router node and the owning address records. - // - dx_bitmask_free(rnode->valid_origins); - DEQ_REMOVE(router->routers, rnode); - free_dx_router_node_t(rnode); - - dx_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle); - DEQ_REMOVE(router->addrs, oaddr); - dx_hash_handle_free(oaddr->hash_handle); - router->routers_by_mask_bit[router_maskbit] = 0; - free_dx_address_t(oaddr); - - sys_mutex_unlock(router->lock); - return 0; -} - - -static PyObject* dx_add_remote_router(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - const char *address; - int router_maskbit; - - if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit)) - return 0; - - char *error = dx_add_router(router, address, router_maskbit, -1); - if (error) { - PyErr_SetString(PyExc_Exception, error); - return 0; - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_del_remote_router(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - int router_maskbit; - - if (!PyArg_ParseTuple(args, "i", &router_maskbit)) - return 0; - - char *error = dx_del_router(router, router_maskbit); - if (error) { - PyErr_SetString(PyExc_Exception, error); - return 0; - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_set_next_hop(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - int router_maskbit; - int next_hop_maskbit; - - if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit)) - return 0; - - if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); - return 0; - } - - if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range"); - return 0; - } - - if (router->routers_by_mask_bit[router_maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Router Not Found"); - return 0; - } - - if (router->routers_by_mask_bit[next_hop_maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Next Hop Not Found"); - return 0; - } - - if (router_maskbit != next_hop_maskbit) { - dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit]; - rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit]; - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_set_valid_origins(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - int router_maskbit; - PyObject *origin_list; - Py_ssize_t idx; - - if (!PyArg_ParseTuple(args, "iO", &router_maskbit, &origin_list)) - return 0; - - if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); - return 0; - } - - if (router->routers_by_mask_bit[router_maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Router Not Found"); - return 0; - } - - if (!PyList_Check(origin_list)) { - PyErr_SetString(PyExc_Exception, "Expected List as argument 2"); - return 0; - } - - Py_ssize_t origin_count = PyList_Size(origin_list); - dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit]; - int maskbit; - - for (idx = 0; idx < origin_count; idx++) { - maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx)); - - if (maskbit >= dx_bitmask_width() || maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Origin bit mask out of range"); - return 0; - } - - if (router->routers_by_mask_bit[maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Origin router Not Found"); - return 0; - } - } - - dx_bitmask_clear_all(rnode->valid_origins); - dx_bitmask_set_bit(rnode->valid_origins, 0); // This router is a valid origin for all destinations - for (idx = 0; idx < origin_count; idx++) { - maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx)); - dx_bitmask_set_bit(rnode->valid_origins, maskbit); - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - const char *address; - int router_maskbit; - int link_maskbit; - - if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit)) - return 0; - - char *error = dx_add_router(router, address, router_maskbit, link_maskbit); - if (error) { - PyErr_SetString(PyExc_Exception, error); - return 0; - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - int router_maskbit; - - if (!PyArg_ParseTuple(args, "i", &router_maskbit)) - return 0; - - char *error = dx_del_router(router, router_maskbit); - if (error) { - PyErr_SetString(PyExc_Exception, error); - return 0; - } - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_map_destination(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - const char *addr_string; - int maskbit; - dx_address_t *addr; - dx_field_iterator_t *iter; - - if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) - return 0; - - if (maskbit >= dx_bitmask_width() || maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); - return 0; - } - - if (router->routers_by_mask_bit[maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Router Not Found"); - return 0; - } - - iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); - - sys_mutex_lock(router->lock); - dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - memset(addr, 0, sizeof(dx_address_t)); - DEQ_ITEM_INIT(addr); - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); - DEQ_ITEM_INIT(addr); - DEQ_INSERT_TAIL(router->addrs, addr); - } - dx_field_iterator_free(iter); - - dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; - dx_router_add_node_ref_LH(&addr->rnodes, rnode); - - sys_mutex_unlock(router->lock); - - dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit); - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_unmap_destination(PyObject *self, PyObject *args) -{ - RouterAdapter *adapter = (RouterAdapter*) self; - dx_router_t *router = adapter->router; - const char *addr_string; - int maskbit; - dx_address_t *addr; - - if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) - return 0; - - if (maskbit >= dx_bitmask_width() || maskbit < 0) { - PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); - return 0; - } - - if (router->routers_by_mask_bit[maskbit] == 0) { - PyErr_SetString(PyExc_Exception, "Router Not Found"); - return 0; - } - - dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; - dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); - - sys_mutex_lock(router->lock); - dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); - dx_field_iterator_free(iter); - - if (!addr) { - PyErr_SetString(PyExc_Exception, "Address Not Found"); - sys_mutex_unlock(router->lock); - return 0; - } - - dx_router_del_node_ref_LH(&addr->rnodes, rnode); - sys_mutex_unlock(router->lock); - - dx_router_check_addr(router, addr, 0); - - dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit); - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyMethodDef RouterAdapter_methods[] = { - {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, - {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"}, - {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, - {"set_valid_origins", dx_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"}, - {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"}, - {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"}, - {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"}, - {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"}, - {0, 0, 0, 0} -}; - -static PyTypeObject RouterAdapterType = { - PyObject_HEAD_INIT(0) - 0, /* ob_size*/ - "dispatch.RouterAdapter", /* tp_name*/ - sizeof(RouterAdapter), /* tp_basicsize*/ - 0, /* tp_itemsize*/ - 0, /* tp_dealloc*/ - 0, /* tp_print*/ - 0, /* tp_getattr*/ - 0, /* tp_setattr*/ - 0, /* tp_compare*/ - 0, /* tp_repr*/ - 0, /* tp_as_number*/ - 0, /* tp_as_sequence*/ - 0, /* tp_as_mapping*/ - 0, /* tp_hash */ - 0, /* tp_call*/ - 0, /* tp_str*/ - 0, /* tp_getattro*/ - 0, /* tp_setattro*/ - 0, /* tp_as_buffer*/ - Py_TPFLAGS_DEFAULT, /* tp_flags*/ - "Dispatch Router Adapter", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - RouterAdapter_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0 /* tp_version_tag */ -}; - - -void dx_router_python_setup(dx_router_t *router) -{ - // - // If we are not operating as an interior router, don't start the - // router module. - // - if (router->router_mode != DX_ROUTER_MODE_INTERIOR) - return; - - PyObject *pDispatchModule = dx_python_module(); - - RouterAdapterType.tp_new = PyType_GenericNew; - if (PyType_Ready(&RouterAdapterType) < 0) { - PyErr_Print(); - dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter"); - return; - } - - PyTypeObject *raType = &RouterAdapterType; - Py_INCREF(raType); - PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType); - - // - // Attempt to import the Python Router module - // - PyObject* pName; - PyObject* pId; - PyObject* pArea; - PyObject* pMaxRouters; - PyObject* pModule; - PyObject* pClass; - PyObject* pArgs; - - pName = PyString_FromString("qpid.dispatch.router"); - pModule = PyImport_Import(pName); - Py_DECREF(pName); - if (!pModule) { - dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module"); - return; - } - - pClass = PyObject_GetAttrString(pModule, "RouterEngine"); - if (!pClass || !PyClass_Check(pClass)) { - dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module"); - return; - } - - PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter"); - PyObject *adapterInstance = PyObject_CallObject(adapterType, 0); - assert(adapterInstance); - - ((RouterAdapter*) adapterInstance)->router = router; - - // - // Constructor Arguments for RouterEngine - // - pArgs = PyTuple_New(4); - - // arg 0: adapter instance - PyTuple_SetItem(pArgs, 0, adapterInstance); - - // arg 1: router_id - pId = PyString_FromString(router->router_id); - PyTuple_SetItem(pArgs, 1, pId); - - // arg 2: area_id - pArea = PyString_FromString(router->router_area); - PyTuple_SetItem(pArgs, 2, pArea); - - // arg 3: max_routers - pMaxRouters = PyInt_FromLong((long) dx_bitmask_width()); - PyTuple_SetItem(pArgs, 3, pMaxRouters); - - // - // Instantiate the router - // - router->pyRouter = PyInstance_New(pClass, pArgs, 0); - Py_DECREF(pArgs); - Py_DECREF(adapterType); - - if (!router->pyRouter) { - PyErr_Print(); - dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated"); - return; - } - - router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick"); - if (!router->pyTick || !PyCallable_Check(router->pyTick)) { - dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); - return; - } - - router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded"); - if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) { - dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method"); - return; - } - - router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved"); - if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) { - dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method"); - return; - } -} - - -void dx_pyrouter_tick(dx_router_t *router) -{ - PyObject *pArgs; - PyObject *pValue; - - if (router->pyTick && router->router_mode == DX_ROUTER_MODE_INTERIOR) { - dx_python_lock(); - pArgs = PyTuple_New(0); - pValue = PyObject_CallObject(router->pyTick, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); - } - dx_python_unlock(); - } -} - - -void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter) -{ - PyObject *pArgs; - PyObject *pValue; - - if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) { - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - char *address = (char*) dx_field_iterator_copy(iter); - - dx_python_lock(); - pArgs = PyTuple_New(1); - PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); - pValue = PyObject_CallObject(router->pyAdded, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); - } - dx_python_unlock(); - - free(address); - } -} - - -void dx_router_mobile_removed(dx_router_t *router, const char *address) -{ - PyObject *pArgs; - PyObject *pValue; - - if (router->pyRemoved && router->router_mode == DX_ROUTER_MODE_INTERIOR) { - dx_python_lock(); - pArgs = PyTuple_New(1); - PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); - pValue = PyObject_CallObject(router->pyRemoved, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); - } - dx_python_unlock(); - } -} - diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c deleted file mode 100644 index 65e181bd2c..0000000000 --- a/qpid/extras/dispatch/src/server.c +++ /dev/null @@ -1,1039 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/log.h> -#include "server_private.h" -#include "timer_private.h" -#include "alloc_private.h" -#include "dispatch_private.h" -#include "work_queue.h" -#include <stdio.h> -#include <time.h> - -static char *module="SERVER"; -static __thread dx_server_t *thread_server = 0; - -typedef struct dx_thread_t { - dx_server_t *dx_server; - int thread_id; - volatile int running; - volatile int canceled; - int using_thread; - sys_thread_t *thread; -} dx_thread_t; - - -struct dx_server_t { - int thread_count; - const char *container_name; - pn_driver_t *driver; - dx_thread_start_cb_t start_handler; - dx_conn_handler_cb_t conn_handler; - dx_user_fd_handler_cb_t ufd_handler; - void *start_context; - void *conn_handler_context; - sys_cond_t *cond; - sys_mutex_t *lock; - dx_thread_t **threads; - work_queue_t *work_queue; - dx_timer_list_t pending_timers; - bool a_thread_is_waiting; - int threads_active; - int pause_requests; - int threads_paused; - int pause_next_sequence; - int pause_now_serving; - dx_signal_handler_cb_t signal_handler; - void *signal_context; - int pending_signal; -}; - - - - -ALLOC_DEFINE(dx_listener_t); -ALLOC_DEFINE(dx_connector_t); -ALLOC_DEFINE(dx_connection_t); -ALLOC_DEFINE(dx_user_fd_t); - - -static dx_thread_t *thread(dx_server_t *dx_server, int id) -{ - dx_thread_t *thread = NEW(dx_thread_t); - if (!thread) - return 0; - - thread->dx_server = dx_server; - thread->thread_id = id; - thread->running = 0; - thread->canceled = 0; - thread->using_thread = 0; - - return thread; -} - - -static void thread_process_listeners(dx_server_t *dx_server) -{ - pn_driver_t *driver = dx_server->driver; - pn_listener_t *listener = pn_driver_listener(driver); - pn_connector_t *cxtr; - dx_connection_t *ctx; - - while (listener) { - cxtr = pn_listener_accept(listener); - dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr)); - ctx = new_dx_connection_t(); - ctx->state = CONN_STATE_OPENING; - ctx->owner_thread = CONTEXT_NO_OWNER; - ctx->enqueued = 0; - ctx->pn_cxtr = cxtr; - ctx->listener = (dx_listener_t*) pn_listener_context(listener); - ctx->connector = 0; - ctx->context = ctx->listener->context; - ctx->user_context = 0; - ctx->link_context = 0; - ctx->ufd = 0; - - pn_connection_t *conn = pn_connection(); - pn_connection_set_container(conn, dx_server->container_name); - pn_connector_set_connection(cxtr, conn); - pn_connection_set_context(conn, ctx); - ctx->pn_conn = conn; - - // - // Get a pointer to the transport so we can insert security components into it - // - pn_transport_t *tport = pn_connector_transport(cxtr); - const dx_server_config_t *config = ctx->listener->config; - - // - // Set up SSL if appropriate - // - if (config->ssl_enabled) { - pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_SERVER); - pn_ssl_domain_set_credentials(domain, - config->ssl_certificate_file, - config->ssl_private_key_file, - config->ssl_password); - if (config->ssl_allow_unsecured_client) - pn_ssl_domain_allow_unsecured_client(domain); - - if (config->ssl_require_peer_authentication) - pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db); - - pn_ssl_t *ssl = pn_ssl(tport); - pn_ssl_init(ssl, domain, 0); - pn_ssl_domain_free(domain); - } - - // - // Set up SASL - // - pn_sasl_t *sasl = pn_sasl(tport); - pn_sasl_mechanisms(sasl, config->sasl_mechanisms); - pn_sasl_server(sasl); - pn_sasl_done(sasl, PN_SASL_OK); // TODO - This needs to go away - - pn_connector_set_context(cxtr, ctx); - listener = pn_driver_listener(driver); - } -} - - -static void handle_signals_LH(dx_server_t *dx_server) -{ - int signum = dx_server->pending_signal; - - if (signum) { - dx_server->pending_signal = 0; - if (dx_server->signal_handler) { - sys_mutex_unlock(dx_server->lock); - dx_server->signal_handler(dx_server->signal_context, signum); - sys_mutex_lock(dx_server->lock); - } - } -} - - -static void block_if_paused_LH(dx_server_t *dx_server) -{ - if (dx_server->pause_requests > 0) { - dx_server->threads_paused++; - sys_cond_signal_all(dx_server->cond); - while (dx_server->pause_requests > 0) - sys_cond_wait(dx_server->cond, dx_server->lock); - dx_server->threads_paused--; - } -} - - -static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) -{ - dx_connection_t *ctx = pn_connector_context(cxtr); - int events = 0; - int passes = 0; - - if (ctx->state == CONN_STATE_USER) { - dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); - return 1; - } - - do { - passes++; - - // - // Step the engine for pre-handler processing - // - pn_connector_process(cxtr); - - // - // Call the handler that is appropriate for the connector's state. - // - switch (ctx->state) { - case CONN_STATE_CONNECTING: { - if (pn_connector_closed(cxtr)) { - ctx->state = CONN_STATE_FAILED; - events = 0; - break; - } - - pn_connection_t *conn = pn_connection(); - pn_connection_set_container(conn, dx_server->container_name); - pn_connector_set_connection(cxtr, conn); - pn_connection_set_context(conn, ctx); - ctx->pn_conn = conn; - - pn_transport_t *tport = pn_connector_transport(cxtr); - const dx_server_config_t *config = ctx->connector->config; - - // - // Set up SSL if appropriate - // - if (config->ssl_enabled) { - pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); - pn_ssl_domain_set_credentials(domain, - config->ssl_certificate_file, - config->ssl_private_key_file, - config->ssl_password); - - if (config->ssl_require_peer_authentication) - pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db); - - pn_ssl_t *ssl = pn_ssl(tport); - pn_ssl_init(ssl, domain, 0); - pn_ssl_domain_free(domain); - } - - // - // Set up SASL - // - pn_sasl_t *sasl = pn_sasl(tport); - pn_sasl_mechanisms(sasl, config->sasl_mechanisms); - pn_sasl_client(sasl); - - ctx->state = CONN_STATE_OPENING; - assert(ctx->connector); - ctx->connector->state = CXTR_STATE_OPEN; - events = 1; - break; - } - - case CONN_STATE_OPENING: { - pn_transport_t *tport = pn_connector_transport(cxtr); - pn_sasl_t *sasl = pn_sasl(tport); - - if (pn_sasl_outcome(sasl) == PN_SASL_OK) { - ctx->state = CONN_STATE_OPERATIONAL; - - dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy - - if (ctx->listener) { - ce = DX_CONN_EVENT_LISTENER_OPEN; - } else if (ctx->connector) { - ce = DX_CONN_EVENT_CONNECTOR_OPEN; - ctx->connector->delay = 0; - } else - assert(0); - - dx_server->conn_handler(dx_server->conn_handler_context, - ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); - events = 1; - break; - } - else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) { - ctx->state = CONN_STATE_FAILED; - if (ctx->connector) { - const dx_server_config_t *config = ctx->connector->config; - dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); - } - } - } - - case CONN_STATE_OPERATIONAL: - if (pn_connector_closed(cxtr)) { - dx_server->conn_handler(dx_server->conn_handler_context, ctx->context, - DX_CONN_EVENT_CLOSE, - (dx_connection_t*) pn_connector_context(cxtr)); - events = 0; - } - else - events = dx_server->conn_handler(dx_server->conn_handler_context, ctx->context, - DX_CONN_EVENT_PROCESS, - (dx_connection_t*) pn_connector_context(cxtr)); - break; - - default: - break; - } - } while (events > 0); - - return passes > 1; -} - - -// -// TEMPORARY FUNCTION PROTOTYPES -// -void pn_driver_wait_1(pn_driver_t *d); -int pn_driver_wait_2(pn_driver_t *d, int timeout); -void pn_driver_wait_3(pn_driver_t *d); -// -// END TEMPORARY -// - -static void *thread_run(void *arg) -{ - dx_thread_t *thread = (dx_thread_t*) arg; - dx_server_t *dx_server = thread->dx_server; - pn_connector_t *work; - pn_connection_t *conn; - dx_connection_t *ctx; - int error; - int poll_result; - - if (!thread) - return 0; - - thread_server = dx_server; - thread->running = 1; - - if (thread->canceled) - return 0; - - // - // Invoke the start handler if the application supplied one. - // This handler can be used to set NUMA or processor affinnity for the thread. - // - if (dx_server->start_handler) - dx_server->start_handler(dx_server->start_context, thread->thread_id); - - // - // Main Loop - // - while (thread->running) { - sys_mutex_lock(dx_server->lock); - - // - // Check for pending signals to process - // - handle_signals_LH(dx_server); - if (!thread->running) { - sys_mutex_unlock(dx_server->lock); - break; - } - - // - // Check to see if the server is pausing. If so, block here. - // - block_if_paused_LH(dx_server); - if (!thread->running) { - sys_mutex_unlock(dx_server->lock); - break; - } - - // - // Service pending timers. - // - dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); - if (timer) { - DEQ_REMOVE_HEAD(dx_server->pending_timers); - - // - // Mark the timer as idle in case it reschedules itself. - // - dx_timer_idle_LH(timer); - - // - // Release the lock and invoke the connection handler. - // - sys_mutex_unlock(dx_server->lock); - timer->handler(timer->context); - pn_driver_wakeup(dx_server->driver); - continue; - } - - // - // Check the work queue for connectors scheduled for processing. - // - work = work_queue_get(dx_server->work_queue); - if (!work) { - // - // There is no pending work to do - // - if (dx_server->a_thread_is_waiting) { - // - // Another thread is waiting on the proton driver, this thread must - // wait on the condition variable until signaled. - // - sys_cond_wait(dx_server->cond, dx_server->lock); - } else { - // - // This thread elects itself to wait on the proton driver. Set the - // thread-is-waiting flag so other idle threads will not interfere. - // - dx_server->a_thread_is_waiting = true; - - // - // Ask the timer module when its next timer is scheduled to fire. We'll - // use this value in driver_wait as the timeout. If there are no scheduled - // timers, the returned value will be -1. - // - long duration = dx_timer_next_duration_LH(); - - // - // Invoke the proton driver's wait sequence. This is a bit of a hack for now - // and will be improved in the future. The wait process is divided into three parts, - // the first and third of which need to be non-reentrant, and the second of which - // must be reentrant (and blocks). - // - pn_driver_wait_1(dx_server->driver); - sys_mutex_unlock(dx_server->lock); - - do { - error = 0; - poll_result = pn_driver_wait_2(dx_server->driver, duration); - if (poll_result == -1) - error = pn_driver_errno(dx_server->driver); - } while (error == PN_INTR); - if (error) { - dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); - exit(-1); - } - - sys_mutex_lock(dx_server->lock); - pn_driver_wait_3(dx_server->driver); - - if (!thread->running) { - sys_mutex_unlock(dx_server->lock); - break; - } - - // - // Visit the timer module. - // - struct timespec tv; - clock_gettime(CLOCK_REALTIME, &tv); - long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; - dx_timer_visit_LH(milliseconds); - - // - // Process listeners (incoming connections). - // - thread_process_listeners(dx_server); - - // - // Traverse the list of connectors-needing-service from the proton driver. - // If the connector is not already in the work queue and it is not currently - // being processed by another thread, put it in the work queue and signal the - // condition variable. - // - work = pn_driver_connector(dx_server->driver); - while (work) { - ctx = pn_connector_context(work); - if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { - ctx->enqueued = 1; - work_queue_put(dx_server->work_queue, work); - sys_cond_signal(dx_server->cond); - } - work = pn_driver_connector(dx_server->driver); - } - - // - // Release our exclusive claim on pn_driver_wait. - // - dx_server->a_thread_is_waiting = false; - } - } - - // - // If we were given a connector to work on from the work queue, mark it as - // owned by this thread and as no longer enqueued. - // - if (work) { - ctx = pn_connector_context(work); - if (ctx->owner_thread == CONTEXT_NO_OWNER) { - ctx->owner_thread = thread->thread_id; - ctx->enqueued = 0; - dx_server->threads_active++; - } else { - // - // This connector is being processed by another thread, re-queue it. - // - work_queue_put(dx_server->work_queue, work); - work = 0; - } - } - sys_mutex_unlock(dx_server->lock); - - // - // Process the connector that we now have exclusive access to. - // - if (work) { - int work_done = process_connector(dx_server, work); - - // - // Check to see if the connector was closed during processing - // - if (pn_connector_closed(work)) { - // - // Connector is closed. Free the context and the connector. - // - conn = pn_connector_connection(work); - - // - // If this is a dispatch connector, schedule the re-connect timer - // - if (ctx->connector) { - ctx->connector->ctx = 0; - ctx->connector->state = CXTR_STATE_CONNECTING; - dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); - } - - sys_mutex_lock(dx_server->lock); - free_dx_connection_t(ctx); - pn_connector_free(work); - if (conn) - pn_connection_free(conn); - dx_server->threads_active--; - sys_mutex_unlock(dx_server->lock); - } else { - // - // The connector lives on. Mark it as no longer owned by this thread. - // - sys_mutex_lock(dx_server->lock); - ctx->owner_thread = CONTEXT_NO_OWNER; - dx_server->threads_active--; - sys_mutex_unlock(dx_server->lock); - } - - // - // Wake up the proton driver to force it to reconsider its set of FDs - // in light of the processing that just occurred. - // - if (work_done) - pn_driver_wakeup(dx_server->driver); - } - } - - return 0; -} - - -static void thread_start(dx_thread_t *thread) -{ - if (!thread) - return; - - thread->using_thread = 1; - thread->thread = sys_thread(thread_run, (void*) thread); -} - - -static void thread_cancel(dx_thread_t *thread) -{ - if (!thread) - return; - - thread->running = 0; - thread->canceled = 1; -} - - -static void thread_join(dx_thread_t *thread) -{ - if (!thread) - return; - - if (thread->using_thread) - sys_thread_join(thread->thread); -} - - -static void thread_free(dx_thread_t *thread) -{ - if (!thread) - return; - - free(thread); -} - - -static void cxtr_try_open(void *context) -{ - dx_connector_t *ct = (dx_connector_t*) context; - if (ct->state != CXTR_STATE_CONNECTING) - return; - - dx_connection_t *ctx = new_dx_connection_t(); - ctx->server = ct->server; - ctx->state = CONN_STATE_CONNECTING; - ctx->owner_thread = CONTEXT_NO_OWNER; - ctx->enqueued = 0; - ctx->pn_conn = 0; - ctx->listener = 0; - ctx->connector = ct; - ctx->context = ct->context; - ctx->user_context = 0; - ctx->link_context = 0; - ctx->ufd = 0; - - // - // pn_connector is not thread safe - // - sys_mutex_lock(ct->server->lock); - ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx); - sys_mutex_unlock(ct->server->lock); - - ct->ctx = ctx; - ct->delay = 5000; - dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); -} - - -dx_server_t *dx_server(int thread_count, const char *container_name) -{ - int i; - - dx_server_t *dx_server = NEW(dx_server_t); - if (dx_server == 0) - return 0; - - dx_server->thread_count = thread_count; - dx_server->container_name = container_name; - dx_server->driver = pn_driver(); - dx_server->start_handler = 0; - dx_server->conn_handler = 0; - dx_server->signal_handler = 0; - dx_server->ufd_handler = 0; - dx_server->start_context = 0; - dx_server->signal_context = 0; - dx_server->lock = sys_mutex(); - dx_server->cond = sys_cond(); - - dx_timer_initialize(dx_server->lock); - - dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); - for (i = 0; i < thread_count; i++) - dx_server->threads[i] = thread(dx_server, i); - - dx_server->work_queue = work_queue(); - DEQ_INIT(dx_server->pending_timers); - dx_server->a_thread_is_waiting = false; - dx_server->threads_active = 0; - dx_server->pause_requests = 0; - dx_server->threads_paused = 0; - dx_server->pause_next_sequence = 0; - dx_server->pause_now_serving = 0; - dx_server->pending_signal = 0; - - dx_log(module, LOG_INFO, "Container Name: %s", dx_server->container_name); - - return dx_server; -} - - -void dx_server_setup_agent(dx_dispatch_t *dx) -{ - // TODO -} - - -void dx_server_free(dx_server_t *dx_server) -{ - int i; - if (!dx_server) - return; - - for (i = 0; i < dx_server->thread_count; i++) - thread_free(dx_server->threads[i]); - - work_queue_free(dx_server->work_queue); - - pn_driver_free(dx_server->driver); - sys_mutex_free(dx_server->lock); - sys_cond_free(dx_server->cond); - free(dx_server); -} - - -void dx_server_set_conn_handler(dx_dispatch_t *dx, dx_conn_handler_cb_t handler, void *handler_context) -{ - dx->server->conn_handler = handler; - dx->server->conn_handler_context = handler_context; -} - - -void dx_server_set_signal_handler(dx_dispatch_t *dx, dx_signal_handler_cb_t handler, void *context) -{ - dx->server->signal_handler = handler; - dx->server->signal_context = context; -} - - -void dx_server_set_start_handler(dx_dispatch_t *dx, dx_thread_start_cb_t handler, void *context) -{ - dx->server->start_handler = handler; - dx->server->start_context = context; -} - - -void dx_server_set_user_fd_handler(dx_dispatch_t *dx, dx_user_fd_handler_cb_t ufd_handler) -{ - dx->server->ufd_handler = ufd_handler; -} - - -void dx_server_run(dx_dispatch_t *dx) -{ - dx_server_t *dx_server = dx->server; - - int i; - if (!dx_server) - return; - - assert(dx_server->conn_handler); // Server can't run without a connection handler. - - for (i = 1; i < dx_server->thread_count; i++) - thread_start(dx_server->threads[i]); - - dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); - - thread_run((void*) dx_server->threads[0]); - - for (i = 1; i < dx_server->thread_count; i++) - thread_join(dx_server->threads[i]); - - dx_log(module, LOG_INFO, "Shut Down"); -} - - -void dx_server_start(dx_dispatch_t *dx) -{ - dx_server_t *dx_server = dx->server; - int i; - - if (!dx_server) - return; - - assert(dx_server->conn_handler); // Server can't run without a connection handler. - - for (i = 0; i < dx_server->thread_count; i++) - thread_start(dx_server->threads[i]); - - dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); -} - - -void dx_server_stop(dx_dispatch_t *dx) -{ - dx_server_t *dx_server = dx->server; - int idx; - - sys_mutex_lock(dx_server->lock); - for (idx = 0; idx < dx_server->thread_count; idx++) - thread_cancel(dx_server->threads[idx]); - sys_cond_signal_all(dx_server->cond); - pn_driver_wakeup(dx_server->driver); - sys_mutex_unlock(dx_server->lock); - - if (thread_server != dx_server) { - for (idx = 0; idx < dx_server->thread_count; idx++) - thread_join(dx_server->threads[idx]); - dx_log(module, LOG_INFO, "Shut Down"); - } -} - - -void dx_server_signal(dx_dispatch_t *dx, int signum) -{ - dx_server_t *dx_server = dx->server; - - dx_server->pending_signal = signum; - sys_cond_signal_all(dx_server->cond); - pn_driver_wakeup(dx_server->driver); -} - - -void dx_server_pause(dx_dispatch_t *dx) -{ - dx_server_t *dx_server = dx->server; - - sys_mutex_lock(dx_server->lock); - - // - // Bump the request count to stop all the threads. - // - dx_server->pause_requests++; - int my_sequence = dx_server->pause_next_sequence++; - - // - // Awaken all threads that are currently blocking. - // - sys_cond_signal_all(dx_server->cond); - pn_driver_wakeup(dx_server->driver); - - // - // Wait for the paused thread count plus the number of threads requesting a pause to equal - // the total thread count. Also, don't exit the blocking loop until now_serving equals our - // sequence number. This ensures that concurrent pausers don't run at the same time. - // - while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || - (my_sequence != dx_server->pause_now_serving)) - sys_cond_wait(dx_server->cond, dx_server->lock); - - sys_mutex_unlock(dx_server->lock); -} - - -void dx_server_resume(dx_dispatch_t *dx) -{ - dx_server_t *dx_server = dx->server; - - sys_mutex_lock(dx_server->lock); - dx_server->pause_requests--; - dx_server->pause_now_serving++; - sys_cond_signal_all(dx_server->cond); - sys_mutex_unlock(dx_server->lock); -} - - -void dx_server_activate(dx_connection_t *ctx) -{ - if (!ctx) - return; - - pn_connector_t *ctor = ctx->pn_cxtr; - if (!ctor) - return; - - if (!pn_connector_closed(ctor)) - pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE); -} - - -void dx_connection_set_context(dx_connection_t *conn, void *context) -{ - conn->user_context = context; -} - - -void *dx_connection_get_context(dx_connection_t *conn) -{ - return conn->user_context; -} - - -void dx_connection_set_link_context(dx_connection_t *conn, void *context) -{ - conn->link_context = context; -} - - -void *dx_connection_get_link_context(dx_connection_t *conn) -{ - return conn->link_context; -} - - -pn_connection_t *dx_connection_pn(dx_connection_t *conn) -{ - return conn->pn_conn; -} - - -const dx_server_config_t *dx_connection_config(const dx_connection_t *conn) -{ - if (conn->listener) - return conn->listener->config; - return conn->connector->config; -} - - -dx_listener_t *dx_server_listen(dx_dispatch_t *dx, const dx_server_config_t *config, void *context) -{ - dx_server_t *dx_server = dx->server; - dx_listener_t *li = new_dx_listener_t(); - - if (!li) - return 0; - - li->server = dx_server; - li->config = config; - li->context = context; - li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); - - if (!li->pn_listener) { - dx_log(module, LOG_ERROR, "Driver Error %d (%s)", - pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); - free_dx_listener_t(li); - return 0; - } - dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); - - return li; -} - - -void dx_server_listener_free(dx_listener_t* li) -{ - pn_listener_free(li->pn_listener); - free_dx_listener_t(li); -} - - -void dx_server_listener_close(dx_listener_t* li) -{ - pn_listener_close(li->pn_listener); -} - - -dx_connector_t *dx_server_connect(dx_dispatch_t *dx, const dx_server_config_t *config, void *context) -{ - dx_server_t *dx_server = dx->server; - dx_connector_t *ct = new_dx_connector_t(); - - if (!ct) - return 0; - - ct->server = dx_server; - ct->state = CXTR_STATE_CONNECTING; - ct->config = config; - ct->context = context; - ct->ctx = 0; - ct->timer = dx_timer(dx, cxtr_try_open, (void*) ct); - ct->delay = 0; - - dx_timer_schedule(ct->timer, ct->delay); - return ct; -} - - -void dx_server_connector_free(dx_connector_t* ct) -{ - // Don't free the proton connector. This will be done by the connector - // processing/cleanup. - - if (ct->ctx) { - pn_connector_close(ct->ctx->pn_cxtr); - ct->ctx->connector = 0; - } - - dx_timer_free(ct->timer); - free_dx_connector_t(ct); -} - - -dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context) -{ - dx_server_t *dx_server = dx->server; - dx_user_fd_t *ufd = new_dx_user_fd_t(); - - if (!ufd) - return 0; - - dx_connection_t *ctx = new_dx_connection_t(); - ctx->server = dx_server; - ctx->state = CONN_STATE_USER; - ctx->owner_thread = CONTEXT_NO_OWNER; - ctx->enqueued = 0; - ctx->pn_conn = 0; - ctx->listener = 0; - ctx->connector = 0; - ctx->context = 0; - ctx->user_context = 0; - ctx->link_context = 0; - ctx->ufd = ufd; - - ufd->context = context; - ufd->server = dx_server; - ufd->fd = fd; - ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); - pn_driver_wakeup(dx_server->driver); - - return ufd; -} - - -void dx_user_fd_free(dx_user_fd_t *ufd) -{ - pn_connector_close(ufd->pn_conn); - free_dx_user_fd_t(ufd); -} - - -void dx_user_fd_activate_read(dx_user_fd_t *ufd) -{ - pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); - pn_driver_wakeup(ufd->server->driver); -} - - -void dx_user_fd_activate_write(dx_user_fd_t *ufd) -{ - pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); - pn_driver_wakeup(ufd->server->driver); -} - - -bool dx_user_fd_is_readable(dx_user_fd_t *ufd) -{ - return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); -} - - -bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) -{ - return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); -} - - -void dx_server_timer_pending_LH(dx_timer_t *timer) -{ - DEQ_INSERT_TAIL(timer->server->pending_timers, timer); -} - - -void dx_server_timer_cancel_LH(dx_timer_t *timer) -{ - DEQ_REMOVE(timer->server->pending_timers, timer); -} - diff --git a/qpid/extras/dispatch/src/server_private.h b/qpid/extras/dispatch/src/server_private.h deleted file mode 100644 index 86a3ef98f1..0000000000 --- a/qpid/extras/dispatch/src/server_private.h +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef __server_private_h__ -#define __server_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/server.h> -#include <qpid/dispatch/user_fd.h> -#include <qpid/dispatch/timer.h> -#include <qpid/dispatch/alloc.h> -#include <proton/driver.h> -#include <proton/driver_extras.h> - -void dx_server_timer_pending_LH(dx_timer_t *timer); -void dx_server_timer_cancel_LH(dx_timer_t *timer); - - -typedef enum { - CONN_STATE_CONNECTING = 0, - CONN_STATE_OPENING, - CONN_STATE_OPERATIONAL, - CONN_STATE_FAILED, - CONN_STATE_USER -} conn_state_t; - -#define CONTEXT_NO_OWNER -1 - -typedef enum { - CXTR_STATE_CONNECTING = 0, - CXTR_STATE_OPEN, - CXTR_STATE_FAILED -} cxtr_state_t; - -typedef struct dx_server_t dx_server_t; - -struct dx_listener_t { - dx_server_t *server; - const dx_server_config_t *config; - void *context; - pn_listener_t *pn_listener; -}; - - -struct dx_connector_t { - dx_server_t *server; - cxtr_state_t state; - const dx_server_config_t *config; - void *context; - dx_connection_t *ctx; - dx_timer_t *timer; - long delay; -}; - - -struct dx_connection_t { - dx_server_t *server; - conn_state_t state; - int owner_thread; - int enqueued; - pn_connector_t *pn_cxtr; - pn_connection_t *pn_conn; - dx_listener_t *listener; - dx_connector_t *connector; - void *context; // Copy of context from listener or connector - void *user_context; - void *link_context; // Context shared by this connection's links - dx_user_fd_t *ufd; -}; - - -struct dx_user_fd_t { - dx_server_t *server; - void *context; - int fd; - pn_connector_t *pn_conn; -}; - - -ALLOC_DECLARE(dx_listener_t); -ALLOC_DECLARE(dx_connector_t); -ALLOC_DECLARE(dx_connection_t); -ALLOC_DECLARE(dx_user_fd_t); - - -#endif diff --git a/qpid/extras/dispatch/src/timer.c b/qpid/extras/dispatch/src/timer.c deleted file mode 100644 index a7f36e149b..0000000000 --- a/qpid/extras/dispatch/src/timer.c +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "timer_private.h" -#include "server_private.h" -#include "dispatch_private.h" -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/alloc.h> -#include <assert.h> -#include <stdio.h> - -static sys_mutex_t *lock; -static dx_timer_list_t idle_timers; -static dx_timer_list_t scheduled_timers; -static long time_base; - -ALLOC_DECLARE(dx_timer_t); -ALLOC_DEFINE(dx_timer_t); - -//========================================================================= -// Private static functions -//========================================================================= - -static void dx_timer_cancel_LH(dx_timer_t *timer) -{ - switch (timer->state) { - case TIMER_FREE: - assert(0); - break; - - case TIMER_IDLE: - break; - - case TIMER_SCHEDULED: - if (timer->next) - timer->next->delta_time += timer->delta_time; - DEQ_REMOVE(scheduled_timers, timer); - DEQ_INSERT_TAIL(idle_timers, timer); - break; - - case TIMER_PENDING: - dx_server_timer_cancel_LH(timer); - DEQ_INSERT_TAIL(idle_timers, timer); - break; - } - - timer->state = TIMER_IDLE; -} - - -//========================================================================= -// Public Functions from timer.h -//========================================================================= - -dx_timer_t *dx_timer(dx_dispatch_t *dx, dx_timer_cb_t cb, void* context) -{ - dx_timer_t *timer = new_dx_timer_t(); - if (!timer) - return 0; - - DEQ_ITEM_INIT(timer); - - timer->server = dx ? dx->server : 0; - timer->handler = cb; - timer->context = context; - timer->delta_time = 0; - timer->state = TIMER_IDLE; - - sys_mutex_lock(lock); - DEQ_INSERT_TAIL(idle_timers, timer); - sys_mutex_unlock(lock); - - return timer; -} - - -void dx_timer_free(dx_timer_t *timer) -{ - sys_mutex_lock(lock); - dx_timer_cancel_LH(timer); - DEQ_REMOVE(idle_timers, timer); - sys_mutex_unlock(lock); - - timer->state = TIMER_FREE; - free_dx_timer_t(timer); -} - - -void dx_timer_schedule(dx_timer_t *timer, long duration) -{ - dx_timer_t *ptr; - dx_timer_t *last; - long total_time; - - sys_mutex_lock(lock); - dx_timer_cancel_LH(timer); // Timer is now on the idle list - assert(timer->state == TIMER_IDLE); - DEQ_REMOVE(idle_timers, timer); - - // - // Handle the special case of a zero-time scheduling. In this case, - // the timer doesn't go on the scheduled list. It goes straight to the - // pending list in the server. - // - if (duration == 0) { - timer->state = TIMER_PENDING; - dx_server_timer_pending_LH(timer); - sys_mutex_unlock(lock); - return; - } - - // - // Find the insert point in the schedule. - // - total_time = 0; - ptr = DEQ_HEAD(scheduled_timers); - assert(!ptr || ptr->prev == 0); - while (ptr) { - total_time += ptr->delta_time; - if (total_time > duration) - break; - ptr = ptr->next; - } - - // - // Insert the timer into the schedule and adjust the delta time - // of the following timer if present. - // - if (total_time <= duration) { - assert(ptr == 0); - timer->delta_time = duration - total_time; - DEQ_INSERT_TAIL(scheduled_timers, timer); - } else { - total_time -= ptr->delta_time; - timer->delta_time = duration - total_time; - assert(ptr->delta_time > timer->delta_time); - ptr->delta_time -= timer->delta_time; - last = ptr->prev; - if (last) - DEQ_INSERT_AFTER(scheduled_timers, timer, last); - else - DEQ_INSERT_HEAD(scheduled_timers, timer); - } - - timer->state = TIMER_SCHEDULED; - - sys_mutex_unlock(lock); -} - - -void dx_timer_cancel(dx_timer_t *timer) -{ - sys_mutex_lock(lock); - dx_timer_cancel_LH(timer); - sys_mutex_unlock(lock); -} - - -//========================================================================= -// Private Functions from timer_private.h -//========================================================================= - -void dx_timer_initialize(sys_mutex_t *server_lock) -{ - lock = server_lock; - DEQ_INIT(idle_timers); - DEQ_INIT(scheduled_timers); - time_base = 0; -} - - -void dx_timer_finalize(void) -{ - lock = 0; -} - - -long dx_timer_next_duration_LH(void) -{ - dx_timer_t *timer = DEQ_HEAD(scheduled_timers); - if (timer) - return timer->delta_time; - return -1; -} - - -void dx_timer_visit_LH(long current_time) -{ - long delta; - dx_timer_t *timer = DEQ_HEAD(scheduled_timers); - - if (time_base == 0) { - time_base = current_time; - return; - } - - delta = current_time - time_base; - time_base = current_time; - - while (timer) { - assert(delta >= 0); - if (timer->delta_time > delta) { - timer->delta_time -= delta; - break; - } else { - DEQ_REMOVE_HEAD(scheduled_timers); - delta -= timer->delta_time; - timer->state = TIMER_PENDING; - dx_server_timer_pending_LH(timer); - - } - timer = DEQ_HEAD(scheduled_timers); - } -} - - -void dx_timer_idle_LH(dx_timer_t *timer) -{ - timer->state = TIMER_IDLE; - DEQ_INSERT_TAIL(idle_timers, timer); -} - diff --git a/qpid/extras/dispatch/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h deleted file mode 100644 index 4e173b7ad4..0000000000 --- a/qpid/extras/dispatch/src/timer_private.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef __timer_private_h__ -#define __timer_private_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/timer.h> -#include <qpid/dispatch/threading.h> -#include "server_private.h" - -typedef enum { - TIMER_FREE, - TIMER_IDLE, - TIMER_SCHEDULED, - TIMER_PENDING -} dx_timer_state_t; - - -struct dx_timer_t { - DEQ_LINKS(dx_timer_t); - dx_server_t *server; - dx_timer_cb_t handler; - void *context; - long delta_time; - dx_timer_state_t state; -}; - -DEQ_DECLARE(dx_timer_t, dx_timer_list_t); - -void dx_timer_initialize(sys_mutex_t *server_lock); -void dx_timer_finalize(void); -long dx_timer_next_duration_LH(void); -void dx_timer_visit_LH(long current_time); -void dx_timer_idle_LH(dx_timer_t *timer); - - -#endif diff --git a/qpid/extras/dispatch/src/work_queue.c b/qpid/extras/dispatch/src/work_queue.c deleted file mode 100644 index 93c1cda1a9..0000000000 --- a/qpid/extras/dispatch/src/work_queue.c +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/ctools.h> -#include "work_queue.h" -#include <string.h> -#include <stdio.h> - -#define BATCH_SIZE 100 -typedef struct work_item_t work_item_t; - -struct work_item_t { - DEQ_LINKS(work_item_t); - pn_connector_t *conn; -}; - -DEQ_DECLARE(work_item_t, work_list_t); - -struct work_queue_t { - work_list_t items; - work_list_t free_list; -}; - -static void allocate_batch(work_queue_t *w) -{ - int i; - work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE); - if (!batch) - return; - - memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE); - - for (i = 0; i < BATCH_SIZE; i++) - DEQ_INSERT_TAIL(w->free_list, &batch[i]); -} - - -work_queue_t *work_queue(void) -{ - work_queue_t *w = NEW(work_queue_t); - if (!w) - return 0; - - DEQ_INIT(w->items); - DEQ_INIT(w->free_list); - - allocate_batch(w); - - return w; -} - - -void work_queue_free(work_queue_t *w) -{ - if (!w) - return; - - // KEEP TRACK OF BATCHES AND FREE - free(w); -} - - -void work_queue_put(work_queue_t *w, pn_connector_t *conn) -{ - work_item_t *item; - - if (!w) - return; - if (DEQ_SIZE(w->free_list) == 0) - allocate_batch(w); - if (DEQ_SIZE(w->free_list) == 0) - return; - - item = DEQ_HEAD(w->free_list); - DEQ_REMOVE_HEAD(w->free_list); - - item->conn = conn; - - DEQ_INSERT_TAIL(w->items, item); -} - - -pn_connector_t *work_queue_get(work_queue_t *w) -{ - work_item_t *item; - pn_connector_t *conn; - - if (!w) - return 0; - item = DEQ_HEAD(w->items); - if (!item) - return 0; - - DEQ_REMOVE_HEAD(w->items); - conn = item->conn; - item->conn = 0; - - DEQ_INSERT_TAIL(w->free_list, item); - - return conn; -} - - -int work_queue_empty(work_queue_t *w) -{ - return !w || DEQ_SIZE(w->items) == 0; -} - - -int work_queue_depth(work_queue_t *w) -{ - if (!w) - return 0; - return DEQ_SIZE(w->items); -} - diff --git a/qpid/extras/dispatch/src/work_queue.h b/qpid/extras/dispatch/src/work_queue.h deleted file mode 100644 index 3ad5c21c81..0000000000 --- a/qpid/extras/dispatch/src/work_queue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef __work_queue_h__ -#define __work_queue_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/driver.h> - -typedef struct work_queue_t work_queue_t; - -work_queue_t *work_queue(void); -void work_queue_free(work_queue_t *w); -void work_queue_put(work_queue_t *w, pn_connector_t *conn); -pn_connector_t *work_queue_get(work_queue_t *w); -int work_queue_empty(work_queue_t *w); -int work_queue_depth(work_queue_t *w); - -#endif |
