diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /extras/dispatch/src | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras/dispatch/src')
-rw-r--r-- | extras/dispatch/src/agent.c | 151 | ||||
-rw-r--r-- | extras/dispatch/src/alloc.c | 210 | ||||
-rw-r--r-- | extras/dispatch/src/alloc_private.h | 26 | ||||
-rw-r--r-- | extras/dispatch/src/auth.c | 75 | ||||
-rw-r--r-- | extras/dispatch/src/auth.h | 27 | ||||
-rw-r--r-- | extras/dispatch/src/buffer.c | 83 | ||||
-rw-r--r-- | extras/dispatch/src/container.c | 616 | ||||
-rw-r--r-- | extras/dispatch/src/hash.c | 223 | ||||
-rw-r--r-- | extras/dispatch/src/iovec.c | 81 | ||||
-rw-r--r-- | extras/dispatch/src/iterator.c | 268 | ||||
-rw-r--r-- | extras/dispatch/src/log.c | 56 | ||||
-rw-r--r-- | extras/dispatch/src/message.c | 1120 | ||||
-rw-r--r-- | extras/dispatch/src/message_private.h | 94 | ||||
-rw-r--r-- | extras/dispatch/src/posix/threading.c | 126 | ||||
-rw-r--r-- | extras/dispatch/src/router_node.c | 424 | ||||
-rw-r--r-- | extras/dispatch/src/server.c | 903 | ||||
-rw-r--r-- | extras/dispatch/src/server_private.h | 96 | ||||
-rw-r--r-- | extras/dispatch/src/timer.c | 236 | ||||
-rw-r--r-- | extras/dispatch/src/timer_private.h | 51 | ||||
-rw-r--r-- | extras/dispatch/src/work_queue.c | 132 | ||||
-rw-r--r-- | extras/dispatch/src/work_queue.h | 33 |
21 files changed, 5031 insertions, 0 deletions
diff --git a/extras/dispatch/src/agent.c b/extras/dispatch/src/agent.c new file mode 100644 index 0000000000..a885042b45 --- /dev/null +++ b/extras/dispatch/src/agent.c @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/agent.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> +#include <string.h> + + +typedef struct dx_agent_t { + hash_t *class_hash; + dx_message_list_t in_fifo; + dx_message_list_t out_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; +} dx_agent_t; + +static dx_agent_t *agent = 0; + + +struct dx_agent_class_t { + char *fqname; + void *context; + dx_agent_schema_cb_t schema_handler; + dx_agent_query_cb_t query_handler; // 0 iff class is an event. +}; + + +static void dx_agent_timer_handler(void *context) +{ + // TODO - Process the in_fifo here +} + + +void dx_agent_initialize() +{ + assert(!agent); + agent = NEW(dx_agent_t); + agent->class_hash = hash(6, 10, 1); + DEQ_INIT(agent->in_fifo); + DEQ_INIT(agent->out_fifo); + agent->lock = sys_mutex(); + agent->timer = dx_timer(dx_agent_timer_handler, agent); +} + + +void dx_agent_finalize(void) +{ + sys_mutex_free(agent->lock); + dx_timer_free(agent->timer); + hash_free(agent->class_hash); + free(agent); + agent = 0; +} + + +dx_agent_class_t *dx_agent_register_class(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler) +{ + dx_agent_class_t *cls = NEW(dx_agent_class_t); + assert(cls); + cls->fqname = (char*) malloc(strlen(fqname) + 1); + strcpy(cls->fqname, fqname); + cls->context = context; + cls->schema_handler = schema_handler; + cls->query_handler = query_handler; + + dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); + int result = hash_insert_const(agent->class_hash, iter, cls); + dx_field_iterator_free(iter); + assert(result >= 0); + + return cls; +} + + +dx_agent_class_t *dx_agent_register_event(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler) +{ + return dx_agent_register_class(fqname, context, schema_handler, 0); +} + + +void dx_agent_value_string(const void *correlator, const char *key, const char *value) +{ +} + + +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) +{ +} + + +void dx_agent_value_null(const void *correlator, const char *key) +{ +} + + +void dx_agent_value_boolean(const void *correlator, const char *key, bool value) +{ +} + + +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) +{ +} + + +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) +{ +} + + +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) +{ +} + + +void dx_agent_value_complete(const void *correlator, bool more) +{ +} + + +void *dx_agent_raise_event(dx_agent_class_t *event) +{ + return 0; +} + diff --git a/extras/dispatch/src/alloc.c b/extras/dispatch/src/alloc.c new file mode 100644 index 0000000000..2b3b953aad --- /dev/null +++ b/extras/dispatch/src/alloc.c @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/log.h> +#include <memory.h> +#include <stdio.h> + +typedef struct item_t item_t; + +struct item_t { + DEQ_LINKS(item_t); + dx_alloc_type_desc_t *desc; +}; + +DEQ_DECLARE(item_t, item_list_t); + +struct dx_alloc_pool_t { + item_list_t free_list; +}; + +dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0}; +dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0}; + +sys_mutex_t *init_lock; +item_list_t type_list; + +static void dx_alloc_init(dx_alloc_type_desc_t *desc) +{ + sys_mutex_lock(init_lock); + + desc->total_size = desc->type_size; + if (desc->additional_size) + desc->total_size += *desc->additional_size; + + dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", + desc->type_name, desc->type_size, desc->total_size); + + if (!desc->global_pool) { + if (desc->config == 0) + desc->config = desc->total_size > 256 ? + &dx_alloc_default_config_big : &dx_alloc_default_config_small; + + assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); + + desc->global_pool = NEW(dx_alloc_pool_t); + DEQ_INIT(desc->global_pool->free_list); + desc->lock = sys_mutex(); + desc->stats = NEW(dx_alloc_stats_t); + memset(desc->stats, 0, sizeof(dx_alloc_stats_t)); + } + + item_t *type_item = NEW(item_t); + DEQ_ITEM_INIT(type_item); + type_item->desc = desc; + DEQ_INSERT_TAIL(type_list, type_item); + + sys_mutex_unlock(init_lock); +} + + +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) +{ + int idx; + + // + // If the descriptor is not initialized, set it up now. + // + if (!desc->global_pool) + dx_alloc_init(desc); + + // + // If this is the thread's first pass through here, allocate the + // thread-local pool for this type. + // + if (*tpool == 0) { + *tpool = NEW(dx_alloc_pool_t); + DEQ_INIT((*tpool)->free_list); + } + + dx_alloc_pool_t *pool = *tpool; + + // + // Fast case: If there's an item on the local free list, take it off the + // list and return it. Since everything we've touched is thread-local, + // there is no need to acquire a lock. + // + item_t *item = DEQ_HEAD(pool->free_list); + if (item) { + DEQ_REMOVE_HEAD(pool->free_list); + return &item[1]; + } + + // + // The local free list is empty, we need to either rebalance a batch + // of items from the global list or go to the heap to get new memory. + // + sys_mutex_lock(desc->lock); + if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) { + // + // Rebalance a full batch from the global free list to the thread list. + // + desc->stats->batches_rebalanced_to_threads++; + desc->stats->held_by_threads += desc->config->transfer_batch_size; + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = DEQ_HEAD(desc->global_pool->free_list); + DEQ_REMOVE_HEAD(desc->global_pool->free_list); + DEQ_INSERT_TAIL(pool->free_list, item); + } + } else { + // + // Allocate a full batch from the heap and put it on the thread list. + // + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = (item_t*) malloc(sizeof(item_t) + desc->total_size); + if (item == 0) + break; + DEQ_ITEM_INIT(item); + item->desc = desc; + DEQ_INSERT_TAIL(pool->free_list, item); + desc->stats->held_by_threads++; + desc->stats->total_alloc_from_heap++; + } + } + sys_mutex_unlock(desc->lock); + + item = DEQ_HEAD(pool->free_list); + if (item) { + DEQ_REMOVE_HEAD(pool->free_list); + return &item[1]; + } + + return 0; +} + + +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p) +{ + item_t *item = ((item_t*) p) - 1; + int idx; + + // + // If this is the thread's first pass through here, allocate the + // thread-local pool for this type. + // + if (*tpool == 0) { + *tpool = NEW(dx_alloc_pool_t); + DEQ_INIT((*tpool)->free_list); + } + + dx_alloc_pool_t *pool = *tpool; + + DEQ_INSERT_TAIL(pool->free_list, item); + + if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max) + return; + + // + // We've exceeded the maximum size of the local free list. A batch must be + // rebalanced back to the global list. + // + sys_mutex_lock(desc->lock); + desc->stats->batches_rebalanced_to_global++; + desc->stats->held_by_threads -= desc->config->transfer_batch_size; + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = DEQ_HEAD(pool->free_list); + DEQ_REMOVE_HEAD(pool->free_list); + DEQ_INSERT_TAIL(desc->global_pool->free_list, item); + } + + // + // If there's a global_free_list size limit, remove items until the limit is + // not exceeded. + // + if (desc->config->global_free_list_max != 0) { + while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) { + item = DEQ_HEAD(desc->global_pool->free_list); + DEQ_REMOVE_HEAD(desc->global_pool->free_list); + free(item); + desc->stats->total_free_to_heap++; + } + } + + sys_mutex_unlock(desc->lock); +} + + +void dx_alloc_initialize(void) +{ + init_lock = sys_mutex(); + DEQ_INIT(type_list); +} + diff --git a/extras/dispatch/src/alloc_private.h b/extras/dispatch/src/alloc_private.h new file mode 100644 index 0000000000..fbb18ccd48 --- /dev/null +++ b/extras/dispatch/src/alloc_private.h @@ -0,0 +1,26 @@ +#ifndef __dispatch_alloc_private_h__ +#define __dispatch_alloc_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/alloc.h> + +void dx_alloc_initialize(void); + +#endif diff --git a/extras/dispatch/src/auth.c b/extras/dispatch/src/auth.c new file mode 100644 index 0000000000..f0df58f6c2 --- /dev/null +++ b/extras/dispatch/src/auth.c @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <string.h> +#include "auth.h" +#include "server_private.h" +#include <proton/sasl.h> + + +void auth_client_handler(pn_connector_t *cxtr) +{ + pn_sasl_t *sasl = pn_connector_sasl(cxtr); + pn_sasl_state_t state = pn_sasl_state(sasl); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); + + if (state == PN_SASL_CONF) { + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_client(sasl); + } + + state = pn_sasl_state(sasl); + + if (state == PN_SASL_PASS) { + ctx->state = CONN_STATE_OPENING; + } else if (state == PN_SASL_FAIL) { + ctx->state = CONN_STATE_FAILED; + } +} + + +void auth_server_handler(pn_connector_t *cxtr) +{ + pn_sasl_t *sasl = pn_connector_sasl(cxtr); + pn_sasl_state_t state = pn_sasl_state(sasl); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); + + while (state == PN_SASL_CONF || state == PN_SASL_STEP) { + if (state == PN_SASL_CONF) { + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_server(sasl); + } else if (state == PN_SASL_STEP) { + const char* mechanisms = pn_sasl_remote_mechanisms(sasl); + if (strcmp(mechanisms, "ANONYMOUS") == 0) + pn_sasl_done(sasl, PN_SASL_OK); + else + pn_sasl_done(sasl, PN_SASL_AUTH); + } + state = pn_sasl_state(sasl); + } + + if (state == PN_SASL_PASS) { + ctx->state = CONN_STATE_OPENING; + } else if (state == PN_SASL_FAIL) { + ctx->state = CONN_STATE_FAILED; + } +} + + diff --git a/extras/dispatch/src/auth.h b/extras/dispatch/src/auth.h new file mode 100644 index 0000000000..c551c8ff76 --- /dev/null +++ b/extras/dispatch/src/auth.h @@ -0,0 +1,27 @@ +#ifndef __auth_h__ +#define __auth_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/driver.h> + +void auth_client_handler(pn_connector_t *conn); +void auth_server_handler(pn_connector_t *conn); + +#endif diff --git a/extras/dispatch/src/buffer.c b/extras/dispatch/src/buffer.c new file mode 100644 index 0000000000..015711afd9 --- /dev/null +++ b/extras/dispatch/src/buffer.c @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/alloc.h> + +static size_t buffer_size = 512; +static int size_locked = 0; + +ALLOC_DECLARE(dx_buffer_t); +ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0); + + +void dx_buffer_set_size(size_t size) +{ + assert(!size_locked); + buffer_size = size; +} + + +dx_buffer_t *dx_allocate_buffer(void) +{ + size_locked = 1; + dx_buffer_t *buf = new_dx_buffer_t(); + + DEQ_ITEM_INIT(buf); + buf->size = 0; + return buf; +} + + +void dx_free_buffer(dx_buffer_t *buf) +{ + free_dx_buffer_t(buf); +} + + +unsigned char *dx_buffer_base(dx_buffer_t *buf) +{ + return (unsigned char*) &buf[1]; +} + + +unsigned char *dx_buffer_cursor(dx_buffer_t *buf) +{ + return ((unsigned char*) &buf[1]) + buf->size; +} + + +size_t dx_buffer_capacity(dx_buffer_t *buf) +{ + return buffer_size - buf->size; +} + + +size_t dx_buffer_size(dx_buffer_t *buf) +{ + return buf->size; +} + + +void dx_buffer_insert(dx_buffer_t *buf, size_t len) +{ + buf->size += len; + assert(buf->size <= buffer_size); +} + diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c new file mode 100644 index 0000000000..68e2afa3eb --- /dev/null +++ b/extras/dispatch/src/container.c @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <string.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> +#include <proton/engine.h> +#include <proton/message.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> + +static char *module="CONTAINER"; + +struct dx_node_t { + const dx_node_type_t *ntype; + char *name; + void *context; + dx_dist_mode_t supported_dist; + dx_lifetime_policy_t life_policy; +}; + +ALLOC_DECLARE(dx_node_t); +ALLOC_DEFINE(dx_node_t); +ALLOC_DEFINE(dx_link_item_t); + +struct dx_link_t { + pn_link_t *pn_link; + void *context; + dx_node_t *node; +}; + +ALLOC_DECLARE(dx_link_t); +ALLOC_DEFINE(dx_link_t); + +typedef struct nxc_node_type_t { + DEQ_LINKS(struct nxc_node_type_t); + const dx_node_type_t *ntype; +} nxc_node_type_t; +DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); + + +static hash_t *node_type_map; +static hash_t *node_map; +static sys_mutex_t *lock; +static dx_node_t *default_node; +static nxc_node_type_list_t node_type_list; + +static void setup_outgoing_link(pn_link_t *pn_link) +{ + sys_mutex_lock(lock); + dx_node_t *node; + int result; + const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); + dx_field_iterator_t *iter; + // TODO - Extract the name from the structured source + + if (source) { + iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); + result = hash_retrieve(node_map, iter, (void*) &node); + dx_field_iterator_free(iter); + } else + result = -1; + sys_mutex_unlock(lock); + + if (result < 0) { + if (default_node) + node = default_node; + else { + // Reject the link + // TODO - When the API allows, add an error message for "no available node" + pn_link_close(pn_link); + return; + } + } + + dx_link_t *link = new_dx_link_t(); + if (!link) { + pn_link_close(pn_link); + return; + } + + link->pn_link = pn_link; + link->context = 0; + link->node = node; + + pn_link_set_context(pn_link, link); + node->ntype->outgoing_handler(node->context, link); +} + + +static void setup_incoming_link(pn_link_t *pn_link) +{ + sys_mutex_lock(lock); + dx_node_t *node; + int result; + const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_field_iterator_t *iter; + // TODO - Extract the name from the structured target + + if (target) { + iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); + result = hash_retrieve(node_map, iter, (void*) &node); + dx_field_iterator_free(iter); + } else + result = -1; + sys_mutex_unlock(lock); + + if (result < 0) { + if (default_node) + node = default_node; + else { + // Reject the link + // TODO - When the API allows, add an error message for "no available node" + pn_link_close(pn_link); + return; + } + } + + dx_link_t *link = new_dx_link_t(); + if (!link) { + pn_link_close(pn_link); + return; + } + + link->pn_link = pn_link; + link->context = 0; + link->node = node; + + pn_link_set_context(pn_link, link); + node->ntype->incoming_handler(node->context, link); +} + + +static int do_writable(pn_link_t *pn_link) +{ + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + if (!link) + return 0; + + dx_node_t *node = link->node; + if (!node) + return 0; + + return node->ntype->writable_handler(node->context, link); +} + + +static void process_receive(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) { + node->ntype->rx_handler(node->context, link, delivery); + return; + } + } + + // + // Reject the delivery if we couldn't find a node to handle it + // + pn_link_advance(pn_link); + pn_link_flow(pn_link, 1); + pn_delivery_update(delivery, PN_REJECTED); + pn_delivery_settle(delivery); +} + + +static void do_send(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) { + node->ntype->tx_handler(node->context, link, delivery); + return; + } + } + + // TODO - Cancel the delivery +} + + +static void do_updated(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) + node->ntype->disp_handler(node->context, link, delivery); + } +} + + +static int close_handler(void* unused, pn_connection_t *conn) +{ + // + // Close all links, passing False as the 'closed' argument. These links are not + // being properly 'detached'. They are being orphaned. + // + pn_link_t *pn_link = pn_link_head(conn, 0); + while (pn_link) { + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; + if (node) + node->ntype->link_detach_handler(node->context, link, 0); + pn_link_close(pn_link); + free_dx_link_t(link); + pn_link = pn_link_next(pn_link, 0); + } + + // teardown all sessions + pn_session_t *ssn = pn_session_head(conn, 0); + while (ssn) { + pn_session_close(ssn); + ssn = pn_session_next(ssn, 0); + } + + // teardown the connection + pn_connection_close(conn); + return 0; +} + + +static int process_handler(void* unused, pn_connection_t *conn) +{ + pn_session_t *ssn; + pn_link_t *pn_link; + pn_delivery_t *delivery; + int event_count = 0; + + // Step 1: setup the engine's connection, and any sessions and links + // that may be pending. + + // initialize the connection if it's new + if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { + pn_connection_open(conn); + event_count++; + } + + // open all pending sessions + ssn = pn_session_head(conn, PN_LOCAL_UNINIT); + while (ssn) { + pn_session_open(ssn); + ssn = pn_session_next(ssn, PN_LOCAL_UNINIT); + event_count++; + } + + // configure and open any pending links + pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); + while (pn_link) { + if (pn_link_is_sender(pn_link)) + setup_outgoing_link(pn_link); + else + setup_incoming_link(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); + event_count++; + } + + + // Step 2: Now drain all the pending deliveries from the connection's + // work queue and process them + + delivery = pn_work_head(conn); + while (delivery) { + if (pn_delivery_readable(delivery)) + process_receive(delivery); + else if (pn_delivery_writable(delivery)) + do_send(delivery); + + if (pn_delivery_updated(delivery)) + do_updated(delivery); + + delivery = pn_work_next(delivery); + event_count++; + } + + // + // Step 2.5: Traverse all of the links on the connection looking for + // outgoing links with non-zero credit. Call the attached node's + // writable handler for such links. + // + pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + while (pn_link) { + assert(pn_session_connection(pn_link_session(pn_link)) == conn); + if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0) + event_count += do_writable(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + } + + // Step 3: Clean up any links or sessions that have been closed by the + // remote. If the connection has been closed remotely, clean that up + // also. + + // teardown any terminating links + pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + while (pn_link) { + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; + if (node) + node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message + pn_link_close(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + event_count++; + } + + // teardown any terminating sessions + ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + while (ssn) { + pn_session_close(ssn); + ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + event_count++; + } + + // teardown the connection if it's terminating + if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { + pn_connection_close(conn); + event_count++; + } + + return event_count; +} + + +static void open_handler(dx_connection_t *conn, dx_direction_t dir) +{ + const dx_node_type_t *nt; + + // + // Note the locking structure in this function. Generally this would be unsafe, but since + // this particular list is only ever appended to and never has items inserted or deleted, + // this usage is safe in this case. + // + sys_mutex_lock(lock); + nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list); + sys_mutex_unlock(lock); + + pn_connection_open(dx_connection_pn(conn)); + + while (nt_item) { + nt = nt_item->ntype; + if (dir == DX_INCOMING) { + if (nt->inbound_conn_open_handler) + nt->inbound_conn_open_handler(nt->type_context, conn); + } else { + if (nt->outbound_conn_open_handler) + nt->outbound_conn_open_handler(nt->type_context, conn); + } + + sys_mutex_lock(lock); + nt_item = DEQ_NEXT(nt_item); + sys_mutex_unlock(lock); + } +} + + +static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn) +{ + pn_connection_t *conn = dx_connection_pn(dx_conn); + + switch (event) { + case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break; + case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break; + case DX_CONN_EVENT_CLOSE: return close_handler(context, conn); + case DX_CONN_EVENT_PROCESS: return process_handler(context, conn); + } + + return 0; +} + + +void dx_container_initialize(void) +{ + dx_log(module, LOG_TRACE, "Container Initializing"); + + node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 + node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 + lock = sys_mutex(); + default_node = 0; + DEQ_INIT(node_type_list); + + dx_server_set_conn_handler(handler); +} + + +void dx_container_finalize(void) +{ +} + + +int dx_container_register_node_type(const dx_node_type_t *nt) +{ + int result; + dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); + nxc_node_type_t *nt_item = NEW(nxc_node_type_t); + DEQ_ITEM_INIT(nt_item); + nt_item->ntype = nt; + + sys_mutex_lock(lock); + result = hash_insert_const(node_type_map, iter, nt); + DEQ_INSERT_TAIL(node_type_list, nt_item); + sys_mutex_unlock(lock); + + dx_field_iterator_free(iter); + if (result < 0) + return result; + dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); + + return 0; +} + + +void dx_container_set_default_node_type(const dx_node_type_t *nt, + void *context, + dx_dist_mode_t supported_dist) +{ + if (default_node) + dx_container_destroy_node(default_node); + + if (nt) { + default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT); + dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); + } else { + default_node = 0; + dx_log(module, LOG_TRACE, "Default node removed"); + } +} + + +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, + const char *name, + void *context, + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy) +{ + int result; + dx_node_t *node = new_dx_node_t(); + if (!node) + return 0; + + node->ntype = nt; + node->name = 0; + node->context = context; + node->supported_dist = supported_dist; + node->life_policy = life_policy; + + if (name) { + dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); + sys_mutex_lock(lock); + result = hash_insert(node_map, iter, node); + sys_mutex_unlock(lock); + dx_field_iterator_free(iter); + if (result < 0) { + free_dx_node_t(node); + return 0; + } + + node->name = (char*) malloc(strlen(name) + 1); + strcpy(node->name, name); + } + + if (name) + dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); + + return node; +} + + +void dx_container_destroy_node(dx_node_t *node) +{ + if (node->name) { + dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); + sys_mutex_lock(lock); + hash_remove(node_map, iter); + sys_mutex_unlock(lock); + dx_field_iterator_free(iter); + free(node->name); + } + + free_dx_node_t(node); +} + + +void dx_container_node_set_context(dx_node_t *node, void *node_context) +{ + node->context = node_context; +} + + +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node) +{ + return node->supported_dist; +} + + +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node) +{ + return node->life_policy; +} + + +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name) +{ + pn_session_t *sess = pn_session(dx_connection_pn(conn)); + dx_link_t *link = new_dx_link_t(); + + if (dir == DX_OUTGOING) + link->pn_link = pn_sender(sess, name); + else + link->pn_link = pn_receiver(sess, name); + link->context = node->context; + link->node = node; + + pn_link_set_context(link->pn_link, link); + + pn_session_open(sess); + + return link; +} + + +void dx_link_set_context(dx_link_t *link, void *context) +{ + link->context = context; +} + + +void *dx_link_get_context(dx_link_t *link) +{ + return link->context; +} + + +pn_link_t *dx_link_pn(dx_link_t *link) +{ + return link->pn_link; +} + + +pn_terminus_t *dx_link_source(dx_link_t *link) +{ + return pn_link_source(link->pn_link); +} + + +pn_terminus_t *dx_link_target(dx_link_t *link) +{ + return pn_link_target(link->pn_link); +} + + +pn_terminus_t *dx_link_remote_source(dx_link_t *link) +{ + return pn_link_remote_source(link->pn_link); +} + + +pn_terminus_t *dx_link_remote_target(dx_link_t *link) +{ + return pn_link_remote_target(link->pn_link); +} + + +void dx_link_activate(dx_link_t *link) +{ + if (!link || !link->pn_link) + return; + + pn_session_t *sess = pn_link_session(link->pn_link); + if (!sess) + return; + + pn_connection_t *conn = pn_session_connection(sess); + if (!conn) + return; + + dx_connection_t *ctx = pn_connection_get_context(conn); + if (!ctx) + return; + + dx_server_activate(ctx); +} + + +void dx_link_close(dx_link_t *link) +{ + pn_link_close(link->pn_link); +} + + diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c new file mode 100644 index 0000000000..c54d5d6fcf --- /dev/null +++ b/extras/dispatch/src/hash.c @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <stdio.h> +#include <string.h> + +typedef struct hash_item_t { + DEQ_LINKS(struct hash_item_t); + unsigned char *key; + union { + void *val; + const void *val_const; + } v; +} hash_item_t; + +ALLOC_DECLARE(hash_item_t); +ALLOC_DEFINE(hash_item_t); +DEQ_DECLARE(hash_item_t, items_t); + + +typedef struct bucket_t { + items_t items; +} bucket_t; + + +struct hash_t { + bucket_t *buckets; + unsigned int bucket_count; + unsigned int bucket_mask; + int batch_size; + size_t size; + int is_const; +}; + + +// djb2 hash algorithm +static unsigned long hash_function(dx_field_iterator_t *iter) +{ + unsigned long hash = 5381; + int c; + + while (!dx_field_iterator_end(iter)) { + c = (int) dx_field_iterator_octet(iter); + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + + return hash; +} + + +hash_t *hash(int bucket_exponent, int batch_size, int value_is_const) +{ + int i; + hash_t *h = NEW(hash_t); + + if (!h) + return 0; + + h->bucket_count = 1 << bucket_exponent; + h->bucket_mask = h->bucket_count - 1; + h->batch_size = batch_size; + h->size = 0; + h->is_const = value_is_const; + h->buckets = NEW_ARRAY(bucket_t, h->bucket_count); + for (i = 0; i < h->bucket_count; i++) { + DEQ_INIT(h->buckets[i].items); + } + + return h; +} + + +void hash_free(hash_t *h) +{ + // TODO - Implement this +} + + +size_t hash_size(hash_t *h) +{ + return h ? h->size : 0; +} + + +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + *error = 0; + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + if (item) { + *error = -1; + return 0; + } + + item = new_hash_item_t(); + if (!item) { + *error = -2; + return 0; + } + + DEQ_ITEM_INIT(item); + item->key = dx_field_iterator_copy(key); + + DEQ_INSERT_TAIL(h->buckets[idx].items, item); + h->size++; + return item; +} + + +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) +{ + int error = 0; + hash_item_t *item = hash_internal_insert(h, key, &error); + + if (item) + item->v.val = val; + return error; +} + + +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) +{ + if (!h->is_const) + return -3; + + int error = 0; + hash_item_t *item = hash_internal_insert(h, key, &error); + + if (item) + item->v.val_const = val; + return error; +} + + +static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + return item; +} + + +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) +{ + hash_item_t *item = hash_internal_retrieve(h, key); + if (item) { + *val = item->v.val; + return 0; + } + return -1; +} + + +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) +{ + if (!h->is_const) + return -3; + + hash_item_t *item = hash_internal_retrieve(h, key); + if (item) { + *val = item->v.val_const; + return 0; + } + return -1; +} + + +int hash_remove(hash_t *h, dx_field_iterator_t *key) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + if (item) { + free(item->key); + DEQ_REMOVE(h->buckets[idx].items, item); + free_hash_item_t(item); + h->size--; + return 0; + } + + return -1; +} + diff --git a/extras/dispatch/src/iovec.c b/extras/dispatch/src/iovec.c new file mode 100644 index 0000000000..6ff6874440 --- /dev/null +++ b/extras/dispatch/src/iovec.c @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iovec.h> +#include <qpid/dispatch/alloc.h> +#include <string.h> + +#define DX_IOVEC_MAX 64 + +struct dx_iovec_t { + struct iovec iov_array[DX_IOVEC_MAX]; + struct iovec *iov; + int iov_count; +}; + + +ALLOC_DECLARE(dx_iovec_t); +ALLOC_DEFINE(dx_iovec_t); + + +dx_iovec_t *dx_iovec(int vector_count) +{ + dx_iovec_t *iov = new_dx_iovec_t(); + if (!iov) + return 0; + + memset(iov, 0, sizeof(dx_iovec_t)); + + iov->iov_count = vector_count; + if (vector_count > DX_IOVEC_MAX) + iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count); + else + iov->iov = &iov->iov_array[0]; + + return iov; +} + + +void dx_iovec_free(dx_iovec_t *iov) +{ + if (!iov) + return; + + if (iov->iov && iov->iov != &iov->iov_array[0]) + free(iov->iov); + + free_dx_iovec_t(iov); +} + + +struct iovec *dx_iovec_array(dx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov; +} + + +int dx_iovec_count(dx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov_count; +} + diff --git a/extras/dispatch/src/iterator.c b/extras/dispatch/src/iterator.c new file mode 100644 index 0000000000..6ab67f948d --- /dev/null +++ b/extras/dispatch/src/iterator.c @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include "message_private.h" +#include <stdio.h> +#include <string.h> + +typedef enum { +MODE_TO_END, +MODE_TO_SLASH +} parse_mode_t; + +struct dx_field_iterator_t { + dx_buffer_t *start_buffer; + unsigned char *start_cursor; + int start_length; + dx_buffer_t *buffer; + unsigned char *cursor; + int length; + dx_iterator_view_t view; + parse_mode_t mode; +}; + + +ALLOC_DECLARE(dx_field_iterator_t); +ALLOC_DEFINE(dx_field_iterator_t); + + +typedef enum { +STATE_START, +STATE_SLASH_LEFT, +STATE_SKIPPING_TO_NEXT_SLASH, +STATE_SCANNING, +STATE_COLON, +STATE_COLON_SLASH, +STATE_AT_NODE_ID +} state_t; + + +static void view_initialize(dx_field_iterator_t *iter) +{ + if (iter->view == ITER_VIEW_ALL) { + iter->mode = MODE_TO_END; + return; + } + + // + // Advance to the node-id. + // + state_t state = STATE_START; + unsigned int octet; + while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { + octet = dx_field_iterator_octet(iter); + switch (state) { + case STATE_START : + if (octet == '/') + state = STATE_SLASH_LEFT; + else + state = STATE_SCANNING; + break; + + case STATE_SLASH_LEFT : + if (octet == '/') + state = STATE_SKIPPING_TO_NEXT_SLASH; + else + state = STATE_AT_NODE_ID; + break; + + case STATE_SKIPPING_TO_NEXT_SLASH : + if (octet == '/') + state = STATE_AT_NODE_ID; + break; + + case STATE_SCANNING : + if (octet == ':') + state = STATE_COLON; + break; + + case STATE_COLON : + if (octet == '/') + state = STATE_COLON_SLASH; + else + state = STATE_SCANNING; + break; + + case STATE_COLON_SLASH : + if (octet == '/') + state = STATE_SKIPPING_TO_NEXT_SLASH; + else + state = STATE_SCANNING; + break; + + case STATE_AT_NODE_ID : + break; + } + } + + if (state != STATE_AT_NODE_ID) { + // + // The address string was relative, not absolute. The node-id + // is at the beginning of the string. + // + iter->buffer = iter->start_buffer; + iter->cursor = iter->start_cursor; + iter->length = iter->start_length; + } + + // + // Cursor is now on the first octet of the node-id + // + if (iter->view == ITER_VIEW_NODE_ID) { + iter->mode = MODE_TO_SLASH; + return; + } + + if (iter->view == ITER_VIEW_NO_HOST) { + iter->mode = MODE_TO_END; + return; + } + + if (iter->view == ITER_VIEW_NODE_SPECIFIC) { + iter->mode = MODE_TO_END; + while (!dx_field_iterator_end(iter)) { + octet = dx_field_iterator_octet(iter); + if (octet == '/') + break; + } + return; + } +} + + +dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) +{ + dx_field_iterator_t *iter = new_dx_field_iterator_t(); + if (!iter) + return 0; + + iter->start_buffer = 0; + iter->start_cursor = (unsigned char*) text; + iter->start_length = strlen(text); + + dx_field_iterator_reset(iter, view); + + return iter; +} + + +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view) +{ + dx_field_iterator_t *iter = new_dx_field_iterator_t(); + if (!iter) + return 0; + + iter->start_buffer = buffer; + iter->start_cursor = dx_buffer_base(buffer) + offset; + iter->start_length = length; + + dx_field_iterator_reset(iter, view); + + return iter; +} + + +void dx_field_iterator_free(dx_field_iterator_t *iter) +{ + free_dx_field_iterator_t(iter); +} + + +void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view) +{ + iter->buffer = iter->start_buffer; + iter->cursor = iter->start_cursor; + iter->length = iter->start_length; + iter->view = view; + + view_initialize(iter); +} + + +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) +{ + if (iter->length == 0) + return (unsigned char) 0; + + unsigned char result = *(iter->cursor); + + iter->cursor++; + iter->length--; + + if (iter->length > 0) { + if (iter->buffer) { + if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) { + iter->buffer = iter->buffer->next; + if (iter->buffer == 0) + iter->length = 0; + iter->cursor = dx_buffer_base(iter->buffer); + } + } + } + + if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/') + iter->length = 0; + + return result; +} + + +int dx_field_iterator_end(dx_field_iterator_t *iter) +{ + return iter->length == 0; +} + + +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) +{ + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter) && *string) { + if (*string != dx_field_iterator_octet(iter)) + return 0; + string++; + } + + return (dx_field_iterator_end(iter) && (*string == 0)); +} + + +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) +{ + int length = 0; + int idx = 0; + unsigned char *copy; + + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter)) { + dx_field_iterator_octet(iter); + length++; + } + + dx_field_iterator_reset(iter, iter->view); + copy = (unsigned char*) malloc(length + 1); + while (!dx_field_iterator_end(iter)) + copy[idx++] = dx_field_iterator_octet(iter); + copy[idx] = '\0'; + + return copy; +} + diff --git a/extras/dispatch/src/log.c b/extras/dispatch/src/log.c new file mode 100644 index 0000000000..d4ec534915 --- /dev/null +++ b/extras/dispatch/src/log.c @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/log.h> +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +static int mask=LOG_INFO; + +static char *cls_prefix(int cls) +{ + switch (cls) { + case LOG_TRACE : return "TRACE"; + case LOG_ERROR : return "ERROR"; + case LOG_INFO : return "INFO"; + } + + return ""; +} + +void dx_log(const char *module, int cls, const char *fmt, ...) +{ + if (!(cls & mask)) + return; + + va_list ap; + char line[128]; + + va_start(ap, fmt); + vsnprintf(line, 127, fmt, ap); + va_end(ap); + fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line); +} + +void dx_log_set_mask(int _mask) +{ + mask = _mask; +} + diff --git a/extras/dispatch/src/message.c b/extras/dispatch/src/message.c new file mode 100644 index 0000000000..f66e79010c --- /dev/null +++ b/extras/dispatch/src/message.c @@ -0,0 +1,1120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include "message_private.h" +#include <string.h> +#include <stdio.h> + +ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); +ALLOC_DEFINE(dx_message_content_t); + + +static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) +{ + unsigned char *local_cursor = *cursor; + dx_buffer_t *local_buffer = *buffer; + + int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); + while (consume > 0) { + if (consume < remaining) { + local_cursor += consume; + consume = 0; + } else { + consume -= remaining; + local_buffer = local_buffer->next; + if (local_buffer == 0){ + local_cursor = 0; + break; + } + local_cursor = dx_buffer_base(local_buffer); + remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); + } + } + + *cursor = local_cursor; + *buffer = local_buffer; +} + + +static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) +{ + unsigned char result = **cursor; + advance(cursor, buffer, 1); + return result; +} + + +static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) +{ + unsigned char tag = next_octet(cursor, buffer); + if (!(*cursor)) return 0; + int consume = 0; + switch (tag & 0xF0) { + case 0x40 : consume = 0; break; + case 0x50 : consume = 1; break; + case 0x60 : consume = 2; break; + case 0x70 : consume = 4; break; + case 0x80 : consume = 8; break; + case 0x90 : consume = 16; break; + + case 0xB0 : + case 0xD0 : + case 0xF0 : + consume |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + consume |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + consume |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + // Fall through to the next case... + + case 0xA0 : + case 0xC0 : + case 0xE0 : + consume |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + break; + } + + if (field) { + field->buffer = *buffer; + field->offset = *cursor - dx_buffer_base(*buffer); + field->length = consume; + field->parsed = 1; + } + + advance(cursor, buffer, consume); + return 1; +} + + +static int start_list(unsigned char **cursor, dx_buffer_t **buffer) +{ + unsigned char tag = next_octet(cursor, buffer); + if (!(*cursor)) return 0; + int length = 0; + int count = 0; + + switch (tag) { + case 0x45 : // list0 + break; + case 0xd0 : // list32 + length |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + length |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + length |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + length |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + count |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + count |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + count |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + count |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + break; + + case 0xc0 : // list8 + length |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + count |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + break; + } + + return count; +} + + +// +// Check the buffer chain, starting at cursor to see if it matches the pattern. +// If the pattern matches, check the next tag to see if it's in the set of expected +// tags. If not, return zero. If so, set the location descriptor to the good +// tag and advance the cursor (and buffer, if needed) to the end of the matched section. +// +// If there is no match, don't advance the cursor. +// +// Return 0 if the pattern matches but the following tag is unexpected +// Return 0 if the pattern matches and the location already has a pointer (duplicate section) +// Return 1 if the pattern matches and we've advanced the cursor/buffer +// Return 1 if the pattern does not match +// +static int dx_check_and_advance(dx_buffer_t **buffer, + unsigned char **cursor, + unsigned char *pattern, + int pattern_length, + unsigned char *expected_tags, + dx_field_location_t *location) +{ + dx_buffer_t *test_buffer = *buffer; + unsigned char *test_cursor = *cursor; + + if (!test_cursor) + return 1; // no match + + unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); + int idx = 0; + + while (idx < pattern_length && *test_cursor == pattern[idx]) { + idx++; + test_cursor++; + if (test_cursor == end_of_buffer) { + test_buffer = test_buffer->next; + if (test_buffer == 0) + return 1; // Pattern didn't match + test_cursor = dx_buffer_base(test_buffer); + end_of_buffer = test_cursor + dx_buffer_size(test_buffer); + } + } + + if (idx < pattern_length) + return 1; // Pattern didn't match + + // + // Pattern matched, check the tag + // + while (*expected_tags && *test_cursor != *expected_tags) + expected_tags++; + if (*expected_tags == 0) + return 0; // Unexpected tag + + if (location->parsed) + return 0; // Duplicate section + + // + // Pattern matched and tag is expected. Mark the beginning of the section. + // + location->parsed = 1; + location->buffer = test_buffer; + location->offset = test_cursor - dx_buffer_base(test_buffer); + location->length = 0; + + // + // Advance the pointers to consume the whole section. + // + int consume = 0; + unsigned char tag = next_octet(&test_cursor, &test_buffer); + if (!test_cursor) return 0; + switch (tag) { + case 0x45 : // list0 + break; + + case 0xd0 : // list32 + case 0xd1 : // map32 + case 0xb0 : // vbin32 + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; + if (!test_cursor) return 0; + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; + if (!test_cursor) return 0; + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; + if (!test_cursor) return 0; + // Fall through to the next case... + + case 0xc0 : // list8 + case 0xc1 : // map8 + case 0xa0 : // vbin8 + consume |= (int) next_octet(&test_cursor, &test_buffer); + if (!test_cursor) return 0; + break; + } + + if (consume) + advance(&test_cursor, &test_buffer, consume); + + *cursor = test_cursor; + *buffer = test_buffer; + return 1; +} + + +static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) +{ + dx_buffer_t *buf = DEQ_TAIL(msg->buffers); + + while (len > 0) { + if (buf == 0 || dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); + if (buf == 0) + return; + DEQ_INSERT_TAIL(msg->buffers, buf); + } + + size_t to_copy = dx_buffer_capacity(buf); + if (to_copy > len) + to_copy = len; + memcpy(dx_buffer_cursor(buf), seq, to_copy); + dx_buffer_insert(buf, to_copy); + len -= to_copy; + seq += to_copy; + msg->length += to_copy; + } +} + + +static void dx_insert_8(dx_message_content_t *msg, uint8_t value) +{ + dx_insert(msg, &value, 1); +} + + +static void dx_insert_32(dx_message_content_t *msg, uint32_t value) +{ + uint8_t buf[4]; + buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); + buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); + buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); + buf[3] = (uint8_t) (value & 0x000000FF); + dx_insert(msg, buf, 4); +} + + +static void dx_insert_64(dx_message_content_t *msg, uint64_t value) +{ + uint8_t buf[8]; + buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); + buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); + buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); + buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); + buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); + buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); + buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); + buf[7] = (uint8_t) (value & 0x00000000000000FFL); + dx_insert(msg, buf, 8); +} + + +static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) +{ + while (*buf) { + if (*cursor >= dx_buffer_size(*buf)) { + *buf = (*buf)->next; + *cursor = 0; + } else { + dx_buffer_base(*buf)[*cursor] = value; + (*cursor)++; + return; + } + } +} + + +static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) +{ + dx_buffer_t *buf = field->buffer; + size_t cursor = field->offset; + + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); +} + + +static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) +{ + // + // Insert the short-form performative tag + // + dx_insert(msg, (const uint8_t*) "\x00\x53", 2); + dx_insert_8(msg, code); + + // + // Open the list with a list32 tag + // + dx_insert_8(msg, 0xd0); + + // + // Mark the current location to later overwrite the length + // + msg->compose_length.buffer = DEQ_TAIL(msg->buffers); + msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); + msg->compose_length.length = 4; + msg->compose_length.parsed = 1; + + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + + // + // Mark the current location to later overwrite the count + // + msg->compose_count.buffer = DEQ_TAIL(msg->buffers); + msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); + msg->compose_count.length = 4; + msg->compose_count.parsed = 1; + + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + + msg->length = 4; // Include the length of the count field + msg->count = 0; +} + + +static void dx_end_list(dx_message_content_t *msg) +{ + dx_overwrite_32(&msg->compose_length, msg->length); + dx_overwrite_32(&msg->compose_count, msg->count); +} + + +static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + + switch (field) { + case DX_FIELD_TO: + while (1) { + if (content->field_to.parsed) + return &content->field_to; + + if (content->section_message_properties.parsed == 0) + break; + + dx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; + + int count = start_list(&cursor, &buffer); + int result; + + if (count < 3) + break; + + result = traverse_field(&cursor, &buffer, 0); // message_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // user_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, &content->field_to); // to + if (!result) return 0; + } + break; + + case DX_FIELD_BODY: + while (1) { + if (content->body.parsed) + return &content->body; + + if (content->section_body.parsed == 0) + break; + + dx_buffer_t *buffer = content->section_body.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; + int result; + + result = traverse_field(&cursor, &buffer, &content->body); + if (!result) return 0; + } + break; + + default: + break; + } + + return 0; +} + + +dx_message_t *dx_allocate_message() +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); + if (!msg) + return 0; + + DEQ_ITEM_INIT(msg); + msg->content = new_dx_message_content_t(); + msg->out_delivery = 0; + + if (msg->content == 0) { + free_dx_message_t((dx_message_t*) msg); + return 0; + } + + memset(msg->content, 0, sizeof(dx_message_content_t)); + msg->content->lock = sys_mutex(); + msg->content->ref_count = 1; + + return (dx_message_t*) msg; +} + + +void dx_free_message(dx_message_t *in_msg) +{ + uint32_t rc; + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + + sys_mutex_lock(content->lock); + rc = --content->ref_count; + sys_mutex_unlock(content->lock); + + if (rc == 0) { + dx_buffer_t *buf = DEQ_HEAD(content->buffers); + + while (buf) { + DEQ_REMOVE_HEAD(content->buffers); + dx_free_buffer(buf); + buf = DEQ_HEAD(content->buffers); + } + + sys_mutex_free(content->lock); + free_dx_message_content_t(content); + } + + free_dx_message_t((dx_message_t*) msg); +} + + +dx_message_t *dx_message_copy(dx_message_t *in_msg) +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); + + if (!copy) + return 0; + + DEQ_ITEM_INIT(copy); + copy->content = content; + copy->out_delivery = 0; + + sys_mutex_lock(content->lock); + content->ref_count++; + sys_mutex_unlock(content->lock); + + return (dx_message_t*) copy; +} + + +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) +{ + ((dx_message_pvt_t*) msg)->out_delivery = delivery; +} + + +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) +{ + return ((dx_message_pvt_t*) msg)->out_delivery; +} + + +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + content->in_delivery = delivery; +} + + +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + return content->in_delivery; +} + + +dx_message_t *dx_message_receive(pn_delivery_t *delivery) +{ + pn_link_t *link = pn_delivery_link(delivery); + dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); + ssize_t rc; + dx_buffer_t *buf; + + // + // If there is no message associated with the delivery, this is the first time + // we've received anything on this delivery. Allocate a message descriptor and + // link it and the delivery together. + // + if (!msg) { + msg = (dx_message_pvt_t*) dx_allocate_message(); + pn_delivery_set_context(delivery, (void*) msg); + + // + // Record the incoming delivery only if it is not settled. If it is + // settled, it should not be recorded as no future operations on it are + // permitted. + // + if (!pn_delivery_settled(delivery)) + msg->content->in_delivery = delivery; + } + + // + // Get a reference to the tail buffer on the message. This is the buffer into which + // we will store incoming message data. If there is no buffer in the message, allocate + // an empty one and add it to the message. + // + buf = DEQ_TAIL(msg->content->buffers); + if (!buf) { + buf = dx_allocate_buffer(); + DEQ_INSERT_TAIL(msg->content->buffers, buf); + } + + while (1) { + // + // Try to receive enough data to fill the remaining space in the tail buffer. + // + rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); + + // + // If we receive PN_EOS, we have come to the end of the message. + // + if (rc == PN_EOS) { + // + // If the last buffer in the list is empty, remove it and free it. This + // will only happen if the size of the message content is an exact multiple + // of the buffer size. + // + if (dx_buffer_size(buf) == 0) { + DEQ_REMOVE_TAIL(msg->content->buffers); + dx_free_buffer(buf); + } + return (dx_message_t*) msg; + } + + if (rc > 0) { + // + // We have received a positive number of bytes for the message. Advance + // the cursor in the buffer. + // + dx_buffer_insert(buf, rc); + + // + // If the buffer is full, allocate a new empty buffer and append it to the + // tail of the message's list. + // + if (dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); + DEQ_INSERT_TAIL(msg->content->buffers, buf); + } + } else + // + // We received zero bytes, and no PN_EOS. This means that we've received + // all of the data available up to this point, but it does not constitute + // the entire message. We'll be back later to finish it up. + // + break; + } + + return 0; +} + + +void dx_message_send(dx_message_t *in_msg, pn_link_t *link) +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); + + // TODO - Handle cases where annotations have been added or modified + while (buf) { + pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); + buf = DEQ_NEXT(buf); + } +} + + +int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) +{ + +#define LONG 10 +#define SHORT 3 +#define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70" +#define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70" +#define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71" +#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71" +#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72" +#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72" +#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" +#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" +#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74" +#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74" +#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75" +#define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75" +#define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76" +#define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76" +#define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78" +#define FOOTER_SHORT (unsigned char*) "\x00\x53\x78" +#define TAGS_LIST (unsigned char*) "\x45\xc0\xd0" +#define TAGS_MAP (unsigned char*) "\xc1\xd1" +#define TAGS_BINARY (unsigned char*) "\xa0\xb0" + + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_buffer_t *buffer = DEQ_HEAD(content->buffers); + unsigned char *cursor; + + if (!buffer) + return 0; // Invalid - No data in the message + + if (depth == DX_DEPTH_NONE) + return 1; + + cursor = dx_buffer_base(buffer); + + // + // MESSAGE HEADER + // + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) + return 0; + + if (depth == DX_DEPTH_HEADER) + return 1; + + // + // DELIVERY ANNOTATION + // + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) + return 0; + + if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) + return 1; + + // + // MESSAGE ANNOTATION + // + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) + return 0; + + if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) + return 1; + + // + // PROPERTIES + // + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) + return 0; + + if (depth == DX_DEPTH_PROPERTIES) + return 1; + + // + // APPLICATION PROPERTIES + // + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) + return 0; + + if (depth == DX_DEPTH_APPLICATION_PROPERTIES) + return 1; + + // + // BODY (Note that this function expects a single data section or a single AMQP sequence) + // + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) + return 0; + + if (depth == DX_DEPTH_BODY) + return 1; + + // + // FOOTER + // + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) + return 0; + + return 1; +} + + +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) +{ + dx_field_location_t *loc = dx_message_field_location(msg, field); + if (!loc) + return 0; + + return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); +} + + +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field) +{ + dx_field_location_t *loc = dx_message_field_location(msg, field); + if (!loc) + return 0; + + // + // Count the number of buffers this field straddles + // + int bufcnt = 1; + dx_buffer_t *buf = loc->buffer; + size_t bufsize = dx_buffer_size(buf) - loc->offset; + ssize_t remaining = loc->length - bufsize; + + while (remaining > 0) { + bufcnt++; + buf = buf->next; + if (!buf) + return 0; + remaining -= dx_buffer_size(buf); + } + + // + // Allocate an iovec object big enough to hold the number of buffers + // + dx_iovec_t *iov = dx_iovec(bufcnt); + if (!iov) + return 0; + + // + // Build out the io vectors with pointers to the segments of the field in buffers + // + bufcnt = 0; + buf = loc->buffer; + bufsize = dx_buffer_size(buf) - loc->offset; + void *base = dx_buffer_base(buf) + loc->offset; + remaining = loc->length; + + while (remaining > 0) { + dx_iovec_array(iov)[bufcnt].iov_base = base; + dx_iovec_array(iov)[bufcnt].iov_len = bufsize; + bufcnt++; + remaining -= bufsize; + if (remaining > 0) { + buf = buf->next; + base = dx_buffer_base(buf); + bufsize = dx_buffer_size(buf); + if (bufsize > remaining) + bufsize = remaining; + } + } + + return iov; +} + + +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) +{ + dx_message_begin_header(msg); + dx_message_insert_boolean(msg, 0); // durable + //dx_message_insert_null(msg); // priority + //dx_message_insert_null(msg); // ttl + //dx_message_insert_boolean(msg, 0); // first-acquirer + //dx_message_insert_uint(msg, 0); // delivery-count + dx_message_end_header(msg); + + dx_message_begin_message_properties(msg); + dx_message_insert_null(msg); // message-id + dx_message_insert_null(msg); // user-id + dx_message_insert_string(msg, to); // to + //dx_message_insert_null(msg); // subject + //dx_message_insert_null(msg); // reply-to + //dx_message_insert_null(msg); // correlation-id + //dx_message_insert_null(msg); // content-type + //dx_message_insert_null(msg); // content-encoding + //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time + //dx_message_insert_timestamp(msg, 0); // creation-time + //dx_message_insert_null(msg); // group-id + //dx_message_insert_uint(msg, 0); // group-sequence + //dx_message_insert_null(msg); // reply-to-group-id + dx_message_end_message_properties(msg); + + if (buffers) + dx_message_append_body_data(msg, buffers); +} + + +void dx_message_begin_header(dx_message_t *msg) +{ + dx_start_list_performative(MSG_CONTENT(msg), 0x70); +} + + +void dx_message_end_header(dx_message_t *msg) +{ + dx_end_list(MSG_CONTENT(msg)); +} + + +void dx_message_begin_delivery_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_delivery_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_message_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_message_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_message_properties(dx_message_t *msg) +{ + dx_start_list_performative(MSG_CONTENT(msg), 0x73); +} + + +void dx_message_end_message_properties(dx_message_t *msg) +{ + dx_end_list(MSG_CONTENT(msg)); +} + + +void dx_message_begin_application_properties(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_application_properties(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_buffer_t *buf = DEQ_HEAD(*buffers); + uint32_t len = 0; + + // + // Calculate the size of the body to be appended. + // + while (buf) { + len += dx_buffer_size(buf); + buf = DEQ_NEXT(buf); + } + + // + // Insert a DATA section performative header. + // + dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); + if (len < 256) { + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); + } else { + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); + } + + // + // Move the supplied buffers to the tail of the message's buffer list. + // + buf = DEQ_HEAD(*buffers); + while (buf) { + DEQ_REMOVE_HEAD(*buffers); + DEQ_INSERT_TAIL(content->buffers, buf); + buf = DEQ_HEAD(*buffers); + } +} + + +void dx_message_begin_body_sequence(dx_message_t *msg) +{ +} + + +void dx_message_end_body_sequence(dx_message_t *msg) +{ +} + + +void dx_message_begin_footer(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_footer(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_insert_null(dx_message_t *msg) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x40); + content->count++; +} + + +void dx_message_insert_boolean(dx_message_t *msg, int value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value) + dx_insert(content, (const uint8_t*) "\x56\x01", 2); + else + dx_insert(content, (const uint8_t*) "\x56\x00", 2); + content->count++; +} + + +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x50); + dx_insert_8(content, value); + content->count++; +} + + +void dx_message_insert_uint(dx_message_t *msg, uint32_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value == 0) { + dx_insert_8(content, 0x43); // uint0 + } else if (value < 256) { + dx_insert_8(content, 0x52); // smalluint + dx_insert_8(content, (uint8_t) value); + } else { + dx_insert_8(content, 0x70); // uint + dx_insert_32(content, value); + } + content->count++; +} + + +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value == 0) { + dx_insert_8(content, 0x44); // ulong0 + } else if (value < 256) { + dx_insert_8(content, 0x53); // smallulong + dx_insert_8(content, (uint8_t) value); + } else { + dx_insert_8(content, 0x80); // ulong + dx_insert_64(content, value); + } + content->count++; +} + + +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (len < 256) { + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); + } else { + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); + } + dx_insert(content, start, len); + content->count++; +} + + +void dx_message_insert_string(dx_message_t *msg, const char *start) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + uint32_t len = strlen(start); + + if (len < 256) { + dx_insert_8(content, 0xa1); // str8-utf8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); + } else { + dx_insert_8(content, 0xb1); // str32-utf8 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); + } + content->count++; +} + + +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x98); // uuid + dx_insert(content, value, 16); + content->count++; +} + + +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (len < 256) { + dx_insert_8(content, 0xa3); // sym8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); + } else { + dx_insert_8(content, 0xb3); // sym32 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); + } + content->count++; +} + + +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x83); // timestamp + dx_insert_64(content, value); + content->count++; +} + + +void dx_message_begin_list(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_list(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_map(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_map(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + diff --git a/extras/dispatch/src/message_private.h b/extras/dispatch/src/message_private.h new file mode 100644 index 0000000000..5fb18078f5 --- /dev/null +++ b/extras/dispatch/src/message_private.h @@ -0,0 +1,94 @@ +#ifndef __message_private_h__ +#define __message_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/threading.h> + +/** + * Architecture of the message module: + * + * +--------------+ +----------------------+ + * | | | | + * | dx_message_t |----------->| dx_message_content_t | + * | | +----->| | + * +--------------+ | +----------------------+ + * | | + * +--------------+ | | +-------------+ +-------------+ +-------------+ + * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/ + * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+ + * | | + * +--------------+ + * + * The message module provides chained-fixed-sized-buffer storage of message content with multiple + * references. If a message is received and is to be queued for multiple destinations, there is only + * one copy of the message content in memory but multiple lightweight references to the content. + * + */ + +typedef struct { + dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present + size_t offset; // Offset in the buffer to the first octet + size_t length; // Length of the field or zero if unneeded + int parsed; // non-zero iff the buffer chain has been parsed to find this field +} dx_field_location_t; + + +// TODO - consider using pointers to dx_field_location_t below to save memory +// TODO - we need a second buffer list for modified annotations and header +// There are three message scenarios: +// 1) Received message is held and forwarded unmodified - single buffer list +// 2) Received message is held and modified before forwarding - two buffer lists +// 3) Message is composed internally - single buffer list + +typedef struct { + sys_mutex_t *lock; + uint32_t ref_count; // The number of qmessages referencing this + dx_buffer_list_t buffers; // The buffer chain containing the message + pn_delivery_t *in_delivery; // The delivery on which the message arrived + dx_field_location_t section_message_header; // The message header list + dx_field_location_t section_delivery_annotation; // The delivery annotation map + dx_field_location_t section_message_annotation; // The message annotation map + dx_field_location_t section_message_properties; // The message properties list + dx_field_location_t section_application_properties; // The application properties list + dx_field_location_t section_body; // The message body: Data + dx_field_location_t section_footer; // The footer + dx_field_location_t field_user_id; // The string value of the user-id + dx_field_location_t field_to; // The string value of the to field + dx_field_location_t body; // The body of the message + dx_field_location_t compose_length; + dx_field_location_t compose_count; + uint32_t length; + uint32_t count; +} dx_message_content_t; + +typedef struct { + DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t + dx_message_content_t *content; + pn_delivery_t *out_delivery; +} dx_message_pvt_t; + +ALLOC_DECLARE(dx_message_t); +ALLOC_DECLARE(dx_message_content_t); + +#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content) + +#endif diff --git a/extras/dispatch/src/posix/threading.c b/extras/dispatch/src/posix/threading.c new file mode 100644 index 0000000000..8edce86cdc --- /dev/null +++ b/extras/dispatch/src/posix/threading.c @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/ctools.h> +#include <stdio.h> +#include <pthread.h> + +struct sys_mutex_t { + pthread_mutex_t mutex; + int acquired; +}; + +sys_mutex_t *sys_mutex(void) +{ + sys_mutex_t *mutex = NEW(sys_mutex_t); + pthread_mutex_init(&(mutex->mutex), 0); + mutex->acquired = 0; + return mutex; +} + + +void sys_mutex_free(sys_mutex_t *mutex) +{ + assert(!mutex->acquired); + pthread_mutex_destroy(&(mutex->mutex)); + free(mutex); +} + + +void sys_mutex_lock(sys_mutex_t *mutex) +{ + pthread_mutex_lock(&(mutex->mutex)); + assert(!mutex->acquired); + mutex->acquired++; +} + + +void sys_mutex_unlock(sys_mutex_t *mutex) +{ + mutex->acquired--; + assert(!mutex->acquired); + pthread_mutex_unlock(&(mutex->mutex)); +} + + +struct sys_cond_t { + pthread_cond_t cond; +}; + + +sys_cond_t *sys_cond(void) +{ + sys_cond_t *cond = NEW(sys_cond_t); + pthread_cond_init(&(cond->cond), 0); + return cond; +} + + +void sys_cond_free(sys_cond_t *cond) +{ + pthread_cond_destroy(&(cond->cond)); + free(cond); +} + + +void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex) +{ + assert(held_mutex->acquired); + held_mutex->acquired--; + pthread_cond_wait(&(cond->cond), &(held_mutex->mutex)); + held_mutex->acquired++; +} + + +void sys_cond_signal(sys_cond_t *cond) +{ + pthread_cond_signal(&(cond->cond)); +} + + +void sys_cond_signal_all(sys_cond_t *cond) +{ + pthread_cond_broadcast(&(cond->cond)); +} + + +struct sys_thread_t { + pthread_t thread; +}; + +sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg) +{ + sys_thread_t *thread = NEW(sys_thread_t); + pthread_create(&(thread->thread), 0, run_function, arg); + return thread; +} + + +void sys_thread_free(sys_thread_t *thread) +{ + free(thread); +} + + +void sys_thread_join(sys_thread_t *thread) +{ + pthread_join(thread->thread, 0); +} + diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c new file mode 100644 index 0000000000..6ddc8f45dd --- /dev/null +++ b/extras/dispatch/src/router_node.c @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> +#include <qpid/dispatch/router.h> + +static char *module="ROUTER_NODE"; + +struct dx_router_t { + dx_node_t *node; + dx_link_list_t in_links; + dx_link_list_t out_links; + dx_message_list_t in_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; + hash_t *out_hash; + uint64_t dtag; +}; + + +typedef struct { + dx_link_t *link; + dx_message_list_t out_fifo; +} dx_router_link_t; + + +ALLOC_DECLARE(dx_router_link_t); +ALLOC_DEFINE(dx_router_link_t); + + +/** + * Outbound Delivery Handler + */ +static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_message_t *msg; + size_t size; + + sys_mutex_lock(router->lock); + msg = DEQ_HEAD(rlink->out_fifo); + if (!msg) { + // TODO - Recind the delivery + sys_mutex_unlock(router->lock); + return; + } + + DEQ_REMOVE_HEAD(rlink->out_fifo); + size = (DEQ_SIZE(rlink->out_fifo)); + sys_mutex_unlock(router->lock); + + dx_message_send(msg, pn_link); + + // + // If there is no incoming delivery, it was pre-settled. In this case, + // we must pre-settle the outgoing delivery as well. + // + if (dx_message_in_delivery(msg)) { + pn_delivery_set_context(delivery, (void*) msg); + dx_message_set_out_delivery(msg, delivery); + } else { + pn_delivery_settle(delivery); + dx_free_message(msg); + } + + pn_link_advance(pn_link); + pn_link_offered(pn_link, size); +} + + +/** + * Inbound Delivery Handler + */ +static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_message_t *msg; + int valid_message = 0; + + // + // Receive the message into a local representation. If the returned message + // pointer is NULL, we have not yet received a complete message. + // + sys_mutex_lock(router->lock); + msg = dx_message_receive(delivery); + sys_mutex_unlock(router->lock); + + if (!msg) + return; + + // + // Validate the message through the Properties section + // + valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); + + pn_link_advance(pn_link); + pn_link_flow(pn_link, 1); + + if (valid_message) { + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); + dx_router_link_t *rlink; + if (iter) { + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + sys_mutex_lock(router->lock); + int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + dx_field_iterator_free(iter); + + if (result == 0) { + // + // To field is valid and contains a known destination. Enqueue on + // the output fifo for the next-hop-to-destination. + // + pn_link_t* pn_outlink = dx_link_pn(rlink->link); + DEQ_INSERT_TAIL(rlink->out_fifo, msg); + pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo)); + dx_link_activate(rlink->link); + } else { + // + // To field contains an unknown address. Release the message. + // + pn_delivery_update(delivery, PN_RELEASED); + pn_delivery_settle(delivery); + } + + sys_mutex_unlock(router->lock); + } + } else { + // + // Message is invalid. Reject the message. + // + pn_delivery_update(delivery, PN_REJECTED); + pn_delivery_settle(delivery); + pn_delivery_set_context(delivery, 0); + dx_free_message(msg); + } +} + + +/** + * Delivery Disposition Handler + */ +static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + + if (pn_link_is_sender(pn_link)) { + pn_disposition_t disp = pn_delivery_remote_state(delivery); + dx_message_t *msg = pn_delivery_get_context(delivery); + pn_delivery_t *activate = 0; + + if (msg) { + assert(delivery == dx_message_out_delivery(msg)); + if (disp != 0) { + activate = dx_message_in_delivery(msg); + pn_delivery_update(activate, disp); + // TODO - handling of the data accompanying RECEIVED/MODIFIED + } + + if (pn_delivery_settled(delivery)) { + // + // Downstream delivery has been settled. Propagate the settlement + // upstream. + // + activate = dx_message_in_delivery(msg); + pn_delivery_settle(activate); + pn_delivery_settle(delivery); + dx_free_message(msg); + } + + if (activate) { + // + // Activate the upstream/incoming link so that the settlement will + // get pushed out. + // + dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate)); + dx_link_activate(act_link); + } + + return; + } + } + + pn_delivery_settle(delivery); +} + + +/** + * New Incoming Link Handler + */ +static int router_incoming_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + dx_link_item_t *item = new_dx_link_item_t(); + pn_link_t *pn_link = dx_link_pn(link); + + if (item) { + DEQ_ITEM_INIT(item); + item->link = link; + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, item); + sys_mutex_unlock(router->lock); + + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_flow(pn_link, 8); + pn_link_open(pn_link); + } else { + pn_link_close(pn_link); + } + return 0; +} + + +/** + * New Outgoing Link Handler + */ +static int router_outgoing_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + + sys_mutex_lock(router->lock); + dx_router_link_t *rlink = new_dx_router_link_t(); + rlink->link = link; + DEQ_INIT(rlink->out_fifo); + dx_link_set_context(link, rlink); + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + int result = hash_insert(router->out_hash, iter, rlink); + dx_field_iterator_free(iter); + + if (result == 0) { + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_open(pn_link); + sys_mutex_unlock(router->lock); + dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); + return 0; + } + + dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); + pn_link_close(pn_link); + sys_mutex_unlock(router->lock); + return 0; +} + + +/** + * Outgoing Link Writable Handler + */ +static int router_writable_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + int grant_delivery = 0; + pn_delivery_t *delivery; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + pn_link_t *pn_link = dx_link_pn(link); + uint64_t tag; + + sys_mutex_lock(router->lock); + if (DEQ_SIZE(rlink->out_fifo) > 0) { + grant_delivery = 1; + tag = router->dtag++; + } + sys_mutex_unlock(router->lock); + + if (grant_delivery) { + pn_delivery(pn_link, pn_dtag((char*) &tag, 8)); + delivery = pn_link_current(pn_link); + if (delivery) { + router_tx_handler(context, link, delivery); + return 1; + } + } + + return 0; +} + + +/** + * Link Detached Handler + */ +static int router_link_detach_handler(void* context, dx_link_t *link, int closed) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_link_item_t *item; + + sys_mutex_lock(router->lock); + if (pn_link_is_sender(pn_link)) { + item = DEQ_HEAD(router->out_links); + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_router_link_t *rlink; + if (iter) { + int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + if (result == 0) { + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + hash_remove(router->out_hash, iter); + free_dx_router_link_t(rlink); + dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + } + dx_field_iterator_free(iter); + } + } + else + item = DEQ_HEAD(router->in_links); + + while (item) { + if (item->link == link) { + if (pn_link_is_sender(pn_link)) + DEQ_REMOVE(router->out_links, item); + else + DEQ_REMOVE(router->in_links, item); + free_dx_link_item_t(item); + break; + } + item = item->next; + } + + sys_mutex_unlock(router->lock); + return 0; +} + + +static void router_inbound_open_handler(void *type_context, dx_connection_t *conn) +{ +} + + +static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) +{ +} + + +static void dx_router_timer_handler(void *context) +{ + dx_router_t *router = (dx_router_t*) context; + + // + // Periodic processing. + // + dx_timer_schedule(router->timer, 1000); +} + + +static dx_node_type_t router_node = {"router", 0, 0, + router_rx_handler, + router_tx_handler, + router_disp_handler, + router_incoming_link_handler, + router_outgoing_link_handler, + router_writable_link_handler, + router_link_detach_handler, + 0, // node_created_handler + 0, // node_destroyed_handler + router_inbound_open_handler, + router_outbound_open_handler }; +static int type_registered = 0; + + +dx_router_t *dx_router(dx_router_configuration_t *config) +{ + if (!type_registered) { + type_registered = 1; + dx_container_register_node_type(&router_node); + } + + dx_router_t *router = NEW(dx_router_t); + dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH); + + DEQ_INIT(router->in_links); + DEQ_INIT(router->out_links); + DEQ_INIT(router->in_fifo); + + router->lock = sys_mutex(); + + router->timer = dx_timer(dx_router_timer_handler, (void*) router); + dx_timer_schedule(router->timer, 0); // Immediate + + router->out_hash = hash(10, 32, 0); + router->dtag = 1; + + return router; +} + + +void dx_router_free(dx_router_t *router) +{ + dx_container_set_default_node_type(0, 0, DX_DIST_BOTH); + sys_mutex_free(router->lock); + free(router); +} + diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c new file mode 100644 index 0000000000..0099393f60 --- /dev/null +++ b/extras/dispatch/src/server.c @@ -0,0 +1,903 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> +#include "server_private.h" +#include "timer_private.h" +#include "alloc_private.h" +#include "auth.h" +#include "work_queue.h" +#include <stdio.h> +#include <time.h> +#include <signal.h> + +static char *module="SERVER"; + +typedef struct dx_thread_t { + int thread_id; + volatile int running; + volatile int canceled; + int using_thread; + sys_thread_t *thread; +} dx_thread_t; + + +typedef struct dx_server_t { + int thread_count; + pn_driver_t *driver; + dx_thread_start_cb_t start_handler; + dx_conn_handler_cb_t conn_handler; + dx_signal_handler_cb_t signal_handler; + dx_user_fd_handler_cb_t ufd_handler; + void *start_context; + void *conn_context; + void *signal_context; + sys_cond_t *cond; + sys_mutex_t *lock; + dx_thread_t **threads; + work_queue_t *work_queue; + dx_timer_list_t pending_timers; + bool a_thread_is_waiting; + int threads_active; + int pause_requests; + int threads_paused; + int pause_next_sequence; + int pause_now_serving; + int pending_signal; +} dx_server_t; + + +ALLOC_DEFINE(dx_listener_t); +ALLOC_DEFINE(dx_connector_t); +ALLOC_DEFINE(dx_connection_t); +ALLOC_DEFINE(dx_user_fd_t); + + +/** + * Singleton Concurrent Proton Driver object + */ +static dx_server_t *dx_server = 0; + + +static void signal_handler(int signum) +{ + dx_server->pending_signal = signum; + sys_cond_signal_all(dx_server->cond); +} + + +static dx_thread_t *thread(int id) +{ + dx_thread_t *thread = NEW(dx_thread_t); + if (!thread) + return 0; + + thread->thread_id = id; + thread->running = 0; + thread->canceled = 0; + thread->using_thread = 0; + + return thread; +} + + +static void thread_process_listeners(pn_driver_t *driver) +{ + pn_listener_t *listener = pn_driver_listener(driver); + pn_connector_t *cxtr; + dx_connection_t *ctx; + + while (listener) { + dx_log(module, LOG_TRACE, "Accepting Connection"); + cxtr = pn_listener_accept(listener); + ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_SASL_SERVER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_cxtr = cxtr; + ctx->pn_conn = 0; + ctx->listener = (dx_listener_t*) pn_listener_context(listener); + ctx->connector = 0; + ctx->context = ctx->listener->context; + ctx->ufd = 0; + + pn_connector_set_context(cxtr, ctx); + listener = pn_driver_listener(driver); + } +} + + +static void handle_signals_LH(void) +{ + int signum = dx_server->pending_signal; + + if (signum) { + dx_server->pending_signal = 0; + if (dx_server->signal_handler) { + sys_mutex_unlock(dx_server->lock); + dx_server->signal_handler(dx_server->signal_context, signum); + sys_mutex_lock(dx_server->lock); + } + } +} + + +static void block_if_paused_LH(void) +{ + if (dx_server->pause_requests > 0) { + dx_server->threads_paused++; + sys_cond_signal_all(dx_server->cond); + while (dx_server->pause_requests > 0) + sys_cond_wait(dx_server->cond, dx_server->lock); + dx_server->threads_paused--; + } +} + + +static void process_connector(pn_connector_t *cxtr) +{ + dx_connection_t *ctx = pn_connector_context(cxtr); + int events = 0; + int auth_passes = 0; + + if (ctx->state == CONN_STATE_USER) { + dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); + return; + } + + do { + // + // Step the engine for pre-handler processing + // + pn_connector_process(cxtr); + + // + // Call the handler that is appropriate for the connector's state. + // + switch (ctx->state) { + case CONN_STATE_CONNECTING: + if (!pn_connector_closed(cxtr)) { + ctx->state = CONN_STATE_SASL_CLIENT; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; + events = 1; + } else { + ctx->state = CONN_STATE_FAILED; + events = 0; + } + break; + + case CONN_STATE_SASL_CLIENT: + if (auth_passes == 0) { + auth_client_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_SASL_SERVER: + if (auth_passes == 0) { + auth_server_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_OPENING: + ctx->state = CONN_STATE_OPERATIONAL; + + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, "dispatch"); // TODO - make unique + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + + case CONN_STATE_OPERATIONAL: + if (pn_connector_closed(cxtr)) { + dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_CLOSE, + (dx_connection_t*) pn_connector_context(cxtr)); + events = 0; + } + else + events = dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_PROCESS, + (dx_connection_t*) pn_connector_context(cxtr)); + break; + + default: + break; + } + } while (events > 0); +} + + +// +// TEMPORARY FUNCTION PROTOTYPES +// +void pn_driver_wait_1(pn_driver_t *d); +int pn_driver_wait_2(pn_driver_t *d, int timeout); +void pn_driver_wait_3(pn_driver_t *d); +// +// END TEMPORARY +// + +static void *thread_run(void *arg) +{ + dx_thread_t *thread = (dx_thread_t*) arg; + pn_connector_t *work; + pn_connection_t *conn; + dx_connection_t *ctx; + int error; + int poll_result; + int timer_holdoff = 0; + + if (!thread) + return 0; + + thread->running = 1; + + if (thread->canceled) + return 0; + + // + // Invoke the start handler if the application supplied one. + // This handler can be used to set NUMA or processor affinnity for the thread. + // + if (dx_server->start_handler) + dx_server->start_handler(dx_server->start_context, thread->thread_id); + + // + // Main Loop + // + while (thread->running) { + sys_mutex_lock(dx_server->lock); + + // + // Check for pending signals to process + // + handle_signals_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Check to see if the server is pausing. If so, block here. + // + block_if_paused_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Service pending timers. + // + dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); + if (timer) { + DEQ_REMOVE_HEAD(dx_server->pending_timers); + + // + // Mark the timer as idle in case it reschedules itself. + // + dx_timer_idle_LH(timer); + + // + // Release the lock and invoke the connection handler. + // + sys_mutex_unlock(dx_server->lock); + timer->handler(timer->context); + pn_driver_wakeup(dx_server->driver); + continue; + } + + // + // Check the work queue for connectors scheduled for processing. + // + work = work_queue_get(dx_server->work_queue); + if (!work) { + // + // There is no pending work to do + // + if (dx_server->a_thread_is_waiting) { + // + // Another thread is waiting on the proton driver, this thread must + // wait on the condition variable until signaled. + // + sys_cond_wait(dx_server->cond, dx_server->lock); + } else { + // + // This thread elects itself to wait on the proton driver. Set the + // thread-is-waiting flag so other idle threads will not interfere. + // + dx_server->a_thread_is_waiting = true; + + // + // Ask the timer module when its next timer is scheduled to fire. We'll + // use this value in driver_wait as the timeout. If there are no scheduled + // timers, the returned value will be -1. + // + long duration = dx_timer_next_duration_LH(); + + // + // Invoke the proton driver's wait sequence. This is a bit of a hack for now + // and will be improved in the future. The wait process is divided into three parts, + // the first and third of which need to be non-reentrant, and the second of which + // must be reentrant (and blocks). + // + pn_driver_wait_1(dx_server->driver); + sys_mutex_unlock(dx_server->lock); + + do { + error = 0; + poll_result = pn_driver_wait_2(dx_server->driver, duration); + if (poll_result == -1) + error = pn_driver_errno(dx_server->driver); + } while (error == PN_INTR); + if (error) { + dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); + exit(-1); + } + + sys_mutex_lock(dx_server->lock); + pn_driver_wait_3(dx_server->driver); + + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Visit the timer module. + // + if (poll_result == 0 || ++timer_holdoff == 100) { + struct timespec tv; + clock_gettime(CLOCK_REALTIME, &tv); + long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; + dx_timer_visit_LH(milliseconds); + timer_holdoff = 0; + } + + // + // Process listeners (incoming connections). + // + thread_process_listeners(dx_server->driver); + + // + // Traverse the list of connectors-needing-service from the proton driver. + // If the connector is not already in the work queue and it is not currently + // being processed by another thread, put it in the work queue and signal the + // condition variable. + // + work = pn_driver_connector(dx_server->driver); + while (work) { + ctx = pn_connector_context(work); + if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->enqueued = 1; + work_queue_put(dx_server->work_queue, work); + sys_cond_signal(dx_server->cond); + } + work = pn_driver_connector(dx_server->driver); + } + + // + // Release our exclusive claim on pn_driver_wait. + // + dx_server->a_thread_is_waiting = false; + } + } + + // + // If we were given a connector to work on from the work queue, mark it as + // owned by this thread and as no longer enqueued. + // + if (work) { + ctx = pn_connector_context(work); + if (ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->owner_thread = thread->thread_id; + ctx->enqueued = 0; + dx_server->threads_active++; + } else { + // + // This connector is being processed by another thread, re-queue it. + // + work_queue_put(dx_server->work_queue, work); + work = 0; + } + } + sys_mutex_unlock(dx_server->lock); + + // + // Process the connector that we now have exclusive access to. + // + if (work) { + process_connector(work); + + // + // Check to see if the connector was closed during processing + // + if (pn_connector_closed(work)) { + // + // Connector is closed. Free the context and the connector. + // + conn = pn_connector_connection(work); + if (ctx->connector) { + ctx->connector->ctx = 0; + ctx->connector->state = CXTR_STATE_CONNECTING; + dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); + } + sys_mutex_lock(dx_server->lock); + free_dx_connection_t(ctx); + pn_connector_free(work); + if (conn) + pn_connection_free(conn); + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } else { + // + // The connector lives on. Mark it as no longer owned by this thread. + // + sys_mutex_lock(dx_server->lock); + ctx->owner_thread = CONTEXT_NO_OWNER; + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } + + // + // Wake up the proton driver to force it to reconsider its set of FDs + // in light of the processing that just occurred. + // + pn_driver_wakeup(dx_server->driver); + } + } + + return 0; +} + + +static void thread_start(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->using_thread = 1; + thread->thread = sys_thread(thread_run, (void*) thread); +} + + +static void thread_cancel(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->running = 0; + thread->canceled = 1; +} + + +static void thread_join(dx_thread_t *thread) +{ + if (!thread) + return; + + if (thread->using_thread) + sys_thread_join(thread->thread); +} + + +static void thread_free(dx_thread_t *thread) +{ + if (!thread) + return; + + free(thread); +} + + +static void cxtr_try_open(void *context) +{ + dx_connector_t *ct = (dx_connector_t*) context; + if (ct->state != CXTR_STATE_CONNECTING) + return; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_CONNECTING; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = ct; + ctx->context = ct->context; + ctx->user_context = 0; + ctx->ufd = 0; + + // + // pn_connector is not thread safe + // + sys_mutex_lock(dx_server->lock); + ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); + sys_mutex_unlock(dx_server->lock); + + ct->ctx = ctx; + ct->delay = 5000; + dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); +} + + +void dx_server_initialize(int thread_count) +{ + int i; + + if (dx_server) + return; // TODO - Fail in a more dramatic way + + dx_alloc_initialize(); + dx_server = NEW(dx_server_t); + + if (!dx_server) + return; // TODO - Fail in a more dramatic way + + dx_server->thread_count = thread_count; + dx_server->driver = pn_driver(); + dx_server->start_handler = 0; + dx_server->conn_handler = 0; + dx_server->signal_handler = 0; + dx_server->ufd_handler = 0; + dx_server->start_context = 0; + dx_server->signal_context = 0; + dx_server->lock = sys_mutex(); + dx_server->cond = sys_cond(); + + dx_timer_initialize(dx_server->lock); + + dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); + for (i = 0; i < thread_count; i++) + dx_server->threads[i] = thread(i); + + dx_server->work_queue = work_queue(); + DEQ_INIT(dx_server->pending_timers); + dx_server->a_thread_is_waiting = false; + dx_server->threads_active = 0; + dx_server->pause_requests = 0; + dx_server->threads_paused = 0; + dx_server->pause_next_sequence = 0; + dx_server->pause_now_serving = 0; + dx_server->pending_signal = 0; +} + + +void dx_server_finalize(void) +{ + int i; + if (!dx_server) + return; + + for (i = 0; i < dx_server->thread_count; i++) + thread_free(dx_server->threads[i]); + + work_queue_free(dx_server->work_queue); + + pn_driver_free(dx_server->driver); + sys_mutex_free(dx_server->lock); + sys_cond_free(dx_server->cond); + free(dx_server); + dx_server = 0; +} + + +void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) +{ + dx_server->conn_handler = handler; +} + + +void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) +{ + dx_server->signal_handler = handler; + dx_server->signal_context = context; +} + + +void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) +{ + dx_server->start_handler = handler; + dx_server->start_context = context; +} + + +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) +{ + dx_server->ufd_handler = ufd_handler; +} + + +void dx_server_run(void) +{ + int i; + if (!dx_server) + return; + + assert(dx_server->conn_handler); // Server can't run without a connection handler. + + for (i = 1; i < dx_server->thread_count; i++) + thread_start(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); + + thread_run((void*) dx_server->threads[0]); + + for (i = 1; i < dx_server->thread_count; i++) + thread_join(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Shut Down"); +} + + +void dx_server_stop(void) +{ + int idx; + + sys_mutex_lock(dx_server->lock); + for (idx = 0; idx < dx_server->thread_count; idx++) + thread_cancel(dx_server->threads[idx]); + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_signal(int signum) +{ + signal(signum, signal_handler); +} + + +void dx_server_pause(void) +{ + sys_mutex_lock(dx_server->lock); + + // + // Bump the request count to stop all the threads. + // + dx_server->pause_requests++; + int my_sequence = dx_server->pause_next_sequence++; + + // + // Awaken all threads that are currently blocking. + // + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + + // + // Wait for the paused thread count plus the number of threads requesting a pause to equal + // the total thread count. Also, don't exit the blocking loop until now_serving equals our + // sequence number. This ensures that concurrent pausers don't run at the same time. + // + while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || + (my_sequence != dx_server->pause_now_serving)) + sys_cond_wait(dx_server->cond, dx_server->lock); + + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_resume(void) +{ + sys_mutex_lock(dx_server->lock); + dx_server->pause_requests--; + dx_server->pause_now_serving++; + sys_cond_signal_all(dx_server->cond); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_activate(dx_connection_t *ctx) +{ + if (!ctx) + return; + + pn_connector_t *ctor = ctx->pn_cxtr; + if (!ctor) + return; + + if (!pn_connector_closed(ctor)) + pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE); +} + + +void dx_connection_set_context(dx_connection_t *conn, void *context) +{ + conn->user_context = context; +} + + +void *dx_connection_get_context(dx_connection_t *conn) +{ + return conn->user_context; +} + + +pn_connection_t *dx_connection_pn(dx_connection_t *conn) +{ + return conn->pn_conn; +} + + +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) +{ + dx_listener_t *li = new_dx_listener_t(); + + if (!li) + return 0; + + li->config = config; + li->context = context; + li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); + + if (!li->pn_listener) { + dx_log(module, LOG_ERROR, "Driver Error %d (%s)", + pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); + free_dx_listener_t(li); + return 0; + } + dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); + + return li; +} + + +void dx_server_listener_free(dx_listener_t* li) +{ + pn_listener_free(li->pn_listener); + free_dx_listener_t(li); +} + + +void dx_server_listener_close(dx_listener_t* li) +{ + pn_listener_close(li->pn_listener); +} + + +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) +{ + dx_connector_t *ct = new_dx_connector_t(); + + if (!ct) + return 0; + + ct->state = CXTR_STATE_CONNECTING; + ct->config = config; + ct->context = context; + ct->ctx = 0; + ct->timer = dx_timer(cxtr_try_open, (void*) ct); + ct->delay = 0; + + dx_timer_schedule(ct->timer, ct->delay); + return ct; +} + + +void dx_server_connector_free(dx_connector_t* ct) +{ + // Don't free the proton connector. This will be done by the connector + // processing/cleanup. + + if (ct->ctx) { + pn_connector_close(ct->ctx->pn_cxtr); + ct->ctx->connector = 0; + } + + dx_timer_free(ct->timer); + free_dx_connector_t(ct); +} + + +dx_user_fd_t *dx_user_fd(int fd, void *context) +{ + dx_user_fd_t *ufd = new_dx_user_fd_t(); + + if (!ufd) + return 0; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_USER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = 0; + ctx->context = 0; + ctx->user_context = 0; + ctx->ufd = ufd; + + ufd->context = context; + ufd->fd = fd; + ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); + pn_driver_wakeup(dx_server->driver); + + return ufd; +} + + +void dx_user_fd_free(dx_user_fd_t *ufd) +{ + pn_connector_close(ufd->pn_conn); + free_dx_user_fd_t(ufd); +} + + +void dx_user_fd_activate_read(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); + pn_driver_wakeup(dx_server->driver); +} + + +void dx_user_fd_activate_write(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); + pn_driver_wakeup(dx_server->driver); +} + + +bool dx_user_fd_is_readable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); +} + + +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); +} + + +void dx_server_timer_pending_LH(dx_timer_t *timer) +{ + DEQ_INSERT_TAIL(dx_server->pending_timers, timer); +} + + +void dx_server_timer_cancel_LH(dx_timer_t *timer) +{ + DEQ_REMOVE(dx_server->pending_timers, timer); +} + diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h new file mode 100644 index 0000000000..1722175e35 --- /dev/null +++ b/extras/dispatch/src/server_private.h @@ -0,0 +1,96 @@ +#ifndef __server_private_h__ +#define __server_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/user_fd.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/alloc.h> +#include <proton/driver.h> +#include <proton/driver_extras.h> + +void dx_server_timer_pending_LH(dx_timer_t *timer); +void dx_server_timer_cancel_LH(dx_timer_t *timer); + + +typedef enum { + CONN_STATE_CONNECTING = 0, + CONN_STATE_SASL_CLIENT, + CONN_STATE_SASL_SERVER, + CONN_STATE_OPENING, + CONN_STATE_OPERATIONAL, + CONN_STATE_FAILED, + CONN_STATE_USER +} conn_state_t; + +#define CONTEXT_NO_OWNER -1 + +typedef enum { + CXTR_STATE_CONNECTING = 0, + CXTR_STATE_OPEN, + CXTR_STATE_FAILED +} cxtr_state_t; + + +struct dx_listener_t { + const dx_server_config_t *config; + void *context; + pn_listener_t *pn_listener; +}; + + +struct dx_connector_t { + cxtr_state_t state; + const dx_server_config_t *config; + void *context; + dx_connection_t *ctx; + dx_timer_t *timer; + long delay; +}; + + +struct dx_connection_t { + conn_state_t state; + int owner_thread; + int enqueued; + pn_connector_t *pn_cxtr; + pn_connection_t *pn_conn; + dx_listener_t *listener; + dx_connector_t *connector; + void *context; // Copy of context from listener or connector + void *user_context; + dx_user_fd_t *ufd; +}; + + +struct dx_user_fd_t { + void *context; + int fd; + pn_connector_t *pn_conn; +}; + + +ALLOC_DECLARE(dx_listener_t); +ALLOC_DECLARE(dx_connector_t); +ALLOC_DECLARE(dx_connection_t); +ALLOC_DECLARE(dx_user_fd_t); + + +#endif diff --git a/extras/dispatch/src/timer.c b/extras/dispatch/src/timer.c new file mode 100644 index 0000000000..b6b4864e26 --- /dev/null +++ b/extras/dispatch/src/timer.c @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "timer_private.h" +#include "server_private.h" +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/alloc.h> +#include <assert.h> +#include <stdio.h> + +static sys_mutex_t *lock; +static dx_timer_list_t idle_timers; +static dx_timer_list_t scheduled_timers; +static long time_base; + +ALLOC_DECLARE(dx_timer_t); +ALLOC_DEFINE(dx_timer_t); + +//========================================================================= +// Private static functions +//========================================================================= + +static void dx_timer_cancel_LH(dx_timer_t *timer) +{ + switch (timer->state) { + case TIMER_FREE: + assert(0); + break; + + case TIMER_IDLE: + break; + + case TIMER_SCHEDULED: + if (timer->next) + timer->next->delta_time += timer->delta_time; + DEQ_REMOVE(scheduled_timers, timer); + DEQ_INSERT_TAIL(idle_timers, timer); + break; + + case TIMER_PENDING: + dx_server_timer_cancel_LH(timer); + break; + } + + timer->state = TIMER_IDLE; +} + + +//========================================================================= +// Public Functions from timer.h +//========================================================================= + +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context) +{ + dx_timer_t *timer = new_dx_timer_t(); + if (!timer) + return 0; + + DEQ_ITEM_INIT(timer); + + timer->handler = cb; + timer->context = context; + timer->delta_time = 0; + timer->state = TIMER_IDLE; + + sys_mutex_lock(lock); + DEQ_INSERT_TAIL(idle_timers, timer); + sys_mutex_unlock(lock); + + return timer; +} + + +void dx_timer_free(dx_timer_t *timer) +{ + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); + DEQ_REMOVE(idle_timers, timer); + sys_mutex_unlock(lock); + + timer->state = TIMER_FREE; + free_dx_timer_t(timer); +} + + +void dx_timer_schedule(dx_timer_t *timer, long duration) +{ + dx_timer_t *ptr; + dx_timer_t *last; + long total_time; + + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); // Timer is now on the idle list + assert(timer->state == TIMER_IDLE); + DEQ_REMOVE(idle_timers, timer); + + // + // Handle the special case of a zero-time scheduling. In this case, + // the timer doesn't go on the scheduled list. It goes straight to the + // pending list in the server. + // + if (duration == 0) { + timer->state = TIMER_PENDING; + dx_server_timer_pending_LH(timer); + sys_mutex_unlock(lock); + return; + } + + // + // Find the insert point in the schedule. + // + total_time = 0; + ptr = DEQ_HEAD(scheduled_timers); + assert(!ptr || ptr->prev == 0); + while (ptr) { + total_time += ptr->delta_time; + if (total_time > duration) + break; + ptr = ptr->next; + } + + // + // Insert the timer into the schedule and adjust the delta time + // of the following timer if present. + // + if (total_time <= duration) { + assert(ptr == 0); + timer->delta_time = duration - total_time; + DEQ_INSERT_TAIL(scheduled_timers, timer); + } else { + total_time -= ptr->delta_time; + timer->delta_time = duration - total_time; + assert(ptr->delta_time > timer->delta_time); + ptr->delta_time -= timer->delta_time; + last = ptr->prev; + if (last) + DEQ_INSERT_AFTER(scheduled_timers, timer, last); + else + DEQ_INSERT_HEAD(scheduled_timers, timer); + } + + timer->state = TIMER_SCHEDULED; + + sys_mutex_unlock(lock); +} + + +void dx_timer_cancel(dx_timer_t *timer) +{ + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); + sys_mutex_unlock(lock); +} + + +//========================================================================= +// Private Functions from timer_private.h +//========================================================================= + +void dx_timer_initialize(sys_mutex_t *server_lock) +{ + lock = server_lock; + DEQ_INIT(idle_timers); + DEQ_INIT(scheduled_timers); + time_base = 0; +} + + +void dx_timer_finalize(void) +{ + lock = 0; +} + + +long dx_timer_next_duration_LH(void) +{ + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); + if (timer) + return timer->delta_time; + return -1; +} + + +void dx_timer_visit_LH(long current_time) +{ + long delta; + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); + + if (time_base == 0) { + time_base = current_time; + return; + } + + delta = current_time - time_base; + time_base = current_time; + + while (timer) { + assert(delta >= 0); + if (timer->delta_time > delta) { + timer->delta_time -= delta; + break; + } else { + DEQ_REMOVE_HEAD(scheduled_timers); + delta -= timer->delta_time; + timer->state = TIMER_PENDING; + dx_server_timer_pending_LH(timer); + + } + timer = DEQ_HEAD(scheduled_timers); + } +} + + +void dx_timer_idle_LH(dx_timer_t *timer) +{ + timer->state = TIMER_IDLE; + DEQ_INSERT_TAIL(idle_timers, timer); +} + diff --git a/extras/dispatch/src/timer_private.h b/extras/dispatch/src/timer_private.h new file mode 100644 index 0000000000..618297b18e --- /dev/null +++ b/extras/dispatch/src/timer_private.h @@ -0,0 +1,51 @@ +#ifndef __timer_private_h__ +#define __timer_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/threading.h> + +typedef enum { + TIMER_FREE, + TIMER_IDLE, + TIMER_SCHEDULED, + TIMER_PENDING +} dx_timer_state_t; + + +struct dx_timer_t { + DEQ_LINKS(dx_timer_t); + dx_timer_cb_t handler; + void *context; + long delta_time; + dx_timer_state_t state; +}; + +DEQ_DECLARE(dx_timer_t, dx_timer_list_t); + +void dx_timer_initialize(sys_mutex_t *server_lock); +void dx_timer_finalize(void); +long dx_timer_next_duration_LH(void); +void dx_timer_visit_LH(long current_time); +void dx_timer_idle_LH(dx_timer_t *timer); + + +#endif diff --git a/extras/dispatch/src/work_queue.c b/extras/dispatch/src/work_queue.c new file mode 100644 index 0000000000..4b3c5d7fa5 --- /dev/null +++ b/extras/dispatch/src/work_queue.c @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include "work_queue.h" +#include <string.h> +#include <stdio.h> + +#define BATCH_SIZE 100 +typedef struct work_item_t work_item_t; + +struct work_item_t { + DEQ_LINKS(work_item_t); + pn_connector_t *conn; +}; + +DEQ_DECLARE(work_item_t, work_list_t); + +struct work_queue_t { + work_list_t items; + work_list_t free_list; +}; + +static void allocate_batch(work_queue_t *w) +{ + int i; + work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE); + if (!batch) + return; + + memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE); + + for (i = 0; i < BATCH_SIZE; i++) + DEQ_INSERT_TAIL(w->free_list, &batch[i]); +} + + +work_queue_t *work_queue(void) +{ + work_queue_t *w = NEW(work_queue_t); + if (!w) + return 0; + + DEQ_INIT(w->items); + DEQ_INIT(w->free_list); + + allocate_batch(w); + + return w; +} + + +void work_queue_free(work_queue_t *w) +{ + if (!w) + return; + + // KEEP TRACK OF BATCHES AND FREE + free(w); +} + + +void work_queue_put(work_queue_t *w, pn_connector_t *conn) +{ + work_item_t *item; + + if (!w) + return; + if (DEQ_SIZE(w->free_list) == 0) + allocate_batch(w); + if (DEQ_SIZE(w->free_list) == 0) + return; + + item = DEQ_HEAD(w->free_list); + DEQ_REMOVE_HEAD(w->free_list); + + item->conn = conn; + + DEQ_INSERT_TAIL(w->items, item); +} + + +pn_connector_t *work_queue_get(work_queue_t *w) +{ + work_item_t *item; + pn_connector_t *conn; + + if (!w) + return 0; + item = DEQ_HEAD(w->items); + if (!item) + return 0; + + DEQ_REMOVE_HEAD(w->items); + conn = item->conn; + item->conn = 0; + + DEQ_INSERT_TAIL(w->free_list, item); + + return conn; +} + + +int work_queue_empty(work_queue_t *w) +{ + return !w || DEQ_SIZE(w->items) == 0; +} + + +int work_queue_depth(work_queue_t *w) +{ + if (!w) + return 0; + return DEQ_SIZE(w->items); +} + diff --git a/extras/dispatch/src/work_queue.h b/extras/dispatch/src/work_queue.h new file mode 100644 index 0000000000..597a484a9c --- /dev/null +++ b/extras/dispatch/src/work_queue.h @@ -0,0 +1,33 @@ +#ifndef __work_queue_h__ +#define __work_queue_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/driver.h> + +typedef struct work_queue_t work_queue_t; + +work_queue_t *work_queue(void); +void work_queue_free(work_queue_t *w); +void work_queue_put(work_queue_t *w, pn_connector_t *conn); +pn_connector_t *work_queue_get(work_queue_t *w); +int work_queue_empty(work_queue_t *w); +int work_queue_depth(work_queue_t *w); + +#endif |