diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-15 23:29:13 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-15 23:29:13 +0000 |
| commit | 874ac4d63c3fab96e73ca9327617c26d51681c73 (patch) | |
| tree | 1fb9312573fcabfa2ef0259486a3a4a176b68660 /java | |
| parent | cbd5daf4330123a335374be6724309103fc2b41d (diff) | |
| download | qpid-python-874ac4d63c3fab96e73ca9327617c26d51681c73.tar.gz | |
Added initial ConnectionFactory support to JMS
Rearranged package structure for qpid client
Addded javadoc support for qpid client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566403 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 258 insertions, 15 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml index cc0731d382..9bb448b631 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -188,6 +188,23 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <excludePackageNames>org.apache.qpid.*:org.apache.qpidity.jms:org.apache.qpidity.jms.*:org.apache.qpidity.client.impl</excludePackageNames> + <groups> + <group> + <title>API</title> + <packages>org.apache.qpidity.client</packages> + </group> + <group> + <title>Utility Package</title> + <packages>org.apache.qpidity.client.util</packages> + </group> + </groups> + </configuration> + </plugin> </plugins> diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index b440561b66..25bebb4ae5 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -14,6 +14,8 @@ import org.apache.qpidity.ErrorCode; import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.impl.ClientSession; +import org.apache.qpidity.client.impl.ClientSessionDelegate; public class Client implements org.apache.qpidity.client.Connection @@ -23,6 +25,10 @@ public class Client implements org.apache.qpidity.client.Connection private ExceptionListener _exceptionListner; private final Lock _lock = new ReentrantLock(); + /** + * + * @return returns a new connection to the broker. + */ public static org.apache.qpidity.client.Connection createConnection() { return new Client(); diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 273b9b899a..4ccef6df55 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -54,7 +54,7 @@ public interface MessagePartListener * * @param data Data to be added or streamed. */ - public void addData(ByteBuffer src); + public void data(ByteBuffer src); /** * Indicates that the message has been fully received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index e4f2ae217c..dea6a01da6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -336,6 +336,9 @@ public interface Session */
public void messageReject(RangeSet ranges, int code, String text);
+ /**
+ * @return the rejected message ranges
+ */
public RangeSet getRejectedMessages();
/**
@@ -350,7 +353,9 @@ public interface Session */
public void messageAcquire(RangeSet ranges, short mode);
-
+ /**
+ * @return returns the message ranges marked by the broker as acquired.
+ */
public RangeSet getAccquiredMessages();
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 13f3eeb1b6..8059633cab 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -1,4 +1,4 @@ -package org.apache.qpidity.client; +package org.apache.qpidity.client.impl; import java.io.EOFException; import java.io.IOException; @@ -12,6 +12,9 @@ import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.Session; /** * Implements a Qpid Sesion. diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index 769fc3aeaa..dc72f1f975 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -1,4 +1,4 @@ -package org.apache.qpidity.client; +package org.apache.qpidity.client.impl; import java.nio.ByteBuffer; @@ -14,6 +14,7 @@ import org.apache.qpidity.Session; import org.apache.qpidity.SessionClosed; import org.apache.qpidity.SessionDelegate; import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; public class ClientSessionDelegate extends SessionDelegate @@ -33,7 +34,7 @@ public class ClientSessionDelegate extends SessionDelegate { for (ByteBuffer b : frame) { - _currentMessageListener.addData(b); + _currentMessageListener.data(b); } if (frame.isLastSegment() && frame.isLastFrame()) { diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java index e46065e0a0..e2962b4f22 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java @@ -1,9 +1,14 @@ -package org.apache.qpidity.client; +package org.apache.qpidity.client.impl; import org.apache.qpidity.DeliveryProperties; import org.apache.qpidity.MessageProperties; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.Client; +import org.apache.qpidity.client.Connection; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.Session; import org.apache.qpidity.client.util.MessagePartListenerAdapter; public class DemoClient diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java new file mode 100644 index 0000000000..38a7b36403 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java @@ -0,0 +1,74 @@ +package org.apache.qpidity.client.impl; + +import java.io.FileInputStream; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.Client; +import org.apache.qpidity.client.Connection; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.util.FileMessage; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class LargeMsgDemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + try + { + FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"), + 1024, + new DeliveryProperties().setRoutingKey("queue1"), + new MessageProperties().setMessageId("123")); + + // queue + ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); + ssn.sync(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 9e4cf00c87..c4b8ae3f8b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -32,7 +32,7 @@ public class MessagePartListenerAdapter implements MessagePartListener _currentMsg = new ByteBufferMessage(transferId); } - public void addData(ByteBuffer src) + public void data(ByteBuffer src) { try { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java new file mode 100644 index 0000000000..6a786e3edb --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java @@ -0,0 +1,100 @@ +package org.apache.qpidity.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.qpidity.QpidException; + +public class ConnectionFactoryImpl implements ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable +{ + private String _host; + private int _port; + private String _defaultUsername; + private String _defaultPassword; + private String _virtualPath; + private String _url; + + // Undefined at the moment + public ConnectionFactoryImpl(String url) + { + _url = url; + } + + public ConnectionFactoryImpl(String host,int port,String virtualHost,String defaultUsername,String defaultPassword) + { + _host = host; + _port = port; + _defaultUsername = defaultUsername; + _defaultPassword = defaultPassword; + _virtualPath = virtualHost; + } + + public Connection createConnection() throws JMSException + { + try + { + return new ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword); + } + catch(QpidException e) + { + // need to convert the qpid exception into jms exception + throw new JMSException("",""); + } + } + + public Connection createConnection(String username, String password) throws JMSException + { + try + { + return new ConnectionImpl(_host,_port,_virtualPath,username,password); + } + catch(QpidException e) + { + // need to convert the qpid exception into jms exception + throw new JMSException("",""); + } + } + + // ---------------------------------------- + // Support for JMS 1.0 classes + // ---------------------------------------- + public QueueConnection createQueueConnection() throws JMSException + { + return (QueueConnection) createConnection(); + } + + public QueueConnection createQueueConnection(String username, String password) throws JMSException + { + return (QueueConnection) createConnection(username, password); + } + + public TopicConnection createTopicConnection() throws JMSException + { + return (TopicConnection) createConnection(); + } + + public TopicConnection createTopicConnection(String username, String password) throws JMSException + { + return (TopicConnection) createConnection(username, password); + } + + + // ---------------------------------------- + // Support for JNDI + // ---------------------------------------- + public Reference getReference() throws NamingException + { + return new Reference( ConnectionFactoryImpl.class.getName(), + new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url)); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 3b6153c487..656a8cddd3 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -17,19 +17,38 @@ */ package org.apache.qpidity.jms; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpidity.QpidException; +import java.util.Vector; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; -import java.util.Vector; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection */ -public class ConnectionImpl implements Connection, QueueConnection, TopicConnection +public class ConnectionImpl implements Connection, QueueConnection, TopicConnection, Referenceable { /** * This class's logger @@ -95,8 +114,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * TODO define the parameters */ - public ConnectionImpl() + public ConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException { + _qpidConnection = Client.createConnection(); + _qpidConnection.connect(host, port, virtualHost, username, password); } //---- Interface javax.jms.Connection ---// @@ -478,4 +499,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { return _qpidConnection; } + + public Reference getReference() throws NamingException + { + return new Reference( ConnectionImpl.class.getName(), + new StringRefAddr(ConnectionImpl.class.getName(),"")); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index b95790486a..ce1d11c7e5 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -43,7 +43,7 @@ public class QueueImpl extends DestinationImpl implements Queue { super(session, name); _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; _queueName = name; // check that this queue exist on the server // As pasive is set the server will not create the queue. diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java index e76e566efd..3863adb07a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java @@ -28,6 +28,11 @@ import javax.jms.XASession; */ public class XAConnectionImpl extends ConnectionImpl implements XAConnection { + public XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException + { + super(host, port, virtualHost, username, password); + } + /** * Creates an XASession. * |
