summaryrefslogtreecommitdiff
path: root/qpid/extras
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-01-15 23:28:40 +0000
committerTed Ross <tross@apache.org>2013-01-15 23:28:40 +0000
commitc7f026821b3d179b3e7e34a0b3eb02ae0a965f42 (patch)
tree799953ffa997b315fb3c3ce91d894e5f1704ef2c /qpid/extras
parent32dd02334903931ef8c7e35c2bfff6efa840f69d (diff)
downloadqpid-python-c7f026821b3d179b3e7e34a0b3eb02ae0a965f42.tar.gz
QPID-4538 - Experimental server/container for AMQP based on Proton-c.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1433735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras')
-rw-r--r--qpid/extras/nexus/CMakeLists.txt94
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/alloc.h70
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/container.h122
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/ctools.h146
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/hash.h37
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/iterator.h114
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/log.h31
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/message.h162
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/server.h403
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/threading.h45
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/timer.h86
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/user_fd.h121
-rw-r--r--qpid/extras/nexus/site/css/style.css280
-rw-r--r--qpid/extras/nexus/site/images/gwarch.diabin0 -> 1370 bytes
-rw-r--r--qpid/extras/nexus/site/images/gwarch.pngbin0 -> 7941 bytes
-rw-r--r--qpid/extras/nexus/site/includes/footer.include7
-rw-r--r--qpid/extras/nexus/site/includes/header.include6
-rw-r--r--qpid/extras/nexus/site/includes/menu.include71
-rwxr-xr-xqpid/extras/nexus/site/index.html98
-rw-r--r--qpid/extras/nexus/src/alloc.c202
-rw-r--r--qpid/extras/nexus/src/alloc_private.h26
-rw-r--r--qpid/extras/nexus/src/auth.c75
-rw-r--r--qpid/extras/nexus/src/auth.h27
-rw-r--r--qpid/extras/nexus/src/container.c620
-rw-r--r--qpid/extras/nexus/src/hash.c223
-rw-r--r--qpid/extras/nexus/src/iterator.c268
-rw-r--r--qpid/extras/nexus/src/log.c56
-rw-r--r--qpid/extras/nexus/src/message.c1164
-rw-r--r--qpid/extras/nexus/src/posix/threading.c126
-rw-r--r--qpid/extras/nexus/src/server.c903
-rw-r--r--qpid/extras/nexus/src/server_private.h95
-rw-r--r--qpid/extras/nexus/src/timer.c240
-rw-r--r--qpid/extras/nexus/src/timer_private.h51
-rw-r--r--qpid/extras/nexus/src/work_queue.c132
-rw-r--r--qpid/extras/nexus/src/work_queue.h33
-rw-r--r--qpid/extras/nexus/tests/CMakeLists.txt34
-rw-r--r--qpid/extras/nexus/tests/alloc_test.c86
-rw-r--r--qpid/extras/nexus/tests/message_test.c131
-rw-r--r--qpid/extras/nexus/tests/run_tests.c36
-rw-r--r--qpid/extras/nexus/tests/server_test.c195
-rw-r--r--qpid/extras/nexus/tests/test_case.h36
-rw-r--r--qpid/extras/nexus/tests/timer_test.c386
-rw-r--r--qpid/extras/nexus/tests/tool_test.c159
43 files changed, 7197 insertions, 0 deletions
diff --git a/qpid/extras/nexus/CMakeLists.txt b/qpid/extras/nexus/CMakeLists.txt
new file mode 100644
index 0000000000..04c33c35e2
--- /dev/null
+++ b/qpid/extras/nexus/CMakeLists.txt
@@ -0,0 +1,94 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+cmake_minimum_required(VERSION 2.6)
+include(CheckLibraryExists)
+include(CheckSymbolExists)
+
+project(qpid-nexus C)
+
+set (SO_VERSION_MAJOR 0)
+set (SO_VERSION_MINOR 1)
+set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}")
+
+if (NOT DEFINED LIB_SUFFIX)
+ get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+ if ("${LIB64}" STREQUAL "TRUE" AND ${CMAKE_SIZEOF_VOID_P} STREQUAL "8")
+ set(LIB_SUFFIX 64)
+ else()
+ set(LIB_SUFFIX "")
+ endif()
+endif()
+
+set(INCLUDE_INSTALL_DIR include CACHE PATH "Include file directory")
+set(LIB_INSTALL_DIR "lib${LIB_SUFFIX}" CACHE PATH "Library object file directory")
+set(SYSCONF_INSTALL_DIR etc CACHE PATH "System read only configuration directory")
+set(SHARE_INSTALL_DIR share CACHE PATH "Shared read only data directory")
+set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory")
+
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${proton_include}
+ )
+
+##
+## Find dependencies
+##
+find_library(proton_lib qpid-proton)
+find_library(pthread_lib pthread)
+find_library(rt_lib rt)
+find_path(proton_include proton/driver.h)
+
+set(CMAKE_C_FLAGS "-pthread -Wall -Werror")
+set(CATCH_UNDEFINED "-Wl,--no-undefined")
+
+##
+## Build the Multi-Threaded Server Library
+##
+set(server_SOURCES
+ src/alloc.c
+ src/auth.c
+ src/container.c
+ src/hash.c
+ src/iterator.c
+ src/log.c
+ src/message.c
+ src/posix/threading.c
+ src/server.c
+ src/timer.c
+ src/work_queue.c
+ )
+
+add_library(qpid-nexus SHARED ${server_SOURCES})
+target_link_libraries(qpid-nexus ${proton_lib} ${pthread_lib} ${rt_lib})
+set_target_properties(qpid-nexus PROPERTIES
+ VERSION "${SO_VERSION}"
+ SOVERSION "${SO_VERSION_MAJOR}"
+ LINK_FLAGS "${CATCH_UNDEFINED}"
+ )
+install(TARGETS qpid-nexus
+ LIBRARY DESTINATION ${LIB_INSTALL_DIR})
+file(GLOB headers "include/qpid/nexus/*.h")
+install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/nexus)
+
+##
+## Build Tests
+##
+add_subdirectory(tests)
diff --git a/qpid/extras/nexus/include/qpid/nexus/alloc.h b/qpid/extras/nexus/include/qpid/nexus/alloc.h
new file mode 100644
index 0000000000..a0c832c069
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/alloc.h
@@ -0,0 +1,70 @@
+#ifndef __nexus_alloc_h__
+#define __nexus_alloc_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <qpid/nexus/threading.h>
+
+typedef struct nx_alloc_pool_t nx_alloc_pool_t;
+
+typedef struct {
+ int transfer_batch_size;
+ int local_free_list_max;
+ int global_free_list_max;
+} nx_alloc_config_t;
+
+typedef struct {
+ uint64_t total_alloc_from_heap;
+ uint64_t total_free_to_heap;
+ uint64_t held_by_threads;
+ uint64_t batches_rebalanced_to_threads;
+ uint64_t batches_rebalanced_to_global;
+} nx_alloc_stats_t;
+
+typedef struct {
+ char *type_name;
+ size_t type_size;
+ nx_alloc_config_t *config;
+ nx_alloc_stats_t *stats;
+ nx_alloc_pool_t *global_pool;
+ sys_mutex_t *lock;
+} nx_alloc_type_desc_t;
+
+
+void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool);
+void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p);
+
+
+#define ALLOC_DECLARE(T) \
+ T *new_##T(); \
+ void free_##T(T *p)
+
+#define ALLOC_DEFINE_CONFIG(T,C) \
+ nx_alloc_type_desc_t __desc_##T = {#T, sizeof(T), C, 0, 0, 0}; \
+ __thread nx_alloc_pool_t *__local_pool_##T = 0; \
+ T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \
+ void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \
+ nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; }
+
+#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, 0)
+
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/container.h b/qpid/extras/nexus/include/qpid/nexus/container.h
new file mode 100644
index 0000000000..f6c9839da0
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/container.h
@@ -0,0 +1,122 @@
+#ifndef __container_h__
+#define __container_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/nexus/server.h>
+#include <qpid/nexus/alloc.h>
+#include <qpid/nexus/ctools.h>
+
+typedef uint8_t nx_dist_mode_t;
+#define NX_DIST_COPY 0x01
+#define NX_DIST_MOVE 0x02
+#define NX_DIST_BOTH 0x03
+
+typedef enum {
+ NX_LIFE_PERMANENT,
+ NX_LIFE_DELETE_CLOSE,
+ NX_LIFE_DELETE_NO_LINKS,
+ NX_LIFE_DELETE_NO_MESSAGES,
+ NX_LIFE_DELETE_NO_LINKS_MESSAGES
+} nx_lifetime_policy_t;
+
+typedef enum {
+ NX_INCOMING,
+ NX_OUTGOING
+} nx_direction_t;
+
+
+typedef struct nx_node_t nx_node_t;
+typedef struct nx_link_t nx_link_t;
+
+typedef void (*nx_container_delivery_handler_t) (void *node_context, nx_link_t *link, pn_delivery_t *delivery);
+typedef int (*nx_container_link_handler_t) (void *node_context, nx_link_t *link);
+typedef int (*nx_container_link_detach_handler_t) (void *node_context, nx_link_t *link, int closed);
+typedef void (*nx_container_node_handler_t) (void *type_context, nx_node_t *node);
+typedef void (*nx_container_conn_handler_t) (void *type_context, nx_connection_t *conn);
+
+typedef struct {
+ char *type_name;
+ void *type_context;
+ int allow_dynamic_creation;
+
+ //
+ // Node-Instance Handlers
+ //
+ nx_container_delivery_handler_t rx_handler;
+ nx_container_delivery_handler_t tx_handler;
+ nx_container_delivery_handler_t disp_handler;
+ nx_container_link_handler_t incoming_handler;
+ nx_container_link_handler_t outgoing_handler;
+ nx_container_link_handler_t writable_handler;
+ nx_container_link_detach_handler_t link_detach_handler;
+
+ //
+ // Node-Type Handlers
+ //
+ nx_container_node_handler_t node_created_handler;
+ nx_container_node_handler_t node_destroyed_handler;
+ nx_container_conn_handler_t inbound_conn_open_handler;
+ nx_container_conn_handler_t outbound_conn_open_handler;
+} nx_node_type_t;
+
+void nx_container_initialize(void);
+void nx_container_finalize(void);
+
+int nx_container_register_node_type(const nx_node_type_t *nt);
+
+void nx_container_set_default_node_type(const nx_node_type_t *nt,
+ void *node_context,
+ nx_dist_mode_t supported_dist);
+
+nx_node_t *nx_container_create_node(const nx_node_type_t *nt,
+ const char *name,
+ void *node_context,
+ nx_dist_mode_t supported_dist,
+ nx_lifetime_policy_t life_policy);
+void nx_container_destroy_node(nx_node_t *node);
+
+void nx_container_node_set_context(nx_node_t *node, void *node_context);
+nx_dist_mode_t nx_container_node_get_dist_modes(const nx_node_t *node);
+nx_lifetime_policy_t nx_container_node_get_life_policy(const nx_node_t *node);
+
+nx_link_t *nx_link(nx_node_t *node, nx_connection_t *conn, nx_direction_t dir, const char *name);
+void nx_link_set_context(nx_link_t *link, void *link_context);
+void *nx_link_get_context(nx_link_t *link);
+pn_link_t *nx_link_pn(nx_link_t *link);
+pn_terminus_t *nx_link_source(nx_link_t *link);
+pn_terminus_t *nx_link_target(nx_link_t *link);
+pn_terminus_t *nx_link_remote_source(nx_link_t *link);
+pn_terminus_t *nx_link_remote_target(nx_link_t *link);
+void nx_link_activate(nx_link_t *link);
+void nx_link_close(nx_link_t *link);
+
+
+typedef struct nx_link_item_t nx_link_item_t;
+
+struct nx_link_item_t {
+ DEQ_LINKS(nx_link_item_t);
+ nx_link_t *link;
+};
+
+ALLOC_DECLARE(nx_link_item_t);
+DEQ_DECLARE(nx_link_item_t, nx_link_list_t);
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/ctools.h b/qpid/extras/nexus/include/qpid/nexus/ctools.h
new file mode 100644
index 0000000000..6b8f072b75
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/ctools.h
@@ -0,0 +1,146 @@
+#ifndef __ctools_h__
+#define __ctools_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+
+#define CT_ASSERT(exp) { assert(exp); }
+
+#define NEW(t) (t*) malloc(sizeof(t))
+#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n))
+#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n))
+
+#define DEQ_DECLARE(i,d) typedef struct { \
+ i *head; \
+ i *tail; \
+ i *scratch; \
+ size_t size; \
+ } d
+
+#define DEQ_LINKS(t) t *prev; t *next
+
+#define DEQ_INIT(d) do { d.head = 0; d.tail = 0; d.scratch = 0; d.size = 0; } while (0)
+#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
+#define DEQ_HEAD(d) (d.head)
+#define DEQ_TAIL(d) (d.tail)
+#define DEQ_SIZE(d) (d.size)
+#define DEQ_NEXT(i) (i)->next
+#define DEQ_PREV(i) (i)->prev
+
+#define DEQ_INSERT_HEAD(d,i) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if (d.head) { \
+ (i)->next = d.head; \
+ d.head->prev = i; \
+ } else { \
+ d.tail = i; \
+ (i)->next = 0; \
+ CT_ASSERT(d.size == 0); \
+ } \
+ (i)->prev = 0; \
+ d.head = i; \
+ d.size++; \
+} while (0)
+
+#define DEQ_INSERT_TAIL(d,i) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if (d.tail) { \
+ (i)->prev = d.tail; \
+ d.tail->next = i; \
+ } else { \
+ d.head = i; \
+ (i)->prev = 0; \
+ CT_ASSERT(d.size == 0); \
+ } \
+ (i)->next = 0; \
+ d.tail = i; \
+ d.size++; \
+} while (0)
+
+#define DEQ_REMOVE_HEAD(d) \
+do { \
+ CT_ASSERT(d.head); \
+ if (d.head) { \
+ d.scratch = d.head; \
+ d.head = d.head->next; \
+ if (d.head == 0) { \
+ d.tail = 0; \
+ CT_ASSERT(d.size == 1); \
+ } else \
+ d.head->prev = 0; \
+ d.size--; \
+ d.scratch->next = 0; \
+ d.scratch->prev = 0; \
+ } \
+} while (0)
+
+#define DEQ_REMOVE_TAIL(d) \
+do { \
+ CT_ASSERT(d.tail); \
+ if (d.tail) { \
+ d.scratch = d.tail; \
+ d.tail = d.tail->prev; \
+ if (d.tail == 0) { \
+ d.head = 0; \
+ CT_ASSERT(d.size == 1); \
+ } else \
+ d.tail->next = 0; \
+ d.size--; \
+ d.scratch->next = 0; \
+ d.scratch->prev = 0; \
+ } \
+} while (0)
+
+#define DEQ_INSERT_AFTER(d,i,a) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if ((a)->next) \
+ (a)->next->prev = (i); \
+ else \
+ d.tail = (i); \
+ (i)->next = (a)->next; \
+ (i)->prev = (a); \
+ (a)->next = (i); \
+ d.size++; \
+} while (0)
+
+#define DEQ_REMOVE(d,i) \
+do { \
+ if ((i)->next) \
+ (i)->next->prev = (i)->prev; \
+ else \
+ d.tail = (i)->prev; \
+ if ((i)->prev) \
+ (i)->prev->next = (i)->next; \
+ else \
+ d.head = (i)->next; \
+ d.size--; \
+ (i)->next = 0; \
+ (i)->prev = 0; \
+ CT_ASSERT(d.size || (!d.head && !d.tail)); \
+} while (0)
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/hash.h b/qpid/extras/nexus/include/qpid/nexus/hash.h
new file mode 100644
index 0000000000..0efded35e8
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/hash.h
@@ -0,0 +1,37 @@
+#ifndef __hash_h__
+#define __hash_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <qpid/nexus/iterator.h>
+
+typedef struct hash_t hash_t;
+
+hash_t *hash(int bucket_exponent, int batch_size, int value_is_const);
+void hash_free(hash_t *h);
+
+size_t hash_size(hash_t *h);
+int hash_insert(hash_t *h, nx_field_iterator_t *key, void *val);
+int hash_insert_const(hash_t *h, nx_field_iterator_t *key, const void *val);
+int hash_retrieve(hash_t *h, nx_field_iterator_t *key, void **val);
+int hash_retrieve_const(hash_t *h, nx_field_iterator_t *key, const void **val);
+int hash_remove(hash_t *h, nx_field_iterator_t *key);
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/iterator.h b/qpid/extras/nexus/include/qpid/nexus/iterator.h
new file mode 100644
index 0000000000..9aca3d4795
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/iterator.h
@@ -0,0 +1,114 @@
+#ifndef __nexus_iterator_h__
+#define __nexus_iterator_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+typedef struct nx_buffer_t nx_buffer_t;
+
+/**
+ * The field iterator is used to access fields within a buffer chain.
+ * It shields the user from the fact that the field may be split across
+ * one or more physical buffers.
+ */
+typedef struct nx_field_iterator_t nx_field_iterator_t;
+
+/**
+ * Iterator views allow the code traversing the field to see a transformed
+ * view of the raw field.
+ *
+ * ITER_VIEW_ALL - No transformation of the raw field data
+ *
+ * ITER_VIEW_NO_HOST - Remove the scheme and host fields from the view
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^^^^^^^^^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^^^^^^^^^^^^^^^
+ *
+ * ITER_VIEW_NODE_ID - Isolate the node identifier from an address
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^
+ *
+ * ITER_VIEW_NODE_SPECIFIC - Isolate node-specific text from an address
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^^^^^^^
+ */
+typedef enum {
+ ITER_VIEW_ALL,
+ ITER_VIEW_NO_HOST,
+ ITER_VIEW_NODE_ID,
+ ITER_VIEW_NODE_SPECIFIC
+} nx_iterator_view_t;
+
+/**
+ * Create an iterator from a null-terminated string.
+ *
+ * The "text" string must stay intact for the whole life of the iterator. The iterator
+ * does not copy the string, it references it.
+ */
+nx_field_iterator_t* nx_field_iterator_string(const char *text,
+ nx_iterator_view_t view);
+
+/**
+ * Create an iterator from a field in a buffer chain
+ */
+nx_field_iterator_t *nx_field_iterator_buffer(nx_buffer_t *buffer,
+ int offset,
+ int length,
+ nx_iterator_view_t view);
+
+/**
+ * Free an iterator
+ */
+void nx_field_iterator_free(nx_field_iterator_t *iter);
+
+/**
+ * Reset the iterator to the first octet and set a new view
+ */
+void nx_field_iterator_reset(nx_field_iterator_t *iter,
+ nx_iterator_view_t view);
+
+/**
+ * Return the current octet in the iterator's view and step to the next.
+ */
+unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter);
+
+/**
+ * Return true iff the iterator has no more octets in the view.
+ */
+int nx_field_iterator_end(nx_field_iterator_t *iter);
+
+/**
+ * Compare an input string to the iterator's view. Return true iff they are equal.
+ */
+int nx_field_iterator_equal(nx_field_iterator_t *iter, unsigned char *string);
+
+/**
+ * Return a copy of the iterator's view.
+ */
+unsigned char *nx_field_iterator_copy(nx_field_iterator_t *iter);
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/log.h b/qpid/extras/nexus/include/qpid/nexus/log.h
new file mode 100644
index 0000000000..1376405d13
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/log.h
@@ -0,0 +1,31 @@
+#ifndef __nx_log_h__
+#define __nx_log_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define LOG_NONE 0x00000000
+#define LOG_TRACE 0x00000001
+#define LOG_ERROR 0x00000002
+#define LOG_INFO 0x00000004
+
+void nx_log(const char *module, int cls, const char *fmt, ...);
+
+void nx_log_set_mask(int mask);
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/message.h b/qpid/extras/nexus/include/qpid/nexus/message.h
new file mode 100644
index 0000000000..3bb6b950ea
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/message.h
@@ -0,0 +1,162 @@
+#ifndef __nexus_message_h__
+#define __nexus_message_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/iterator.h>
+
+typedef struct nx_message_t nx_message_t;
+typedef struct nx_buffer_t nx_buffer_t;
+
+DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t);
+DEQ_DECLARE(nx_message_t, nx_message_list_t);
+
+typedef struct {
+ nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
+} nx_field_location_t;
+
+
+// TODO - consider using pointers to nx_field_location_t below to save memory
+struct nx_message_t {
+ DEQ_LINKS(nx_message_t);
+ nx_buffer_list_t buffers; // The buffer chain containing the message
+ pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ pn_delivery_t *out_delivery; // The delivery on which the message was last sent
+ nx_field_location_t section_message_header; // The message header list
+ nx_field_location_t section_delivery_annotation; // The delivery annotation map
+ nx_field_location_t section_message_annotation; // The message annotation map
+ nx_field_location_t section_message_properties; // The message properties list
+ nx_field_location_t section_application_properties; // The application properties list
+ nx_field_location_t section_body; // The message body: Data
+ nx_field_location_t section_footer; // The footer
+ nx_field_location_t field_user_id; // The string value of the user-id
+ nx_field_location_t field_to; // The string value of the to field
+ nx_field_location_t body; // The body of the message
+ nx_field_location_t compose_length;
+ nx_field_location_t compose_count;
+ uint32_t length;
+ uint32_t count;
+};
+
+struct nx_buffer_t {
+ DEQ_LINKS(nx_buffer_t);
+ unsigned int size;
+};
+
+typedef struct {
+ size_t buffer_size;
+ unsigned long buffer_preallocation_count;
+ unsigned long buffer_rebalancing_batch_count;
+ unsigned long buffer_local_storage_max;
+ unsigned long buffer_free_list_max;
+ unsigned long message_allocation_batch_count;
+ unsigned long message_rebalancing_batch_count;
+ unsigned long message_local_storage_max;
+} nx_allocator_config_t;
+
+const nx_allocator_config_t *nx_allocator_default_config(void);
+
+void nx_allocator_initialize(const nx_allocator_config_t *config);
+void nx_allocator_finalize(void);
+
+//
+// Functions for per-thread allocators.
+//
+nx_message_t *nx_allocate_message(void);
+nx_buffer_t *nx_allocate_buffer(void);
+void nx_free_message(nx_message_t *msg);
+void nx_free_buffer(nx_buffer_t *buf);
+
+
+typedef enum {
+ NX_DEPTH_NONE,
+ NX_DEPTH_HEADER,
+ NX_DEPTH_DELIVERY_ANNOTATIONS,
+ NX_DEPTH_MESSAGE_ANNOTATIONS,
+ NX_DEPTH_MESSAGE_PROPERTIES, // Needed for 'user-id' and 'to'
+ NX_DEPTH_APPLICATION_PROPERTIES,
+ NX_DEPTH_BODY,
+ NX_DEPTH_ALL
+} nx_message_depth_t;
+
+//
+// Functions for received messages
+//
+nx_message_t *nx_message_receive(pn_delivery_t *delivery);
+int nx_message_check(nx_message_t *msg, nx_message_depth_t depth);
+nx_field_iterator_t *nx_message_field_to(nx_message_t *msg);
+nx_field_iterator_t *nx_message_body(nx_message_t *msg);
+
+//
+// Functions for composed messages
+//
+
+// Convenience Functions
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain);
+
+// Raw Functions
+void nx_message_begin_header(nx_message_t *msg);
+void nx_message_end_header(nx_message_t *msg);
+
+void nx_message_begin_delivery_annotations(nx_message_t *msg);
+void nx_message_end_delivery_annotations(nx_message_t *msg);
+
+void nx_message_begin_message_annotations(nx_message_t *msg);
+void nx_message_end_message_annotations(nx_message_t *msg);
+
+void nx_message_begin_message_properties(nx_message_t *msg);
+void nx_message_end_message_properties(nx_message_t *msg);
+
+void nx_message_begin_application_properties(nx_message_t *msg);
+void nx_message_end_application_properties(nx_message_t *msg);
+
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain);
+
+void nx_message_begin_body_sequence(nx_message_t *msg);
+void nx_message_end_body_sequence(nx_message_t *msg);
+
+void nx_message_begin_footer(nx_message_t *msg);
+void nx_message_end_footer(nx_message_t *msg);
+
+void nx_message_insert_null(nx_message_t *msg);
+void nx_message_insert_boolean(nx_message_t *msg, int value);
+void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value);
+void nx_message_insert_uint(nx_message_t *msg, uint32_t value);
+void nx_message_insert_ulong(nx_message_t *msg, uint64_t value);
+void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len);
+void nx_message_insert_string(nx_message_t *msg, const char *start);
+void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value);
+void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len);
+void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value);
+
+//
+// Functions for buffers
+//
+unsigned char *nx_buffer_base(nx_buffer_t *buf); // Pointer to the first octet in the buffer
+unsigned char *nx_buffer_cursor(nx_buffer_t *buf); // Pointer to the first free octet in the buffer
+size_t nx_buffer_capacity(nx_buffer_t *buf); // Size of free space in the buffer in octets
+size_t nx_buffer_size(nx_buffer_t *buf); // Number of octets in the buffer
+void nx_buffer_insert(nx_buffer_t *buf, size_t len); // Notify the buffer that 'len' octets were written at cursor
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/server.h b/qpid/extras/nexus/include/qpid/nexus/server.h
new file mode 100644
index 0000000000..b04db5cf9a
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/server.h
@@ -0,0 +1,403 @@
+#ifndef __nexus_server_h__
+#define __nexus_server_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+
+/**
+ * \defgroup Control Server Control Functions
+ * @{
+ */
+
+/**
+ * \brief Thread Start Handler
+ *
+ * Callback invoked when a new server thread is started. The callback is
+ * invoked on the newly created thread.
+ *
+ * This handler can be used to set processor affinity or other thread-specific
+ * tuning values.
+ *
+ * @param context The handler context supplied in nx_server_initialize.
+ * @param thread_id The integer thread identifier that uniquely identifies this thread.
+ */
+typedef void (*nx_thread_start_cb_t)(void* context, int thread_id);
+
+
+/**
+ * \brief Initialize the server module and prepare it for operation.
+ *
+ * @param thread_count The number of worker threads (1 or more) that the server shall create
+ */
+void nx_server_initialize(int thread_count);
+
+
+/**
+ * \brief Finalize the server after it has stopped running.
+ */
+void nx_server_finalize(void);
+
+
+/**
+ * \brief Set the optional thread-start handler.
+ *
+ * This handler is called once on each worker thread at the time
+ * the thread is started. This may be used to set tuning settings like processor affinity, etc.
+ *
+ * @param start_handler The thread-start handler invoked per thread on thread startup.
+ * @param context Opaque context to be passed back in the callback function.
+ */
+void nx_server_set_start_handler(nx_thread_start_cb_t start_handler, void *context);
+
+
+/**
+ * \brief Run the server threads until completion.
+ *
+ * Start the operation of the server, including launching all of the worker threads.
+ * This function does not return until after the server has been stopped. The thread
+ * that calls nx_server_run is used as one of the worker threads.
+ */
+void nx_server_run(void);
+
+
+/**
+ * \brief Stop the server
+ *
+ * Stop the server and join all of its worker threads. This function may be called from any
+ * thread. When this function returns, all of the other server threads have been closed and
+ * joined. The calling thread will be the only running thread in the process.
+ */
+void nx_server_stop(void);
+
+
+/**
+ * \brief Pause (quiesce) the server.
+ *
+ * This call blocks until all of the worker threads (except
+ * the one calling the this function) are finished processing and have been blocked. When
+ * this call returns, the calling thread is the only thread running in the process.
+ */
+void nx_server_pause(void);
+
+
+/**
+ * \brief Resume normal operation of a paused server.
+ *
+ * This call unblocks all of the worker threads
+ * so they can resume normal connection processing.
+ */
+void nx_server_resume(void);
+
+
+/**
+ * @}
+ * \defgroup Signal Server Signal Handling Functions
+ * @{
+ */
+
+
+/**
+ * \brief Signal Handler
+ *
+ * Callback for caught signals. This handler will only be invoked for signal numbers
+ * that were registered via nx_server_signal. The handler is not invoked in the context
+ * of the OS signal handler. Rather, it is invoked on one of the worker threads in an
+ * orderly sequence.
+ *
+ * @param context The handler context supplied in nx_server_initialize.
+ * @param signum The signal number that was raised.
+ */
+typedef void (*nx_signal_handler_cb_t)(void* context, int signum);
+
+
+/**
+ * Set the signal handler for the server. The signal handler is invoked cleanly on a worker thread
+ * after the server process catches an operating-system signal. The signal handler is optional and
+ * need not be set.
+ *
+ * @param signal_handler The signal handler called when a registered signal is caught.
+ * @param context Opaque context to be passed back in the callback function.
+ */
+void nx_server_set_signal_handler(nx_signal_handler_cb_t signal_handler, void *context);
+
+
+/**
+ * \brief Register a signal to be caught and handled by the signal handler.
+ *
+ * @param signum The signal number of a signal to be handled by the application.
+ */
+void nx_server_signal(int signum);
+
+
+/**
+ * @}
+ * \defgroup Connection Server AMQP Connection Handling Functions
+ * @{
+ */
+
+/**
+ * \brief Listener objects represent the desire to accept incoming transport connections.
+ */
+typedef struct nx_listener_t nx_listener_t;
+
+/**
+ * \brief Connector objects represent the desire to create and maintain an outgoing transport connection.
+ */
+typedef struct nx_connector_t nx_connector_t;
+
+/**
+ * \brief Connection objects wrap Proton connection objects.
+ */
+typedef struct nx_connection_t nx_connection_t;
+
+/**
+ * Event type for the connection callback.
+ */
+typedef enum {
+ /// The connection just opened via a listener (inbound).
+ NX_CONN_EVENT_LISTENER_OPEN,
+
+ /// The connection just opened via a connector (outbound).
+ NX_CONN_EVENT_CONNECTOR_OPEN,
+
+ /// The connection was closed at the transport level (not cleanly).
+ NX_CONN_EVENT_CLOSE,
+
+ /// The connection requires processing.
+ NX_CONN_EVENT_PROCESS
+} nx_conn_event_t;
+
+
+/**
+ * \brief Connection Event Handler
+ *
+ * Callback invoked when processing is needed on a proton connection. This callback
+ * shall be invoked on one of the server's worker threads. The server guarantees that
+ * no two threads shall be allowed to process a single connection concurrently.
+ * The implementation of this handler may assume that it has exclusive access to the
+ * connection and its subservient components (sessions, links, deliveries, etc.).
+ *
+ * @param context The handler context supplied in nx_server_{connect,listen}.
+ * @param event The event/reason for the invocation of the handler.
+ * @param conn The connection that requires processing by the handler.
+ * @return A value greater than zero if the handler did any proton processing for
+ * the connection. If no work was done, zero is returned.
+ */
+typedef int (*nx_conn_handler_cb_t)(void* context, nx_conn_event_t event, nx_connection_t *conn);
+
+
+/**
+ * \brief Set the connection event handler callback.
+ *
+ * Set the connection handler callback for the server. This callback is mandatory and must be set
+ * prior to the invocation of nx_server_run.
+ *
+ * @param conn_hander The handler for processing connection-related events.
+ */
+void nx_server_set_conn_handler(nx_conn_handler_cb_t conn_handler);
+
+
+/**
+ * \brief Set the user context for a connection.
+ *
+ * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @param context User context to be stored with the connection.
+ */
+void nx_connection_set_context(nx_connection_t *conn, void *context);
+
+
+/**
+ * \brief Get the user context from a connection.
+ *
+ * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The user context stored with the connection.
+ */
+void *nx_connection_get_context(nx_connection_t *conn);
+
+
+/**
+ * \brief Activate a connection for output.
+ *
+ * This function is used to request that the server activate the indicated connection.
+ * It is assumed that the connection is one that the caller does not have permission to
+ * access (i.e. it may be owned by another thread currently). An activated connection
+ * will, when writable, appear in the internal work list and be invoked for processing
+ * by a worker thread.
+ *
+ * @param conn The connection over which the application wishes to send data
+ */
+void nx_server_activate(nx_connection_t *conn);
+
+
+/**
+ * \brief Get the wrapped proton-engine connection object.
+ *
+ * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The proton connection object.
+ */
+pn_connection_t *nx_connection_pn(nx_connection_t *conn);
+
+
+/**
+ * \brief Configuration block for a connector or a listener.
+ */
+typedef struct nx_server_config_t {
+ /**
+ * Host name or network address to bind to a listener or use in the connector.
+ */
+ char *host;
+
+ /**
+ * Port name or number to bind to a listener or use in the connector.
+ */
+ char *port;
+
+ /**
+ * Space-separated list of SASL mechanisms to be accepted for the connection.
+ */
+ char *sasl_mechanisms;
+
+ /**
+ * If appropriate for the mechanism, the username for authentication
+ * (connector only)
+ */
+ char *sasl_username;
+
+ /**
+ * If appropriate for the mechanism, the password for authentication
+ * (connector only)
+ */
+ char *sasl_password;
+
+ /**
+ * If appropriate for the mechanism, the minimum acceptable security strength factor
+ */
+ int sasl_minssf;
+
+ /**
+ * If appropriate for the mechanism, the maximum acceptable security strength factor
+ */
+ int sasl_maxssf;
+
+ /**
+ * SSL is enabled for this connection iff non-zero.
+ */
+ int ssl_enabled;
+
+ /**
+ * Connection will take on the role of SSL server iff non-zero.
+ */
+ int ssl_server;
+
+ /**
+ * Iff non-zero AND ssl_enabled is non-zero, this listener will detect the client's use
+ * of SSL or non-SSL and conform to the client's protocol.
+ * (listener only)
+ */
+ int ssl_allow_unsecured_client;
+
+ /**
+ * Path to the file containing the PEM-formatted public certificate for the local end
+ * of the connection.
+ */
+ char *ssl_certificate_file;
+
+ /**
+ * Path to the file containing the PEM-formatted private key for the local end of the
+ * connection.
+ */
+ char *ssl_private_key_file;
+
+ /**
+ * The password used to sign the private key, or NULL if the key is not protected.
+ */
+ char *ssl_password;
+
+ /**
+ * Path to the file containing the PEM-formatted set of certificates of trusted CAs.
+ */
+ char *ssl_trusted_certificate_db;
+
+ /**
+ * Iff non-zero, require that the peer's certificate be supplied and that it be authentic
+ * according to the set of trusted CAs.
+ */
+ int ssl_require_peer_authentication;
+
+ /**
+ * Allow the connection to be redirected by the peer (via CLOSE->Redirect). This is
+ * meaningful for outgoing (connector) connections only.
+ */
+ int allow_redirect;
+} nx_server_config_t;
+
+
+/**
+ * \brief Create a listener for incoming connections.
+ *
+ * @param config Pointer to a configuration block for this listener. This block will be
+ * referenced by the server, not copied. The referenced record must remain
+ * in-scope for the life of the listener.
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new listener, or NULL in case of failure.
+ */
+nx_listener_t *nx_server_listen(const nx_server_config_t *config, void *context);
+
+
+/**
+ * \brief Free the resources associated with a listener.
+ *
+ * @param li A listener pointer returned by nx_listen.
+ */
+void nx_listener_free(nx_listener_t* li);
+
+
+/**
+ * \brief Close a listener so it will accept no more connections.
+ *
+ * @param li A listener pointer returned by nx_listen.
+ */
+void nx_listener_close(nx_listener_t* li);
+
+
+/**
+ * \brief Create a connector for an outgoing connection.
+ *
+ * @param config Pointer to a configuration block for this connector. This block will be
+ * referenced by the server, not copied. The referenced record must remain
+ * in-scope for the life of the connector..
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new connector, or NULL in case of failure.
+ */
+nx_connector_t *nx_server_connect(const nx_server_config_t *config, void *context);
+
+
+/**
+ * \brief Free the resources associated with a connector.
+ *
+ * @param ct A connector pointer returned by nx_connect.
+ */
+void nx_connector_free(nx_connector_t* ct);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/threading.h b/qpid/extras/nexus/include/qpid/nexus/threading.h
new file mode 100644
index 0000000000..f275fc0086
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/threading.h
@@ -0,0 +1,45 @@
+#ifndef __sys_threading_h__
+#define __sys_threading_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct sys_mutex_t sys_mutex_t;
+
+sys_mutex_t *sys_mutex(void);
+void sys_mutex_free(sys_mutex_t *mutex);
+void sys_mutex_lock(sys_mutex_t *mutex);
+void sys_mutex_unlock(sys_mutex_t *mutex);
+
+
+typedef struct sys_cond_t sys_cond_t;
+
+sys_cond_t *sys_cond(void);
+void sys_cond_free(sys_cond_t *cond);
+void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex);
+void sys_cond_signal(sys_cond_t *cond);
+void sys_cond_signal_all(sys_cond_t *cond);
+
+
+typedef struct sys_thread_t sys_thread_t;
+
+sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg);
+void sys_thread_free(sys_thread_t *thread);
+void sys_thread_join(sys_thread_t *thread);
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/timer.h b/qpid/extras/nexus/include/qpid/nexus/timer.h
new file mode 100644
index 0000000000..5444989296
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/timer.h
@@ -0,0 +1,86 @@
+#ifndef __nexus_timer_h__
+#define __nexus_timer_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \defgroup Timer Server Timer Functions
+ * @{
+ */
+
+typedef struct nx_timer_t nx_timer_t;
+
+/**
+ * Timer Callback
+ *
+ * Callback invoked after a timer's interval expires and the timer fires.
+ *
+ * @param context The context supplied in nx_timer
+ */
+typedef void (*nx_timer_cb_t)(void* context);
+
+
+/**
+ * Create a new timer object.
+ *
+ * @param cb The callback function to be invoked when the timer expires.
+ * @param context An opaque, user-supplied context to be passed into the callback.
+ * @return A pointer to the new timer object or NULL if memory is exhausted.
+ */
+nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context);
+
+
+/**
+ * Free the resources for a timer object. If the timer was scheduled, it will be canceled
+ * prior to freeing. After this function returns, the callback will not be invoked for this
+ * timer.
+ *
+ * @param timer Pointer to the timer object returned by nx_timer.
+ */
+void nx_timer_free(nx_timer_t *timer);
+
+
+/**
+ * Schedule a timer to fire in the future.
+ *
+ * Note that the timer callback will never be invoked synchronously during the execution
+ * of nx_timer_schedule. Even if the interval is immediate (0), the callback invocation will
+ * be asynchronous and after the return of this function.
+ *
+ * @param timer Pointer to the timer object returned by nx_timer.
+ * @param msec The minimum number of milliseconds of delay until the timer fires.
+ * If 0 is supplied, the timer will fire immediately.
+ */
+void nx_timer_schedule(nx_timer_t *timer, long msec);
+
+
+/**
+ * Attempt to cancel a scheduled timer. Since the timer callback can be invoked on any
+ * server thread, it is always possible that a last-second cancel attempt may arrive too late
+ * to stop the timer from firing (i.e. the cancel is concurrent with the fire callback).
+ *
+ * @param timer Pointer to the timer object returned by nx_timer.
+ */
+void nx_timer_cancel(nx_timer_t *timer);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/user_fd.h b/qpid/extras/nexus/include/qpid/nexus/user_fd.h
new file mode 100644
index 0000000000..2f139c2c4f
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/user_fd.h
@@ -0,0 +1,121 @@
+#ifndef __nexus_user_fd_h__
+#define __nexus_user_fd_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * \defgroup UserFd Server User-File-Descriptor Functions
+ * @{
+ */
+
+typedef struct nx_user_fd_t nx_user_fd_t;
+
+
+/**
+ * User_fd Handler
+ *
+ * Callback invoked when a user-managed file descriptor is available for reading or writing or there
+ * was an error on the file descriptor.
+ *
+ * @param context The handler context supplied in the nx_user_fd call.
+ * @param ufd The user_fd handle for the processable fd.
+ */
+typedef void (*nx_user_fd_handler_cb_t)(void* context, nx_user_fd_t *ufd);
+
+
+/**
+ * Set the user-fd handler callback for the server. This handler is optional, but must be supplied
+ * if the nx_server is used to manage the activation of user file descriptors.
+ */
+void nx_server_set_user_fd_handler(nx_user_fd_handler_cb_t ufd_handler);
+
+
+/**
+ * Create a tracker for a user-managed file descriptor.
+ *
+ * A user-fd is appropriate for use when the application opens and manages file descriptors
+ * for purposes other than AMQP communication. Registering a user fd with the nexus server
+ * controls processing of the FD alongside the FDs used for messaging.
+ *
+ * @param fd The open file descriptor being managed by the application.
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new user_fd.
+ */
+nx_user_fd_t *nx_user_fd(int fd, void *context);
+
+
+/**
+ * Free the resources for a user-managed FD tracker.
+ *
+ * @param ufd Structure pointer returned by nx_user_fd.
+ */
+void nx_user_fd_free(nx_user_fd_t *ufd);
+
+
+/**
+ * Activate a user-fd for read.
+ *
+ * Use this activation when the application has capacity to receive data from the user-fd. This will
+ * cause the callback set in nx_server_set_user_fd_handler to later be invoked when the
+ * file descriptor has data to read.
+ *
+ * @param ufd Structure pointer returned by nx_user_fd.
+ */
+void nx_user_fd_activate_read(nx_user_fd_t *ufd);
+
+
+/**
+ * Activate a user-fd for write.
+ *
+ * Use this activation when the application has data to write via the user-fd. This will
+ * cause the callback set in nx_server_set_user_fd_handler to later be invoked when the
+ * file descriptor is writable.
+ *
+ * @param ufd Structure pointer returned by nx_user_fd.
+ */
+void nx_user_fd_activate_write(nx_user_fd_t *ufd);
+
+
+/**
+ * Check readable status of a user-fd
+ *
+ * Note: It is possible that readable status is spurious (i.e. this function returns true
+ * but the file-descriptor is not readable and will block if not set to O_NONBLOCK).
+ * Code accordingly.
+ *
+ * @param ufd Structure pointer returned by nx_user_fd.
+ * @return true iff the user file descriptor is readable.
+ */
+bool nx_user_fd_is_readable(nx_user_fd_t *ufd);
+
+
+/**
+ * Check writable status of a user-fd
+ *
+ * @param ufd Structure pointer returned by nx_user_fd.
+ * @return true iff the user file descriptor is writable.
+ */
+bool nx_user_fd_is_writeable(nx_user_fd_t *ufd);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/qpid/extras/nexus/site/css/style.css b/qpid/extras/nexus/site/css/style.css
new file mode 100644
index 0000000000..b73c136d4a
--- /dev/null
+++ b/qpid/extras/nexus/site/css/style.css
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ul {
+ list-style-type:square;
+}
+
+th {
+ text-align: left;
+ font-weight: bold;
+}
+
+body {
+ margin:0;
+ background:#FFFFFF;
+ font-family:"Verdana", sans-serif;
+}
+
+.container {
+ width:950px;
+ margin:0 auto;
+}
+
+.header {
+ height:100px;
+ width:950px;
+ background:url(images/header.png)
+}
+
+.logo {
+ text-align:center;
+ font-weight:600;
+ padding:0 0 0 0;
+ font-size:14px;
+ font-family:"Verdana", cursive;
+}
+
+.logo a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.main_text_area {
+ margin-left:200px;
+}
+
+.main_text_area_top {
+ height:14px;
+ font-size:1px;
+}
+
+.main_text_area_bottom {
+ display:none;
+/* height:14px;
+ margin-bottom:4px;*/
+}
+
+.main_text_area_body {
+ padding:5px 24px;
+}
+
+.main_text_area_body p {
+ text-align:justify;
+}
+
+.main_text_area br {
+ line-height:10px;
+}
+
+.main_text_area h1 {
+ font-size:28px;
+ font-weight:600;
+ margin:0 0 24px 0;
+ color:#0c3b82;
+ font-family:"Verdana", Times, serif;
+}
+
+.main_text_area h2 {
+ font-size:24px;
+ font-weight:600;
+ margin:24px 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana",Times, serif;
+}
+
+.main_text_area ol, .main_text_area ul {
+ padding:0;
+ margin:10px 0;
+ margin-left:20px;
+}
+
+.main_text_area li {
+/* margin-left:40px; */
+}
+
+.main_text_area, .menu_box {
+ font-size:13px;
+ line-height:17px;
+ color:#000000;
+}
+
+.main_text_area {
+ font-size:15px;
+}
+
+.main_text_area a {
+ color:#000000;
+}
+
+.main_text_area a:hover {
+ color:#000000;
+}
+
+.menu_box {
+ width:196px;
+ float:left;
+ margin-left:4px;
+}
+
+.menu_box_top {
+ background:url(images/menu_top.png) no-repeat;
+ height:14px;
+ font-size:1px;
+}
+
+.menu_box_body {
+ background:url(images/menu_body.png) repeat-y;
+ padding:5px 24px 5px 24px;
+}
+
+.menu_box_bottom {
+ background:url(images/menu_bottom.png) no-repeat;
+ height:14px;
+ font-size:1px;
+ margin-bottom:1px;
+}
+
+.menu_box h3 {
+ font-size:20px;
+ font-weight:500;
+ margin:0 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana",Times, serif;
+}
+
+.menu_box ul {
+ margin:12px;
+ padding:0px;
+}
+
+.menu_box li {
+ list-style:square;
+}
+
+.menu_box a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.menu_box a:hover {
+ color:#000000;
+ text-decoration:underline;
+}
+
+.feature_box {
+ width:698px;
+ overflow:hidden;
+}
+
+.feature_box h3 {
+ font-size:18px;
+ font-weight:600;
+ margin:0 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana", Times, serif;
+}
+
+.feature_box_column1 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+.feature_box_column2 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+.feature_box_column3 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+
+.feature_box ul {
+ margin:.8em .4em;
+ padding-left:1.2em;
+ padding:0;
+ list-style-type: square;
+}
+
+.feature_box ul li {
+ font-family:"Verdana",sans-serif;
+ font-size:14px;
+ color:#000;
+ margin:.4em 0;
+}
+
+.feature_box ul li ul {
+ padding-left:1.2em;
+ margin-left:2em;
+}
+
+.feature_box a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.feature_box a:hover {
+ color:#000000;
+ text-decoration:underline;
+}
+
+.footer {
+ color:#000000;
+ clear:both;
+ text-align:center;
+ font-size:11px;
+ line-height:17px;
+ height:45px;
+ padding-top:18px;
+}
+
+.footer a {
+ color:#000000;
+}
+
+.footer a:hover {
+ color:#000000;
+}
+
+.download_table {
+ width:100%;
+}
+
+.download_table_col_1 {
+ width:240px;
+}
+
+.proton_download_table_col_1 {
+ width:420px;
+}
+
+.download_table_amqp_col {
+ text-align:center;
+ width:80px;
+}
+
diff --git a/qpid/extras/nexus/site/images/gwarch.dia b/qpid/extras/nexus/site/images/gwarch.dia
new file mode 100644
index 0000000000..fd7eef97a4
--- /dev/null
+++ b/qpid/extras/nexus/site/images/gwarch.dia
Binary files differ
diff --git a/qpid/extras/nexus/site/images/gwarch.png b/qpid/extras/nexus/site/images/gwarch.png
new file mode 100644
index 0000000000..923baadf9f
--- /dev/null
+++ b/qpid/extras/nexus/site/images/gwarch.png
Binary files differ
diff --git a/qpid/extras/nexus/site/includes/footer.include b/qpid/extras/nexus/site/includes/footer.include
new file mode 100644
index 0000000000..35ff04b9f2
--- /dev/null
+++ b/qpid/extras/nexus/site/includes/footer.include
@@ -0,0 +1,7 @@
+ <div class="footer">
+ <p>
+ &#xA9; 2004-2012 The Apache Software Foundation.<br />
+ Apache Qpid, Qpid, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation.<br />
+ All other marks mentioned may be trademarks or registered trademarks of their respective owners.
+ </p>
+ </div>
diff --git a/qpid/extras/nexus/site/includes/header.include b/qpid/extras/nexus/site/includes/header.include
new file mode 100644
index 0000000000..244dfc4517
--- /dev/null
+++ b/qpid/extras/nexus/site/includes/header.include
@@ -0,0 +1,6 @@
+ <div class="header">
+ <div class="logo">
+ <h1>Apache Qpid&#8482;</h1>
+ <h2>Open Source AMQP Messaging</h2>
+ </div>
+ </div>
diff --git a/qpid/extras/nexus/site/includes/menu.include b/qpid/extras/nexus/site/includes/menu.include
new file mode 100644
index 0000000000..aa96000e94
--- /dev/null
+++ b/qpid/extras/nexus/site/includes/menu.include
@@ -0,0 +1,71 @@
+ <div class="menu_box">
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Apache Qpid</h3>
+ <ul>
+ <li><a href="index.html">Home</a></li>
+ <li><a href="download.html">Download</a></li>
+ <li><a href="getting_started.html">Getting Started</a></li>
+ <li><a href="http://www.apache.org/licenses/">License</a></li>
+ <li><a href="https://cwiki.apache.org/qpid/faq.html">FAQ</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Documentation</h3>
+ <ul>
+ <li><a href="documentation.html#doc-release">Latest Release</a></li>
+ <li><a href="documentation.html#doc-trunk">Trunk</a></li>
+ <li><a href="documentation.html#doc-archives">Archive</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Community</h3>
+ <ul>
+ <li><a href="getting_involved.html">Getting Involved</a></li>
+ <li><a href="source_repository.html">Source Repository</a></li>
+ <li><a href="mailing_lists.html">Mailing Lists</a></li>
+ <li><a href="https://cwiki.apache.org/qpid/">Wiki</a></li>
+ <li><a href="https://issues.apache.org/jira/browse/qpid">Issue Reporting</a></li>
+ <li><a href="people.html">People</a></li>
+ <li><a href="acknowledgements.html">Acknowledgements</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Developers</h3>
+ <ul>
+ <li><a href="https://cwiki.apache.org/qpid/building.html">Building Qpid</a></li>
+ <li><a href="https://cwiki.apache.org/qpid/developer-pages.html">Developer Pages</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>About AMQP</h3>
+ <ul>
+ <li><a href="amqp.html">What is AMQP?</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>About Apache</h3>
+ <ul>
+ <li><a href="http://www.apache.org">Home</a></li>
+ <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+ <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+ <li><a href="http://www.apache.org/security/">Security</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+ </div>
diff --git a/qpid/extras/nexus/site/index.html b/qpid/extras/nexus/site/index.html
new file mode 100755
index 0000000000..806965a9c1
--- /dev/null
+++ b/qpid/extras/nexus/site/index.html
@@ -0,0 +1,98 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+-->
+<html xmlns="http://www.w3.org/1999/xhtml">
+ <head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ <title>Apache Qpid Nexus&#8482;: A Platform for Building AMQP Infrastructure</title>
+ <link href="css/style.css" rel="stylesheet" type="text/css"/>
+ </head>
+
+ <body>
+ <div class="container">
+ <!-- begin header -->
+
+ <div class="header">
+ <div class="logo">
+ <h1>Apache Qpid Nexus&#8482;</h1>
+ <h2>A Platform for Building AMQP Infrastructure</h2>
+ </div>
+ </div>
+
+ <!-- end header -->
+
+ <!-- begin menu -->
+ <!--#include virtual="/includes/menu.include" -->
+ <!-- end menu -->
+
+ <!-- begin content -->
+ <div class="main_text_area">
+ <div class="main_text_area_top"></div>
+
+ <div class="main_text_area_body">
+
+<p>Qpid Nexus is a library to help developers build infrastructure
+components for AMQP. Nexus is not a general-purpose Messaging API.
+Rather, it is a foundation on which to build applications, services, and
+appliances that need direct access to the detailed constructs of AMQP.</p>
+<hr width="80%" />
+<h2>Overview</h2>
+<p>Nexus is an extension of the Engine and Driver interfaces of
+<a href="http://qpid.apache.org/proton">Qpid Proton</a>. The following
+features are provided:</p>
+
+<ul>
+ <li>Safe multi-threaded use of Proton</li>
+ <li>Operating System Signal handling</li>
+ <li>Quiesce and Resume for the application's threads</li>
+ <li>Timers</li>
+ <li>Resilient outbound connections (retry/reconnect)</li>
+ <li>Polling support for the application's non-AMQP file descriptors</li>
+ <li>An AMQP Node Container that allows the developer to create node types</li>
+ <li>Node instances can be statically or dynamically provisioned</li>
+</ul>
+<p />
+<hr width="80%" />
+<h2>Architecture</h2>
+<center><img src="images/gwarch.png" /></center>
+<ul>
+ <li><b>Proton Engine and Driver</b> provide the underlying AMQP capability</li>
+ <li><a href="doxygen/server/modules.html">Nexus Server</a>
+ wraps Proton connections in a multi-threaded server environment</li>
+ <li><b>Nexus Container</b> provides management of AMQP nodes (links, termini, and deliveries)</li>
+ <li><b>Nexus Message</b> provides efficient message encode/decode, optimized for messaging intermediaries</li>
+ <li>The <b>Application</b> uses all of the above services to implement scalable and performant AMQP infrastructure</li>
+</ul>
+<hr width="80%" />
+
+ </div>
+
+ <div class="main_text_area_bottom"></div>
+ </div>
+ <!-- end content -->
+
+ <!-- begin footer -->
+ <!--#include virtual="/includes/footer.include" -->
+ <!-- end footer -->
+
+ </div>
+ </body>
+</html>
diff --git a/qpid/extras/nexus/src/alloc.c b/qpid/extras/nexus/src/alloc.c
new file mode 100644
index 0000000000..397a7897ac
--- /dev/null
+++ b/qpid/extras/nexus/src/alloc.c
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/alloc.h>
+#include <qpid/nexus/ctools.h>
+#include <memory.h>
+#include <stdio.h>
+
+typedef struct item_t item_t;
+
+struct item_t {
+ DEQ_LINKS(item_t);
+ nx_alloc_type_desc_t *desc;
+};
+
+DEQ_DECLARE(item_t, item_list_t);
+
+struct nx_alloc_pool_t {
+ item_list_t free_list;
+};
+
+nx_alloc_config_t nx_alloc_default_config_big = {16, 32, 0};
+nx_alloc_config_t nx_alloc_default_config_small = {64, 128, 0};
+
+sys_mutex_t *init_lock;
+item_list_t type_list;
+
+static void nx_alloc_init(nx_alloc_type_desc_t *desc)
+{
+ sys_mutex_lock(init_lock);
+
+ if (!desc->global_pool) {
+ if (desc->config == 0)
+ desc->config = desc->type_size > 256 ?
+ &nx_alloc_default_config_big : &nx_alloc_default_config_small;
+
+ assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
+
+ desc->global_pool = NEW(nx_alloc_pool_t);
+ DEQ_INIT(desc->global_pool->free_list);
+ desc->lock = sys_mutex();
+ desc->stats = NEW(nx_alloc_stats_t);
+ memset(desc->stats, 0, sizeof(nx_alloc_stats_t));
+ }
+
+ item_t *type_item = NEW(item_t);
+ DEQ_ITEM_INIT(type_item);
+ type_item->desc = desc;
+ DEQ_INSERT_TAIL(type_list, type_item);
+
+ sys_mutex_unlock(init_lock);
+}
+
+
+void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool)
+{
+ int idx;
+
+ //
+ // If the descriptor is not initialized, set it up now.
+ //
+ if (!desc->global_pool)
+ nx_alloc_init(desc);
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(nx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ nx_alloc_pool_t *pool = *tpool;
+
+ //
+ // Fast case: If there's an item on the local free list, take it off the
+ // list and return it. Since everything we've touched is thread-local,
+ // there is no need to acquire a lock.
+ //
+ item_t *item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ //
+ // The local free list is empty, we need to either rebalance a batch
+ // of items from the global list or go to the heap to get new memory.
+ //
+ sys_mutex_lock(desc->lock);
+ if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) {
+ //
+ // Rebalance a full batch from the global free list to the thread list.
+ //
+ desc->stats->batches_rebalanced_to_threads++;
+ desc->stats->held_by_threads += desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ }
+ } else {
+ //
+ // Allocate a full batch from the heap and put it on the thread list.
+ //
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = (item_t*) malloc(sizeof(item_t) + desc->type_size);
+ if (item == 0)
+ break;
+ DEQ_ITEM_INIT(item);
+ item->desc = desc;
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ desc->stats->held_by_threads++;
+ desc->stats->total_alloc_from_heap++;
+ }
+ }
+ sys_mutex_unlock(desc->lock);
+
+ item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ return 0;
+}
+
+
+void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p)
+{
+ item_t *item = ((item_t*) p) - 1;
+ int idx;
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(nx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ nx_alloc_pool_t *pool = *tpool;
+
+ DEQ_INSERT_TAIL(pool->free_list, item);
+
+ if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
+ return;
+
+ //
+ // We've exceeded the maximum size of the local free list. A batch must be
+ // rebalanced back to the global list.
+ //
+ sys_mutex_lock(desc->lock);
+ desc->stats->batches_rebalanced_to_global++;
+ desc->stats->held_by_threads -= desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(pool->free_list);
+ DEQ_REMOVE_HEAD(pool->free_list);
+ DEQ_INSERT_TAIL(desc->global_pool->free_list, item);
+ }
+
+ //
+ // If there's a global_free_list size limit, remove items until the limit is
+ // not exceeded.
+ //
+ if (desc->config->global_free_list_max != 0) {
+ while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ free(item);
+ desc->stats->total_free_to_heap++;
+ }
+ }
+
+ sys_mutex_unlock(desc->lock);
+}
+
+
+void nx_alloc_initialize(void)
+{
+ init_lock = sys_mutex();
+ DEQ_INIT(type_list);
+}
+
diff --git a/qpid/extras/nexus/src/alloc_private.h b/qpid/extras/nexus/src/alloc_private.h
new file mode 100644
index 0000000000..00a4380bff
--- /dev/null
+++ b/qpid/extras/nexus/src/alloc_private.h
@@ -0,0 +1,26 @@
+#ifndef __nexus_alloc_private_h__
+#define __nexus_alloc_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/alloc.h>
+
+void nx_alloc_initialize(void);
+
+#endif
diff --git a/qpid/extras/nexus/src/auth.c b/qpid/extras/nexus/src/auth.c
new file mode 100644
index 0000000000..f33e907359
--- /dev/null
+++ b/qpid/extras/nexus/src/auth.c
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include "auth.h"
+#include "server_private.h"
+#include <proton/sasl.h>
+
+
+void auth_client_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ nx_connection_t *ctx = (nx_connection_t*) pn_connector_context(cxtr);
+
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
+
+ state = pn_sasl_state(sasl);
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
+void auth_server_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ nx_connection_t *ctx = (nx_connection_t*) pn_connector_context(cxtr);
+
+ while (state == PN_SASL_CONF || state == PN_SASL_STEP) {
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_server(sasl);
+ } else if (state == PN_SASL_STEP) {
+ const char* mechanisms = pn_sasl_remote_mechanisms(sasl);
+ if (strcmp(mechanisms, "ANONYMOUS") == 0)
+ pn_sasl_done(sasl, PN_SASL_OK);
+ else
+ pn_sasl_done(sasl, PN_SASL_AUTH);
+ }
+ state = pn_sasl_state(sasl);
+ }
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
diff --git a/qpid/extras/nexus/src/auth.h b/qpid/extras/nexus/src/auth.h
new file mode 100644
index 0000000000..c551c8ff76
--- /dev/null
+++ b/qpid/extras/nexus/src/auth.h
@@ -0,0 +1,27 @@
+#ifndef __auth_h__
+#define __auth_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+void auth_client_handler(pn_connector_t *conn);
+void auth_server_handler(pn_connector_t *conn);
+
+#endif
diff --git a/qpid/extras/nexus/src/container.c b/qpid/extras/nexus/src/container.c
new file mode 100644
index 0000000000..3d57a8f21d
--- /dev/null
+++ b/qpid/extras/nexus/src/container.c
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <qpid/nexus/container.h>
+#include <qpid/nexus/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/hash.h>
+#include <qpid/nexus/threading.h>
+#include <qpid/nexus/iterator.h>
+#include <qpid/nexus/log.h>
+
+static char *module="CONTAINER";
+
+struct nx_node_t {
+ const nx_node_type_t *ntype;
+ char *name;
+ void *context;
+ nx_dist_mode_t supported_dist;
+ nx_lifetime_policy_t life_policy;
+};
+
+ALLOC_DECLARE(nx_node_t);
+ALLOC_DEFINE(nx_node_t);
+ALLOC_DEFINE(nx_link_item_t);
+
+struct nx_link_t {
+ pn_link_t *pn_link;
+ void *context;
+ nx_node_t *node;
+};
+
+ALLOC_DECLARE(nx_link_t);
+ALLOC_DEFINE(nx_link_t);
+
+typedef struct nxc_node_type_t {
+ DEQ_LINKS(struct nxc_node_type_t);
+ const nx_node_type_t *ntype;
+} nxc_node_type_t;
+DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
+
+
+static hash_t *node_type_map;
+static hash_t *node_map;
+static sys_mutex_t *lock;
+static nx_node_t *default_node;
+static nxc_node_type_list_t node_type_list;
+
+static void setup_outgoing_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ nx_node_t *node;
+ int result;
+ const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
+ nx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured source
+
+ if (source) {
+ iter = nx_field_iterator_string(source, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ nx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ nx_link_t *link = new_nx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->outgoing_handler(node->context, link);
+}
+
+
+static void setup_incoming_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ nx_node_t *node;
+ int result;
+ const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ nx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured target
+
+ if (target) {
+ iter = nx_field_iterator_string(target, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ nx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ nx_link_t *link = new_nx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->incoming_handler(node->context, link);
+}
+
+
+static int do_writable(pn_link_t *pn_link)
+{
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+ if (!link)
+ return 0;
+
+ nx_node_t *node = link->node;
+ if (!node)
+ return 0;
+
+ return node->ntype->writable_handler(node->context, link);
+}
+
+
+static void process_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ nx_node_t *node = link->node;
+ if (node) {
+ node->ntype->rx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ //
+ // Reject the delivery if we couldn't find a node to handle it
+ //
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+}
+
+
+static void do_send(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ nx_node_t *node = link->node;
+ if (node) {
+ node->ntype->tx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ // TODO - Cancel the delivery
+}
+
+
+static void do_updated(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ nx_node_t *node = link->node;
+ if (node)
+ node->ntype->disp_handler(node->context, link, delivery);
+ }
+}
+
+
+static int close_handler(void* unused, pn_connection_t *conn)
+{
+ //
+ // Close all links, passing False as the 'closed' argument. These links are not
+ // being properly 'detached'. They are being orphaned.
+ //
+ pn_link_t *pn_link = pn_link_head(conn, 0);
+ while (pn_link) {
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+ nx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 0);
+ pn_link_close(pn_link);
+ free_nx_link_t(link);
+ pn_link = pn_link_next(pn_link, 0);
+ }
+
+ // teardown all sessions
+ pn_session_t *ssn = pn_session_head(conn, 0);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, 0);
+ }
+
+ // teardown the connection
+ pn_connection_close(conn);
+ return 0;
+}
+
+
+static int process_handler(void* unused, pn_connection_t *conn)
+{
+ pn_session_t *ssn;
+ pn_link_t *pn_link;
+ pn_delivery_t *delivery;
+ int event_count = 0;
+
+ // Step 1: setup the engine's connection, and any sessions and links
+ // that may be pending.
+
+ // initialize the connection if it's new
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+ pn_connection_open(conn);
+ event_count++;
+ }
+
+ // open all pending sessions
+ ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
+ while (ssn) {
+ pn_session_open(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+ // configure and open any pending links
+ pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
+ while (pn_link) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(pn_link);
+ else
+ setup_incoming_link(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+
+ // Step 2: Now drain all the pending deliveries from the connection's
+ // work queue and process them
+
+ delivery = pn_work_head(conn);
+ while (delivery) {
+ if (pn_delivery_readable(delivery))
+ process_receive(delivery);
+ else if (pn_delivery_writable(delivery))
+ do_send(delivery);
+
+ if (pn_delivery_updated(delivery))
+ do_updated(delivery);
+
+ delivery = pn_work_next(delivery);
+ event_count++;
+ }
+
+ //
+ // Step 2.5: Traverse all of the links on the connection looking for
+ // outgoing links with non-zero credit. Call the attached node's
+ // writable handler for such links.
+ //
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+ if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+ event_count += do_writable(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
+
+ // Step 3: Clean up any links or sessions that have been closed by the
+ // remote. If the connection has been closed remotely, clean that up
+ // also.
+
+ // teardown any terminating links
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (pn_link) {
+ nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link);
+ nx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown any terminating sessions
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown the connection if it's terminating
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_connection_close(conn);
+ event_count++;
+ }
+
+ return event_count;
+}
+
+
+static void open_handler(nx_connection_t *conn, nx_direction_t dir)
+{
+ const nx_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(lock);
+ nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
+ sys_mutex_unlock(lock);
+
+ pn_connection_open(nx_connection_pn(conn));
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (dir == NX_INCOMING) {
+ if (nt->inbound_conn_open_handler)
+ nt->inbound_conn_open_handler(nt->type_context, conn);
+ } else {
+ if (nt->outbound_conn_open_handler)
+ nt->outbound_conn_open_handler(nt->type_context, conn);
+ }
+
+ sys_mutex_lock(lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(lock);
+ }
+}
+
+
+static int handler(void* context, nx_conn_event_t event, nx_connection_t *nx_conn)
+{
+ pn_connection_t *conn = nx_connection_pn(nx_conn);
+
+ switch (event) {
+ case NX_CONN_EVENT_LISTENER_OPEN: open_handler(nx_conn, NX_INCOMING); break;
+ case NX_CONN_EVENT_CONNECTOR_OPEN: open_handler(nx_conn, NX_OUTGOING); break;
+ case NX_CONN_EVENT_CLOSE: return close_handler(context, conn);
+ case NX_CONN_EVENT_PROCESS: return process_handler(context, conn);
+ }
+
+ return 0;
+}
+
+
+void nx_container_initialize(void)
+{
+ nx_log(module, LOG_TRACE, "Container Initializing");
+
+ // TODO - move allocator init to server?
+ const nx_allocator_config_t *alloc_config = nx_allocator_default_config();
+ nx_allocator_initialize(alloc_config);
+
+ node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
+ node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ lock = sys_mutex();
+ default_node = 0;
+ DEQ_INIT(node_type_list);
+
+ nx_server_set_conn_handler(handler);
+}
+
+
+void nx_container_finalize(void)
+{
+}
+
+
+int nx_container_register_node_type(const nx_node_type_t *nt)
+{
+ int result;
+ nx_field_iterator_t *iter = nx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
+ nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
+ DEQ_ITEM_INIT(nt_item);
+ nt_item->ntype = nt;
+
+ sys_mutex_lock(lock);
+ result = hash_insert_const(node_type_map, iter, nt);
+ DEQ_INSERT_TAIL(node_type_list, nt_item);
+ sys_mutex_unlock(lock);
+
+ nx_field_iterator_free(iter);
+ if (result < 0)
+ return result;
+ nx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);
+
+ return 0;
+}
+
+
+void nx_container_set_default_node_type(const nx_node_type_t *nt,
+ void *context,
+ nx_dist_mode_t supported_dist)
+{
+ if (default_node)
+ nx_container_destroy_node(default_node);
+
+ if (nt) {
+ default_node = nx_container_create_node(nt, 0, context, supported_dist, NX_LIFE_PERMANENT);
+ nx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
+ } else {
+ default_node = 0;
+ nx_log(module, LOG_TRACE, "Default node removed");
+ }
+}
+
+
+nx_node_t *nx_container_create_node(const nx_node_type_t *nt,
+ const char *name,
+ void *context,
+ nx_dist_mode_t supported_dist,
+ nx_lifetime_policy_t life_policy)
+{
+ int result;
+ nx_node_t *node = new_nx_node_t();
+ if (!node)
+ return 0;
+
+ node->ntype = nt;
+ node->name = 0;
+ node->context = context;
+ node->supported_dist = supported_dist;
+ node->life_policy = life_policy;
+
+ if (name) {
+ nx_field_iterator_t *iter = nx_field_iterator_string(name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ result = hash_insert(node_map, iter, node);
+ sys_mutex_unlock(lock);
+ nx_field_iterator_free(iter);
+ if (result < 0) {
+ free_nx_node_t(node);
+ return 0;
+ }
+
+ node->name = (char*) malloc(strlen(name) + 1);
+ strcpy(node->name, name);
+ }
+
+ if (name)
+ nx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);
+
+ return node;
+}
+
+
+void nx_container_destroy_node(nx_node_t *node)
+{
+ if (node->name) {
+ nx_field_iterator_t *iter = nx_field_iterator_string(node->name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ hash_remove(node_map, iter);
+ sys_mutex_unlock(lock);
+ nx_field_iterator_free(iter);
+ free(node->name);
+ }
+
+ free_nx_node_t(node);
+}
+
+
+void nx_container_node_set_context(nx_node_t *node, void *node_context)
+{
+ node->context = node_context;
+}
+
+
+nx_dist_mode_t nx_container_node_get_dist_modes(const nx_node_t *node)
+{
+ return node->supported_dist;
+}
+
+
+nx_lifetime_policy_t nx_container_node_get_life_policy(const nx_node_t *node)
+{
+ return node->life_policy;
+}
+
+
+nx_link_t *nx_link(nx_node_t *node, nx_connection_t *conn, nx_direction_t dir, const char* name)
+{
+ pn_session_t *sess = pn_session(nx_connection_pn(conn));
+ nx_link_t *link = new_nx_link_t();
+
+ if (dir == NX_OUTGOING)
+ link->pn_link = pn_sender(sess, name);
+ else
+ link->pn_link = pn_receiver(sess, name);
+ link->context = node->context;
+ link->node = node;
+
+ pn_link_set_context(link->pn_link, link);
+
+ pn_session_open(sess);
+
+ return link;
+}
+
+
+void nx_link_set_context(nx_link_t *link, void *context)
+{
+ link->context = context;
+}
+
+
+void *nx_link_get_context(nx_link_t *link)
+{
+ return link->context;
+}
+
+
+pn_link_t *nx_link_pn(nx_link_t *link)
+{
+ return link->pn_link;
+}
+
+
+pn_terminus_t *nx_link_source(nx_link_t *link)
+{
+ return pn_link_source(link->pn_link);
+}
+
+
+pn_terminus_t *nx_link_target(nx_link_t *link)
+{
+ return pn_link_target(link->pn_link);
+}
+
+
+pn_terminus_t *nx_link_remote_source(nx_link_t *link)
+{
+ return pn_link_remote_source(link->pn_link);
+}
+
+
+pn_terminus_t *nx_link_remote_target(nx_link_t *link)
+{
+ return pn_link_remote_target(link->pn_link);
+}
+
+
+void nx_link_activate(nx_link_t *link)
+{
+ if (!link || !link->pn_link)
+ return;
+
+ pn_session_t *sess = pn_link_session(link->pn_link);
+ if (!sess)
+ return;
+
+ pn_connection_t *conn = pn_session_connection(sess);
+ if (!conn)
+ return;
+
+ nx_connection_t *ctx = pn_connection_get_context(conn);
+ if (!ctx)
+ return;
+
+ nx_server_activate(ctx);
+}
+
+
+void nx_link_close(nx_link_t *link)
+{
+ pn_link_close(link->pn_link);
+}
+
+
diff --git a/qpid/extras/nexus/src/hash.c b/qpid/extras/nexus/src/hash.c
new file mode 100644
index 0000000000..c5d882519d
--- /dev/null
+++ b/qpid/extras/nexus/src/hash.c
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/hash.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/alloc.h>
+#include <stdio.h>
+#include <string.h>
+
+typedef struct hash_item_t {
+ DEQ_LINKS(struct hash_item_t);
+ unsigned char *key;
+ union {
+ void *val;
+ const void *val_const;
+ } v;
+} hash_item_t;
+
+ALLOC_DECLARE(hash_item_t);
+ALLOC_DEFINE(hash_item_t);
+DEQ_DECLARE(hash_item_t, items_t);
+
+
+typedef struct bucket_t {
+ items_t items;
+} bucket_t;
+
+
+struct hash_t {
+ bucket_t *buckets;
+ unsigned int bucket_count;
+ unsigned int bucket_mask;
+ int batch_size;
+ size_t size;
+ int is_const;
+};
+
+
+// djb2 hash algorithm
+static unsigned long hash_function(nx_field_iterator_t *iter)
+{
+ unsigned long hash = 5381;
+ int c;
+
+ while (!nx_field_iterator_end(iter)) {
+ c = (int) nx_field_iterator_octet(iter);
+ hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+ }
+
+ return hash;
+}
+
+
+hash_t *hash(int bucket_exponent, int batch_size, int value_is_const)
+{
+ int i;
+ hash_t *h = NEW(hash_t);
+
+ if (!h)
+ return 0;
+
+ h->bucket_count = 1 << bucket_exponent;
+ h->bucket_mask = h->bucket_count - 1;
+ h->batch_size = batch_size;
+ h->size = 0;
+ h->is_const = value_is_const;
+ h->buckets = NEW_ARRAY(bucket_t, h->bucket_count);
+ for (i = 0; i < h->bucket_count; i++) {
+ DEQ_INIT(h->buckets[i].items);
+ }
+
+ return h;
+}
+
+
+void hash_free(hash_t *h)
+{
+ // TODO - Implement this
+}
+
+
+size_t hash_size(hash_t *h)
+{
+ return h ? h->size : 0;
+}
+
+
+static hash_item_t *hash_internal_insert(hash_t *h, nx_field_iterator_t *key, int *error)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ *error = 0;
+
+ while (item) {
+ if (nx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ *error = -1;
+ return 0;
+ }
+
+ item = new_hash_item_t();
+ if (!item) {
+ *error = -2;
+ return 0;
+ }
+
+ DEQ_ITEM_INIT(item);
+ item->key = nx_field_iterator_copy(key);
+
+ DEQ_INSERT_TAIL(h->buckets[idx].items, item);
+ h->size++;
+ return item;
+}
+
+
+int hash_insert(hash_t *h, nx_field_iterator_t *key, void *val)
+{
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val = val;
+ return error;
+}
+
+
+int hash_insert_const(hash_t *h, nx_field_iterator_t *key, const void *val)
+{
+ if (!h->is_const)
+ return -3;
+
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val_const = val;
+ return error;
+}
+
+
+static hash_item_t *hash_internal_retrieve(hash_t *h, nx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (nx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ return item;
+}
+
+
+int hash_retrieve(hash_t *h, nx_field_iterator_t *key, void **val)
+{
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_retrieve_const(hash_t *h, nx_field_iterator_t *key, const void **val)
+{
+ if (!h->is_const)
+ return -3;
+
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val_const;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_remove(hash_t *h, nx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (nx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ free(item->key);
+ DEQ_REMOVE(h->buckets[idx].items, item);
+ free_hash_item_t(item);
+ h->size--;
+ return 0;
+ }
+
+ return -1;
+}
+
diff --git a/qpid/extras/nexus/src/iterator.c b/qpid/extras/nexus/src/iterator.c
new file mode 100644
index 0000000000..d03590e851
--- /dev/null
+++ b/qpid/extras/nexus/src/iterator.c
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/iterator.h>
+#include <qpid/nexus/message.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/alloc.h>
+#include <stdio.h>
+#include <string.h>
+
+typedef enum {
+MODE_TO_END,
+MODE_TO_SLASH
+} parse_mode_t;
+
+struct nx_field_iterator_t {
+ nx_buffer_t *start_buffer;
+ unsigned char *start_cursor;
+ int start_length;
+ nx_buffer_t *buffer;
+ unsigned char *cursor;
+ int length;
+ nx_iterator_view_t view;
+ parse_mode_t mode;
+};
+
+
+ALLOC_DECLARE(nx_field_iterator_t);
+ALLOC_DEFINE(nx_field_iterator_t);
+
+
+typedef enum {
+STATE_START,
+STATE_SLASH_LEFT,
+STATE_SKIPPING_TO_NEXT_SLASH,
+STATE_SCANNING,
+STATE_COLON,
+STATE_COLON_SLASH,
+STATE_AT_NODE_ID
+} state_t;
+
+
+static void view_initialize(nx_field_iterator_t *iter)
+{
+ if (iter->view == ITER_VIEW_ALL) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ //
+ // Advance to the node-id.
+ //
+ state_t state = STATE_START;
+ unsigned int octet;
+ while (!nx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) {
+ octet = nx_field_iterator_octet(iter);
+ switch (state) {
+ case STATE_START :
+ if (octet == '/')
+ state = STATE_SLASH_LEFT;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_SLASH_LEFT :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SKIPPING_TO_NEXT_SLASH :
+ if (octet == '/')
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SCANNING :
+ if (octet == ':')
+ state = STATE_COLON;
+ break;
+
+ case STATE_COLON :
+ if (octet == '/')
+ state = STATE_COLON_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_COLON_SLASH :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_AT_NODE_ID :
+ break;
+ }
+ }
+
+ if (state != STATE_AT_NODE_ID) {
+ //
+ // The address string was relative, not absolute. The node-id
+ // is at the beginning of the string.
+ //
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ }
+
+ //
+ // Cursor is now on the first octet of the node-id
+ //
+ if (iter->view == ITER_VIEW_NODE_ID) {
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NO_HOST) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NODE_SPECIFIC) {
+ iter->mode = MODE_TO_END;
+ while (!nx_field_iterator_end(iter)) {
+ octet = nx_field_iterator_octet(iter);
+ if (octet == '/')
+ break;
+ }
+ return;
+ }
+}
+
+
+nx_field_iterator_t* nx_field_iterator_string(const char *text, nx_iterator_view_t view)
+{
+ nx_field_iterator_t *iter = new_nx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = 0;
+ iter->start_cursor = (unsigned char*) text;
+ iter->start_length = strlen(text);
+
+ nx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+nx_field_iterator_t *nx_field_iterator_buffer(nx_buffer_t *buffer, int offset, int length, nx_iterator_view_t view)
+{
+ nx_field_iterator_t *iter = new_nx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = buffer;
+ iter->start_cursor = nx_buffer_base(buffer) + offset;
+ iter->start_length = length;
+
+ nx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+void nx_field_iterator_free(nx_field_iterator_t *iter)
+{
+ free_nx_field_iterator_t(iter);
+}
+
+
+void nx_field_iterator_reset(nx_field_iterator_t *iter, nx_iterator_view_t view)
+{
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ iter->view = view;
+
+ view_initialize(iter);
+}
+
+
+unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter)
+{
+ if (iter->length == 0)
+ return (unsigned char) 0;
+
+ unsigned char result = *(iter->cursor);
+
+ iter->cursor++;
+ iter->length--;
+
+ if (iter->length > 0) {
+ if (iter->buffer) {
+ if (iter->cursor - nx_buffer_base(iter->buffer) == nx_buffer_size(iter->buffer)) {
+ iter->buffer = iter->buffer->next;
+ if (iter->buffer == 0)
+ iter->length = 0;
+ iter->cursor = nx_buffer_base(iter->buffer);
+ }
+ }
+ }
+
+ if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/')
+ iter->length = 0;
+
+ return result;
+}
+
+
+int nx_field_iterator_end(nx_field_iterator_t *iter)
+{
+ return iter->length == 0;
+}
+
+
+int nx_field_iterator_equal(nx_field_iterator_t *iter, unsigned char *string)
+{
+ nx_field_iterator_reset(iter, iter->view);
+ while (!nx_field_iterator_end(iter) && *string) {
+ if (*string != nx_field_iterator_octet(iter))
+ return 0;
+ string++;
+ }
+
+ return (nx_field_iterator_end(iter) && (*string == 0));
+}
+
+
+unsigned char *nx_field_iterator_copy(nx_field_iterator_t *iter)
+{
+ int length = 0;
+ int idx = 0;
+ unsigned char *copy;
+
+ nx_field_iterator_reset(iter, iter->view);
+ while (!nx_field_iterator_end(iter)) {
+ nx_field_iterator_octet(iter);
+ length++;
+ }
+
+ nx_field_iterator_reset(iter, iter->view);
+ copy = (unsigned char*) malloc(length + 1);
+ while (!nx_field_iterator_end(iter))
+ copy[idx++] = nx_field_iterator_octet(iter);
+ copy[idx] = '\0';
+
+ return copy;
+}
+
diff --git a/qpid/extras/nexus/src/log.c b/qpid/extras/nexus/src/log.c
new file mode 100644
index 0000000000..ca1af86915
--- /dev/null
+++ b/qpid/extras/nexus/src/log.c
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/log.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+static int mask=LOG_INFO;
+
+static char *cls_prefix(int cls)
+{
+ switch (cls) {
+ case LOG_TRACE : return "TRACE";
+ case LOG_ERROR : return "ERROR";
+ case LOG_INFO : return "INFO";
+ }
+
+ return "";
+}
+
+void nx_log(const char *module, int cls, const char *fmt, ...)
+{
+ if (!(cls & mask))
+ return;
+
+ va_list ap;
+ char line[128];
+
+ va_start(ap, fmt);
+ vsnprintf(line, 127, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line);
+}
+
+void nx_log_set_mask(int _mask)
+{
+ mask = _mask;
+}
+
diff --git a/qpid/extras/nexus/src/message.c b/qpid/extras/nexus/src/message.c
new file mode 100644
index 0000000000..11f58a1474
--- /dev/null
+++ b/qpid/extras/nexus/src/message.c
@@ -0,0 +1,1164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/message.h>
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/threading.h>
+#include <string.h>
+#include <stdio.h>
+
+
+//
+// Per-Thread allocator
+//
+typedef struct nx_allocator_t {
+ nx_message_list_t message_free_list;
+ nx_buffer_list_t buffer_free_list;
+} nx_allocator_t;
+
+//
+// Global allocator (protected by a global lock)
+//
+typedef struct {
+ nx_message_list_t message_free_list;
+ nx_buffer_list_t buffer_free_list;
+ sys_mutex_t *lock;
+} nx_global_allocator_t;
+
+static nx_global_allocator_t global;
+static nx_allocator_config_t default_config;
+static const nx_allocator_config_t *config;
+
+
+static nx_allocator_t *nx_get_allocator(void)
+{
+ static __thread nx_allocator_t *alloc = 0;
+
+ if (!alloc) {
+ alloc = NEW(nx_allocator_t);
+
+ if (!alloc)
+ return 0;
+
+ DEQ_INIT(alloc->message_free_list);
+ DEQ_INIT(alloc->buffer_free_list);
+ }
+
+ return alloc;
+}
+
+
+static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume)
+{
+ unsigned char *local_cursor = *cursor;
+ nx_buffer_t *local_buffer = *buffer;
+
+ int remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer));
+ while (consume > 0) {
+ if (consume < remaining) {
+ local_cursor += consume;
+ consume = 0;
+ } else {
+ consume -= remaining;
+ local_buffer = local_buffer->next;
+ if (local_buffer == 0){
+ local_cursor = 0;
+ break;
+ }
+ local_cursor = nx_buffer_base(local_buffer);
+ remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer));
+ }
+ }
+
+ *cursor = local_cursor;
+ *buffer = local_buffer;
+}
+
+
+static unsigned char next_octet(unsigned char **cursor, nx_buffer_t **buffer)
+{
+ unsigned char result = **cursor;
+ advance(cursor, buffer, 1);
+ return result;
+}
+
+
+static int traverse_field(unsigned char **cursor, nx_buffer_t **buffer, nx_field_location_t *field)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int consume = 0;
+ switch (tag & 0xF0) {
+ case 0x40 : consume = 0; break;
+ case 0x50 : consume = 1; break;
+ case 0x60 : consume = 2; break;
+ case 0x70 : consume = 4; break;
+ case 0x80 : consume = 8; break;
+ case 0x90 : consume = 16; break;
+
+ case 0xB0 :
+ case 0xD0 :
+ case 0xF0 :
+ consume |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ // Fall through to the next case...
+
+ case 0xA0 :
+ case 0xC0 :
+ case 0xE0 :
+ consume |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ if (field) {
+ field->buffer = *buffer;
+ field->offset = *cursor - nx_buffer_base(*buffer);
+ field->length = consume;
+ field->parsed = 1;
+ }
+
+ advance(cursor, buffer, consume);
+ return 1;
+}
+
+
+static int start_list(unsigned char **cursor, nx_buffer_t **buffer)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int length = 0;
+ int count = 0;
+
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+ case 0xd0 : // list32
+ length |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ break;
+
+ case 0xc0 : // list8
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ return count;
+}
+
+
+//
+// Check the buffer chain, starting at cursor to see if it matches the pattern.
+// If the pattern matches, check the next tag to see if it's in the set of expected
+// tags. If not, return zero. If so, set the location descriptor to the good
+// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
+//
+// If there is no match, don't advance the cursor.
+//
+// Return 0 if the pattern matches but the following tag is unexpected
+// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
+// Return 1 if the pattern matches and we've advanced the cursor/buffer
+// Return 1 if the pattern does not match
+//
+static int nx_check_and_advance(nx_buffer_t **buffer,
+ unsigned char **cursor,
+ unsigned char *pattern,
+ int pattern_length,
+ unsigned char *expected_tags,
+ nx_field_location_t *location)
+{
+ nx_buffer_t *test_buffer = *buffer;
+ unsigned char *test_cursor = *cursor;
+
+ if (!test_cursor)
+ return 1; // no match
+
+ unsigned char *end_of_buffer = nx_buffer_base(test_buffer) + nx_buffer_size(test_buffer);
+ int idx = 0;
+
+ while (idx < pattern_length && *test_cursor == pattern[idx]) {
+ idx++;
+ test_cursor++;
+ if (test_cursor == end_of_buffer) {
+ test_buffer = test_buffer->next;
+ if (test_buffer == 0)
+ return 1; // Pattern didn't match
+ test_cursor = nx_buffer_base(test_buffer);
+ end_of_buffer = test_cursor + nx_buffer_size(test_buffer);
+ }
+ }
+
+ if (idx < pattern_length)
+ return 1; // Pattern didn't match
+
+ //
+ // Pattern matched, check the tag
+ //
+ while (*expected_tags && *test_cursor != *expected_tags)
+ expected_tags++;
+ if (*expected_tags == 0)
+ return 0; // Unexpected tag
+
+ if (location->parsed)
+ return 0; // Duplicate section
+
+ //
+ // Pattern matched and tag is expected. Mark the beginning of the section.
+ //
+ location->parsed = 1;
+ location->buffer = test_buffer;
+ location->offset = test_cursor - nx_buffer_base(test_buffer);
+ location->length = 0;
+
+ //
+ // Advance the pointers to consume the whole section.
+ //
+ int consume = 0;
+ unsigned char tag = next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+
+ case 0xd0 : // list32
+ case 0xd1 : // map32
+ case 0xb0 : // vbin32
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
+ if (!test_cursor) return 0;
+ // Fall through to the next case...
+
+ case 0xc0 : // list8
+ case 0xc1 : // map8
+ case 0xa0 : // vbin8
+ consume |= (int) next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ break;
+ }
+
+ if (consume)
+ advance(&test_cursor, &test_buffer, consume);
+
+ *cursor = test_cursor;
+ *buffer = test_buffer;
+ return 1;
+}
+
+
+static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
+{
+ nx_buffer_t *buf = DEQ_TAIL(msg->buffers);
+
+ while (len > 0) {
+ if (buf == 0 || nx_buffer_capacity(buf) == 0) {
+ buf = nx_allocate_buffer();
+ if (buf == 0)
+ return;
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ }
+
+ size_t to_copy = nx_buffer_capacity(buf);
+ if (to_copy > len)
+ to_copy = len;
+ memcpy(nx_buffer_cursor(buf), seq, to_copy);
+ nx_buffer_insert(buf, to_copy);
+ len -= to_copy;
+ seq += to_copy;
+ msg->length += to_copy;
+ }
+}
+
+
+static void nx_insert_8(nx_message_t *msg, uint8_t value)
+{
+ nx_insert(msg, &value, 1);
+}
+
+
+static void nx_insert_32(nx_message_t *msg, uint32_t value)
+{
+ uint8_t buf[4];
+ buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
+ buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
+ buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
+ buf[3] = (uint8_t) (value & 0x000000FF);
+ nx_insert(msg, buf, 4);
+}
+
+
+static void nx_insert_64(nx_message_t *msg, uint64_t value)
+{
+ uint8_t buf[8];
+ buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
+ buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
+ buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
+ buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
+ buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
+ buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
+ buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
+ buf[7] = (uint8_t) (value & 0x00000000000000FFL);
+ nx_insert(msg, buf, 8);
+}
+
+
+static void nx_overwrite(nx_buffer_t **buf, size_t *cursor, uint8_t value)
+{
+ while (*buf) {
+ if (*cursor >= nx_buffer_size(*buf)) {
+ *buf = (*buf)->next;
+ *cursor = 0;
+ } else {
+ nx_buffer_base(*buf)[*cursor] = value;
+ (*cursor)++;
+ return;
+ }
+ }
+}
+
+
+static void nx_overwrite_32(nx_field_location_t *field, uint32_t value)
+{
+ nx_buffer_t *buf = field->buffer;
+ size_t cursor = field->offset;
+
+ nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
+ nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
+ nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+ nx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
+}
+
+
+static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
+{
+ //
+ // Insert the short-form performative tag
+ //
+ nx_insert(msg, (const uint8_t*) "\x00\x53", 2);
+ nx_insert_8(msg, code);
+
+ //
+ // Open the list with a list32 tag
+ //
+ nx_insert_8(msg, 0xd0);
+
+ //
+ // Mark the current location to later overwrite the length
+ //
+ msg->compose_length.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_length.offset = nx_buffer_size(msg->compose_length.buffer);
+ msg->compose_length.length = 4;
+ msg->compose_length.parsed = 1;
+
+ nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ //
+ // Mark the current location to later overwrite the count
+ //
+ msg->compose_count.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_count.offset = nx_buffer_size(msg->compose_count.buffer);
+ msg->compose_count.length = 4;
+ msg->compose_count.parsed = 1;
+
+ nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ msg->length = 4; // Include the length of the count field
+ msg->count = 0;
+}
+
+
+static void nx_end_list(nx_message_t *msg)
+{
+ nx_overwrite_32(&msg->compose_length, msg->length);
+ nx_overwrite_32(&msg->compose_count, msg->count);
+}
+
+
+const nx_allocator_config_t *nx_allocator_default_config(void)
+{
+ default_config.buffer_size = 1024;
+ default_config.buffer_preallocation_count = 512;
+ default_config.buffer_rebalancing_batch_count = 16;
+ default_config.buffer_local_storage_max = 64;
+ default_config.buffer_free_list_max = 1000000;
+ default_config.message_allocation_batch_count = 256;
+ default_config.message_rebalancing_batch_count = 64;
+ default_config.message_local_storage_max = 256;
+
+ return &default_config;
+}
+
+
+void nx_allocator_initialize(const nx_allocator_config_t *c)
+{
+ config = c;
+
+ // Initialize the fields in the global structure.
+ DEQ_INIT(global.message_free_list);
+ DEQ_INIT(global.buffer_free_list);
+ global.lock = sys_mutex();
+
+ // Pre-allocate buffers according to the configuration
+ int i;
+ nx_buffer_t *buf;
+
+ for (i = 0; i < config->buffer_preallocation_count; i++) {
+ buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
+ DEQ_ITEM_INIT(buf);
+ DEQ_INSERT_TAIL(global.buffer_free_list, buf);
+ }
+}
+
+
+void nx_allocator_finalize(void)
+{
+ // TODO - Free buffers and messages
+}
+
+
+nx_message_t *nx_allocate_message(void)
+{
+ nx_allocator_t *alloc = nx_get_allocator();
+ nx_message_t *msg;
+ int i;
+
+ if (DEQ_SIZE(alloc->message_free_list) == 0) {
+ //
+ // The local free list is empty, rebalance a batch of objects from the global
+ // free list.
+ //
+ sys_mutex_lock(global.lock);
+ if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) {
+ for (i = 0; i < config->message_rebalancing_batch_count; i++) {
+ msg = DEQ_HEAD(global.message_free_list);
+ DEQ_REMOVE_HEAD(global.message_free_list);
+ DEQ_INSERT_TAIL(alloc->message_free_list, msg);
+ }
+ }
+ sys_mutex_unlock(global.lock);
+ }
+
+ if (DEQ_SIZE(alloc->message_free_list) == 0) {
+ //
+ // The local free list is still empty. This means there were not enough objects on the
+ // global free list to make up a batch. Allocate new objects from the heap and store
+ // them in the local free list.
+ //
+ nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count);
+ memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count);
+ for (i = 0; i < config->message_allocation_batch_count; i++) {
+ DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]);
+ }
+ }
+
+ //
+ // If the local free list is still empty, we're out of memory.
+ //
+ if (DEQ_SIZE(alloc->message_free_list) == 0)
+ return 0;
+
+ msg = DEQ_HEAD(alloc->message_free_list);
+ DEQ_REMOVE_HEAD(alloc->message_free_list);
+
+ DEQ_INIT(msg->buffers);
+ msg->in_delivery = NULL;
+ msg->out_delivery = NULL;
+ msg->section_message_header.buffer = 0;
+ msg->section_message_header.parsed = 0;
+ msg->section_delivery_annotation.buffer = 0;
+ msg->section_delivery_annotation.parsed = 0;
+ msg->section_message_annotation.buffer = 0;
+ msg->section_message_annotation.parsed = 0;
+ msg->section_message_properties.buffer = 0;
+ msg->section_message_properties.parsed = 0;
+ msg->section_application_properties.buffer = 0;
+ msg->section_application_properties.parsed = 0;
+ msg->section_body.buffer = 0;
+ msg->section_body.parsed = 0;
+ msg->section_footer.buffer = 0;
+ msg->section_footer.parsed = 0;
+ msg->field_user_id.buffer = 0;
+ msg->field_user_id.parsed = 0;
+ msg->field_to.buffer = 0;
+ msg->field_to.parsed = 0;
+ msg->body.buffer = 0;
+ msg->body.parsed = 0;
+ return msg;
+}
+
+
+nx_buffer_t *nx_allocate_buffer(void)
+{
+ nx_allocator_t *alloc = nx_get_allocator();
+ nx_buffer_t *buf;
+ int i;
+
+ if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
+ sys_mutex_lock(global.lock);
+ if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) {
+ // Rebalance a batch of free descriptors to the local free list.
+ for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
+ buf = DEQ_HEAD(global.buffer_free_list);
+ DEQ_REMOVE_HEAD(global.buffer_free_list);
+ DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
+ }
+ }
+ sys_mutex_unlock(global.lock);
+ }
+
+ if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
+ // Allocate a buffer from the heap
+ buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
+ DEQ_ITEM_INIT(buf);
+ DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
+ }
+
+ if (DEQ_SIZE(alloc->buffer_free_list) == 0)
+ return 0;
+
+ buf = DEQ_HEAD(alloc->buffer_free_list);
+ DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+
+ buf->size = 0;
+
+ return buf;
+}
+
+
+void nx_free_message(nx_message_t *msg)
+{
+ nx_allocator_t *alloc = nx_get_allocator();
+
+ // Free any buffers in the message
+ int i;
+ nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(msg->buffers);
+ nx_free_buffer(buf);
+ buf = DEQ_HEAD(msg->buffers);
+ }
+
+ DEQ_INSERT_TAIL(alloc->message_free_list, msg);
+ if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) {
+ //
+ // The local free list has exceeded the threshold for local storage.
+ // Rebalance a batch of free objects to the global free list.
+ //
+ sys_mutex_lock(global.lock);
+ for (i = 0; i < config->message_rebalancing_batch_count; i++) {
+ msg = DEQ_HEAD(alloc->message_free_list);
+ DEQ_REMOVE_HEAD(alloc->message_free_list);
+ DEQ_INSERT_TAIL(global.message_free_list, msg);
+ }
+ sys_mutex_unlock(global.lock);
+ }
+}
+
+
+void nx_free_buffer(nx_buffer_t *buf)
+{
+ nx_allocator_t *alloc = nx_get_allocator();
+ int i;
+
+ DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
+ if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) {
+ // Rebalance a batch of free descriptors to the global free list.
+ sys_mutex_lock(global.lock);
+ for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
+ buf = DEQ_HEAD(alloc->buffer_free_list);
+ DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+ DEQ_INSERT_TAIL(global.buffer_free_list, buf);
+ }
+ sys_mutex_unlock(global.lock);
+ }
+}
+
+
+nx_message_t *nx_message_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *link = pn_delivery_link(delivery);
+ nx_message_t *msg = (nx_message_t*) pn_delivery_get_context(delivery);
+ ssize_t rc;
+ nx_buffer_t *buf;
+
+ //
+ // If there is no message associated with the delivery, this is the first time
+ // we've received anything on this delivery. Allocate a message descriptor and
+ // link it and the delivery together.
+ //
+ if (!msg) {
+ msg = nx_allocate_message();
+ pn_delivery_set_context(delivery, (void*) msg);
+
+ //
+ // Record the incoming delivery only if it is not settled. If it is
+ // settled, there's no need to propagate disposition back to the sender.
+ //
+ if (!pn_delivery_settled(delivery))
+ msg->in_delivery = delivery;
+ }
+
+ //
+ // Get a reference to the tail buffer on the message. This is the buffer into which
+ // we will store incoming message data. If there is no buffer in the message, allocate
+ // an empty one and add it to the message.
+ //
+ buf = DEQ_TAIL(msg->buffers);
+ if (!buf) {
+ buf = nx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ }
+
+ while (1) {
+ //
+ // Try to receive enough data to fill the remaining space in the tail buffer.
+ //
+ rc = pn_link_recv(link, (char*) nx_buffer_cursor(buf), nx_buffer_capacity(buf));
+
+ //
+ // If we receive PN_EOS, we have come to the end of the message.
+ //
+ if (rc == PN_EOS) {
+ //
+ // If the last buffer in the list is empty, remove it and free it. This
+ // will only happen if the size of the message content is an exact multiple
+ // of the buffer size.
+ //
+ if (nx_buffer_size(buf) == 0) {
+ DEQ_REMOVE_TAIL(msg->buffers);
+ nx_free_buffer(buf);
+ }
+ return msg;
+ }
+
+ if (rc > 0) {
+ //
+ // We have received a positive number of bytes for the message. Advance
+ // the cursor in the buffer.
+ //
+ nx_buffer_insert(buf, rc);
+
+ //
+ // If the buffer is full, allocate a new empty buffer and append it to the
+ // tail of the message's list.
+ //
+ if (nx_buffer_capacity(buf) == 0) {
+ buf = nx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ }
+ } else
+ //
+ // We received zero bytes, and no PN_EOS. This means that we've received
+ // all of the data available up to this point, but it does not constitute
+ // the entire message. We'll be back later to finish it up.
+ //
+ break;
+ }
+
+ return NULL;
+}
+
+
+int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
+{
+
+#define LONG 10
+#define SHORT 3
+#define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"
+#define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70"
+#define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"
+#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71"
+#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
+#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72"
+#define MESSAGE_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define MESSAGE_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
+#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
+#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74"
+#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
+#define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75"
+#define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"
+#define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76"
+#define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"
+#define FOOTER_SHORT (unsigned char*) "\x00\x53\x78"
+#define TAGS_LIST (unsigned char*) "\x45\xc0\xd0"
+#define TAGS_MAP (unsigned char*) "\xc1\xd1"
+#define TAGS_BINARY (unsigned char*) "\xa0\xb0"
+
+ nx_buffer_t *buffer = DEQ_HEAD(msg->buffers);
+ unsigned char *cursor;
+
+ if (!buffer)
+ return 0; // Invalid - No data in the message
+
+ if (depth == NX_DEPTH_NONE)
+ return 1;
+
+ cursor = nx_buffer_base(buffer);
+
+ //
+ // MESSAGE HEADER
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &msg->section_message_header))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header))
+ return 0;
+
+ if (depth == NX_DEPTH_HEADER)
+ return 1;
+
+ //
+ // DELIVERY ANNOTATION
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_delivery_annotation))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_delivery_annotation))
+ return 0;
+
+ if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS)
+ return 1;
+
+ //
+ // MESSAGE ANNOTATION
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_message_annotation))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_message_annotation))
+ return 0;
+
+ if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS)
+ return 1;
+
+ //
+ // MESSAGE PROPERTIES
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG, LONG, TAGS_LIST, &msg->section_message_properties))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties))
+ return 0;
+
+ if (depth == NX_DEPTH_MESSAGE_PROPERTIES)
+ return 1;
+
+ //
+ // APPLICATION PROPERTIES
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &msg->section_application_properties))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties))
+ return 0;
+
+ if (depth == NX_DEPTH_APPLICATION_PROPERTIES)
+ return 1;
+
+ //
+ // BODY (Note that this function expects a single data section or a single AMQP sequence)
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &msg->section_body))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &msg->section_body))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &msg->section_body))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &msg->section_body))
+ return 0;
+
+ if (depth == NX_DEPTH_BODY)
+ return 1;
+
+ //
+ // FOOTER
+ //
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &msg->section_footer))
+ return 0;
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer))
+ return 0;
+
+ return 1;
+}
+
+
+nx_field_iterator_t *nx_message_field_to(nx_message_t *msg)
+{
+ while (1) {
+ if (msg->field_to.parsed)
+ return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL);
+
+ if (msg->section_message_properties.parsed == 0)
+ break;
+
+ nx_buffer_t *buffer = msg->section_message_properties.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset;
+
+ int count = start_list(&cursor, &buffer);
+ int result;
+
+ if (count < 3)
+ break;
+
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &msg->field_to); // to
+ if (!result) return 0;
+ }
+
+ return 0;
+}
+
+
+nx_field_iterator_t *nx_message_body(nx_message_t *msg)
+{
+ while (1) {
+ if (msg->body.parsed)
+ return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL);
+
+ if (msg->section_body.parsed == 0)
+ break;
+
+ nx_buffer_t *buffer = msg->section_body.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset;
+ int result;
+
+ result = traverse_field(&cursor, &buffer, &msg->body);
+ if (!result) return 0;
+ }
+
+ return 0;
+}
+
+
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain)
+{
+ nx_message_begin_header(msg);
+ nx_message_insert_boolean(msg, 0); // durable
+ //nx_message_insert_null(msg); // priority
+ //nx_message_insert_null(msg); // ttl
+ //nx_message_insert_boolean(msg, 0); // first-acquirer
+ //nx_message_insert_uint(msg, 0); // delivery-count
+ nx_message_end_header(msg);
+
+ nx_message_begin_message_properties(msg);
+ nx_message_insert_null(msg); // message-id
+ nx_message_insert_null(msg); // user-id
+ nx_message_insert_string(msg, to); // to
+ //nx_message_insert_null(msg); // subject
+ //nx_message_insert_null(msg); // reply-to
+ //nx_message_insert_null(msg); // correlation-id
+ //nx_message_insert_null(msg); // content-type
+ //nx_message_insert_null(msg); // content-encoding
+ //nx_message_insert_timestamp(msg, 0); // absolute-expiry-time
+ //nx_message_insert_timestamp(msg, 0); // creation-time
+ //nx_message_insert_null(msg); // group-id
+ //nx_message_insert_uint(msg, 0); // group-sequence
+ //nx_message_insert_null(msg); // reply-to-group-id
+ nx_message_end_message_properties(msg);
+
+ if (buf_chain)
+ nx_message_append_body_data(msg, buf_chain);
+}
+
+
+void nx_message_begin_header(nx_message_t *msg)
+{
+ nx_start_list_performative(msg, 0x70);
+}
+
+
+void nx_message_end_header(nx_message_t *msg)
+{
+ nx_end_list(msg);
+}
+
+
+void nx_message_begin_delivery_annotations(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_end_delivery_annotations(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_begin_message_annotations(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_end_message_annotations(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_begin_message_properties(nx_message_t *msg)
+{
+ nx_start_list_performative(msg, 0x73);
+}
+
+
+void nx_message_end_message_properties(nx_message_t *msg)
+{
+ nx_end_list(msg);
+}
+
+
+void nx_message_begin_application_properties(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_end_application_properties(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain)
+{
+ uint32_t len = 0;
+ nx_buffer_t *buf = buf_chain;
+ nx_buffer_t *last = 0;
+ size_t count = 0;
+
+ while (buf) {
+ len += nx_buffer_size(buf);
+ count++;
+ last = buf;
+ buf = DEQ_NEXT(buf);
+ }
+
+ nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3);
+ if (len < 256) {
+ nx_insert_8(msg, 0xa0); // vbin8
+ nx_insert_8(msg, (uint8_t) len);
+ } else {
+ nx_insert_8(msg, 0xb0); // vbin32
+ nx_insert_32(msg, len);
+ }
+
+ if (len > 0) {
+ buf_chain->prev = msg->buffers.tail;
+ msg->buffers.tail->next = buf_chain;
+ msg->buffers.tail = last;
+ msg->buffers.size += count;
+ }
+}
+
+
+void nx_message_begin_body_sequence(nx_message_t *msg)
+{
+}
+
+
+void nx_message_end_body_sequence(nx_message_t *msg)
+{
+}
+
+
+void nx_message_begin_footer(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_end_footer(nx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void nx_message_insert_null(nx_message_t *msg)
+{
+ nx_insert_8(msg, 0x40);
+ msg->count++;
+}
+
+
+void nx_message_insert_boolean(nx_message_t *msg, int value)
+{
+ if (value)
+ nx_insert(msg, (const uint8_t*) "\x56\x01", 2);
+ else
+ nx_insert(msg, (const uint8_t*) "\x56\x00", 2);
+ msg->count++;
+}
+
+
+void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value)
+{
+ nx_insert_8(msg, 0x50);
+ nx_insert_8(msg, value);
+ msg->count++;
+}
+
+
+void nx_message_insert_uint(nx_message_t *msg, uint32_t value)
+{
+ if (value == 0) {
+ nx_insert_8(msg, 0x43); // uint0
+ } else if (value < 256) {
+ nx_insert_8(msg, 0x52); // smalluint
+ nx_insert_8(msg, (uint8_t) value);
+ } else {
+ nx_insert_8(msg, 0x70); // uint
+ nx_insert_32(msg, value);
+ }
+ msg->count++;
+}
+
+
+void nx_message_insert_ulong(nx_message_t *msg, uint64_t value)
+{
+ if (value == 0) {
+ nx_insert_8(msg, 0x44); // ulong0
+ } else if (value < 256) {
+ nx_insert_8(msg, 0x53); // smallulong
+ nx_insert_8(msg, (uint8_t) value);
+ } else {
+ nx_insert_8(msg, 0x80); // ulong
+ nx_insert_64(msg, value);
+ }
+ msg->count++;
+}
+
+
+void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len)
+{
+ if (len < 256) {
+ nx_insert_8(msg, 0xa0); // vbin8
+ nx_insert_8(msg, (uint8_t) len);
+ } else {
+ nx_insert_8(msg, 0xb0); // vbin32
+ nx_insert_32(msg, len);
+ }
+ nx_insert(msg, start, len);
+ msg->count++;
+}
+
+
+void nx_message_insert_string(nx_message_t *msg, const char *start)
+{
+ uint32_t len = strlen(start);
+
+ if (len < 256) {
+ nx_insert_8(msg, 0xa1); // str8-utf8
+ nx_insert_8(msg, (uint8_t) len);
+ nx_insert(msg, (const uint8_t*) start, len);
+ } else {
+ nx_insert_8(msg, 0xb1); // str32-utf8
+ nx_insert_32(msg, len);
+ nx_insert(msg, (const uint8_t*) start, len);
+ }
+ msg->count++;
+}
+
+
+void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value)
+{
+ nx_insert_8(msg, 0x98); // uuid
+ nx_insert(msg, value, 16);
+ msg->count++;
+}
+
+
+void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len)
+{
+ if (len < 256) {
+ nx_insert_8(msg, 0xa3); // sym8
+ nx_insert_8(msg, (uint8_t) len);
+ nx_insert(msg, (const uint8_t*) start, len);
+ } else {
+ nx_insert_8(msg, 0xb3); // sym32
+ nx_insert_32(msg, len);
+ nx_insert(msg, (const uint8_t*) start, len);
+ }
+ msg->count++;
+}
+
+
+void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value)
+{
+ nx_insert_8(msg, 0x83); // timestamp
+ nx_insert_64(msg, value);
+ msg->count++;
+}
+
+
+unsigned char *nx_buffer_base(nx_buffer_t *buf)
+{
+ return (unsigned char*) &buf[1];
+}
+
+
+unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
+{
+ return ((unsigned char*) &buf[1]) + buf->size;
+}
+
+
+size_t nx_buffer_capacity(nx_buffer_t *buf)
+{
+ return config->buffer_size - buf->size;
+}
+
+
+size_t nx_buffer_size(nx_buffer_t *buf)
+{
+ return buf->size;
+}
+
+
+void nx_buffer_insert(nx_buffer_t *buf, size_t len)
+{
+ buf->size += len;
+ assert(buf->size <= config->buffer_size);
+}
+
diff --git a/qpid/extras/nexus/src/posix/threading.c b/qpid/extras/nexus/src/posix/threading.c
new file mode 100644
index 0000000000..6121151378
--- /dev/null
+++ b/qpid/extras/nexus/src/posix/threading.c
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/threading.h>
+#include <qpid/nexus/ctools.h>
+#include <stdio.h>
+#include <pthread.h>
+
+struct sys_mutex_t {
+ pthread_mutex_t mutex;
+ int acquired;
+};
+
+sys_mutex_t *sys_mutex(void)
+{
+ sys_mutex_t *mutex = NEW(sys_mutex_t);
+ pthread_mutex_init(&(mutex->mutex), 0);
+ mutex->acquired = 0;
+ return mutex;
+}
+
+
+void sys_mutex_free(sys_mutex_t *mutex)
+{
+ assert(!mutex->acquired);
+ pthread_mutex_destroy(&(mutex->mutex));
+ free(mutex);
+}
+
+
+void sys_mutex_lock(sys_mutex_t *mutex)
+{
+ pthread_mutex_lock(&(mutex->mutex));
+ assert(!mutex->acquired);
+ mutex->acquired++;
+}
+
+
+void sys_mutex_unlock(sys_mutex_t *mutex)
+{
+ mutex->acquired--;
+ assert(!mutex->acquired);
+ pthread_mutex_unlock(&(mutex->mutex));
+}
+
+
+struct sys_cond_t {
+ pthread_cond_t cond;
+};
+
+
+sys_cond_t *sys_cond(void)
+{
+ sys_cond_t *cond = NEW(sys_cond_t);
+ pthread_cond_init(&(cond->cond), 0);
+ return cond;
+}
+
+
+void sys_cond_free(sys_cond_t *cond)
+{
+ pthread_cond_destroy(&(cond->cond));
+ free(cond);
+}
+
+
+void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex)
+{
+ assert(held_mutex->acquired);
+ held_mutex->acquired--;
+ pthread_cond_wait(&(cond->cond), &(held_mutex->mutex));
+ held_mutex->acquired++;
+}
+
+
+void sys_cond_signal(sys_cond_t *cond)
+{
+ pthread_cond_signal(&(cond->cond));
+}
+
+
+void sys_cond_signal_all(sys_cond_t *cond)
+{
+ pthread_cond_broadcast(&(cond->cond));
+}
+
+
+struct sys_thread_t {
+ pthread_t thread;
+};
+
+sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
+{
+ sys_thread_t *thread = NEW(sys_thread_t);
+ pthread_create(&(thread->thread), 0, run_function, arg);
+ return thread;
+}
+
+
+void sys_thread_free(sys_thread_t *thread)
+{
+ free(thread);
+}
+
+
+void sys_thread_join(sys_thread_t *thread)
+{
+ pthread_join(thread->thread, 0);
+}
+
diff --git a/qpid/extras/nexus/src/server.c b/qpid/extras/nexus/src/server.c
new file mode 100644
index 0000000000..16740b812f
--- /dev/null
+++ b/qpid/extras/nexus/src/server.c
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/threading.h>
+#include <qpid/nexus/log.h>
+#include "server_private.h"
+#include "timer_private.h"
+#include "alloc_private.h"
+#include "auth.h"
+#include "work_queue.h"
+#include <stdio.h>
+#include <time.h>
+#include <signal.h>
+
+static char *module="SERVER";
+
+typedef struct nx_thread_t {
+ int thread_id;
+ volatile int running;
+ volatile int canceled;
+ int using_thread;
+ sys_thread_t *thread;
+} nx_thread_t;
+
+
+typedef struct nx_server_t {
+ int thread_count;
+ pn_driver_t *driver;
+ nx_thread_start_cb_t start_handler;
+ nx_conn_handler_cb_t conn_handler;
+ nx_signal_handler_cb_t signal_handler;
+ nx_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_context;
+ void *signal_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ nx_thread_t **threads;
+ work_queue_t *work_queue;
+ nx_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ int pending_signal;
+} nx_server_t;
+
+
+ALLOC_DEFINE(nx_listener_t);
+ALLOC_DEFINE(nx_connector_t);
+ALLOC_DEFINE(nx_connection_t);
+ALLOC_DEFINE(nx_user_fd_t);
+
+
+/**
+ * Singleton Concurrent Proton Driver object
+ */
+static nx_server_t *nx_server = 0;
+
+
+static void signal_handler(int signum)
+{
+ nx_server->pending_signal = signum;
+ sys_cond_signal_all(nx_server->cond);
+}
+
+
+static nx_thread_t *thread(int id)
+{
+ nx_thread_t *thread = NEW(nx_thread_t);
+ if (!thread)
+ return 0;
+
+ thread->thread_id = id;
+ thread->running = 0;
+ thread->canceled = 0;
+ thread->using_thread = 0;
+
+ return thread;
+}
+
+
+static void thread_process_listeners(pn_driver_t *driver)
+{
+ pn_listener_t *listener = pn_driver_listener(driver);
+ pn_connector_t *cxtr;
+ nx_connection_t *ctx;
+
+ while (listener) {
+ nx_log(module, LOG_TRACE, "Accepting Connection");
+ cxtr = pn_listener_accept(listener);
+ ctx = new_nx_connection_t();
+ ctx->state = CONN_STATE_SASL_SERVER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_cxtr = cxtr;
+ ctx->pn_conn = 0;
+ ctx->listener = (nx_listener_t*) pn_listener_context(listener);
+ ctx->connector = 0;
+ ctx->context = ctx->listener->context;
+ ctx->ufd = 0;
+
+ pn_connector_set_context(cxtr, ctx);
+ listener = pn_driver_listener(driver);
+ }
+}
+
+
+static void handle_signals_LH(void)
+{
+ int signum = nx_server->pending_signal;
+
+ if (signum) {
+ nx_server->pending_signal = 0;
+ if (nx_server->signal_handler) {
+ sys_mutex_unlock(nx_server->lock);
+ nx_server->signal_handler(nx_server->signal_context, signum);
+ sys_mutex_lock(nx_server->lock);
+ }
+ }
+}
+
+
+static void block_if_paused_LH(void)
+{
+ if (nx_server->pause_requests > 0) {
+ nx_server->threads_paused++;
+ sys_cond_signal_all(nx_server->cond);
+ while (nx_server->pause_requests > 0)
+ sys_cond_wait(nx_server->cond, nx_server->lock);
+ nx_server->threads_paused--;
+ }
+}
+
+
+static void process_connector(pn_connector_t *cxtr)
+{
+ nx_connection_t *ctx = pn_connector_context(cxtr);
+ int events = 0;
+ int auth_passes = 0;
+
+ if (ctx->state == CONN_STATE_USER) {
+ nx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
+ return;
+ }
+
+ do {
+ //
+ // Step the engine for pre-handler processing
+ //
+ pn_connector_process(cxtr);
+
+ //
+ // Call the handler that is appropriate for the connector's state.
+ //
+ switch (ctx->state) {
+ case CONN_STATE_CONNECTING:
+ if (!pn_connector_closed(cxtr)) {
+ ctx->state = CONN_STATE_SASL_CLIENT;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
+ events = 1;
+ } else {
+ ctx->state = CONN_STATE_FAILED;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_CLIENT:
+ if (auth_passes == 0) {
+ auth_client_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_SERVER:
+ if (auth_passes == 0) {
+ auth_server_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_OPENING:
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, "nexus"); // TODO - make unique
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
+ nx_conn_event_t ce = NX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = NX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = NX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ nx_server->conn_handler(ctx->context, ce, (nx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+
+ case CONN_STATE_OPERATIONAL:
+ if (pn_connector_closed(cxtr)) {
+ nx_server->conn_handler(ctx->context,
+ NX_CONN_EVENT_CLOSE,
+ (nx_connection_t*) pn_connector_context(cxtr));
+ events = 0;
+ }
+ else
+ events = nx_server->conn_handler(ctx->context,
+ NX_CONN_EVENT_PROCESS,
+ (nx_connection_t*) pn_connector_context(cxtr));
+ break;
+
+ default:
+ break;
+ }
+ } while (events > 0);
+}
+
+
+//
+// TEMPORARY FUNCTION PROTOTYPES
+//
+void pn_driver_wait_1(pn_driver_t *d);
+int pn_driver_wait_2(pn_driver_t *d, int timeout);
+void pn_driver_wait_3(pn_driver_t *d);
+//
+// END TEMPORARY
+//
+
+static void *thread_run(void *arg)
+{
+ nx_thread_t *thread = (nx_thread_t*) arg;
+ pn_connector_t *work;
+ pn_connection_t *conn;
+ nx_connection_t *ctx;
+ int error;
+ int poll_result;
+ int timer_holdoff = 0;
+
+ if (!thread)
+ return 0;
+
+ thread->running = 1;
+
+ if (thread->canceled)
+ return 0;
+
+ //
+ // Invoke the start handler if the application supplied one.
+ // This handler can be used to set NUMA or processor affinnity for the thread.
+ //
+ if (nx_server->start_handler)
+ nx_server->start_handler(nx_server->start_context, thread->thread_id);
+
+ //
+ // Main Loop
+ //
+ while (thread->running) {
+ sys_mutex_lock(nx_server->lock);
+
+ //
+ // Check for pending signals to process
+ //
+ handle_signals_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(nx_server->lock);
+ break;
+ }
+
+ //
+ // Check to see if the server is pausing. If so, block here.
+ //
+ block_if_paused_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(nx_server->lock);
+ break;
+ }
+
+ //
+ // Service pending timers.
+ //
+ nx_timer_t *timer = DEQ_HEAD(nx_server->pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(nx_server->pending_timers);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ nx_timer_idle_LH(timer);
+
+ //
+ // Release the lock and invoke the connection handler.
+ //
+ sys_mutex_unlock(nx_server->lock);
+ timer->handler(timer->context);
+ pn_driver_wakeup(nx_server->driver);
+ continue;
+ }
+
+ //
+ // Check the work queue for connectors scheduled for processing.
+ //
+ work = work_queue_get(nx_server->work_queue);
+ if (!work) {
+ //
+ // There is no pending work to do
+ //
+ if (nx_server->a_thread_is_waiting) {
+ //
+ // Another thread is waiting on the proton driver, this thread must
+ // wait on the condition variable until signaled.
+ //
+ sys_cond_wait(nx_server->cond, nx_server->lock);
+ } else {
+ //
+ // This thread elects itself to wait on the proton driver. Set the
+ // thread-is-waiting flag so other idle threads will not interfere.
+ //
+ nx_server->a_thread_is_waiting = true;
+
+ //
+ // Ask the timer module when its next timer is scheduled to fire. We'll
+ // use this value in driver_wait as the timeout. If there are no scheduled
+ // timers, the returned value will be -1.
+ //
+ long duration = nx_timer_next_duration_LH();
+
+ //
+ // Invoke the proton driver's wait sequence. This is a bit of a hack for now
+ // and will be improved in the future. The wait process is divided into three parts,
+ // the first and third of which need to be non-reentrant, and the second of which
+ // must be reentrant (and blocks).
+ //
+ pn_driver_wait_1(nx_server->driver);
+ sys_mutex_unlock(nx_server->lock);
+
+ do {
+ error = 0;
+ poll_result = pn_driver_wait_2(nx_server->driver, duration);
+ if (poll_result == -1)
+ error = pn_driver_errno(nx_server->driver);
+ } while (error == PN_INTR);
+ if (error) {
+ nx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(nx_server->driver)));
+ exit(-1);
+ }
+
+ sys_mutex_lock(nx_server->lock);
+ pn_driver_wait_3(nx_server->driver);
+
+ if (!thread->running) {
+ sys_mutex_unlock(nx_server->lock);
+ break;
+ }
+
+ //
+ // Visit the timer module.
+ //
+ if (poll_result == 0 || ++timer_holdoff == 100) {
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
+ nx_timer_visit_LH(milliseconds);
+ timer_holdoff = 0;
+ }
+
+ //
+ // Process listeners (incoming connections).
+ //
+ thread_process_listeners(nx_server->driver);
+
+ //
+ // Traverse the list of connectors-needing-service from the proton driver.
+ // If the connector is not already in the work queue and it is not currently
+ // being processed by another thread, put it in the work queue and signal the
+ // condition variable.
+ //
+ work = pn_driver_connector(nx_server->driver);
+ while (work) {
+ ctx = pn_connector_context(work);
+ if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->enqueued = 1;
+ work_queue_put(nx_server->work_queue, work);
+ sys_cond_signal(nx_server->cond);
+ }
+ work = pn_driver_connector(nx_server->driver);
+ }
+
+ //
+ // Release our exclusive claim on pn_driver_wait.
+ //
+ nx_server->a_thread_is_waiting = false;
+ }
+ }
+
+ //
+ // If we were given a connector to work on from the work queue, mark it as
+ // owned by this thread and as no longer enqueued.
+ //
+ if (work) {
+ ctx = pn_connector_context(work);
+ if (ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->owner_thread = thread->thread_id;
+ ctx->enqueued = 0;
+ nx_server->threads_active++;
+ } else {
+ //
+ // This connector is being processed by another thread, re-queue it.
+ //
+ work_queue_put(nx_server->work_queue, work);
+ work = 0;
+ }
+ }
+ sys_mutex_unlock(nx_server->lock);
+
+ //
+ // Process the connector that we now have exclusive access to.
+ //
+ if (work) {
+ process_connector(work);
+
+ //
+ // Check to see if the connector was closed during processing
+ //
+ if (pn_connector_closed(work)) {
+ //
+ // Connector is closed. Free the context and the connector.
+ //
+ conn = pn_connector_connection(work);
+ if (ctx->connector) {
+ ctx->connector->ctx = 0;
+ ctx->connector->state = CXTR_STATE_CONNECTING;
+ nx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ }
+ sys_mutex_lock(nx_server->lock);
+ free_nx_connection_t(ctx);
+ pn_connector_free(work);
+ if (conn)
+ pn_connection_free(conn);
+ nx_server->threads_active--;
+ sys_mutex_unlock(nx_server->lock);
+ } else {
+ //
+ // The connector lives on. Mark it as no longer owned by this thread.
+ //
+ sys_mutex_lock(nx_server->lock);
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ nx_server->threads_active--;
+ sys_mutex_unlock(nx_server->lock);
+ }
+
+ //
+ // Wake up the proton driver to force it to reconsider its set of FDs
+ // in light of the processing that just occurred.
+ //
+ pn_driver_wakeup(nx_server->driver);
+ }
+ }
+
+ return 0;
+}
+
+
+static void thread_start(nx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->using_thread = 1;
+ thread->thread = sys_thread(thread_run, (void*) thread);
+}
+
+
+static void thread_cancel(nx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->running = 0;
+ thread->canceled = 1;
+}
+
+
+static void thread_join(nx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ if (thread->using_thread)
+ sys_thread_join(thread->thread);
+}
+
+
+static void thread_free(nx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ free(thread);
+}
+
+
+static void cxtr_try_open(void *context)
+{
+ nx_connector_t *ct = (nx_connector_t*) context;
+ if (ct->state != CXTR_STATE_CONNECTING)
+ return;
+
+ nx_connection_t *ctx = new_nx_connection_t();
+ ctx->state = CONN_STATE_CONNECTING;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = ct;
+ ctx->context = ct->context;
+ ctx->user_context = 0;
+ ctx->ufd = 0;
+
+ //
+ // pn_connector is not thread safe
+ //
+ sys_mutex_lock(nx_server->lock);
+ ctx->pn_cxtr = pn_connector(nx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
+ sys_mutex_unlock(nx_server->lock);
+
+ ct->ctx = ctx;
+ ct->delay = 5000;
+ nx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
+}
+
+
+void nx_server_initialize(int thread_count)
+{
+ int i;
+
+ if (nx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ nx_alloc_initialize();
+ nx_server = NEW(nx_server_t);
+
+ if (!nx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ nx_server->thread_count = thread_count;
+ nx_server->driver = pn_driver();
+ nx_server->start_handler = 0;
+ nx_server->conn_handler = 0;
+ nx_server->signal_handler = 0;
+ nx_server->ufd_handler = 0;
+ nx_server->start_context = 0;
+ nx_server->signal_context = 0;
+ nx_server->lock = sys_mutex();
+ nx_server->cond = sys_cond();
+
+ nx_timer_initialize(nx_server->lock);
+
+ nx_server->threads = NEW_PTR_ARRAY(nx_thread_t, thread_count);
+ for (i = 0; i < thread_count; i++)
+ nx_server->threads[i] = thread(i);
+
+ nx_server->work_queue = work_queue();
+ DEQ_INIT(nx_server->pending_timers);
+ nx_server->a_thread_is_waiting = false;
+ nx_server->threads_active = 0;
+ nx_server->pause_requests = 0;
+ nx_server->threads_paused = 0;
+ nx_server->pause_next_sequence = 0;
+ nx_server->pause_now_serving = 0;
+ nx_server->pending_signal = 0;
+}
+
+
+void nx_server_finalize(void)
+{
+ int i;
+ if (!nx_server)
+ return;
+
+ for (i = 0; i < nx_server->thread_count; i++)
+ thread_free(nx_server->threads[i]);
+
+ work_queue_free(nx_server->work_queue);
+
+ pn_driver_free(nx_server->driver);
+ sys_mutex_free(nx_server->lock);
+ sys_cond_free(nx_server->cond);
+ free(nx_server);
+ nx_server = 0;
+}
+
+
+void nx_server_set_conn_handler(nx_conn_handler_cb_t handler)
+{
+ nx_server->conn_handler = handler;
+}
+
+
+void nx_server_set_signal_handler(nx_signal_handler_cb_t handler, void *context)
+{
+ nx_server->signal_handler = handler;
+ nx_server->signal_context = context;
+}
+
+
+void nx_server_set_start_handler(nx_thread_start_cb_t handler, void *context)
+{
+ nx_server->start_handler = handler;
+ nx_server->start_context = context;
+}
+
+
+void nx_server_set_user_fd_handler(nx_user_fd_handler_cb_t ufd_handler)
+{
+ nx_server->ufd_handler = ufd_handler;
+}
+
+
+void nx_server_run(void)
+{
+ int i;
+ if (!nx_server)
+ return;
+
+ assert(nx_server->conn_handler); // Server can't run without a connection handler.
+
+ for (i = 1; i < nx_server->thread_count; i++)
+ thread_start(nx_server->threads[i]);
+
+ nx_log(module, LOG_INFO, "Operational, %d Threads Running", nx_server->thread_count);
+
+ thread_run((void*) nx_server->threads[0]);
+
+ for (i = 1; i < nx_server->thread_count; i++)
+ thread_join(nx_server->threads[i]);
+
+ nx_log(module, LOG_INFO, "Shut Down");
+}
+
+
+void nx_server_stop(void)
+{
+ int idx;
+
+ sys_mutex_lock(nx_server->lock);
+ for (idx = 0; idx < nx_server->thread_count; idx++)
+ thread_cancel(nx_server->threads[idx]);
+ sys_cond_signal_all(nx_server->cond);
+ pn_driver_wakeup(nx_server->driver);
+ sys_mutex_unlock(nx_server->lock);
+}
+
+
+void nx_server_signal(int signum)
+{
+ signal(signum, signal_handler);
+}
+
+
+void nx_server_pause(void)
+{
+ sys_mutex_lock(nx_server->lock);
+
+ //
+ // Bump the request count to stop all the threads.
+ //
+ nx_server->pause_requests++;
+ int my_sequence = nx_server->pause_next_sequence++;
+
+ //
+ // Awaken all threads that are currently blocking.
+ //
+ sys_cond_signal_all(nx_server->cond);
+ pn_driver_wakeup(nx_server->driver);
+
+ //
+ // Wait for the paused thread count plus the number of threads requesting a pause to equal
+ // the total thread count. Also, don't exit the blocking loop until now_serving equals our
+ // sequence number. This ensures that concurrent pausers don't run at the same time.
+ //
+ while ((nx_server->threads_paused + nx_server->pause_requests < nx_server->thread_count) ||
+ (my_sequence != nx_server->pause_now_serving))
+ sys_cond_wait(nx_server->cond, nx_server->lock);
+
+ sys_mutex_unlock(nx_server->lock);
+}
+
+
+void nx_server_resume(void)
+{
+ sys_mutex_lock(nx_server->lock);
+ nx_server->pause_requests--;
+ nx_server->pause_now_serving++;
+ sys_cond_signal_all(nx_server->cond);
+ sys_mutex_unlock(nx_server->lock);
+}
+
+
+void nx_server_activate(nx_connection_t *ctx)
+{
+ if (!ctx)
+ return;
+
+ pn_connector_t *ctor = ctx->pn_cxtr;
+ if (!ctor)
+ return;
+
+ if (!pn_connector_closed(ctor))
+ pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
+}
+
+
+void nx_connection_set_context(nx_connection_t *conn, void *context)
+{
+ conn->user_context = context;
+}
+
+
+void *nx_connection_get_context(nx_connection_t *conn)
+{
+ return conn->user_context;
+}
+
+
+pn_connection_t *nx_connection_pn(nx_connection_t *conn)
+{
+ return conn->pn_conn;
+}
+
+
+nx_listener_t *nx_server_listen(const nx_server_config_t *config, void *context)
+{
+ nx_listener_t *li = new_nx_listener_t();
+
+ if (!li)
+ return 0;
+
+ li->config = config;
+ li->context = context;
+ li->pn_listener = pn_listener(nx_server->driver, config->host, config->port, (void*) li);
+
+ if (!li->pn_listener) {
+ nx_log(module, LOG_ERROR, "Driver Error %d (%s)",
+ pn_driver_errno(nx_server->driver), pn_driver_error(nx_server->driver));
+ free_nx_listener_t(li);
+ return 0;
+ }
+ nx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
+
+ return li;
+}
+
+
+void nx_server_listener_free(nx_listener_t* li)
+{
+ pn_listener_free(li->pn_listener);
+ free_nx_listener_t(li);
+}
+
+
+void nx_server_listener_close(nx_listener_t* li)
+{
+ pn_listener_close(li->pn_listener);
+}
+
+
+nx_connector_t *nx_server_connect(const nx_server_config_t *config, void *context)
+{
+ nx_connector_t *ct = new_nx_connector_t();
+
+ if (!ct)
+ return 0;
+
+ ct->state = CXTR_STATE_CONNECTING;
+ ct->config = config;
+ ct->context = context;
+ ct->ctx = 0;
+ ct->timer = nx_timer(cxtr_try_open, (void*) ct);
+ ct->delay = 0;
+
+ nx_timer_schedule(ct->timer, ct->delay);
+ return ct;
+}
+
+
+void nx_server_connector_free(nx_connector_t* ct)
+{
+ // Don't free the proton connector. This will be done by the connector
+ // processing/cleanup.
+
+ if (ct->ctx) {
+ pn_connector_close(ct->ctx->pn_cxtr);
+ ct->ctx->connector = 0;
+ }
+
+ nx_timer_free(ct->timer);
+ free_nx_connector_t(ct);
+}
+
+
+nx_user_fd_t *nx_user_fd(int fd, void *context)
+{
+ nx_user_fd_t *ufd = new_nx_user_fd_t();
+
+ if (!ufd)
+ return 0;
+
+ nx_connection_t *ctx = new_nx_connection_t();
+ ctx->state = CONN_STATE_USER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = 0;
+ ctx->context = 0;
+ ctx->user_context = 0;
+ ctx->ufd = ufd;
+
+ ufd->context = context;
+ ufd->fd = fd;
+ ufd->pn_conn = pn_connector_fd(nx_server->driver, fd, (void*) ctx);
+ pn_driver_wakeup(nx_server->driver);
+
+ return ufd;
+}
+
+
+void nx_user_fd_free(nx_user_fd_t *ufd)
+{
+ pn_connector_close(ufd->pn_conn);
+ free_nx_user_fd_t(ufd);
+}
+
+
+void nx_user_fd_activate_read(nx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
+ pn_driver_wakeup(nx_server->driver);
+}
+
+
+void nx_user_fd_activate_write(nx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+ pn_driver_wakeup(nx_server->driver);
+}
+
+
+bool nx_user_fd_is_readable(nx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
+}
+
+
+bool nx_user_fd_is_writeable(nx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+}
+
+
+void nx_server_timer_pending_LH(nx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(nx_server->pending_timers, timer);
+}
+
+
+void nx_server_timer_cancel_LH(nx_timer_t *timer)
+{
+ DEQ_REMOVE(nx_server->pending_timers, timer);
+}
+
diff --git a/qpid/extras/nexus/src/server_private.h b/qpid/extras/nexus/src/server_private.h
new file mode 100644
index 0000000000..a7f0a18ef7
--- /dev/null
+++ b/qpid/extras/nexus/src/server_private.h
@@ -0,0 +1,95 @@
+#ifndef __server_private_h__
+#define __server_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/server.h>
+#include <qpid/nexus/user_fd.h>
+#include <qpid/nexus/timer.h>
+#include <qpid/nexus/alloc.h>
+#include <proton/driver.h>
+
+void nx_server_timer_pending_LH(nx_timer_t *timer);
+void nx_server_timer_cancel_LH(nx_timer_t *timer);
+
+
+typedef enum {
+ CONN_STATE_CONNECTING = 0,
+ CONN_STATE_SASL_CLIENT,
+ CONN_STATE_SASL_SERVER,
+ CONN_STATE_OPENING,
+ CONN_STATE_OPERATIONAL,
+ CONN_STATE_FAILED,
+ CONN_STATE_USER
+} conn_state_t;
+
+#define CONTEXT_NO_OWNER -1
+
+typedef enum {
+ CXTR_STATE_CONNECTING = 0,
+ CXTR_STATE_OPEN,
+ CXTR_STATE_FAILED
+} cxtr_state_t;
+
+
+struct nx_listener_t {
+ const nx_server_config_t *config;
+ void *context;
+ pn_listener_t *pn_listener;
+};
+
+
+struct nx_connector_t {
+ cxtr_state_t state;
+ const nx_server_config_t *config;
+ void *context;
+ nx_connection_t *ctx;
+ nx_timer_t *timer;
+ long delay;
+};
+
+
+struct nx_connection_t {
+ conn_state_t state;
+ int owner_thread;
+ int enqueued;
+ pn_connector_t *pn_cxtr;
+ pn_connection_t *pn_conn;
+ nx_listener_t *listener;
+ nx_connector_t *connector;
+ void *context; // Copy of context from listener or connector
+ void *user_context;
+ nx_user_fd_t *ufd;
+};
+
+
+struct nx_user_fd_t {
+ void *context;
+ int fd;
+ pn_connector_t *pn_conn;
+};
+
+
+ALLOC_DECLARE(nx_listener_t);
+ALLOC_DECLARE(nx_connector_t);
+ALLOC_DECLARE(nx_connection_t);
+ALLOC_DECLARE(nx_user_fd_t);
+
+
+#endif
diff --git a/qpid/extras/nexus/src/timer.c b/qpid/extras/nexus/src/timer.c
new file mode 100644
index 0000000000..81b531305d
--- /dev/null
+++ b/qpid/extras/nexus/src/timer.c
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "timer_private.h"
+#include "server_private.h"
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/threading.h>
+#include <assert.h>
+#include <stdio.h>
+
+static sys_mutex_t *lock;
+static nx_timer_list_t free_list;
+static nx_timer_list_t idle_timers;
+static nx_timer_list_t scheduled_timers;
+static long time_base;
+
+
+//=========================================================================
+// Private static functions
+//=========================================================================
+
+static void nx_timer_cancel_LH(nx_timer_t *timer)
+{
+ switch (timer->state) {
+ case TIMER_FREE:
+ assert(0);
+ break;
+
+ case TIMER_IDLE:
+ break;
+
+ case TIMER_SCHEDULED:
+ if (timer->next)
+ timer->next->delta_time += timer->delta_time;
+ DEQ_REMOVE(scheduled_timers, timer);
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ break;
+
+ case TIMER_PENDING:
+ nx_server_timer_cancel_LH(timer);
+ break;
+ }
+
+ timer->state = TIMER_IDLE;
+}
+
+
+//=========================================================================
+// Public Functions from timer.h
+//=========================================================================
+
+nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context)
+{
+ nx_timer_t *timer;
+
+ sys_mutex_lock(lock);
+
+ timer = DEQ_HEAD(free_list);
+ if (timer) {
+ DEQ_REMOVE_HEAD(free_list);
+ } else {
+ timer = NEW(nx_timer_t);
+ DEQ_ITEM_INIT(timer);
+ }
+
+ if (timer) {
+ timer->handler = cb;
+ timer->context = context;
+ timer->delta_time = 0;
+ timer->state = TIMER_IDLE;
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ }
+
+ sys_mutex_unlock(lock);
+ return timer;
+}
+
+
+void nx_timer_free(nx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ nx_timer_cancel_LH(timer);
+ DEQ_REMOVE(idle_timers, timer);
+ DEQ_INSERT_TAIL(free_list, timer);
+ timer->state = TIMER_FREE;
+ sys_mutex_unlock(lock);
+}
+
+
+void nx_timer_schedule(nx_timer_t *timer, long duration)
+{
+ nx_timer_t *ptr;
+ nx_timer_t *last;
+ long total_time;
+
+ sys_mutex_lock(lock);
+ nx_timer_cancel_LH(timer); // Timer is now on the idle list
+ assert(timer->state == TIMER_IDLE);
+ DEQ_REMOVE(idle_timers, timer);
+
+ //
+ // Handle the special case of a zero-time scheduling. In this case,
+ // the timer doesn't go on the scheduled list. It goes straight to the
+ // pending list in the server.
+ //
+ if (duration == 0) {
+ timer->state = TIMER_PENDING;
+ nx_server_timer_pending_LH(timer);
+ sys_mutex_unlock(lock);
+ return;
+ }
+
+ //
+ // Find the insert point in the schedule.
+ //
+ total_time = 0;
+ ptr = DEQ_HEAD(scheduled_timers);
+ assert(!ptr || ptr->prev == 0);
+ while (ptr) {
+ total_time += ptr->delta_time;
+ if (total_time > duration)
+ break;
+ ptr = ptr->next;
+ }
+
+ //
+ // Insert the timer into the schedule and adjust the delta time
+ // of the following timer if present.
+ //
+ if (total_time <= duration) {
+ assert(ptr == 0);
+ timer->delta_time = duration - total_time;
+ DEQ_INSERT_TAIL(scheduled_timers, timer);
+ } else {
+ total_time -= ptr->delta_time;
+ timer->delta_time = duration - total_time;
+ assert(ptr->delta_time > timer->delta_time);
+ ptr->delta_time -= timer->delta_time;
+ last = ptr->prev;
+ if (last)
+ DEQ_INSERT_AFTER(scheduled_timers, timer, last);
+ else
+ DEQ_INSERT_HEAD(scheduled_timers, timer);
+ }
+
+ timer->state = TIMER_SCHEDULED;
+
+ sys_mutex_unlock(lock);
+}
+
+
+void nx_timer_cancel(nx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ nx_timer_cancel_LH(timer);
+ sys_mutex_unlock(lock);
+}
+
+
+//=========================================================================
+// Private Functions from timer_private.h
+//=========================================================================
+
+void nx_timer_initialize(sys_mutex_t *server_lock)
+{
+ lock = server_lock;
+ DEQ_INIT(free_list);
+ DEQ_INIT(idle_timers);
+ DEQ_INIT(scheduled_timers);
+ time_base = 0;
+}
+
+
+void nx_timer_finalize(void)
+{
+ lock = 0;
+}
+
+
+long nx_timer_next_duration_LH(void)
+{
+ nx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+ if (timer)
+ return timer->delta_time;
+ return -1;
+}
+
+
+void nx_timer_visit_LH(long current_time)
+{
+ long delta;
+ nx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+
+ if (time_base == 0) {
+ time_base = current_time;
+ return;
+ }
+
+ delta = current_time - time_base;
+ time_base = current_time;
+
+ while (timer) {
+ assert(delta >= 0);
+ if (timer->delta_time > delta) {
+ timer->delta_time -= delta;
+ break;
+ } else {
+ DEQ_REMOVE_HEAD(scheduled_timers);
+ delta -= timer->delta_time;
+ timer->state = TIMER_PENDING;
+ nx_server_timer_pending_LH(timer);
+
+ }
+ timer = DEQ_HEAD(scheduled_timers);
+ }
+}
+
+
+void nx_timer_idle_LH(nx_timer_t *timer)
+{
+ timer->state = TIMER_IDLE;
+ DEQ_INSERT_TAIL(idle_timers, timer);
+}
+
diff --git a/qpid/extras/nexus/src/timer_private.h b/qpid/extras/nexus/src/timer_private.h
new file mode 100644
index 0000000000..fa9891953f
--- /dev/null
+++ b/qpid/extras/nexus/src/timer_private.h
@@ -0,0 +1,51 @@
+#ifndef __timer_private_h__
+#define __timer_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/timer.h>
+#include <qpid/nexus/threading.h>
+
+typedef enum {
+ TIMER_FREE,
+ TIMER_IDLE,
+ TIMER_SCHEDULED,
+ TIMER_PENDING
+} nx_timer_state_t;
+
+
+struct nx_timer_t {
+ DEQ_LINKS(nx_timer_t);
+ nx_timer_cb_t handler;
+ void *context;
+ long delta_time;
+ nx_timer_state_t state;
+};
+
+DEQ_DECLARE(nx_timer_t, nx_timer_list_t);
+
+void nx_timer_initialize(sys_mutex_t *server_lock);
+void nx_timer_finalize(void);
+long nx_timer_next_duration_LH(void);
+void nx_timer_visit_LH(long current_time);
+void nx_timer_idle_LH(nx_timer_t *timer);
+
+
+#endif
diff --git a/qpid/extras/nexus/src/work_queue.c b/qpid/extras/nexus/src/work_queue.c
new file mode 100644
index 0000000000..b9555b3cb2
--- /dev/null
+++ b/qpid/extras/nexus/src/work_queue.c
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/ctools.h>
+#include "work_queue.h"
+#include <string.h>
+#include <stdio.h>
+
+#define BATCH_SIZE 100
+typedef struct work_item_t work_item_t;
+
+struct work_item_t {
+ DEQ_LINKS(work_item_t);
+ pn_connector_t *conn;
+};
+
+DEQ_DECLARE(work_item_t, work_list_t);
+
+struct work_queue_t {
+ work_list_t items;
+ work_list_t free_list;
+};
+
+static void allocate_batch(work_queue_t *w)
+{
+ int i;
+ work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE);
+ if (!batch)
+ return;
+
+ memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE);
+
+ for (i = 0; i < BATCH_SIZE; i++)
+ DEQ_INSERT_TAIL(w->free_list, &batch[i]);
+}
+
+
+work_queue_t *work_queue(void)
+{
+ work_queue_t *w = NEW(work_queue_t);
+ if (!w)
+ return 0;
+
+ DEQ_INIT(w->items);
+ DEQ_INIT(w->free_list);
+
+ allocate_batch(w);
+
+ return w;
+}
+
+
+void work_queue_free(work_queue_t *w)
+{
+ if (!w)
+ return;
+
+ // KEEP TRACK OF BATCHES AND FREE
+ free(w);
+}
+
+
+void work_queue_put(work_queue_t *w, pn_connector_t *conn)
+{
+ work_item_t *item;
+
+ if (!w)
+ return;
+ if (DEQ_SIZE(w->free_list) == 0)
+ allocate_batch(w);
+ if (DEQ_SIZE(w->free_list) == 0)
+ return;
+
+ item = DEQ_HEAD(w->free_list);
+ DEQ_REMOVE_HEAD(w->free_list);
+
+ item->conn = conn;
+
+ DEQ_INSERT_TAIL(w->items, item);
+}
+
+
+pn_connector_t *work_queue_get(work_queue_t *w)
+{
+ work_item_t *item;
+ pn_connector_t *conn;
+
+ if (!w)
+ return 0;
+ item = DEQ_HEAD(w->items);
+ if (!item)
+ return 0;
+
+ DEQ_REMOVE_HEAD(w->items);
+ conn = item->conn;
+ item->conn = 0;
+
+ DEQ_INSERT_TAIL(w->free_list, item);
+
+ return conn;
+}
+
+
+int work_queue_empty(work_queue_t *w)
+{
+ return !w || DEQ_SIZE(w->items) == 0;
+}
+
+
+int work_queue_depth(work_queue_t *w)
+{
+ if (!w)
+ return 0;
+ return DEQ_SIZE(w->items);
+}
+
diff --git a/qpid/extras/nexus/src/work_queue.h b/qpid/extras/nexus/src/work_queue.h
new file mode 100644
index 0000000000..597a484a9c
--- /dev/null
+++ b/qpid/extras/nexus/src/work_queue.h
@@ -0,0 +1,33 @@
+#ifndef __work_queue_h__
+#define __work_queue_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+typedef struct work_queue_t work_queue_t;
+
+work_queue_t *work_queue(void);
+void work_queue_free(work_queue_t *w);
+void work_queue_put(work_queue_t *w, pn_connector_t *conn);
+pn_connector_t *work_queue_get(work_queue_t *w);
+int work_queue_empty(work_queue_t *w);
+int work_queue_depth(work_queue_t *w);
+
+#endif
diff --git a/qpid/extras/nexus/tests/CMakeLists.txt b/qpid/extras/nexus/tests/CMakeLists.txt
new file mode 100644
index 0000000000..383c3c9919
--- /dev/null
+++ b/qpid/extras/nexus/tests/CMakeLists.txt
@@ -0,0 +1,34 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+##
+## Build test applications
+##
+set(test_SOURCES
+ alloc_test.c
+ message_test.c
+ run_tests.c
+ server_test.c
+ timer_test.c
+ tool_test.c
+ )
+
+add_executable(run_tests ${test_SOURCES})
+target_link_libraries(run_tests qpid-nexus)
+
diff --git a/qpid/extras/nexus/tests/alloc_test.c b/qpid/extras/nexus/tests/alloc_test.c
new file mode 100644
index 0000000000..02f48af7e7
--- /dev/null
+++ b/qpid/extras/nexus/tests/alloc_test.c
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include "alloc_private.h"
+
+typedef struct {
+ int A;
+ int B;
+} object_t;
+
+nx_alloc_config_t config = {3, 7, 10};
+
+ALLOC_DECLARE(object_t);
+ALLOC_DEFINE_CONFIG(object_t, &config);
+
+
+static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
+{
+ if (stats->total_alloc_from_heap != ah) return "Incorrect alloc-from-heap";
+ if (stats->total_free_to_heap != fh) return "Incorrect free-to-heap";
+ if (stats->held_by_threads != ht) return "Incorrect held-by-threads";
+ if (stats->batches_rebalanced_to_threads != rt) return "Incorrect rebalance-to-threads";
+ if (stats->batches_rebalanced_to_global != rg) return "Incorrect rebalance-to-global";
+ return 0;
+}
+
+
+static char* test_alloc_basic(void *context)
+{
+ object_t *obj[50];
+ int idx;
+ nx_alloc_stats_t *stats;
+ char *error;
+
+ for (idx = 0; idx < 20; idx++)
+ obj[idx] = new_object_t();
+
+ stats = alloc_stats_object_t();
+ error = check_stats(stats, 21, 0, 21, 0, 0);
+ if (error) return error;
+
+ for (idx = 0; idx < 20; idx++)
+ free_object_t(obj[idx]);
+
+ error = check_stats(stats, 21, 5, 6, 0, 5);
+ if (error) return error;
+
+ for (idx = 0; idx < 20; idx++)
+ obj[idx] = new_object_t();
+
+ error = check_stats(stats, 27, 5, 21, 3, 5);
+ if (error) return error;
+
+ return 0;
+}
+
+
+int alloc_tests(void)
+{
+ int result = 0;
+ nx_alloc_initialize();
+
+ TEST_CASE(test_alloc_basic, 0);
+
+ return result;
+}
+
diff --git a/qpid/extras/nexus/tests/message_test.c b/qpid/extras/nexus/tests/message_test.c
new file mode 100644
index 0000000000..e9a4f01636
--- /dev/null
+++ b/qpid/extras/nexus/tests/message_test.c
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include <qpid/nexus/message.h>
+#include <qpid/nexus/iterator.h>
+#include <proton/message.h>
+
+
+static char* test_init(void *context)
+{
+ nx_allocator_initialize(nx_allocator_default_config());
+ nx_allocator_finalize();
+ return 0;
+}
+
+
+static char* test_send_to_messenger(void *context)
+{
+ nx_allocator_initialize(nx_allocator_default_config());
+
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_compose_1(msg, "test_addr_0", 0);
+ nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
+ if (buf == 0) return "Expected a buffer in the test message";
+
+ pn_message_t *pn_msg = pn_message();
+ int result = pn_message_decode(pn_msg, (const char*) nx_buffer_base(buf), nx_buffer_size(buf));
+ if (result != 0) return "Error in pn_message_decode";
+
+ if (strcmp(pn_message_get_address(pn_msg), "test_addr_0") != 0)
+ return "Address mismatch in received message";
+
+ pn_message_free(pn_msg);
+ nx_free_message(msg);
+
+ nx_allocator_finalize();
+ return 0;
+}
+
+
+static char* test_receive_from_messenger(void *context)
+{
+ nx_allocator_initialize(nx_allocator_default_config());
+
+ pn_message_t *pn_msg = pn_message();
+ pn_message_set_address(pn_msg, "test_addr_1");
+
+ nx_buffer_t *buf = nx_allocate_buffer();
+ size_t size = nx_buffer_capacity(buf);
+ int result = pn_message_encode(pn_msg, (char*) nx_buffer_cursor(buf), &size);
+ if (result != 0) return "Error in pn_message_encode";
+ nx_buffer_insert(buf, size);
+
+ nx_message_t *msg = nx_allocate_message();
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ int valid = nx_message_check(msg, NX_DEPTH_ALL);
+ if (!valid) return "nx_message_check returns 'invalid'";
+
+ nx_field_iterator_t *iter = nx_message_field_to(msg);
+ if (iter == 0) return "Expected an iterator for the 'to' field";
+
+ if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
+ return "Mismatched 'to' field contents";
+
+ pn_message_free(pn_msg);
+ nx_free_message(msg);
+
+ nx_allocator_finalize();
+ return 0;
+}
+
+
+static char* test_insufficient_check_depth(void *context)
+{
+ nx_allocator_initialize(nx_allocator_default_config());
+
+ pn_message_t *pn_msg = pn_message();
+ pn_message_set_address(pn_msg, "test_addr_2");
+
+ nx_buffer_t *buf = nx_allocate_buffer();
+ size_t size = nx_buffer_capacity(buf);
+ int result = pn_message_encode(pn_msg, (char*) nx_buffer_cursor(buf), &size);
+ if (result != 0) return "Error in pn_message_encode";
+ nx_buffer_insert(buf, size);
+
+ nx_message_t *msg = nx_allocate_message();
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS);
+ if (!valid) return "nx_message_check returns 'invalid'";
+
+ nx_field_iterator_t *iter = nx_message_field_to(msg);
+ if (iter) return "Expected no iterator for the 'to' field";
+
+ nx_free_message(msg);
+
+ nx_allocator_finalize();
+ return 0;
+}
+
+
+int message_tests(void)
+{
+ int result = 0;
+
+ TEST_CASE(test_init, 0);
+ TEST_CASE(test_send_to_messenger, 0);
+ TEST_CASE(test_receive_from_messenger, 0);
+ TEST_CASE(test_insufficient_check_depth, 0);
+
+ return result;
+}
+
diff --git a/qpid/extras/nexus/tests/run_tests.c b/qpid/extras/nexus/tests/run_tests.c
new file mode 100644
index 0000000000..a677c04577
--- /dev/null
+++ b/qpid/extras/nexus/tests/run_tests.c
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+int tool_tests();
+int timer_tests();
+int alloc_tests();
+int server_tests();
+int message_tests();
+
+int main(int argc, char** argv)
+{
+ int result = 0;
+ result += tool_tests();
+ result += timer_tests();
+ result += alloc_tests();
+ result += server_tests();
+ result += message_tests();
+ return result;
+}
+
diff --git a/qpid/extras/nexus/tests/server_test.c b/qpid/extras/nexus/tests/server_test.c
new file mode 100644
index 0000000000..29cd70eeb3
--- /dev/null
+++ b/qpid/extras/nexus/tests/server_test.c
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+#include <qpid/nexus/timer.h>
+#include "test_case.h"
+#include <qpid/nexus/server.h>
+#include <qpid/nexus/user_fd.h>
+#include <qpid/nexus/threading.h>
+#include <qpid/nexus/log.h>
+
+#define THREAD_COUNT 4
+#define OCTET_COUNT 100
+
+static sys_mutex_t *test_lock;
+
+static void *expected_context;
+static int call_count;
+static int threads_seen[THREAD_COUNT];
+static char stored_error[512];
+
+static int write_count;
+static int read_count;
+static int fd[2];
+static nx_user_fd_t *ufd_write;
+static nx_user_fd_t *ufd_read;
+
+
+static void thread_start(void *context, int thread_id)
+{
+ sys_mutex_lock(test_lock);
+ if (context != expected_context && !stored_error[0])
+ sprintf(stored_error, "Unexpected Context Value: %lx", (long) context);
+ if (thread_id >= THREAD_COUNT && !stored_error[0])
+ sprintf(stored_error, "Thread_ID too large: %d", thread_id);
+ if (thread_id < 0 && !stored_error[0])
+ sprintf(stored_error, "Thread_ID negative: %d", thread_id);
+
+ call_count++;
+ if (thread_id >= 0 && thread_id < THREAD_COUNT)
+ threads_seen[thread_id]++;
+
+ if (call_count == THREAD_COUNT)
+ nx_server_stop();
+ sys_mutex_unlock(test_lock);
+}
+
+
+static int conn_handler(void *context, nx_conn_event_t event, nx_connection_t *conn)
+{
+ return 0;
+}
+
+
+static void ufd_handler(void *context, nx_user_fd_t *ufd)
+{
+ long dir = (long) context;
+ char buffer;
+ ssize_t len;
+ static int in_read = 0;
+ static int in_write = 0;
+
+ if (dir == 0) { // READ
+ in_read++;
+ assert(in_read == 1);
+ if (!nx_user_fd_is_readable(ufd_read)) {
+ sprintf(stored_error, "Expected Readable");
+ nx_server_stop();
+ } else {
+ len = read(fd[0], &buffer, 1);
+ if (len == 1) {
+ read_count++;
+ if (read_count == OCTET_COUNT)
+ nx_server_stop();
+ }
+ nx_user_fd_activate_read(ufd_read);
+ }
+ in_read--;
+ } else { // WRITE
+ in_write++;
+ assert(in_write == 1);
+ if (!nx_user_fd_is_writeable(ufd_write)) {
+ sprintf(stored_error, "Expected Writable");
+ nx_server_stop();
+ } else {
+ write(fd[1], "X", 1);
+
+ write_count++;
+ if (write_count < OCTET_COUNT)
+ nx_user_fd_activate_write(ufd_write);
+ }
+ in_write--;
+ }
+}
+
+
+static void fd_test_start(void *context)
+{
+ nx_user_fd_activate_read(ufd_read);
+}
+
+
+static char* test_start_handler(void *context)
+{
+ int i;
+
+ nx_server_initialize(THREAD_COUNT);
+
+ expected_context = (void*) 0x00112233;
+ stored_error[0] = 0x0;
+ call_count = 0;
+ for (i = 0; i < THREAD_COUNT; i++)
+ threads_seen[i] = 0;
+
+ nx_server_set_conn_handler(conn_handler);
+ nx_server_set_start_handler(thread_start, expected_context);
+ nx_server_run();
+ nx_server_finalize();
+
+ if (stored_error[0]) return stored_error;
+ if (call_count != THREAD_COUNT) return "Incorrect number of thread-start callbacks";
+ for (i = 0; i < THREAD_COUNT; i++)
+ if (threads_seen[i] != 1) return "Incorrect count on one thread ID";
+
+ return 0;
+}
+
+
+static char* test_user_fd(void *context)
+{
+ int res;
+ nx_timer_t *timer;
+
+ nx_server_initialize(THREAD_COUNT);
+ nx_server_set_conn_handler(conn_handler);
+ nx_server_set_user_fd_handler(ufd_handler);
+ timer = nx_timer(fd_test_start, 0);
+ nx_timer_schedule(timer, 0);
+
+ stored_error[0] = 0x0;
+ res = pipe2(fd, O_NONBLOCK);
+ if (res != 0) return "Error creating pipe2";
+
+ ufd_write = nx_user_fd(fd[1], (void*) 1);
+ ufd_read = nx_user_fd(fd[0], (void*) 0);
+
+ nx_server_run();
+ nx_timer_free(timer);
+ nx_server_finalize();
+ close(fd[0]);
+ close(fd[1]);
+
+ if (stored_error[0]) return stored_error;
+ if (write_count - OCTET_COUNT > 2) sprintf(stored_error, "Excessively high Write Count: %d", write_count);
+ if (read_count != OCTET_COUNT) sprintf(stored_error, "Incorrect Read Count: %d", read_count);;
+
+ if (stored_error[0]) return stored_error;
+ return 0;
+}
+
+
+int server_tests(void)
+{
+ int result = 0;
+ test_lock = sys_mutex();
+ nx_log_set_mask(LOG_NONE);
+
+ TEST_CASE(test_start_handler, 0);
+ TEST_CASE(test_user_fd, 0);
+
+ sys_mutex_free(test_lock);
+ return result;
+}
+
diff --git a/qpid/extras/nexus/tests/test_case.h b/qpid/extras/nexus/tests/test_case.h
new file mode 100644
index 0000000000..6e36b440a5
--- /dev/null
+++ b/qpid/extras/nexus/tests/test_case.h
@@ -0,0 +1,36 @@
+#ifndef _nexus_test_case_h_
+#define _nexus_test_case_h_ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef char* (*testcase_t)(void *context);
+
+#define TEST_CASE(T,C) do { \
+ char *r = T(C); \
+ printf("Test Case %s.%s: ", __FUNCTION__, #T); \
+ if (r) { \
+ printf("FAIL: %s\n", r); \
+ result++; \
+ } else \
+ printf("PASS\n"); \
+} while(0);
+
+
+#endif
+
diff --git a/qpid/extras/nexus/tests/timer_test.c b/qpid/extras/nexus/tests/timer_test.c
new file mode 100644
index 0000000000..f50f9367ea
--- /dev/null
+++ b/qpid/extras/nexus/tests/timer_test.c
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/nexus/timer.h>
+#include "timer_private.h"
+#include "test_case.h"
+#include <qpid/nexus/threading.h>
+
+
+static unsigned long fire_mask;
+static nx_timer_list_t pending_timers;
+static sys_mutex_t *lock;
+static long time;
+static nx_timer_t *timers[16];
+
+
+void nx_server_timer_pending_LH(nx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(pending_timers, timer);
+}
+
+
+void nx_server_timer_cancel_LH(nx_timer_t *timer)
+{
+ if (timer->state == TIMER_PENDING)
+ DEQ_REMOVE(pending_timers, timer);
+}
+
+
+static int fire_head()
+{
+ sys_mutex_lock(lock);
+ int result = DEQ_SIZE(pending_timers);
+ nx_timer_t *timer = DEQ_HEAD(pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(pending_timers);
+ nx_timer_idle_LH(timer);
+ fire_mask |= (unsigned long) timer->context;
+ }
+ sys_mutex_unlock(lock);
+ return result;
+}
+
+
+static char* test_quiet(void *context)
+{
+ fire_mask = 0;
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+
+ while(fire_head());
+
+ if (fire_mask != 0)
+ return "Expected zero timers fired";
+ return 0;
+}
+
+static char* test_immediate(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 0);
+
+ if (fire_mask != 0) return "Premature firing";
+ if (fire_head() > 1) return "Too many firings";
+ if (fire_mask != 1) return "Incorrect fire mask";
+
+ return 0;
+}
+
+
+static char* test_immediate_plus_delayed(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 0);
+ nx_timer_schedule(timers[1], 5);
+
+ if (fire_mask != 0) return "Premature firing";
+ if (fire_head() > 1) return "Too many firings";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ time += 8;
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+
+ if (fire_head() < 1) return "Delayed Failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_single(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 2);
+ if (fire_head() > 0) return "Premature firing 1";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() > 0) return "Premature firing 2";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Failed to fire";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() != 0) return "Spurious fires";
+
+ if (fire_mask != 1) return "Incorrect fire mask";
+ if (timers[0]->state != TIMER_IDLE) return "Expected idle timer state";
+
+ return 0;
+}
+
+
+static char* test_two_inorder(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 2);
+ nx_timer_schedule(timers[1], 4);
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Second failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_two_reverse(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 4);
+ nx_timer_schedule(timers[1], 2);
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 2) return "Incorrect fire mask 2";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Second failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_two_duplicate(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 2);
+ nx_timer_schedule(timers[1], 2);
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count != 2) return "Expected two firings";
+ fire_head();
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() > 0) return "Spurious timer fires";
+
+ return 0;
+}
+
+
+static char* test_separated(void *context)
+{
+ int count;
+
+ while(fire_head());
+ fire_mask = 0;
+
+ nx_timer_schedule(timers[0], 2);
+ nx_timer_schedule(timers[1], 4);
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ nx_timer_schedule(timers[2], 2);
+ nx_timer_schedule(timers[3], 4);
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ fire_head();
+ if (count < 1) return "Second failed to fire";
+ if (count < 2) return "Third failed to fire";
+ if (fire_mask != 7) return "Incorrect fire mask 7";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count < 1) return "Fourth failed to fire";
+ if (fire_mask != 15) return "Incorrect fire mask 15";
+
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count > 0) return "Spurious fire";
+
+ return 0;
+}
+
+
+static char* test_big(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ long durations[16] =
+ { 5, 8, 7, 6,
+ 14, 10, 16, 15,
+ 11, 12, 9, 12,
+ 1, 2, 3, 4};
+ unsigned long masks[18] = {
+ 0x1000,
+ 0x3000,
+ 0x7000,
+ 0xf000,
+ 0xf001,
+ 0xf009,
+ 0xf00d,
+ 0xf00f,
+ 0xf40f,
+ 0xf42f,
+ 0xf52f,
+ 0xff2f,
+ 0xff2f,
+ 0xff3f,
+ 0xffbf,
+ 0xffff,
+ 0xffff,
+ 0xffff
+ };
+
+ int i;
+ for (i = 0; i < 16; i++)
+ nx_timer_schedule(timers[i], durations[i]);
+ for (i = 0; i < 18; i++) {
+ sys_mutex_lock(lock);
+ nx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ while(fire_head());
+ if (fire_mask != masks[i]) {
+ static char error[100];
+ sprintf(error, "Iteration %d: expected mask %04lx, got %04lx", i, masks[i], fire_mask);
+ return error;
+ }
+ }
+
+ return 0;
+}
+
+
+int timer_tests(void)
+{
+ int result = 0;
+
+ fire_mask = 0;
+ DEQ_INIT(pending_timers);
+ lock = sys_mutex();
+ nx_timer_initialize(lock);
+ time = 1;
+
+ timers[0] = nx_timer(0, (void*) 0x00000001);
+ timers[1] = nx_timer(0, (void*) 0x00000002);
+ timers[2] = nx_timer(0, (void*) 0x00000004);
+ timers[3] = nx_timer(0, (void*) 0x00000008);
+ timers[4] = nx_timer(0, (void*) 0x00000010);
+ timers[5] = nx_timer(0, (void*) 0x00000020);
+ timers[6] = nx_timer(0, (void*) 0x00000040);
+ timers[7] = nx_timer(0, (void*) 0x00000080);
+ timers[8] = nx_timer(0, (void*) 0x00000100);
+ timers[9] = nx_timer(0, (void*) 0x00000200);
+ timers[10] = nx_timer(0, (void*) 0x00000400);
+ timers[11] = nx_timer(0, (void*) 0x00000800);
+ timers[12] = nx_timer(0, (void*) 0x00001000);
+ timers[13] = nx_timer(0, (void*) 0x00002000);
+ timers[14] = nx_timer(0, (void*) 0x00004000);
+ timers[15] = nx_timer(0, (void*) 0x00008000);
+
+ TEST_CASE(test_quiet, 0);
+ TEST_CASE(test_immediate, 0);
+ TEST_CASE(test_immediate_plus_delayed, 0);
+ TEST_CASE(test_single, 0);
+ TEST_CASE(test_two_inorder, 0);
+ TEST_CASE(test_two_reverse, 0);
+ TEST_CASE(test_two_duplicate, 0);
+ TEST_CASE(test_separated, 0);
+ TEST_CASE(test_big, 0);
+
+ int i;
+ for (i = 0; i < 16; i++)
+ nx_timer_free(timers[i]);
+
+ nx_timer_finalize();
+
+ return result;
+}
+
diff --git a/qpid/extras/nexus/tests/tool_test.c b/qpid/extras/nexus/tests/tool_test.c
new file mode 100644
index 0000000000..0848b51ec7
--- /dev/null
+++ b/qpid/extras/nexus/tests/tool_test.c
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include <qpid/nexus/ctools.h>
+
+typedef struct item_t {
+ DEQ_LINKS(struct item_t);
+ char letter;
+} item_t;
+
+DEQ_DECLARE(item_t, item_list_t);
+
+
+static char* list_well_formed(item_list_t list, char *key)
+{
+ item_t *ptr;
+ item_t *last = 0;
+ int size = DEQ_SIZE(list);
+ int count = 0;
+ char str[32];
+
+ ptr = DEQ_HEAD(list);
+ while (ptr) {
+ str[count] = ptr->letter;
+ count++;
+ if (DEQ_PREV(ptr) != last) return "Corrupt previous link";
+ last = ptr;
+ ptr = DEQ_NEXT(ptr);
+ }
+ str[count] = '\0';
+ if (strcmp(str, key) != 0) return "Invalid key";
+
+ if (count != size) return "Size different from number of items (forward)";
+
+ count = 0;
+ last = 0;
+ ptr = DEQ_TAIL(list);
+ while (ptr) {
+ count++;
+ if (DEQ_NEXT(ptr) != last) return "Corrupt next link";
+ last = ptr;
+ ptr = DEQ_PREV(ptr);
+ }
+
+ if (count != size) return "Size different from number of items (backward)";
+
+ return 0;
+}
+
+
+static char* test_deq_basic(void *context)
+{
+ item_list_t list;
+ item_t item[10];
+ item_t *ptr;
+ int idx;
+ char *subtest;
+
+ DEQ_INIT(list);
+ if (DEQ_SIZE(list) != 0) return "Expected zero initial size";
+
+ for (idx = 0; idx < 10; idx++) {
+ DEQ_ITEM_INIT(&item[idx]);
+ item[idx].letter = 'A' + idx;
+ DEQ_INSERT_TAIL(list, &item[idx]);
+ }
+ if (DEQ_SIZE(list) != 10) return "Expected 10 items in list";
+
+ ptr = DEQ_HEAD(list);
+ if (!ptr) return "Expected valid head item";
+ if (DEQ_PREV(ptr)) return "Head item has non-null previous link";
+ if (ptr->letter != 'A') return "Expected item A at the head";
+ if (DEQ_NEXT(ptr) == 0) return "Head item has null next link";
+ subtest = list_well_formed(list, "ABCDEFGHIJ");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE_HEAD(list);
+ if (DEQ_SIZE(list) != 9) return "Expected 9 items in list";
+ ptr = DEQ_HEAD(list);
+ if (ptr->letter != 'B') return "Expected item B at the head";
+ subtest = list_well_formed(list, "BCDEFGHIJ");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE_TAIL(list);
+ if (DEQ_SIZE(list) != 8) return "Expected 8 items in list";
+ ptr = DEQ_TAIL(list);
+ if (ptr->letter != 'I') return "Expected item I at the tail";
+ subtest = list_well_formed(list, "BCDEFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[4]);
+ if (DEQ_SIZE(list) != 7) return "Expected 7 items in list";
+ subtest = list_well_formed(list, "BCDFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[1]);
+ if (DEQ_SIZE(list) != 6) return "Expected 6 items in list";
+ subtest = list_well_formed(list, "CDFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[8]);
+ if (DEQ_SIZE(list) != 5) return "Expected 5 items in list";
+ subtest = list_well_formed(list, "CDFGH");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_HEAD(list, &item[8]);
+ if (DEQ_SIZE(list) != 6) return "Expected 6 items in list";
+ ptr = DEQ_HEAD(list);
+ if (ptr->letter != 'I') return "Expected item I at the head";
+ subtest = list_well_formed(list, "ICDFGH");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_AFTER(list, &item[4], &item[7]);
+ if (DEQ_SIZE(list) != 7) return "Expected 7 items in list";
+ ptr = DEQ_TAIL(list);
+ if (ptr->letter != 'E') return "Expected item E at the head";
+ subtest = list_well_formed(list, "ICDFGHE");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_AFTER(list, &item[1], &item[5]);
+ if (DEQ_SIZE(list) != 8) return "Expected 8 items in list";
+ subtest = list_well_formed(list, "ICDFBGHE");
+ if (subtest) return subtest;
+
+ if (item[0].prev || item[0].next) return "Unlisted item A has non-null pointers";
+ if (item[9].prev || item[9].next) return "Unlisted item J has non-null pointers";
+
+ return 0;
+}
+
+
+int tool_tests(void)
+{
+ int result = 0;
+
+ TEST_CASE(test_deq_basic, 0);
+
+ return result;
+}
+