summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-04-04 21:10:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-04-04 21:10:07 +0000
commitf93a96fc0cccd29885383f2781f01484f7833eeb (patch)
tree2f4a81835f29bc9a61f9d4947eed8fc0654f3fe7 /java/broker
parent6d83a5faddfeff7c3a788907d886dae7459dbaa1 (diff)
downloadqpid-python-f93a96fc0cccd29885383f2781f01484f7833eeb.tar.gz
QPID-3933 : [Java] Add interim AMQP 1-0 implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1309594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/build.xml2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java6
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java522
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java2
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java64
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java391
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java453
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java98
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java108
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java172
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java305
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java646
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java440
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java621
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java195
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java8
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java2
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java20
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java5
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
50 files changed, 4573 insertions, 74 deletions
diff --git a/java/broker/build.xml b/java/broker/build.xml
index bb809583ee..9e8bf12f18 100644
--- a/java/broker/build.xml
+++ b/java/broker/build.xml
@@ -19,7 +19,7 @@
-
-->
<project name="AMQ Broker" default="build">
- <property name="module.depends" value="management/common common"/>
+ <property name="module.depends" value="management/common common amqp-1-0-common"/>
<property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<property name="module.genpom" value="true"/>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 6910247577..8198cec821 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -89,6 +89,7 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index 263b7fe40a..5004d320c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -53,10 +53,16 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.EnumSet;
+import java.util.Formatter;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.FileHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
public class Broker
{
@@ -159,6 +165,12 @@ public class Broker
parsePortList(sslPorts, serverConfig.getSSLPorts());
}
+ Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
+ if(exclude_1_0.isEmpty())
+ {
+ parsePortList(exclude_1_0, serverConfig.getPortExclude10());
+ }
+
Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
if(exclude_0_10.isEmpty())
{
@@ -208,7 +220,8 @@ public class Broker
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig);
+ getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9,
+ exclude_0_8, serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -237,7 +250,8 @@ public class Broker
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig);
+ getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1,
+ exclude_0_9, exclude_0_8, serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -262,13 +276,20 @@ public class Broker
}
}
- private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10,
- final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
- final Set<Integer> exclude_0_8,
- final ServerConfiguration serverConfig)
+ private static Set<AmqpProtocolVersion> getSupportedVersions(final int port,
+ final Set<Integer> exclude_1_0,
+ final Set<Integer> exclude_0_10,
+ final Set<Integer> exclude_0_9_1,
+ final Set<Integer> exclude_0_9,
+ final Set<Integer> exclude_0_8,
+ final ServerConfiguration serverConfig)
{
final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+ if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled())
+ {
+ supported.remove(AmqpProtocolVersion.v1_0_0);
+ }
if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled())
{
supported.remove(AmqpProtocolVersion.v0_10);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java b/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
index 1f96a24701..d871c724fd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
@@ -47,7 +47,6 @@ public class BrokerOptions
private Integer _logWatchFrequency = 0;
-
public void addPort(final int port)
{
_ports.add(port);
@@ -107,7 +106,6 @@ public class BrokerOptions
{
_jmxPortConnectorServer = jmxPortConnectorServer;
}
-
public String getQpidHome()
{
return System.getProperty(QPID_HOME);
@@ -163,5 +161,4 @@ public class BrokerOptions
{
_bundleContext = bundleContext;
}
-
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 5fcd8a7b52..70fa414e3c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -59,6 +59,12 @@ public class Main
.withDescription("SSL port. Overrides any value in the config file")
.withLongOpt("sslport").create("s");
+
+ private static final Option OPTION_EXCLUDE_1_0 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP1-0 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-1-0").create();
+
private static final Option OPTION_EXCLUDE_0_10 =
OptionBuilder.withArgName("port").hasArg()
.withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
@@ -116,6 +122,7 @@ public class Main
OPTIONS.addOption(OPTION_LOG_WATCH);
OPTIONS.addOption(OPTION_PORT);
OPTIONS.addOption(OPTION_SSLPORT);
+ OPTIONS.addOption(OPTION_EXCLUDE_1_0);
OPTIONS.addOption(OPTION_EXCLUDE_0_10);
OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
OPTIONS.addOption(OPTION_EXCLUDE_0_9);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java b/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java
index 22d97d36dd..fe6e32173f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java
@@ -28,7 +28,8 @@ public enum ProtocolExclusion
v0_8("exclude-0-8","--exclude-0-8"),
v0_9("exclude-0-9", "--exclude-0-9"),
v0_9_1("exclude-0-9-1", "--exclude-0-9-1"),
- v0_10("exclude-0-10", "--exclude-0-10");
+ v0_10("exclude-0-10", "--exclude-0-10"),
+ v1_0("exclude-1-0", "--exclude-1-0");
private static final Map<String, ProtocolExclusion> MAP = new HashMap<String,ProtocolExclusion>();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 5d0546f6a7..46027d02c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -88,6 +88,7 @@ public class ServerConfiguration extends ConfigurationPlugin
public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
+ public static final String CONNECTOR_AMQP10ENABLED = "connector.amqp10enabled";
public static final String CONNECTOR_AMQP010ENABLED = "connector.amqp010enabled";
public static final String CONNECTOR_AMQP091ENABLED = "connector.amqp091enabled";
public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled";
@@ -667,6 +668,11 @@ public class ServerConfiguration extends ConfigurationPlugin
return getListValue("connector.port", Collections.<Integer>singletonList(DEFAULT_PORT));
}
+ public List getPortExclude10()
+ {
+ return getListValue("connector.non10port");
+ }
+
public List getPortExclude010()
{
return getListValue("connector.non010port");
@@ -843,6 +849,11 @@ public class ServerConfiguration extends ConfigurationPlugin
return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
}
+ public boolean isAmqp10enabled()
+ {
+ return getConfig().getBoolean(CONNECTOR_AMQP10ENABLED, true);
+ }
+
public boolean isAmqp010enabled()
{
return getConfig().getBoolean(CONNECTOR_AMQP010ENABLED, true);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
index 18a0e4c8bf..6c158de8b5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -39,6 +39,12 @@ public class SimpleFilterManager implements FilterManager
_filters = new ConcurrentLinkedQueue<MessageFilter>();
}
+ public SimpleFilterManager(JMSSelectorFilter messageFilter)
+ {
+ this();
+ add(messageFilter);
+ }
+
public void add(MessageFilter filter)
{
_filters.add(filter);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
new file mode 100755
index 0000000000..9cb5904bb3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
@@ -0,0 +1,522 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.message;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.amqp_1_0.type.messaging.Footer;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+public class MessageMetaData_1_0 implements StorableMessageMetaData
+{
+ // TODO move to somewhere more useful
+ public static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
+
+
+ private Header _header;
+ private Properties _properties;
+ private Map _deliveryAnnotations;
+ private Map _messageAnnotations;
+ private Map _appProperties;
+ private Map _footer;
+
+ private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3);
+
+ private volatile ByteBuffer _encoded;
+ private MessageHeader_1_0 _messageHeader;
+
+
+
+ public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
+ {
+ this(fragments, decoder, new ArrayList<ByteBuffer>(3));
+ }
+
+ public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immuatableSections)
+ {
+ this(constructSections(fragments, decoder,immuatableSections), immuatableSections);
+ }
+
+ private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections)
+ {
+ _encodedSections = encodedSections;
+
+ Iterator<Section> sectIter = sections.iterator();
+
+ Section section = sectIter.hasNext() ? sectIter.next() : null;
+ if(section instanceof Header)
+ {
+ _header = (Header) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof DeliveryAnnotations)
+ {
+ _deliveryAnnotations = ((DeliveryAnnotations) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof MessageAnnotations)
+ {
+ _messageAnnotations = ((MessageAnnotations) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof Properties)
+ {
+ _properties = (Properties) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof ApplicationProperties)
+ {
+ _appProperties = ((ApplicationProperties) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof Footer)
+ {
+ _footer = ((Footer) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ _messageHeader = new MessageHeader_1_0();
+
+ }
+
+ private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+
+ ByteBuffer src;
+ if(fragments.length == 1)
+ {
+ src = fragments[0].duplicate();
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer buf : fragments)
+ {
+ size += buf.remaining();
+ }
+ src = ByteBuffer.allocate(size);
+ for(ByteBuffer buf : fragments)
+ {
+ src.put(buf.duplicate());
+ }
+ src.flip();
+
+ }
+
+ try
+ {
+ int startBarePos = -1;
+ int lastPos = src.position();
+ Section s = decoder.readSection(src);
+
+
+
+ if(s instanceof Header)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof DeliveryAnnotations)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof MessageAnnotations)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof Properties)
+ {
+ sections.add(s);
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof ApplicationProperties)
+ {
+ sections.add(s);
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof AmqpValue)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+ else if(s instanceof Data)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ do
+ {
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ } while(s instanceof Data);
+ }
+ else if(s instanceof AmqpSequence)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ do
+ {
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+ while(s instanceof AmqpSequence);
+ }
+
+ if(s instanceof Footer)
+ {
+ sections.add(s);
+ }
+
+
+ int pos = 0;
+ for(ByteBuffer buf : fragments)
+ {
+/*
+ if(pos < startBarePos)
+ {
+ if(pos + buf.remaining() > startBarePos)
+ {
+ ByteBuffer dup = buf.duplicate();
+ dup.position(dup.position()+startBarePos-pos);
+ dup.slice();
+ encodedSections.add(dup);
+ }
+ }
+ else
+*/
+ {
+ encodedSections.add(buf.duplicate());
+ }
+ pos += buf.remaining();
+ }
+
+ return sections;
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+
+ public MessageMetaDataType getType()
+ {
+ return MessageMetaDataType.META_DATA_1_0;
+ }
+
+
+ public int getStorableSize()
+ {
+ int size = 0;
+
+ for(ByteBuffer bin : _encodedSections)
+ {
+ size += bin.limit();
+ }
+
+ return size;
+ }
+
+ private ByteBuffer encodeAsBuffer()
+ {
+ ByteBuffer buf = ByteBuffer.allocate(getStorableSize());
+
+ for(ByteBuffer bin : _encodedSections)
+ {
+ buf.put(bin.duplicate());
+ }
+
+ return buf;
+ }
+
+ public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ {
+ ByteBuffer buf = _encoded;
+
+ if(buf == null)
+ {
+ buf = encodeAsBuffer();
+ _encoded = buf;
+ }
+
+ buf = buf.duplicate();
+
+ buf.position(offsetInMetaData);
+
+ if(dest.remaining() < buf.limit())
+ {
+ buf.limit(dest.remaining());
+ }
+ dest.put(buf);
+ return buf.limit();
+ }
+
+ public int getContentSize()
+ {
+ ByteBuffer buf = _encoded;
+
+ if(buf == null)
+ {
+ buf = encodeAsBuffer();
+ _encoded = buf;
+ }
+ return buf.remaining();
+ }
+
+ public boolean isPersistent()
+ {
+ return _header != null && Boolean.TRUE.equals(_header.getDurable());
+ }
+
+ public MessageHeader_1_0 getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
+ public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+
+
+ private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
+ {
+ private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+
+ public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
+ {
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+
+ ArrayList<Section> sections = new ArrayList<Section>(3);
+ ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3);
+
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ ByteBuffer encodedBuf = buf.duplicate();
+ sections.add((Section) valueHandler.parse(buf));
+ encodedBuf.limit(buf.position());
+ encodedSections.add(encodedBuf);
+
+ }
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ return new MessageMetaData_1_0(sections,encodedSections);
+
+ }
+ }
+
+ public class MessageHeader_1_0 implements AMQMessageHeader
+ {
+
+ public String getCorrelationId()
+ {
+ if(_properties == null || _properties.getCorrelationId() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getMessageId().toString();
+ }
+ }
+
+ public long getExpiration()
+ {
+ return 0; //TODO
+ }
+
+ public String getMessageId()
+ {
+ if(_properties == null || _properties.getCorrelationId() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getCorrelationId().toString();
+ }
+ }
+
+ public String getMimeType()
+ {
+
+ if(_properties == null || _properties.getContentType() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getContentType().toString();
+ }
+ }
+
+ public String getEncoding()
+ {
+ return null; //TODO
+ }
+
+ public byte getPriority()
+ {
+ if(_header == null || _header.getPriority() == null)
+ {
+ return 4; //javax.jms.Message.DEFAULT_PRIORITY;
+ }
+ else
+ {
+ return _header.getPriority().byteValue();
+ }
+ }
+
+ public long getTimestamp()
+ {
+ if(_properties == null || _properties.getCreationTime() == null)
+ {
+ return 0L;
+ }
+ else
+ {
+ return _properties.getCreationTime().getTime();
+ }
+
+ }
+
+ public String getType()
+ {
+
+ if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _messageAnnotations.get(JMS_TYPE).toString();
+ }
+ }
+
+ public String getReplyTo()
+ {
+ if(_properties == null || _properties.getReplyTo() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getReplyTo().toString();
+ }
+ }
+
+ public String getReplyToExchange()
+ {
+ return null; //TODO
+ }
+
+ public String getReplyToRoutingKey()
+ {
+ return null; //TODO
+ }
+
+ public Object getHeader(final String name)
+ {
+ return _appProperties == null ? null : _appProperties.get(name);
+ }
+
+ public boolean containsHeaders(final Set<String> names)
+ {
+ if(_appProperties == null)
+ {
+ return false;
+ }
+
+ for(String key : names)
+ {
+ if(!_appProperties.containsKey(key))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean containsHeader(final String name)
+ {
+ return _appProperties != null && _appProperties.containsKey(name);
+ }
+
+ public String getSubject()
+ {
+ return _properties == null ? null : _properties.getSubject();
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1a055240b9..b750b29952 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -58,6 +58,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
@@ -1455,7 +1456,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
throws AMQException
{
registerMessageDelivered(entry.getMessage().getSize());
- _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag());
+ _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
entry.incrementDeliveryCount();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java
index e925d7a1ec..0a71fe257a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java
@@ -20,4 +20,4 @@
*/
package org.apache.qpid.server.protocol;
-public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10 } \ No newline at end of file
+public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 3b26f05f84..652ffee004 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -175,6 +175,28 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
(byte) 10
};
+ private static final byte[] AMQP_1_0_0_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_SASL_1_0_0_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
public void setNetworkConnection(NetworkConnection networkConnection)
{
setNetworkConnection(networkConnection, networkConnection.getSender());
@@ -289,8 +311,48 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
};
+ private DelegateCreator creator_1_0_0 = new DelegateCreator()
+ {
+
+ public AmqpProtocolVersion getVersion()
+ {
+ return AmqpProtocolVersion.v1_0_0;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_1_0_0_HEADER;
+ }
+
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return new ProtocolEngine_1_0_0(_appRegistry,_id);
+ }
+ };
+
+ private DelegateCreator creator_1_0_0_SASL = new DelegateCreator()
+ {
+
+ public AmqpProtocolVersion getVersion()
+ {
+ return AmqpProtocolVersion.v1_0_0;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_SASL_1_0_0_HEADER;
+ }
+
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return new ProtocolEngine_1_0_0_SASL(_network, _appRegistry, _id);
+ }
+ };
+
private final DelegateCreator[] _creators =
- new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+ new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0 };
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
new file mode 100755
index 0000000000..e6c79a4077
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -0,0 +1,391 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.transport.*;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+import javax.security.auth.callback.CallbackHandler;
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.UUID;
+import java.util.logging.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
+{
+ static final AtomicLong _connectionIdSource = new AtomicLong(0L);
+
+ //private NetworkConnection _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+ private final long _connectionId;
+
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private FrameWriter _frameWriter;
+ private FrameHandler _frameHandler;
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
+
+
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+ public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
+ {
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
+ _connectionId = id;
+ }
+
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _network.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _network.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+
+ Container container = new Container();
+
+ _conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setFrameOutputHandler(this);
+ _conn.setRemoteAddress(_network.getRemoteAddress());
+
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new FrameHandler(_conn);
+
+ _sender.send(HEADER.duplicate());
+ _sender.flush();
+ }
+
+ private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+ {
+ return new CallbackHandlerSource()
+ {
+ @Override
+ public CallbackHandler getHandler(String mechanism)
+ {
+ return authenticationManager.getHandler(mechanism);
+ }
+ };
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public synchronized void received(ByteBuffer msg)
+ {
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if(msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if(msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if(msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ if(msg.hasRemaining())
+ {
+ _frameHandler.parse(msg);
+ }
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed()
+ {
+ _conn.inputClosed();
+ if(_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ }
+
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
+
+ private final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+ private final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
+ synchronized(_sendLock)
+ {
+
+ if(FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
+
+
+ _frameWriter.setValue(amqFrame);
+
+
+
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+
+ int size = _frameWriter.writeToBuffer(dup);
+ if(size > _conn.getMaxFrameSize())
+ {
+ throw new OversizeFrameException(amqFrame,size);
+ }
+
+ dup.flip();
+ _writtenBytes += dup.limit();
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+
+
+ _sender.send(dup);
+ _sender.flush();
+
+ }
+ }
+
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
+
+ }
+
+ public void close()
+ {
+ //TODO
+ }
+
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
new file mode 100644
index 0000000000..e4487e00f9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -0,0 +1,453 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
+import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+import javax.security.auth.callback.CallbackHandler;
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
+{
+ private long _readBytes;
+ private long _writtenBytes;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+ private long _connectionId;
+
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private static final ByteBuffer PROTOCOL_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+ private PrintWriter _out;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
+
+
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+
+ public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final IApplicationRegistry appRegistry,
+ long id)
+ {
+ _id = appRegistry.getConfigStore().createId();
+ _connectionId = id;
+ _appRegistry = appRegistry;
+
+ if(networkDriver != null)
+ {
+ setNetworkConnection(networkDriver, networkDriver.getSender());
+ }
+ }
+
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _network.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _network.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+
+ Container container = new Container();
+
+ _conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance()
+ .getAuthenticationManager()));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setRemoteAddress(getRemoteAddress());
+
+
+ _conn.setFrameOutputHandler(this);
+ _conn.setSaslFrameOutput(this);
+
+ _conn.setOnSaslComplete(new Runnable()
+ {
+
+
+ public void run()
+ {
+ if(_conn.isAuthenticated())
+ {
+ _sender.send(PROTOCOL_HEADER.duplicate());
+ _sender.flush();
+ }
+ else
+ {
+ _network.close();
+ }
+ }
+ });
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new SASLFrameHandler(_conn);
+
+ _sender.send(HEADER.duplicate());
+ _sender.flush();
+
+ _conn.initiateSASL();
+
+
+ }
+
+ private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+ {
+ return new CallbackHandlerSource()
+ {
+ @Override
+ public CallbackHandler getHandler(String mechanism)
+ {
+ return authenticationManager.getHandler(mechanism);
+ }
+ };
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ private final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+
+ public synchronized void received(ByteBuffer msg)
+ {
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if(msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if(msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if(msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ if(msg.hasRemaining())
+ {
+ _frameHandler = _frameHandler.parse(msg);
+ }
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed()
+ {
+ // todo
+ _conn.inputClosed();
+ if(_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ }
+
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
+
+ private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+
+
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
+
+ synchronized(_sendLock)
+ {
+
+ if(FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
+
+
+ _frameWriter.setValue(amqFrame);
+
+
+
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+
+ int size = _frameWriter.writeToBuffer(dup);
+ if(size > _conn.getMaxFrameSize())
+ {
+ throw new OversizeFrameException(amqFrame,size);
+ }
+
+ dup.flip();
+ _writtenBytes += dup.limit();
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+
+
+ _sender.send(dup);
+ _sender.flush();
+
+
+ }
+ }
+
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
+
+ }
+
+ public void close()
+ {
+ _sender.close();
+ }
+
+ public void setLogOutput(final PrintWriter out)
+ {
+ _out = out;
+ }
+
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
new file mode 100644
index 0000000000..318a240b27
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Connection_1_0 implements ConnectionEventListener
+{
+
+ private IApplicationRegistry _appRegistry;
+ private VirtualHost _vhost;
+
+
+ public static interface Task
+ {
+ public void doTask(Connection_1_0 connection);
+ }
+
+
+ private List<Task> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Task>());
+
+
+
+ public Connection_1_0(IApplicationRegistry appRegistry)
+ {
+ _appRegistry = appRegistry;
+ _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ }
+
+ public void remoteSessionCreation(SessionEndpoint endpoint)
+ {
+ Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ endpoint.setSessionEventListener(session);
+ }
+
+
+ void removeConnectionCloseTask(final Task task)
+ {
+ _closeTasks.remove( task );
+ }
+
+ void addConnectionCloseTask(final Task task)
+ {
+ _closeTasks.add( task );
+ }
+
+ public void closeReceived()
+ {
+ List<Task> taskCopy;
+ synchronized (_closeTasks)
+ {
+ taskCopy = new ArrayList<Task>(_closeTasks);
+ }
+ for(Task task : taskCopy)
+ {
+ task.doTask(this);
+ }
+ synchronized (_closeTasks)
+ {
+ _closeTasks.clear();
+ }
+
+ }
+
+ public void closed()
+ {
+ closeReceived();
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
new file mode 100644
index 0000000000..d45758391c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+public interface Destination
+{
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
new file mode 100644
index 0000000000..ba1a1ca45c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.List;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public class ExchangeDestination implements ReceivingDestination, SendingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = { ACCEPTED };
+
+ private Exchange _exchange;
+ private TerminusDurability _durability;
+ private TerminusExpiryPolicy _expiryPolicy;
+
+ public ExchangeDestination(Exchange exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+ {
+ _exchange = exchange;
+ _durability = durable;
+ _expiryPolicy = expiryPolicy;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+ final List<? extends BaseQueue> queues = _exchange.route(message);
+
+ txn.enqueue(queues,message, new ServerTransaction.Action()
+ {
+
+ BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+
+ public void postCommit()
+ {
+ for(int i = 0; i < _queues.length; i++)
+ {
+ try
+ {
+ _queues[i].enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ }, System.currentTimeMillis());
+
+ return ACCEPTED;
+ }
+
+ TerminusDurability getDurability()
+ {
+ return _durability;
+ }
+
+ TerminusExpiryPolicy getExpiryPolicy()
+ {
+ return _expiryPolicy;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 20000;
+ }
+
+ public Exchange getExchange()
+ {
+ return _exchange;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
new file mode 100644
index 0000000000..42eea05d37
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
@@ -0,0 +1,59 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LinkRegistry
+{
+ private final Map<String, SendingLink_1_0> _sendingLinks = new HashMap<String, SendingLink_1_0>();
+ private final Map<String, ReceivingLink_1_0> _receivingLinks = new HashMap<String, ReceivingLink_1_0>();
+
+ public synchronized SendingLink_1_0 getDurableSendingLink(String name)
+ {
+ return _sendingLinks.get(name);
+ }
+
+ public synchronized boolean registerSendingLink(String name, SendingLink_1_0 link)
+ {
+ if(_sendingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _sendingLinks.put(name, link);
+ return true;
+ }
+ }
+
+ public synchronized boolean unregisterSendingLink(String name)
+ {
+ if(!_sendingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _sendingLinks.remove(name);
+ return true;
+ }
+ }
+
+ public synchronized ReceivingLink_1_0 getDurableReceivingLink(String name)
+ {
+ return _receivingLinks.get(name);
+ }
+
+ public synchronized boolean registerReceivingLink(String name, ReceivingLink_1_0 link)
+ {
+ if(_receivingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _receivingLinks.put(name, link);
+ return true;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
new file mode 100644
index 0000000000..db81d3b205
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+public interface Link_1_0
+{
+ void start();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
new file mode 100644
index 0000000000..140a815f57
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class Message_1_0 implements ServerMessage, InboundMessage
+{
+ private final StoredMessage<MessageMetaData_1_0> _storedMessage;
+ private List<ByteBuffer> _fragments;
+ private WeakReference<Session_1_0> _session;
+
+
+ public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
+ final List<ByteBuffer> fragments,
+ final Session_1_0 session)
+ {
+ _storedMessage = storedMessage;
+ _fragments = fragments;
+ _session = new WeakReference<Session_1_0>(session);
+ }
+
+ public String getRoutingKey()
+ {
+ Object routingKey = getMessageHeader().getHeader("routing-key");
+ if(routingKey != null)
+ {
+ return routingKey.toString();
+ }
+ else
+ {
+ return getMessageHeader().getSubject();
+ }
+ }
+
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(getRoutingKey());
+ }
+
+ private MessageMetaData_1_0 getMessageMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
+ {
+ return getMessageMetaData().getMessageHeader();
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return _storedMessage;
+ }
+
+ public boolean isPersistent()
+ {
+ return getMessageMetaData().isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ // TODO
+ return false;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public long getExpiration()
+ {
+ return getMessageHeader().getExpiration();
+ }
+
+ public MessageReference<Message_1_0> newReference()
+ {
+ return new Reference(this);
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public long getArrivalTime()
+ {
+ return 0; //TODO
+ }
+
+ public int getContent(final ByteBuffer buf, final int offset)
+ {
+ return _storedMessage.getContent(offset, buf);
+ }
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ buf.limit(getContent(buf, offset));
+
+ return buf;
+ }
+
+ public SessionConfig getSessionConfig()
+ {
+ return null; //TODO
+ }
+
+ public List<ByteBuffer> getFragments()
+ {
+ return _fragments;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _session.get();
+ }
+
+ public static class Reference extends MessageReference<Message_1_0>
+ {
+ public Reference(Message_1_0 message)
+ {
+ super(message);
+ }
+
+ protected void onReference(Message_1_0 message)
+ {
+
+ }
+
+ protected void onRelease(Message_1_0 message)
+ {
+
+ }
+
+}
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
new file mode 100644
index 0000000000..af3f0b7872
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+
+import java.util.Arrays;
+
+public class QueueDestination implements SendingDestination, ReceivingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+
+
+ private AMQQueue _queue;
+
+ public QueueDestination(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+
+ try
+ {
+ txn.enqueue(_queue,message, new ServerTransaction.Action()
+ {
+
+
+ public void postCommit()
+ {
+ try
+ {
+
+ _queue.enqueue(message);
+ }
+ catch (Exception e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ return ACCEPTED;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 100;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
new file mode 100644
index 0000000000..4ae0596e25
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public interface ReceivingDestination extends Destination
+{
+
+ Outcome[] getOutcomes();
+
+ Outcome send(Message_1_0 message, ServerTransaction txn);
+
+ int getCredit();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
new file mode 100644
index 0000000000..6da5081185
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+
+public class ReceivingLinkAttachment
+{
+ private final Session_1_0 _session;
+ private final ReceivingLinkEndpoint _endpoint;
+
+ public ReceivingLinkAttachment(final Session_1_0 session, final ReceivingLinkEndpoint endpoint)
+ {
+ _session = session;
+ _endpoint = endpoint;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _session;
+ }
+
+ public ReceivingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Source getSource()
+ {
+ return getEndpoint().getSource();
+ }
+
+ public void setDeliveryStateHandler(final DeliveryStateHandler handler)
+ {
+ getEndpoint().setDeliveryStateHandler(handler);
+ }
+
+ public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
+ {
+ getEndpoint().updateDisposition(deliveryTag, state, settled);
+ }
+
+ public Target getTarget()
+ {
+ return getEndpoint().getTarget();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
new file mode 100644
index 0000000000..e097dd5c83
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -0,0 +1,305 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler
+{
+ private VirtualHost _vhost;
+
+ private ReceivingDestination _destination;
+ private SectionDecoderImpl _sectionDecoder;
+ private volatile ReceivingLinkAttachment _attachment;
+
+
+ private ArrayList<Transfer> _incompleteMessage;
+ private TerminusDurability _durability;
+
+ private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
+ private boolean _resumedMessage;
+ private Binary _messageDeliveryTag;
+ private ReceiverSettleMode _receivingSettlementMode;
+
+
+ public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost,
+ ReceivingDestination destination)
+ {
+ _vhost = vhost;
+ _destination = destination;
+ _attachment = receivingLinkAttachment;
+ receivingLinkAttachment.setDeliveryStateHandler(this);
+
+ _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
+
+ _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
+
+
+ }
+
+ public void messageTransfer(Transfer xfr)
+ {
+ // TODO - cope with fragmented messages
+
+ List<ByteBuffer> fragments = null;
+
+
+
+ if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+ {
+ _incompleteMessage = new ArrayList<Transfer>();
+ _incompleteMessage.add(xfr);
+ _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+ _messageDeliveryTag = xfr.getDeliveryTag();
+ return;
+ }
+ else if(_incompleteMessage != null)
+ {
+ _incompleteMessage.add(xfr);
+
+ if(Boolean.TRUE.equals(xfr.getMore()))
+ {
+ return;
+ }
+
+ fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
+ for(Transfer t : _incompleteMessage)
+ {
+ fragments.add(t.getPayload());
+ }
+ _incompleteMessage=null;
+
+ }
+ else
+ {
+ _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+ _messageDeliveryTag = xfr.getDeliveryTag();
+ fragments = Collections.singletonList(xfr.getPayload());
+ }
+
+ if(_resumedMessage)
+ {
+ if(_unsettledMap.containsKey(_messageDeliveryTag))
+ {
+ Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
+ boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+ getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
+ if(settled)
+ {
+ _unsettledMap.remove(_messageDeliveryTag);
+ }
+ }
+ else
+ {
+ System.err.println("UNEXPECTED!!");
+ System.err.println("Delivery Tag: " + _messageDeliveryTag);
+ System.err.println("_unsettledMap: " + _unsettledMap);
+
+ }
+ }
+ else
+ {
+ MessageMetaData_1_0 mmd = null;
+ List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
+ mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
+ _sectionDecoder,
+ immutableSections);
+
+ StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+
+ boolean skipping = true;
+ int offset = 0;
+
+ for(ByteBuffer bareMessageBuf : immutableSections)
+ {
+ storedMessage.addContent(offset, bareMessageBuf.duplicate());
+ offset += bareMessageBuf.remaining();
+ }
+
+ storedMessage.flushToStore();
+
+ Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
+
+
+ Binary transactionId = null;
+ org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
+ if(xfrState != null)
+ {
+ if(xfrState instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)xfrState).getTxnId();
+ }
+ }
+
+ ServerTransaction transaction = null;
+ if(transactionId != null)
+ {
+ transaction = getSession().getTransaction(transactionId);
+ }
+ else
+ {
+ Session_1_0 session = getSession();
+ transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getMessageStore());
+ }
+
+ Outcome outcome = _destination.send(message, transaction);
+
+ DeliveryState resultantState;
+
+ if(transactionId == null)
+ {
+ resultantState = (DeliveryState) outcome;
+ }
+ else
+ {
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setOutcome(outcome);
+ transactionalState.setTxnId(transactionId);
+ resultantState = transactionalState;
+
+ }
+
+
+ boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+
+ final Binary deliveryTag = xfr.getDeliveryTag();
+
+ if(!settled)
+ {
+ _unsettledMap.put(deliveryTag, outcome);
+ }
+
+ getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+
+ if(!(transaction instanceof AutoCommitTransaction))
+ {
+ ServerTransaction.Action a;
+ transaction.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ getEndpoint().updateDisposition(deliveryTag, null, true);
+ }
+
+ public void onRollback()
+ {
+ getEndpoint().updateDisposition(deliveryTag, null, true);
+ }
+ });
+ }
+ }
+ }
+
+ private ReceiverSettleMode getReceivingSettlementMode()
+ {
+ return _receivingSettlementMode;
+ }
+
+ public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+ {
+ //TODO
+ // if not durable or close
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+ (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ {
+ endpoint.close();
+ }
+ else if(detach == null || detach.getError() != null)
+ {
+ _attachment = null;
+ }
+ }
+
+ public void start()
+ {
+ getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
+ getEndpoint().setCreditWindow();
+ }
+
+ public ReceivingLinkEndpoint getEndpoint()
+ {
+ return _attachment.getEndpoint();
+ }
+
+
+ public Session_1_0 getSession()
+ {
+ ReceivingLinkAttachment attachment = _attachment;
+ return attachment == null ? null : attachment.getSession();
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
+ {
+ _attachment = linkAttachment;
+ _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
+ ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+ Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+
+ Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
+ for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ if(!initialUnsettledMap.containsKey(deliveryTag))
+ {
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ }
+
+ public Map getUnsettledOutcomeMap()
+ {
+ return _unsettledMap;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
new file mode 100644
index 0000000000..6d601c9dda
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+public interface SendingDestination extends Destination
+{
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
new file mode 100644
index 0000000000..9d7af24135
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
@@ -0,0 +1,44 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Source;
+
+public class SendingLinkAttachment
+{
+ private final Session_1_0 _session;
+ private final SendingLinkEndpoint _endpoint;
+
+ public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint)
+ {
+ _session = session;
+ _endpoint = endpoint;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _session;
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Source getSource()
+ {
+ return getEndpoint().getSource();
+ }
+
+ public void setDeliveryStateHandler(final DeliveryStateHandler handler)
+ {
+ getEndpoint().setDeliveryStateHandler(handler);
+ }
+
+ public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
+ {
+ getEndpoint().updateDisposition(deliveryTag, state, settled);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
new file mode 100644
index 0000000000..edd3bb6248
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -0,0 +1,646 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
+{
+ private VirtualHost _vhost;
+ private SendingDestination _destination;
+
+ private Subscription_1_0 _subscription;
+ private boolean _draining;
+ private final Map<Binary, QueueEntry> _unsettledMap =
+ new HashMap<Binary, QueueEntry>();
+
+ private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
+ new ConcurrentHashMap<Binary, UnsettledAction>();
+ private volatile SendingLinkAttachment _linkAttachment;
+ private TerminusDurability _durability;
+ private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
+ private Runnable _closeAction;
+
+ public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
+ final VirtualHost vhost,
+ final SendingDestination destination)
+ throws AmqpErrorException
+ {
+ _vhost = vhost;
+ _destination = destination;
+ _linkAttachment = linkAttachment;
+ final Source source = (Source) linkAttachment.getSource();
+ _durability = source.getDurable();
+ linkAttachment.setDeliveryStateHandler(this);
+ QueueDestination qd = null;
+ AMQQueue queue = null;
+
+
+
+ boolean noLocal = false;
+ JMSSelectorFilter messageFilter = null;
+
+ if(destination instanceof QueueDestination)
+ {
+ queue = ((QueueDestination) _destination).getQueue();
+ if(queue.getArguments() != null && queue.getArguments().containsKey("topic"))
+ {
+ source.setDistributionMode(StdDistMode.COPY);
+ }
+ qd = (QueueDestination) destination;
+
+ Map<Symbol,Filter> filters = source.getFilter();
+
+ Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
+
+ if(filters != null)
+ {
+ for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+ {
+ if(entry.getValue() instanceof NoLocalFilter)
+ {
+ actualFilters.put(entry.getKey(), entry.getValue());
+ noLocal = true;
+ }
+ else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter)
+ {
+
+ org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+ try
+ {
+ messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
+
+ actualFilters.put(entry.getKey(), entry.getValue());
+ }
+ catch (ParseException e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+ catch (SelectorParsingException e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+
+
+ }
+ }
+ }
+ source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
+
+ _subscription = new Subscription_1_0(this, qd);
+ }
+ else if(destination instanceof ExchangeDestination)
+ {
+ try
+ {
+
+ ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+
+ boolean isDurable = exchangeDestination.getDurability() == TerminusDurability.CONFIGURATION
+ || exchangeDestination.getDurability() == TerminusDurability.UNSETTLED_STATE;
+ String name;
+ if(isDurable)
+ {
+ String remoteContainerId = getEndpoint().getSession().getConnection().getRemoteContainerId();
+ remoteContainerId = remoteContainerId.replace("_","__").replace(".", "_:");
+
+ String endpointName = linkAttachment.getEndpoint().getName();
+ endpointName = endpointName
+ .replace("_", "__")
+ .replace(".", "_:")
+ .replace("(", "_O")
+ .replace(")", "_C")
+ .replace("<", "_L")
+ .replace(">", "_R");
+ name = "qpid_/" + remoteContainerId + "_/" + endpointName;
+ }
+ else
+ {
+ name = UUID.randomUUID().toString();
+ }
+
+ queue = _vhost.getQueueRegistry().getQueue(name);
+ Exchange exchange = exchangeDestination.getExchange();
+
+ if(queue == null)
+ {
+ queue = AMQQueueFactory.createAMQQueueImpl(
+ name,
+ isDurable,
+ null,
+ true,
+ true,
+ _vhost,
+ Collections.EMPTY_MAP);
+ }
+ else
+ {
+ List<Binding> bindings = queue.getBindings();
+ List<Binding> bindingsToRemove = new ArrayList<Binding>();
+ for(Binding existingBinding : bindings)
+ {
+ if(existingBinding.getExchange() != _vhost.getExchangeRegistry().getDefaultExchange()
+ && existingBinding.getExchange() != exchange)
+ {
+ bindingsToRemove.add(existingBinding);
+ }
+ }
+ for(Binding existingBinding : bindingsToRemove)
+ {
+ existingBinding.getExchange().removeBinding(existingBinding);
+ }
+ }
+
+
+ String binding = "";
+
+ Map<Symbol,Filter> filters = source.getFilter();
+ Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
+ boolean hasBindingFilter = false;
+ if(filters != null && !filters.isEmpty())
+ {
+
+ for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+ {
+ if(!hasBindingFilter
+ && entry.getValue() instanceof ExactSubjectFilter
+ && exchange.getType() == DirectExchange.TYPE)
+ {
+ ExactSubjectFilter filter = (ExactSubjectFilter) filters.values().iterator().next();
+ source.setFilter(filters);
+ binding = filter.getValue();
+ actualFilters.put(entry.getKey(), entry.getValue());
+ hasBindingFilter = true;
+ }
+ else if(!hasBindingFilter
+ && entry.getValue() instanceof MatchingSubjectFilter
+ && exchange.getType() == TopicExchange.TYPE)
+ {
+ MatchingSubjectFilter filter = (MatchingSubjectFilter) filters.values().iterator().next();
+ source.setFilter(filters);
+ binding = filter.getValue();
+ actualFilters.put(entry.getKey(), entry.getValue());
+ hasBindingFilter = true;
+ }
+ else if(entry.getValue() instanceof NoLocalFilter)
+ {
+ actualFilters.put(entry.getKey(), entry.getValue());
+ noLocal = true;
+ }
+ else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter)
+ {
+
+ org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+ try
+ {
+ messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
+
+ actualFilters.put(entry.getKey(), entry.getValue());
+ }
+ catch (ParseException e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+ catch (SelectorParsingException e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+
+
+ }
+ }
+ }
+ source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
+
+ vhost.getBindingFactory().addBinding(binding,queue,exchange,null);
+ source.setDistributionMode(StdDistMode.COPY);
+
+ qd = new QueueDestination(queue);
+ }
+ catch (AMQSecurityException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (AMQInternalException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ _subscription = new Subscription_1_0(this, qd, true);
+
+ }
+
+ if(_subscription != null)
+ {
+ _subscription.setNoLocal(noLocal);
+ if(messageFilter!=null)
+ {
+ _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ }
+
+ try
+ {
+
+ queue.registerSubscription(_subscription, false);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+ }
+
+ }
+
+ public void resume(SendingLinkAttachment linkAttachment)
+ {
+ _linkAttachment = linkAttachment;
+
+ }
+
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ //TODO
+ // if not durable or close
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+ (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ {
+
+ AMQQueue queue = _subscription.getQueue();
+
+ try
+ {
+
+ queue.unregisterSubscription(_subscription);
+
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ DeliveryState state = new Released();
+
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
+
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
+
+ endpoint.close();
+
+ if(_destination instanceof ExchangeDestination
+ && (_durability == TerminusDurability.CONFIGURATION
+ || _durability == TerminusDurability.UNSETTLED_STATE))
+ {
+ try
+ {
+ queue.delete();
+ }
+ catch(AMQException e)
+ {
+ e.printStackTrace(); // TODO - Implement
+ }
+ }
+
+ if(_closeAction != null)
+ {
+ _closeAction.run();
+ }
+ }
+ else if(detach == null || detach.getError() != null)
+ {
+ _linkAttachment = null;
+ _subscription.flowStateChanged();
+ }
+ else
+ {
+ endpoint.detach();
+ }
+ }
+
+ public void start()
+ {
+ //TODO
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _linkAttachment == null ? null : _linkAttachment.getSession();
+ }
+
+ public void flowStateChanged()
+ {
+ if(Boolean.TRUE.equals(getEndpoint().getDrain())
+ && hasCredit())
+ {
+ _draining = true;
+ }
+
+ while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
+ {
+ Accepted accepted = new Accepted();
+ synchronized(getLock())
+ {
+
+ Transfer xfr = new Transfer();
+ Binary dt = _resumeAcceptedTransfers.remove(0);
+ xfr.setDeliveryTag(dt);
+ xfr.setState(accepted);
+ xfr.setResume(Boolean.TRUE);
+ getEndpoint().transfer(xfr);
+ }
+
+ }
+ if(_resumeAcceptedTransfers.isEmpty())
+ {
+ _subscription.flowStateChanged();
+ }
+
+ }
+
+ boolean hasCredit()
+ {
+ return getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
+ }
+
+ public boolean isDraining()
+ {
+ return false; //TODO
+ }
+
+ public boolean drained()
+ {
+ if(getEndpoint() != null)
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ if(_draining)
+ {
+ //TODO
+ getEndpoint().drained();
+ _draining = false;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ {
+ _unsettledActionMap.put(tag,unsettledAction);
+ if(getTransactionId() == null)
+ {
+ _unsettledMap.put(tag, queueEntry);
+ }
+ }
+
+ public void removeUnsettled(Binary tag)
+ {
+ _unsettledActionMap.remove(tag);
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+ boolean localSettle = false;
+ if(action != null)
+ {
+ localSettle = action.process(state, settled);
+ if(localSettle && !Boolean.TRUE.equals(settled))
+ {
+ _linkAttachment.updateDisposition(deliveryTag, state, true);
+ }
+ }
+ if(Boolean.TRUE.equals(settled) || localSettle)
+ {
+ _unsettledActionMap.remove(deliveryTag);
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ ServerTransaction getTransaction(Binary transactionId)
+ {
+ return _linkAttachment.getSession().getTransaction(transactionId);
+ }
+
+ public Binary getTransactionId()
+ {
+ SendingLinkEndpoint endpoint = getEndpoint();
+ return endpoint == null ? null : endpoint.getTransactionId();
+ }
+
+ public synchronized Object getLock()
+ {
+ return _linkAttachment == null ? this : getEndpoint().getLock();
+ }
+
+ public boolean isDetached()
+ {
+ return _linkAttachment == null || getEndpoint().isDetached();
+ }
+
+ public boolean isAttached()
+ {
+ return _linkAttachment != null && getEndpoint().isAttached();
+ }
+
+ public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
+ {
+
+ if(_subscription.isActive())
+ {
+ _subscription.suspend();
+ }
+
+ _linkAttachment = linkAttachment;
+
+ SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+ endpoint.setDeliveryStateHandler(this);
+ Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+ Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ _resumeAcceptedTransfers.clear();
+ _resumeFullTransfers.clear();
+
+ for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ final QueueEntry queueEntry = entry.getValue();
+ if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+ {
+ queueEntry.setRedelivered();
+ queueEntry.release();
+ _unsettledMap.remove(deliveryTag);
+ }
+ else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome))
+ {
+ Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+
+ if(outcome instanceof Accepted)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
+ if(_subscription.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.discard();
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+ }
+ }
+ else if(outcome instanceof Released)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
+ if(_subscription.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.release();
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+ }
+ }
+ //_unsettledMap.remove(deliveryTag);
+ initialUnsettledMap.remove(deliveryTag);
+ _resumeAcceptedTransfers.add(deliveryTag);
+ }
+ else
+ {
+ _resumeFullTransfers.add(queueEntry);
+ // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+ }
+ // TODO - else
+ }
+
+
+ }
+
+ public Map getUnsettledOutcomeMap()
+ {
+ Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+
+ for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ {
+ entry.setValue(null);
+ }
+
+ return unsettled;
+ }
+
+ public void setCloseAction(Runnable action)
+ {
+ _closeAction = action;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
new file mode 100644
index 0000000000..3f7eb18989
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.*;
+
+public class Session_1_0 implements SessionEventListener
+{
+ private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+ private IApplicationRegistry _appRegistry;
+ private VirtualHost _vhost;
+ private AutoCommitTransaction _transaction;
+
+ private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
+ new LinkedHashMap<Integer, ServerTransaction>();
+ private final Connection_1_0 _connection;
+
+
+ public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
+ {
+ _appRegistry = appRegistry;
+ _vhost = vhost;
+ _transaction = new AutoCommitTransaction(vhost.getMessageStore());
+ _connection = connection;
+
+ }
+
+ public void remoteLinkCreation(final LinkEndpoint endpoint)
+ {
+
+
+ Destination destination;
+ Link_1_0 link = null;
+ Error error = null;
+
+ final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+
+
+ if(endpoint.getRole() == Role.SENDER)
+ {
+
+ SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName());
+
+ if(previousLink == null)
+ {
+
+ Target target = (Target) endpoint.getTarget();
+ Source source = (Source) endpoint.getSource();
+
+
+ if(source != null)
+ {
+ if(Boolean.TRUE.equals(source.getDynamic()))
+ {
+ AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
+ source.setAddress(tempQueue.getName());
+ }
+ String addr = source.getAddress();
+ AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ if(queue != null)
+ {
+
+ destination = new QueueDestination(queue);
+
+
+
+ }
+ else
+ {
+ Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ if(exchg != null)
+ {
+ destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
+ }
+ else
+ {
+
+ endpoint.setSource(null);
+ destination = null;
+ }
+ }
+
+ }
+ else
+ {
+ destination = null;
+ }
+
+ if(destination != null)
+ {
+ final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+ try
+ {
+ final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+ _vhost,
+ (SendingDestination) destination
+ );
+ sendingLinkEndpoint.setLinkEventListener(sendingLink);
+ link = sendingLink;
+ if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ {
+ linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+ sendingLink.setCloseAction(new Runnable() {
+
+ public void run()
+ {
+ linkRegistry.unregisterSendingLink(endpoint.getName());
+ }
+ });
+ }
+ }
+ catch(AmqpErrorException e)
+ {
+ e.printStackTrace();
+ destination = null;
+ sendingLinkEndpoint.setSource(null);
+ error = e.getError();
+ }
+ }
+ }
+ else
+ {
+ endpoint.setSource(previousLink.getEndpoint().getSource());
+ SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+ sendingLinkEndpoint.setLinkEventListener(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ }
+ }
+ else
+ {
+ if(endpoint.getTarget() instanceof Coordinator)
+ {
+ Coordinator coordinator = (Coordinator) endpoint.getTarget();
+ TxnCapability[] capabilities = coordinator.getCapabilities();
+ boolean localTxn = false;
+ boolean multiplePerSession = false;
+ if(capabilities != null)
+ {
+ for(TxnCapability capability : capabilities)
+ {
+ if(capability.equals(TxnCapability.LOCAL_TXN))
+ {
+ localTxn = true;
+ }
+ else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+ {
+ multiplePerSession = true;
+ }
+ else
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_IMPLEMENTED);
+ error.setDescription("Unsupported capability: " + capability);
+ break;
+ }
+ }
+ }
+
+ /* if(!localTxn)
+ {
+ capabilities.add(TxnCapabilities.LOCAL_TXN);
+ }*/
+
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final TxnCoordinatorLink_1_0 coordinatorLink =
+ new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
+ receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
+ link = coordinatorLink;
+
+
+ }
+ else
+ {
+
+ ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName());
+
+ if(previousLink == null)
+ {
+
+ Target target = (Target) endpoint.getTarget();
+
+ if(target != null)
+ {
+ if(Boolean.TRUE.equals(target.getDynamic()))
+ {
+
+ AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
+ target.setAddress(tempQueue.getName());
+ }
+
+ String addr = target.getAddress();
+ Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ if(exchg != null)
+ {
+ destination = new ExchangeDestination(exchg, target.getDurable(),
+ target.getExpiryPolicy());
+ }
+ else
+ {
+ AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ if(queue != null)
+ {
+
+ destination = new QueueDestination(queue);
+ }
+ else
+ {
+ endpoint.setTarget(null);
+ destination = null;
+ }
+
+ }
+
+
+ }
+ else
+ {
+ destination = null;
+ }
+ if(destination != null)
+ {
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
+ (ReceivingDestination) destination);
+ receivingLinkEndpoint.setLinkEventListener(receivingLink);
+ link = receivingLink;
+ if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
+ {
+ linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
+ }
+ }
+ }
+ else
+ {
+ ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+ receivingLinkEndpoint.setLinkEventListener(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+
+ }
+ }
+ }
+
+ endpoint.attach();
+
+ if(link == null)
+ {
+ if(error == null)
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_FOUND);
+ }
+ endpoint.detach(error);
+ }
+ else
+ {
+ link.start();
+ }
+ }
+
+
+ private AMQQueue createTemporaryQueue(Map properties)
+ {
+ final String queueName = UUID.randomUUID().toString();
+ AMQQueue queue = null;
+ try
+ {
+ LifetimePolicy lifetimePolicy = properties == null
+ ? null
+ : (LifetimePolicy) properties.get(LIFETIME_POLICY);
+
+ final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName,
+ false, // durable
+ null, // owner
+ false, // autodelete
+ false, // exclusive
+ _vhost,
+ properties);
+
+
+
+ if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+ {
+ final Connection_1_0.Task deleteQueueTask =
+ new Connection_1_0.Task()
+ {
+ public void doTask(Connection_1_0 session)
+ {
+ if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ {
+ try
+ {
+ tempQueue.delete();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+ }
+ };
+
+ _connection.addConnectionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ _connection.removeConnectionCloseTask(deleteQueueTask);
+ }
+
+
+ });
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinks)
+ {
+
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoMessages)
+ {
+
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ {
+
+ }
+ }
+ catch (AMQSecurityException e)
+ {
+ e.printStackTrace(); //TODO.
+ } catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ return queue;
+ }
+
+ public ServerTransaction getTransaction(Binary transactionId)
+ {
+ // TODO should treat invalid id differently to null
+ ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
+ return transaction == null ? _transaction : transaction;
+ }
+
+ public void remoteEnd(End end)
+ {
+ Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
+
+ while(iter.hasNext())
+ {
+ Map.Entry<Integer, ServerTransaction> entry = iter.next();
+ entry.getValue().rollback();
+ iter.remove();
+ }
+
+ }
+
+ Integer binaryToInteger(final Binary txnId)
+ {
+ if(txnId == null)
+ {
+ return null;
+ }
+
+ if(txnId.getLength() > 4)
+ throw new IllegalArgumentException();
+
+ int id = 0;
+ byte[] data = txnId.getArray();
+ for(int i = 0; i < txnId.getLength(); i++)
+ {
+ id <<= 8;
+ id += data[i+txnId.getArrayOffset()];
+ }
+
+ return id;
+
+ }
+
+ Binary integerToBinary(final int txnId)
+ {
+ byte[] data = new byte[4];
+ data[3] = (byte) (txnId & 0xff);
+ data[2] = (byte) ((txnId & 0xff00) >> 8);
+ data[1] = (byte) ((txnId & 0xff0000) >> 16);
+ data[0] = (byte) ((txnId & 0xff000000) >> 24);
+ return new Binary(data);
+
+ }
+
+ public void forceEnd()
+ {
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
new file mode 100644
index 0000000000..425f63dd90
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -0,0 +1,621 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+class Subscription_1_0 implements Subscription
+{
+ private SendingLink_1_0 _link;
+
+ private AMQQueue _queue;
+
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
+
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+ private final long _id;
+ private final boolean _acquires;
+ private AMQQueue.Context _queueContext;
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private ReentrantLock _stateChangeLock = new ReentrantLock();
+
+ private boolean _noLocal;
+ private FilterManager _filters;
+
+ private long _deliveryTag = 0L;
+ private StateListener _stateListener;
+
+ private Binary _transactionId;
+ private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+ private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
+
+ public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
+ {
+ this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
+ }
+
+ public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires)
+ {
+ _link = link;
+ _queue = destination.getQueue();
+ _id = getEndpoint().getLocalHandle().longValue();
+ _acquires = acquires;
+ }
+
+ private SendingLinkEndpoint getEndpoint()
+ {
+ return _link.getEndpoint();
+ }
+
+ public LogActor getLogActor()
+ {
+ return null; //TODO
+ }
+
+ public boolean isTransient()
+ {
+ return true; //TODO
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
+ }
+
+ public void setQueue(final AMQQueue queue, final boolean exclusive)
+ {
+ //TODO
+ }
+
+ public void setNoLocal(final boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _noLocal;
+ }
+
+ public long getSubscriptionID()
+ {
+ return _id;
+ }
+
+ public boolean isSuspended()
+ {
+ return !isActive();// || !getEndpoint().hasCreditToSend();
+
+ }
+
+ public boolean hasInterest(final QueueEntry entry)
+ {
+ return !(_noLocal && (entry.getMessage() instanceof Message_1_0)
+ && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+ && checkFilters(entry);
+
+ }
+
+ private boolean checkFilters(final QueueEntry entry)
+ {
+ return (_filters == null) || _filters.allAllow(entry);
+ }
+
+ public boolean isClosed()
+ {
+ return !getEndpoint().isAttached();
+ }
+
+ public boolean acquires()
+ {
+ return _acquires;
+ }
+
+ public boolean seesRequeues()
+ {
+ // TODO
+ return acquires();
+ }
+
+ public void close()
+ {
+ getEndpoint().detach();
+ }
+
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // TODO
+ send(entry);
+ }
+
+ public void flushBatched()
+ {
+ // TODO
+ }
+
+ public void send(final QueueEntry queueEntry) throws AMQException
+ {
+ //TODO
+ ServerMessage serverMessage = queueEntry.getMessage();
+ if(serverMessage instanceof Message_1_0)
+ {
+ Message_1_0 message = (Message_1_0) serverMessage;
+ Transfer transfer = new Transfer();
+ //TODO
+
+
+ List<ByteBuffer> fragments = message.getFragments();
+ ByteBuffer payload;
+ if(fragments.size() == 1)
+ {
+ payload = fragments.get(0);
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer fragment : fragments)
+ {
+ size += fragment.remaining();
+ }
+
+ payload = ByteBuffer.allocate(size);
+
+ for(ByteBuffer fragment : fragments)
+ {
+ payload.put(fragment.duplicate());
+ }
+
+ payload.flip();
+ }
+
+ if(queueEntry.getDeliveryCount() != 0)
+ {
+ payload = payload.duplicate();
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+
+ Header oldHeader = null;
+ try
+ {
+ ByteBuffer encodedBuf = payload.duplicate();
+ Object value = valueHandler.parse(payload);
+ if(value instanceof Header)
+ {
+ oldHeader = (Header) value;
+ }
+ else
+ {
+ payload.position(0);
+ }
+ }
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ Header header = new Header();
+ if(oldHeader != null)
+ {
+ header.setDurable(oldHeader.getDurable());
+ header.setPriority(oldHeader.getPriority());
+ header.setTtl(oldHeader.getTtl());
+ }
+ header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(header);
+ Binary encodedHeader = _sectionEncoder.getEncoding();
+
+ ByteBuffer oldPayload = payload;
+ payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
+ payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+ payload.put(oldPayload);
+ payload.flip();
+ }
+
+ transfer.setPayload(payload);
+ byte[] data = new byte[8];
+ ByteBuffer.wrap(data).putLong(_deliveryTag++);
+ final Binary tag = new Binary(data);
+
+ transfer.setDeliveryTag(tag);
+
+ synchronized(_link.getLock())
+ {
+ if(_link.isAttached())
+ {
+ if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ {
+ transfer.setSettled(true);
+ }
+ else
+ {
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, queueEntry)
+ : new DoNothingAction(tag, queueEntry);
+
+ _link.addUnsettled(tag, action, queueEntry);
+ }
+
+ if(_transactionId != null)
+ {
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
+ }
+ // TODO - need to deal with failure here
+ if(_acquires && _transactionId != null)
+ {
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if(txn != null)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action(){
+
+ public void postCommit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void onRollback()
+ {
+ if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ queueEntry.release();
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+
+
+ }
+ }
+ });
+ }
+
+ }
+
+ getEndpoint().transfer(transfer);
+ }
+ else
+ {
+ queueEntry.release();
+ }
+ }
+ }
+
+ }
+
+ public void queueDeleted(final AMQQueue queue)
+ {
+ //TODO
+ getEndpoint().setSource(null);
+ getEndpoint().detach();
+ }
+
+ public synchronized boolean wouldSuspend(final QueueEntry msg)
+ {
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+ if(!hasCredit && getState() == State.ACTIVE)
+ {
+ suspend();
+ }
+
+ return !hasCredit;
+ }
+
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public synchronized void suspend()
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public void releaseQueueEntry(QueueEntry queueEntryImpl)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ public void onDequeue(final QueueEntry queueEntry)
+ {
+ //TODO
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ //TODO
+ }
+
+ public void setStateListener(final StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ public AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+ public boolean isSessionTransactional()
+ {
+ return false; //TODO
+ }
+
+ public synchronized void queueEmpty()
+ {
+ if(_link.drained())
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ }
+
+ public synchronized void flowStateChanged()
+ {
+ if(isSuspended() && getEndpoint() != null)
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ _transactionId = _link.getTransactionId();
+ }
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _link.getSession();
+ }
+
+ private class DispositionAction implements UnsettledAction
+ {
+
+ private final QueueEntry _queueEntry;
+ private final Binary _deliveryTag;
+
+ public DispositionAction(Binary tag, QueueEntry queueEntry)
+ {
+ _deliveryTag = tag;
+ _queueEntry = queueEntry;
+ }
+
+ public boolean process(DeliveryState state, Boolean settled)
+ {
+
+ Binary transactionId = null;
+ final Outcome outcome;
+ // If disposition is settled this overrides the txn?
+ if(state instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)state).getTxnId();
+ outcome = ((TransactionalState)state).getOutcome();
+ }
+ else if (state instanceof Outcome)
+ {
+ outcome = (Outcome) state;
+ }
+ else
+ {
+ outcome = null;
+ }
+
+
+ ServerTransaction txn = _link.getTransaction(transactionId);
+
+ if(outcome instanceof Accepted)
+ {
+ txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ _queueEntry.discard();
+ }
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ //_link.getEndpoint().settle(_deliveryTag);
+ _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true);
+ _link.getEndpoint().sendFlowConditional();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ else if(outcome instanceof Released)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+
+ _queueEntry.release();
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+
+ public void onRollback()
+ {
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+ });
+ }
+
+ else if(outcome instanceof Modified)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+
+ _queueEntry.release();
+ _queueEntry.incrementDeliveryCount();
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+
+ public void onRollback()
+ {
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+ });
+ }
+
+ return (transactionId == null && outcome != null);
+ }
+ }
+
+ private class DoNothingAction implements UnsettledAction
+ {
+ public DoNothingAction(final Binary tag,
+ final QueueEntry queueEntry)
+ {
+ }
+
+ public boolean process(final DeliveryState state, final Boolean settled)
+ {
+ Binary transactionId = null;
+ Outcome outcome = null;
+ // If disposition is settled this overrides the txn?
+ if(state instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)state).getTxnId();
+ outcome = ((TransactionalState)state).getOutcome();
+ }
+ else if (state instanceof Outcome)
+ {
+ outcome = (Outcome) state;
+ }
+ return true;
+ }
+ }
+
+ public FilterManager getFilters()
+ {
+ return _filters;
+ }
+
+ public void setFilters(final FilterManager filters)
+ {
+ _filters = filters;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
new file mode 100644
index 0000000000..a05d14816a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Declare;
+import org.apache.qpid.amqp_1_0.type.transaction.Declared;
+import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
+{
+ private VirtualHost _vhost;
+ private ReceivingLinkEndpoint _endpoint;
+
+ private ArrayList<Transfer> _incompleteMessage;
+ private SectionDecoder _sectionDecoder;
+ private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
+ private Session_1_0 _session;
+
+
+ public TxnCoordinatorLink_1_0(VirtualHost vhost,
+ Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
+ LinkedHashMap<Integer, ServerTransaction> openTransactions)
+ {
+ _vhost = vhost;
+ _session = session_1_0;
+ _endpoint = endpoint;
+ _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
+ _openTransactions = openTransactions;
+ }
+
+ public void messageTransfer(Transfer xfr)
+ {
+ // TODO - cope with fragmented messages
+
+ ByteBuffer payload = null;
+
+
+ if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+ {
+ _incompleteMessage = new ArrayList<Transfer>();
+ _incompleteMessage.add(xfr);
+ return;
+ }
+ else if(_incompleteMessage != null)
+ {
+ _incompleteMessage.add(xfr);
+ if(Boolean.TRUE.equals(xfr.getMore()))
+ {
+ return;
+ }
+
+ int size = 0;
+ for(Transfer t : _incompleteMessage)
+ {
+ size += t.getPayload().limit();
+ }
+ payload = ByteBuffer.allocate(size);
+ for(Transfer t : _incompleteMessage)
+ {
+ payload.put(t.getPayload().duplicate());
+ }
+ payload.flip();
+ _incompleteMessage=null;
+
+ }
+ else
+ {
+ payload = xfr.getPayload();
+ }
+
+
+ // Only interested int he amqp-value section that holds the message to the co-ordinator
+ try
+ {
+ List<Section> sections = _sectionDecoder.parseAll(payload);
+
+ for(Section section : sections)
+ {
+ if(section instanceof AmqpValue)
+ {
+ Object command = ((AmqpValue) section).getValue();
+
+ if(command instanceof Declare)
+ {
+ Integer txnId = Integer.valueOf(0);
+ Iterator<Integer> existingTxn = _openTransactions.keySet().iterator();
+ while(existingTxn.hasNext())
+ {
+ txnId = existingTxn.next();
+ }
+ txnId = Integer.valueOf(txnId.intValue() + 1);
+
+ _openTransactions.put(txnId, new LocalTransaction(_vhost.getMessageStore()));
+
+ Declared state = new Declared();
+
+
+
+ state.setTxnId(_session.integerToBinary(txnId));
+ _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
+
+ }
+ else if(command instanceof Discharge)
+ {
+ Discharge discharge = (Discharge) command;
+
+ DeliveryState state = xfr.getState();
+ discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
+ _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
+
+ }
+ }
+ }
+
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+ {
+ //TODO
+ endpoint.detach();
+ }
+
+ private Error discharge(Integer transactionId, boolean fail)
+ {
+ Error error = null;
+ ServerTransaction txn = _openTransactions.get(transactionId);
+ if(txn != null)
+ {
+ if(fail)
+ {
+ txn.rollback();
+ }
+ else
+ {
+ txn.commit();
+ }
+ _openTransactions.remove(transactionId);
+ }
+ else
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_FOUND);
+ error.setDescription("Unkown transactionId" + transactionId);
+ }
+ return error;
+ }
+
+
+
+ public void start()
+ {
+ _endpoint.setLinkCredit(UnsignedInteger.ONE);
+ _endpoint.setCreditWindow();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
new file mode 100644
index 0000000000..c497cc5146
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+
+public interface UnsettledAction
+{
+ boolean process(DeliveryState state, Boolean settled);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
index c8f04c7b96..79279b44c7 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
@@ -52,4 +52,13 @@ final class QueueContext implements AMQQueue.Context
{
return _releasedEntry;
}
+
+ @Override
+ public String toString()
+ {
+ return "QueueContext{" +
+ "_lastSeenEntry=" + _lastSeenEntry +
+ ", _releasedEntry=" + _releasedEntry +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6c9e918324..209553e8fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -227,9 +227,10 @@ public abstract class QueueEntryImpl implements QueueEntry
public void release()
{
EntryState state = _state;
-
+
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
+
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount();
@@ -254,6 +255,7 @@ public abstract class QueueEntryImpl implements QueueEntry
routeToAlternate();
}
}
+
}
public boolean releaseButRetain()
@@ -267,7 +269,6 @@ public abstract class QueueEntryImpl implements QueueEntry
Subscription sub = ((SubscriptionAcquiredState) state).getSubscription();
if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState()))
{
- System.err.println("Message released (and retained)" + getMessage().getMessageNumber());
getQueue().requeue(this);
if(_stateChangeListeners != null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 1cd7e3505f..e6f059a875 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -862,7 +862,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void requeue(QueueEntry entry)
{
-
SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
@@ -1743,6 +1742,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ boolean queueEmpty = false;
try
{
@@ -1760,12 +1760,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
atTail = attemptDelivery(sub, true);
- if (atTail && !sub.isSuspended() && sub.isAutoClose())
+ if (atTail && getNextAvailableEntry(sub) == null)
{
- unregisterSubscription(sub);
-
- sub.confirmAutoClose();
-
+ queueEmpty = true;
}
else if (!atTail)
{
@@ -1787,6 +1784,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
sub.releaseSendLock();
}
+ if(queueEmpty)
+ {
+ sub.queueEmpty();
+ }
+
sub.flushBatched();
}
@@ -2009,13 +2011,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (subscriptionDone)
{
sub.flushBatched();
- //close autoClose subscriptions if we are not currently intent on continuing
- if (lastLoop && !sub.isSuspended() && sub.isAutoClose())
+ if (lastLoop && !sub.isSuspended())
{
-
- unregisterSubscription(sub);
-
- sub.confirmAutoClose();
+ sub.queueEmpty();
}
break;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
index 4ffa5a4bc2..6c1a917d5b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
@@ -88,4 +89,6 @@ public interface AuthenticationManager extends Closeable, Plugin
* @return authentication result
*/
AuthenticationResult authenticate(String username, String password);
+
+ CallbackHandler getHandler(String mechanism);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 3fa0de2af0..b5d70d9200 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -300,6 +300,11 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
}
}
+ public CallbackHandler getHandler(String mechanism)
+ {
+ return _callbackHandlerMap.get(mechanism);
+ }
+
/**
* @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
index 3664568b75..4650234972 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
@@ -58,4 +58,4 @@ public class AnonymousSaslServerFactory implements SaslServerFactory
return new String[]{AnonymousSaslServer.MECHANISM};
}
}
-} \ No newline at end of file
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
index 428bb1e41b..0fab60b6f3 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
@@ -22,13 +22,16 @@ package org.apache.qpid.server.store;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
import java.nio.ByteBuffer;
public enum MessageMetaDataType
{
META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
- META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } };
+ META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } },
+ META_DATA_1_0 { public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } };
+
public static interface Factory<M extends StorableMessageMetaData>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index f8a585b562..66825caa24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -54,16 +54,12 @@ public interface Subscription
void setNoLocal(boolean noLocal);
- AMQShortString getConsumerTag();
-
long getSubscriptionID();
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
- boolean isAutoClose();
-
boolean isClosed();
boolean acquires();
@@ -105,11 +101,11 @@ public interface Subscription
boolean isActive();
- void confirmAutoClose();
-
public void set(String key, Object value);
public Object get(String key);
boolean isSessionTransactional();
+
+ void queueEmpty() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 32baa17fc7..1f25c215cc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -375,7 +375,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -810,12 +810,22 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
return _channel.isTransactional();
}
-
+
public long getCreateTime()
{
return _createTime;
}
+ public void queueEmpty() throws AMQException
+ {
+ if (isAutoClose())
+ {
+ _queue.unregisterSubscription(this);
+
+ confirmAutoClose();
+ }
+ }
+
public void flushBatched()
{
_channel.getProtocolSession().setDeferFlush(false);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index a75467ac42..76d975a789 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -109,7 +109,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void stateChange(Subscription sub, State oldState, State newState)
{
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
};
private AMQQueue _queue;
@@ -199,12 +199,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
-
- }
- public AMQShortString getConsumerTag()
- {
- return new AMQShortString(_destination);
}
public boolean isSuspended()
@@ -244,12 +239,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return (_filters == null) || _filters.allAllow(entry);
}
- public boolean isAutoClose()
- {
- // no such thing in 0-10
- return false;
- }
-
public boolean isClosed()
{
return getState() == State.CLOSED;
@@ -302,7 +291,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -823,11 +812,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return getState() == State.ACTIVE;
}
- public void confirmAutoClose()
- {
- //No such thing in 0-10
- }
-
public void set(String key, Object value)
{
_properties.put(key, value);
@@ -1026,6 +1010,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _session.isTransactional();
}
+ public void queueEmpty()
+ {
+ }
+
public long getCreateTime()
{
return _createTime;
@@ -1033,7 +1021,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public String toLogString()
{
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
+ String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
_queue.getNameShortString());
String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
// queueString is "vh(/{0})/qu({1}) " so need to trim
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 70239b0fee..6f979e035e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -134,7 +134,7 @@ public class ServerSession extends Session
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -153,7 +153,7 @@ public class ServerSession extends Session
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
- _connectionConfig = connConfig;
+ _connectionConfig = connConfig;
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
@@ -353,7 +353,7 @@ public class ServerSession extends Session
}
}
- public void removeDispositionListener(Method method)
+ public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
@@ -381,7 +381,7 @@ public class ServerSession extends Session
{
task.doTask(this);
}
-
+
CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
}
@@ -430,7 +430,7 @@ public class ServerSession extends Session
public void unregister(Subscription_0_10 sub)
{
- _subscriptions.remove(sub.getConsumerTag().toString());
+ _subscriptions.remove(sub.getName());
try
{
sub.getSendLock();
@@ -559,7 +559,7 @@ public class ServerSession extends Session
public void commit()
{
_transaction.commit();
-
+
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -568,13 +568,13 @@ public class ServerSession extends Session
public void rollback()
{
_transaction.rollback();
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -584,7 +584,7 @@ public class ServerSession extends Session
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -625,7 +625,7 @@ public class ServerSession extends Session
{
return _txnCount.get();
}
-
+
public Principal getAuthorizedPrincipal()
{
return getConnection().getAuthorizedPrincipal();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index 7617544451..c568ae67aa 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.txn;
+import java.util.Collection;
+import java.util.List;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import java.util.Collection;
-import java.util.List;
-
/**
* The ServerTransaction interface allows a set enqueue/dequeue operations to be
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 2ef110641e..489b985222 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -96,6 +97,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
void removeBrokerConnection(BrokerLink brokerLink);
+ LinkRegistry getLinkRegistry(String remoteContainerId);
+
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
State getState();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 530be46d70..d4f3b11a8c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -67,7 +68,8 @@ import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
import javax.management.JMException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -126,6 +128,8 @@ public class VirtualHostImpl implements VirtualHost
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+
public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
if (hostConfig == null)
@@ -673,6 +677,17 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+ {
+ LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
+ if(linkRegistry == null)
+ {
+ linkRegistry = new LinkRegistry();
+ _linkRegistry.put(remoteContainerId, linkRegistry);
+ }
+ return linkRegistry;
+ }
+
public ConfigStore getConfigStore()
{
return getApplicationRegistry().getConfigStore();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index db37cc0965..96c67941f9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -239,12 +240,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
_channelDelivers.put(_channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getConsumerTag());
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(sub.getConsumerTag(), consumerDelivers);
+ consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
}
consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index d8b5cd02cf..6081be8efd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -35,6 +35,7 @@ import java.util.Set;
public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
{
+
protected void setUp() throws Exception
{
super.setUp();
@@ -93,7 +94,20 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
(byte) 0,
(byte) 10
};
-
+
+
+ private static final byte[] AMQP_1_0_0_HEADER =
+ new byte[] {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
private byte[] getAmqpHeader(final AmqpProtocolVersion version)
{
switch(version)
@@ -106,6 +120,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
return AMQP_0_9_1_HEADER;
case v0_10:
return AMQP_0_10_HEADER;
+ case v1_0_0:
+ return AMQP_1_0_0_HEADER;
default:
fail("unknown AMQP version, appropriate header must be added for new protocol version");
return null;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java
index ba0a715001..0e2437978c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import javax.management.remote.JMXPrincipal;
import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.util.Collections;
@@ -254,6 +255,11 @@ public class RMIPasswordAuthenticatorTest extends TestCase
return new AuthenticationResult(AuthenticationStatus.CONTINUE);
}
}
+
+ public CallbackHandler getHandler(String mechanism)
+ {
+ return null;
+ }
};
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 1d6ccfbbc2..5ba9c0c015 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -277,6 +277,10 @@ public class MockSubscription implements Subscription
return false;
}
+ public void queueEmpty() throws AMQException
+ {
+ }
+
public void setActive(final boolean isActive)
{
_isActive = isActive;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index f27dc33dc3..91174c5d10 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -171,6 +172,11 @@ public class MockVirtualHost implements VirtualHost
}
+ public LinkRegistry getLinkRegistry(String remoteContainerId)
+ {
+ return null;
+ }
+
public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask)
{
return null;