From 10f23aca8dab83f51cb4fce341dc7fef7dac44b9 Mon Sep 17 00:00:00 2001
From: "Rafael H. Schloming"
This implementation is intended to optimise the performance of lookup(String) + * to about the level of a HashMap get. It has been observed that the scheme + * resolution phase performed by the JVM takes considerably longer, so for + * optimum performance lookups should be coded like:
+ *
+ * Context componentContext = (Context)new InitialContext().lookup("java:comp");
+ * String envEntry = (String) componentContext.lookup("env/myEntry");
+ * String envEntry2 = (String) componentContext.lookup("env/myEntry2");
+ *
+ */
+public class ReadOnlyContext implements Context, Serializable
+{
+ private static final long serialVersionUID = -5754338187296859149L;
+ protected static final NameParser nameParser = new NameParserImpl();
+
+ protected final Hashtable environment; // environment for this context
+ protected final Map bindings; // bindings at my level
+ protected final Map treeBindings; // all bindings under me
+
+ private boolean frozen = false;
+ private String nameInNamespace = "";
+ public static final String SEPARATOR = "/";
+
+ public ReadOnlyContext()
+ {
+ environment = new Hashtable();
+ bindings = new HashMap();
+ treeBindings = new HashMap();
+ }
+
+ public ReadOnlyContext(Hashtable env)
+ {
+ if (env == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(env);
+ }
+
+ this.bindings = Collections.EMPTY_MAP;
+ this.treeBindings = Collections.EMPTY_MAP;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings)
+ {
+ if (environment == null)
+ {
+ this.environment = new Hashtable();
+ }
+ else
+ {
+ this.environment = new Hashtable(environment);
+ }
+
+ this.bindings = bindings;
+ treeBindings = new HashMap();
+ frozen = true;
+ }
+
+ public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace)
+ {
+ this(environment, bindings);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env)
+ {
+ this.bindings = clone.bindings;
+ this.treeBindings = clone.treeBindings;
+ this.environment = new Hashtable(env);
+ }
+
+ protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace)
+ {
+ this(clone, env);
+ this.nameInNamespace = nameInNamespace;
+ }
+
+ public void freeze()
+ {
+ frozen = true;
+ }
+
+ boolean isFrozen()
+ {
+ return frozen;
+ }
+
+ /**
+ * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses.
+ * It binds every possible lookup into a map in each context. To do this, each context
+ * strips off one name segment and if necessary creates a new context for it. Then it asks that context
+ * to bind the remaining name. It returns a map containing all the bindings from the next context, plus
+ * the context it just created (if it in fact created it). (the names are suitably extended by the segment
+ * originally lopped off).
+ *
+ * @param name
+ * @param value
+ * @return
+ * @throws javax.naming.NamingException
+ */
+ protected Map internalBind(String name, Object value) throws NamingException
+ {
+ assert (name != null) && (name.length() > 0);
+ assert !frozen;
+
+ Map newBindings = new HashMap();
+ int pos = name.indexOf('/');
+ if (pos == -1)
+ {
+ if (treeBindings.put(name, value) != null)
+ {
+ throw new NamingException("Something already bound at " + name);
+ }
+
+ bindings.put(name, value);
+ newBindings.put(name, value);
+ }
+ else
+ {
+ String segment = name.substring(0, pos);
+ assert segment != null;
+ assert !segment.equals("");
+ Object o = treeBindings.get(segment);
+ if (o == null)
+ {
+ o = newContext();
+ treeBindings.put(segment, o);
+ bindings.put(segment, o);
+ newBindings.put(segment, o);
+ }
+ else if (!(o instanceof ReadOnlyContext))
+ {
+ throw new NamingException("Something already bound where a subcontext should go");
+ }
+
+ ReadOnlyContext readOnlyContext = (ReadOnlyContext) o;
+ String remainder = name.substring(pos + 1);
+ Map subBindings = readOnlyContext.internalBind(remainder, value);
+ for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ String subName = segment + "/" + (String) entry.getKey();
+ Object bound = entry.getValue();
+ treeBindings.put(subName, bound);
+ newBindings.put(subName, bound);
+ }
+ }
+
+ return newBindings;
+ }
+
+ protected ReadOnlyContext newContext()
+ {
+ return new ReadOnlyContext();
+ }
+
+ public Object addToEnvironment(String propName, Object propVal) throws NamingException
+ {
+ return environment.put(propName, propVal);
+ }
+
+ public Hashtable getEnvironment() throws NamingException
+ {
+ return (Hashtable) environment.clone();
+ }
+
+ public Object removeFromEnvironment(String propName) throws NamingException
+ {
+ return environment.remove(propName);
+ }
+
+ public Object lookup(String name) throws NamingException
+ {
+ if (name.length() == 0)
+ {
+ return this;
+ }
+
+ Object result = treeBindings.get(name);
+ if (result == null)
+ {
+ result = bindings.get(name);
+ }
+
+ if (result == null)
+ {
+ int pos = name.indexOf(':');
+ if (pos > 0)
+ {
+ String scheme = name.substring(0, pos);
+ Context ctx = NamingManager.getURLContext(scheme, environment);
+ if (ctx == null)
+ {
+ throw new NamingException("scheme " + scheme + " not recognized");
+ }
+
+ return ctx.lookup(name);
+ }
+ else
+ {
+ // Split out the first name of the path
+ // and look for it in the bindings map.
+ CompositeName path = new CompositeName(name);
+
+ if (path.size() == 0)
+ {
+ return this;
+ }
+ else
+ {
+ String first = path.get(0);
+ Object obj = bindings.get(first);
+ if (obj == null)
+ {
+ throw new NameNotFoundException(name);
+ }
+ else if ((obj instanceof Context) && (path.size() > 1))
+ {
+ Context subContext = (Context) obj;
+ obj = subContext.lookup(path.getSuffix(1));
+ }
+
+ return obj;
+ }
+ }
+ }
+
+ if (result instanceof LinkRef)
+ {
+ LinkRef ref = (LinkRef) result;
+ result = lookup(ref.getLinkName());
+ }
+
+ if (result instanceof Reference)
+ {
+ try
+ {
+ result = NamingManager.getObjectInstance(result, null, null, this.environment);
+ }
+ catch (NamingException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw (NamingException) new NamingException("could not look up : " + name).initCause(e);
+ }
+ }
+
+ if (result instanceof ReadOnlyContext)
+ {
+ String prefix = getNameInNamespace();
+ if (prefix.length() > 0)
+ {
+ prefix = prefix + SEPARATOR;
+ }
+
+ result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name);
+ }
+
+ return result;
+ }
+
+ public Object lookup(Name name) throws NamingException
+ {
+ return lookup(name.toString());
+ }
+
+ public Object lookupLink(String name) throws NamingException
+ {
+ return lookup(name);
+ }
+
+ public Name composeName(Name name, Name prefix) throws NamingException
+ {
+ Name result = (Name) prefix.clone();
+ result.addAll(name);
+
+ return result;
+ }
+
+ public String composeName(String name, String prefix) throws NamingException
+ {
+ CompositeName result = new CompositeName(prefix);
+ result.addAll(new CompositeName(name));
+
+ return result.toString();
+ }
+
+ public NamingEnumeration list(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ReadOnlyContext.ListEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).list("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public NamingEnumeration listBindings(String name) throws NamingException
+ {
+ Object o = lookup(name);
+ if (o == this)
+ {
+ return new ReadOnlyContext.ListBindingEnumeration();
+ }
+ else if (o instanceof Context)
+ {
+ return ((Context) o).listBindings("");
+ }
+ else
+ {
+ throw new NotContextException();
+ }
+ }
+
+ public Object lookupLink(Name name) throws NamingException
+ {
+ return lookupLink(name.toString());
+ }
+
+ public NamingEnumeration list(Name name) throws NamingException
+ {
+ return list(name.toString());
+ }
+
+ public NamingEnumeration listBindings(Name name) throws NamingException
+ {
+ return listBindings(name.toString());
+ }
+
+ public void bind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void bind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void close() throws NamingException
+ {
+ // ignore
+ }
+
+ public Context createSubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public Context createSubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void destroySubcontext(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public String getNameInNamespace() throws NamingException
+ {
+ return nameInNamespace;
+ }
+
+ public NameParser getNameParser(Name name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public NameParser getNameParser(String name) throws NamingException
+ {
+ return nameParser;
+ }
+
+ public void rebind(Name name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rebind(String name, Object obj) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(Name oldName, Name newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void rename(String oldName, String newName) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(Name name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ public void unbind(String name) throws NamingException
+ {
+ throw new OperationNotSupportedException();
+ }
+
+ private abstract class LocalNamingEnumeration implements NamingEnumeration
+ {
+ private Iterator i = bindings.entrySet().iterator();
+
+ public boolean hasMore() throws NamingException
+ {
+ return i.hasNext();
+ }
+
+ public boolean hasMoreElements()
+ {
+ return i.hasNext();
+ }
+
+ protected Map.Entry getNext()
+ {
+ return (Map.Entry) i.next();
+ }
+
+ public void close() throws NamingException
+ { }
+ }
+
+ private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+
+ return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName());
+ }
+ }
+
+ private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration
+ {
+ public Object next() throws NamingException
+ {
+ return nextElement();
+ }
+
+ public Object nextElement()
+ {
+ Map.Entry entry = getNext();
+
+ return new Binding((String) entry.getKey(), entry.getValue());
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/naming/jndi.properties b/java/client/src/main/java/org/apache/qpid/naming/jndi.properties
new file mode 100644
index 0000000000..830de5f619
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/naming/jndi.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.naming.PropertiesFileInitialConextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+# qpid:username=foo;password=password;client_id=id;virtualhost=path@tpc:localhost:1556
+connectionfactory.local = qpid:tcp:localhost'
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+queue.MyQueue = example.MyQueue
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+topic.ibmStocks = stocks.nyse.ibm
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.direct = direct://amq.direct//directQueue
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
new file mode 100644
index 0000000000..bed3ee02cb
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.nclient;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.client.url.URLParser_0_10;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.url.QpidURL;
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.nclient.impl.ClientSession;
+import org.apache.qpid.nclient.impl.ClientSessionDelegate;
+import org.apache.qpid.transport.Channel;
+import org.apache.qpid.transport.ClientDelegate;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionCloseOk;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.network.nio.NioHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Client implements org.apache.qpid.nclient.Connection
+{
+ private Connection _conn;
+ private ClosedListener _closedListner;
+ private final Lock _lock = new ReentrantLock();
+ private static Logger _logger = LoggerFactory.getLogger(Client.class);
+ private Condition closeOk;
+ private boolean closed = false;
+ private long timeout = 60000;
+
+ private ProtocolHeader header = null;
+
+ /**
+ *
+ * @return returns a new connection to the broker.
+ */
+ public static org.apache.qpid.nclient.Connection createConnection()
+ {
+ return new Client();
+ }
+
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
+ {
+
+ final Condition negotiationComplete = _lock.newCondition();
+ closeOk = _lock.newCondition();
+ _lock.lock();
+
+ ClientDelegate connectionDelegate = new ClientDelegate()
+ {
+ private boolean receivedClose = false;
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ClientSessionDelegate();
+ }
+
+ public void exception(Throwable t)
+ {
+ if (_closedListner != null)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
+ }
+ else
+ {
+ throw new RuntimeException("connection closed",t);
+ }
+ }
+
+ public void closed()
+ {
+ if (_closedListner != null && !this.receivedClose)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
+ }
+ }
+
+ @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct)
+ {
+ _lock.lock();
+ try
+ {
+ closed = true;
+ this.receivedClose = true;
+ closeOk.signalAll();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+ {
+ ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue());
+ if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
+ {
+ throw new RuntimeException
+ (new QpidException("Server closed the connection: Reason " +
+ connectionClose.getReplyText(),
+ errorCode,
+ null));
+ }
+ else
+ {
+ _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
+ }
+
+ this.receivedClose = true;
+ }
+ @Override public void init(Channel ch, ProtocolHeader hdr)
+ {
+ // TODO: once the merge is done we'll need to update this code
+ // for handling 0.8 protocol version type i.e. major=8 and mino
+ if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
+ {
+ Client.this.header = hdr;
+ _lock.lock();
+ negotiationComplete.signalAll();
+ _lock.unlock();
+ }
+ }
+ };
+
+ connectionDelegate.setCondition(_lock,negotiationComplete);
+ connectionDelegate.setUsername(username);
+ connectionDelegate.setPassword(password);
+ connectionDelegate.setVirtualHost(virtualHost);
+
+ String transport = System.getProperty("transport","io");
+ if (transport.equalsIgnoreCase("nio"))
+ {
+ _logger.info("using NIO Transport");
+ _conn = NioHandler.connect(host, port,connectionDelegate);
+ }
+ else if (transport.equalsIgnoreCase("io"))
+ {
+ _logger.info("using Plain IO Transport");
+ _conn = IoTransport.connect(host, port,connectionDelegate);
+ }
+ else
+ {
+ _logger.info("using MINA Transport");
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+ // _conn = NativeHandler.connect(host, port,connectionDelegate);
+ }
+
+ // XXX: hardcoded version numbers
+ _conn.send(new ProtocolHeader(1, 0, 10));
+
+ try
+ {
+ negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
+ if (header != null)
+ {
+ _conn.close();
+ throw new ProtocolVersionException(header.getMajor(), header.getMinor());
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public void connect(String url)throws QpidException
+ {
+ URLParser_0_10 parser = null;
+ try
+ {
+ parser = new URLParser_0_10(url);
+ }
+ catch(Exception e)
+ {
+ throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e);
+ }
+ Listconnection, it
+ * informs the connection's ExceptionListener
+ */
+public interface ClosedListener
+{
+ /**
+ * If the communication layer detects a serious problem with a connection, it
+ * informs the connection's ExceptionListener
+ * @param errorCode TODO
+ * @param reason TODO
+ * @param t TODO
+ * @see Connection
+ */
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t);
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
new file mode 100644
index 0000000000..2d5b50b33a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpid.QpidException;
+
+/**
+ * This represents a physical connection to a broker.
+ */
+public interface Connection
+{
+ /**
+ * Establish the connection using the given parameters
+ *
+ * @param host host name
+ * @param port port number
+ * @param virtualHost the virtual host name
+ * @param username user name
+ * @param password password
+ * @throws QpidException If the communication layer fails to establish the connection.
+ */
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
+
+ /**
+ * Establish the connection with the broker identified by the URL.
+ *
+ * @param url Specifies the URL of the broker.
+ * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown.
+ */
+ public void connect(String url) throws QpidException;
+
+ /**
+ * Close this connection.
+ *
+ * @throws QpidException if the communication layer fails to close the connection.
+ */
+ public void close() throws QpidException;
+
+ /**
+ * Create a session for this connection.
+ * The returned session is suspended + * (i.e. this session is not attached to an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than + * or equal to 0 then the session does not expire. + * @return A newly created (suspended) session. + */ + public Session createSession(long expiryInSeconds); + + /** + * Create a DtxSession for this connection. + *
A Dtx Session must be used when resources have to be manipulated as + * part of a global transaction. + *
The retuned DtxSession is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal + * to 0 then the session does not expire. + * @return A newly created (suspended) DtxSession. + */ + public DtxSession createDTXSession(int expiryInSeconds); + + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ClosedListener + * + * @param exceptionListner The ClosedListener + */ + public void setClosedListener(ClosedListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java new file mode 100644 index 0000000000..8a859f2d84 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient; + +import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.GetTimeoutResult; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.RecoverResult; +import org.apache.qpid.transport.XaResult; +import org.apache.qpid.transport.Xid; + +/** + * The resources for this session are controlled under the scope of a distributed transaction. + */ +public interface DtxSession extends Session +{ + + /** + * This method is called when messages should be produced and consumed on behalf a transaction + * branch identified by xid. + * possible options are: + *
The sequence of event for transferring a message is as follows: + *
DeliveryProperties or ApplicationProperties
+ */
+ public void messageHeader(Header header);
+
+ /**
+ * Add the following byte array to the content of the message being received
+ *
+ * @param src Data to be added or streamed.
+ */
+ public void data(ByteBuffer src);
+
+ /**
+ * Indicates that the message has been fully received.
+ */
+ public void messageReceived();
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
new file mode 100644
index 0000000000..e4daaa094e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.api.Message;
+
+/**
+ * A session is associated with a connection. + * When it is created, a session is not associated with an underlying channel. + * The session is single threaded.
+ * + * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. + * For example,command1 will be synchronously invoked by using the following sequence:
+ * session.command1()
+ * session.sync()
+ * This transfer provides a complete message + * using a single method. The method is internally mapped to messageTransfer() and headers() followed + * by data() and endData(). + * This method should only be used by small messages.
+ * + * @param destination The exchange the message is being sent to. + * @param msg The Message to be sent. + * @param confirmModeThis transfer streams a complete message using a single method. + * It uses pull-semantics instead of doing a push.
+ *Data is pulled from a Message object using read()
+ * and pushed using messageTransfer() and headers() followed by data() and endData().
+ *
This method should only be used by large messages
+ * There are two convenience Message classes to do this.
+ *
{@link org.apache.qpid.nclient.util.FileMessage}
+ * {@link org.apache.qpid.nclient.util.StreamingMessage}
+ * Message interface to wrap any
+ * data stream.
+ *
+ *
+ * @param destination The exchange the message is being sent to.
+ * @param msg The Message to be sent.
+ * @param confirmMode Note that only the data between the buffer's current position and the + * buffer limit is added. + * It is therefore recommended to flip the buffer before adding it to the message, + * + * @param buf Data to be added. + */ + public void data(ByteBuffer buf); + + /** + * Add a string to the content of the message being sent. + * + * @param str String to be added. + */ + public void data(String str); + + /** + * Signals the end of data for the message. + */ + public void endData(); + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Associate a message listener with a destination. + *
The destination is bound to a queue, and messages are filtered based + * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). + *
The valid options are: + *
Requests exclusive subscription access, so that only this + * subscription can access the queue. + *
This is an empty option, and has no effect. + *
Only one listener is permitted for each destination. When a new listener is created, + * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener + * has completed processing a message. + * + * @param destination The destination the listener is associated with. + * @param listener The new listener for this destination. + */ + public void setMessageListener(String destination, MessagePartListener listener); + + /** + * Sets the mode of flow control used for a given destination. + *
With credit based flow control, the broker continually maintains its current + * credit balance with the recipient. The credit balance consists of two values, a message + * count, and a byte count. Whenever message data is sent, both counts must be decremented. + * If either value reaches zero, the flow of message data must stop. Additional credit is + * received via the {@link Session#messageFlow} method. + *
Window based flow control is identical to credit based flow control, however message + * acknowledgment implicitly grants a single unit of message credit, and the size of the + * message in byte credits for each acknowledged message. + * + * @param destination The destination to set the flow mode on. + * @param mode
The credit on the broker will remain at zero once + * this method is completed. + * + * @param destination The destination on which the credit supply is to be exhausted. + */ + public void messageFlush(String destination, Option ... options); + + /** + * On receipt of this method, the brokers set credit to zero for a given + * destination. When confirmation of this method + * is issued credit is set to zero. No further messages will be sent until + * further credit is received. + * + * @param destination The destination on which to reset credit. + */ + public void messageStop(String destination, Option ... options); + + /** + * Acknowledge the receipt of a range of messages. + *
Messages must already be acquired, either by receiving them in + * pre-acquire mode or by explicitly acquiring them. + * + * @param ranges Range of messages to be acknowledged. + * @param accept pecify whether to send a message accept to the broker + */ + public void messageAcknowledge(RangeSet ranges, boolean accept); + + /** + * Reject a range of acquired messages. + *
The broker will deliver rejected messages to the + * alternate-exchange on the queue from which it came. If no alternate-exchange is + * defined for that queue the broker will discard the message. + * + * @param ranges Range of messages to be rejected. + * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or + * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but + * failed). + * @param text String describing the reason for a message transfer rejection. + */ + public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); + + /** + * As it is possible that the broker does not manage to reject some messages, after completion of + * {@link Session#messageReject} this method will return the ranges of rejected messages. + *
Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the + * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. + *
A recommended invocation sequence would be: + *
As those messages may have been consumed by another receivers hence, + * message acquisition can fail. + * The outcome of the acquisition is returned as an array of ranges of qcquired messages. + *
This method should only be called on non-acquired messages.
+ *
+ * @param ranges Ranges of messages to be acquired.
+ * @return Indicates the acquired messages
+ */
+ public Future Released messages are re-enqueued.
+ *
+ * @param ranges Ranges of messages to be released.
+ * @param options Valid option is: {@link Option#SET_REDELIVERED})
+ */
+ public void messageRelease(RangeSet ranges, Option ... options);
+
+ // -----------------------------------------------
+ // Local transaction methods
+ // ----------------------------------------------
+ /**
+ * Selects the session for local transaction support.
+ */
+ public void txSelect(Option ... options);
+
+ /**
+ * Commit the receipt and delivery of all messages exchanged by this session's resources.
+ *
+ * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
+ */
+ public void txCommit(Option ... options) throws IllegalStateException;
+
+ /**
+ * Roll back the receipt and delivery of all messages exchanged by this session's resources.
+ *
+ * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
+ */
+ public void txRollback(Option ... options) throws IllegalStateException;
+
+ //---------------------------------------------
+ // Queue methods
+ //---------------------------------------------
+
+ /**
+ * Declare a queue with the given queueName
+ * Following are the valid options:
+ * If this field is set and the exclusive field is also set,
+ * then the queue is deleted when the connection closes.
+ * If this field is set and the exclusive field is not set the queue is deleted when all
+ * the consumers have finished using it.
+ * If set when creating a new queue,
+ * the queue will be marked as durable. Durable queues
+ * remain active when a server restarts. Non-durable queues (transient queues) are purged
+ * if/when a server restarts. Note that durable queues do not necessarily hold persistent
+ * messages, although it does not make sense to send persistent messages to a transient
+ * queue.
+ * Exclusive queues can only be used from one connection at a time.
+ * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
+ * declaring connection closes.
+ * If set, the server will not create the queue.
+ * This field allows the client to assert the presence of a queue without modifying the server state.
+ * Has no effect as it represents an empty option.
+ * In the absence of a particular option, the defaul value is false for each option
+ *
+ * @param queueName The name of the delcared queue.
+ * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
+ * may be rejected by a queue for the following reasons:
+ * Following are the valid options:
+ * If set, the server will only delete the queue if it has no messages.
+ * If set, the server will only delete the queue if it has no consumers.
+ * If the queue has consumers the server does does not delete it but raises a channel exception instead.
+ * Has no effect as it represents an empty option.
+ *
+ *
+ *
+ * @param arguments Used for backward compatibility
+ * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
+ * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
+ * @see Option
+ */
+ public void queueDeclare(String queueName, String alternateExchange, Map
+ *
+ *
In the absence of a particular option, the defaul value is false for each option
+ * + * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the + * current queue for the session, which is the last declared queue. + * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} + * and {@link Option#NONE}) + * @see Option + */ + public void queueDelete(String queueName, Option... options); + + + /** + * This method is used to request information on a particular queue. + * + * @param queueName The name of the queue for which information is requested. + * @return Information on the specified queue. + */ + public FutureValid options are: + *
If set, the exchange is deleted when all queues have finished using it. + *
If set, the exchange will + * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient + * exchanges) are purged when a server restarts. + *
If set, the server will not create the exchange. + * The client can use this to check whether an exchange exists without modifying the server state. + *
This option is an empty option, and has no effect. + *
In the absence of a particular option, the defaul value is false for each option
+ * + * @param exchangeName The exchange name. + * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The + * exchange types define the functionality of the exchange - i.e. how messages are routed + * through it. It is not valid or meaningful to attempt to change the type of an existing + * exchange. Default exchange types are: direct, topic, headers and fanout. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, + * {@link Option#PASSIVE}, {@link Option#NONE}) + * @param arguments Used for backward compatibility + * @see Option + */ + public void exchangeDeclare(String exchangeName, String type, String alternateExchange, + MapFollowing are the valid options: + *
If set, the server will only delete the exchange if it has no queue bindings. If the + * exchange has queue bindings the server does not delete it but raises a channel exception + * instead. + *
Has no effect as it represents an empty option. + *
Note that if an option is not set, it will default to false.
+ *
+ * @param exchangeName The name of exchange to be deleted.
+ * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
+ * @see Option
+ */
+ public void exchangeDelete(String exchangeName, Option... options);
+
+
+ /**
+ * This method is used to request information about a particular exchange.
+ *
+ * @param exchangeName The name of the exchange about which information is requested. If not set, the method will
+ * return information about the default exchange.
+ * @return Information on the specified exchange.
+ */
+ public Future
+ * Property name: message_size_before_sync
+ *
+ * Default value: 200000000
+ */
+ public static long MAX_NOT_SYNC_DATA_LENGH;
+ /**
+ * The total message size in KBs that can be transferted before
+ * messages are flushed.
+ * When a flush returns all messages have reached the broker.
+ *
+ * Property name: message_size_before_flush
+ *
+ * Default value: 200000000
+ */
+ public static long MAX_NOT_FLUSH_DATA_LENGH;
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
new file mode 100644
index 0000000000..96e1d2c772
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
@@ -0,0 +1,94 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.api.Message;
+import org.apache.qpid.nclient.Client;
+import org.apache.qpid.nclient.Connection;
+import org.apache.qpid.nclient.ClosedListener;
+import org.apache.qpid.nclient.Session;
+import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageProperties;
+
+import java.util.UUID;
+
+public class DemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setClosedListener(new ClosedListener()
+ {
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ {
+ System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+ // queue
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+ ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID()));
+ ssn.data("this is the data");
+ ssn.endData();
+
+ //reject
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+ ssn.data("this should be rejected");
+ ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.endData();
+ ssn.sync();
+
+ // topic subs
+ ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+ ssn.sync();
+
+ ssn.queueDeclare("topic1", null, null);
+ ssn.exchangeBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueDeclare("topic2", null, null);
+ ssn.exchangeBind("topic2", "amq.topic", "stock.us.*",null);
+ ssn.queueDeclare("topic3", null, null);
+ ssn.exchangeBind("topic3", "amq.topic", "stock.us.rh",null);
+ ssn.sync();
+
+ // topic
+ ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+ ssn.data("Topic message");
+ ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+ new MessageProperties().setMessageId(UUID.randomUUID()));
+ ssn.endData();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
new file mode 100644
index 0000000000..36c0a4b3be
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
@@ -0,0 +1,76 @@
+package org.apache.qpid.nclient.impl;
+
+import java.io.FileInputStream;
+
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.api.Message;
+import org.apache.qpid.nclient.Client;
+import org.apache.qpid.nclient.Connection;
+import org.apache.qpid.nclient.ClosedListener;
+import org.apache.qpid.nclient.Session;
+import org.apache.qpid.nclient.util.FileMessage;
+import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+
+import java.util.UUID;
+
+public class LargeMsgDemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setClosedListener(new ClosedListener()
+ {
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ {
+ System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+ try
+ {
+ FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
+ 1024,
+ new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID()));
+
+ // queue
+ ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
+ ssn.sync();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
new file mode 100644
index 0000000000..513c1a95de
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
@@ -0,0 +1,156 @@
+package org.apache.qpid.nclient.interop;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.api.Message;
+import org.apache.qpid.nclient.Client;
+import org.apache.qpid.nclient.Connection;
+import org.apache.qpid.nclient.ClosedListener;
+import org.apache.qpid.nclient.Session;
+import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.RangeSet;
+
+public class BasicInteropTest implements ClosedListener
+{
+
+ private Session session;
+ private Connection conn;
+ private String host;
+
+ public BasicInteropTest(String host)
+ {
+ this.host = host;
+ }
+
+ public void close() throws QpidException
+ {
+ conn.close();
+ }
+
+ public void testCreateConnection(){
+ System.out.println("------- Creating connection--------");
+ conn = Client.createConnection();
+ try{
+ conn.connect(host, 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ System.out.println("------- Error Creating connection--------");
+ e.printStackTrace();
+ System.exit(1);
+ }
+ System.out.println("------- Connection created Suscessfully --------");
+ }
+
+ public void testCreateSession(){
+ System.out.println("------- Creating session --------");
+ session = conn.createSession(0);
+ System.out.println("------- Session created sucessfully --------");
+ }
+
+ public void testExchange(){
+ System.out.println("------- Creating an exchange --------");
+ session.exchangeDeclare("test", "direct", "", null);
+ session.sync();
+ System.out.println("------- Exchange created --------");
+ }
+
+ public void testQueue(){
+ System.out.println("------- Creating a queue --------");
+ session.queueDeclare("testQueue", "", null);
+ session.sync();
+ System.out.println("------- Queue created --------");
+
+ System.out.println("------- Binding a queue --------");
+ session.exchangeBind("testQueue", "test", "testKey", null);
+ session.sync();
+ System.out.println("------- Queue bound --------");
+ }
+
+ public void testSendMessage(){
+ System.out.println("------- Sending a message --------");
+ session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+
+ Map A Simple implementation of the message interface
+ * for small messages. When the readData methods are called
+ * we assume the message is complete. i.e there want be any
+ * appendData operations after that. If you need large message support please see
+ * This implementation is intended to optimise the performance of lookup(String)
- * to about the level of a HashMap get. It has been observed that the scheme
- * resolution phase performed by the JVM takes considerably longer, so for
- * optimum performance lookups should be coded like: The returned session is suspended
- * (i.e. this session is not attached to an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than
- * or equal to 0 then the session does not expire.
- * @return A newly created (suspended) session.
- */
- public Session createSession(long expiryInSeconds);
-
- /**
- * Create a DtxSession for this connection.
- * A Dtx Session must be used when resources have to be manipulated as
- * part of a global transaction.
- * The retuned DtxSession is suspended
- * (i.e. this session is not attached with an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal
- * to 0 then the session does not expire.
- * @return A newly created (suspended) DtxSession.
- */
- public DtxSession createDTXSession(int expiryInSeconds);
-
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ClosedListener
- *
- * @param exceptionListner The ClosedListener
- */
- public void setClosedListener(ClosedListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
deleted file mode 100644
index 1d9c63df4f..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import org.apache.qpidity.transport.Future;
-import org.apache.qpidity.transport.GetTimeoutResult;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.transport.RecoverResult;
-import org.apache.qpidity.transport.XaResult;
-import org.apache.qpidity.transport.Xid;
-
-/**
- * The resources for this session are controlled under the scope of a distributed transaction.
- */
-public interface DtxSession extends Session
-{
-
- /**
- * This method is called when messages should be produced and consumed on behalf a transaction
- * branch identified by xid.
- * possible options are:
- * The sequence of event for transferring a message is as follows:
- * A session is associated with a connection.
- * When it is created, a session is not associated with an underlying channel.
- * The session is single threaded. This transfer provides a complete message
- * using a single method. The method is internally mapped to messageTransfer() and headers() followed
- * by data() and endData().
- * This method should only be used by small messages. This transfer streams a complete message using a single method.
- * It uses pull-semantics instead of doing a push. Data is pulled from a Message object using read()
- * and pushed using messageTransfer() and headers() followed by data() and endData().
- * FileMessage and StreamingMessage
+ *
- * Context componentContext = (Context)new InitialContext().lookup("java:comp");
- * String envEntry = (String) componentContext.lookup("env/myEntry");
- * String envEntry2 = (String) componentContext.lookup("env/myEntry2");
- *
- */
-public class ReadOnlyContext implements Context, Serializable
-{
- private static final long serialVersionUID = -5754338187296859149L;
- protected static final NameParser nameParser = new NameParserImpl();
-
- protected final Hashtable environment; // environment for this context
- protected final Map bindings; // bindings at my level
- protected final Map treeBindings; // all bindings under me
-
- private boolean frozen = false;
- private String nameInNamespace = "";
- public static final String SEPARATOR = "/";
-
- public ReadOnlyContext()
- {
- environment = new Hashtable();
- bindings = new HashMap();
- treeBindings = new HashMap();
- }
-
- public ReadOnlyContext(Hashtable env)
- {
- if (env == null)
- {
- this.environment = new Hashtable();
- }
- else
- {
- this.environment = new Hashtable(env);
- }
-
- this.bindings = Collections.EMPTY_MAP;
- this.treeBindings = Collections.EMPTY_MAP;
- }
-
- public ReadOnlyContext(Hashtable environment, Map bindings)
- {
- if (environment == null)
- {
- this.environment = new Hashtable();
- }
- else
- {
- this.environment = new Hashtable(environment);
- }
-
- this.bindings = bindings;
- treeBindings = new HashMap();
- frozen = true;
- }
-
- public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace)
- {
- this(environment, bindings);
- this.nameInNamespace = nameInNamespace;
- }
-
- protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env)
- {
- this.bindings = clone.bindings;
- this.treeBindings = clone.treeBindings;
- this.environment = new Hashtable(env);
- }
-
- protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace)
- {
- this(clone, env);
- this.nameInNamespace = nameInNamespace;
- }
-
- public void freeze()
- {
- frozen = true;
- }
-
- boolean isFrozen()
- {
- return frozen;
- }
-
- /**
- * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses.
- * It binds every possible lookup into a map in each context. To do this, each context
- * strips off one name segment and if necessary creates a new context for it. Then it asks that context
- * to bind the remaining name. It returns a map containing all the bindings from the next context, plus
- * the context it just created (if it in fact created it). (the names are suitably extended by the segment
- * originally lopped off).
- *
- * @param name
- * @param value
- * @return
- * @throws javax.naming.NamingException
- */
- protected Map internalBind(String name, Object value) throws NamingException
- {
- assert (name != null) && (name.length() > 0);
- assert !frozen;
-
- Map newBindings = new HashMap();
- int pos = name.indexOf('/');
- if (pos == -1)
- {
- if (treeBindings.put(name, value) != null)
- {
- throw new NamingException("Something already bound at " + name);
- }
-
- bindings.put(name, value);
- newBindings.put(name, value);
- }
- else
- {
- String segment = name.substring(0, pos);
- assert segment != null;
- assert !segment.equals("");
- Object o = treeBindings.get(segment);
- if (o == null)
- {
- o = newContext();
- treeBindings.put(segment, o);
- bindings.put(segment, o);
- newBindings.put(segment, o);
- }
- else if (!(o instanceof ReadOnlyContext))
- {
- throw new NamingException("Something already bound where a subcontext should go");
- }
-
- ReadOnlyContext readOnlyContext = (ReadOnlyContext) o;
- String remainder = name.substring(pos + 1);
- Map subBindings = readOnlyContext.internalBind(remainder, value);
- for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();)
- {
- Map.Entry entry = (Map.Entry) iterator.next();
- String subName = segment + "/" + (String) entry.getKey();
- Object bound = entry.getValue();
- treeBindings.put(subName, bound);
- newBindings.put(subName, bound);
- }
- }
-
- return newBindings;
- }
-
- protected ReadOnlyContext newContext()
- {
- return new ReadOnlyContext();
- }
-
- public Object addToEnvironment(String propName, Object propVal) throws NamingException
- {
- return environment.put(propName, propVal);
- }
-
- public Hashtable getEnvironment() throws NamingException
- {
- return (Hashtable) environment.clone();
- }
-
- public Object removeFromEnvironment(String propName) throws NamingException
- {
- return environment.remove(propName);
- }
-
- public Object lookup(String name) throws NamingException
- {
- if (name.length() == 0)
- {
- return this;
- }
-
- Object result = treeBindings.get(name);
- if (result == null)
- {
- result = bindings.get(name);
- }
-
- if (result == null)
- {
- int pos = name.indexOf(':');
- if (pos > 0)
- {
- String scheme = name.substring(0, pos);
- Context ctx = NamingManager.getURLContext(scheme, environment);
- if (ctx == null)
- {
- throw new NamingException("scheme " + scheme + " not recognized");
- }
-
- return ctx.lookup(name);
- }
- else
- {
- // Split out the first name of the path
- // and look for it in the bindings map.
- CompositeName path = new CompositeName(name);
-
- if (path.size() == 0)
- {
- return this;
- }
- else
- {
- String first = path.get(0);
- Object obj = bindings.get(first);
- if (obj == null)
- {
- throw new NameNotFoundException(name);
- }
- else if ((obj instanceof Context) && (path.size() > 1))
- {
- Context subContext = (Context) obj;
- obj = subContext.lookup(path.getSuffix(1));
- }
-
- return obj;
- }
- }
- }
-
- if (result instanceof LinkRef)
- {
- LinkRef ref = (LinkRef) result;
- result = lookup(ref.getLinkName());
- }
-
- if (result instanceof Reference)
- {
- try
- {
- result = NamingManager.getObjectInstance(result, null, null, this.environment);
- }
- catch (NamingException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- throw (NamingException) new NamingException("could not look up : " + name).initCause(e);
- }
- }
-
- if (result instanceof ReadOnlyContext)
- {
- String prefix = getNameInNamespace();
- if (prefix.length() > 0)
- {
- prefix = prefix + SEPARATOR;
- }
-
- result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name);
- }
-
- return result;
- }
-
- public Object lookup(Name name) throws NamingException
- {
- return lookup(name.toString());
- }
-
- public Object lookupLink(String name) throws NamingException
- {
- return lookup(name);
- }
-
- public Name composeName(Name name, Name prefix) throws NamingException
- {
- Name result = (Name) prefix.clone();
- result.addAll(name);
-
- return result;
- }
-
- public String composeName(String name, String prefix) throws NamingException
- {
- CompositeName result = new CompositeName(prefix);
- result.addAll(new CompositeName(name));
-
- return result.toString();
- }
-
- public NamingEnumeration list(String name) throws NamingException
- {
- Object o = lookup(name);
- if (o == this)
- {
- return new ReadOnlyContext.ListEnumeration();
- }
- else if (o instanceof Context)
- {
- return ((Context) o).list("");
- }
- else
- {
- throw new NotContextException();
- }
- }
-
- public NamingEnumeration listBindings(String name) throws NamingException
- {
- Object o = lookup(name);
- if (o == this)
- {
- return new ReadOnlyContext.ListBindingEnumeration();
- }
- else if (o instanceof Context)
- {
- return ((Context) o).listBindings("");
- }
- else
- {
- throw new NotContextException();
- }
- }
-
- public Object lookupLink(Name name) throws NamingException
- {
- return lookupLink(name.toString());
- }
-
- public NamingEnumeration list(Name name) throws NamingException
- {
- return list(name.toString());
- }
-
- public NamingEnumeration listBindings(Name name) throws NamingException
- {
- return listBindings(name.toString());
- }
-
- public void bind(Name name, Object obj) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void bind(String name, Object obj) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void close() throws NamingException
- {
- // ignore
- }
-
- public Context createSubcontext(Name name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public Context createSubcontext(String name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void destroySubcontext(Name name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void destroySubcontext(String name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public String getNameInNamespace() throws NamingException
- {
- return nameInNamespace;
- }
-
- public NameParser getNameParser(Name name) throws NamingException
- {
- return nameParser;
- }
-
- public NameParser getNameParser(String name) throws NamingException
- {
- return nameParser;
- }
-
- public void rebind(Name name, Object obj) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void rebind(String name, Object obj) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void rename(Name oldName, Name newName) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void rename(String oldName, String newName) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void unbind(Name name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- public void unbind(String name) throws NamingException
- {
- throw new OperationNotSupportedException();
- }
-
- private abstract class LocalNamingEnumeration implements NamingEnumeration
- {
- private Iterator i = bindings.entrySet().iterator();
-
- public boolean hasMore() throws NamingException
- {
- return i.hasNext();
- }
-
- public boolean hasMoreElements()
- {
- return i.hasNext();
- }
-
- protected Map.Entry getNext()
- {
- return (Map.Entry) i.next();
- }
-
- public void close() throws NamingException
- { }
- }
-
- private class ListEnumeration extends ReadOnlyContext.LocalNamingEnumeration
- {
- public Object next() throws NamingException
- {
- return nextElement();
- }
-
- public Object nextElement()
- {
- Map.Entry entry = getNext();
-
- return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName());
- }
- }
-
- private class ListBindingEnumeration extends ReadOnlyContext.LocalNamingEnumeration
- {
- public Object next() throws NamingException
- {
- return nextElement();
- }
-
- public Object nextElement()
- {
- Map.Entry entry = getNext();
-
- return new Binding((String) entry.getKey(), entry.getValue());
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties b/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties
deleted file mode 100644
index e451cf53fa..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/naming/jndi.properties
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-java.naming.factory.initial = org.apache.qpidity.naming.PropertiesFileInitialConextFactory
-
-# use the following property to configure the default connector
-#java.naming.provider.url - ignored.
-
-# register some connection factories
-# connectionfactory.[jndiname] = [ConnectionURL]
-# qpid:username=foo;password=password;client_id=id;virtualhost=path@tpc:localhost:1556
-connectionfactory.local = qpid:tcp:localhost'
-
-# register some queues in JNDI using the form
-# queue.[jndiName] = [physicalName]
-queue.MyQueue = example.MyQueue
-
-# register some topics in JNDI using the form
-# topic.[jndiName] = [physicalName]
-topic.ibmStocks = stocks.nyse.ibm
-
-# Register an AMQP destination in JNDI
-# NOTE: Qpid currently only supports direct,topics and headers
-# destination.[jniName] = [BindingURL]
-destination.direct = direct://amq.direct//directQueue
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
deleted file mode 100644
index eb0e370560..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpidity.nclient;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.qpid.client.url.URLParser_0_10;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.url.QpidURL;
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.nclient.impl.ClientSession;
-import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
-import org.apache.qpidity.transport.Channel;
-import org.apache.qpidity.transport.ClientDelegate;
-import org.apache.qpidity.transport.Connection;
-import org.apache.qpidity.transport.ConnectionClose;
-import org.apache.qpidity.transport.ConnectionCloseCode;
-import org.apache.qpidity.transport.ConnectionCloseOk;
-import org.apache.qpidity.transport.ProtocolHeader;
-import org.apache.qpidity.transport.ProtocolVersionException;
-import org.apache.qpidity.transport.SessionDelegate;
-import org.apache.qpidity.transport.network.io.IoTransport;
-import org.apache.qpidity.transport.network.mina.MinaHandler;
-import org.apache.qpidity.transport.network.nio.NioHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class Client implements org.apache.qpidity.nclient.Connection
-{
- private Connection _conn;
- private ClosedListener _closedListner;
- private final Lock _lock = new ReentrantLock();
- private static Logger _logger = LoggerFactory.getLogger(Client.class);
- private Condition closeOk;
- private boolean closed = false;
- private long timeout = 60000;
-
- private ProtocolHeader header = null;
-
- /**
- *
- * @return returns a new connection to the broker.
- */
- public static org.apache.qpidity.nclient.Connection createConnection()
- {
- return new Client();
- }
-
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
- {
-
- final Condition negotiationComplete = _lock.newCondition();
- closeOk = _lock.newCondition();
- _lock.lock();
-
- ClientDelegate connectionDelegate = new ClientDelegate()
- {
- private boolean receivedClose = false;
- public SessionDelegate getSessionDelegate()
- {
- return new ClientSessionDelegate();
- }
-
- public void exception(Throwable t)
- {
- if (_closedListner != null)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
- }
- else
- {
- throw new RuntimeException("connection closed",t);
- }
- }
-
- public void closed()
- {
- if (_closedListner != null && !this.receivedClose)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
- }
- }
-
- @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct)
- {
- _lock.lock();
- try
- {
- closed = true;
- this.receivedClose = true;
- closeOk.signalAll();
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
- {
- ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue());
- if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
- {
- throw new RuntimeException
- (new QpidException("Server closed the connection: Reason " +
- connectionClose.getReplyText(),
- errorCode,
- null));
- }
- else
- {
- _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
- }
-
- this.receivedClose = true;
- }
- @Override public void init(Channel ch, ProtocolHeader hdr)
- {
- // TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and mino
- if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
- {
- Client.this.header = hdr;
- _lock.lock();
- negotiationComplete.signalAll();
- _lock.unlock();
- }
- }
- };
-
- connectionDelegate.setCondition(_lock,negotiationComplete);
- connectionDelegate.setUsername(username);
- connectionDelegate.setPassword(password);
- connectionDelegate.setVirtualHost(virtualHost);
-
- String transport = System.getProperty("transport","io");
- if (transport.equalsIgnoreCase("nio"))
- {
- _logger.info("using NIO Transport");
- _conn = NioHandler.connect(host, port,connectionDelegate);
- }
- else if (transport.equalsIgnoreCase("io"))
- {
- _logger.info("using Plain IO Transport");
- _conn = IoTransport.connect(host, port,connectionDelegate);
- }
- else
- {
- _logger.info("using MINA Transport");
- _conn = MinaHandler.connect(host, port,connectionDelegate);
- // _conn = NativeHandler.connect(host, port,connectionDelegate);
- }
-
- // XXX: hardcoded version numbers
- _conn.send(new ProtocolHeader(1, 0, 10));
-
- try
- {
- negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
- if (header != null)
- {
- _conn.close();
- throw new ProtocolVersionException(header.getMajor(), header.getMinor());
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void connect(String url)throws QpidException
- {
- URLParser_0_10 parser = null;
- try
- {
- parser = new URLParser_0_10(url);
- }
- catch(Exception e)
- {
- throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e);
- }
- Listconnection, it
- * informs the connection's ExceptionListener
- */
-public interface ClosedListener
-{
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- * @param errorCode TODO
- * @param reason TODO
- * @param t TODO
- * @see Connection
- */
- public void onClosed(ErrorCode errorCode, String reason, Throwable t);
-}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java b/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
deleted file mode 100644
index 49167750d1..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import org.apache.qpidity.QpidException;
-
-/**
- * This represents a physical connection to a broker.
- */
-public interface Connection
-{
- /**
- * Establish the connection using the given parameters
- *
- * @param host host name
- * @param port port number
- * @param virtualHost the virtual host name
- * @param username user name
- * @param password password
- * @throws QpidException If the communication layer fails to establish the connection.
- */
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
-
- /**
- * Establish the connection with the broker identified by the URL.
- *
- * @param url Specifies the URL of the broker.
- * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown.
- */
- public void connect(String url) throws QpidException;
-
- /**
- * Close this connection.
- *
- * @throws QpidException if the communication layer fails to close the connection.
- */
- public void close() throws QpidException;
-
- /**
- * Create a session for this connection.
- *
- *
- *
- * @param xid Specifies the xid of the transaction branch to be started.
- * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
- * @return Confirms to the client that the transaction branch is started or specify the error condition.
- */
- public Future
- *
- *
- * @param xid Specifies the xid of the transaction branch to be ended.
- * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
- * @return Confirms to the client that the transaction branch is ended or specifies the error condition.
- */
- public Future
- *
- *
- * @param xid Specifies the xid of the transaction branch to be committed.
- * @param options Available option is: {@link Option#ONE_PHASE}
- * @return Confirms to the client that the transaction branch is committed or specifies the error condition.
- */
- public Future
- *
- * It is up to the implementation to assemble the message once the different parts
- * are transferred.
- */
-public interface MessagePartListener
-{
- /**
- * Indicates the Message transfer has started.
- *
- * @param transferId The message transfer ID.
- */
- public void messageTransfer(int transferId);
-
- /**
- * Add the following a header to the message being received.
- *
- * @param header Either DeliveryProperties or ApplicationProperties
- */
- public void messageHeader(Header header);
-
- /**
- * Add the following byte array to the content of the message being received
- *
- * @param src Data to be added or streamed.
- */
- public void data(ByteBuffer src);
-
- /**
- * Indicates that the message has been fully received.
- */
- public void messageReceived();
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
deleted file mode 100644
index 218a7ed571..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.nclient;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.api.Message;
-
-/**
- * command1 will be synchronously invoked by using the following sequence:
- *
- *
- */
-public interface Session
-{
- public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1;
- public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0;
- public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0;
- public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1;
- public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
- public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
- public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
- public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
- public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF;
- public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
- public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
- public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
- public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
-
- //------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
-
- /**
- * Sync method will block the session until all outstanding commands
- * are executed.
- */
- public void sync();
-
- public void close();
-
- public void sessionDetach(byte[] name, Option ... options);
-
- public void sessionRequestTimeout(long expiry, Option ... options);
-
- public byte[] getName();
-
- public void setAutoSync(boolean value);
-
- //------------------------------------------------------
- // Messaging methods
- // Producer
- //------------------------------------------------------
- /**
- * Transfer a message to a specified exchange.
- *
- * session.command1()
- * session.sync()
- * off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- *
- *
- * @param acquireMode
- *
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
- */
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
- throws IOException;
-
-
- /**
- *
This method should only be used by large messages
- * There are two convenience Message classes to do this.
- *
- *
- * You can also implement a {@link org.apache.qpidity.nclient.util.FileMessage}
- * {@link org.apache.qpidity.nclient.util.StreamingMessage}
- * Message interface to wrap any
- * data stream.
- *
Note that only the data between the buffer's current position and the - * buffer limit is added. - * It is therefore recommended to flip the buffer before adding it to the message, - * - * @param buf Data to be added. - */ - public void data(ByteBuffer buf); - - /** - * Add a string to the content of the message being sent. - * - * @param str String to be added. - */ - public void data(String str); - - /** - * Signals the end of data for the message. - */ - public void endData(); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - *
The destination is bound to a queue, and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). - *
The valid options are: - *
Requests exclusive subscription access, so that only this - * subscription can access the queue. - *
This is an empty option, and has no effect. - *
Only one listener is permitted for each destination. When a new listener is created, - * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener - * has completed processing a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - *
With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - *
Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode
The credit on the broker will remain at zero once - * this method is completed. - * - * @param destination The destination on which the credit supply is to be exhausted. - */ - public void messageFlush(String destination, Option ... options); - - /** - * On receipt of this method, the brokers set credit to zero for a given - * destination. When confirmation of this method - * is issued credit is set to zero. No further messages will be sent until - * further credit is received. - * - * @param destination The destination on which to reset credit. - */ - public void messageStop(String destination, Option ... options); - - /** - * Acknowledge the receipt of a range of messages. - *
Messages must already be acquired, either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of messages to be acknowledged. - * @param accept pecify whether to send a message accept to the broker - */ - public void messageAcknowledge(RangeSet ranges, boolean accept); - - /** - * Reject a range of acquired messages. - *
The broker will deliver rejected messages to the - * alternate-exchange on the queue from which it came. If no alternate-exchange is - * defined for that queue the broker will discard the message. - * - * @param ranges Range of messages to be rejected. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - *
Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - *
A recommended invocation sequence would be: - *
As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - *
This method should only be called on non-acquired messages.
- *
- * @param ranges Ranges of messages to be acquired.
- * @return Indicates the acquired messages
- */
- public Future Released messages are re-enqueued.
- *
- * @param ranges Ranges of messages to be released.
- * @param options Valid option is: {@link Option#SET_REDELIVERED})
- */
- public void messageRelease(RangeSet ranges, Option ... options);
-
- // -----------------------------------------------
- // Local transaction methods
- // ----------------------------------------------
- /**
- * Selects the session for local transaction support.
- */
- public void txSelect(Option ... options);
-
- /**
- * Commit the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txCommit(Option ... options) throws IllegalStateException;
-
- /**
- * Roll back the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txRollback(Option ... options) throws IllegalStateException;
-
- //---------------------------------------------
- // Queue methods
- //---------------------------------------------
-
- /**
- * Declare a queue with the given queueName
- * Following are the valid options:
- * If this field is set and the exclusive field is also set,
- * then the queue is deleted when the connection closes.
- * If this field is set and the exclusive field is not set the queue is deleted when all
- * the consumers have finished using it.
- * If set when creating a new queue,
- * the queue will be marked as durable. Durable queues
- * remain active when a server restarts. Non-durable queues (transient queues) are purged
- * if/when a server restarts. Note that durable queues do not necessarily hold persistent
- * messages, although it does not make sense to send persistent messages to a transient
- * queue.
- * Exclusive queues can only be used from one connection at a time.
- * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
- * declaring connection closes.
- * If set, the server will not create the queue.
- * This field allows the client to assert the presence of a queue without modifying the server state.
- * Has no effect as it represents an empty option.
- * In the absence of a particular option, the defaul value is false for each option
- *
- * @param queueName The name of the delcared queue.
- * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
- * may be rejected by a queue for the following reasons:
- * Following are the valid options:
- * If set, the server will only delete the queue if it has no messages.
- * If set, the server will only delete the queue if it has no consumers.
- * If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * Has no effect as it represents an empty option.
- *
- *
- *
- * @param arguments Used for backward compatibility
- * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
- * @see Option
- */
- public void queueDeclare(String queueName, String alternateExchange, Map
- *
- *
In the absence of a particular option, the defaul value is false for each option
- * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NONE}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - - /** - * This method is used to request information on a particular queue. - * - * @param queueName The name of the queue for which information is requested. - * @return Information on the specified queue. - */ - public FutureValid options are: - *
If set, the exchange is deleted when all queues have finished using it. - *
If set, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged when a server restarts. - *
If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - *
This option is an empty option, and has no effect. - *
In the absence of a particular option, the defaul value is false for each option
- * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NONE}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, - MapFollowing are the valid options: - *
If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - *
Has no effect as it represents an empty option. - *
Note that if an option is not set, it will default to false.
- *
- * @param exchangeName The name of exchange to be deleted.
- * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
- * @see Option
- */
- public void exchangeDelete(String exchangeName, Option... options);
-
-
- /**
- * This method is used to request information about a particular exchange.
- *
- * @param exchangeName The name of the exchange about which information is requested. If not set, the method will
- * return information about the default exchange.
- * @return Information on the specified exchange.
- */
- public Future
- * Property name: message_size_before_sync
- *
- * Default value: 200000000
- */
- public static long MAX_NOT_SYNC_DATA_LENGH;
- /**
- * The total message size in KBs that can be transferted before
- * messages are flushed.
- * When a flush returns all messages have reached the broker.
- *
- * Property name: message_size_before_flush
- *
- * Default value: 200000000
- */
- public static long MAX_NOT_FLUSH_DATA_LENGH;
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
deleted file mode 100644
index 05b99a3cf1..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.qpidity.nclient.impl;
-
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Client;
-import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ClosedListener;
-import org.apache.qpidity.nclient.Session;
-import org.apache.qpidity.nclient.util.MessageListener;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageAcceptMode;
-import org.apache.qpidity.transport.MessageAcquireMode;
-import org.apache.qpidity.transport.MessageProperties;
-
-import java.util.UUID;
-
-public class DemoClient
-{
- public static MessagePartListenerAdapter createAdapter()
- {
- return new MessagePartListenerAdapter(new MessageListener()
- {
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
- }
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
- ssn.queueDeclare("queue1", null, null);
- ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
- ssn.sync();
-
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
-
- // queue
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.data("this is the data");
- ssn.endData();
-
- //reject
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("this should be rejected");
- ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
- ssn.endData();
- ssn.sync();
-
- // topic subs
- ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
- ssn.sync();
-
- ssn.queueDeclare("topic1", null, null);
- ssn.exchangeBind("topic1", "amq.topic", "stock.*",null);
- ssn.queueDeclare("topic2", null, null);
- ssn.exchangeBind("topic2", "amq.topic", "stock.us.*",null);
- ssn.queueDeclare("topic3", null, null);
- ssn.exchangeBind("topic3", "amq.topic", "stock.us.rh",null);
- ssn.sync();
-
- // topic
- ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
- ssn.data("Topic message");
- ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
- ssn.endData();
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
deleted file mode 100644
index 64ffe17fe0..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.qpidity.nclient.impl;
-
-import java.io.FileInputStream;
-
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Client;
-import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ClosedListener;
-import org.apache.qpidity.nclient.Session;
-import org.apache.qpidity.nclient.util.FileMessage;
-import org.apache.qpidity.nclient.util.MessageListener;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageProperties;
-
-import java.util.UUID;
-
-public class LargeMsgDemoClient
-{
- public static MessagePartListenerAdapter createAdapter()
- {
- return new MessagePartListenerAdapter(new MessageListener()
- {
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
- }
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
- ssn.queueDeclare("queue1", null, null);
- ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
- ssn.sync();
-
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
-
- try
- {
- FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
- 1024,
- new DeliveryProperties().setRoutingKey("queue1"),
- new MessageProperties().setMessageId(UUID.randomUUID()));
-
- // queue
- ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
- ssn.sync();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
deleted file mode 100644
index e452091622..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-package org.apache.qpidity.nclient.interop;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Client;
-import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ClosedListener;
-import org.apache.qpidity.nclient.Session;
-import org.apache.qpidity.nclient.util.MessageListener;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageAcceptMode;
-import org.apache.qpidity.transport.MessageAcquireMode;
-import org.apache.qpidity.transport.MessageCreditUnit;
-import org.apache.qpidity.transport.MessageFlowMode;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.RangeSet;
-
-public class BasicInteropTest implements ClosedListener
-{
-
- private Session session;
- private Connection conn;
- private String host;
-
- public BasicInteropTest(String host)
- {
- this.host = host;
- }
-
- public void close() throws QpidException
- {
- conn.close();
- }
-
- public void testCreateConnection(){
- System.out.println("------- Creating connection--------");
- conn = Client.createConnection();
- try{
- conn.connect(host, 5672, "test", "guest", "guest");
- }catch(Exception e){
- System.out.println("------- Error Creating connection--------");
- e.printStackTrace();
- System.exit(1);
- }
- System.out.println("------- Connection created Suscessfully --------");
- }
-
- public void testCreateSession(){
- System.out.println("------- Creating session --------");
- session = conn.createSession(0);
- System.out.println("------- Session created sucessfully --------");
- }
-
- public void testExchange(){
- System.out.println("------- Creating an exchange --------");
- session.exchangeDeclare("test", "direct", "", null);
- session.sync();
- System.out.println("------- Exchange created --------");
- }
-
- public void testQueue(){
- System.out.println("------- Creating a queue --------");
- session.queueDeclare("testQueue", "", null);
- session.sync();
- System.out.println("------- Queue created --------");
-
- System.out.println("------- Binding a queue --------");
- session.exchangeBind("testQueue", "test", "testKey", null);
- session.sync();
- System.out.println("------- Queue bound --------");
- }
-
- public void testSendMessage(){
- System.out.println("------- Sending a message --------");
- session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-
- Map A Simple implementation of the message interface
- * for small messages. When the readData methods are called
- * we assume the message is complete. i.e there want be any
- * appendData operations after that. If you need large message support please see
- * FileMessage and StreamingMessage
- *