diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 15:37:04 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 15:37:04 +0000 |
| commit | 43d9b8b9928c56ee200ae1a3323796f393dec0f7 (patch) | |
| tree | 1e687035925c659aa321cff9d3ae9c37930c7a2b /qpid/java/broker | |
| parent | b89341f0b6382d09ef438bafe09235ca48ea5767 (diff) | |
| download | qpid-python-43d9b8b9928c56ee200ae1a3323796f393dec0f7.tar.gz | |
QPID-4659 : [Java Broker] move amqp 1-0 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503303 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
30 files changed, 3 insertions, 5823 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index 3c83715305..d34414f44d 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -19,15 +19,15 @@ - --> <project name="AMQ Broker" default="build"> - <property name="module.depends" value="management/common common amqp-1-0-common"/> + <property name="module.depends" value="management/common common"/> <property name="module.test.depends" value="common/tests" /> <property name="module.main" value="org.apache.qpid.server.Main"/> <property name="module.genpom" value="true"/> <!-- Add dependencies to the broker pom for the broker-plugins and bdbstore modules --> - <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx broker-plugins/jdbc-store broker-plugins/derby-store"/> + <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx broker-plugins/jdbc-store broker-plugins/derby-store broker-plugins/amqp-1-0-protocol broker-plugins/amqp-msg-conv-0-8-to-1-0 broker-plugins/amqp-msg-conv-0-10-to-1-0"/> <!-- Make them runtime dependencies, make bdbstore modules optional --> - <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone -Sqpid-broker-plugins-jdbc-store=runtime -Sqpid-broker-plugins-derby-store=runtime"/> + <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone -Sqpid-broker-plugins-jdbc-store=runtime -Sqpid-broker-plugins-derby-store=runtime -Sqpid-broker-plugins-amqp-1-0-protocol=runtime -Sqpid-broker-plugins-amqp-msg-conv-0-8-to-1-0=runtime -Sqpid-broker-plugins-amqp-msg-conv-0-10-to-1-0=runtime"/> <import file="../module.xml"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java deleted file mode 100644 index a70bd4b243..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.protocol.converter.v0_10_v1_0; - -import java.util.ArrayList; -import java.util.List; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; -import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageProperties; - -public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<MessageTransferMessage> -{ - @Override - public Class<MessageTransferMessage> getInputClass() - { - return MessageTransferMessage.class; - } - - - @Override - protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage, - SectionEncoder sectionEncoder) - { - List<Section> sections = new ArrayList<Section>(3); - final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); - final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); - - Header header = new Header(); - if(deliveryProps != null) - { - header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); - if(deliveryProps.hasPriority()) - { - header.setPriority(UnsignedByte.valueOf((byte) deliveryProps.getPriority().getValue())); - } - if(deliveryProps.hasTtl()) - { - header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); - } - sections.add(header); - } - - Properties props = new Properties(); - - /* - TODO: the current properties are not currently set: - - absoluteExpiryTime - creationTime - groupId - groupSequence - replyToGroupId - to - */ - - if(msgProps != null) - { - if(msgProps.hasContentEncoding()) - { - props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); - } - - if(msgProps.hasCorrelationId()) - { - props.setCorrelationId(msgProps.getCorrelationId()); - } - - if(msgProps.hasMessageId()) - { - props.setMessageId(msgProps.getMessageId()); - } - if(msgProps.hasReplyTo()) - { - props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); - } - if(msgProps.hasContentType()) - { - props.setContentType(Symbol.valueOf(msgProps.getContentType())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - } - - props.setSubject(serverMessage.getRoutingKey()); - - if(msgProps.hasUserId()) - { - props.setUserId(new Binary(msgProps.getUserId())); - } - - sections.add(props); - - if(msgProps.getApplicationHeaders() != null) - { - sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); - } - } - return new MessageMetaData_1_0(sections, sectionEncoder); - } - - @Override - public String getType() - { - return "v0-10 to v1-0"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java deleted file mode 100644 index 0d9d59ff56..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.converter.v0_8_v1_0; - -import java.util.ArrayList; -import java.util.List; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; -import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; - -public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage> -{ - @Override - public Class<AMQMessage> getInputClass() - { - return AMQMessage.class; - } - - protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder) - { - - List<Section> sections = new ArrayList<Section>(3); - - Header header = new Header(); - - header.setDurable(serverMessage.isPersistent()); - - BasicContentHeaderProperties contentHeader = - (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); - - header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); - final long expiration = serverMessage.getExpiration(); - final long arrivalTime = serverMessage.getArrivalTime(); - - if(expiration > arrivalTime) - { - header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); - } - sections.add(header); - - - Properties props = new Properties(); - - /* - TODO: The following properties are not currently set: - - creationTime - groupId - groupSequence - replyToGroupId - to - */ - - props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); - - props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - - final AMQShortString correlationId = contentHeader.getCorrelationId(); - if(correlationId != null) - { - props.setCorrelationId(new Binary(correlationId.getBytes())); - } - - final AMQShortString messageId = contentHeader.getMessageId(); - if(messageId != null) - { - props.setMessageId(new Binary(messageId.getBytes())); - } - props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); - - props.setSubject(serverMessage.getRoutingKey()); - if(contentHeader.getUserId() != null) - { - props.setUserId(new Binary(contentHeader.getUserId().getBytes())); - } - - sections.add(props); - - sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); - - return new MessageMetaData_1_0(sections, sectionEncoder); - } - - @Override - public String getType() - { - return "v0-8 to v1-0"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java deleted file mode 100644 index 320875cc97..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.text.MessageFormat; -import java.util.Collection; -import org.apache.qpid.AMQException; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; -import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; - -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; - -public class Connection_1_0 implements ConnectionEventListener -{ - - private final Port _port; - private VirtualHost _vhost; - private final Transport _transport; - private final ConnectionEndpoint _conn; - private final long _connectionId; - private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); - - - public static interface Task - { - public void doTask(Connection_1_0 connection); - } - - - private List<Task> _closeTasks = - Collections.synchronizedList(new ArrayList<Task>()); - - - - public Connection_1_0(VirtualHost virtualHost, - ConnectionEndpoint conn, - long connectionId, - Port port, - Transport transport) - { - _vhost = virtualHost; - _port = port; - _transport = transport; - _conn = conn; - _connectionId = connectionId; - _vhost.getConnectionRegistry().registerConnection(_model); - - } - - public void remoteSessionCreation(SessionEndpoint endpoint) - { - Session_1_0 session = new Session_1_0(_vhost, this); - _sessions.add(session); - endpoint.setSessionEventListener(session); - } - - void sessionEnded(Session_1_0 session) - { - _sessions.remove(session); - } - - void removeConnectionCloseTask(final Task task) - { - _closeTasks.remove( task ); - } - - void addConnectionCloseTask(final Task task) - { - _closeTasks.add( task ); - } - - public void closeReceived() - { - List<Task> taskCopy; - synchronized (_closeTasks) - { - taskCopy = new ArrayList<Task>(_closeTasks); - } - for(Task task : taskCopy) - { - task.doTask(this); - } - synchronized (_closeTasks) - { - _closeTasks.clear(); - } - _vhost.getConnectionRegistry().deregisterConnection(_model); - - - } - - public void closed() - { - closeReceived(); - } - - private final AMQConnectionModel _model = new AMQConnectionModel() - { - private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); - private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); - - private final LogSubject _logSubject = new LogSubject() - { - @Override - public String toLogString() - { - return "[" + - MessageFormat.format(CONNECTION_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString(), - _vhost.getName()) - + "] "; - - } - }; - - private volatile boolean _stopped; - - @Override - public void close(AMQConstant cause, String message) throws AMQException - { - _conn.close(); - } - - @Override - public void block() - { - // TODO - } - - @Override - public void unblock() - { - // TODO - } - - @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException - { - // TODO - } - - @Override - public long getConnectionId() - { - return _connectionId; - } - - @Override - public List<AMQSessionModel> getSessionModels() - { - return new ArrayList<AMQSessionModel>(_sessions); - } - - @Override - public LogSubject getLogSubject() - { - return _logSubject; - } - - @Override - public String getUserName() - { - return getPrincipalAsString(); - } - - @Override - public boolean isSessionNameUnique(byte[] name) - { - return true; // TODO - } - - @Override - public String getRemoteAddressString() - { - return String.valueOf(_conn.getRemoteAddress()); - } - - @Override - public String getClientId() - { - return _conn.getRemoteContainerId(); - } - - @Override - public String getClientVersion() - { - return ""; //TODO - } - - @Override - public String getPrincipalAsString() - { - return String.valueOf(_conn.getUser()); - } - - @Override - public long getSessionCountLimit() - { - return 0; // TODO - } - - @Override - public long getLastIoTime() - { - return 0; // TODO - } - - @Override - public String getVirtualHostName() - { - return _vhost == null ? null : _vhost.getName(); - } - - @Override - public Port getPort() - { - return _port; - } - - @Override - public Transport getTransport() - { - return _transport; - } - - @Override - public void stop() - { - _stopped = true; - } - - @Override - public boolean isStopped() - { - return _stopped; - } - - @Override - public void initialiseStatistics() - { - _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId()); - _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId()); - _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId()); - _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId()); - } - - @Override - public void registerMessageReceived(long messageSize, long timestamp) - { - _messageReceiptStatistics.registerEvent(1L, timestamp); - _dataReceiptStatistics.registerEvent(messageSize, timestamp); - _vhost.registerMessageReceived(messageSize,timestamp); - - } - - @Override - public void registerMessageDelivered(long messageSize) - { - - _messageDeliveryStatistics.registerEvent(1L); - _dataDeliveryStatistics.registerEvent(messageSize); - _vhost.registerMessageDelivered(messageSize); - } - - @Override - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messageDeliveryStatistics; - } - - @Override - public StatisticsCounter getMessageReceiptStatistics() - { - return _messageReceiptStatistics; - } - - @Override - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDeliveryStatistics; - } - - @Override - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceiptStatistics; - } - - @Override - public void resetStatistics() - { - _dataDeliveryStatistics.reset(); - _dataReceiptStatistics.reset(); - _messageDeliveryStatistics.reset(); - _messageReceiptStatistics.reset(); - } - - - }; - - AMQConnectionModel getModel() - { - return _model; - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java deleted file mode 100644 index d45758391c..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - - -public interface Destination -{ - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java deleted file mode 100644 index 2cef27267b..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.util.List; -import org.apache.qpid.AMQException; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.txn.ServerTransaction; - -public class ExchangeDestination implements ReceivingDestination, SendingDestination -{ - private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = { ACCEPTED }; - - private Exchange _exchange; - private TerminusDurability _durability; - private TerminusExpiryPolicy _expiryPolicy; - - public ExchangeDestination(Exchange exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) - { - _exchange = exchange; - _durability = durable; - _expiryPolicy = expiryPolicy; - } - - public Outcome[] getOutcomes() - { - return OUTCOMES; - } - - public Outcome send(final Message_1_0 message, ServerTransaction txn) - { - final List<? extends BaseQueue> queues = _exchange.route(message); - - txn.enqueue(queues,message, new ServerTransaction.Action() - { - - BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); - - public void postCommit() - { - for(int i = 0; i < _queues.length; i++) - { - try - { - _queues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - } - - public void onRollback() - { - // NO-OP - } - }); - - return ACCEPTED; - } - - TerminusDurability getDurability() - { - return _durability; - } - - TerminusExpiryPolicy getExpiryPolicy() - { - return _expiryPolicy; - } - - public int getCredit() - { - // TODO - fix - return 20000; - } - - public Exchange getExchange() - { - return _exchange; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java deleted file mode 100644 index 5ce24f406d..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.server.protocol.LinkModel; - -public interface Link_1_0 extends LinkModel -{ - void start(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java deleted file mode 100644 index be9b0323a3..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.io.EOFException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.typedmessage.TypedBytesContentReader; -import org.apache.qpid.typedmessage.TypedBytesFormatException; - -public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> -{ - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); - - @Override - public final Class<Message_1_0> getOutputClass() - { - return Message_1_0.class; - } - - @Override - public final Message_1_0 convert(M message, VirtualHost vhost) - { - - SectionEncoder sectionEncoder = new SectionEncoderImpl(_typeRegistry); - return new Message_1_0(convertToStoredMessage(message, sectionEncoder)); - } - - - private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder) - { - final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, sectionEncoder); - return convertServerMessage(metaData, serverMessage, sectionEncoder); - } - - abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage, SectionEncoder sectionEncoder); - - - private static Section convertMessageBody(String mimeType, byte[] data) - { - if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) - { - String text = new String(data); - return new AmqpValue(text); - } - else if("jms/map-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - LinkedHashMap map = new LinkedHashMap(); - final int entries = reader.readIntImpl(); - for (int i = 0; i < entries; i++) - { - try - { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - map.put(propName, value); - } - catch (EOFException e) - { - throw new IllegalArgumentException(e); - } - catch (TypedBytesFormatException e) - { - throw new IllegalArgumentException(e); - } - - } - - return new AmqpValue(map); - - } - else if("amqp/map".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readMap()); - - } - else if("amqp/list".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readList()); - } - else if("jms/stream-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - List list = new ArrayList(); - while (reader.remaining() != 0) - { - try - { - list.add(reader.readObject()); - } - catch (TypedBytesFormatException e) - { - throw new RuntimeException(e); // TODO - Implement - } - catch (EOFException e) - { - throw new RuntimeException(e); // TODO - Implement - } - } - return new AmqpValue(list); - } - else - { - return new Data(new Binary(data)); - - } - } - - private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, - final ServerMessage serverMessage, - SectionEncoder sectionEncoder) - { - final String mimeType = serverMessage.getMessageHeader().getMimeType(); - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - Section bodySection = convertMessageBody(mimeType, data); - - final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); - - return new StoredMessage<MessageMetaData_1_0>() - { - @Override - public MessageMetaData_1_0 getMetaData() - { - return metaData; - } - - @Override - public long getMessageNumber() - { - return serverMessage.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - int size; - if(dst.remaining()<buf.remaining()) - { - buf.limit(dst.remaining()); - size = dst.remaining(); - } - else - { - size = buf.remaining(); - } - dst.put(buf); - return size; - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - if(size < buf.remaining()) - { - buf.limit(size); - } - return buf; - } - - @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override - public void remove() - { - serverMessage.getStoredMessage().remove(); - } - }; - } - - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) - { - int headerSize = (int) metaData.getStorableSize(); - - sectionEncoder.reset(); - sectionEncoder.encodeObject(bodySection); - Binary dataEncoding = sectionEncoder.getEncoding(); - - final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); - metaData.writeToBuffer(0,allData); - allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); - return allData; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java deleted file mode 100644 index 44b1de74e1..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.nio.ByteBuffer; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.store.StoredMessage; - -public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaData_1_0> -{ - - public static final int TYPE = 2; - - @Override - public int ordinal() - { - return TYPE; - } - - @Override - public MessageMetaData_1_0 createMetaData(ByteBuffer buf) - { - return MessageMetaData_1_0.FACTORY.createMetaData(buf); - } - - @Override - public ServerMessage<MessageMetaData_1_0> createMessage(StoredMessage<MessageMetaData_1_0> msg) - { - return new Message_1_0(msg); - } - - public int hashCode() - { - return ordinal(); - } - - public boolean equals(Object o) - { - return o != null && o.getClass() == getClass(); - } - - @Override - public String getType() - { - return AmqpProtocolVersion.v1_0_0.toString(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java deleted file mode 100755 index 8d48d70d9a..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ /dev/null @@ -1,569 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.protocol.v1_0; - -import java.nio.ByteBuffer; -import java.util.*; -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Footer; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.StorableMessageMetaData; - -public class MessageMetaData_1_0 implements StorableMessageMetaData -{ - // TODO move to somewhere more useful - public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); - public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory(); - private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0(); - - - private Header _header; - private Properties _properties; - private Map _deliveryAnnotations; - private Map _messageAnnotations; - private Map _appProperties; - private Map _footer; - - private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3); - - private volatile ByteBuffer _encoded; - private MessageHeader_1_0 _messageHeader; - - - public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder) - { - this(sections, encodeSections(sections, encoder)); - } - - private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) - { - ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size()); - for(Section section : sections) - { - encoder.encodeObject(section); - encodedSections.add(encoder.getEncoding().asByteBuffer()); - encoder.reset(); - } - return encodedSections; - } - - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) - { - this(fragments, decoder, new ArrayList<ByteBuffer>(3)); - } - - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immuatableSections) - { - this(constructSections(fragments, decoder,immuatableSections), immuatableSections); - } - - private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections) - { - _encodedSections = encodedSections; - - Iterator<Section> sectIter = sections.iterator(); - - Section section = sectIter.hasNext() ? sectIter.next() : null; - if(section instanceof Header) - { - _header = (Header) section; - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof DeliveryAnnotations) - { - _deliveryAnnotations = ((DeliveryAnnotations) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof MessageAnnotations) - { - _messageAnnotations = ((MessageAnnotations) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof Properties) - { - _properties = (Properties) section; - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof ApplicationProperties) - { - _appProperties = ((ApplicationProperties) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof Footer) - { - _footer = ((Footer) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - _messageHeader = new MessageHeader_1_0(); - - } - - private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections) - { - List<Section> sections = new ArrayList<Section>(3); - - ByteBuffer src; - if(fragments.length == 1) - { - src = fragments[0].duplicate(); - } - else - { - int size = 0; - for(ByteBuffer buf : fragments) - { - size += buf.remaining(); - } - src = ByteBuffer.allocate(size); - for(ByteBuffer buf : fragments) - { - src.put(buf.duplicate()); - } - src.flip(); - - } - - try - { - int startBarePos = -1; - int lastPos = src.position(); - Section s = decoder.readSection(src); - - - - if(s instanceof Header) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof DeliveryAnnotations) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof MessageAnnotations) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof Properties) - { - sections.add(s); - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof ApplicationProperties) - { - sections.add(s); - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof AmqpValue) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - else if(s instanceof Data) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - do - { - s = src.hasRemaining() ? decoder.readSection(src) : null; - } while(s instanceof Data); - } - else if(s instanceof AmqpSequence) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - do - { - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - while(s instanceof AmqpSequence); - } - - if(s instanceof Footer) - { - sections.add(s); - } - - - int pos = 0; - for(ByteBuffer buf : fragments) - { -/* - if(pos < startBarePos) - { - if(pos + buf.remaining() > startBarePos) - { - ByteBuffer dup = buf.duplicate(); - dup.position(dup.position()+startBarePos-pos); - dup.slice(); - encodedSections.add(dup); - } - } - else -*/ - { - encodedSections.add(buf.duplicate()); - } - pos += buf.remaining(); - } - - return sections; - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - throw new IllegalArgumentException(e); - } - } - - - public MessageMetaDataType getType() - { - return TYPE; - } - - - public int getStorableSize() - { - int size = 0; - - for(ByteBuffer bin : _encodedSections) - { - size += bin.limit(); - } - - return size; - } - - private ByteBuffer encodeAsBuffer() - { - ByteBuffer buf = ByteBuffer.allocate(getStorableSize()); - - for(ByteBuffer bin : _encodedSections) - { - buf.put(bin.duplicate()); - } - - return buf; - } - - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - - buf = buf.duplicate(); - - buf.position(offsetInMetaData); - - if(dest.remaining() < buf.limit()) - { - buf.limit(dest.remaining()); - } - dest.put(buf); - return buf.limit(); - } - - public int getContentSize() - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - return buf.remaining(); - } - - public boolean isPersistent() - { - return _header != null && Boolean.TRUE.equals(_header.getDurable()); - } - - public MessageHeader_1_0 getMessageHeader() - { - return _messageHeader; - } - - - - - private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0> - { - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - private MetaDataFactory() - { - _typeRegistry.registerTransportLayer(); - _typeRegistry.registerMessagingLayer(); - _typeRegistry.registerTransactionLayer(); - _typeRegistry.registerSecurityLayer(); - } - - public MessageMetaData_1_0 createMetaData(ByteBuffer buf) - { - ValueHandler valueHandler = new ValueHandler(_typeRegistry); - - ArrayList<Section> sections = new ArrayList<Section>(3); - ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3); - - while(buf.hasRemaining()) - { - try - { - ByteBuffer encodedBuf = buf.duplicate(); - Object parse = valueHandler.parse(buf); - sections.add((Section) parse); - encodedBuf.limit(buf.position()); - encodedSections.add(encodedBuf); - - } - catch (AmqpErrorException e) - { - //TODO - throw new RuntimeException(e); - } - - } - - return new MessageMetaData_1_0(sections,encodedSections); - - } - } - - public class MessageHeader_1_0 implements AMQMessageHeader - { - - public String getCorrelationId() - { - if(_properties == null || _properties.getCorrelationId() == null) - { - return null; - } - else - { - return _properties.getMessageId().toString(); - } - } - - public long getExpiration() - { - return 0; //TODO - } - - public String getMessageId() - { - if(_properties == null || _properties.getCorrelationId() == null) - { - return null; - } - else - { - return _properties.getCorrelationId().toString(); - } - } - - public String getMimeType() - { - - if(_properties == null || _properties.getContentType() == null) - { - return null; - } - else - { - return _properties.getContentType().toString(); - } - } - - public String getEncoding() - { - return null; //TODO - } - - public byte getPriority() - { - if(_header == null || _header.getPriority() == null) - { - return 4; //javax.jms.Message.DEFAULT_PRIORITY; - } - else - { - return _header.getPriority().byteValue(); - } - } - - public long getTimestamp() - { - if(_properties == null || _properties.getCreationTime() == null) - { - return 0L; - } - else - { - return _properties.getCreationTime().getTime(); - } - - } - - public String getType() - { - - if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null) - { - return null; - } - else - { - return _messageAnnotations.get(JMS_TYPE).toString(); - } - } - - public String getReplyTo() - { - if(_properties == null || _properties.getReplyTo() == null) - { - return null; - } - else - { - return _properties.getReplyTo().toString(); - } - } - - public String getReplyToExchange() - { - return null; //TODO - } - - public String getReplyToRoutingKey() - { - return null; //TODO - } - - public String getAppId() - { - //TODO - return null; - } - - public String getUserId() - { - // TODO - return null; - } - - public Object getHeader(final String name) - { - return _appProperties == null ? null : _appProperties.get(name); - } - - public boolean containsHeaders(final Set<String> names) - { - if(_appProperties == null) - { - return false; - } - - for(String key : names) - { - if(!_appProperties.containsKey(key)) - { - return false; - } - } - return true; - } - - @Override - public Collection<String> getHeaderNames() - { - if(_appProperties == null) - { - return Collections.emptySet(); - } - return Collections.unmodifiableCollection(_appProperties.keySet()); - } - - public boolean containsHeader(final String name) - { - return _appProperties != null && _appProperties.containsKey(name); - } - - public String getSubject() - { - return _properties == null ? null : _properties.getSubject(); - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java deleted file mode 100644 index 9dc063e3ea..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - - -import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.store.StoredMessage; - -public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage -{ - - - private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater = - AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount"); - - private volatile int _referenceCount = 0; - - private final StoredMessage<MessageMetaData_1_0> _storedMessage; - private List<ByteBuffer> _fragments; - private WeakReference<Session_1_0> _session; - private long _arrivalTime; - - - public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) - { - _storedMessage = storedMessage; - _session = null; - _fragments = restoreFragments(storedMessage); - } - - private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) - { - ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); - final int FRAGMENT_SIZE = 2048; - int offset = 0; - ByteBuffer b; - do - { - - b = storedMessage.getContent(offset,FRAGMENT_SIZE); - if(b.hasRemaining()) - { - fragments.add(b); - offset+= b.remaining(); - } - } - while(b.hasRemaining()); - return fragments; - } - - public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, - final List<ByteBuffer> fragments, - final Session_1_0 session) - { - _storedMessage = storedMessage; - _fragments = fragments; - _session = new WeakReference<Session_1_0>(session); - _arrivalTime = System.currentTimeMillis(); - } - - public String getRoutingKey() - { - Object routingKey = getMessageHeader().getHeader("routing-key"); - if(routingKey != null) - { - return routingKey.toString(); - } - else - { - return getMessageHeader().getSubject(); - } - } - - public AMQShortString getRoutingKeyShortString() - { - return AMQShortString.valueOf(getRoutingKey()); - } - - private MessageMetaData_1_0 getMessageMetaData() - { - return _storedMessage.getMetaData(); - } - - public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader() - { - return getMessageMetaData().getMessageHeader(); - } - - public StoredMessage getStoredMessage() - { - return _storedMessage; - } - - public boolean isPersistent() - { - return getMessageMetaData().isPersistent(); - } - - public boolean isRedelivered() - { - // TODO - return false; - } - - public long getSize() - { - long size = 0l; - if(_fragments != null) - { - for(ByteBuffer buf : _fragments) - { - size += buf.remaining(); - } - } - - return size; - } - - public boolean isImmediate() - { - return false; - } - - public long getExpiration() - { - return getMessageHeader().getExpiration(); - } - - public MessageReference<Message_1_0> newReference() - { - return new Reference(this); - } - - public long getMessageNumber() - { - return _storedMessage.getMessageNumber(); - } - - public long getArrivalTime() - { - return _arrivalTime; - } - - public int getContent(final ByteBuffer buf, final int offset) - { - return _storedMessage.getContent(offset, buf); - } - - public ByteBuffer getContent(int offset, int size) - { - ByteBuffer buf = ByteBuffer.allocate(size); - buf.limit(getContent(buf, offset)); - - return buf; - } - - public List<ByteBuffer> getFragments() - { - return _fragments; - } - - public Session_1_0 getSession() - { - return _session == null ? null : _session.get(); - } - - - public boolean incrementReference() - { - if(_refCountUpdater.incrementAndGet(this) <= 0) - { - _refCountUpdater.decrementAndGet(this); - return false; - } - else - { - return true; - } - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - */ - - public void decrementReference() - { - int count = _refCountUpdater.decrementAndGet(this); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _refCountUpdater.set(this,Integer.MIN_VALUE/2); - - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_storedMessage != null) - { - _storedMessage.remove(); - } - } - else - { - if (count < 0) - { - throw new RuntimeException("Reference count for message id " + getMessageNumber() - + " has gone below 0."); - } - } - } - - public static class Reference extends MessageReference<Message_1_0> - { - public Reference(Message_1_0 message) - { - super(message); - } - - protected void onReference(Message_1_0 message) - { - message.incrementReference(); - } - - protected void onRelease(Message_1_0 message) - { - message.decrementReference(); - } - - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java deleted file mode 100644 index c06af603de..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.network.NetworkConnection; - -public class ProtocolEngineCreator_1_0_0 implements ProtocolEngineCreator -{ - private static final byte[] AMQP_1_0_0_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }; - - public ProtocolEngineCreator_1_0_0() - { - } - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v1_0_0; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_1_0_0_HEADER; - } - - public ServerProtocolEngine newProtocolEngine(Broker broker, - NetworkConnection network, - Port port, - Transport transport, - long id) - { - return new ProtocolEngine_1_0_0(network, broker, id, port, transport); - } - - private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0(); - - public static ProtocolEngineCreator getInstance() - { - return INSTANCE; - } - - @Override - public String getType() - { - return getVersion().toString() + "_NO_SASL"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java deleted file mode 100644 index d3936782da..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.network.NetworkConnection; - -public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator -{ - private static final byte[] AMQP_SASL_1_0_0_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 3, - (byte) 1, - (byte) 0, - (byte) 0 - }; - - public ProtocolEngineCreator_1_0_0_SASL() - { - } - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v1_0_0; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_SASL_1_0_0_HEADER; - } - - public ServerProtocolEngine newProtocolEngine(Broker broker, - NetworkConnection network, - Port port, - Transport transport, - long id) - { - return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, transport); - } - - private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0_SASL(); - - public static ProtocolEngineCreator getInstance() - { - return INSTANCE; - } - - @Override - public String getType() - { - return getVersion().toString(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java deleted file mode 100755 index 1bddda2f38..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.framing.FrameHandler; -import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; -import org.apache.qpid.amqp_1_0.transport.SaslServerProvider; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v1_0.Connection_1_0; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - -public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler -{ - static final AtomicLong _connectionIdSource = new AtomicLong(0L); - private final Port _port; - private final Transport _transport; - - //private NetworkConnection _networkDriver; - private long _readBytes; - private long _writtenBytes; - private long _lastReadTime; - private long _lastWriteTime; - private final Broker _broker; - private long _createTime = System.currentTimeMillis(); - private ConnectionEndpoint _conn; - private final long _connectionId; - - private static final ByteBuffer HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - private FrameWriter _frameWriter; - private FrameHandler _frameHandler; - private Object _sendLock = new Object(); - private byte _major; - private byte _minor; - private byte _revision; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; - - - static enum State { - A, - M, - Q, - P, - PROTOCOL, - MAJOR, - MINOR, - REVISION, - FRAME - } - - private State _state = State.A; - - - - public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, - final Broker broker, - long id, - Port port, - Transport transport) - { - _broker = broker; - _port = port; - _transport = transport; - _connectionId = id; - if(networkDriver != null) - { - setNetworkConnection(networkDriver, networkDriver.getSender()); - } - } - - - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - //Todo - } - - public void readerIdle() - { - //Todo - } - - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) - { - _network = network; - _sender = sender; - - Container container = new Container(_broker.getId().toString()); - - VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); - - _conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator( - getLocalAddress()))); - - Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>(); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName()); - - _conn.setProperties(serverProperties); - - _conn.setRemoteAddress(_network.getRemoteAddress()); - _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport)); - _conn.setFrameOutputHandler(this); - - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new FrameHandler(_conn); - - _sender.send(HEADER.duplicate()); - _sender.flush(); - } - - private SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator) - { - return new SaslServerProvider() - { - @Override - public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException - { - return subjectCreator.createSaslServer(mechanism, fqdn, null); - } - - @Override - public Principal getAuthenticatedPrincipal(SaslServer server) - { - return new UsernamePrincipal(server.getAuthorizationID()); - } - }; - } - - public String getAddress() - { - return getRemoteAddress().toString(); - } - - public boolean isDurable() - { - return false; - } - - public synchronized void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup = msg.duplicate(); - byte[] data = new byte[dup.remaining()]; - dup.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); - } - _readBytes += msg.remaining(); - switch(_state) - { - case A: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - break; - } - case M: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.M; - break; - } - - case Q: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.Q; - break; - } - case P: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.P; - break; - } - case PROTOCOL: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.PROTOCOL; - break; - } - case MAJOR: - if(msg.hasRemaining()) - { - _major = msg.get(); - } - else - { - _state = State.MAJOR; - break; - } - case MINOR: - if(msg.hasRemaining()) - { - _minor = msg.get(); - } - else - { - _state = State.MINOR; - break; - } - case REVISION: - if(msg.hasRemaining()) - { - _revision = msg.get(); - - _state = State.FRAME; - } - else - { - _state = State.REVISION; - break; - } - case FRAME: - if(msg.hasRemaining()) - { - _frameHandler.parse(msg); - } - } - - } - - public void exception(Throwable t) - { - t.printStackTrace(); - } - - public void closed() - { - _conn.inputClosed(); - if(_conn != null && _conn.getConnectionEventListener() != null) - { - ((Connection_1_0)_conn.getConnectionEventListener()).closed(); - } - - } - - public long getCreateTime() - { - return _createTime; - } - - - public boolean canSend() - { - return true; - } - - public void send(final AMQFrame amqFrame) - { - send(amqFrame, null); - } - - private final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - private final Logger RAW_LOGGER = Logger.getLogger("RAW"); - - - public void send(final AMQFrame amqFrame, ByteBuffer buf) - { - synchronized(_sendLock) - { - - _lastWriteTime = System.currentTimeMillis(); - if(FRAME_LOGGER.isLoggable(Level.FINE)) - { - FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); - } - - - _frameWriter.setValue(amqFrame); - - - - ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); - - int size = _frameWriter.writeToBuffer(dup); - if(size > _conn.getMaxFrameSize()) - { - throw new OversizeFrameException(amqFrame,size); - } - - dup.flip(); - _writtenBytes += dup.limit(); - - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup2 = dup.duplicate(); - byte[] data = new byte[dup2.remaining()]; - dup2.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); - } - - - _sender.send(dup); - _sender.flush(); - - } - } - - public void send(short channel, FrameBody body) - { - AMQFrame frame = AMQFrame.createAMQFrame(channel, body); - send(frame); - - } - - public void close() - { - //TODO - } - - public long getConnectionId() - { - return _connectionId; - } - - public long getLastReadTime() - { - return _lastReadTime; - } - - public long getLastWriteTime() - { - return _lastWriteTime; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java deleted file mode 100644 index d614f44981..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.io.PrintWriter; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; -import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; -import org.apache.qpid.amqp_1_0.transport.SaslServerProvider; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v1_0.Connection_1_0; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - -public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler -{ - private final Port _port; - private final Transport _transport; - private long _readBytes; - private long _writtenBytes; - - private long _lastReadTime; - private long _lastWriteTime; - private final Broker _broker; - private long _createTime = System.currentTimeMillis(); - private ConnectionEndpoint _conn; - private long _connectionId; - - private static final ByteBuffer HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 3, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - private static final ByteBuffer PROTOCOL_HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - - private FrameWriter _frameWriter; - private ProtocolHandler _frameHandler; - private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); - private Object _sendLock = new Object(); - private byte _major; - private byte _minor; - private byte _revision; - private PrintWriter _out; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; - - - static enum State { - A, - M, - Q, - P, - PROTOCOL, - MAJOR, - MINOR, - REVISION, - FRAME - } - - private State _state = State.A; - - - public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker, - long id, Port port, Transport transport) - { - _connectionId = id; - _broker = broker; - _port = port; - _transport = transport; - if(networkDriver != null) - { - setNetworkConnection(networkDriver, networkDriver.getSender()); - } - } - - - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - //Todo - } - - public void readerIdle() - { - //Todo - } - - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) - { - _network = network; - _sender = sender; - - Container container = new Container(_broker.getId().toString()); - - VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); - SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress()); - _conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator)); - - Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>(); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), QpidProperties.getReleaseVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName()); - - _conn.setProperties(serverProperties); - - _conn.setRemoteAddress(getRemoteAddress()); - _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport)); - _conn.setFrameOutputHandler(this); - _conn.setSaslFrameOutput(this); - - _conn.setOnSaslComplete(new Runnable() - { - public void run() - { - if(_conn.isAuthenticated()) - { - _sender.send(PROTOCOL_HEADER.duplicate()); - _sender.flush(); - } - else - { - _network.close(); - } - } - }); - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new SASLFrameHandler(_conn); - - _sender.send(HEADER.duplicate()); - _sender.flush(); - - _conn.initiateSASL(subjectCreator.getMechanisms().split(" ")); - - - } - - private SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator) - { - return new SaslServerProvider() - { - @Override - public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException - { - return subjectCreator.createSaslServer(mechanism, fqdn, _network.getPeerPrincipal()); - } - - @Override - public Principal getAuthenticatedPrincipal(SaslServer server) - { - return new UsernamePrincipal(server.getAuthorizationID()); - } - }; - } - - public String getAddress() - { - return getRemoteAddress().toString(); - } - - public boolean isDurable() - { - return false; - } - - private final Logger RAW_LOGGER = Logger.getLogger("RAW"); - - - public synchronized void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup = msg.duplicate(); - byte[] data = new byte[dup.remaining()]; - dup.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); - } - _readBytes += msg.remaining(); - switch(_state) - { - case A: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - break; - } - case M: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.M; - break; - } - - case Q: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.Q; - break; - } - case P: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.P; - break; - } - case PROTOCOL: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.PROTOCOL; - break; - } - case MAJOR: - if(msg.hasRemaining()) - { - _major = msg.get(); - } - else - { - _state = State.MAJOR; - break; - } - case MINOR: - if(msg.hasRemaining()) - { - _minor = msg.get(); - } - else - { - _state = State.MINOR; - break; - } - case REVISION: - if(msg.hasRemaining()) - { - _revision = msg.get(); - - _state = State.FRAME; - } - else - { - _state = State.REVISION; - break; - } - case FRAME: - if(msg.hasRemaining()) - { - _frameHandler = _frameHandler.parse(msg); - } - } - - } - - public void exception(Throwable t) - { - t.printStackTrace(); - } - - public void closed() - { - // todo - _conn.inputClosed(); - if (_conn != null && _conn.getConnectionEventListener() != null) - { - ((Connection_1_0) _conn.getConnectionEventListener()).closed(); - } - - } - - public long getCreateTime() - { - return _createTime; - } - - - public boolean canSend() - { - return true; - } - - public void send(final AMQFrame amqFrame) - { - send(amqFrame, null); - } - - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - - - public void send(final AMQFrame amqFrame, ByteBuffer buf) - { - - synchronized (_sendLock) - { - _lastWriteTime = System.currentTimeMillis(); - if (FRAME_LOGGER.isLoggable(Level.FINE)) - { - FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); - } - - _frameWriter.setValue(amqFrame); - - ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); - - int size = _frameWriter.writeToBuffer(dup); - if (size > _conn.getMaxFrameSize()) - { - throw new OversizeFrameException(amqFrame, size); - } - - dup.flip(); - _writtenBytes += dup.limit(); - - if (RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup2 = dup.duplicate(); - byte[] data = new byte[dup2.remaining()]; - dup2.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); - } - - _sender.send(dup); - _sender.flush(); - - - } - } - - public void send(short channel, FrameBody body) - { - AMQFrame frame = AMQFrame.createAMQFrame(channel, body); - send(frame); - - } - - public void close() - { - _sender.close(); - } - - public void setLogOutput(final PrintWriter out) - { - _out = out; - } - - public long getConnectionId() - { - return _connectionId; - } - - public long getLastReadTime() - { - return _lastReadTime; - } - - public long getLastWriteTime() - { - return _lastWriteTime; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java deleted file mode 100644 index af3f0b7872..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQQueue; - -import org.apache.qpid.server.txn.ServerTransaction; - -import java.util.Arrays; - -public class QueueDestination implements SendingDestination, ReceivingDestination -{ - private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; - - - private AMQQueue _queue; - - public QueueDestination(AMQQueue queue) - { - _queue = queue; - } - - public Outcome[] getOutcomes() - { - return OUTCOMES; - } - - public Outcome send(final Message_1_0 message, ServerTransaction txn) - { - - try - { - txn.enqueue(_queue,message, new ServerTransaction.Action() - { - - - public void postCommit() - { - try - { - - _queue.enqueue(message); - } - catch (Exception e) - { - // TODO - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - // NO-OP - } - }); - } - catch(Exception e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } - return ACCEPTED; - } - - public int getCredit() - { - // TODO - fix - return 100; - } - - public AMQQueue getQueue() - { - return _queue; - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java deleted file mode 100644 index 4ae0596e25..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.type.Outcome; - -import org.apache.qpid.server.txn.ServerTransaction; - -public interface ReceivingDestination extends Destination -{ - - Outcome[] getOutcomes(); - - Outcome send(Message_1_0 message, ServerTransaction txn); - - int getCredit(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java deleted file mode 100644 index 46b9682c74..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Source; -import org.apache.qpid.amqp_1_0.type.Target; - -public class ReceivingLinkAttachment -{ - private final Session_1_0 _session; - private final ReceivingLinkEndpoint _endpoint; - - public ReceivingLinkAttachment(final Session_1_0 session, final ReceivingLinkEndpoint endpoint) - { - _session = session; - _endpoint = endpoint; - } - - public Session_1_0 getSession() - { - return _session; - } - - public ReceivingLinkEndpoint getEndpoint() - { - return _endpoint; - } - - public Source getSource() - { - return getEndpoint().getSource(); - } - - public void setDeliveryStateHandler(final DeliveryStateHandler handler) - { - getEndpoint().setDeliveryStateHandler(handler); - } - - public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled) - { - getEndpoint().updateDisposition(deliveryTag, state, settled); - } - - public Target getTarget() - { - return getEndpoint().getTarget(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java deleted file mode 100644 index e971672767..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler -{ - private VirtualHost _vhost; - - private ReceivingDestination _destination; - private SectionDecoderImpl _sectionDecoder; - private volatile ReceivingLinkAttachment _attachment; - - - private ArrayList<Transfer> _incompleteMessage; - private TerminusDurability _durability; - - private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>()); - private boolean _resumedMessage; - private Binary _messageDeliveryTag; - private ReceiverSettleMode _receivingSettlementMode; - - - public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost, - ReceivingDestination destination) - { - _vhost = vhost; - _destination = destination; - _attachment = receivingLinkAttachment; - receivingLinkAttachment.setDeliveryStateHandler(this); - - _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable(); - - _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry()); - - - } - - public void messageTransfer(Transfer xfr) - { - // TODO - cope with fragmented messages - - List<ByteBuffer> fragments = null; - - - - if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) - { - _incompleteMessage = new ArrayList<Transfer>(); - _incompleteMessage.add(xfr); - _resumedMessage = Boolean.TRUE.equals(xfr.getResume()); - _messageDeliveryTag = xfr.getDeliveryTag(); - return; - } - else if(_incompleteMessage != null) - { - _incompleteMessage.add(xfr); - - if(Boolean.TRUE.equals(xfr.getMore())) - { - return; - } - - fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size()); - for(Transfer t : _incompleteMessage) - { - fragments.add(t.getPayload()); - } - _incompleteMessage=null; - - } - else - { - _resumedMessage = Boolean.TRUE.equals(xfr.getResume()); - _messageDeliveryTag = xfr.getDeliveryTag(); - fragments = Collections.singletonList(xfr.getPayload()); - } - - if(_resumedMessage) - { - if(_unsettledMap.containsKey(_messageDeliveryTag)) - { - Outcome outcome = _unsettledMap.get(_messageDeliveryTag); - boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()); - getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled); - if(settled) - { - _unsettledMap.remove(_messageDeliveryTag); - } - } - else - { - System.err.println("UNEXPECTED!!"); - System.err.println("Delivery Tag: " + _messageDeliveryTag); - System.err.println("_unsettledMap: " + _unsettledMap); - - } - } - else - { - MessageMetaData_1_0 mmd = null; - List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3); - mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]), - _sectionDecoder, - immutableSections); - - StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd); - - boolean skipping = true; - int offset = 0; - - for(ByteBuffer bareMessageBuf : immutableSections) - { - storedMessage.addContent(offset, bareMessageBuf.duplicate()); - offset += bareMessageBuf.remaining(); - } - - storedMessage.flushToStore(); - - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession()); - - - Binary transactionId = null; - org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState(); - if(xfrState != null) - { - if(xfrState instanceof TransactionalState) - { - transactionId = ((TransactionalState)xfrState).getTxnId(); - } - } - - ServerTransaction transaction = null; - if(transactionId != null) - { - transaction = getSession().getTransaction(transactionId); - } - else - { - Session_1_0 session = getSession(); - transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getMessageStore()); - } - - Outcome outcome = _destination.send(message, transaction); - - DeliveryState resultantState; - - if(transactionId == null) - { - resultantState = (DeliveryState) outcome; - } - else - { - TransactionalState transactionalState = new TransactionalState(); - transactionalState.setOutcome(outcome); - transactionalState.setTxnId(transactionId); - resultantState = transactionalState; - - } - - - boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()); - - final Binary deliveryTag = xfr.getDeliveryTag(); - - if(!settled) - { - _unsettledMap.put(deliveryTag, outcome); - } - - getEndpoint().updateDisposition(deliveryTag, resultantState, settled); - - getSession().getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - - if(!(transaction instanceof AutoCommitTransaction)) - { - ServerTransaction.Action a; - transaction.addPostTransactionAction(new ServerTransaction.Action() - { - public void postCommit() - { - getEndpoint().updateDisposition(deliveryTag, null, true); - } - - public void onRollback() - { - getEndpoint().updateDisposition(deliveryTag, null, true); - } - }); - } - } - } - - private ReceiverSettleMode getReceivingSettlementMode() - { - return _receivingSettlementMode; - } - - public void remoteDetached(LinkEndpoint endpoint, Detach detach) - { - //TODO - // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) || - (detach != null && Boolean.TRUE.equals(detach.getClosed()))) - { - endpoint.close(); - } - else if(detach == null || detach.getError() != null) - { - _attachment = null; - } - } - - public void start() - { - getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit())); - getEndpoint().setCreditWindow(); - } - - public ReceivingLinkEndpoint getEndpoint() - { - return _attachment.getEndpoint(); - } - - - public Session_1_0 getSession() - { - ReceivingLinkAttachment attachment = _attachment; - return attachment == null ? null : attachment.getSession(); - } - - public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) - { - if(Boolean.TRUE.equals(settled)) - { - _unsettledMap.remove(deliveryTag); - } - } - - public void setLinkAttachment(ReceivingLinkAttachment linkAttachment) - { - _attachment = linkAttachment; - _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode(); - ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint(); - Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); - - Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap); - for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet()) - { - Binary deliveryTag = entry.getKey(); - if(!initialUnsettledMap.containsKey(deliveryTag)) - { - _unsettledMap.remove(deliveryTag); - } - } - - } - - public Map getUnsettledOutcomeMap() - { - return _unsettledMap; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java deleted file mode 100644 index 6d601c9dda..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - - -public interface SendingDestination extends Destination -{ - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java deleted file mode 100644 index 09a2ddea3a..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Source; - -public class SendingLinkAttachment -{ - private final Session_1_0 _session; - private final SendingLinkEndpoint _endpoint; - - public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint) - { - _session = session; - _endpoint = endpoint; - } - - public Session_1_0 getSession() - { - return _session; - } - - public SendingLinkEndpoint getEndpoint() - { - return _endpoint; - } - - public Source getSource() - { - return getEndpoint().getSource(); - } - - public void setDeliveryStateHandler(final DeliveryStateHandler handler) - { - getEndpoint().setDeliveryStateHandler(handler); - } - - public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled) - { - getEndpoint().updateDisposition(deliveryTag, state, settled); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java deleted file mode 100644 index ca67b6f79b..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ /dev/null @@ -1,685 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.filter.SelectorParsingException; -import org.apache.qpid.filter.selector.ParseException; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler -{ - private VirtualHost _vhost; - private SendingDestination _destination; - - private Subscription_1_0 _subscription; - private boolean _draining; - private final Map<Binary, QueueEntry> _unsettledMap = - new HashMap<Binary, QueueEntry>(); - - private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap = - new ConcurrentHashMap<Binary, UnsettledAction>(); - private volatile SendingLinkAttachment _linkAttachment; - private TerminusDurability _durability; - private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>(); - private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); - private Runnable _closeAction; - - public SendingLink_1_0(final SendingLinkAttachment linkAttachment, - final VirtualHost vhost, - final SendingDestination destination) - throws AmqpErrorException - { - _vhost = vhost; - _destination = destination; - _linkAttachment = linkAttachment; - final Source source = (Source) linkAttachment.getSource(); - _durability = source.getDurable(); - linkAttachment.setDeliveryStateHandler(this); - QueueDestination qd = null; - AMQQueue queue = null; - - - - boolean noLocal = false; - JMSSelectorFilter messageFilter = null; - - if(destination instanceof QueueDestination) - { - queue = ((QueueDestination) _destination).getQueue(); - if(queue.getArguments() != null && queue.getArguments().containsKey("topic")) - { - source.setDistributionMode(StdDistMode.COPY); - } - qd = (QueueDestination) destination; - - Map<Symbol,Filter> filters = source.getFilter(); - - Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>(); - - if(filters != null) - { - for(Map.Entry<Symbol,Filter> entry : filters.entrySet()) - { - if(entry.getValue() instanceof NoLocalFilter) - { - actualFilters.put(entry.getKey(), entry.getValue()); - noLocal = true; - } - else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) - { - - org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue(); - try - { - messageFilter = new JMSSelectorFilter(selectorFilter.getValue()); - - actualFilters.put(entry.getKey(), entry.getValue()); - } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - - - } - } - } - source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - - _subscription = new Subscription_1_0(this, qd); - } - else if(destination instanceof ExchangeDestination) - { - try - { - - ExchangeDestination exchangeDestination = (ExchangeDestination) destination; - - boolean isDurable = exchangeDestination.getDurability() == TerminusDurability.CONFIGURATION - || exchangeDestination.getDurability() == TerminusDurability.UNSETTLED_STATE; - String name; - if(isDurable) - { - String remoteContainerId = getEndpoint().getSession().getConnection().getRemoteContainerId(); - remoteContainerId = remoteContainerId.replace("_","__").replace(".", "_:"); - - String endpointName = linkAttachment.getEndpoint().getName(); - endpointName = endpointName - .replace("_", "__") - .replace(".", "_:") - .replace("(", "_O") - .replace(")", "_C") - .replace("<", "_L") - .replace(">", "_R"); - name = "qpid_/" + remoteContainerId + "_/" + endpointName; - } - else - { - name = UUID.randomUUID().toString(); - } - - queue = _vhost.getQueueRegistry().getQueue(name); - Exchange exchange = exchangeDestination.getExchange(); - - if(queue == null) - { - queue = AMQQueueFactory.createAMQQueueImpl( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - _vhost, - Collections.EMPTY_MAP); - } - else - { - List<Binding> bindings = queue.getBindings(); - List<Binding> bindingsToRemove = new ArrayList<Binding>(); - for(Binding existingBinding : bindings) - { - if(existingBinding.getExchange() != _vhost.getDefaultExchange() - && existingBinding.getExchange() != exchange) - { - bindingsToRemove.add(existingBinding); - } - } - for(Binding existingBinding : bindingsToRemove) - { - existingBinding.getExchange().removeBinding(existingBinding); - } - } - - - String binding = ""; - - Map<Symbol,Filter> filters = source.getFilter(); - Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>(); - boolean hasBindingFilter = false; - if(filters != null && !filters.isEmpty()) - { - - for(Map.Entry<Symbol,Filter> entry : filters.entrySet()) - { - if(!hasBindingFilter - && entry.getValue() instanceof ExactSubjectFilter - && exchange.getType() == DirectExchange.TYPE) - { - ExactSubjectFilter filter = (ExactSubjectFilter) filters.values().iterator().next(); - source.setFilter(filters); - binding = filter.getValue(); - actualFilters.put(entry.getKey(), entry.getValue()); - hasBindingFilter = true; - } - else if(!hasBindingFilter - && entry.getValue() instanceof MatchingSubjectFilter - && exchange.getType() == TopicExchange.TYPE) - { - MatchingSubjectFilter filter = (MatchingSubjectFilter) filters.values().iterator().next(); - source.setFilter(filters); - binding = filter.getValue(); - actualFilters.put(entry.getKey(), entry.getValue()); - hasBindingFilter = true; - } - else if(entry.getValue() instanceof NoLocalFilter) - { - actualFilters.put(entry.getKey(), entry.getValue()); - noLocal = true; - } - else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) - { - - org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue(); - try - { - messageFilter = new JMSSelectorFilter(selectorFilter.getValue()); - - actualFilters.put(entry.getKey(), entry.getValue()); - } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - - - } - } - } - source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - - exchange.addBinding(binding,queue,null); - source.setDistributionMode(StdDistMode.COPY); - - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() - { - public void doTask(Connection_1_0 session) - { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) - { - try - { - tempQueue.delete(); - } - catch (AMQException e) - { - e.printStackTrace(); //TODO. - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - - qd = new QueueDestination(queue); - } - catch (AMQSecurityException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (AMQInternalException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (AMQException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - _subscription = new Subscription_1_0(this, qd, true); - - } - - if(_subscription != null) - { - _subscription.setNoLocal(noLocal); - if(messageFilter!=null) - { - _subscription.setFilters(new SimpleFilterManager(messageFilter)); - } - - try - { - - queue.registerSubscription(_subscription, false); - } - catch (AMQException e) - { - e.printStackTrace(); //TODO - } - } - - } - - public void resume(SendingLinkAttachment linkAttachment) - { - _linkAttachment = linkAttachment; - - } - - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - //TODO - // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) - { - AMQQueue queue = _subscription.getQueue(); - - try - { - - queue.unregisterSubscription(_subscription); - - } - catch (AMQException e) - { - e.printStackTrace(); //TODO - } - - Modified state = new Modified(); - state.setDeliveryFailed(true); - - for(UnsettledAction action : _unsettledActionMap.values()) - { - - action.process(state,Boolean.TRUE); - } - _unsettledActionMap.clear(); - - endpoint.close(); - - if(_destination instanceof ExchangeDestination - && (_durability == TerminusDurability.CONFIGURATION - || _durability == TerminusDurability.UNSETTLED_STATE)) - { - try - { - queue.delete(); - } - catch(AMQException e) - { - e.printStackTrace(); // TODO - Implement - } - } - - if(_closeAction != null) - { - _closeAction.run(); - } - } - else if(detach == null || detach.getError() != null) - { - _linkAttachment = null; - _subscription.flowStateChanged(); - } - else - { - endpoint.detach(); - } - } - - public void start() - { - //TODO - } - - public SendingLinkEndpoint getEndpoint() - { - return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ; - } - - public Session_1_0 getSession() - { - return _linkAttachment == null ? null : _linkAttachment.getSession(); - } - - public void flowStateChanged() - { - if(Boolean.TRUE.equals(getEndpoint().getDrain()) - && hasCredit()) - { - _draining = true; - } - - while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend()) - { - Accepted accepted = new Accepted(); - synchronized(getLock()) - { - - Transfer xfr = new Transfer(); - Binary dt = _resumeAcceptedTransfers.remove(0); - xfr.setDeliveryTag(dt); - xfr.setState(accepted); - xfr.setResume(Boolean.TRUE); - getEndpoint().transfer(xfr); - } - - } - if(_resumeAcceptedTransfers.isEmpty()) - { - _subscription.flowStateChanged(); - } - - } - - boolean hasCredit() - { - return getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0; - } - - public boolean isDraining() - { - return false; //TODO - } - - public boolean drained() - { - if(getEndpoint() != null) - { - synchronized(getEndpoint().getLock()) - { - if(_draining) - { - //TODO - getEndpoint().drained(); - _draining = false; - return true; - } - else - { - return false; - } - } - } - else - { - return false; - } - } - - public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry) - { - _unsettledActionMap.put(tag,unsettledAction); - if(getTransactionId() == null) - { - _unsettledMap.put(tag, queueEntry); - } - } - - public void removeUnsettled(Binary tag) - { - _unsettledActionMap.remove(tag); - } - - public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) - { - UnsettledAction action = _unsettledActionMap.get(deliveryTag); - boolean localSettle = false; - if(action != null) - { - localSettle = action.process(state, settled); - if(localSettle && !Boolean.TRUE.equals(settled)) - { - _linkAttachment.updateDisposition(deliveryTag, state, true); - } - } - if(Boolean.TRUE.equals(settled) || localSettle) - { - _unsettledActionMap.remove(deliveryTag); - _unsettledMap.remove(deliveryTag); - } - } - - ServerTransaction getTransaction(Binary transactionId) - { - return _linkAttachment.getSession().getTransaction(transactionId); - } - - public Binary getTransactionId() - { - SendingLinkEndpoint endpoint = getEndpoint(); - return endpoint == null ? null : endpoint.getTransactionId(); - } - - public synchronized Object getLock() - { - return _linkAttachment == null ? this : getEndpoint().getLock(); - } - - public boolean isDetached() - { - return _linkAttachment == null || getEndpoint().isDetached(); - } - - public boolean isAttached() - { - return _linkAttachment != null && getEndpoint().isAttached(); - } - - public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) - { - - if(_subscription.isActive()) - { - _subscription.suspend(); - } - - _linkAttachment = linkAttachment; - - SendingLinkEndpoint endpoint = linkAttachment.getEndpoint(); - endpoint.setDeliveryStateHandler(this); - Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); - Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap); - _resumeAcceptedTransfers.clear(); - _resumeFullTransfers.clear(); - - for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet()) - { - Binary deliveryTag = entry.getKey(); - final QueueEntry queueEntry = entry.getValue(); - if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag)) - { - queueEntry.setRedelivered(); - queueEntry.release(); - _unsettledMap.remove(deliveryTag); - } - else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome)) - { - Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag); - - if(outcome instanceof Accepted) - { - AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) - { - txn.dequeue(Collections.singleton(queueEntry), - new ServerTransaction.Action() - { - public void postCommit() - { - queueEntry.discard(); - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); - } - } - else if(outcome instanceof Released) - { - AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) - { - txn.dequeue(Collections.singleton(queueEntry), - new ServerTransaction.Action() - { - public void postCommit() - { - queueEntry.release(); - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); - } - } - //_unsettledMap.remove(deliveryTag); - initialUnsettledMap.remove(deliveryTag); - _resumeAcceptedTransfers.add(deliveryTag); - } - else - { - _resumeFullTransfers.add(queueEntry); - // exists in receivers map, but not yet got an outcome ... should resend with resume = true - } - // TODO - else - } - - - } - - public Map getUnsettledOutcomeMap() - { - Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap); - - for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet()) - { - entry.setValue(null); - } - - return unsettled; - } - - public void setCloseAction(Runnable action) - { - _closeAction = action; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java deleted file mode 100644 index ed75a8c165..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ /dev/null @@ -1,619 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.text.MessageFormat; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SessionEventListener; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; -import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; -import org.apache.qpid.amqp_1_0.type.transport.*; - -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.LinkRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.*; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; - -public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject -{ - private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); - private VirtualHost _vhost; - private AutoCommitTransaction _transaction; - - private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = - new LinkedHashMap<Integer, ServerTransaction>(); - private final Connection_1_0 _connection; - private UUID _id = UUID.randomUUID(); - - - public Session_1_0(VirtualHost vhost, final Connection_1_0 connection) - { - _vhost = vhost; - _transaction = new AutoCommitTransaction(vhost.getMessageStore()); - _connection = connection; - - } - - public void remoteLinkCreation(final LinkEndpoint endpoint) - { - - - Destination destination; - Link_1_0 link = null; - Error error = null; - - final - LinkRegistry - linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); - - - if(endpoint.getRole() == Role.SENDER) - { - - SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); - - if(previousLink == null) - { - - Target target = (Target) endpoint.getTarget(); - Source source = (Source) endpoint.getSource(); - - - if(source != null) - { - if(Boolean.TRUE.equals(source.getDynamic())) - { - AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties()); - source.setAddress(tempQueue.getName()); - } - String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); - if(queue != null) - { - - destination = new QueueDestination(queue); - - - - } - else - { - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) - { - destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy()); - } - else - { - - endpoint.setSource(null); - destination = null; - } - } - - } - else - { - destination = null; - } - - if(destination != null) - { - final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; - try - { - final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint), - _vhost, - (SendingDestination) destination - ); - sendingLinkEndpoint.setLinkEventListener(sendingLink); - link = sendingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) - { - linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); - } - } - catch(AmqpErrorException e) - { - e.printStackTrace(); - destination = null; - sendingLinkEndpoint.setSource(null); - error = e.getError(); - } - } - } - else - { - Source newSource = (Source) endpoint.getSource(); - - Source oldSource = (Source) previousLink.getEndpoint().getSource(); - final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable(); - if(newSourceDurable != null) - { - oldSource.setDurable(newSourceDurable); - if(newSourceDurable.equals(TerminusDurability.NONE)) - { - linkRegistry.unregisterSendingLink(endpoint.getName()); - } - } - endpoint.setSource(oldSource); - SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; - previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); - sendingLinkEndpoint.setLinkEventListener(previousLink); - link = previousLink; - endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); - } - } - else - { - if(endpoint.getTarget() instanceof Coordinator) - { - Coordinator coordinator = (Coordinator) endpoint.getTarget(); - TxnCapability[] capabilities = coordinator.getCapabilities(); - boolean localTxn = false; - boolean multiplePerSession = false; - if(capabilities != null) - { - for(TxnCapability capability : capabilities) - { - if(capability.equals(TxnCapability.LOCAL_TXN)) - { - localTxn = true; - } - else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) - { - multiplePerSession = true; - } - else - { - error = new Error(); - error.setCondition(AmqpError.NOT_IMPLEMENTED); - error.setDescription("Unsupported capability: " + capability); - break; - } - } - } - - /* if(!localTxn) - { - capabilities.add(TxnCapabilities.LOCAL_TXN); - }*/ - - final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; - final TxnCoordinatorLink_1_0 coordinatorLink = - new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); - receivingLinkEndpoint.setLinkEventListener(coordinatorLink); - link = coordinatorLink; - - - } - else - { - - ReceivingLink_1_0 previousLink = - (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName()); - - if(previousLink == null) - { - - Target target = (Target) endpoint.getTarget(); - - if(target != null) - { - if(Boolean.TRUE.equals(target.getDynamic())) - { - - AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties()); - target.setAddress(tempQueue.getName()); - } - - String addr = target.getAddress(); - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) - { - destination = new ExchangeDestination(exchg, target.getDurable(), - target.getExpiryPolicy()); - } - else - { - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); - if(queue != null) - { - - destination = new QueueDestination(queue); - } - else - { - endpoint.setTarget(null); - destination = null; - } - - } - - - } - else - { - destination = null; - } - if(destination != null) - { - final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; - final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, - (ReceivingDestination) destination); - receivingLinkEndpoint.setLinkEventListener(receivingLink); - link = receivingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) - { - linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); - } - } - } - else - { - ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; - previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint)); - receivingLinkEndpoint.setLinkEventListener(previousLink); - link = previousLink; - endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); - - } - } - } - - endpoint.attach(); - - if(link == null) - { - if(error == null) - { - error = new Error(); - error.setCondition(AmqpError.NOT_FOUND); - } - endpoint.detach(error); - } - else - { - link.start(); - } - } - - - private AMQQueue createTemporaryQueue(Map properties) - { - final String queueName = UUID.randomUUID().toString(); - AMQQueue queue = null; - try - { - LifetimePolicy lifetimePolicy = properties == null - ? null - : (LifetimePolicy) properties.get(LIFETIME_POLICY); - - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - _vhost, - properties); - - - - if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) - { - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() - { - public void doTask(Connection_1_0 session) - { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) - { - try - { - tempQueue.delete(); - } - catch (AMQException e) - { - e.printStackTrace(); //TODO. - } - } - } - }; - - _connection.addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) - { - _connection.removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - else if(lifetimePolicy instanceof DeleteOnNoLinks) - { - - } - else if(lifetimePolicy instanceof DeleteOnNoMessages) - { - - } - else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) - { - - } - } - catch (AMQSecurityException e) - { - e.printStackTrace(); //TODO. - } catch (AMQException e) - { - e.printStackTrace(); //TODO - } - - return queue; - } - - public ServerTransaction getTransaction(Binary transactionId) - { - // TODO should treat invalid id differently to null - ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId)); - return transaction == null ? _transaction : transaction; - } - - public void remoteEnd(End end) - { - Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator(); - - while(iter.hasNext()) - { - Map.Entry<Integer, ServerTransaction> entry = iter.next(); - entry.getValue().rollback(); - iter.remove(); - } - - _connection.sessionEnded(this); - - } - - Integer binaryToInteger(final Binary txnId) - { - if(txnId == null) - { - return null; - } - - if(txnId.getLength() > 4) - throw new IllegalArgumentException(); - - int id = 0; - byte[] data = txnId.getArray(); - for(int i = 0; i < txnId.getLength(); i++) - { - id <<= 8; - id += data[i+txnId.getArrayOffset()]; - } - - return id; - - } - - Binary integerToBinary(final int txnId) - { - byte[] data = new byte[4]; - data[3] = (byte) (txnId & 0xff); - data[2] = (byte) ((txnId & 0xff00) >> 8); - data[1] = (byte) ((txnId & 0xff0000) >> 16); - data[0] = (byte) ((txnId & 0xff000000) >> 24); - return new Binary(data); - - } - - public void forceEnd() - { - } - - - @Override - public UUID getId() - { - return _id; - } - - @Override - public AMQConnectionModel getConnectionModel() - { - return _connection.getModel(); - } - - @Override - public String getClientID() - { - // TODO - return ""; - } - - @Override - public void close() throws AMQException - { - // TODO - required for AMQSessionModel / management initiated closing - } - - - @Override - public void close(AMQConstant cause, String message) throws AMQException - { - // TODO - required for AMQSessionModel - } - - @Override - public LogSubject getLogSubject() - { - return this; - } - - @Override - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - // TODO - required for AMQSessionModel / long running transaction detection - } - - @Override - public void block(AMQQueue queue) - { - // TODO - required for AMQSessionModel / producer side flow control - } - - @Override - public void unblock(AMQQueue queue) - { - // TODO - required for AMQSessionModel / producer side flow control - } - - @Override - public void block() - { - // TODO - required for AMQSessionModel / producer side flow control - } - - @Override - public void unblock() - { - // TODO - required for AMQSessionModel / producer side flow control - } - - @Override - public boolean getBlocking() - { - // TODO - return false; - } - - @Override - public boolean onSameConnection(InboundMessage inbound) - { - // TODO - return false; - } - - @Override - public int getUnacknowledgedMessageCount() - { - // TODO - return 0; - } - - @Override - public Long getTxnCount() - { - // TODO - return 0l; - } - - @Override - public Long getTxnStart() - { - // TODO - return 0l; - } - - @Override - public Long getTxnCommits() - { - // TODO - return 0l; - } - - @Override - public Long getTxnRejects() - { - // TODO - return 0l; - } - - @Override - public int getChannelId() - { - // TODO - return 0; - } - - @Override - public int getConsumerCount() - { - // TODO - return 0; - } - - - public String toLogString() - { - long connectionId = getConnectionModel().getConnectionId(); - - String remoteAddress = getConnectionModel().getRemoteAddressString(); - - return "[" + - MessageFormat.format(CHANNEL_FORMAT, - connectionId, - getClientID(), - remoteAddress, - _vhost.getName(), // TODO - virtual host - 0) // TODO - channel) - + "] "; - } - - @Override - public int compareTo(AMQSessionModel o) - { - return getId().compareTo(o.getId()); - } - - public Connection_1_0 getConnection() - { - return _connection; - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java deleted file mode 100644 index 28d212c93d..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ /dev/null @@ -1,718 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import java.io.EOFException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.qpid.AMQException; -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.typedmessage.TypedBytesContentReader; -import org.apache.qpid.typedmessage.TypedBytesFormatException; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.codec.BBDecoder; - -class Subscription_1_0 implements Subscription -{ - private SendingLink_1_0 _link; - - private AMQQueue _queue; - - private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED); - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final long _id; - private final boolean _acquires; - private volatile AMQQueue.Context _queueContext; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private ReentrantLock _stateChangeLock = new ReentrantLock(); - - private boolean _noLocal; - private FilterManager _filters; - - private long _deliveryTag = 0L; - private StateListener _stateListener; - - private Binary _transactionId; - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); - private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry); - - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination) - { - this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY); - } - - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires) - { - _link = link; - _queue = destination.getQueue(); - _id = getEndpoint().getLocalHandle().longValue(); - _acquires = acquires; - } - - private SendingLinkEndpoint getEndpoint() - { - return _link.getEndpoint(); - } - - public LogActor getLogActor() - { - return null; //TODO - } - - public boolean isTransient() - { - return true; //TODO - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(final AMQQueue queue, final boolean exclusive) - { - //TODO - } - - public void setNoLocal(final boolean noLocal) - { - _noLocal = noLocal; - } - - public long getSubscriptionID() - { - return _id; - } - - public boolean isSuspended() - { - return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend(); - - } - - public boolean hasInterest(final QueueEntry entry) - { - if(entry.getMessage() instanceof Message_1_0) - { - if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession()) - { - return false; - } - } - else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) - { - return false; - } - return checkFilters(entry); - - } - - private boolean checkFilters(final QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry); - } - - public boolean isClosed() - { - return !getEndpoint().isAttached(); - } - - public boolean acquires() - { - return _acquires; - } - - public boolean seesRequeues() - { - // TODO - return acquires(); - } - - public void close() - { - getEndpoint().detach(); - } - - public void send(QueueEntry entry, boolean batch) throws AMQException - { - // TODO - send(entry); - } - - public void flushBatched() - { - // TODO - } - - public void send(final QueueEntry queueEntry) throws AMQException - { - ServerMessage serverMessage = queueEntry.getMessage(); - Message_1_0 message; - if(serverMessage instanceof Message_1_0) - { - message = (Message_1_0) serverMessage; - } - else - { - final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class); - message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost()); - } - - Transfer transfer = new Transfer(); - //TODO - - - List<ByteBuffer> fragments = message.getFragments(); - ByteBuffer payload; - if(fragments.size() == 1) - { - payload = fragments.get(0); - } - else - { - int size = 0; - for(ByteBuffer fragment : fragments) - { - size += fragment.remaining(); - } - - payload = ByteBuffer.allocate(size); - - for(ByteBuffer fragment : fragments) - { - payload.put(fragment.duplicate()); - } - - payload.flip(); - } - - if(queueEntry.getDeliveryCount() != 0) - { - payload = payload.duplicate(); - ValueHandler valueHandler = new ValueHandler(_typeRegistry); - - Header oldHeader = null; - try - { - ByteBuffer encodedBuf = payload.duplicate(); - Object value = valueHandler.parse(payload); - if(value instanceof Header) - { - oldHeader = (Header) value; - } - else - { - payload.position(0); - } - } - catch (AmqpErrorException e) - { - //TODO - throw new RuntimeException(e); - } - - Header header = new Header(); - if(oldHeader != null) - { - header.setDurable(oldHeader.getDurable()); - header.setPriority(oldHeader.getPriority()); - header.setTtl(oldHeader.getTtl()); - } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(header); - Binary encodedHeader = _sectionEncoder.getEncoding(); - - ByteBuffer oldPayload = payload; - payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); - payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); - payload.put(oldPayload); - payload.flip(); - } - - transfer.setPayload(payload); - byte[] data = new byte[8]; - ByteBuffer.wrap(data).putLong(_deliveryTag++); - final Binary tag = new Binary(data); - - transfer.setDeliveryTag(tag); - - synchronized(_link.getLock()) - { - if(_link.isAttached()) - { - if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) - { - transfer.setSettled(true); - } - else - { - UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); - - _link.addUnsettled(tag, action, queueEntry); - } - - if(_transactionId != null) - { - TransactionalState state = new TransactionalState(); - state.setTxnId(_transactionId); - transfer.setState(state); - } - // TODO - need to deal with failure here - if(_acquires && _transactionId != null) - { - ServerTransaction txn = _link.getTransaction(_transactionId); - if(txn != null) - { - txn.addPostTransactionAction(new ServerTransaction.Action(){ - - public void postCommit() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void onRollback() - { - if(queueEntry.isAcquiredBy(Subscription_1_0.this)) - { - queueEntry.release(); - _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); - - - } - } - }); - } - - } - getSession().getConnectionModel().registerMessageDelivered(message.getSize()); - getEndpoint().transfer(transfer); - } - else - { - queueEntry.release(); - } - } - - } - - public void queueDeleted(final AMQQueue queue) - { - //TODO - getEndpoint().setSource(null); - getEndpoint().detach(); - } - - public boolean wouldSuspend(final QueueEntry msg) - { - synchronized (_link.getLock()) - { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); - if(!hasCredit && getState() == State.ACTIVE) - { - suspend(); - } - - return !hasCredit; - } - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public void suspend() - { - synchronized(_link.getLock()) - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void releaseQueueEntry(QueueEntry queueEntryImpl) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - - public void onDequeue(final QueueEntry queueEntry) - { - //TODO - } - - public void restoreCredit(final QueueEntry queueEntry) - { - //TODO - } - - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - public boolean isSessionTransactional() - { - return false; //TODO - } - - public void queueEmpty() - { - synchronized(_link.getLock()) - { - if(_link.drained()) - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - } - - public void flowStateChanged() - { - synchronized(_link.getLock()) - { - if(isSuspended() && getEndpoint() != null) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - _transactionId = _link.getTransactionId(); - } - } - } - - public Session_1_0 getSession() - { - return _link.getSession(); - } - - private class DispositionAction implements UnsettledAction - { - - private final QueueEntry _queueEntry; - private final Binary _deliveryTag; - - public DispositionAction(Binary tag, QueueEntry queueEntry) - { - _deliveryTag = tag; - _queueEntry = queueEntry; - } - - public boolean process(DeliveryState state, final Boolean settled) - { - - Binary transactionId = null; - final Outcome outcome; - // If disposition is settled this overrides the txn? - if(state instanceof TransactionalState) - { - transactionId = ((TransactionalState)state).getTxnId(); - outcome = ((TransactionalState)state).getOutcome(); - } - else if (state instanceof Outcome) - { - outcome = (Outcome) state; - } - else - { - outcome = null; - } - - - ServerTransaction txn = _link.getTransaction(transactionId); - - if(outcome instanceof Accepted) - { - txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - if(_queueEntry.isAcquiredBy(Subscription_1_0.this)) - { - _queueEntry.discard(); - } - } - - public void onRollback() - { - - } - }); - txn.addPostTransactionAction(new ServerTransaction.Action() - { - public void postCommit() - { - //_link.getEndpoint().settle(_deliveryTag); - _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true); - _link.getEndpoint().sendFlowConditional(); - } - - public void onRollback() - { - if(Boolean.TRUE.equals(settled)) - { - final Modified modified = new Modified(); - modified.setDeliveryFailed(true); - _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); - _link.getEndpoint().sendFlowConditional(); - } - } - }); - } - else if(outcome instanceof Released) - { - txn.addPostTransactionAction(new ServerTransaction.Action() - { - public void postCommit() - { - - _queueEntry.release(); - _link.getEndpoint().settle(_deliveryTag); - } - - public void onRollback() - { - _link.getEndpoint().settle(_deliveryTag); - } - }); - } - - else if(outcome instanceof Modified) - { - txn.addPostTransactionAction(new ServerTransaction.Action() - { - public void postCommit() - { - - _queueEntry.release(); - if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed())) - { - _queueEntry.incrementDeliveryCount(); - } - _link.getEndpoint().settle(_deliveryTag); - } - - public void onRollback() - { - if(Boolean.TRUE.equals(settled)) - { - final Modified modified = new Modified(); - modified.setDeliveryFailed(true); - _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); - _link.getEndpoint().sendFlowConditional(); - } - } - }); - } - - return (transactionId == null && outcome != null); - } - } - - private class DoNothingAction implements UnsettledAction - { - public DoNothingAction(final Binary tag, - final QueueEntry queueEntry) - { - } - - public boolean process(final DeliveryState state, final Boolean settled) - { - Binary transactionId = null; - Outcome outcome = null; - // If disposition is settled this overrides the txn? - if(state instanceof TransactionalState) - { - transactionId = ((TransactionalState)state).getTxnId(); - outcome = ((TransactionalState)state).getOutcome(); - } - else if (state instanceof Outcome) - { - outcome = (Outcome) state; - } - return true; - } - } - - public FilterManager getFilters() - { - return _filters; - } - - public void setFilters(final FilterManager filters) - { - _filters = filters; - } - - @Override - public AMQSessionModel getSessionModel() - { - // TODO - return getSession(); - } - - @Override - public long getBytesOut() - { - // TODO - return 0; - } - - @Override - public long getMessagesOut() - { - // TODO - return 0; - } - - @Override - public long getUnacknowledgedBytes() - { - // TODO - return 0; - } - - @Override - public long getUnacknowledgedMessages() - { - // TODO - return 0; - } - - @Override - public String getConsumerName() - { - //TODO - return "TODO"; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java deleted file mode 100644 index a05d14816a..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.transaction.Declare; -import org.apache.qpid.amqp_1_0.type.transaction.Declared; -import org.apache.qpid.amqp_1_0.type.transaction.Discharge; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.nio.ByteBuffer; -import java.util.*; - -public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0 -{ - private VirtualHost _vhost; - private ReceivingLinkEndpoint _endpoint; - - private ArrayList<Transfer> _incompleteMessage; - private SectionDecoder _sectionDecoder; - private LinkedHashMap<Integer, ServerTransaction> _openTransactions; - private Session_1_0 _session; - - - public TxnCoordinatorLink_1_0(VirtualHost vhost, - Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint, - LinkedHashMap<Integer, ServerTransaction> openTransactions) - { - _vhost = vhost; - _session = session_1_0; - _endpoint = endpoint; - _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry()); - _openTransactions = openTransactions; - } - - public void messageTransfer(Transfer xfr) - { - // TODO - cope with fragmented messages - - ByteBuffer payload = null; - - - if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) - { - _incompleteMessage = new ArrayList<Transfer>(); - _incompleteMessage.add(xfr); - return; - } - else if(_incompleteMessage != null) - { - _incompleteMessage.add(xfr); - if(Boolean.TRUE.equals(xfr.getMore())) - { - return; - } - - int size = 0; - for(Transfer t : _incompleteMessage) - { - size += t.getPayload().limit(); - } - payload = ByteBuffer.allocate(size); - for(Transfer t : _incompleteMessage) - { - payload.put(t.getPayload().duplicate()); - } - payload.flip(); - _incompleteMessage=null; - - } - else - { - payload = xfr.getPayload(); - } - - - // Only interested int he amqp-value section that holds the message to the co-ordinator - try - { - List<Section> sections = _sectionDecoder.parseAll(payload); - - for(Section section : sections) - { - if(section instanceof AmqpValue) - { - Object command = ((AmqpValue) section).getValue(); - - if(command instanceof Declare) - { - Integer txnId = Integer.valueOf(0); - Iterator<Integer> existingTxn = _openTransactions.keySet().iterator(); - while(existingTxn.hasNext()) - { - txnId = existingTxn.next(); - } - txnId = Integer.valueOf(txnId.intValue() + 1); - - _openTransactions.put(txnId, new LocalTransaction(_vhost.getMessageStore())); - - Declared state = new Declared(); - - - - state.setTxnId(_session.integerToBinary(txnId)); - _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true); - - } - else if(command instanceof Discharge) - { - Discharge discharge = (Discharge) command; - - DeliveryState state = xfr.getState(); - discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail()); - _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true); - - } - } - } - - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - public void remoteDetached(LinkEndpoint endpoint, Detach detach) - { - //TODO - endpoint.detach(); - } - - private Error discharge(Integer transactionId, boolean fail) - { - Error error = null; - ServerTransaction txn = _openTransactions.get(transactionId); - if(txn != null) - { - if(fail) - { - txn.rollback(); - } - else - { - txn.commit(); - } - _openTransactions.remove(transactionId); - } - else - { - error = new Error(); - error.setCondition(AmqpError.NOT_FOUND); - error.setDescription("Unkown transactionId" + transactionId); - } - return error; - } - - - - public void start() - { - _endpoint.setLinkCredit(UnsignedInteger.ONE); - _endpoint.setCreditWindow(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java deleted file mode 100644 index 0fee4086b4..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v1_0; - -import org.apache.qpid.amqp_1_0.type.DeliveryState; - -public interface UnsettledAction -{ - boolean process(DeliveryState state, Boolean settled); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index f7cb6e2bed..f364d93d98 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -39,15 +39,12 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter index 64c497d433..ab1546ed0a 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -18,6 +18,4 @@ # org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10 org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8 -org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0 -org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0 org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10 diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType index 9aa1d4ce11..e0c0aa5873 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType @@ -18,4 +18,3 @@ # org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8 org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10 -org.apache.qpid.server.protocol.v1_0.MessageMetaDataType_1_0 diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator index 50f59d8a55..b771e25328 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator @@ -20,5 +20,3 @@ org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8 org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9 org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1 org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10 -org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0 -org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0_SASL |
