summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-15 15:37:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-15 15:37:04 +0000
commit43d9b8b9928c56ee200ae1a3323796f393dec0f7 (patch)
tree1e687035925c659aa321cff9d3ae9c37930c7a2b /qpid/java/broker
parentb89341f0b6382d09ef438bafe09235ca48ea5767 (diff)
downloadqpid-python-43d9b8b9928c56ee200ae1a3323796f393dec0f7.tar.gz
QPID-4659 : [Java Broker] move amqp 1-0 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503303 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/build.xml6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java140
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java125
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java343
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java108
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java245
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java67
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java569
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java262
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java80
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java80
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java427
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java462
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java100
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java71
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java306
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java685
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java619
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java718
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java195
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java28
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java3
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter2
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType1
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator2
30 files changed, 3 insertions, 5823 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c83715305..d34414f44d 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -19,15 +19,15 @@
-
-->
<project name="AMQ Broker" default="build">
- <property name="module.depends" value="management/common common amqp-1-0-common"/>
+ <property name="module.depends" value="management/common common"/>
<property name="module.test.depends" value="common/tests" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<property name="module.genpom" value="true"/>
<!-- Add dependencies to the broker pom for the broker-plugins and bdbstore modules -->
- <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx broker-plugins/jdbc-store broker-plugins/derby-store"/>
+ <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx broker-plugins/jdbc-store broker-plugins/derby-store broker-plugins/amqp-1-0-protocol broker-plugins/amqp-msg-conv-0-8-to-1-0 broker-plugins/amqp-msg-conv-0-10-to-1-0"/>
<!-- Make them runtime dependencies, make bdbstore modules optional -->
- <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone -Sqpid-broker-plugins-jdbc-store=runtime -Sqpid-broker-plugins-derby-store=runtime"/>
+ <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone -Sqpid-broker-plugins-jdbc-store=runtime -Sqpid-broker-plugins-derby-store=runtime -Sqpid-broker-plugins-amqp-1-0-protocol=runtime -Sqpid-broker-plugins-amqp-msg-conv-0-8-to-1-0=runtime -Sqpid-broker-plugins-amqp-msg-conv-0-10-to-1-0=runtime"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
deleted file mode 100644
index a70bd4b243..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedByte;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
-import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageProperties;
-
-public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<MessageTransferMessage>
-{
- @Override
- public Class<MessageTransferMessage> getInputClass()
- {
- return MessageTransferMessage.class;
- }
-
-
- @Override
- protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage,
- SectionEncoder sectionEncoder)
- {
- List<Section> sections = new ArrayList<Section>(3);
- final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
- final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
-
- Header header = new Header();
- if(deliveryProps != null)
- {
- header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
- if(deliveryProps.hasPriority())
- {
- header.setPriority(UnsignedByte.valueOf((byte) deliveryProps.getPriority().getValue()));
- }
- if(deliveryProps.hasTtl())
- {
- header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
- }
- sections.add(header);
- }
-
- Properties props = new Properties();
-
- /*
- TODO: the current properties are not currently set:
-
- absoluteExpiryTime
- creationTime
- groupId
- groupSequence
- replyToGroupId
- to
- */
-
- if(msgProps != null)
- {
- if(msgProps.hasContentEncoding())
- {
- props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
- }
-
- if(msgProps.hasCorrelationId())
- {
- props.setCorrelationId(msgProps.getCorrelationId());
- }
-
- if(msgProps.hasMessageId())
- {
- props.setMessageId(msgProps.getMessageId());
- }
- if(msgProps.hasReplyTo())
- {
- props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey());
- }
- if(msgProps.hasContentType())
- {
- props.setContentType(Symbol.valueOf(msgProps.getContentType()));
-
- // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
- if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
- {
- props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
- }
- }
-
- props.setSubject(serverMessage.getRoutingKey());
-
- if(msgProps.hasUserId())
- {
- props.setUserId(new Binary(msgProps.getUserId()));
- }
-
- sections.add(props);
-
- if(msgProps.getApplicationHeaders() != null)
- {
- sections.add(new ApplicationProperties(msgProps.getApplicationHeaders()));
- }
- }
- return new MessageMetaData_1_0(sections, sectionEncoder);
- }
-
- @Override
- public String getType()
- {
- return "v0-10 to v1-0";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
deleted file mode 100644
index 0d9d59ff56..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedByte;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
-import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
-
-public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
-{
- @Override
- public Class<AMQMessage> getInputClass()
- {
- return AMQMessage.class;
- }
-
- protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
- {
-
- List<Section> sections = new ArrayList<Section>(3);
-
- Header header = new Header();
-
- header.setDurable(serverMessage.isPersistent());
-
- BasicContentHeaderProperties contentHeader =
- (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
-
- header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
- final long expiration = serverMessage.getExpiration();
- final long arrivalTime = serverMessage.getArrivalTime();
-
- if(expiration > arrivalTime)
- {
- header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
- }
- sections.add(header);
-
-
- Properties props = new Properties();
-
- /*
- TODO: The following properties are not currently set:
-
- creationTime
- groupId
- groupSequence
- replyToGroupId
- to
- */
-
- props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
-
- props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
-
- // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
- if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
- {
- props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
- }
-
- final AMQShortString correlationId = contentHeader.getCorrelationId();
- if(correlationId != null)
- {
- props.setCorrelationId(new Binary(correlationId.getBytes()));
- }
-
- final AMQShortString messageId = contentHeader.getMessageId();
- if(messageId != null)
- {
- props.setMessageId(new Binary(messageId.getBytes()));
- }
- props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
-
- props.setSubject(serverMessage.getRoutingKey());
- if(contentHeader.getUserId() != null)
- {
- props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
- }
-
- sections.add(props);
-
- sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
-
- return new MessageMetaData_1_0(sections, sectionEncoder);
- }
-
- @Override
- public String getType()
- {
- return "v0-8 to v1-0";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
deleted file mode 100644
index 320875cc97..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.text.MessageFormat;
-import java.util.Collection;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-
-public class Connection_1_0 implements ConnectionEventListener
-{
-
- private final Port _port;
- private VirtualHost _vhost;
- private final Transport _transport;
- private final ConnectionEndpoint _conn;
- private final long _connectionId;
- private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
-
-
- public static interface Task
- {
- public void doTask(Connection_1_0 connection);
- }
-
-
- private List<Task> _closeTasks =
- Collections.synchronizedList(new ArrayList<Task>());
-
-
-
- public Connection_1_0(VirtualHost virtualHost,
- ConnectionEndpoint conn,
- long connectionId,
- Port port,
- Transport transport)
- {
- _vhost = virtualHost;
- _port = port;
- _transport = transport;
- _conn = conn;
- _connectionId = connectionId;
- _vhost.getConnectionRegistry().registerConnection(_model);
-
- }
-
- public void remoteSessionCreation(SessionEndpoint endpoint)
- {
- Session_1_0 session = new Session_1_0(_vhost, this);
- _sessions.add(session);
- endpoint.setSessionEventListener(session);
- }
-
- void sessionEnded(Session_1_0 session)
- {
- _sessions.remove(session);
- }
-
- void removeConnectionCloseTask(final Task task)
- {
- _closeTasks.remove( task );
- }
-
- void addConnectionCloseTask(final Task task)
- {
- _closeTasks.add( task );
- }
-
- public void closeReceived()
- {
- List<Task> taskCopy;
- synchronized (_closeTasks)
- {
- taskCopy = new ArrayList<Task>(_closeTasks);
- }
- for(Task task : taskCopy)
- {
- task.doTask(this);
- }
- synchronized (_closeTasks)
- {
- _closeTasks.clear();
- }
- _vhost.getConnectionRegistry().deregisterConnection(_model);
-
-
- }
-
- public void closed()
- {
- closeReceived();
- }
-
- private final AMQConnectionModel _model = new AMQConnectionModel()
- {
- private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
- private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
-
- private final LogSubject _logSubject = new LogSubject()
- {
- @Override
- public String toLogString()
- {
- return "[" +
- MessageFormat.format(CONNECTION_FORMAT,
- getConnectionId(),
- getClientId(),
- getRemoteAddressString(),
- _vhost.getName())
- + "] ";
-
- }
- };
-
- private volatile boolean _stopped;
-
- @Override
- public void close(AMQConstant cause, String message) throws AMQException
- {
- _conn.close();
- }
-
- @Override
- public void block()
- {
- // TODO
- }
-
- @Override
- public void unblock()
- {
- // TODO
- }
-
- @Override
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
- {
- // TODO
- }
-
- @Override
- public long getConnectionId()
- {
- return _connectionId;
- }
-
- @Override
- public List<AMQSessionModel> getSessionModels()
- {
- return new ArrayList<AMQSessionModel>(_sessions);
- }
-
- @Override
- public LogSubject getLogSubject()
- {
- return _logSubject;
- }
-
- @Override
- public String getUserName()
- {
- return getPrincipalAsString();
- }
-
- @Override
- public boolean isSessionNameUnique(byte[] name)
- {
- return true; // TODO
- }
-
- @Override
- public String getRemoteAddressString()
- {
- return String.valueOf(_conn.getRemoteAddress());
- }
-
- @Override
- public String getClientId()
- {
- return _conn.getRemoteContainerId();
- }
-
- @Override
- public String getClientVersion()
- {
- return ""; //TODO
- }
-
- @Override
- public String getPrincipalAsString()
- {
- return String.valueOf(_conn.getUser());
- }
-
- @Override
- public long getSessionCountLimit()
- {
- return 0; // TODO
- }
-
- @Override
- public long getLastIoTime()
- {
- return 0; // TODO
- }
-
- @Override
- public String getVirtualHostName()
- {
- return _vhost == null ? null : _vhost.getName();
- }
-
- @Override
- public Port getPort()
- {
- return _port;
- }
-
- @Override
- public Transport getTransport()
- {
- return _transport;
- }
-
- @Override
- public void stop()
- {
- _stopped = true;
- }
-
- @Override
- public boolean isStopped()
- {
- return _stopped;
- }
-
- @Override
- public void initialiseStatistics()
- {
- _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId());
- }
-
- @Override
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messageReceiptStatistics.registerEvent(1L, timestamp);
- _dataReceiptStatistics.registerEvent(messageSize, timestamp);
- _vhost.registerMessageReceived(messageSize,timestamp);
-
- }
-
- @Override
- public void registerMessageDelivered(long messageSize)
- {
-
- _messageDeliveryStatistics.registerEvent(1L);
- _dataDeliveryStatistics.registerEvent(messageSize);
- _vhost.registerMessageDelivered(messageSize);
- }
-
- @Override
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messageDeliveryStatistics;
- }
-
- @Override
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messageReceiptStatistics;
- }
-
- @Override
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDeliveryStatistics;
- }
-
- @Override
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceiptStatistics;
- }
-
- @Override
- public void resetStatistics()
- {
- _dataDeliveryStatistics.reset();
- _dataReceiptStatistics.reset();
- _messageDeliveryStatistics.reset();
- _messageReceiptStatistics.reset();
- }
-
-
- };
-
- AMQConnectionModel getModel()
- {
- return _model;
- }
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
deleted file mode 100644
index d45758391c..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-
-public interface Destination
-{
-
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
deleted file mode 100644
index 2cef27267b..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.util.List;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.txn.ServerTransaction;
-
-public class ExchangeDestination implements ReceivingDestination, SendingDestination
-{
- private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
-
- private Exchange _exchange;
- private TerminusDurability _durability;
- private TerminusExpiryPolicy _expiryPolicy;
-
- public ExchangeDestination(Exchange exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
- {
- _exchange = exchange;
- _durability = durable;
- _expiryPolicy = expiryPolicy;
- }
-
- public Outcome[] getOutcomes()
- {
- return OUTCOMES;
- }
-
- public Outcome send(final Message_1_0 message, ServerTransaction txn)
- {
- final List<? extends BaseQueue> queues = _exchange.route(message);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
-
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
-
- public void postCommit()
- {
- for(int i = 0; i < _queues.length; i++)
- {
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
-
- return ACCEPTED;
- }
-
- TerminusDurability getDurability()
- {
- return _durability;
- }
-
- TerminusExpiryPolicy getExpiryPolicy()
- {
- return _expiryPolicy;
- }
-
- public int getCredit()
- {
- // TODO - fix
- return 20000;
- }
-
- public Exchange getExchange()
- {
- return _exchange;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
deleted file mode 100644
index 5ce24f406d..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.server.protocol.LinkModel;
-
-public interface Link_1_0 extends LinkModel
-{
- void start();
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
deleted file mode 100644
index be9b0323a3..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.typedmessage.TypedBytesContentReader;
-import org.apache.qpid.typedmessage.TypedBytesFormatException;
-
-public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0>
-{
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
-
- @Override
- public final Class<Message_1_0> getOutputClass()
- {
- return Message_1_0.class;
- }
-
- @Override
- public final Message_1_0 convert(M message, VirtualHost vhost)
- {
-
- SectionEncoder sectionEncoder = new SectionEncoderImpl(_typeRegistry);
- return new Message_1_0(convertToStoredMessage(message, sectionEncoder));
- }
-
-
- private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder)
- {
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, sectionEncoder);
- return convertServerMessage(metaData, serverMessage, sectionEncoder);
- }
-
- abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage, SectionEncoder sectionEncoder);
-
-
- private static Section convertMessageBody(String mimeType, byte[] data)
- {
- if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
- {
- String text = new String(data);
- return new AmqpValue(text);
- }
- else if("jms/map-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- LinkedHashMap map = new LinkedHashMap();
- final int entries = reader.readIntImpl();
- for (int i = 0; i < entries; i++)
- {
- try
- {
- String propName = reader.readStringImpl();
- Object value = reader.readObject();
- map.put(propName, value);
- }
- catch (EOFException e)
- {
- throw new IllegalArgumentException(e);
- }
- catch (TypedBytesFormatException e)
- {
- throw new IllegalArgumentException(e);
- }
-
- }
-
- return new AmqpValue(map);
-
- }
- else if("amqp/map".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- return new AmqpValue(decoder.readMap());
-
- }
- else if("amqp/list".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- return new AmqpValue(decoder.readList());
- }
- else if("jms/stream-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- List list = new ArrayList();
- while (reader.remaining() != 0)
- {
- try
- {
- list.add(reader.readObject());
- }
- catch (TypedBytesFormatException e)
- {
- throw new RuntimeException(e); // TODO - Implement
- }
- catch (EOFException e)
- {
- throw new RuntimeException(e); // TODO - Implement
- }
- }
- return new AmqpValue(list);
- }
- else
- {
- return new Data(new Binary(data));
-
- }
- }
-
- private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final ServerMessage serverMessage,
- SectionEncoder sectionEncoder)
- {
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
- Section bodySection = convertMessageBody(mimeType, data);
-
- final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
- return new StoredMessage<MessageMetaData_1_0>()
- {
- @Override
- public MessageMetaData_1_0 getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMessage.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- int size;
- if(dst.remaining()<buf.remaining())
- {
- buf.limit(dst.remaining());
- size = dst.remaining();
- }
- else
- {
- size = buf.remaining();
- }
- dst.put(buf);
- return size;
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- if(size < buf.remaining())
- {
- buf.limit(size);
- }
- return buf;
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void remove()
- {
- serverMessage.getStoredMessage().remove();
- }
- };
- }
-
- private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
- {
- int headerSize = (int) metaData.getStorableSize();
-
- sectionEncoder.reset();
- sectionEncoder.encodeObject(bodySection);
- Binary dataEncoding = sectionEncoder.getEncoding();
-
- final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(0,allData);
- allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
- return allData;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
deleted file mode 100644
index 44b1de74e1..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.nio.ByteBuffer;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.store.StoredMessage;
-
-public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaData_1_0>
-{
-
- public static final int TYPE = 2;
-
- @Override
- public int ordinal()
- {
- return TYPE;
- }
-
- @Override
- public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
- {
- return MessageMetaData_1_0.FACTORY.createMetaData(buf);
- }
-
- @Override
- public ServerMessage<MessageMetaData_1_0> createMessage(StoredMessage<MessageMetaData_1_0> msg)
- {
- return new Message_1_0(msg);
- }
-
- public int hashCode()
- {
- return ordinal();
- }
-
- public boolean equals(Object o)
- {
- return o != null && o.getClass() == getClass();
- }
-
- @Override
- public String getType()
- {
- return AmqpProtocolVersion.v1_0_0.toString();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
deleted file mode 100755
index 8d48d70d9a..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations;
-import org.apache.qpid.amqp_1_0.type.messaging.Footer;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-
-public class MessageMetaData_1_0 implements StorableMessageMetaData
-{
- // TODO move to somewhere more useful
- public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
- public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
- private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
-
-
- private Header _header;
- private Properties _properties;
- private Map _deliveryAnnotations;
- private Map _messageAnnotations;
- private Map _appProperties;
- private Map _footer;
-
- private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3);
-
- private volatile ByteBuffer _encoded;
- private MessageHeader_1_0 _messageHeader;
-
-
- public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder)
- {
- this(sections, encodeSections(sections, encoder));
- }
-
- private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
- {
- ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- encodedSections.add(encoder.getEncoding().asByteBuffer());
- encoder.reset();
- }
- return encodedSections;
- }
-
- public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
- {
- this(fragments, decoder, new ArrayList<ByteBuffer>(3));
- }
-
- public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immuatableSections)
- {
- this(constructSections(fragments, decoder,immuatableSections), immuatableSections);
- }
-
- private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections)
- {
- _encodedSections = encodedSections;
-
- Iterator<Section> sectIter = sections.iterator();
-
- Section section = sectIter.hasNext() ? sectIter.next() : null;
- if(section instanceof Header)
- {
- _header = (Header) section;
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- if(section instanceof DeliveryAnnotations)
- {
- _deliveryAnnotations = ((DeliveryAnnotations) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- if(section instanceof MessageAnnotations)
- {
- _messageAnnotations = ((MessageAnnotations) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- if(section instanceof Properties)
- {
- _properties = (Properties) section;
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- if(section instanceof ApplicationProperties)
- {
- _appProperties = ((ApplicationProperties) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- if(section instanceof Footer)
- {
- _footer = ((Footer) section).getValue();
- section = sectIter.hasNext() ? sectIter.next() : null;
- }
-
- _messageHeader = new MessageHeader_1_0();
-
- }
-
- private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections)
- {
- List<Section> sections = new ArrayList<Section>(3);
-
- ByteBuffer src;
- if(fragments.length == 1)
- {
- src = fragments[0].duplicate();
- }
- else
- {
- int size = 0;
- for(ByteBuffer buf : fragments)
- {
- size += buf.remaining();
- }
- src = ByteBuffer.allocate(size);
- for(ByteBuffer buf : fragments)
- {
- src.put(buf.duplicate());
- }
- src.flip();
-
- }
-
- try
- {
- int startBarePos = -1;
- int lastPos = src.position();
- Section s = decoder.readSection(src);
-
-
-
- if(s instanceof Header)
- {
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
-
- if(s instanceof DeliveryAnnotations)
- {
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
-
- if(s instanceof MessageAnnotations)
- {
- sections.add(s);
- lastPos = src.position();
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
-
- if(s instanceof Properties)
- {
- sections.add(s);
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
-
- if(s instanceof ApplicationProperties)
- {
- sections.add(s);
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
-
- if(s instanceof AmqpValue)
- {
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
- else if(s instanceof Data)
- {
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- do
- {
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- } while(s instanceof Data);
- }
- else if(s instanceof AmqpSequence)
- {
- if(startBarePos == -1)
- {
- startBarePos = lastPos;
- }
- do
- {
- s = src.hasRemaining() ? decoder.readSection(src) : null;
- }
- while(s instanceof AmqpSequence);
- }
-
- if(s instanceof Footer)
- {
- sections.add(s);
- }
-
-
- int pos = 0;
- for(ByteBuffer buf : fragments)
- {
-/*
- if(pos < startBarePos)
- {
- if(pos + buf.remaining() > startBarePos)
- {
- ByteBuffer dup = buf.duplicate();
- dup.position(dup.position()+startBarePos-pos);
- dup.slice();
- encodedSections.add(dup);
- }
- }
- else
-*/
- {
- encodedSections.add(buf.duplicate());
- }
- pos += buf.remaining();
- }
-
- return sections;
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- throw new IllegalArgumentException(e);
- }
- }
-
-
- public MessageMetaDataType getType()
- {
- return TYPE;
- }
-
-
- public int getStorableSize()
- {
- int size = 0;
-
- for(ByteBuffer bin : _encodedSections)
- {
- size += bin.limit();
- }
-
- return size;
- }
-
- private ByteBuffer encodeAsBuffer()
- {
- ByteBuffer buf = ByteBuffer.allocate(getStorableSize());
-
- for(ByteBuffer bin : _encodedSections)
- {
- buf.put(bin.duplicate());
- }
-
- return buf;
- }
-
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
- {
- ByteBuffer buf = _encoded;
-
- if(buf == null)
- {
- buf = encodeAsBuffer();
- _encoded = buf;
- }
-
- buf = buf.duplicate();
-
- buf.position(offsetInMetaData);
-
- if(dest.remaining() < buf.limit())
- {
- buf.limit(dest.remaining());
- }
- dest.put(buf);
- return buf.limit();
- }
-
- public int getContentSize()
- {
- ByteBuffer buf = _encoded;
-
- if(buf == null)
- {
- buf = encodeAsBuffer();
- _encoded = buf;
- }
- return buf.remaining();
- }
-
- public boolean isPersistent()
- {
- return _header != null && Boolean.TRUE.equals(_header.getDurable());
- }
-
- public MessageHeader_1_0 getMessageHeader()
- {
- return _messageHeader;
- }
-
-
-
-
- private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
- {
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
-
- private MetaDataFactory()
- {
- _typeRegistry.registerTransportLayer();
- _typeRegistry.registerMessagingLayer();
- _typeRegistry.registerTransactionLayer();
- _typeRegistry.registerSecurityLayer();
- }
-
- public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
- {
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
-
- ArrayList<Section> sections = new ArrayList<Section>(3);
- ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3);
-
- while(buf.hasRemaining())
- {
- try
- {
- ByteBuffer encodedBuf = buf.duplicate();
- Object parse = valueHandler.parse(buf);
- sections.add((Section) parse);
- encodedBuf.limit(buf.position());
- encodedSections.add(encodedBuf);
-
- }
- catch (AmqpErrorException e)
- {
- //TODO
- throw new RuntimeException(e);
- }
-
- }
-
- return new MessageMetaData_1_0(sections,encodedSections);
-
- }
- }
-
- public class MessageHeader_1_0 implements AMQMessageHeader
- {
-
- public String getCorrelationId()
- {
- if(_properties == null || _properties.getCorrelationId() == null)
- {
- return null;
- }
- else
- {
- return _properties.getMessageId().toString();
- }
- }
-
- public long getExpiration()
- {
- return 0; //TODO
- }
-
- public String getMessageId()
- {
- if(_properties == null || _properties.getCorrelationId() == null)
- {
- return null;
- }
- else
- {
- return _properties.getCorrelationId().toString();
- }
- }
-
- public String getMimeType()
- {
-
- if(_properties == null || _properties.getContentType() == null)
- {
- return null;
- }
- else
- {
- return _properties.getContentType().toString();
- }
- }
-
- public String getEncoding()
- {
- return null; //TODO
- }
-
- public byte getPriority()
- {
- if(_header == null || _header.getPriority() == null)
- {
- return 4; //javax.jms.Message.DEFAULT_PRIORITY;
- }
- else
- {
- return _header.getPriority().byteValue();
- }
- }
-
- public long getTimestamp()
- {
- if(_properties == null || _properties.getCreationTime() == null)
- {
- return 0L;
- }
- else
- {
- return _properties.getCreationTime().getTime();
- }
-
- }
-
- public String getType()
- {
-
- if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null)
- {
- return null;
- }
- else
- {
- return _messageAnnotations.get(JMS_TYPE).toString();
- }
- }
-
- public String getReplyTo()
- {
- if(_properties == null || _properties.getReplyTo() == null)
- {
- return null;
- }
- else
- {
- return _properties.getReplyTo().toString();
- }
- }
-
- public String getReplyToExchange()
- {
- return null; //TODO
- }
-
- public String getReplyToRoutingKey()
- {
- return null; //TODO
- }
-
- public String getAppId()
- {
- //TODO
- return null;
- }
-
- public String getUserId()
- {
- // TODO
- return null;
- }
-
- public Object getHeader(final String name)
- {
- return _appProperties == null ? null : _appProperties.get(name);
- }
-
- public boolean containsHeaders(final Set<String> names)
- {
- if(_appProperties == null)
- {
- return false;
- }
-
- for(String key : names)
- {
- if(!_appProperties.containsKey(key))
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Collection<String> getHeaderNames()
- {
- if(_appProperties == null)
- {
- return Collections.emptySet();
- }
- return Collections.unmodifiableCollection(_appProperties.keySet());
- }
-
- public boolean containsHeader(final String name)
- {
- return _appProperties != null && _appProperties.containsKey(name);
- }
-
- public String getSubject()
- {
- return _properties == null ? null : _properties.getSubject();
- }
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
deleted file mode 100644
index 9dc063e3ea..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.StoredMessage;
-
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
-{
-
-
- private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
- private volatile int _referenceCount = 0;
-
- private final StoredMessage<MessageMetaData_1_0> _storedMessage;
- private List<ByteBuffer> _fragments;
- private WeakReference<Session_1_0> _session;
- private long _arrivalTime;
-
-
- public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
- {
- _storedMessage = storedMessage;
- _session = null;
- _fragments = restoreFragments(storedMessage);
- }
-
- private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
- {
- ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
- final int FRAGMENT_SIZE = 2048;
- int offset = 0;
- ByteBuffer b;
- do
- {
-
- b = storedMessage.getContent(offset,FRAGMENT_SIZE);
- if(b.hasRemaining())
- {
- fragments.add(b);
- offset+= b.remaining();
- }
- }
- while(b.hasRemaining());
- return fragments;
- }
-
- public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
- final List<ByteBuffer> fragments,
- final Session_1_0 session)
- {
- _storedMessage = storedMessage;
- _fragments = fragments;
- _session = new WeakReference<Session_1_0>(session);
- _arrivalTime = System.currentTimeMillis();
- }
-
- public String getRoutingKey()
- {
- Object routingKey = getMessageHeader().getHeader("routing-key");
- if(routingKey != null)
- {
- return routingKey.toString();
- }
- else
- {
- return getMessageHeader().getSubject();
- }
- }
-
- public AMQShortString getRoutingKeyShortString()
- {
- return AMQShortString.valueOf(getRoutingKey());
- }
-
- private MessageMetaData_1_0 getMessageMetaData()
- {
- return _storedMessage.getMetaData();
- }
-
- public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
- {
- return getMessageMetaData().getMessageHeader();
- }
-
- public StoredMessage getStoredMessage()
- {
- return _storedMessage;
- }
-
- public boolean isPersistent()
- {
- return getMessageMetaData().isPersistent();
- }
-
- public boolean isRedelivered()
- {
- // TODO
- return false;
- }
-
- public long getSize()
- {
- long size = 0l;
- if(_fragments != null)
- {
- for(ByteBuffer buf : _fragments)
- {
- size += buf.remaining();
- }
- }
-
- return size;
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public long getExpiration()
- {
- return getMessageHeader().getExpiration();
- }
-
- public MessageReference<Message_1_0> newReference()
- {
- return new Reference(this);
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
- public int getContent(final ByteBuffer buf, final int offset)
- {
- return _storedMessage.getContent(offset, buf);
- }
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- buf.limit(getContent(buf, offset));
-
- return buf;
- }
-
- public List<ByteBuffer> getFragments()
- {
- return _fragments;
- }
-
- public Session_1_0 getSession()
- {
- return _session == null ? null : _session.get();
- }
-
-
- public boolean incrementReference()
- {
- if(_refCountUpdater.incrementAndGet(this) <= 0)
- {
- _refCountUpdater.decrementAndGet(this);
- return false;
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- */
-
- public void decrementReference()
- {
- int count = _refCountUpdater.decrementAndGet(this);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_storedMessage != null)
- {
- _storedMessage.remove();
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + getMessageNumber()
- + " has gone below 0.");
- }
- }
- }
-
- public static class Reference extends MessageReference<Message_1_0>
- {
- public Reference(Message_1_0 message)
- {
- super(message);
- }
-
- protected void onReference(Message_1_0 message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(Message_1_0 message)
- {
- message.decrementReference();
- }
-
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
deleted file mode 100644
index c06af603de..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class ProtocolEngineCreator_1_0_0 implements ProtocolEngineCreator
-{
- private static final byte[] AMQP_1_0_0_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
- public ProtocolEngineCreator_1_0_0()
- {
- }
-
- public AmqpProtocolVersion getVersion()
- {
- return AmqpProtocolVersion.v1_0_0;
- }
-
-
- public byte[] getHeaderIdentifier()
- {
- return AMQP_1_0_0_HEADER;
- }
-
- public ServerProtocolEngine newProtocolEngine(Broker broker,
- NetworkConnection network,
- Port port,
- Transport transport,
- long id)
- {
- return new ProtocolEngine_1_0_0(network, broker, id, port, transport);
- }
-
- private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0();
-
- public static ProtocolEngineCreator getInstance()
- {
- return INSTANCE;
- }
-
- @Override
- public String getType()
- {
- return getVersion().toString() + "_NO_SASL";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
deleted file mode 100644
index d3936782da..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator
-{
- private static final byte[] AMQP_SASL_1_0_0_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
- public ProtocolEngineCreator_1_0_0_SASL()
- {
- }
-
- public AmqpProtocolVersion getVersion()
- {
- return AmqpProtocolVersion.v1_0_0;
- }
-
-
- public byte[] getHeaderIdentifier()
- {
- return AMQP_SASL_1_0_0_HEADER;
- }
-
- public ServerProtocolEngine newProtocolEngine(Broker broker,
- NetworkConnection network,
- Port port,
- Transport transport,
- long id)
- {
- return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, transport);
- }
-
- private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0_SASL();
-
- public static ProtocolEngineCreator getInstance()
- {
- return INSTANCE;
- }
-
- @Override
- public String getType()
- {
- return getVersion().toString();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
deleted file mode 100755
index 1bddda2f38..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
-{
- static final AtomicLong _connectionIdSource = new AtomicLong(0L);
- private final Port _port;
- private final Transport _transport;
-
- //private NetworkConnection _networkDriver;
- private long _readBytes;
- private long _writtenBytes;
- private long _lastReadTime;
- private long _lastWriteTime;
- private final Broker _broker;
- private long _createTime = System.currentTimeMillis();
- private ConnectionEndpoint _conn;
- private final long _connectionId;
-
- private static final ByteBuffer HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private FrameWriter _frameWriter;
- private FrameHandler _frameHandler;
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
-
- static enum State {
- A,
- M,
- Q,
- P,
- PROTOCOL,
- MAJOR,
- MINOR,
- REVISION,
- FRAME
- }
-
- private State _state = State.A;
-
-
-
- public ProtocolEngine_1_0_0(final NetworkConnection networkDriver,
- final Broker broker,
- long id,
- Port port,
- Transport transport)
- {
- _broker = broker;
- _port = port;
- _transport = transport;
- _connectionId = id;
- if(networkDriver != null)
- {
- setNetworkConnection(networkDriver, networkDriver.getSender());
- }
- }
-
-
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void writerIdle()
- {
- //Todo
- }
-
- public void readerIdle()
- {
- //Todo
- }
-
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
- {
- _network = network;
- _sender = sender;
-
- Container container = new Container(_broker.getId().toString());
-
- VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
-
- _conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator(
- getLocalAddress())));
-
- Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>();
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName());
-
- _conn.setProperties(serverProperties);
-
- _conn.setRemoteAddress(_network.getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
- _conn.setFrameOutputHandler(this);
-
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new FrameHandler(_conn);
-
- _sender.send(HEADER.duplicate());
- _sender.flush();
- }
-
- private SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator)
- {
- return new SaslServerProvider()
- {
- @Override
- public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
- {
- return subjectCreator.createSaslServer(mechanism, fqdn, null);
- }
-
- @Override
- public Principal getAuthenticatedPrincipal(SaslServer server)
- {
- return new UsernamePrincipal(server.getAuthorizationID());
- }
- };
- }
-
- public String getAddress()
- {
- return getRemoteAddress().toString();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public synchronized void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
- }
- _readBytes += msg.remaining();
- switch(_state)
- {
- case A:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.P;
- break;
- }
- case PROTOCOL:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if(msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if(msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if(msg.hasRemaining())
- {
- _revision = msg.get();
-
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- if(msg.hasRemaining())
- {
- _frameHandler.parse(msg);
- }
- }
-
- }
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed()
- {
- _conn.inputClosed();
- if(_conn != null && _conn.getConnectionEventListener() != null)
- {
- ((Connection_1_0)_conn.getConnectionEventListener()).closed();
- }
-
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
-
- public boolean canSend()
- {
- return true;
- }
-
- public void send(final AMQFrame amqFrame)
- {
- send(amqFrame, null);
- }
-
- private final Logger FRAME_LOGGER = Logger.getLogger("FRM");
- private final Logger RAW_LOGGER = Logger.getLogger("RAW");
-
-
- public void send(final AMQFrame amqFrame, ByteBuffer buf)
- {
- synchronized(_sendLock)
- {
-
- _lastWriteTime = System.currentTimeMillis();
- if(FRAME_LOGGER.isLoggable(Level.FINE))
- {
- FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
- }
-
-
- _frameWriter.setValue(amqFrame);
-
-
-
- ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
-
- int size = _frameWriter.writeToBuffer(dup);
- if(size > _conn.getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame,size);
- }
-
- dup.flip();
- _writtenBytes += dup.limit();
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
- }
-
-
- _sender.send(dup);
- _sender.flush();
-
- }
- }
-
- public void send(short channel, FrameBody body)
- {
- AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
- send(frame);
-
- }
-
- public void close()
- {
- //TODO
- }
-
- public long getConnectionId()
- {
- return _connectionId;
- }
-
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
deleted file mode 100644
index d614f44981..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
-import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
-{
- private final Port _port;
- private final Transport _transport;
- private long _readBytes;
- private long _writtenBytes;
-
- private long _lastReadTime;
- private long _lastWriteTime;
- private final Broker _broker;
- private long _createTime = System.currentTimeMillis();
- private ConnectionEndpoint _conn;
- private long _connectionId;
-
- private static final ByteBuffer HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private static final ByteBuffer PROTOCOL_HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
-
- private FrameWriter _frameWriter;
- private ProtocolHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
- private PrintWriter _out;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
-
- static enum State {
- A,
- M,
- Q,
- P,
- PROTOCOL,
- MAJOR,
- MINOR,
- REVISION,
- FRAME
- }
-
- private State _state = State.A;
-
-
- public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker,
- long id, Port port, Transport transport)
- {
- _connectionId = id;
- _broker = broker;
- _port = port;
- _transport = transport;
- if(networkDriver != null)
- {
- setNetworkConnection(networkDriver, networkDriver.getSender());
- }
- }
-
-
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void writerIdle()
- {
- //Todo
- }
-
- public void readerIdle()
- {
- //Todo
- }
-
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
- {
- _network = network;
- _sender = sender;
-
- Container container = new Container(_broker.getId().toString());
-
- VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
- SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress());
- _conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator));
-
- Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>();
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName());
-
- _conn.setProperties(serverProperties);
-
- _conn.setRemoteAddress(getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
- _conn.setFrameOutputHandler(this);
- _conn.setSaslFrameOutput(this);
-
- _conn.setOnSaslComplete(new Runnable()
- {
- public void run()
- {
- if(_conn.isAuthenticated())
- {
- _sender.send(PROTOCOL_HEADER.duplicate());
- _sender.flush();
- }
- else
- {
- _network.close();
- }
- }
- });
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new SASLFrameHandler(_conn);
-
- _sender.send(HEADER.duplicate());
- _sender.flush();
-
- _conn.initiateSASL(subjectCreator.getMechanisms().split(" "));
-
-
- }
-
- private SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator)
- {
- return new SaslServerProvider()
- {
- @Override
- public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
- {
- return subjectCreator.createSaslServer(mechanism, fqdn, _network.getPeerPrincipal());
- }
-
- @Override
- public Principal getAuthenticatedPrincipal(SaslServer server)
- {
- return new UsernamePrincipal(server.getAuthorizationID());
- }
- };
- }
-
- public String getAddress()
- {
- return getRemoteAddress().toString();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- private final Logger RAW_LOGGER = Logger.getLogger("RAW");
-
-
- public synchronized void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
- }
- _readBytes += msg.remaining();
- switch(_state)
- {
- case A:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.P;
- break;
- }
- case PROTOCOL:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if(msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if(msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if(msg.hasRemaining())
- {
- _revision = msg.get();
-
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- if(msg.hasRemaining())
- {
- _frameHandler = _frameHandler.parse(msg);
- }
- }
-
- }
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed()
- {
- // todo
- _conn.inputClosed();
- if (_conn != null && _conn.getConnectionEventListener() != null)
- {
- ((Connection_1_0) _conn.getConnectionEventListener()).closed();
- }
-
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
-
- public boolean canSend()
- {
- return true;
- }
-
- public void send(final AMQFrame amqFrame)
- {
- send(amqFrame, null);
- }
-
- private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
-
-
- public void send(final AMQFrame amqFrame, ByteBuffer buf)
- {
-
- synchronized (_sendLock)
- {
- _lastWriteTime = System.currentTimeMillis();
- if (FRAME_LOGGER.isLoggable(Level.FINE))
- {
- FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
- }
-
- _frameWriter.setValue(amqFrame);
-
- ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
-
- int size = _frameWriter.writeToBuffer(dup);
- if (size > _conn.getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame, size);
- }
-
- dup.flip();
- _writtenBytes += dup.limit();
-
- if (RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
- }
-
- _sender.send(dup);
- _sender.flush();
-
-
- }
- }
-
- public void send(short channel, FrameBody body)
- {
- AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
- send(frame);
-
- }
-
- public void close()
- {
- _sender.close();
- }
-
- public void setLogOutput(final PrintWriter out)
- {
- _out = out;
- }
-
- public long getConnectionId()
- {
- return _connectionId;
- }
-
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
- public long getLastWriteTime()
- {
- return _lastWriteTime;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
deleted file mode 100644
index af3f0b7872..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQQueue;
-
-import org.apache.qpid.server.txn.ServerTransaction;
-
-import java.util.Arrays;
-
-public class QueueDestination implements SendingDestination, ReceivingDestination
-{
- private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
-
-
- private AMQQueue _queue;
-
- public QueueDestination(AMQQueue queue)
- {
- _queue = queue;
- }
-
- public Outcome[] getOutcomes()
- {
- return OUTCOMES;
- }
-
- public Outcome send(final Message_1_0 message, ServerTransaction txn)
- {
-
- try
- {
- txn.enqueue(_queue,message, new ServerTransaction.Action()
- {
-
-
- public void postCommit()
- {
- try
- {
-
- _queue.enqueue(message);
- }
- catch (Exception e)
- {
- // TODO
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
- }
- catch(Exception e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- return ACCEPTED;
- }
-
- public int getCredit()
- {
- // TODO - fix
- return 100;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
deleted file mode 100644
index 4ae0596e25..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.type.Outcome;
-
-import org.apache.qpid.server.txn.ServerTransaction;
-
-public interface ReceivingDestination extends Destination
-{
-
- Outcome[] getOutcomes();
-
- Outcome send(Message_1_0 message, ServerTransaction txn);
-
- int getCredit();
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
deleted file mode 100644
index 46b9682c74..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-
-public class ReceivingLinkAttachment
-{
- private final Session_1_0 _session;
- private final ReceivingLinkEndpoint _endpoint;
-
- public ReceivingLinkAttachment(final Session_1_0 session, final ReceivingLinkEndpoint endpoint)
- {
- _session = session;
- _endpoint = endpoint;
- }
-
- public Session_1_0 getSession()
- {
- return _session;
- }
-
- public ReceivingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Source getSource()
- {
- return getEndpoint().getSource();
- }
-
- public void setDeliveryStateHandler(final DeliveryStateHandler handler)
- {
- getEndpoint().setDeliveryStateHandler(handler);
- }
-
- public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
- {
- getEndpoint().updateDisposition(deliveryTag, state, settled);
- }
-
- public Target getTarget()
- {
- return getEndpoint().getTarget();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
deleted file mode 100644
index e971672767..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler
-{
- private VirtualHost _vhost;
-
- private ReceivingDestination _destination;
- private SectionDecoderImpl _sectionDecoder;
- private volatile ReceivingLinkAttachment _attachment;
-
-
- private ArrayList<Transfer> _incompleteMessage;
- private TerminusDurability _durability;
-
- private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
- private boolean _resumedMessage;
- private Binary _messageDeliveryTag;
- private ReceiverSettleMode _receivingSettlementMode;
-
-
- public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost,
- ReceivingDestination destination)
- {
- _vhost = vhost;
- _destination = destination;
- _attachment = receivingLinkAttachment;
- receivingLinkAttachment.setDeliveryStateHandler(this);
-
- _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
-
- _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
-
-
- }
-
- public void messageTransfer(Transfer xfr)
- {
- // TODO - cope with fragmented messages
-
- List<ByteBuffer> fragments = null;
-
-
-
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<Transfer>();
- _incompleteMessage.add(xfr);
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = xfr.getDeliveryTag();
- return;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
-
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return;
- }
-
- fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
- for(Transfer t : _incompleteMessage)
- {
- fragments.add(t.getPayload());
- }
- _incompleteMessage=null;
-
- }
- else
- {
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = xfr.getDeliveryTag();
- fragments = Collections.singletonList(xfr.getPayload());
- }
-
- if(_resumedMessage)
- {
- if(_unsettledMap.containsKey(_messageDeliveryTag))
- {
- Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
- boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
- getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
- if(settled)
- {
- _unsettledMap.remove(_messageDeliveryTag);
- }
- }
- else
- {
- System.err.println("UNEXPECTED!!");
- System.err.println("Delivery Tag: " + _messageDeliveryTag);
- System.err.println("_unsettledMap: " + _unsettledMap);
-
- }
- }
- else
- {
- MessageMetaData_1_0 mmd = null;
- List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
- mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
- _sectionDecoder,
- immutableSections);
-
- StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
-
- boolean skipping = true;
- int offset = 0;
-
- for(ByteBuffer bareMessageBuf : immutableSections)
- {
- storedMessage.addContent(offset, bareMessageBuf.duplicate());
- offset += bareMessageBuf.remaining();
- }
-
- storedMessage.flushToStore();
-
- Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
-
- Binary transactionId = null;
- org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
- if(xfrState != null)
- {
- if(xfrState instanceof TransactionalState)
- {
- transactionId = ((TransactionalState)xfrState).getTxnId();
- }
- }
-
- ServerTransaction transaction = null;
- if(transactionId != null)
- {
- transaction = getSession().getTransaction(transactionId);
- }
- else
- {
- Session_1_0 session = getSession();
- transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getMessageStore());
- }
-
- Outcome outcome = _destination.send(message, transaction);
-
- DeliveryState resultantState;
-
- if(transactionId == null)
- {
- resultantState = (DeliveryState) outcome;
- }
- else
- {
- TransactionalState transactionalState = new TransactionalState();
- transactionalState.setOutcome(outcome);
- transactionalState.setTxnId(transactionId);
- resultantState = transactionalState;
-
- }
-
-
- boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
-
- final Binary deliveryTag = xfr.getDeliveryTag();
-
- if(!settled)
- {
- _unsettledMap.put(deliveryTag, outcome);
- }
-
- getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
-
- getSession().getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-
- if(!(transaction instanceof AutoCommitTransaction))
- {
- ServerTransaction.Action a;
- transaction.addPostTransactionAction(new ServerTransaction.Action()
- {
- public void postCommit()
- {
- getEndpoint().updateDisposition(deliveryTag, null, true);
- }
-
- public void onRollback()
- {
- getEndpoint().updateDisposition(deliveryTag, null, true);
- }
- });
- }
- }
- }
-
- private ReceiverSettleMode getReceivingSettlementMode()
- {
- return _receivingSettlementMode;
- }
-
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- //TODO
- // if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
- {
- endpoint.close();
- }
- else if(detach == null || detach.getError() != null)
- {
- _attachment = null;
- }
- }
-
- public void start()
- {
- getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
- getEndpoint().setCreditWindow();
- }
-
- public ReceivingLinkEndpoint getEndpoint()
- {
- return _attachment.getEndpoint();
- }
-
-
- public Session_1_0 getSession()
- {
- ReceivingLinkAttachment attachment = _attachment;
- return attachment == null ? null : attachment.getSession();
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- _unsettledMap.remove(deliveryTag);
- }
- }
-
- public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
- {
- _attachment = linkAttachment;
- _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
- ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-
- Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
- for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
- {
- Binary deliveryTag = entry.getKey();
- if(!initialUnsettledMap.containsKey(deliveryTag))
- {
- _unsettledMap.remove(deliveryTag);
- }
- }
-
- }
-
- public Map getUnsettledOutcomeMap()
- {
- return _unsettledMap;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
deleted file mode 100644
index 6d601c9dda..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-
-public interface SendingDestination extends Destination
-{
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
deleted file mode 100644
index 09a2ddea3a..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Source;
-
-public class SendingLinkAttachment
-{
- private final Session_1_0 _session;
- private final SendingLinkEndpoint _endpoint;
-
- public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint)
- {
- _session = session;
- _endpoint = endpoint;
- }
-
- public Session_1_0 getSession()
- {
- return _session;
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Source getSource()
- {
- return getEndpoint().getSource();
- }
-
- public void setDeliveryStateHandler(final DeliveryStateHandler handler)
- {
- getEndpoint().setDeliveryStateHandler(handler);
- }
-
- public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
- {
- getEndpoint().updateDisposition(deliveryTag, state, settled);
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
deleted file mode 100644
index ca67b6f79b..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ /dev/null
@@ -1,685 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.filter.SelectorParsingException;
-import org.apache.qpid.filter.selector.ParseException;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
-{
- private VirtualHost _vhost;
- private SendingDestination _destination;
-
- private Subscription_1_0 _subscription;
- private boolean _draining;
- private final Map<Binary, QueueEntry> _unsettledMap =
- new HashMap<Binary, QueueEntry>();
-
- private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
- new ConcurrentHashMap<Binary, UnsettledAction>();
- private volatile SendingLinkAttachment _linkAttachment;
- private TerminusDurability _durability;
- private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
- private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
- private Runnable _closeAction;
-
- public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
- final VirtualHost vhost,
- final SendingDestination destination)
- throws AmqpErrorException
- {
- _vhost = vhost;
- _destination = destination;
- _linkAttachment = linkAttachment;
- final Source source = (Source) linkAttachment.getSource();
- _durability = source.getDurable();
- linkAttachment.setDeliveryStateHandler(this);
- QueueDestination qd = null;
- AMQQueue queue = null;
-
-
-
- boolean noLocal = false;
- JMSSelectorFilter messageFilter = null;
-
- if(destination instanceof QueueDestination)
- {
- queue = ((QueueDestination) _destination).getQueue();
- if(queue.getArguments() != null && queue.getArguments().containsKey("topic"))
- {
- source.setDistributionMode(StdDistMode.COPY);
- }
- qd = (QueueDestination) destination;
-
- Map<Symbol,Filter> filters = source.getFilter();
-
- Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
-
- if(filters != null)
- {
- for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
- {
- if(entry.getValue() instanceof NoLocalFilter)
- {
- actualFilters.put(entry.getKey(), entry.getValue());
- noLocal = true;
- }
- else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter)
- {
-
- org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue();
- try
- {
- messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
-
- actualFilters.put(entry.getKey(), entry.getValue());
- }
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
-
-
- }
- }
- }
- source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
- _subscription = new Subscription_1_0(this, qd);
- }
- else if(destination instanceof ExchangeDestination)
- {
- try
- {
-
- ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
-
- boolean isDurable = exchangeDestination.getDurability() == TerminusDurability.CONFIGURATION
- || exchangeDestination.getDurability() == TerminusDurability.UNSETTLED_STATE;
- String name;
- if(isDurable)
- {
- String remoteContainerId = getEndpoint().getSession().getConnection().getRemoteContainerId();
- remoteContainerId = remoteContainerId.replace("_","__").replace(".", "_:");
-
- String endpointName = linkAttachment.getEndpoint().getName();
- endpointName = endpointName
- .replace("_", "__")
- .replace(".", "_:")
- .replace("(", "_O")
- .replace(")", "_C")
- .replace("<", "_L")
- .replace(">", "_R");
- name = "qpid_/" + remoteContainerId + "_/" + endpointName;
- }
- else
- {
- name = UUID.randomUUID().toString();
- }
-
- queue = _vhost.getQueueRegistry().getQueue(name);
- Exchange exchange = exchangeDestination.getExchange();
-
- if(queue == null)
- {
- queue = AMQQueueFactory.createAMQQueueImpl(
- UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
- name,
- isDurable,
- null,
- true,
- true,
- _vhost,
- Collections.EMPTY_MAP);
- }
- else
- {
- List<Binding> bindings = queue.getBindings();
- List<Binding> bindingsToRemove = new ArrayList<Binding>();
- for(Binding existingBinding : bindings)
- {
- if(existingBinding.getExchange() != _vhost.getDefaultExchange()
- && existingBinding.getExchange() != exchange)
- {
- bindingsToRemove.add(existingBinding);
- }
- }
- for(Binding existingBinding : bindingsToRemove)
- {
- existingBinding.getExchange().removeBinding(existingBinding);
- }
- }
-
-
- String binding = "";
-
- Map<Symbol,Filter> filters = source.getFilter();
- Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
- boolean hasBindingFilter = false;
- if(filters != null && !filters.isEmpty())
- {
-
- for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
- {
- if(!hasBindingFilter
- && entry.getValue() instanceof ExactSubjectFilter
- && exchange.getType() == DirectExchange.TYPE)
- {
- ExactSubjectFilter filter = (ExactSubjectFilter) filters.values().iterator().next();
- source.setFilter(filters);
- binding = filter.getValue();
- actualFilters.put(entry.getKey(), entry.getValue());
- hasBindingFilter = true;
- }
- else if(!hasBindingFilter
- && entry.getValue() instanceof MatchingSubjectFilter
- && exchange.getType() == TopicExchange.TYPE)
- {
- MatchingSubjectFilter filter = (MatchingSubjectFilter) filters.values().iterator().next();
- source.setFilter(filters);
- binding = filter.getValue();
- actualFilters.put(entry.getKey(), entry.getValue());
- hasBindingFilter = true;
- }
- else if(entry.getValue() instanceof NoLocalFilter)
- {
- actualFilters.put(entry.getKey(), entry.getValue());
- noLocal = true;
- }
- else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter)
- {
-
- org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue();
- try
- {
- messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
-
- actualFilters.put(entry.getKey(), entry.getValue());
- }
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
-
-
- }
- }
- }
- source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
- exchange.addBinding(binding,queue,null);
- source.setDistributionMode(StdDistMode.COPY);
-
- if(!isDurable)
- {
- final String queueName = name;
- final AMQQueue tempQueue = queue;
-
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
- {
- public void doTask(Connection_1_0 session)
- {
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
- {
- try
- {
- tempQueue.delete();
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
- }
- };
-
- getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue)
- {
- getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
- }
-
- qd = new QueueDestination(queue);
- }
- catch (AMQSecurityException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (AMQInternalException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (AMQException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- _subscription = new Subscription_1_0(this, qd, true);
-
- }
-
- if(_subscription != null)
- {
- _subscription.setNoLocal(noLocal);
- if(messageFilter!=null)
- {
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
- }
-
- try
- {
-
- queue.registerSubscription(_subscription, false);
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //TODO
- }
- }
-
- }
-
- public void resume(SendingLinkAttachment linkAttachment)
- {
- _linkAttachment = linkAttachment;
-
- }
-
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- //TODO
- // if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
- {
- AMQQueue queue = _subscription.getQueue();
-
- try
- {
-
- queue.unregisterSubscription(_subscription);
-
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //TODO
- }
-
- Modified state = new Modified();
- state.setDeliveryFailed(true);
-
- for(UnsettledAction action : _unsettledActionMap.values())
- {
-
- action.process(state,Boolean.TRUE);
- }
- _unsettledActionMap.clear();
-
- endpoint.close();
-
- if(_destination instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
- {
- try
- {
- queue.delete();
- }
- catch(AMQException e)
- {
- e.printStackTrace(); // TODO - Implement
- }
- }
-
- if(_closeAction != null)
- {
- _closeAction.run();
- }
- }
- else if(detach == null || detach.getError() != null)
- {
- _linkAttachment = null;
- _subscription.flowStateChanged();
- }
- else
- {
- endpoint.detach();
- }
- }
-
- public void start()
- {
- //TODO
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
- }
-
- public Session_1_0 getSession()
- {
- return _linkAttachment == null ? null : _linkAttachment.getSession();
- }
-
- public void flowStateChanged()
- {
- if(Boolean.TRUE.equals(getEndpoint().getDrain())
- && hasCredit())
- {
- _draining = true;
- }
-
- while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
- {
- Accepted accepted = new Accepted();
- synchronized(getLock())
- {
-
- Transfer xfr = new Transfer();
- Binary dt = _resumeAcceptedTransfers.remove(0);
- xfr.setDeliveryTag(dt);
- xfr.setState(accepted);
- xfr.setResume(Boolean.TRUE);
- getEndpoint().transfer(xfr);
- }
-
- }
- if(_resumeAcceptedTransfers.isEmpty())
- {
- _subscription.flowStateChanged();
- }
-
- }
-
- boolean hasCredit()
- {
- return getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
- }
-
- public boolean isDraining()
- {
- return false; //TODO
- }
-
- public boolean drained()
- {
- if(getEndpoint() != null)
- {
- synchronized(getEndpoint().getLock())
- {
- if(_draining)
- {
- //TODO
- getEndpoint().drained();
- _draining = false;
- return true;
- }
- else
- {
- return false;
- }
- }
- }
- else
- {
- return false;
- }
- }
-
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
- {
- _unsettledActionMap.put(tag,unsettledAction);
- if(getTransactionId() == null)
- {
- _unsettledMap.put(tag, queueEntry);
- }
- }
-
- public void removeUnsettled(Binary tag)
- {
- _unsettledActionMap.remove(tag);
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- UnsettledAction action = _unsettledActionMap.get(deliveryTag);
- boolean localSettle = false;
- if(action != null)
- {
- localSettle = action.process(state, settled);
- if(localSettle && !Boolean.TRUE.equals(settled))
- {
- _linkAttachment.updateDisposition(deliveryTag, state, true);
- }
- }
- if(Boolean.TRUE.equals(settled) || localSettle)
- {
- _unsettledActionMap.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
- }
- }
-
- ServerTransaction getTransaction(Binary transactionId)
- {
- return _linkAttachment.getSession().getTransaction(transactionId);
- }
-
- public Binary getTransactionId()
- {
- SendingLinkEndpoint endpoint = getEndpoint();
- return endpoint == null ? null : endpoint.getTransactionId();
- }
-
- public synchronized Object getLock()
- {
- return _linkAttachment == null ? this : getEndpoint().getLock();
- }
-
- public boolean isDetached()
- {
- return _linkAttachment == null || getEndpoint().isDetached();
- }
-
- public boolean isAttached()
- {
- return _linkAttachment != null && getEndpoint().isAttached();
- }
-
- public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
- {
-
- if(_subscription.isActive())
- {
- _subscription.suspend();
- }
-
- _linkAttachment = linkAttachment;
-
- SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- endpoint.setDeliveryStateHandler(this);
- Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
- _resumeAcceptedTransfers.clear();
- _resumeFullTransfers.clear();
-
- for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
- {
- Binary deliveryTag = entry.getKey();
- final QueueEntry queueEntry = entry.getValue();
- if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
- {
- queueEntry.setRedelivered();
- queueEntry.release();
- _unsettledMap.remove(deliveryTag);
- }
- else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome))
- {
- Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
-
- if(outcome instanceof Accepted)
- {
- AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
- {
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.discard();
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
- }
- }
- else if(outcome instanceof Released)
- {
- AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
- {
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.release();
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
- }
- }
- //_unsettledMap.remove(deliveryTag);
- initialUnsettledMap.remove(deliveryTag);
- _resumeAcceptedTransfers.add(deliveryTag);
- }
- else
- {
- _resumeFullTransfers.add(queueEntry);
- // exists in receivers map, but not yet got an outcome ... should resend with resume = true
- }
- // TODO - else
- }
-
-
- }
-
- public Map getUnsettledOutcomeMap()
- {
- Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
-
- for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
- {
- entry.setValue(null);
- }
-
- return unsettled;
- }
-
- public void setCloseAction(Runnable action)
- {
- _closeAction = action;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
deleted file mode 100644
index ed75a8c165..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.text.MessageFormat;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.util.*;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-
-public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
-{
- private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
- private VirtualHost _vhost;
- private AutoCommitTransaction _transaction;
-
- private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
- new LinkedHashMap<Integer, ServerTransaction>();
- private final Connection_1_0 _connection;
- private UUID _id = UUID.randomUUID();
-
-
- public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
- {
- _vhost = vhost;
- _transaction = new AutoCommitTransaction(vhost.getMessageStore());
- _connection = connection;
-
- }
-
- public void remoteLinkCreation(final LinkEndpoint endpoint)
- {
-
-
- Destination destination;
- Link_1_0 link = null;
- Error error = null;
-
- final
- LinkRegistry
- linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
-
-
- if(endpoint.getRole() == Role.SENDER)
- {
-
- SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
-
- if(previousLink == null)
- {
-
- Target target = (Target) endpoint.getTarget();
- Source source = (Source) endpoint.getSource();
-
-
- if(source != null)
- {
- if(Boolean.TRUE.equals(source.getDynamic()))
- {
- AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
- source.setAddress(tempQueue.getName());
- }
- String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
- if(queue != null)
- {
-
- destination = new QueueDestination(queue);
-
-
-
- }
- else
- {
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
- {
- destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
- }
- else
- {
-
- endpoint.setSource(null);
- destination = null;
- }
- }
-
- }
- else
- {
- destination = null;
- }
-
- if(destination != null)
- {
- final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
- try
- {
- final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- _vhost,
- (SendingDestination) destination
- );
- sendingLinkEndpoint.setLinkEventListener(sendingLink);
- link = sendingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
- {
- linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
- }
- }
- catch(AmqpErrorException e)
- {
- e.printStackTrace();
- destination = null;
- sendingLinkEndpoint.setSource(null);
- error = e.getError();
- }
- }
- }
- else
- {
- Source newSource = (Source) endpoint.getSource();
-
- Source oldSource = (Source) previousLink.getEndpoint().getSource();
- final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
- if(newSourceDurable != null)
- {
- oldSource.setDurable(newSourceDurable);
- if(newSourceDurable.equals(TerminusDurability.NONE))
- {
- linkRegistry.unregisterSendingLink(endpoint.getName());
- }
- }
- endpoint.setSource(oldSource);
- SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
- sendingLinkEndpoint.setLinkEventListener(previousLink);
- link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
- }
- }
- else
- {
- if(endpoint.getTarget() instanceof Coordinator)
- {
- Coordinator coordinator = (Coordinator) endpoint.getTarget();
- TxnCapability[] capabilities = coordinator.getCapabilities();
- boolean localTxn = false;
- boolean multiplePerSession = false;
- if(capabilities != null)
- {
- for(TxnCapability capability : capabilities)
- {
- if(capability.equals(TxnCapability.LOCAL_TXN))
- {
- localTxn = true;
- }
- else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
- {
- multiplePerSession = true;
- }
- else
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_IMPLEMENTED);
- error.setDescription("Unsupported capability: " + capability);
- break;
- }
- }
- }
-
- /* if(!localTxn)
- {
- capabilities.add(TxnCapabilities.LOCAL_TXN);
- }*/
-
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final TxnCoordinatorLink_1_0 coordinatorLink =
- new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
- receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
- link = coordinatorLink;
-
-
- }
- else
- {
-
- ReceivingLink_1_0 previousLink =
- (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
-
- if(previousLink == null)
- {
-
- Target target = (Target) endpoint.getTarget();
-
- if(target != null)
- {
- if(Boolean.TRUE.equals(target.getDynamic()))
- {
-
- AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
- target.setAddress(tempQueue.getName());
- }
-
- String addr = target.getAddress();
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
- {
- destination = new ExchangeDestination(exchg, target.getDurable(),
- target.getExpiryPolicy());
- }
- else
- {
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
- if(queue != null)
- {
-
- destination = new QueueDestination(queue);
- }
- else
- {
- endpoint.setTarget(null);
- destination = null;
- }
-
- }
-
-
- }
- else
- {
- destination = null;
- }
- if(destination != null)
- {
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
- (ReceivingDestination) destination);
- receivingLinkEndpoint.setLinkEventListener(receivingLink);
- link = receivingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
- {
- linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
- }
- }
- }
- else
- {
- ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
- receivingLinkEndpoint.setLinkEventListener(previousLink);
- link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
-
- }
- }
- }
-
- endpoint.attach();
-
- if(link == null)
- {
- if(error == null)
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_FOUND);
- }
- endpoint.detach(error);
- }
- else
- {
- link.start();
- }
- }
-
-
- private AMQQueue createTemporaryQueue(Map properties)
- {
- final String queueName = UUID.randomUUID().toString();
- AMQQueue queue = null;
- try
- {
- LifetimePolicy lifetimePolicy = properties == null
- ? null
- : (LifetimePolicy) properties.get(LIFETIME_POLICY);
-
- final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
- queueName,
- false, // durable
- null, // owner
- false, // autodelete
- false, // exclusive
- _vhost,
- properties);
-
-
-
- if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
- {
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
- {
- public void doTask(Connection_1_0 session)
- {
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
- {
- try
- {
- tempQueue.delete();
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
- }
- };
-
- _connection.addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue)
- {
- _connection.removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
- }
- else if(lifetimePolicy instanceof DeleteOnNoLinks)
- {
-
- }
- else if(lifetimePolicy instanceof DeleteOnNoMessages)
- {
-
- }
- else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
- {
-
- }
- }
- catch (AMQSecurityException e)
- {
- e.printStackTrace(); //TODO.
- } catch (AMQException e)
- {
- e.printStackTrace(); //TODO
- }
-
- return queue;
- }
-
- public ServerTransaction getTransaction(Binary transactionId)
- {
- // TODO should treat invalid id differently to null
- ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
- return transaction == null ? _transaction : transaction;
- }
-
- public void remoteEnd(End end)
- {
- Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
-
- while(iter.hasNext())
- {
- Map.Entry<Integer, ServerTransaction> entry = iter.next();
- entry.getValue().rollback();
- iter.remove();
- }
-
- _connection.sessionEnded(this);
-
- }
-
- Integer binaryToInteger(final Binary txnId)
- {
- if(txnId == null)
- {
- return null;
- }
-
- if(txnId.getLength() > 4)
- throw new IllegalArgumentException();
-
- int id = 0;
- byte[] data = txnId.getArray();
- for(int i = 0; i < txnId.getLength(); i++)
- {
- id <<= 8;
- id += data[i+txnId.getArrayOffset()];
- }
-
- return id;
-
- }
-
- Binary integerToBinary(final int txnId)
- {
- byte[] data = new byte[4];
- data[3] = (byte) (txnId & 0xff);
- data[2] = (byte) ((txnId & 0xff00) >> 8);
- data[1] = (byte) ((txnId & 0xff0000) >> 16);
- data[0] = (byte) ((txnId & 0xff000000) >> 24);
- return new Binary(data);
-
- }
-
- public void forceEnd()
- {
- }
-
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public AMQConnectionModel getConnectionModel()
- {
- return _connection.getModel();
- }
-
- @Override
- public String getClientID()
- {
- // TODO
- return "";
- }
-
- @Override
- public void close() throws AMQException
- {
- // TODO - required for AMQSessionModel / management initiated closing
- }
-
-
- @Override
- public void close(AMQConstant cause, String message) throws AMQException
- {
- // TODO - required for AMQSessionModel
- }
-
- @Override
- public LogSubject getLogSubject()
- {
- return this;
- }
-
- @Override
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
- {
- // TODO - required for AMQSessionModel / long running transaction detection
- }
-
- @Override
- public void block(AMQQueue queue)
- {
- // TODO - required for AMQSessionModel / producer side flow control
- }
-
- @Override
- public void unblock(AMQQueue queue)
- {
- // TODO - required for AMQSessionModel / producer side flow control
- }
-
- @Override
- public void block()
- {
- // TODO - required for AMQSessionModel / producer side flow control
- }
-
- @Override
- public void unblock()
- {
- // TODO - required for AMQSessionModel / producer side flow control
- }
-
- @Override
- public boolean getBlocking()
- {
- // TODO
- return false;
- }
-
- @Override
- public boolean onSameConnection(InboundMessage inbound)
- {
- // TODO
- return false;
- }
-
- @Override
- public int getUnacknowledgedMessageCount()
- {
- // TODO
- return 0;
- }
-
- @Override
- public Long getTxnCount()
- {
- // TODO
- return 0l;
- }
-
- @Override
- public Long getTxnStart()
- {
- // TODO
- return 0l;
- }
-
- @Override
- public Long getTxnCommits()
- {
- // TODO
- return 0l;
- }
-
- @Override
- public Long getTxnRejects()
- {
- // TODO
- return 0l;
- }
-
- @Override
- public int getChannelId()
- {
- // TODO
- return 0;
- }
-
- @Override
- public int getConsumerCount()
- {
- // TODO
- return 0;
- }
-
-
- public String toLogString()
- {
- long connectionId = getConnectionModel().getConnectionId();
-
- String remoteAddress = getConnectionModel().getRemoteAddressString();
-
- return "[" +
- MessageFormat.format(CHANNEL_FORMAT,
- connectionId,
- getClientID(),
- remoteAddress,
- _vhost.getName(), // TODO - virtual host
- 0) // TODO - channel)
- + "] ";
- }
-
- @Override
- public int compareTo(AMQSessionModel o)
- {
- return getId().compareTo(o.getId());
- }
-
- public Connection_1_0 getConnection()
- {
- return _connection;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
deleted file mode 100644
index 28d212c93d..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ /dev/null
@@ -1,718 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedByte;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.typedmessage.TypedBytesContentReader;
-import org.apache.qpid.typedmessage.TypedBytesFormatException;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.codec.BBDecoder;
-
-class Subscription_1_0 implements Subscription
-{
- private SendingLink_1_0 _link;
-
- private AMQQueue _queue;
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
- private final long _id;
- private final boolean _acquires;
- private volatile AMQQueue.Context _queueContext;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private ReentrantLock _stateChangeLock = new ReentrantLock();
-
- private boolean _noLocal;
- private FilterManager _filters;
-
- private long _deliveryTag = 0L;
- private StateListener _stateListener;
-
- private Binary _transactionId;
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
- private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
-
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
- {
- this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
- }
-
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires)
- {
- _link = link;
- _queue = destination.getQueue();
- _id = getEndpoint().getLocalHandle().longValue();
- _acquires = acquires;
- }
-
- private SendingLinkEndpoint getEndpoint()
- {
- return _link.getEndpoint();
- }
-
- public LogActor getLogActor()
- {
- return null; //TODO
- }
-
- public boolean isTransient()
- {
- return true; //TODO
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(final AMQQueue queue, final boolean exclusive)
- {
- //TODO
- }
-
- public void setNoLocal(final boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public long getSubscriptionID()
- {
- return _id;
- }
-
- public boolean isSuspended()
- {
- return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
-
- }
-
- public boolean hasInterest(final QueueEntry entry)
- {
- if(entry.getMessage() instanceof Message_1_0)
- {
- if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
- {
- return false;
- }
- }
- else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
- {
- return false;
- }
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(final QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry);
- }
-
- public boolean isClosed()
- {
- return !getEndpoint().isAttached();
- }
-
- public boolean acquires()
- {
- return _acquires;
- }
-
- public boolean seesRequeues()
- {
- // TODO
- return acquires();
- }
-
- public void close()
- {
- getEndpoint().detach();
- }
-
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
- // TODO
- send(entry);
- }
-
- public void flushBatched()
- {
- // TODO
- }
-
- public void send(final QueueEntry queueEntry) throws AMQException
- {
- ServerMessage serverMessage = queueEntry.getMessage();
- Message_1_0 message;
- if(serverMessage instanceof Message_1_0)
- {
- message = (Message_1_0) serverMessage;
- }
- else
- {
- final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
- message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost());
- }
-
- Transfer transfer = new Transfer();
- //TODO
-
-
- List<ByteBuffer> fragments = message.getFragments();
- ByteBuffer payload;
- if(fragments.size() == 1)
- {
- payload = fragments.get(0);
- }
- else
- {
- int size = 0;
- for(ByteBuffer fragment : fragments)
- {
- size += fragment.remaining();
- }
-
- payload = ByteBuffer.allocate(size);
-
- for(ByteBuffer fragment : fragments)
- {
- payload.put(fragment.duplicate());
- }
-
- payload.flip();
- }
-
- if(queueEntry.getDeliveryCount() != 0)
- {
- payload = payload.duplicate();
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
-
- Header oldHeader = null;
- try
- {
- ByteBuffer encodedBuf = payload.duplicate();
- Object value = valueHandler.parse(payload);
- if(value instanceof Header)
- {
- oldHeader = (Header) value;
- }
- else
- {
- payload.position(0);
- }
- }
- catch (AmqpErrorException e)
- {
- //TODO
- throw new RuntimeException(e);
- }
-
- Header header = new Header();
- if(oldHeader != null)
- {
- header.setDurable(oldHeader.getDurable());
- header.setPriority(oldHeader.getPriority());
- header.setTtl(oldHeader.getTtl());
- }
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
- _sectionEncoder.reset();
- _sectionEncoder.encodeObject(header);
- Binary encodedHeader = _sectionEncoder.getEncoding();
-
- ByteBuffer oldPayload = payload;
- payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
- payload.put(oldPayload);
- payload.flip();
- }
-
- transfer.setPayload(payload);
- byte[] data = new byte[8];
- ByteBuffer.wrap(data).putLong(_deliveryTag++);
- final Binary tag = new Binary(data);
-
- transfer.setDeliveryTag(tag);
-
- synchronized(_link.getLock())
- {
- if(_link.isAttached())
- {
- if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
- {
- transfer.setSettled(true);
- }
- else
- {
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
-
- _link.addUnsettled(tag, action, queueEntry);
- }
-
- if(_transactionId != null)
- {
- TransactionalState state = new TransactionalState();
- state.setTxnId(_transactionId);
- transfer.setState(state);
- }
- // TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
- {
- ServerTransaction txn = _link.getTransaction(_transactionId);
- if(txn != null)
- {
- txn.addPostTransactionAction(new ServerTransaction.Action(){
-
- public void postCommit()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void onRollback()
- {
- if(queueEntry.isAcquiredBy(Subscription_1_0.this))
- {
- queueEntry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-
-
- }
- }
- });
- }
-
- }
- getSession().getConnectionModel().registerMessageDelivered(message.getSize());
- getEndpoint().transfer(transfer);
- }
- else
- {
- queueEntry.release();
- }
- }
-
- }
-
- public void queueDeleted(final AMQQueue queue)
- {
- //TODO
- getEndpoint().setSource(null);
- getEndpoint().detach();
- }
-
- public boolean wouldSuspend(final QueueEntry msg)
- {
- synchronized (_link.getLock())
- {
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
- if(!hasCredit && getState() == State.ACTIVE)
- {
- suspend();
- }
-
- return !hasCredit;
- }
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public void suspend()
- {
- synchronized(_link.getLock())
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void releaseQueueEntry(QueueEntry queueEntryImpl)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public void onDequeue(final QueueEntry queueEntry)
- {
- //TODO
- }
-
- public void restoreCredit(final QueueEntry queueEntry)
- {
- //TODO
- }
-
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
- public boolean isSessionTransactional()
- {
- return false; //TODO
- }
-
- public void queueEmpty()
- {
- synchronized(_link.getLock())
- {
- if(_link.drained())
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
- }
-
- public void flowStateChanged()
- {
- synchronized(_link.getLock())
- {
- if(isSuspended() && getEndpoint() != null)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- _transactionId = _link.getTransactionId();
- }
- }
- }
-
- public Session_1_0 getSession()
- {
- return _link.getSession();
- }
-
- private class DispositionAction implements UnsettledAction
- {
-
- private final QueueEntry _queueEntry;
- private final Binary _deliveryTag;
-
- public DispositionAction(Binary tag, QueueEntry queueEntry)
- {
- _deliveryTag = tag;
- _queueEntry = queueEntry;
- }
-
- public boolean process(DeliveryState state, final Boolean settled)
- {
-
- Binary transactionId = null;
- final Outcome outcome;
- // If disposition is settled this overrides the txn?
- if(state instanceof TransactionalState)
- {
- transactionId = ((TransactionalState)state).getTxnId();
- outcome = ((TransactionalState)state).getOutcome();
- }
- else if (state instanceof Outcome)
- {
- outcome = (Outcome) state;
- }
- else
- {
- outcome = null;
- }
-
-
- ServerTransaction txn = _link.getTransaction(transactionId);
-
- if(outcome instanceof Accepted)
- {
- txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
- {
- _queueEntry.discard();
- }
- }
-
- public void onRollback()
- {
-
- }
- });
- txn.addPostTransactionAction(new ServerTransaction.Action()
- {
- public void postCommit()
- {
- //_link.getEndpoint().settle(_deliveryTag);
- _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true);
- _link.getEndpoint().sendFlowConditional();
- }
-
- public void onRollback()
- {
- if(Boolean.TRUE.equals(settled))
- {
- final Modified modified = new Modified();
- modified.setDeliveryFailed(true);
- _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
- _link.getEndpoint().sendFlowConditional();
- }
- }
- });
- }
- else if(outcome instanceof Released)
- {
- txn.addPostTransactionAction(new ServerTransaction.Action()
- {
- public void postCommit()
- {
-
- _queueEntry.release();
- _link.getEndpoint().settle(_deliveryTag);
- }
-
- public void onRollback()
- {
- _link.getEndpoint().settle(_deliveryTag);
- }
- });
- }
-
- else if(outcome instanceof Modified)
- {
- txn.addPostTransactionAction(new ServerTransaction.Action()
- {
- public void postCommit()
- {
-
- _queueEntry.release();
- if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
- {
- _queueEntry.incrementDeliveryCount();
- }
- _link.getEndpoint().settle(_deliveryTag);
- }
-
- public void onRollback()
- {
- if(Boolean.TRUE.equals(settled))
- {
- final Modified modified = new Modified();
- modified.setDeliveryFailed(true);
- _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
- _link.getEndpoint().sendFlowConditional();
- }
- }
- });
- }
-
- return (transactionId == null && outcome != null);
- }
- }
-
- private class DoNothingAction implements UnsettledAction
- {
- public DoNothingAction(final Binary tag,
- final QueueEntry queueEntry)
- {
- }
-
- public boolean process(final DeliveryState state, final Boolean settled)
- {
- Binary transactionId = null;
- Outcome outcome = null;
- // If disposition is settled this overrides the txn?
- if(state instanceof TransactionalState)
- {
- transactionId = ((TransactionalState)state).getTxnId();
- outcome = ((TransactionalState)state).getOutcome();
- }
- else if (state instanceof Outcome)
- {
- outcome = (Outcome) state;
- }
- return true;
- }
- }
-
- public FilterManager getFilters()
- {
- return _filters;
- }
-
- public void setFilters(final FilterManager filters)
- {
- _filters = filters;
- }
-
- @Override
- public AMQSessionModel getSessionModel()
- {
- // TODO
- return getSession();
- }
-
- @Override
- public long getBytesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
- public long getUnacknowledgedBytes()
- {
- // TODO
- return 0;
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- // TODO
- return 0;
- }
-
- @Override
- public String getConsumerName()
- {
- //TODO
- return "TODO";
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
deleted file mode 100644
index a05d14816a..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.Declare;
-import org.apache.qpid.amqp_1_0.type.transaction.Declared;
-import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
-{
- private VirtualHost _vhost;
- private ReceivingLinkEndpoint _endpoint;
-
- private ArrayList<Transfer> _incompleteMessage;
- private SectionDecoder _sectionDecoder;
- private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
- private Session_1_0 _session;
-
-
- public TxnCoordinatorLink_1_0(VirtualHost vhost,
- Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
- LinkedHashMap<Integer, ServerTransaction> openTransactions)
- {
- _vhost = vhost;
- _session = session_1_0;
- _endpoint = endpoint;
- _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
- _openTransactions = openTransactions;
- }
-
- public void messageTransfer(Transfer xfr)
- {
- // TODO - cope with fragmented messages
-
- ByteBuffer payload = null;
-
-
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<Transfer>();
- _incompleteMessage.add(xfr);
- return;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return;
- }
-
- int size = 0;
- for(Transfer t : _incompleteMessage)
- {
- size += t.getPayload().limit();
- }
- payload = ByteBuffer.allocate(size);
- for(Transfer t : _incompleteMessage)
- {
- payload.put(t.getPayload().duplicate());
- }
- payload.flip();
- _incompleteMessage=null;
-
- }
- else
- {
- payload = xfr.getPayload();
- }
-
-
- // Only interested int he amqp-value section that holds the message to the co-ordinator
- try
- {
- List<Section> sections = _sectionDecoder.parseAll(payload);
-
- for(Section section : sections)
- {
- if(section instanceof AmqpValue)
- {
- Object command = ((AmqpValue) section).getValue();
-
- if(command instanceof Declare)
- {
- Integer txnId = Integer.valueOf(0);
- Iterator<Integer> existingTxn = _openTransactions.keySet().iterator();
- while(existingTxn.hasNext())
- {
- txnId = existingTxn.next();
- }
- txnId = Integer.valueOf(txnId.intValue() + 1);
-
- _openTransactions.put(txnId, new LocalTransaction(_vhost.getMessageStore()));
-
- Declared state = new Declared();
-
-
-
- state.setTxnId(_session.integerToBinary(txnId));
- _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
-
- }
- else if(command instanceof Discharge)
- {
- Discharge discharge = (Discharge) command;
-
- DeliveryState state = xfr.getState();
- discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
- _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
-
- }
- }
- }
-
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- //TODO
- endpoint.detach();
- }
-
- private Error discharge(Integer transactionId, boolean fail)
- {
- Error error = null;
- ServerTransaction txn = _openTransactions.get(transactionId);
- if(txn != null)
- {
- if(fail)
- {
- txn.rollback();
- }
- else
- {
- txn.commit();
- }
- _openTransactions.remove(transactionId);
- }
- else
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_FOUND);
- error.setDescription("Unkown transactionId" + transactionId);
- }
- return error;
- }
-
-
-
- public void start()
- {
- _endpoint.setLinkCredit(UnsignedInteger.ONE);
- _endpoint.setCreditWindow();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
deleted file mode 100644
index 0fee4086b4..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-
-public interface UnsettledAction
-{
- boolean process(DeliveryState state, Boolean settled);
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index f7cb6e2bed..f364d93d98 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -39,15 +39,12 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
index 64c497d433..ab1546ed0a 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -18,6 +18,4 @@
#
org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10
org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8
-org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0
-org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
index 9aa1d4ce11..e0c0aa5873 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
@@ -18,4 +18,3 @@
#
org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8
org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10
-org.apache.qpid.server.protocol.v1_0.MessageMetaDataType_1_0
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
index 50f59d8a55..b771e25328 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator
@@ -20,5 +20,3 @@ org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8
org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9
org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1
org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10
-org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0
-org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0_SASL