summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/pom.xml17
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (renamed from java/client/src/main/java/org/apache/qpidity/client/ClientSession.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java (renamed from java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java)5
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java (renamed from java/client/src/main/java/org/apache/qpidity/client/DemoClient.java)7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java74
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java100
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java41
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java5
13 files changed, 258 insertions, 15 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml
index cc0731d382..9bb448b631 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -188,6 +188,23 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>org.apache.qpid.*:org.apache.qpidity.jms:org.apache.qpidity.jms.*:org.apache.qpidity.client.impl</excludePackageNames>
+ <groups>
+ <group>
+ <title>API</title>
+ <packages>org.apache.qpidity.client</packages>
+ </group>
+ <group>
+ <title>Utility Package</title>
+ <packages>org.apache.qpidity.client.util</packages>
+ </group>
+ </groups>
+ </configuration>
+ </plugin>
</plugins>
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index b440561b66..25bebb4ae5 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -14,6 +14,8 @@ import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.client.impl.ClientSession;
+import org.apache.qpidity.client.impl.ClientSessionDelegate;
public class Client implements org.apache.qpidity.client.Connection
@@ -23,6 +25,10 @@ public class Client implements org.apache.qpidity.client.Connection
private ExceptionListener _exceptionListner;
private final Lock _lock = new ReentrantLock();
+ /**
+ *
+ * @return returns a new connection to the broker.
+ */
public static org.apache.qpidity.client.Connection createConnection()
{
return new Client();
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index 273b9b899a..4ccef6df55 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -54,7 +54,7 @@ public interface MessagePartListener
*
* @param data Data to be added or streamed.
*/
- public void addData(ByteBuffer src);
+ public void data(ByteBuffer src);
/**
* Indicates that the message has been fully received.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index e4f2ae217c..dea6a01da6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -336,6 +336,9 @@ public interface Session
*/
public void messageReject(RangeSet ranges, int code, String text);
+ /**
+ * @return the rejected message ranges
+ */
public RangeSet getRejectedMessages();
/**
@@ -350,7 +353,9 @@ public interface Session
*/
public void messageAcquire(RangeSet ranges, short mode);
-
+ /**
+ * @return returns the message ranges marked by the broker as acquired.
+ */
public RangeSet getAccquiredMessages();
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index 13f3eeb1b6..8059633cab 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.client;
+package org.apache.qpidity.client.impl;
import java.io.EOFException;
import java.io.IOException;
@@ -12,6 +12,9 @@ import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.Session;
/**
* Implements a Qpid Sesion.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index 769fc3aeaa..dc72f1f975 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.client;
+package org.apache.qpidity.client.impl;
import java.nio.ByteBuffer;
@@ -14,6 +14,7 @@ import org.apache.qpidity.Session;
import org.apache.qpidity.SessionClosed;
import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.Struct;
+import org.apache.qpidity.client.MessagePartListener;
public class ClientSessionDelegate extends SessionDelegate
@@ -33,7 +34,7 @@ public class ClientSessionDelegate extends SessionDelegate
{
for (ByteBuffer b : frame)
{
- _currentMessageListener.addData(b);
+ _currentMessageListener.data(b);
}
if (frame.isLastSegment() && frame.isLastFrame())
{
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
index e46065e0a0..e2962b4f22 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
@@ -1,9 +1,14 @@
-package org.apache.qpidity.client;
+package org.apache.qpidity.client.impl;
import org.apache.qpidity.DeliveryProperties;
import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.Client;
+import org.apache.qpidity.client.Connection;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
public class DemoClient
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
new file mode 100644
index 0000000000..38a7b36403
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
@@ -0,0 +1,74 @@
+package org.apache.qpidity.client.impl;
+
+import java.io.FileInputStream;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.Client;
+import org.apache.qpidity.client.Connection;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.FileMessage;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class LargeMsgDemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+ try
+ {
+ FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
+ 1024,
+ new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId("123"));
+
+ // queue
+ ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
+ ssn.sync();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index 9e4cf00c87..c4b8ae3f8b 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -32,7 +32,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
_currentMsg = new ByteBufferMessage(transferId);
}
- public void addData(ByteBuffer src)
+ public void data(ByteBuffer src)
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
new file mode 100644
index 0000000000..6a786e3edb
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
@@ -0,0 +1,100 @@
+package org.apache.qpidity.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpidity.QpidException;
+
+public class ConnectionFactoryImpl implements ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable
+{
+ private String _host;
+ private int _port;
+ private String _defaultUsername;
+ private String _defaultPassword;
+ private String _virtualPath;
+ private String _url;
+
+ // Undefined at the moment
+ public ConnectionFactoryImpl(String url)
+ {
+ _url = url;
+ }
+
+ public ConnectionFactoryImpl(String host,int port,String virtualHost,String defaultUsername,String defaultPassword)
+ {
+ _host = host;
+ _port = port;
+ _defaultUsername = defaultUsername;
+ _defaultPassword = defaultPassword;
+ _virtualPath = virtualHost;
+ }
+
+ public Connection createConnection() throws JMSException
+ {
+ try
+ {
+ return new ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword);
+ }
+ catch(QpidException e)
+ {
+ // need to convert the qpid exception into jms exception
+ throw new JMSException("","");
+ }
+ }
+
+ public Connection createConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new ConnectionImpl(_host,_port,_virtualPath,username,password);
+ }
+ catch(QpidException e)
+ {
+ // need to convert the qpid exception into jms exception
+ throw new JMSException("","");
+ }
+ }
+
+ // ----------------------------------------
+ // Support for JMS 1.0 classes
+ // ----------------------------------------
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return (QueueConnection) createConnection();
+ }
+
+ public QueueConnection createQueueConnection(String username, String password) throws JMSException
+ {
+ return (QueueConnection) createConnection(username, password);
+ }
+
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ return (TopicConnection) createConnection();
+ }
+
+ public TopicConnection createTopicConnection(String username, String password) throws JMSException
+ {
+ return (TopicConnection) createConnection(username, password);
+ }
+
+
+ // ----------------------------------------
+ // Support for JNDI
+ // ----------------------------------------
+ public Reference getReference() throws NamingException
+ {
+ return new Reference( ConnectionFactoryImpl.class.getName(),
+ new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url));
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
index 3b6153c487..656a8cddd3 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
@@ -17,19 +17,38 @@
*/
package org.apache.qpidity.jms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.QpidException;
+import java.util.Vector;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
-import java.util.Vector;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection
*/
-public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
+public class ConnectionImpl implements Connection, QueueConnection, TopicConnection, Referenceable
{
/**
* This class's logger
@@ -95,8 +114,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
/**
* TODO define the parameters
*/
- public ConnectionImpl()
+ public ConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException
{
+ _qpidConnection = Client.createConnection();
+ _qpidConnection.connect(host, port, virtualHost, username, password);
}
//---- Interface javax.jms.Connection ---//
@@ -478,4 +499,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
return _qpidConnection;
}
+
+ public Reference getReference() throws NamingException
+ {
+ return new Reference( ConnectionImpl.class.getName(),
+ new StringRefAddr(ConnectionImpl.class.getName(),""));
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
index b95790486a..ce1d11c7e5 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
@@ -43,7 +43,7 @@ public class QueueImpl extends DestinationImpl implements Queue
{
super(session, name);
_exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
_queueName = name;
// check that this queue exist on the server
// As pasive is set the server will not create the queue.
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
index e76e566efd..3863adb07a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
@@ -28,6 +28,11 @@ import javax.jms.XASession;
*/
public class XAConnectionImpl extends ConnectionImpl implements XAConnection
{
+ public XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+
/**
* Creates an XASession.
*