diff options
Diffstat (limited to 'java')
3 files changed, 85 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java index 3c13327b52..cab5c82411 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java @@ -1,6 +1,7 @@ -package org.apache.qpidity.nclient; + package org.apache.qpidity.nclient; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; @@ -17,27 +18,14 @@ public class JMSTestCase javax.jms.Session ssn = con.createSession(false, 1); - javax.jms.Destination dest = new AMQTopic(new AMQShortString("amq.topic"),"myTopic"); - javax.jms.MessageProducer prod = ssn.createProducer(dest); - javax.jms.MessageConsumer cons = ssn.createConsumer(dest,"targetMessage = TRUE"); + javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); + javax.jms.MessageConsumer cons = ssn.createConsumer(dest); - javax.jms.TextMessage msg = ssn.createTextMessage(); - msg.setText("This is a test message"); - msg.setBooleanProperty("targetMessage", false); - prod.send(msg); + javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive(); - msg.setBooleanProperty("targetMessage", true); - prod.send(msg); - - javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); - - if (m == null) - { - System.out.println("message is null"); - } - else + if (m != null) { - System.out.println("message is not null"); + System.out.println("Message" + m); } } catch(Exception e) @@ -45,4 +33,23 @@ public class JMSTestCase e.printStackTrace(); } } + + /* javax.jms.TextMessage msg = ssn.createTextMessage(); + msg.setText("This is a test message"); + msg.setBooleanProperty("targetMessage", false); + prod.send(msg); + + msg.setBooleanProperty("targetMessage", true); + prod.send(msg); + + javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); + + if (m == null) + { + System.out.println("message is null"); + } + else + { + System.out.println("message is not null" + m); + }*/ } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index ce8e98bc68..3834f043eb 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -2,28 +2,27 @@ package org.apache.qpidity.nclient.impl; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import java.util.Properties; -import java.nio.ByteBuffer; -import org.apache.qpidity.transport.Option; import org.apache.qpidity.QpidException; -import org.apache.qpidity.transport.Range; -import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.nclient.MessagePartListener; +import org.apache.qpidity.transport.Option; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.RangeSet; /** - * Implements a Qpid Sesion. + * Implements a Qpid Sesion. */ public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession { static { MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024; - String max = "message_size_before_sync"; + String max = "message_size_before_sync"; if (System.getProperties().containsKey(max)) { try @@ -37,12 +36,12 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen } } - private static long MAX_NOT_SYNC_DATA_LENGH ; + private static long MAX_NOT_SYNC_DATA_LENGH; private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ClosedListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; - private long _dataSentNotSync; + private long _currentDataSizeNotSynced; public void messageAcknowledge(RangeSet ranges) @@ -65,21 +64,38 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen // The javadoc clearly says that this method is suitable for small messages // therefore reading the content in one shot. ByteBuffer data = msg.readData(); - _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit(); super.messageTransfer(destination, confirmMode, acquireMode); super.header(msg.getDeliveryProperties(),msg.getMessageProperties()); super.data( data ); super.endData(); - if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH) - { - sync(); - } } public void sync() { - _dataSentNotSync = 0; super.sync(); + _currentDataSizeNotSynced = 0; + } + + /* ------------------------- + * Data methods + * ------------------------*/ + + public void data(ByteBuffer buf) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining(); + super.data(buf); + } + + public void data(String str) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length; + super.data(str); + } + + public void data(byte[] bytes) + { + _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length; + super.data(bytes); } public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException @@ -89,7 +105,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen boolean b = true; int count = 0; while(b) - { + { try { System.out.println("count : " + count++); @@ -99,11 +115,20 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { b = false; } - } - + } + super.endData(); } - + + public void endData() + { + super.endData(); + if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) + { + sync(); + } + } + public RangeSet getAccquiredMessages() { return _acquiredMessages; @@ -113,36 +138,36 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { return _rejectedMessages; } - + public void setMessageListener(String destination, MessagePartListener listener) { if (listener == null) { throw new IllegalArgumentException("Cannot set message listener to null"); } - _messageListeners.put(destination, listener); + _messageListeners.put(destination, listener); } - + public void setClosedListener(ClosedListener exceptionListner) { - _exceptionListner = exceptionListner; - } - + _exceptionListner = exceptionListner; + } + void setAccquiredMessages(RangeSet acquiredMessages) { _acquiredMessages = acquiredMessages; } - + void setRejectedMessages(RangeSet rejectedMessages) { _rejectedMessages = rejectedMessages; } - + void notifyException(QpidException ex) { _exceptionListner.onClosed(null, null); } - + Map<String,MessagePartListener> getMessageListerners() { return _messageListeners; diff --git a/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java index 6e722776e8..9da8e24acc 100644 --- a/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -327,15 +327,15 @@ public class XAResourceImpl implements XAResource */ public Xid[] recover(int flag) throws XAException { - // the flag is ignored + // the flag is ignored Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover(); DtxCoordinationRecoverResult res = future.get(); // todo make sure that the keys of the returned map are the xids Xid[] result = new Xid[res.getInDoubt().size()]; int i = 0; - try + /* try { - for (Object xid : res.getInDoubt()) + /* for (Object xid : res.getInDoubt()) { result[i] = new XidImpl((String) xid); i++; @@ -348,7 +348,7 @@ public class XAResourceImpl implements XAResource _logger.debug("Cannot convert string into Xid ", e); } throw new XAException(XAException.XAER_PROTO); - } + }*/ return result; } |
