From 94f66150a60b45d1505cffa31ea8fa54364eabe3 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 15 Jun 2012 17:23:25 +0000 Subject: QPID-4027 Tied in the message implementation into the cpp/jni implementation. A bit more cleanup and testing needs to be done. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350710 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/messaging/cpp/CppConnection.java | 9 ++++++++ .../org/apache/qpid/messaging/cpp/CppReceiver.java | 10 ++++----- .../org/apache/qpid/messaging/cpp/CppSender.java | 26 ++++++++++++++++++++-- .../org/apache/qpid/messaging/cpp/CppTest.java | 4 ++-- .../util/ConnectionManagementDecorator.java | 16 ++++++------- 5 files changed, 48 insertions(+), 17 deletions(-) (limited to 'qpid/java/client-api/src') diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java index fa23eae3da..3ec833b6cc 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java @@ -18,6 +18,7 @@ package org.apache.qpid.messaging.cpp; import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.MessageFactory; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Session; @@ -29,6 +30,8 @@ import org.apache.qpid.messaging.Session; */ public class CppConnection implements Connection { + private static MessageFactory _MSG_FACTORY = new CppMessageFactory(); + private org.apache.qpid.messaging.cpp.jni.Connection _cppConn; public CppConnection(String url) @@ -78,4 +81,10 @@ public class CppConnection implements Connection { return _cppConn.getAuthenticatedUsername(); } + + @Override + public MessageFactory getMessageFactory() + { + return _MSG_FACTORY; + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java index 2bc7391376..be2cf0993f 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java @@ -26,27 +26,28 @@ public class CppReceiver implements Receiver { private CppSession _ssn; private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver; + private CppMessageFactory _msgFactory; public CppReceiver(CppSession ssn, - org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) + org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) throws MessagingException { _ssn = ssn; _cppReceiver = cppReceiver; + _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory(); } @Override public Message get(long timeout) throws MessagingException { org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get(CppDuration.getDuration(timeout)); - return new TextMessage(m.getContent()); - + return _msgFactory.createMessage(m); } @Override public Message fetch(long timeout) throws MessagingException { org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch(CppDuration.getDuration(timeout)); - return new TextMessage(m); + return _msgFactory.createMessage(m); } @Override @@ -104,5 +105,4 @@ public class CppReceiver implements Receiver _ssn.checkError(); return _ssn; } - } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java index 722b7c16bb..bb981b2c25 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java @@ -21,23 +21,45 @@ import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Sender; import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.ext.MessageInternal; public class CppSender implements Sender { private CppSession _ssn; private org.apache.qpid.messaging.cpp.jni.Sender _cppSender; + private CppMessageFactory _msgFactory; public CppSender(CppSession ssn, - org.apache.qpid.messaging.cpp.jni.Sender cppSender) + org.apache.qpid.messaging.cpp.jni.Sender cppSender) throws MessagingException { _ssn = ssn; _cppSender = cppSender; + _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory(); } @Override public void send(Message message, boolean sync) throws MessagingException { - _cppSender.send(((TextMessage)message).getCppMessage(),true); + org.apache.qpid.messaging.cpp.jni.Message m = convertForSending(message); + _cppSender.send(m,true); + } + + private org.apache.qpid.messaging.cpp.jni.Message convertForSending(Message m) throws MessagingException + { + if((m instanceof MessageInternal) && + (_msgFactory.getClass() == ((MessageInternal)m).getMessageFactoryClass()) + ) + { + org.apache.qpid.messaging.cpp.jni.Message msg = + (org.apache.qpid.messaging.cpp.jni.Message)((MessageInternal)m).getFactorySpecificMessageDelegate(); + msg.setContentAsByteBuffer(m.getContent()); + return msg; + } + else + { + throw new MessagingException("Incompatible message implementation." + + "You need to use the MessageFactory given by the connection that owns this "); + } } @Override diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java index 70303a52a7..ff9f35638c 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java @@ -30,7 +30,7 @@ public class CppTest { public static void main(String[] args) throws Exception { - Connection con = ConnectionFactory.get().createConnection("localhost:5672"); + /*Connection con = ConnectionFactory.get().createConnection("localhost:5672"); con.open(); Session ssn = con.createSession("hello"); System.out.println("Got a session object " + ssn); @@ -64,7 +64,7 @@ public class CppTest System.out.println("Msg toString() : " + m); ssn.close(); - con.close(); + con.close();*/ } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java index f635cfcac5..5b2276eb04 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java @@ -180,8 +180,9 @@ public class ConnectionManagementDecorator implements ConnectionExt } @Override - public MessageFactory getMessageFactory() + public MessageFactory getMessageFactory() throws MessagingException { + checkClosedAndThrowException(); return _delegate.getMessageFactory(); } @@ -254,6 +255,12 @@ public class ConnectionManagementDecorator implements ConnectionExt return _connectionLock; } + @Override + public void recreate() throws MessagingException + { + // TODO Auto-generated method stub + } + private void checkClosedAndThrowException() throws ConnectionException { checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection"); @@ -290,11 +297,4 @@ public class ConnectionManagementDecorator implements ConnectionExt // TODO add local IP and pid to the beginning; return _ssnNameGenerator.generate().toString(); } - - @Override - public void recreate() throws MessagingException - { - // TODO Auto-generated method stub - - } } -- cgit v1.2.1