diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-16 22:21:07 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-16 22:21:07 +0000 |
| commit | 4b1efc5908b5a8c5a9bc274f7a8e8ff2d5ddccab (patch) | |
| tree | 8ed105406708cde64ccc8d0d6f6dfb7163fed6fd /java | |
| parent | 648f1a6384730ccf3183d3507663c849d98f19f0 (diff) | |
| download | qpid-python-4b1efc5908b5a8c5a9bc274f7a8e8ff2d5ddccab.tar.gz | |
Fixed various compilation errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
15 files changed, 164 insertions, 286 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java index 0db54af3b6..7c27051fb2 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -1,18 +1,49 @@ package org.apache.qpid.example.amqpexample.direct; +import java.nio.ByteBuffer; + +import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.transport.DeliveryProperties; -public class DirectProducer +public class DirectProducer implements MessageListener { - /** - * This sends 10 messages to the - * amq.direct exchange using the - * routing key as "routing_key" - * - */ + boolean finish = false; + + public void onMessage(Message m) + { + String data = null; + + try + { + ByteBuffer buf = m.readData(); + byte[] b = new byte[buf.remaining()]; + buf.get(b); + data = new String(b); + } + catch(Exception e) + { + System.out.print("Error reading message"); + e.printStackTrace(); + } + + System.out.println("Message: " + data); + + + if (data != null && data.equals("That's all, folks!")) + { + finish = true; + } + } + + public boolean isFinished() + { + return finish; + } + public static void main(String[] args) { // Create connection diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java deleted file mode 100755 index d9573b0425..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; - -/** - * This creates a queue a queue and binds it to the - * amq.direct exchange - * - */ -public class DeclareQueue -{ - - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - - // declare and bind queue - session.queueDeclare("message_queue", null, null); - session.queueBind("message_queue", "amq.fanout",null, null); - - // confirm completion - session.sync(); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java deleted file mode 100755 index 752d973998..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.transport.DeliveryProperties; - -public class FannoutProducer -{ - /** - * This sends 10 messages to the - * amq.fannout exchange - */ - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - DeliveryProperties deliveryProps = new DeliveryProperties(); - deliveryProps.setRoutingKey("routing_key"); - - for (int i=0; i<10; i++) - { - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); - session.header(deliveryProps); - session.data("Message " + i); - session.endData(); - } - - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); - session.header(deliveryProps); - session.data("That's all, folks!"); - session.endData(); - - // confirm completion - session.sync(); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } - -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java deleted file mode 100755 index 3fada3422c..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessageListener; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; - -/** - * This listens to messages on a queue and terminates - * when it sees the final message - * - */ -public class Listener implements MessageListener -{ - boolean finish = false; - - public void onMessage(Message m) - { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } - } - - public boolean isFinished() - { - return finish; - } - - /** - * This sends 10 messages to the - * amq.direct exchange using the - * routing key as "routing_key" - * - */ - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - - // Create an instance of the listener - Listener listener = new Listener(); - - // create a subscription - session.messageSubscribe("message_queue", - "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); - - - // issue credits - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); - - // confirm completion - session.sync(); - - // check to see if we have received all the messages - while (!listener.isFinished()){} - System.out.println("Shutting down listener for listener_destination"); - session.messageCancel("listener_destination"); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } - -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties new file mode 100644 index 0000000000..1d428d26d5 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = qpid:password=pass;username=name@tcp:localhost:5672 + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.directQueue = direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/jndi/Example.properties b/java/client/src/main/java/org/apache/qpid/jndi/Example.properties index c457e94cab..def53d8494 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/Example.properties +++ b/java/client/src/main/java/org/apache/qpid/jndi/Example.properties @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,3 +37,4 @@ topic.ibmStocks = stocks.nyse.ibm # NOTE: Qpid currently only supports direct,topics and headers # destination.[jniName] = [BindingURL] destination.direct = direct://amq.direct//directQueue +destination.directQueue = direct://amq.direct//message_queue?routingkey="routing_key" diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index f6e7911078..4cca33f300 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -1,12 +1,14 @@ package org.apache.qpidity.nclient; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.qpid.client.url.URLParser_0_10; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.QpidURL; -import org.apache.qpidity.BrokerDetails; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.nclient.impl.ClientSession; @@ -82,6 +84,7 @@ public class Client implements org.apache.qpidity.nclient.Connection { System.out.println("using MINA"); _conn = MinaHandler.connect(host, port,connectionDelegate); + // _conn = NativeHandler.connect(host, port,connectionDelegate); } // XXX: hardcoded version numbers @@ -101,12 +104,34 @@ public class Client implements org.apache.qpidity.nclient.Connection } } + public void connect(String url)throws QpidException + { + URLParser_0_10 parser = null; + try + { + parser = new URLParser_0_10(url); + } + catch(Exception e) + { + throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); + } + List<BrokerDetails> brokers = parser.getAllBrokerDetails(); + BrokerDetails brokerDetail = brokers.get(0); + connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), + brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), + brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); + } + /* * Until the dust settles with the URL disucssion * I am not going to implement this. */ public void connect(QpidURL url) throws QpidException { + throw new UnsupportedOperationException("Not implemented"); + } + + /* { // temp impl to tests BrokerDetails details = url.getAllBrokerDetails().get(0); connect(details.getHost(), @@ -115,6 +140,7 @@ public class Client implements org.apache.qpidity.nclient.Connection details.getUserName(), details.getPassword()); } +*/ public void close() throws QpidException { diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java b/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java index d486c86f33..95d2b07f31 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java @@ -18,7 +18,6 @@ */ package org.apache.qpidity.nclient; -import org.apache.qpid.url.QpidURL; import org.apache.qpidity.QpidException; /** @@ -28,23 +27,24 @@ public interface Connection { /** * Establish the connection using the given parameters - * + * * @param host * @param port * @param username * @param password * @throws QpidException - */ + */ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; - - /** - * Establish the connection with the broker identified by the provided URL. - * - * @param url The URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker. - */ - public void connect(QpidURL url) throws QpidException; - + + + /** + * Establish the connection with the broker identified by the URL. + * + * @param url The URL of the broker. + * @throws QpidException If the communication layer fails to connect with the broker. + */ + public void connect(String url) throws QpidException; + /** * Close this connection. * @@ -81,6 +81,6 @@ public interface Connection * * @param exceptionListner The execptionListener */ - + public void setClosedListener(ClosedListener exceptionListner); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java index cab5c82411..7eb482c26b 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java @@ -1,5 +1,8 @@ package org.apache.qpidity.nclient; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -20,13 +23,45 @@ public class JMSTestCase javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); javax.jms.MessageConsumer cons = ssn.createConsumer(dest); + javax.jms.MessageProducer prod = ssn.createProducer(dest); + + //javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive(); + /* cons.setMessageListener(new MessageListener() + { + public void onMessage(Message m) + { + javax.jms.TextMessage m2 = (javax.jms.TextMessage)m; + try + { + System.out.println("m : " + m2.getText()); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + + });*/ - javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive(); + javax.jms.TextMessage msg = ssn.createTextMessage(); + msg.setText("This is a test message"); + msg.setBooleanProperty("targetMessage", false); + prod.send(msg); - if (m != null) + msg.setBooleanProperty("targetMessage", true); + prod.send(msg); + + javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); + + if (m == null) + { + System.out.println("message is null"); + } + else { - System.out.println("Message" + m); + System.out.println("message is not null" + m); } + } catch(Exception e) { @@ -34,22 +69,4 @@ public class JMSTestCase } } - /* javax.jms.TextMessage msg = ssn.createTextMessage(); - msg.setText("This is a test message"); - msg.setBooleanProperty("targetMessage", false); - prod.send(msg); - - msg.setBooleanProperty("targetMessage", true); - prod.send(msg); - - javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait(); - - if (m == null) - { - System.out.println("message is null"); - } - else - { - System.out.println("message is not null" + m); - }*/ } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 7328efc496..b3d3a7c12d 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,11 +47,11 @@ public interface Session public static final short MESSAGE_FLOW_MODE_WINDOW = 1; public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; public static final short MESSAGE_FLOW_UNIT_BYTE = 1; + public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; public static final short MESSAGE_REJECT_CODE_GENERIC = 0; public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; - public static final short MESSAGE_FLOW_MAX_BYTES=1000; //------------------------------------------------------ // Session housekeeping methods @@ -75,9 +75,9 @@ public interface Session */ public void sessionSuspend(); - //------------------------------------------------------ + //------------------------------------------------------ // Messaging methods - // Producer + // Producer //------------------------------------------------------ /** * Transfer the given @@ -464,7 +464,7 @@ public interface Session public void txRollback() throws IllegalStateException; //--------------------------------------------- - // Queue methods + // Queue methods //--------------------------------------------- /** @@ -586,7 +586,7 @@ public interface Session Map<String, Object> arguments); // -------------------------------------- - // exhcange methods + // exhcange methods // -------------------------------------- /** diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index d9434419da..0a25ea3961 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -44,7 +44,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen } private static long MAX_NOT_SYNC_DATA_LENGH; - private static long MAX_NOT_FLUSH_DATA_LENGH; + private static long MAX_NOT_FLUSH_DATA_LENGH; private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); private ClosedListener _exceptionListner; private RangeSet _acquiredMessages; diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java index 541d955cbd..96ec98a45a 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java @@ -83,7 +83,6 @@ public class DemoClient ssn.data("Topic message"); ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); ssn.endData(); - ssn.sync(); } } diff --git a/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java index 6e90a2a4cd..de313e7bed 100644 --- a/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -122,7 +122,7 @@ public class ConnectionImpl implements Connection protected ConnectionImpl(QpidURL qpidURL) throws QpidException { _qpidConnection = Client.createConnection(); - _qpidConnection.connect(qpidURL); + //_qpidConnection.connect(qpidURL); } //---- Interface javax.njms.Connection ---// diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java index a618a6599d..1124e070a1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java @@ -40,11 +40,7 @@ public class MessageConsumerTest extends Options implements MessageListener Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination); _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); MessageConsumer _consumer = _session.createConsumer(dest); - if(!_synchronous) - { - _consumer.setMessageListener(this); - } - + _consumer.setMessageListener(this); _startTime = System.currentTimeMillis(); if(Boolean.getBoolean("collect_stats")) { diff --git a/java/pom.xml b/java/pom.xml index 222c3939d4..90ea21f362 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -503,12 +503,12 @@ under the License. <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> - <version>1.1.5</version> + <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-filter-ssl</artifactId> - <version>1.1.5</version> + <version>1.0.0</version> </dependency> <!-- <dependency> |
