summaryrefslogtreecommitdiff
path: root/qpid/java/client-api/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:23:25 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:23:25 +0000
commit94f66150a60b45d1505cffa31ea8fa54364eabe3 (patch)
treee61a5e5e516098e5d78a0ef387e4915701bf85dd /qpid/java/client-api/src
parent6d96b69a7f6d1c64db6181fe2de6b1c9cc29c2d1 (diff)
downloadqpid-python-94f66150a60b45d1505cffa31ea8fa54364eabe3.tar.gz
QPID-4027 Tied in the message implementation into the cpp/jni
implementation. A bit more cleanup and testing needs to be done. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350710 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client-api/src')
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java9
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java10
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java26
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java4
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java16
5 files changed, 48 insertions, 17 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
index fa23eae3da..3ec833b6cc 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
@@ -18,6 +18,7 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.MessageFactory;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
@@ -29,6 +30,8 @@ import org.apache.qpid.messaging.Session;
*/
public class CppConnection implements Connection
{
+ private static MessageFactory _MSG_FACTORY = new CppMessageFactory();
+
private org.apache.qpid.messaging.cpp.jni.Connection _cppConn;
public CppConnection(String url)
@@ -78,4 +81,10 @@ public class CppConnection implements Connection
{
return _cppConn.getAuthenticatedUsername();
}
+
+ @Override
+ public MessageFactory getMessageFactory()
+ {
+ return _MSG_FACTORY;
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
index 2bc7391376..be2cf0993f 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
@@ -26,27 +26,28 @@ public class CppReceiver implements Receiver
{
private CppSession _ssn;
private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
+ private CppMessageFactory _msgFactory;
public CppReceiver(CppSession ssn,
- org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
+ org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) throws MessagingException
{
_ssn = ssn;
_cppReceiver = cppReceiver;
+ _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory();
}
@Override
public Message get(long timeout) throws MessagingException
{
org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get(CppDuration.getDuration(timeout));
- return new TextMessage(m.getContent());
-
+ return _msgFactory.createMessage(m);
}
@Override
public Message fetch(long timeout) throws MessagingException
{
org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch(CppDuration.getDuration(timeout));
- return new TextMessage(m);
+ return _msgFactory.createMessage(m);
}
@Override
@@ -104,5 +105,4 @@ public class CppReceiver implements Receiver
_ssn.checkError();
return _ssn;
}
-
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
index 722b7c16bb..bb981b2c25 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
@@ -21,23 +21,45 @@ import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.ext.MessageInternal;
public class CppSender implements Sender
{
private CppSession _ssn;
private org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
+ private CppMessageFactory _msgFactory;
public CppSender(CppSession ssn,
- org.apache.qpid.messaging.cpp.jni.Sender cppSender)
+ org.apache.qpid.messaging.cpp.jni.Sender cppSender) throws MessagingException
{
_ssn = ssn;
_cppSender = cppSender;
+ _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory();
}
@Override
public void send(Message message, boolean sync) throws MessagingException
{
- _cppSender.send(((TextMessage)message).getCppMessage(),true);
+ org.apache.qpid.messaging.cpp.jni.Message m = convertForSending(message);
+ _cppSender.send(m,true);
+ }
+
+ private org.apache.qpid.messaging.cpp.jni.Message convertForSending(Message m) throws MessagingException
+ {
+ if((m instanceof MessageInternal) &&
+ (_msgFactory.getClass() == ((MessageInternal)m).getMessageFactoryClass())
+ )
+ {
+ org.apache.qpid.messaging.cpp.jni.Message msg =
+ (org.apache.qpid.messaging.cpp.jni.Message)((MessageInternal)m).getFactorySpecificMessageDelegate();
+ msg.setContentAsByteBuffer(m.getContent());
+ return msg;
+ }
+ else
+ {
+ throw new MessagingException("Incompatible message implementation." +
+ "You need to use the MessageFactory given by the connection that owns this ");
+ }
}
@Override
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
index 70303a52a7..ff9f35638c 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
@@ -30,7 +30,7 @@ public class CppTest
{
public static void main(String[] args) throws Exception
{
- Connection con = ConnectionFactory.get().createConnection("localhost:5672");
+ /*Connection con = ConnectionFactory.get().createConnection("localhost:5672");
con.open();
Session ssn = con.createSession("hello");
System.out.println("Got a session object " + ssn);
@@ -64,7 +64,7 @@ public class CppTest
System.out.println("Msg toString() : " + m);
ssn.close();
- con.close();
+ con.close();*/
}
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
index f635cfcac5..5b2276eb04 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
@@ -180,8 +180,9 @@ public class ConnectionManagementDecorator implements ConnectionExt
}
@Override
- public MessageFactory getMessageFactory()
+ public MessageFactory getMessageFactory() throws MessagingException
{
+ checkClosedAndThrowException();
return _delegate.getMessageFactory();
}
@@ -254,6 +255,12 @@ public class ConnectionManagementDecorator implements ConnectionExt
return _connectionLock;
}
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+ }
+
private void checkClosedAndThrowException() throws ConnectionException
{
checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection");
@@ -290,11 +297,4 @@ public class ConnectionManagementDecorator implements ConnectionExt
// TODO add local IP and pid to the beginning;
return _ssnNameGenerator.generate().toString();
}
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
-
- }
}