diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 14:40:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 14:40:07 +0000 |
| commit | b89341f0b6382d09ef438bafe09235ca48ea5767 (patch) | |
| tree | 1be8d0e2a04769d77a3d4e4be00eb6c9732a0da6 /qpid/java/broker | |
| parent | 4ad719f82cadddaef7e6693b43c3e767f3eb14cd (diff) | |
| download | qpid-python-b89341f0b6382d09ef438bafe09235ca48ea5767.tar.gz | |
QPID-4659 : [Java Broker] remove redundant code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503272 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
2 files changed, 0 insertions, 472 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java deleted file mode 100755 index 113a9c974a..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java +++ /dev/null @@ -1,149 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.protocol.v0_8.output; - -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.ReplyTo; - -import java.util.HashMap; -import java.util.Map; - -public class HeaderPropertiesConverter -{ - private HeaderPropertiesConverter() - { - } - - public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost) - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - - Header header = messageTransferMessage.getHeader(); - DeliveryProperties deliveryProps = header.getDeliveryProperties(); - MessageProperties messageProps = header.getMessageProperties(); - - if(deliveryProps != null) - { - if(deliveryProps.hasDeliveryMode()) - { - props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT)); - } - if(deliveryProps.hasExpiration()) - { - props.setExpiration(deliveryProps.getExpiration()); - } - if(deliveryProps.hasPriority()) - { - props.setPriority((byte)deliveryProps.getPriority().getValue()); - } - if(deliveryProps.hasTimestamp()) - { - props.setTimestamp(deliveryProps.getTimestamp()); - } - } - if(messageProps != null) - { - if(messageProps.hasAppId()) - { - props.setAppId(new AMQShortString(messageProps.getAppId())); - } - if(messageProps.hasContentType()) - { - props.setContentType(messageProps.getContentType()); - } - if(messageProps.hasCorrelationId()) - { - props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId())); - } - if(messageProps.hasContentEncoding()) - { - props.setEncoding(messageProps.getContentEncoding()); - } - if(messageProps.hasMessageId()) - { - props.setMessageId("ID:" + messageProps.getMessageId().toString()); - } - if(messageProps.hasReplyTo()) - { - ReplyTo replyTo = messageProps.getReplyTo(); - String exchangeName = replyTo.getExchange(); - String routingKey = replyTo.getRoutingKey(); - if(exchangeName == null) - { - exchangeName = ""; - } - - Exchange exchange = vhost.getExchange(exchangeName); - String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); - props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); - - } - if(messageProps.hasUserId()) - { - props.setUserId(new AMQShortString(messageProps.getUserId())); - } - - if(messageProps.hasApplicationHeaders()) - { - Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); - if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) - { - props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); - } - - FieldTable ft = new FieldTable(); - for(Map.Entry<String, Object> entry : appHeaders.entrySet()) - { - try - { - ft.put(new AMQShortString(entry.getKey()), entry.getValue()); - } - catch(AMQPInvalidClassException e) - { - // TODO - // log here, but ignore - just can;t convert - } - } - props.setHeaders(ft); - - } - } - - - - - - - - return props; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 7d0e3e3dbb..28d212c93d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -376,329 +376,6 @@ class Subscription_1_0 implements Subscription } - private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage) - { - final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - - return convertServerMessage(metaData, serverMessage); - - } - - private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage) - { - List<Section> sections = new ArrayList<Section>(3); - final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); - final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); - - Header header = new Header(); - if(deliveryProps != null) - { - header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); - if(deliveryProps.hasPriority()) - { - header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue())); - } - if(deliveryProps.hasTtl()) - { - header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); - } - sections.add(header); - } - - Properties props = new Properties(); - if(msgProps != null) - { - // props.setAbsoluteExpiryTime(); - if(msgProps.hasContentEncoding()) - { - props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); - } - - if(msgProps.hasCorrelationId()) - { - props.setCorrelationId(msgProps.getCorrelationId()); - } - // props.setCreationTime(); - // props.setGroupId(); - // props.setGroupSequence(); - if(msgProps.hasMessageId()) - { - props.setMessageId(msgProps.getMessageId()); - } - if(msgProps.hasReplyTo()) - { - props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); - } - if(msgProps.hasContentType()) - { - props.setContentType(Symbol.valueOf(msgProps.getContentType())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - } - // props.setReplyToGroupId(); - props.setSubject(serverMessage.getRoutingKey()); - // props.setTo(); - if(msgProps.hasUserId()) - { - props.setUserId(new Binary(msgProps.getUserId())); - } - - sections.add(props); - - if(msgProps.getApplicationHeaders() != null) - { - sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); - } - } - return new MessageMetaData_1_0(sections, _sectionEncoder); - } - - private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage) - { - final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - - return convertServerMessage(metaData, serverMessage); - - - } - - private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, - final ServerMessage serverMessage) - { - final String mimeType = serverMessage.getMessageHeader().getMimeType(); - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - Section bodySection = convertMessageBody(mimeType, data); - - final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection); - - return new StoredMessage<MessageMetaData_1_0>() - { - @Override - public MessageMetaData_1_0 getMetaData() - { - return metaData; - } - - @Override - public long getMessageNumber() - { - return serverMessage.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - int size; - if(dst.remaining()<buf.remaining()) - { - buf.limit(dst.remaining()); - size = dst.remaining(); - } - else - { - size = buf.remaining(); - } - dst.put(buf); - return size; - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - if(size < buf.remaining()) - { - buf.limit(size); - } - return buf; - } - - @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override - public void remove() - { - serverMessage.getStoredMessage().remove(); - } - }; - } - - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection) - { - int headerSize = (int) metaData.getStorableSize(); - - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(bodySection); - Binary dataEncoding = _sectionEncoder.getEncoding(); - - final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); - metaData.writeToBuffer(0,allData); - allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); - return allData; - } - - private static Section convertMessageBody(String mimeType, byte[] data) - { - if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) - { - String text = new String(data); - return new AmqpValue(text); - } - else if("jms/map-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - LinkedHashMap map = new LinkedHashMap(); - final int entries = reader.readIntImpl(); - for (int i = 0; i < entries; i++) - { - try - { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - map.put(propName, value); - } - catch (EOFException e) - { - throw new IllegalArgumentException(e); - } - catch (TypedBytesFormatException e) - { - throw new IllegalArgumentException(e); - } - - } - - return new AmqpValue(map); - - } - else if("amqp/map".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readMap()); - - } - else if("amqp/list".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readList()); - } - else if("jms/stream-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - List list = new ArrayList(); - while (reader.remaining() != 0) - { - try - { - list.add(reader.readObject()); - } - catch (TypedBytesFormatException e) - { - throw new RuntimeException(e); // TODO - Implement - } - catch (EOFException e) - { - throw new RuntimeException(e); // TODO - Implement - } - } - return new AmqpValue(list); - } - else - { - return new Data(new Binary(data)); - - } - } - - private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage) - { - - List<Section> sections = new ArrayList<Section>(3); - - Header header = new Header(); - - header.setDurable(serverMessage.isPersistent()); - - BasicContentHeaderProperties contentHeader = - (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); - - header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); - final long expiration = serverMessage.getExpiration(); - final long arrivalTime = serverMessage.getArrivalTime(); - - if(expiration > arrivalTime) - { - header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); - } - sections.add(header); - - - Properties props = new Properties(); - - props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); - - props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - - final AMQShortString correlationId = contentHeader.getCorrelationId(); - if(correlationId != null) - { - props.setCorrelationId(new Binary(correlationId.getBytes())); - } - // props.setCreationTime(); - // props.setGroupId(); - // props.setGroupSequence(); - final AMQShortString messageId = contentHeader.getMessageId(); - if(messageId != null) - { - props.setMessageId(new Binary(messageId.getBytes())); - } - props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); - - // props.setReplyToGroupId(); - props.setSubject(serverMessage.getRoutingKey()); - // props.setTo(); - if(contentHeader.getUserId() != null) - { - props.setUserId(new Binary(contentHeader.getUserId().getBytes())); - } - sections.add(props); - - sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); - - return new MessageMetaData_1_0(sections, _sectionEncoder); - } - public void queueDeleted(final AMQQueue queue) { //TODO |
