summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-10 01:49:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-10 01:49:45 +0000
commit55fa9bf941d78a34f90f2ed278afe4d5247abfad (patch)
treeb93f8810835f11945848ec5a395ec46d1686dac9 /java/client/src
parent21a371f3b6717b454d04d676ada015fcf1934dbb (diff)
downloadqpid-python-55fa9bf941d78a34f90f2ed278afe4d5247abfad.tar.gz
Added a Toy Exchange that does same basic routing for direct and topic.
Should be good enough for Arnaud to test atleast the basic JMS functionality. Added a FileMessage to demo Martins requirment. Haven't tested yet. The Toy Broker can now accept subscriptions and transfer messages to clients git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564451 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Client.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/Client.java)6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSession.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java)4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java)3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Connection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/DemoClient.java84
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java99
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java (renamed from java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java)72
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java2
11 files changed, 247 insertions, 82 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java
index 13acff1ea6..8465475282 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java
@@ -1,7 +1,6 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.net.URL;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -14,9 +13,6 @@ import org.apache.qpidity.ConnectionDelegate;
import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.client.DtxSession;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.Session;
public class Client implements org.apache.qpidity.client.Connection
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
index 627829556c..fcde60fa04 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.util.HashMap;
import java.util.List;
@@ -9,8 +9,6 @@ import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.MessagePartListener;
/**
* Implements a Qpid Sesion.
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
index 70d1019a06..975dcb6d8b 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
@@ -1,4 +1,4 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client;
import java.nio.ByteBuffer;
@@ -12,7 +12,6 @@ import org.apache.qpidity.RangeSet;
import org.apache.qpidity.Session;
import org.apache.qpidity.SessionDelegate;
import org.apache.qpidity.Struct;
-import org.apache.qpidity.client.MessagePartListener;
public class ClientSessionDelegate extends SessionDelegate
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
index 2ea6db8943..9bc17b14a6 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
@@ -20,6 +20,7 @@ package org.apache.qpidity.client;
import java.net.URL;
+import java.util.UUID;
import org.apache.qpidity.QpidException;
@@ -46,7 +47,7 @@ public interface Connection
* @throws QpidException If the communication layer fails to connect with the broker.
*/
public void connect(URL url) throws QpidException;
-
+
/**
* Close this connection.
*
@@ -83,5 +84,6 @@ public interface Connection
*
* @param exceptionListner The execptionListener
*/
+
public void setExceptionListener(ExceptionListener exceptionListner);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
new file mode 100644
index 0000000000..e46065e0a0
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
@@ -0,0 +1,84 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class DemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+ // queue
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+ ssn.data("this is the data");
+ ssn.endData();
+
+ //reject
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.endData();
+ ssn.sync();
+
+ // topic subs
+ ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+ ssn.sync();
+
+ ssn.queueDeclare("topic1", null, null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueDeclare("topic2", null, null);
+ ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+ ssn.queueDeclare("topic3", null, null);
+ ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+ ssn.sync();
+
+ // topic
+ ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+ ssn.data("Topic message");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 84de268232..09595c8d0b 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -20,7 +20,6 @@ package org.apache.qpidity.client;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -67,13 +66,6 @@ public interface Session
*/
public void sessionSuspend();
- /**
- * This will resume an existing session
- * <p> Upon resume the session is attached with an underlying channel
- * hence making operation on this session available.
- */
- public void sessionResume(UUID sessionId);
-
//------------------------------------------------------
// Messaging methods
// Producer
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
new file mode 100644
index 0000000000..84f18dcca4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk.
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ *
+ * The write methods are not supported.
+ *
+ * From the standpoint of performance it is generally
+ * only worth mapping relatively large files into memory.
+ *
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ *
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage implements Message
+{
+ private MessageProperties _messageProperties;
+ private DeliveryProperties _deliveryProperties;
+ private FileChannel _fileChannel;
+ private int _chunkSize;
+ private long _fileSize;
+ private long _pos = 0;
+
+ public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _fileChannel = in.getChannel();
+ _chunkSize = chunkSize;
+ _fileSize = _fileChannel.size();
+
+ if (_fileSize <= _chunkSize)
+ {
+ _chunkSize = (int)_fileSize;
+ }
+ }
+
+ public void appendData(byte[] src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public void appendData(ByteBuffer src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProperties;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
+ if (_pos + readLen > _fileSize)
+ {
+ readLen = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen);
+ _pos += readLen;
+ bb.get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_pos + _chunkSize > _fileSize)
+ {
+ _chunkSize = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);
+ _pos += _chunkSize;
+ return bb;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index 5d3f6a6e3e..4ff83db939 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -1,8 +1,8 @@
-package org.apache.qpidity.impl;
+package org.apache.qpidity.client.util;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import org.apache.qpidity.DeliveryProperties;
@@ -13,11 +13,12 @@ import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
/**
+ * This is a simple message assembler.
+ * Will call onMessage method of the adaptee
+ * when all message data is read.
*
- * Will call onMessage method as soon as data is avialable
- * The client can then start to process the data while
- * the rest of the data is read.
- *
+ * This is a good convinience utility for handling
+ * small messages
*/
public class MessagePartListenerAdapter implements MessagePartListener
{
@@ -25,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
Message _currentMsg;
DeliveryProperties _currentDeliveryProps;
MessageProperties _currentMessageProps;
-
+
public MessagePartListenerAdapter(MessageListener listener)
{
_adaptee = listener;
@@ -37,12 +38,12 @@ public class MessagePartListenerAdapter implements MessagePartListener
ByteBuffer _readBuffer;
private int dataSize;
- public void appendData(byte[] src)
+ public void appendData(byte[] src) throws IOException
{
appendData(ByteBuffer.wrap(src));
}
- public void appendData(ByteBuffer src)
+ public void appendData(ByteBuffer src) throws IOException
{
_data.offer(src);
dataSize += src.remaining();
@@ -61,9 +62,9 @@ public class MessagePartListenerAdapter implements MessagePartListener
// since we provide the message only after completion
// we can assume that when this method is called we have
// received all data.
- public void readData(byte[] target)
+ public void readData(byte[] target) throws IOException
{
- if (_readBuffer == null)
+ if (_data.size() >0 && _readBuffer == null)
{
buildReadBuffer();
}
@@ -71,20 +72,59 @@ public class MessagePartListenerAdapter implements MessagePartListener
_readBuffer.get(target);
}
+ public ByteBuffer readData() throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ return _readBuffer;
+ }
+
private void buildReadBuffer()
{
- _readBuffer = ByteBuffer.allocate(dataSize);
- for(ByteBuffer buf:_data)
+ //optimize for the simple cases
+ if(_data.size() == 1)
{
- _readBuffer.put(buf);
+ _readBuffer = _data.element().duplicate();
+ }
+ else
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
}
}
+
+ //hack for testing
+ @Override public String toString()
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+ byte[] b = new byte[_readBuffer.limit()];
+ _readBuffer.get(b);
+ return new String(b);
+ }
};
}
public void addData(ByteBuffer src)
{
- _currentMsg.appendData(src);
+ try
+ {
+ _currentMsg.appendData(src);
+ }
+ catch(IOException e)
+ {
+ // A chance for IO exception
+ // doesn't occur as we are using
+ // a ByteBuffer
+ }
}
public void messageHeaders(Struct... headers)
@@ -103,7 +143,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
}
public void messageReceived()
- {
+ {
_adaptee.onMessage(_currentMsg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
deleted file mode 100644
index d325054bee..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.qpidity.impl;
-
-import org.apache.qpidity.DeliveryProperties;
-import org.apache.qpidity.MessageProperties;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.Session;
-import org.apache.qpidity.client.Connection;
-
-public class DemoClient
-{
-
- public static final void main(String[] args)
- {
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
-
- Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
- {
- public void onException(QpidException e)
- {
- System.out.println(e);
- }
- });
- ssn.queueDeclare("Queue1", null, null);
- ssn.sync();
-
- ssn.messageTransfer("Queue1", (short) 0, (short) 1);
- ssn.headers(new DeliveryProperties(),
- new MessageProperties());
- ssn.data("this is the data");
- ssn.endData();
-
- ssn.messageTransfer("Queue2", (short) 0, (short) 1);
- ssn.data("this should be rejected");
- ssn.endData();
- ssn.sync();
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 3869c2a793..c071280b37 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -18,13 +18,13 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
import javax.jms.*;
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index 8d707e4ccc..41c8bc02c6 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -26,9 +26,9 @@ import javax.jms.Queue;
import javax.jms.QueueBrowser;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
/**
* Implementation of the JMS QueueBrowser interface