From 3bdd4780e35b4454477cf423b31ad6915df357fa Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 25 Sep 2013 15:15:18 +0000 Subject: NO-JIRA : set svn:eol-style to native git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526202 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 862 +++++++------- .../org/apache/qpid/amqp_1_0/client/Request.java | 472 ++++---- .../java/org/apache/qpid/amqp_1_0/client/Send.java | 470 ++++---- .../apache/qpid/amqp_1_0/client/Connection.java | 824 ++++++------- .../org/apache/qpid/amqp_1_0/client/Message.java | 296 ++--- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 1230 ++++++++++---------- .../org/apache/qpid/amqp_1_0/client/Sender.java | 904 +++++++------- .../org/apache/qpid/amqp_1_0/client/Session.java | 768 ++++++------ 8 files changed, 2913 insertions(+), 2913 deletions(-) diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index 5e77b7097c..26b0d80783 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -1,431 +1,431 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -public class Demo extends Util -{ - private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; - private static final String OPCODE = "opcode"; - private static final String ACTION = "action"; - private static final String MESSAGE_ID = "message-id"; - private static final String VENDOR = "vendor"; - private static final String LOG = "log"; - private static final String RECEIVED = "received"; - private static final String TEST = "test"; - private static final String APACHE = "apache"; - private static final String SENT = "sent"; - private static final String LINK_REF = "link-ref"; - private static final String HOST = "host"; - private static final String PORT = "port"; - private static final String SASL_USER = "sasl-user"; - private static final String SASL_PASSWORD = "sasl-password"; - private static final String ROLE = "role"; - private static final String ADDRESS = "address"; - private static final String SENDER = "sender"; - private static final String SEND_MESSAGE = "send-message"; - private static final String ANNOUNCE = "announce"; - private static final String MESSAGE_VENDOR = "message-vendor"; - private static final String CREATE_LINK = "create-link"; - - public static void main(String[] args) - { - new Demo(args).run(); - } - - public Demo(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected boolean hasWindowSizeOption() - { - return false; - } - - public void run() - { - - try - { - - final String vendor = getArgs()[0]; - final String queue = "control"; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - - Receiver responseReceiver; - - responseReceiver = session.createTemporaryQueueReceiver(); - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - - Properties properties = new Properties(); - properties.setMessageId(java.util.UUID.randomUUID()); - properties.setReplyTo(responseReceiver.getAddress()); - - HashMap appPropMap = new HashMap(); - ApplicationProperties appProperties = new ApplicationProperties(appPropMap); - - appPropMap.put(OPCODE, ANNOUNCE); - appPropMap.put(VENDOR, vendor); - appPropMap.put(ADDRESS,responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { properties, appProperties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - try - { - s.send(message1); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Map sendingLinks = new HashMap(); - Map receivingLinks = new HashMap(); - - - boolean done = false; - - while(!done) - { - boolean wait = true; - Message m = responseReceiver.receive(false); - if(m != null) - { - List
payload = m.getPayload(); - wait = false; - ApplicationProperties props = m.getApplicationProperties(); - Map map = props.getValue(); - String op = (String) map.get(OPCODE); - if("reset".equals(op)) - { - for(Sender sender : sendingLinks.values()) - { - try - { - sender.close(); - Session session1 = sender.getSession(); - session1.close(); - session1.getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - for(Receiver receiver : receivingLinks.values()) - { - try - { - receiver.close(); - receiver.getSession().close(); - receiver.getSession().getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - sendingLinks.clear(); - receivingLinks.clear(); - } - else if(CREATE_LINK.equals(op)) - { - Object linkRef = map.get(LINK_REF); - String host = (String) map.get(HOST); - Object o = map.get(PORT); - int port = Integer.parseInt(String.valueOf(o)); - String user = (String) map.get(SASL_USER); - String password = (String) map.get(SASL_PASSWORD); - String role = (String) map.get(ROLE); - String address = (String) map.get(ADDRESS); - System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); - try{ - - - Connection conn2 = new Connection(host, port, user, password, host); - Session session2 = conn2.createSession(); - if(sendingLinks.containsKey(linkRef)) - { - try - { - sendingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(receivingLinks.containsKey(linkRef)) - { - try - { - receivingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(SENDER.equals(role)) - { - - System.err.println("%%% Creating sender (" + linkRef + ")"); - Sender sender = session2.createSender(address); - sendingLinks.put(linkRef, sender); - } - else - { - - System.err.println("%%% Creating receiver (" + linkRef + ")"); - Receiver receiver2 = session2.createReceiver(address); - receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - receivingLinks.put(linkRef, receiver2); - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - else if(SEND_MESSAGE.equals(op)) - { - Sender sender = sendingLinks.get(map.get(LINK_REF)); - Properties m2props = new Properties(); - Object messageId = map.get(MESSAGE_ID); - m2props.setMessageId(messageId); - Map m2propmap = new HashMap(); - m2propmap.put(OPCODE, TEST); - m2propmap.put(VENDOR, vendor); - ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); - Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - try - { - sender.send(m2); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, SENT); - m3propmap.put(MESSAGE_ID, messageId); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, vendor); - - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+messageId))); - try - { - s.send(m3); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - responseReceiver.acknowledge(m); - } - else - { - for(Map.Entry entry : receivingLinks.entrySet()) - { - m = entry.getValue().receive(false); - if(m != null) - { - wait = false; - - System.err.println("%%% Received message from " + entry.getKey()); - - Properties mp = m.getProperties(); - ApplicationProperties ap = m.getApplicationProperties(); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, RECEIVED); - m3propmap.put(MESSAGE_ID, mp.getMessageId()); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+mp.getMessageId()))); - try - { - s.send(m3); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - entry.getValue().acknowledge(m); - } - - } - } - - if(wait) - { - try - { - Thread.sleep(500l); - } - catch (InterruptedException e) - { - e.printStackTrace(); //TODO. - } - } - - } - - - - - - - - - - s.close(); - session.close(); - conn.close(); - - } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +public class Demo extends Util +{ + private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; + private static final String OPCODE = "opcode"; + private static final String ACTION = "action"; + private static final String MESSAGE_ID = "message-id"; + private static final String VENDOR = "vendor"; + private static final String LOG = "log"; + private static final String RECEIVED = "received"; + private static final String TEST = "test"; + private static final String APACHE = "apache"; + private static final String SENT = "sent"; + private static final String LINK_REF = "link-ref"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String SASL_USER = "sasl-user"; + private static final String SASL_PASSWORD = "sasl-password"; + private static final String ROLE = "role"; + private static final String ADDRESS = "address"; + private static final String SENDER = "sender"; + private static final String SEND_MESSAGE = "send-message"; + private static final String ANNOUNCE = "announce"; + private static final String MESSAGE_VENDOR = "message-vendor"; + private static final String CREATE_LINK = "create-link"; + + public static void main(String[] args) + { + new Demo(args).run(); + } + + public Demo(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected boolean hasWindowSizeOption() + { + return false; + } + + public void run() + { + + try + { + + final String vendor = getArgs()[0]; + final String queue = "control"; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + + Receiver responseReceiver; + + responseReceiver = session.createTemporaryQueueReceiver(); + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + + Properties properties = new Properties(); + properties.setMessageId(java.util.UUID.randomUUID()); + properties.setReplyTo(responseReceiver.getAddress()); + + HashMap appPropMap = new HashMap(); + ApplicationProperties appProperties = new ApplicationProperties(appPropMap); + + appPropMap.put(OPCODE, ANNOUNCE); + appPropMap.put(VENDOR, vendor); + appPropMap.put(ADDRESS,responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { properties, appProperties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + try + { + s.send(message1); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + Map sendingLinks = new HashMap(); + Map receivingLinks = new HashMap(); + + + boolean done = false; + + while(!done) + { + boolean wait = true; + Message m = responseReceiver.receive(false); + if(m != null) + { + List
payload = m.getPayload(); + wait = false; + ApplicationProperties props = m.getApplicationProperties(); + Map map = props.getValue(); + String op = (String) map.get(OPCODE); + if("reset".equals(op)) + { + for(Sender sender : sendingLinks.values()) + { + try + { + sender.close(); + Session session1 = sender.getSession(); + session1.close(); + session1.getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + for(Receiver receiver : receivingLinks.values()) + { + try + { + receiver.close(); + receiver.getSession().close(); + receiver.getSession().getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + sendingLinks.clear(); + receivingLinks.clear(); + } + else if(CREATE_LINK.equals(op)) + { + Object linkRef = map.get(LINK_REF); + String host = (String) map.get(HOST); + Object o = map.get(PORT); + int port = Integer.parseInt(String.valueOf(o)); + String user = (String) map.get(SASL_USER); + String password = (String) map.get(SASL_PASSWORD); + String role = (String) map.get(ROLE); + String address = (String) map.get(ADDRESS); + System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); + try{ + + + Connection conn2 = new Connection(host, port, user, password, host); + Session session2 = conn2.createSession(); + if(sendingLinks.containsKey(linkRef)) + { + try + { + sendingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(receivingLinks.containsKey(linkRef)) + { + try + { + receivingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(SENDER.equals(role)) + { + + System.err.println("%%% Creating sender (" + linkRef + ")"); + Sender sender = session2.createSender(address); + sendingLinks.put(linkRef, sender); + } + else + { + + System.err.println("%%% Creating receiver (" + linkRef + ")"); + Receiver receiver2 = session2.createReceiver(address); + receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + receivingLinks.put(linkRef, receiver2); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + else if(SEND_MESSAGE.equals(op)) + { + Sender sender = sendingLinks.get(map.get(LINK_REF)); + Properties m2props = new Properties(); + Object messageId = map.get(MESSAGE_ID); + m2props.setMessageId(messageId); + Map m2propmap = new HashMap(); + m2propmap.put(OPCODE, TEST); + m2propmap.put(VENDOR, vendor); + ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); + Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); + try + { + sender.send(m2); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, SENT); + m3propmap.put(MESSAGE_ID, messageId); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, vendor); + + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+messageId))); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + responseReceiver.acknowledge(m); + } + else + { + for(Map.Entry entry : receivingLinks.entrySet()) + { + m = entry.getValue().receive(false); + if(m != null) + { + wait = false; + + System.err.println("%%% Received message from " + entry.getKey()); + + Properties mp = m.getProperties(); + ApplicationProperties ap = m.getApplicationProperties(); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, RECEIVED); + m3propmap.put(MESSAGE_ID, mp.getMessageId()); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+mp.getMessageId()))); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + entry.getValue().acknowledge(m); + } + + } + } + + if(wait) + { + try + { + Thread.sleep(500l); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } + + } + + + + + + + + + + s.close(); + session.close(); + conn.close(); + + } + catch (ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index dbe273182f..7845e318cb 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -1,236 +1,236 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.Arrays; - -public class Request extends Util -{ - private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; - - public static void main(String[] args) - { - new Request(args).run(); - } - - public Request(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public void run() - { - - try - { - - - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - Connection conn2; - Session session2; - Receiver responseReceiver; - - if(isUseMultipleConnections()) - { - conn2 = newConnection(); - session2 = conn2.createSession(); - responseReceiver = session2.createTemporaryQueueReceiver(); - } - else - { - conn2 = null; - session2 = null; - responseReceiver = session.createTemporaryQueueReceiver(); - } - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - int received = 0; - - if(getArgs().length >= 2) - { - message = getArgs()[1]; - if(message.length() < getMessageSize()) - { - StringBuilder builder = new StringBuilder(getMessageSize()); - builder.append(message); - for(int x = message.length(); x < getMessageSize(); x++) - { - builder.append('.'); - } - message = builder.toString(); - } - - for(int i = 0; i < getCount(); i++) - { - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - properties.setReplyTo(responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { new Header() , properties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - - Message responseMessage = responseReceiver.receive(false); - if(responseMessage != null) - { - responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); - received++; - } - } - } - - if(txn != null) - { - txn.commit(); - } - - - while(received < getCount()) - { - Message responseMessage = responseReceiver.receive(); - responseReceiver.acknowledge(responseMessage.getDeliveryTag()); - received++; - } - - - - - s.close(); - session.close(); - conn.close(); - - if(session2 != null) - { - session2.close(); - conn2.close(); - } - } - catch (Exception e) - { - e.printStackTrace(); //TODO. - } - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.Arrays; + +public class Request extends Util +{ + private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; + + public static void main(String[] args) + { + new Request(args).run(); + } + + public Request(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public void run() + { + + try + { + + + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + Connection conn2; + Session session2; + Receiver responseReceiver; + + if(isUseMultipleConnections()) + { + conn2 = newConnection(); + session2 = conn2.createSession(); + responseReceiver = session2.createTemporaryQueueReceiver(); + } + else + { + conn2 = null; + session2 = null; + responseReceiver = session.createTemporaryQueueReceiver(); + } + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + int received = 0; + + if(getArgs().length >= 2) + { + message = getArgs()[1]; + if(message.length() < getMessageSize()) + { + StringBuilder builder = new StringBuilder(getMessageSize()); + builder.append(message); + for(int x = message.length(); x < getMessageSize(); x++) + { + builder.append('.'); + } + message = builder.toString(); + } + + for(int i = 0; i < getCount(); i++) + { + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + properties.setReplyTo(responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { new Header() , properties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + + Message responseMessage = responseReceiver.receive(false); + if(responseMessage != null) + { + responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); + received++; + } + } + } + + if(txn != null) + { + txn.commit(); + } + + + while(received < getCount()) + { + Message responseMessage = responseReceiver.receive(); + responseReceiver.acknowledge(responseMessage.getDeliveryTag()); + received++; + } + + + + + s.close(); + session.close(); + conn.close(); + + if(session2 != null) + { + session2.close(); + conn2.close(); + } + } + catch (Exception e) + { + e.printStackTrace(); //TODO. + } + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java index b4ae16ab3f..36aadc7851 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -1,235 +1,235 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.util.Arrays; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -public class Send extends Util -{ - private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; - private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; - - - public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException - { - new Send(args).run(); - } - - - public Send(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasSubjectOption() - { - return true; - } - - public void run() - { - - final String queue = getArgs()[0]; - - String message = ""; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - if(getArgs().length == 2) - { - message = getArgs()[1]; - } - for(int i = 0; i < getCount(); i++) - { - - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - if(getSubject() != null) - { - properties.setSubject(getSubject()); - } - Section bodySection; - byte[] bytes = (message + " " + i).getBytes(); - if(bytes.length < getMessageSize()) - { - byte[] origBytes = bytes; - bytes = new byte[getMessageSize()]; - System.arraycopy(origBytes,0,bytes,0,origBytes.length); - for(int x = origBytes.length; x < bytes.length; x++) - { - bytes[x] = (byte) '.'; - } - bodySection = new Data(new Binary(bytes)); - } - else - { - bodySection = new AmqpValue(message + " " + i); - } - - Section[] sections = {properties, bodySection}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - } - } - else - { - for(int i = 1; i < getArgs().length; i++) - { - s.send(new Message(getArgs()[i]), txn); - } - - } - } - else - { - LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); - - - try - { - while((message = buf.readLine()) != null) - { - s.send(new Message(message), txn); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - } - - if(txn != null) - { - txn.commit(); - } - - s.close(); - - session.close(); - conn.close(); - } - catch (Exception e) - { - e.printStackTrace(); //TODO. - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.util.Arrays; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +public class Send extends Util +{ + private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; + private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; + + + public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException + { + new Send(args).run(); + } + + + public Send(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasSubjectOption() + { + return true; + } + + public void run() + { + + final String queue = getArgs()[0]; + + String message = ""; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + if(getArgs().length == 2) + { + message = getArgs()[1]; + } + for(int i = 0; i < getCount(); i++) + { + + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + if(getSubject() != null) + { + properties.setSubject(getSubject()); + } + Section bodySection; + byte[] bytes = (message + " " + i).getBytes(); + if(bytes.length < getMessageSize()) + { + byte[] origBytes = bytes; + bytes = new byte[getMessageSize()]; + System.arraycopy(origBytes,0,bytes,0,origBytes.length); + for(int x = origBytes.length; x < bytes.length; x++) + { + bytes[x] = (byte) '.'; + } + bodySection = new Data(new Binary(bytes)); + } + else + { + bodySection = new AmqpValue(message + " " + i); + } + + Section[] sections = {properties, bodySection}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + } + } + else + { + for(int i = 1; i < getArgs().length; i++) + { + s.send(new Message(getArgs()[i]), txn); + } + + } + } + else + { + LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); + + + try + { + while((message = buf.readLine()) != null) + { + s.send(new Message(message), txn); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + } + + if(txn != null) + { + txn.commit(); + } + + s.close(); + + session.close(); + conn.close(); + } + catch (Exception e) + { + e.printStackTrace(); //TODO. + } + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 4c5ffeb177..f66a33b978 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -1,412 +1,412 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.net.ssl.SSLSocketFactory; -import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; -import org.apache.qpid.amqp_1_0.transport.AMQPTransport; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.StateChangeListener; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.SaslFrameBody; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; - -public class Connection -{ - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - private static final int MAX_FRAME_SIZE = 65536; - - private String _address; - private ConnectionEndpoint _conn; - private int _sessionCount; - - - public Connection(final String address, - final int port, - final String username, - final String password) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE); - } - - public Connection(final String address, - final int port, - final String username, - final String password, String remoteHostname) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,new Container()); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container) throws ConnectionException - { - this(address,port,username,password,MAX_FRAME_SIZE,container); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,container, null); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,container,remoteHostname,false); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container, - final boolean ssl) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final String remoteHost, - final boolean ssl) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container, - final String remoteHost, - final boolean ssl) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl); - } - - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname, boolean ssl) throws ConnectionException - { - - _address = address; - - try - { - final Socket s; - if(ssl) - { - s = SSLSocketFactory.getDefault().createSocket(address, port); - } - else - { - s = new Socket(address, port); - } - - - Principal principal = username == null ? null : new Principal() - { - - public String getName() - { - return username; - } - }; - _conn = new ConnectionEndpoint(container, principal, password); - _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); - _conn.setRemoteAddress(s.getRemoteSocketAddress()); - _conn.setRemoteHostname(remoteHostname); - - - - ConnectionHandler.FrameOutput out = new ConnectionHandler.FrameOutput(_conn); - - - final OutputStream outputStream = s.getOutputStream(); - - ConnectionHandler.BytesSource src; - - if(_conn.requiresSASL()) - { - ConnectionHandler.FrameOutput saslOut = new ConnectionHandler.FrameOutput(_conn); - - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)3, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), - new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - - _conn.setSaslFrameOutput(saslOut); - } - else - { - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - } - - - ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); - Thread outputThread = new Thread(outputHandler); - outputThread.setDaemon(true); - outputThread.start(); - _conn.setFrameOutputHandler(out); - - - - final ConnectionHandler handler = new ConnectionHandler(_conn); - final InputStream inputStream = s.getInputStream(); - - Thread inputThread = new Thread(new Runnable() - { - - public void run() - { - try - { - doRead(handler, inputStream); - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - s.close(); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); - - inputThread.setDaemon(true); - inputThread.start(); - - _conn.open(); - - } - catch (IOException e) - { - throw new ConnectionException(e); - } - - - } - - private Connection(ConnectionEndpoint endpoint) - { - _conn = endpoint; - } - - - private void doRead(final AMQPTransport transport, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - ByteBuffer bbuf = ByteBuffer.wrap(buf); - final Object lock = new Object(); - transport.setInputStateChangeListener(new StateChangeListener(){ - - public void onStateChange(final boolean active) - { - synchronized(lock) - { - lock.notifyAll(); - } - } - }); - - try - { - int read; - while((read = inputStream.read(buf)) != -1) - { - bbuf.position(0); - bbuf.limit(read); - - while(bbuf.hasRemaining() && transport.isOpenForInput()) - { - transport.processBytes(bbuf); - } - - - } - } - catch (IOException e) - { - e.printStackTrace(); - } - - } - - public Session createSession() throws ConnectionException - { - checkNotClosed(); - Session session = new Session(this,String.valueOf(_sessionCount++)); - return session; - } - - void checkNotClosed() throws ConnectionClosedException - { - if(getEndpoint().isClosed()) - { - throw new ConnectionClosedException(getEndpoint().getRemoteError()); - } - } - - public ConnectionEndpoint getEndpoint() - { - return _conn; - } - - public void awaitOpen() - { - synchronized(getEndpoint().getLock()) - { - while(!getEndpoint().isOpen() && !getEndpoint().isClosed()) - { - try - { - getEndpoint().getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - - } - - private void doRead(final ConnectionHandler handler, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - - - try - { - int read; - boolean done = false; - while(!handler.isDone() && (read = inputStream.read(buf)) != -1) - { - ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); - Binary b = new Binary(buf,0,read); - - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); - } - while(bbuf.hasRemaining() && !handler.isDone()) - { - handler.parse(bbuf); - } - - - } - } - catch (IOException e) - { - e.printStackTrace(); - } - } - - public void close() - { - _conn.close(); - - synchronized (_conn.getLock()) - { - while(!_conn.closedForInput()) - { - try - { - _conn.getLock().wait(); - } - catch (InterruptedException e) - { - - } - } - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.net.ssl.SSLSocketFactory; +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.transport.AMQPTransport; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.StateChangeListener; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; + +public class Connection +{ + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + private static final int MAX_FRAME_SIZE = 65536; + + private String _address; + private ConnectionEndpoint _conn; + private int _sessionCount; + + + public Connection(final String address, + final int port, + final String username, + final String password) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE); + } + + public Connection(final String address, + final int port, + final String username, + final String password, String remoteHostname) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,new Container()); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container) throws ConnectionException + { + this(address,port,username,password,MAX_FRAME_SIZE,container); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,container, null); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,container,remoteHostname,false); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final String remoteHost, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container, + final String remoteHost, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl); + } + + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname, boolean ssl) throws ConnectionException + { + + _address = address; + + try + { + final Socket s; + if(ssl) + { + s = SSLSocketFactory.getDefault().createSocket(address, port); + } + else + { + s = new Socket(address, port); + } + + + Principal principal = username == null ? null : new Principal() + { + + public String getName() + { + return username; + } + }; + _conn = new ConnectionEndpoint(container, principal, password); + _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); + _conn.setRemoteAddress(s.getRemoteSocketAddress()); + _conn.setRemoteHostname(remoteHostname); + + + + ConnectionHandler.FrameOutput out = new ConnectionHandler.FrameOutput(_conn); + + + final OutputStream outputStream = s.getOutputStream(); + + ConnectionHandler.BytesSource src; + + if(_conn.requiresSASL()) + { + ConnectionHandler.FrameOutput saslOut = new ConnectionHandler.FrameOutput(_conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + + _conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + } + + + ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); + Thread outputThread = new Thread(outputHandler); + outputThread.setDaemon(true); + outputThread.start(); + _conn.setFrameOutputHandler(out); + + + + final ConnectionHandler handler = new ConnectionHandler(_conn); + final InputStream inputStream = s.getInputStream(); + + Thread inputThread = new Thread(new Runnable() + { + + public void run() + { + try + { + doRead(handler, inputStream); + } + finally + { + if(_conn.closedForInput() && _conn.closedForOutput()) + { + try + { + s.close(); + } + catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + }); + + inputThread.setDaemon(true); + inputThread.start(); + + _conn.open(); + + } + catch (IOException e) + { + throw new ConnectionException(e); + } + + + } + + private Connection(ConnectionEndpoint endpoint) + { + _conn = endpoint; + } + + + private void doRead(final AMQPTransport transport, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + ByteBuffer bbuf = ByteBuffer.wrap(buf); + final Object lock = new Object(); + transport.setInputStateChangeListener(new StateChangeListener(){ + + public void onStateChange(final boolean active) + { + synchronized(lock) + { + lock.notifyAll(); + } + } + }); + + try + { + int read; + while((read = inputStream.read(buf)) != -1) + { + bbuf.position(0); + bbuf.limit(read); + + while(bbuf.hasRemaining() && transport.isOpenForInput()) + { + transport.processBytes(bbuf); + } + + + } + } + catch (IOException e) + { + e.printStackTrace(); + } + + } + + public Session createSession() throws ConnectionException + { + checkNotClosed(); + Session session = new Session(this,String.valueOf(_sessionCount++)); + return session; + } + + void checkNotClosed() throws ConnectionClosedException + { + if(getEndpoint().isClosed()) + { + throw new ConnectionClosedException(getEndpoint().getRemoteError()); + } + } + + public ConnectionEndpoint getEndpoint() + { + return _conn; + } + + public void awaitOpen() + { + synchronized(getEndpoint().getLock()) + { + while(!getEndpoint().isOpen() && !getEndpoint().isClosed()) + { + try + { + getEndpoint().getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + + private void doRead(final ConnectionHandler handler, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + + + try + { + int read; + boolean done = false; + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) + { + ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); + Binary b = new Binary(buf,0,read); + + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); + } + while(bbuf.hasRemaining() && !handler.isDone()) + { + handler.parse(bbuf); + } + + + } + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + public void close() + { + _conn.close(); + + synchronized (_conn.getLock()) + { + while(!_conn.closedForInput()) + { + try + { + _conn.getLock().wait(); + } + catch (InterruptedException e) + { + + } + } + } + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java index 7c1172898b..e8ac1de6c1 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java @@ -1,148 +1,148 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -public class Message -{ - private Binary _deliveryTag; - private List
_payload = new ArrayList
(); - private Boolean _resume; - private boolean _settled; - private DeliveryState _deliveryState; - private Receiver _receiver; - - - public Message() - { - } - - public Message(Collection
sections) - { - _payload.addAll(sections); - } - - public Message(Section section) - { - this(Collections.singletonList(section)); - } - - public Message(String message) - { - this(new AmqpValue(message)); - } - - - public Binary getDeliveryTag() - { - return _deliveryTag; - } - - public void setDeliveryTag(Binary deliveryTag) - { - _deliveryTag = deliveryTag; - } - - public List
getPayload() - { - return Collections.unmodifiableList(_payload); - } - - private T getSection(Class clazz) - { - for(Section s : _payload) - { - if(clazz.isAssignableFrom(s.getClass())) - { - return (T) s; - } - } - return null; - } - - public ApplicationProperties getApplicationProperties() - { - return getSection(ApplicationProperties.class); - } - - public Properties getProperties() - { - return getSection(Properties.class); - } - - public Header getHeader() - { - return getSection(Header.class); - } - - - public void setResume(final Boolean resume) - { - _resume = resume; - } - - public boolean isResume() - { - return Boolean.TRUE.equals(_resume); - } - - public void setDeliveryState(DeliveryState state) - { - _deliveryState = state; - } - - public DeliveryState getDeliveryState() - { - return _deliveryState; - } - - public void setSettled(boolean settled) - { - _settled = settled; - } - - public boolean getSettled() - { - return _settled; - } - - public void setReceiver(final Receiver receiver) - { - _receiver = receiver; - } - - public Receiver getReceiver() - { - return _receiver; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class Message +{ + private Binary _deliveryTag; + private List
_payload = new ArrayList
(); + private Boolean _resume; + private boolean _settled; + private DeliveryState _deliveryState; + private Receiver _receiver; + + + public Message() + { + } + + public Message(Collection
sections) + { + _payload.addAll(sections); + } + + public Message(Section section) + { + this(Collections.singletonList(section)); + } + + public Message(String message) + { + this(new AmqpValue(message)); + } + + + public Binary getDeliveryTag() + { + return _deliveryTag; + } + + public void setDeliveryTag(Binary deliveryTag) + { + _deliveryTag = deliveryTag; + } + + public List
getPayload() + { + return Collections.unmodifiableList(_payload); + } + + private T getSection(Class clazz) + { + for(Section s : _payload) + { + if(clazz.isAssignableFrom(s.getClass())) + { + return (T) s; + } + } + return null; + } + + public ApplicationProperties getApplicationProperties() + { + return getSection(ApplicationProperties.class); + } + + public Properties getProperties() + { + return getSection(Properties.class); + } + + public Header getHeader() + { + return getSection(Header.class); + } + + + public void setResume(final Boolean resume) + { + _resume = resume; + } + + public boolean isResume() + { + return Boolean.TRUE.equals(_resume); + } + + public void setDeliveryState(DeliveryState state) + { + _deliveryState = state; + } + + public DeliveryState getDeliveryState() + { + return _deliveryState; + } + + public void setSettled(boolean settled) + { + _settled = settled; + } + + public boolean getSettled() + { + return _settled; + } + + public void setReceiver(final Receiver receiver) + { + _receiver = receiver; + } + + public Receiver getReceiver() + { + return _receiver; + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 596931088f..5175d1d847 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -1,615 +1,615 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; - -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class Receiver implements DeliveryStateHandler -{ - private ReceivingLinkEndpoint _endpoint; - private int _id; - private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100); - private Session _session; - - private Queue _prefetchQueue = new ConcurrentLinkedQueue(); - private Map _unsettledMap = new HashMap(); - private MessageArrivalListener _messageArrivalListener; - private org.apache.qpid.amqp_1_0.type.transport.Error _error; - private Runnable _remoteErrorTask; - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode) throws ConnectionErrorException - { - this(session, linkName, target, source, ackMode, false); - } - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode, - boolean isDurable) throws ConnectionErrorException - { - this(session,linkName,target,source,ackMode,isDurable,null); - } - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode, - final boolean isDurable, - final Map unsettled) throws ConnectionErrorException - { - - session.getConnection().checkNotClosed(); - _session = session; - if(isDurable) - { - source.setDurable(TerminusDurability.UNSETTLED_STATE); - source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - } - else if(source != null) - { - source.setDurable(TerminusDurability.NONE); - source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); - } - _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source, - UnsignedInteger.ZERO); - - _endpoint.setDeliveryStateHandler(this); - - switch(ackMode) - { - case ALO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case EO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - - _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener() - { - @Override public void messageTransfer(final Transfer xfr) - { - _prefetchQueue.add(xfr); - postPrefetchAction(); - } - - @Override - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - _error = detach.getError(); - if(detach.getError()!=null) - { - remoteError(); - } - super.remoteDetached(endpoint, detach); - } - }); - - _endpoint.setLocalUnsettled(unsettled); - _endpoint.attach(); - - synchronized(_endpoint.getLock()) - { - while(!_endpoint.isAttached() && !_endpoint.isDetached()) - { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - - if(_endpoint.getSource() == null) - { - synchronized(_endpoint.getLock()) - { - while(!_endpoint.isDetached()) - { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - throw new ConnectionErrorException(getError()); - } - else - { - - } - } - - private void remoteError() - { - if(_remoteErrorTask != null) - { - _remoteErrorTask.run(); - } - } - - private void postPrefetchAction() - { - if(_messageArrivalListener != null) - { - _messageArrivalListener.messageArrived(this); - } - } - - public void setCredit(UnsignedInteger credit, boolean window) - { - _endpoint.setLinkCredit(credit); - _endpoint.setCreditWindow(window); - - } - - - public String getAddress() - { - return ((Source)_endpoint.getSource()).getAddress(); - } - - public Map getFilter() - { - return ((Source)_endpoint.getSource()).getFilter(); - } - - public Message receive() - { - return receive(-1L); - } - - public Message receive(boolean wait) - { - return receive(wait ? -1L : 0L); - } - - // 0 means no wait, -1 wait forever - public Message receive(long wait) - { - Message m = null; - Transfer xfr; - long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L; - - while((xfr = receiveFromPrefetch(wait)) != null ) - { - - if(!Boolean.TRUE.equals(xfr.getAborted())) - { - Binary deliveryTag = xfr.getDeliveryTag(); - Boolean resume = xfr.getResume(); - - List
sections = new ArrayList
(); - List payloads = new ArrayList(); - int totalSize = 0; - - boolean hasMore; - do - { - hasMore = Boolean.TRUE.equals(xfr.getMore()); - - ByteBuffer buf = xfr.getPayload(); - - if(buf != null) - { - - totalSize += buf.remaining(); - - payloads.add(buf); - } - if(hasMore) - { - xfr = receiveFromPrefetch(-1l); - if(xfr== null) - { - // TODO - this is wrong!!!! - System.out.println("eeek"); - } - } - } - while(hasMore && !Boolean.TRUE.equals(xfr.getAborted())); - - if(!Boolean.TRUE.equals(xfr.getAborted())) - { - ByteBuffer allPayload = ByteBuffer.allocate(totalSize); - for(ByteBuffer payload : payloads) - { - allPayload.put(payload); - } - allPayload.flip(); - SectionDecoder decoder = _session.getSectionDecoder(); - - try - { - sections = decoder.parseAll(allPayload); - } - catch (AmqpErrorException e) - { - // todo - throw a sensible error - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - m = new Message(sections); - m.setDeliveryTag(deliveryTag); - m.setResume(resume); - m.setReceiver(this); - break; - } - } - - if(wait > 0L) - { - wait = endTime - System.currentTimeMillis(); - if(wait <=0L) - { - break; - } - } - } - - - return m; - - } - - private Transfer receiveFromPrefetch(long wait) - { - long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L); - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - Transfer xfr; - while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached() - && wait != 0) - { - try - { - if(wait>0L) - { - lock.wait(wait); - } - else if(wait<0L) - { - lock.wait(); - } - } - catch (InterruptedException e) - { - return null; - } - if(wait > 0L) - { - wait = endTime - System.currentTimeMillis(); - if(wait <= 0L) - { - break; - } - } - - } - if(xfr != null) - { - _prefetchQueue.poll(); - - } - - return xfr; - } - - } - - - public void release(final Message m) - { - release(m.getDeliveryTag()); - } - - public void release(Binary deliveryTag) - { - update(new Released(), deliveryTag, null, null); - } - - - public void modified(Binary tag) - { - final Modified outcome = new Modified(); - outcome.setDeliveryFailed(true); - - update(outcome, tag, null, null); - } - - public void acknowledge(final Message m) - { - acknowledge(m.getDeliveryTag()); - } - - public void acknowledge(final Message m, SettledAction a) - { - acknowledge(m.getDeliveryTag(), a); - } - - - public void acknowledge(final Message m, Transaction txn) - { - acknowledge(m.getDeliveryTag(), txn); - } - - - public void acknowledge(final Binary deliveryTag) - { - acknowledge(deliveryTag, null, null); - } - - - public void acknowledge(final Binary deliveryTag, SettledAction a) - { - acknowledge(deliveryTag, null, a); - } - - public void acknowledge(final Binary deliveryTag, final Transaction txn) - { - acknowledge(deliveryTag, txn, null); - } - - public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action) - { - update(new Accepted(), deliveryTag, txn, action); - } - - public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action) - { - - DeliveryState state; - if(txn != null) - { - TransactionalState txnState = new TransactionalState(); - txnState.setOutcome(outcome); - txnState.setTxnId(txn.getTxnId()); - state = txnState; - } - else - { - state = (DeliveryState) outcome; - } - boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); - - if(!(settled || action == null)) - { - _unsettledMap.put(deliveryTag, action); - } - - _endpoint.updateDisposition(deliveryTag,state, settled); - } - - public Error getError() - { - return _error; - } - - public void acknowledgeAll(Message m) - { - acknowledgeAll(m.getDeliveryTag()); - } - - public void acknowledgeAll(Binary deliveryTag) - { - acknowledgeAll(deliveryTag, null, null); - } - - public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action) - { - updateAll(new Accepted(), deliveryTag, txn, action); - } - - public void updateAll(Outcome outcome, Binary deliveryTag) - { - updateAll(outcome, deliveryTag, null, null); - } - - public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action) - { - DeliveryState state; - - if(txn != null) - { - TransactionalState txnState = new TransactionalState(); - txnState.setOutcome(outcome); - txnState.setTxnId(txn.getTxnId()); - state = txnState; - } - else - { - state = (DeliveryState) outcome; - } - boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); - - if(!(settled || action == null)) - { - _unsettledMap.put(deliveryTag, action); - } - _endpoint.updateAllDisposition(deliveryTag, state, settled); - } - - - - public void close() - { - _endpoint.setTarget(null); - _endpoint.close(); - Message msg; - while((msg = receive(-1l)) != null) - { - release(msg); - } - - } - - - public void detach() - { - _endpoint.setTarget(null); - _endpoint.detach(); - Message msg; - while((msg = receive(-1l)) != null) - { - release(msg); - } - - } - - public void drain() - { - _endpoint.drain(); - } - - /** - * Waits for the receiver to drain or a message to be available to be received. - * @return true if the receiver has been drained. - */ - public boolean drainWait() - { - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - try - { - while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) - { - lock.wait(); - } - } - catch (InterruptedException e) - { - } - } - return _prefetchQueue.peek()==null && _endpoint.isDrained(); - } - - /** - * Clears the receiver drain so that message delivery can resume. - */ - public void clearDrain() - { - _endpoint.clearDrain(); - } - - public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) - { - _endpoint.setLinkCredit(credit); - _endpoint.setTransactionId(txn == null ? null : txn.getTxnId()); - _endpoint.setCreditWindow(false); - - } - - public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) - { - if(Boolean.TRUE.equals(settled)) - { - SettledAction action = _unsettledMap.remove(deliveryTag); - if(action != null) - { - action.onSettled(deliveryTag); - } - } - } - - public Map getRemoteUnsettled() - { - return _endpoint.getInitialUnsettledMap(); - } - - - public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener) - { - synchronized(_endpoint.getLock()) - { - _messageArrivalListener = messageArrivalListener; - int prefetchSize = _prefetchQueue.size(); - for(int i = 0; i < prefetchSize; i++) - { - postPrefetchAction(); - } - } - } - - public Session getSession() - { - return _session; - } - - public org.apache.qpid.amqp_1_0.type.Source getSource() - { - return _endpoint.getSource(); - } - - public static interface SettledAction - { - public void onSettled(Binary deliveryTag); - } - - - public interface MessageArrivalListener - { - void messageArrived(Receiver receiver); - } - - public void setRemoteErrorListener(Runnable listener) - { - _remoteErrorTask = listener; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class Receiver implements DeliveryStateHandler +{ + private ReceivingLinkEndpoint _endpoint; + private int _id; + private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100); + private Session _session; + + private Queue _prefetchQueue = new ConcurrentLinkedQueue(); + private Map _unsettledMap = new HashMap(); + private MessageArrivalListener _messageArrivalListener; + private org.apache.qpid.amqp_1_0.type.transport.Error _error; + private Runnable _remoteErrorTask; + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode) throws ConnectionErrorException + { + this(session, linkName, target, source, ackMode, false); + } + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode, + boolean isDurable) throws ConnectionErrorException + { + this(session,linkName,target,source,ackMode,isDurable,null); + } + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode, + final boolean isDurable, + final Map unsettled) throws ConnectionErrorException + { + + session.getConnection().checkNotClosed(); + _session = session; + if(isDurable) + { + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + } + else if(source != null) + { + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + } + _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source, + UnsignedInteger.ZERO); + + _endpoint.setDeliveryStateHandler(this); + + switch(ackMode) + { + case ALO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case EO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + + _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener() + { + @Override public void messageTransfer(final Transfer xfr) + { + _prefetchQueue.add(xfr); + postPrefetchAction(); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(detach.getError()!=null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); + + _endpoint.setLocalUnsettled(unsettled); + _endpoint.attach(); + + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isAttached() && !_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + if(_endpoint.getSource() == null) + { + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + throw new ConnectionErrorException(getError()); + } + else + { + + } + } + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + private void postPrefetchAction() + { + if(_messageArrivalListener != null) + { + _messageArrivalListener.messageArrived(this); + } + } + + public void setCredit(UnsignedInteger credit, boolean window) + { + _endpoint.setLinkCredit(credit); + _endpoint.setCreditWindow(window); + + } + + + public String getAddress() + { + return ((Source)_endpoint.getSource()).getAddress(); + } + + public Map getFilter() + { + return ((Source)_endpoint.getSource()).getFilter(); + } + + public Message receive() + { + return receive(-1L); + } + + public Message receive(boolean wait) + { + return receive(wait ? -1L : 0L); + } + + // 0 means no wait, -1 wait forever + public Message receive(long wait) + { + Message m = null; + Transfer xfr; + long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L; + + while((xfr = receiveFromPrefetch(wait)) != null ) + { + + if(!Boolean.TRUE.equals(xfr.getAborted())) + { + Binary deliveryTag = xfr.getDeliveryTag(); + Boolean resume = xfr.getResume(); + + List
sections = new ArrayList
(); + List payloads = new ArrayList(); + int totalSize = 0; + + boolean hasMore; + do + { + hasMore = Boolean.TRUE.equals(xfr.getMore()); + + ByteBuffer buf = xfr.getPayload(); + + if(buf != null) + { + + totalSize += buf.remaining(); + + payloads.add(buf); + } + if(hasMore) + { + xfr = receiveFromPrefetch(-1l); + if(xfr== null) + { + // TODO - this is wrong!!!! + System.out.println("eeek"); + } + } + } + while(hasMore && !Boolean.TRUE.equals(xfr.getAborted())); + + if(!Boolean.TRUE.equals(xfr.getAborted())) + { + ByteBuffer allPayload = ByteBuffer.allocate(totalSize); + for(ByteBuffer payload : payloads) + { + allPayload.put(payload); + } + allPayload.flip(); + SectionDecoder decoder = _session.getSectionDecoder(); + + try + { + sections = decoder.parseAll(allPayload); + } + catch (AmqpErrorException e) + { + // todo - throw a sensible error + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + m = new Message(sections); + m.setDeliveryTag(deliveryTag); + m.setResume(resume); + m.setReceiver(this); + break; + } + } + + if(wait > 0L) + { + wait = endTime - System.currentTimeMillis(); + if(wait <=0L) + { + break; + } + } + } + + + return m; + + } + + private Transfer receiveFromPrefetch(long wait) + { + long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L); + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + Transfer xfr; + while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached() + && wait != 0) + { + try + { + if(wait>0L) + { + lock.wait(wait); + } + else if(wait<0L) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + return null; + } + if(wait > 0L) + { + wait = endTime - System.currentTimeMillis(); + if(wait <= 0L) + { + break; + } + } + + } + if(xfr != null) + { + _prefetchQueue.poll(); + + } + + return xfr; + } + + } + + + public void release(final Message m) + { + release(m.getDeliveryTag()); + } + + public void release(Binary deliveryTag) + { + update(new Released(), deliveryTag, null, null); + } + + + public void modified(Binary tag) + { + final Modified outcome = new Modified(); + outcome.setDeliveryFailed(true); + + update(outcome, tag, null, null); + } + + public void acknowledge(final Message m) + { + acknowledge(m.getDeliveryTag()); + } + + public void acknowledge(final Message m, SettledAction a) + { + acknowledge(m.getDeliveryTag(), a); + } + + + public void acknowledge(final Message m, Transaction txn) + { + acknowledge(m.getDeliveryTag(), txn); + } + + + public void acknowledge(final Binary deliveryTag) + { + acknowledge(deliveryTag, null, null); + } + + + public void acknowledge(final Binary deliveryTag, SettledAction a) + { + acknowledge(deliveryTag, null, a); + } + + public void acknowledge(final Binary deliveryTag, final Transaction txn) + { + acknowledge(deliveryTag, txn, null); + } + + public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action) + { + update(new Accepted(), deliveryTag, txn, action); + } + + public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action) + { + + DeliveryState state; + if(txn != null) + { + TransactionalState txnState = new TransactionalState(); + txnState.setOutcome(outcome); + txnState.setTxnId(txn.getTxnId()); + state = txnState; + } + else + { + state = (DeliveryState) outcome; + } + boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); + + if(!(settled || action == null)) + { + _unsettledMap.put(deliveryTag, action); + } + + _endpoint.updateDisposition(deliveryTag,state, settled); + } + + public Error getError() + { + return _error; + } + + public void acknowledgeAll(Message m) + { + acknowledgeAll(m.getDeliveryTag()); + } + + public void acknowledgeAll(Binary deliveryTag) + { + acknowledgeAll(deliveryTag, null, null); + } + + public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action) + { + updateAll(new Accepted(), deliveryTag, txn, action); + } + + public void updateAll(Outcome outcome, Binary deliveryTag) + { + updateAll(outcome, deliveryTag, null, null); + } + + public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action) + { + DeliveryState state; + + if(txn != null) + { + TransactionalState txnState = new TransactionalState(); + txnState.setOutcome(outcome); + txnState.setTxnId(txn.getTxnId()); + state = txnState; + } + else + { + state = (DeliveryState) outcome; + } + boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); + + if(!(settled || action == null)) + { + _unsettledMap.put(deliveryTag, action); + } + _endpoint.updateAllDisposition(deliveryTag, state, settled); + } + + + + public void close() + { + _endpoint.setTarget(null); + _endpoint.close(); + Message msg; + while((msg = receive(-1l)) != null) + { + release(msg); + } + + } + + + public void detach() + { + _endpoint.setTarget(null); + _endpoint.detach(); + Message msg; + while((msg = receive(-1l)) != null) + { + release(msg); + } + + } + + public void drain() + { + _endpoint.drain(); + } + + /** + * Waits for the receiver to drain or a message to be available to be received. + * @return true if the receiver has been drained. + */ + public boolean drainWait() + { + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + try + { + while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + } + } + return _prefetchQueue.peek()==null && _endpoint.isDrained(); + } + + /** + * Clears the receiver drain so that message delivery can resume. + */ + public void clearDrain() + { + _endpoint.clearDrain(); + } + + public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) + { + _endpoint.setLinkCredit(credit); + _endpoint.setTransactionId(txn == null ? null : txn.getTxnId()); + _endpoint.setCreditWindow(false); + + } + + public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) + { + if(Boolean.TRUE.equals(settled)) + { + SettledAction action = _unsettledMap.remove(deliveryTag); + if(action != null) + { + action.onSettled(deliveryTag); + } + } + } + + public Map getRemoteUnsettled() + { + return _endpoint.getInitialUnsettledMap(); + } + + + public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener) + { + synchronized(_endpoint.getLock()) + { + _messageArrivalListener = messageArrivalListener; + int prefetchSize = _prefetchQueue.size(); + for(int i = 0; i < prefetchSize; i++) + { + postPrefetchAction(); + } + } + } + + public Session getSession() + { + return _session; + } + + public org.apache.qpid.amqp_1_0.type.Source getSource() + { + return _endpoint.getSource(); + } + + public static interface SettledAction + { + public void onSettled(Binary deliveryTag); + } + + + public interface MessageArrivalListener + { + void messageArrived(Receiver receiver); + } + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 0feaa48805..851d02a798 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -1,452 +1,452 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.Source; -import org.apache.qpid.amqp_1_0.type.Target; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.*; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.qpid.amqp_1_0.type.transport.Error; - -public class Sender implements DeliveryStateHandler -{ - private SendingLinkEndpoint _endpoint; - private int _id; - private Session _session; - private int _windowSize; - private Map _outcomeActions = Collections.synchronizedMap(new HashMap()); - private boolean _closed; - private Error _error; - private Runnable _remoteErrorTask; - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, false); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - boolean synchronous) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window) throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO); - } - - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window) throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, target, source, window, AcknowledgeMode.ALO); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, mode, null); - } - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window, AcknowledgeMode mode) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, target, source, window, mode, null); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode, Map unsettled) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode, boolean isDurable, Map unsettled) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); - } - - private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) - { - org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); - source.setAddress(sourceAddr); - return source; - } - - private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable) - { - org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target(); - target.setAddress(targetAddr); - if(isDurable) - { - target.setDurable(TerminusDurability.UNSETTLED_STATE); - target.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - } - return target; - } - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window, AcknowledgeMode mode, Map unsettled) - throws SenderCreationException, ConnectionClosedException - { - - _session = session; - session.getConnection().checkNotClosed(); - _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, - source, target, unsettled); - - - switch(mode) - { - case ALO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); - break; - case EO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - _endpoint.setDeliveryStateHandler(this); - _endpoint.attach(); - _windowSize = window; - - synchronized(_endpoint.getLock()) - { - while(!(_endpoint.isAttached() || _endpoint.isDetached())) - { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - throw new SenderCreationException(e); - } - } - if(_endpoint.getTarget()== null) - { - throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); - }; - } - - _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() - { - - @Override - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - _error = detach.getError(); - if(_error != null) - { - remoteError(); - } - super.remoteDetached(endpoint, detach); - } - }); - } - - public Source getSource() - { - return _endpoint.getSource(); - } - - public Target getTarget() - { - return _endpoint.getTarget(); - } - - public void send(Message message) throws LinkDetachedException - { - send(message, null, null); - } - - public void send(Message message, final OutcomeAction action) throws LinkDetachedException - { - send(message, null, action); - } - - public void send(Message message, final Transaction txn) throws LinkDetachedException - { - send(message, txn, null); - } - - public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException - { - - List
sections = message.getPayload(); - - Transfer xfr = new Transfer(); - - if(sections != null && !sections.isEmpty()) - { - SectionEncoder encoder = _session.getSectionEncoder(); - encoder.reset(); - - int sectionNumber = 0; - for(Section section : sections) - { - encoder.encodeObject(section); - } - - - Binary encoding = encoder.getEncoding(); - ByteBuffer payload = encoding.asByteBuffer(); - xfr.setPayload(payload); - } - if(message.getDeliveryTag() == null) - { - message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes())); - } - if(message.isResume()) - { - xfr.setResume(Boolean.TRUE); - } - if(message.getDeliveryState() != null) - { - xfr.setState(message.getDeliveryState()); - } - - xfr.setDeliveryTag(message.getDeliveryTag()); - //xfr.setSettled(_windowSize ==0); - if(txn != null) - { - xfr.setSettled(false); - TransactionalState deliveryState = new TransactionalState(); - deliveryState.setTxnId(txn.getTxnId()); - xfr.setState(deliveryState); - } - else - { - xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); - } - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - if(_endpoint.isDetached()) - { - throw new LinkDetachedException(_error); - } - if(action != null) - { - _outcomeActions.put(message.getDeliveryTag(), action); - } - _endpoint.transfer(xfr); - //TODO - rationalise sending of flows - // _endpoint.sendFlow(); - } - - if(_windowSize != 0) - { - synchronized(lock) - { - - - while(_endpoint.getUnsettledCount() >= _windowSize) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - - } - - - } - - public void close() throws SenderClosingException - { - - if(_windowSize != 0) - { - synchronized(_endpoint.getLock()) - { - - - while(_endpoint.getUnsettledCount() > 0) - { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - - } - _session.removeSender(this); - _endpoint.setSource(null); - _endpoint.detach(); - _closed = true; - - synchronized(_endpoint.getLock()) - { - while(!_endpoint.isDetached()) - { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - throw new SenderClosingException(e); - } - } - } - } - - public boolean isClosed() - { - return _closed; - } - - public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) - { - if(state instanceof Outcome) - { - OutcomeAction action; - if((action = _outcomeActions.remove(deliveryTag)) != null) - { - action.onOutcome(deliveryTag, (Outcome) state); - } - if(!Boolean.TRUE.equals(settled)) - { - _endpoint.updateDisposition(deliveryTag, state, true); - } - } - else if(state instanceof TransactionalState) - { - OutcomeAction action; - - if((action = _outcomeActions.remove(deliveryTag)) != null) - { - action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome()); - } - - } - } - - public SendingLinkEndpoint getEndpoint() - { - return _endpoint; - } - - public Map getRemoteUnsettled() - { - return _endpoint.getInitialUnsettledMap(); - } - - public Session getSession() - { - return _session; - } - - - private void remoteError() - { - if(_remoteErrorTask != null) - { - _remoteErrorTask.run(); - } - } - - - public void setRemoteErrorListener(Runnable listener) - { - _remoteErrorTask = listener; - } - - public Error getError() - { - return _error; - } - - public class SenderCreationException extends Exception - { - public SenderCreationException(Throwable e) - { - super(e); - } - - public SenderCreationException(String e) - { - super(e); - - } - } - - public class SenderClosingException extends Exception - { - public SenderClosingException(Throwable e) - { - super(e); - } - } - - public static interface OutcomeAction - { - public void onOutcome(Binary deliveryTag, Outcome outcome); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class Sender implements DeliveryStateHandler +{ + private SendingLinkEndpoint _endpoint; + private int _id; + private Session _session; + private int _windowSize; + private Map _outcomeActions = Collections.synchronizedMap(new HashMap()); + private boolean _closed; + private Error _error; + private Runnable _remoteErrorTask; + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, false); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + boolean synchronous) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO); + } + + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, AcknowledgeMode.ALO); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, null); + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, mode, null); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, Map unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, boolean isDurable, Map unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) + { + org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); + source.setAddress(sourceAddr); + return source; + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable) + { + org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target(); + target.setAddress(targetAddr); + if(isDurable) + { + target.setDurable(TerminusDurability.UNSETTLED_STATE); + target.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + } + return target; + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode, Map unsettled) + throws SenderCreationException, ConnectionClosedException + { + + _session = session; + session.getConnection().checkNotClosed(); + _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, + source, target, unsettled); + + + switch(mode) + { + case ALO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + _endpoint.setDeliveryStateHandler(this); + _endpoint.attach(); + _windowSize = window; + + synchronized(_endpoint.getLock()) + { + while(!(_endpoint.isAttached() || _endpoint.isDetached())) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderCreationException(e); + } + } + if(_endpoint.getTarget()== null) + { + throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); + }; + } + + _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() + { + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(_error != null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); + } + + public Source getSource() + { + return _endpoint.getSource(); + } + + public Target getTarget() + { + return _endpoint.getTarget(); + } + + public void send(Message message) throws LinkDetachedException + { + send(message, null, null); + } + + public void send(Message message, final OutcomeAction action) throws LinkDetachedException + { + send(message, null, action); + } + + public void send(Message message, final Transaction txn) throws LinkDetachedException + { + send(message, txn, null); + } + + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException + { + + List
sections = message.getPayload(); + + Transfer xfr = new Transfer(); + + if(sections != null && !sections.isEmpty()) + { + SectionEncoder encoder = _session.getSectionEncoder(); + encoder.reset(); + + int sectionNumber = 0; + for(Section section : sections) + { + encoder.encodeObject(section); + } + + + Binary encoding = encoder.getEncoding(); + ByteBuffer payload = encoding.asByteBuffer(); + xfr.setPayload(payload); + } + if(message.getDeliveryTag() == null) + { + message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes())); + } + if(message.isResume()) + { + xfr.setResume(Boolean.TRUE); + } + if(message.getDeliveryState() != null) + { + xfr.setState(message.getDeliveryState()); + } + + xfr.setDeliveryTag(message.getDeliveryTag()); + //xfr.setSettled(_windowSize ==0); + if(txn != null) + { + xfr.setSettled(false); + TransactionalState deliveryState = new TransactionalState(); + deliveryState.setTxnId(txn.getTxnId()); + xfr.setState(deliveryState); + } + else + { + xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); + } + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } + if(action != null) + { + _outcomeActions.put(message.getDeliveryTag(), action); + } + _endpoint.transfer(xfr); + //TODO - rationalise sending of flows + // _endpoint.sendFlow(); + } + + if(_windowSize != 0) + { + synchronized(lock) + { + + + while(_endpoint.getUnsettledCount() >= _windowSize) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + + + } + + public void close() throws SenderClosingException + { + + if(_windowSize != 0) + { + synchronized(_endpoint.getLock()) + { + + + while(_endpoint.getUnsettledCount() > 0) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + _session.removeSender(this); + _endpoint.setSource(null); + _endpoint.detach(); + _closed = true; + + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderClosingException(e); + } + } + } + } + + public boolean isClosed() + { + return _closed; + } + + public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) + { + if(state instanceof Outcome) + { + OutcomeAction action; + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, (Outcome) state); + } + if(!Boolean.TRUE.equals(settled)) + { + _endpoint.updateDisposition(deliveryTag, state, true); + } + } + else if(state instanceof TransactionalState) + { + OutcomeAction action; + + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome()); + } + + } + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; + } + + public Map getRemoteUnsettled() + { + return _endpoint.getInitialUnsettledMap(); + } + + public Session getSession() + { + return _session; + } + + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } + + public Error getError() + { + return _error; + } + + public class SenderCreationException extends Exception + { + public SenderCreationException(Throwable e) + { + super(e); + } + + public SenderCreationException(String e) + { + super(e); + + } + } + + public class SenderClosingException extends Exception + { + public SenderClosingException(Throwable e) + { + super(e); + } + } + + public static interface OutcomeAction + { + public void onOutcome(Binary deliveryTag, Outcome outcome); + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 182d904a9c..79ed3b4457 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -1,384 +1,384 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; -import org.apache.qpid.amqp_1_0.transport.SessionState; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -public class Session -{ - private SessionEndpoint _endpoint; - private List _receivers = new ArrayList(); - private List _senders = new ArrayList(); - private SectionEncoder _sectionEncoder; - private SectionDecoder _sectionDecoder; - private TransactionController _sessionLocalTC; - private Connection _connection; - - public Session(final Connection connection, String name) - { - _connection = connection; - _endpoint = connection.getEndpoint().createSession(name); - _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); - _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); - } - - - public synchronized Sender createSender(final String targetName) - throws Sender.SenderCreationException, ConnectionClosedException - { - return createSender(targetName, false); - } - - public synchronized Sender createSender(final String targetName, boolean synchronous) - throws Sender.SenderCreationException, ConnectionClosedException - { - - final String sourceName = UUID.randomUUID().toString(); - return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous); - - } - - public synchronized Sender createSender(final String targetName, int window) - throws Sender.SenderCreationException, ConnectionClosedException - { - final String sourceName = UUID.randomUUID().toString(); - return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window); - - } - - public Sender createSender(String targetName, int window, AcknowledgeMode mode) - throws Sender.SenderCreationException, ConnectionClosedException - { - - return createSender(targetName, window, mode, null); - } - - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) - throws Sender.SenderCreationException, ConnectionClosedException - { - return createSender(targetName, window, mode, linkName, null); - } - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map unsettled) - throws Sender.SenderCreationException, ConnectionClosedException - { - return createSender(targetName, window, mode, linkName, false, unsettled); - } - - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, - boolean isDurable, Map unsettled) - throws Sender.SenderCreationException, ConnectionClosedException - { - return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName, - targetName, null, window, mode, isDurable, unsettled); - - } - - - public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, null, AcknowledgeMode.ALO); - } - - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode) - throws ConnectionErrorException - { - return createReceiver(queue, null, mode); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName) - throws ConnectionErrorException - { - return createReceiver(queue, null, mode, linkName); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable) - throws ConnectionErrorException - { - return createReceiver(queue, null, mode, linkName, isDurable); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable, - Map filters, Map unsettled) - throws ConnectionErrorException - { - return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled); - } - - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, - boolean isDurable, Map unsettled) - throws ConnectionErrorException - { - return createReceiver(queue, null, mode, linkName, isDurable, unsettled); - } - - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO); - } - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName); - } - - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, mode, ackMode, null); - } - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName) - throws ConnectionErrorException - { - return createReceiver(sourceAddr,mode, ackMode, linkName, false); - } - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null); - } - - private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable, - Map unsettled) - throws ConnectionErrorException - { - return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled); - } - - public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable, - Map filters, Map unsettled) - throws ConnectionErrorException - { - - final Target target = new Target(); - final Source source = new Source(); - source.setAddress(sourceAddr); - source.setDistributionMode(mode); - source.setFilter(filters); - - if(linkName == null) - { - linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")"; - } - - final Receiver receiver = - new Receiver(this, linkName, - target, source, ackMode, isDurable, unsettled); - _receivers.add(receiver); - - return receiver; - - } - - public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, StdDistMode.COPY); - } - - public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, StdDistMode.MOVE); - } - - public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException - { - Source source = new Source(); - source.setDynamic(true); - - final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(), - source, AcknowledgeMode.ALO); - _receivers.add(receiver); - return receiver; - } - - public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException - { - Target target = new Target(); - target.setDynamic(true); - - final Sender sender; - sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target, - new Source(), 0, AcknowledgeMode.ALO); - _senders.add(sender); - return sender; - } - - - - public SessionEndpoint getEndpoint() - { - return _endpoint; - } - - public synchronized void close() - { - try - { - for(Sender sender : new ArrayList(_senders)) - { - sender.close(); - } - for(Receiver receiver : new ArrayList(_receivers)) - { - receiver.detach(); - } - if(_sessionLocalTC != null) - { - _sessionLocalTC.close(); - } - _endpoint.end(); - } - catch (Sender.SenderClosingException e) - { -// TODO - e.printStackTrace(); - } - - //TODO - - } - - void removeSender(Sender sender) - { - _senders.remove(sender); - } - - void removeReceiver(Receiver receiver) - { - _receivers.remove(receiver); - } - - public SectionEncoder getSectionEncoder() - { - return _sectionEncoder; - } - - public SectionDecoder getSectionDecoder() - { - return _sectionDecoder; - } - - - public Transaction createSessionLocalTransaction() - { - TransactionController localController = getSessionLocalTransactionController(); - return localController.beginTransaction(); - } - - private TransactionController getSessionLocalTransactionController() - { - if(_sessionLocalTC == null) - { - _sessionLocalTC = createSessionLocalTransactionController(); - } - return _sessionLocalTC; - } - - private TransactionController createSessionLocalTransactionController() - { - String name = "txnControllerLink"; - SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN, - TxnCapability.MULTI_TXNS_PER_SSN); - tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - tcLinkEndpoint.attach(); - return new TransactionController(this, tcLinkEndpoint); - } - - - public Message receive() - { - while(getEndpoint().getState() == SessionState.ACTIVE) - { - synchronized (getEndpoint().getLock()) - { - try - { - for(Receiver r : _receivers) - { - Message m = r.receive(false); - if(m != null) - return m; - } - wait(); - } - catch (InterruptedException e) - { - } - } - } - return null; - } - - public Connection getConnection() - { - return _connection; - } - - public void awaitActive() - { - synchronized(getEndpoint().getLock()) - { - while(!getEndpoint().isEnded() && !getEndpoint().isActive()) - { - try - { - getEndpoint().getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionState; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class Session +{ + private SessionEndpoint _endpoint; + private List _receivers = new ArrayList(); + private List _senders = new ArrayList(); + private SectionEncoder _sectionEncoder; + private SectionDecoder _sectionDecoder; + private TransactionController _sessionLocalTC; + private Connection _connection; + + public Session(final Connection connection, String name) + { + _connection = connection; + _endpoint = connection.getEndpoint().createSession(name); + _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); + _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); + } + + + public synchronized Sender createSender(final String targetName) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, false); + } + + public synchronized Sender createSender(final String targetName, boolean synchronous) + throws Sender.SenderCreationException, ConnectionClosedException + { + + final String sourceName = UUID.randomUUID().toString(); + return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous); + + } + + public synchronized Sender createSender(final String targetName, int window) + throws Sender.SenderCreationException, ConnectionClosedException + { + final String sourceName = UUID.randomUUID().toString(); + return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window); + + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode) + throws Sender.SenderCreationException, ConnectionClosedException + { + + return createSender(targetName, window, mode, null); + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, window, mode, linkName, null); + } + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map unsettled) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, window, mode, linkName, false, unsettled); + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, + boolean isDurable, Map unsettled) + throws Sender.SenderCreationException, ConnectionClosedException + { + return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName, + targetName, null, window, mode, isDurable, unsettled); + + } + + + public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, null, AcknowledgeMode.ALO); + } + + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable, + Map filters, Map unsettled) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled); + } + + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, + boolean isDurable, Map unsettled) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable, unsettled); + } + + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName); + } + + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, ackMode, null); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName) + throws ConnectionErrorException + { + return createReceiver(sourceAddr,mode, ackMode, linkName, false); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable, + Map unsettled) + throws ConnectionErrorException + { + return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled); + } + + public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable, + Map filters, Map unsettled) + throws ConnectionErrorException + { + + final Target target = new Target(); + final Source source = new Source(); + source.setAddress(sourceAddr); + source.setDistributionMode(mode); + source.setFilter(filters); + + if(linkName == null) + { + linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")"; + } + + final Receiver receiver = + new Receiver(this, linkName, + target, source, ackMode, isDurable, unsettled); + _receivers.add(receiver); + + return receiver; + + } + + public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, StdDistMode.COPY); + } + + public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, StdDistMode.MOVE); + } + + public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException + { + Source source = new Source(); + source.setDynamic(true); + + final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(), + source, AcknowledgeMode.ALO); + _receivers.add(receiver); + return receiver; + } + + public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException + { + Target target = new Target(); + target.setDynamic(true); + + final Sender sender; + sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target, + new Source(), 0, AcknowledgeMode.ALO); + _senders.add(sender); + return sender; + } + + + + public SessionEndpoint getEndpoint() + { + return _endpoint; + } + + public synchronized void close() + { + try + { + for(Sender sender : new ArrayList(_senders)) + { + sender.close(); + } + for(Receiver receiver : new ArrayList(_receivers)) + { + receiver.detach(); + } + if(_sessionLocalTC != null) + { + _sessionLocalTC.close(); + } + _endpoint.end(); + } + catch (Sender.SenderClosingException e) + { +// TODO + e.printStackTrace(); + } + + //TODO + + } + + void removeSender(Sender sender) + { + _senders.remove(sender); + } + + void removeReceiver(Receiver receiver) + { + _receivers.remove(receiver); + } + + public SectionEncoder getSectionEncoder() + { + return _sectionEncoder; + } + + public SectionDecoder getSectionDecoder() + { + return _sectionDecoder; + } + + + public Transaction createSessionLocalTransaction() + { + TransactionController localController = getSessionLocalTransactionController(); + return localController.beginTransaction(); + } + + private TransactionController getSessionLocalTransactionController() + { + if(_sessionLocalTC == null) + { + _sessionLocalTC = createSessionLocalTransactionController(); + } + return _sessionLocalTC; + } + + private TransactionController createSessionLocalTransactionController() + { + String name = "txnControllerLink"; + SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN, + TxnCapability.MULTI_TXNS_PER_SSN); + tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + tcLinkEndpoint.attach(); + return new TransactionController(this, tcLinkEndpoint); + } + + + public Message receive() + { + while(getEndpoint().getState() == SessionState.ACTIVE) + { + synchronized (getEndpoint().getLock()) + { + try + { + for(Receiver r : _receivers) + { + Message m = r.receive(false); + if(m != null) + return m; + } + wait(); + } + catch (InterruptedException e) + { + } + } + } + return null; + } + + public Connection getConnection() + { + return _connection; + } + + public void awaitActive() + { + synchronized(getEndpoint().getLock()) + { + while(!getEndpoint().isEnded() && !getEndpoint().isActive()) + { + try + { + getEndpoint().getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } +} -- cgit v1.2.1