/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include "message_private.h" #include #include ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); ALLOC_DEFINE(dx_message_content_t); static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) { unsigned char *local_cursor = *cursor; dx_buffer_t *local_buffer = *buffer; int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); while (consume > 0) { if (consume < remaining) { local_cursor += consume; consume = 0; } else { consume -= remaining; local_buffer = local_buffer->next; if (local_buffer == 0){ local_cursor = 0; break; } local_cursor = dx_buffer_base(local_buffer); remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); } } *cursor = local_cursor; *buffer = local_buffer; } static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) { unsigned char result = **cursor; advance(cursor, buffer, 1); return result; } static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) { unsigned char tag = next_octet(cursor, buffer); if (!(*cursor)) return 0; int consume = 0; switch (tag & 0xF0) { case 0x40 : consume = 0; break; case 0x50 : consume = 1; break; case 0x60 : consume = 2; break; case 0x70 : consume = 4; break; case 0x80 : consume = 8; break; case 0x90 : consume = 16; break; case 0xB0 : case 0xD0 : case 0xF0 : consume |= ((int) next_octet(cursor, buffer)) << 24; if (!(*cursor)) return 0; consume |= ((int) next_octet(cursor, buffer)) << 16; if (!(*cursor)) return 0; consume |= ((int) next_octet(cursor, buffer)) << 8; if (!(*cursor)) return 0; // Fall through to the next case... case 0xA0 : case 0xC0 : case 0xE0 : consume |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; break; } if (field) { field->buffer = *buffer; field->offset = *cursor - dx_buffer_base(*buffer); field->length = consume; field->parsed = 1; } advance(cursor, buffer, consume); return 1; } static int start_list(unsigned char **cursor, dx_buffer_t **buffer) { unsigned char tag = next_octet(cursor, buffer); if (!(*cursor)) return 0; int length = 0; int count = 0; switch (tag) { case 0x45 : // list0 break; case 0xd0 : // list32 length |= ((int) next_octet(cursor, buffer)) << 24; if (!(*cursor)) return 0; length |= ((int) next_octet(cursor, buffer)) << 16; if (!(*cursor)) return 0; length |= ((int) next_octet(cursor, buffer)) << 8; if (!(*cursor)) return 0; length |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; count |= ((int) next_octet(cursor, buffer)) << 24; if (!(*cursor)) return 0; count |= ((int) next_octet(cursor, buffer)) << 16; if (!(*cursor)) return 0; count |= ((int) next_octet(cursor, buffer)) << 8; if (!(*cursor)) return 0; count |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; break; case 0xc0 : // list8 length |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; count |= (int) next_octet(cursor, buffer); if (!(*cursor)) return 0; break; } return count; } // // Check the buffer chain, starting at cursor to see if it matches the pattern. // If the pattern matches, check the next tag to see if it's in the set of expected // tags. If not, return zero. If so, set the location descriptor to the good // tag and advance the cursor (and buffer, if needed) to the end of the matched section. // // If there is no match, don't advance the cursor. // // Return 0 if the pattern matches but the following tag is unexpected // Return 0 if the pattern matches and the location already has a pointer (duplicate section) // Return 1 if the pattern matches and we've advanced the cursor/buffer // Return 1 if the pattern does not match // static int dx_check_and_advance(dx_buffer_t **buffer, unsigned char **cursor, unsigned char *pattern, int pattern_length, unsigned char *expected_tags, dx_field_location_t *location) { dx_buffer_t *test_buffer = *buffer; unsigned char *test_cursor = *cursor; if (!test_cursor) return 1; // no match unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); int idx = 0; while (idx < pattern_length && *test_cursor == pattern[idx]) { idx++; test_cursor++; if (test_cursor == end_of_buffer) { test_buffer = test_buffer->next; if (test_buffer == 0) return 1; // Pattern didn't match test_cursor = dx_buffer_base(test_buffer); end_of_buffer = test_cursor + dx_buffer_size(test_buffer); } } if (idx < pattern_length) return 1; // Pattern didn't match // // Pattern matched, check the tag // while (*expected_tags && *test_cursor != *expected_tags) expected_tags++; if (*expected_tags == 0) return 0; // Unexpected tag if (location->parsed) return 0; // Duplicate section // // Pattern matched and tag is expected. Mark the beginning of the section. // location->parsed = 1; location->buffer = test_buffer; location->offset = test_cursor - dx_buffer_base(test_buffer); location->length = 0; // // Advance the pointers to consume the whole section. // int consume = 0; unsigned char tag = next_octet(&test_cursor, &test_buffer); if (!test_cursor) return 0; switch (tag) { case 0x45 : // list0 break; case 0xd0 : // list32 case 0xd1 : // map32 case 0xb0 : // vbin32 consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; if (!test_cursor) return 0; consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; if (!test_cursor) return 0; consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; if (!test_cursor) return 0; // Fall through to the next case... case 0xc0 : // list8 case 0xc1 : // map8 case 0xa0 : // vbin8 consume |= (int) next_octet(&test_cursor, &test_buffer); if (!test_cursor) return 0; break; } if (consume) advance(&test_cursor, &test_buffer, consume); *cursor = test_cursor; *buffer = test_buffer; return 1; } static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) { dx_buffer_t *buf = DEQ_TAIL(msg->buffers); while (len > 0) { if (buf == 0 || dx_buffer_capacity(buf) == 0) { buf = dx_allocate_buffer(); if (buf == 0) return; DEQ_INSERT_TAIL(msg->buffers, buf); } size_t to_copy = dx_buffer_capacity(buf); if (to_copy > len) to_copy = len; memcpy(dx_buffer_cursor(buf), seq, to_copy); dx_buffer_insert(buf, to_copy); len -= to_copy; seq += to_copy; msg->length += to_copy; } } static void dx_insert_8(dx_message_content_t *msg, uint8_t value) { dx_insert(msg, &value, 1); } static void dx_insert_32(dx_message_content_t *msg, uint32_t value) { uint8_t buf[4]; buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); buf[3] = (uint8_t) (value & 0x000000FF); dx_insert(msg, buf, 4); } static void dx_insert_64(dx_message_content_t *msg, uint64_t value) { uint8_t buf[8]; buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); buf[7] = (uint8_t) (value & 0x00000000000000FFL); dx_insert(msg, buf, 8); } static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) { while (*buf) { if (*cursor >= dx_buffer_size(*buf)) { *buf = (*buf)->next; *cursor = 0; } else { dx_buffer_base(*buf)[*cursor] = value; (*cursor)++; return; } } } static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) { dx_buffer_t *buf = field->buffer; size_t cursor = field->offset; dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); } static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) { // // Insert the short-form performative tag // dx_insert(msg, (const uint8_t*) "\x00\x53", 2); dx_insert_8(msg, code); // // Open the list with a list32 tag // dx_insert_8(msg, 0xd0); // // Mark the current location to later overwrite the length // msg->compose_length.buffer = DEQ_TAIL(msg->buffers); msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); msg->compose_length.length = 4; msg->compose_length.parsed = 1; dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); // // Mark the current location to later overwrite the count // msg->compose_count.buffer = DEQ_TAIL(msg->buffers); msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); msg->compose_count.length = 4; msg->compose_count.parsed = 1; dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); msg->length = 4; // Include the length of the count field msg->count = 0; } static void dx_end_list(dx_message_content_t *msg) { dx_overwrite_32(&msg->compose_length, msg->length); dx_overwrite_32(&msg->compose_count, msg->count); } static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) { dx_message_content_t *content = MSG_CONTENT(msg); switch (field) { case DX_FIELD_TO: while (1) { if (content->field_to.parsed) return &content->field_to; if (content->section_message_properties.parsed == 0) break; dx_buffer_t *buffer = content->section_message_properties.buffer; unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; int count = start_list(&cursor, &buffer); int result; if (count < 3) break; result = traverse_field(&cursor, &buffer, 0); // message_id if (!result) return 0; result = traverse_field(&cursor, &buffer, 0); // user_id if (!result) return 0; result = traverse_field(&cursor, &buffer, &content->field_to); // to if (!result) return 0; } break; case DX_FIELD_BODY: while (1) { if (content->body.parsed) return &content->body; if (content->section_body.parsed == 0) break; dx_buffer_t *buffer = content->section_body.buffer; unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; int result; result = traverse_field(&cursor, &buffer, &content->body); if (!result) return 0; } break; default: break; } return 0; } dx_message_t *dx_allocate_message() { dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); if (!msg) return 0; DEQ_ITEM_INIT(msg); msg->content = new_dx_message_content_t(); msg->out_delivery = 0; if (msg->content == 0) { free_dx_message_t((dx_message_t*) msg); return 0; } memset(msg->content, 0, sizeof(dx_message_content_t)); msg->content->lock = sys_mutex(); msg->content->ref_count = 1; return (dx_message_t*) msg; } void dx_free_message(dx_message_t *in_msg) { uint32_t rc; dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; dx_message_content_t *content = msg->content; sys_mutex_lock(content->lock); rc = --content->ref_count; sys_mutex_unlock(content->lock); if (rc == 0) { dx_buffer_t *buf = DEQ_HEAD(content->buffers); while (buf) { DEQ_REMOVE_HEAD(content->buffers); dx_free_buffer(buf); buf = DEQ_HEAD(content->buffers); } sys_mutex_free(content->lock); free_dx_message_content_t(content); } free_dx_message_t((dx_message_t*) msg); } dx_message_t *dx_message_copy(dx_message_t *in_msg) { dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; dx_message_content_t *content = msg->content; dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); if (!copy) return 0; DEQ_ITEM_INIT(copy); copy->content = content; copy->out_delivery = 0; sys_mutex_lock(content->lock); content->ref_count++; sys_mutex_unlock(content->lock); return (dx_message_t*) copy; } void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) { ((dx_message_pvt_t*) msg)->out_delivery = delivery; } pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) { return ((dx_message_pvt_t*) msg)->out_delivery; } void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) { dx_message_content_t *content = MSG_CONTENT(msg); content->in_delivery = delivery; } pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) { dx_message_content_t *content = MSG_CONTENT(msg); return content->in_delivery; } dx_message_t *dx_message_receive(pn_delivery_t *delivery) { pn_link_t *link = pn_delivery_link(delivery); dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); ssize_t rc; dx_buffer_t *buf; // // If there is no message associated with the delivery, this is the first time // we've received anything on this delivery. Allocate a message descriptor and // link it and the delivery together. // if (!msg) { msg = (dx_message_pvt_t*) dx_allocate_message(); pn_delivery_set_context(delivery, (void*) msg); // // Record the incoming delivery only if it is not settled. If it is // settled, it should not be recorded as no future operations on it are // permitted. // if (!pn_delivery_settled(delivery)) msg->content->in_delivery = delivery; } // // Get a reference to the tail buffer on the message. This is the buffer into which // we will store incoming message data. If there is no buffer in the message, allocate // an empty one and add it to the message. // buf = DEQ_TAIL(msg->content->buffers); if (!buf) { buf = dx_allocate_buffer(); DEQ_INSERT_TAIL(msg->content->buffers, buf); } while (1) { // // Try to receive enough data to fill the remaining space in the tail buffer. // rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); // // If we receive PN_EOS, we have come to the end of the message. // if (rc == PN_EOS) { // // If the last buffer in the list is empty, remove it and free it. This // will only happen if the size of the message content is an exact multiple // of the buffer size. // if (dx_buffer_size(buf) == 0) { DEQ_REMOVE_TAIL(msg->content->buffers); dx_free_buffer(buf); } return (dx_message_t*) msg; } if (rc > 0) { // // We have received a positive number of bytes for the message. Advance // the cursor in the buffer. // dx_buffer_insert(buf, rc); // // If the buffer is full, allocate a new empty buffer and append it to the // tail of the message's list. // if (dx_buffer_capacity(buf) == 0) { buf = dx_allocate_buffer(); DEQ_INSERT_TAIL(msg->content->buffers, buf); } } else // // We received zero bytes, and no PN_EOS. This means that we've received // all of the data available up to this point, but it does not constitute // the entire message. We'll be back later to finish it up. // break; } return 0; } void dx_message_send(dx_message_t *in_msg, pn_link_t *link) { dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); // TODO - Handle cases where annotations have been added or modified while (buf) { pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); buf = DEQ_NEXT(buf); } } int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) { #define LONG 10 #define SHORT 3 #define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70" #define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70" #define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71" #define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71" #define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72" #define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72" #define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" #define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" #define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74" #define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74" #define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75" #define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75" #define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76" #define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76" #define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78" #define FOOTER_SHORT (unsigned char*) "\x00\x53\x78" #define TAGS_LIST (unsigned char*) "\x45\xc0\xd0" #define TAGS_MAP (unsigned char*) "\xc1\xd1" #define TAGS_BINARY (unsigned char*) "\xa0\xb0" dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; dx_message_content_t *content = msg->content; dx_buffer_t *buffer = DEQ_HEAD(content->buffers); unsigned char *cursor; if (!buffer) return 0; // Invalid - No data in the message if (depth == DX_DEPTH_NONE) return 1; cursor = dx_buffer_base(buffer); // // MESSAGE HEADER // if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) return 0; if (depth == DX_DEPTH_HEADER) return 1; // // DELIVERY ANNOTATION // if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) return 0; if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) return 1; // // MESSAGE ANNOTATION // if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) return 0; if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) return 1; // // PROPERTIES // if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) return 0; if (depth == DX_DEPTH_PROPERTIES) return 1; // // APPLICATION PROPERTIES // if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) return 0; if (depth == DX_DEPTH_APPLICATION_PROPERTIES) return 1; // // BODY (Note that this function expects a single data section or a single AMQP sequence) // if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) return 0; if (depth == DX_DEPTH_BODY) return 1; // // FOOTER // if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) return 0; if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) return 0; return 1; } dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) { dx_field_location_t *loc = dx_message_field_location(msg, field); if (!loc) return 0; return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); } dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field) { dx_field_location_t *loc = dx_message_field_location(msg, field); if (!loc) return 0; // // Count the number of buffers this field straddles // int bufcnt = 1; dx_buffer_t *buf = loc->buffer; size_t bufsize = dx_buffer_size(buf) - loc->offset; ssize_t remaining = loc->length - bufsize; while (remaining > 0) { bufcnt++; buf = buf->next; if (!buf) return 0; remaining -= dx_buffer_size(buf); } // // Allocate an iovec object big enough to hold the number of buffers // dx_iovec_t *iov = dx_iovec(bufcnt); if (!iov) return 0; // // Build out the io vectors with pointers to the segments of the field in buffers // bufcnt = 0; buf = loc->buffer; bufsize = dx_buffer_size(buf) - loc->offset; void *base = dx_buffer_base(buf) + loc->offset; remaining = loc->length; while (remaining > 0) { dx_iovec_array(iov)[bufcnt].iov_base = base; dx_iovec_array(iov)[bufcnt].iov_len = bufsize; bufcnt++; remaining -= bufsize; if (remaining > 0) { buf = buf->next; base = dx_buffer_base(buf); bufsize = dx_buffer_size(buf); if (bufsize > remaining) bufsize = remaining; } } return iov; } void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) { dx_message_begin_header(msg); dx_message_insert_boolean(msg, 0); // durable //dx_message_insert_null(msg); // priority //dx_message_insert_null(msg); // ttl //dx_message_insert_boolean(msg, 0); // first-acquirer //dx_message_insert_uint(msg, 0); // delivery-count dx_message_end_header(msg); dx_message_begin_message_properties(msg); dx_message_insert_null(msg); // message-id dx_message_insert_null(msg); // user-id dx_message_insert_string(msg, to); // to //dx_message_insert_null(msg); // subject //dx_message_insert_null(msg); // reply-to //dx_message_insert_null(msg); // correlation-id //dx_message_insert_null(msg); // content-type //dx_message_insert_null(msg); // content-encoding //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time //dx_message_insert_timestamp(msg, 0); // creation-time //dx_message_insert_null(msg); // group-id //dx_message_insert_uint(msg, 0); // group-sequence //dx_message_insert_null(msg); // reply-to-group-id dx_message_end_message_properties(msg); if (buffers) dx_message_append_body_data(msg, buffers); } void dx_message_begin_header(dx_message_t *msg) { dx_start_list_performative(MSG_CONTENT(msg), 0x70); } void dx_message_end_header(dx_message_t *msg) { dx_end_list(MSG_CONTENT(msg)); } void dx_message_begin_delivery_annotations(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_end_delivery_annotations(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_begin_message_annotations(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_end_message_annotations(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_begin_message_properties(dx_message_t *msg) { dx_start_list_performative(MSG_CONTENT(msg), 0x73); } void dx_message_end_message_properties(dx_message_t *msg) { dx_end_list(MSG_CONTENT(msg)); } void dx_message_begin_application_properties(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_end_application_properties(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) { dx_message_content_t *content = MSG_CONTENT(msg); dx_buffer_t *buf = DEQ_HEAD(*buffers); uint32_t len = 0; // // Calculate the size of the body to be appended. // while (buf) { len += dx_buffer_size(buf); buf = DEQ_NEXT(buf); } // // Insert a DATA section performative header. // dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); if (len < 256) { dx_insert_8(content, 0xa0); // vbin8 dx_insert_8(content, (uint8_t) len); } else { dx_insert_8(content, 0xb0); // vbin32 dx_insert_32(content, len); } // // Move the supplied buffers to the tail of the message's buffer list. // buf = DEQ_HEAD(*buffers); while (buf) { DEQ_REMOVE_HEAD(*buffers); DEQ_INSERT_TAIL(content->buffers, buf); buf = DEQ_HEAD(*buffers); } } void dx_message_begin_body_sequence(dx_message_t *msg) { } void dx_message_end_body_sequence(dx_message_t *msg) { } void dx_message_begin_footer(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_end_footer(dx_message_t *msg) { assert(0); // Not Implemented } void dx_message_insert_null(dx_message_t *msg) { dx_message_content_t *content = MSG_CONTENT(msg); dx_insert_8(content, 0x40); content->count++; } void dx_message_insert_boolean(dx_message_t *msg, int value) { dx_message_content_t *content = MSG_CONTENT(msg); if (value) dx_insert(content, (const uint8_t*) "\x56\x01", 2); else dx_insert(content, (const uint8_t*) "\x56\x00", 2); content->count++; } void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) { dx_message_content_t *content = MSG_CONTENT(msg); dx_insert_8(content, 0x50); dx_insert_8(content, value); content->count++; } void dx_message_insert_uint(dx_message_t *msg, uint32_t value) { dx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { dx_insert_8(content, 0x43); // uint0 } else if (value < 256) { dx_insert_8(content, 0x52); // smalluint dx_insert_8(content, (uint8_t) value); } else { dx_insert_8(content, 0x70); // uint dx_insert_32(content, value); } content->count++; } void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) { dx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { dx_insert_8(content, 0x44); // ulong0 } else if (value < 256) { dx_insert_8(content, 0x53); // smallulong dx_insert_8(content, (uint8_t) value); } else { dx_insert_8(content, 0x80); // ulong dx_insert_64(content, value); } content->count++; } void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) { dx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { dx_insert_8(content, 0xa0); // vbin8 dx_insert_8(content, (uint8_t) len); } else { dx_insert_8(content, 0xb0); // vbin32 dx_insert_32(content, len); } dx_insert(content, start, len); content->count++; } void dx_message_insert_string(dx_message_t *msg, const char *start) { dx_message_content_t *content = MSG_CONTENT(msg); uint32_t len = strlen(start); if (len < 256) { dx_insert_8(content, 0xa1); // str8-utf8 dx_insert_8(content, (uint8_t) len); dx_insert(content, (const uint8_t*) start, len); } else { dx_insert_8(content, 0xb1); // str32-utf8 dx_insert_32(content, len); dx_insert(content, (const uint8_t*) start, len); } content->count++; } void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) { dx_message_content_t *content = MSG_CONTENT(msg); dx_insert_8(content, 0x98); // uuid dx_insert(content, value, 16); content->count++; } void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) { dx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { dx_insert_8(content, 0xa3); // sym8 dx_insert_8(content, (uint8_t) len); dx_insert(content, (const uint8_t*) start, len); } else { dx_insert_8(content, 0xb3); // sym32 dx_insert_32(content, len); dx_insert(content, (const uint8_t*) start, len); } content->count++; } void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) { dx_message_content_t *content = MSG_CONTENT(msg); dx_insert_8(content, 0x83); // timestamp dx_insert_64(content, value); content->count++; } void dx_message_begin_list(dx_message_t* msg) { assert(0); // Not Implemented } void dx_message_end_list(dx_message_t* msg) { assert(0); // Not Implemented } void dx_message_begin_map(dx_message_t* msg) { assert(0); // Not Implemented } void dx_message_end_map(dx_message_t* msg) { assert(0); // Not Implemented }