summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java149
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java323
2 files changed, 0 insertions, 472 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java
deleted file mode 100755
index 113a9c974a..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol.v0_8.output;
-
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class HeaderPropertiesConverter
-{
- private HeaderPropertiesConverter()
- {
- }
-
- public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost)
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- Header header = messageTransferMessage.getHeader();
- DeliveryProperties deliveryProps = header.getDeliveryProperties();
- MessageProperties messageProps = header.getMessageProperties();
-
- if(deliveryProps != null)
- {
- if(deliveryProps.hasDeliveryMode())
- {
- props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT));
- }
- if(deliveryProps.hasExpiration())
- {
- props.setExpiration(deliveryProps.getExpiration());
- }
- if(deliveryProps.hasPriority())
- {
- props.setPriority((byte)deliveryProps.getPriority().getValue());
- }
- if(deliveryProps.hasTimestamp())
- {
- props.setTimestamp(deliveryProps.getTimestamp());
- }
- }
- if(messageProps != null)
- {
- if(messageProps.hasAppId())
- {
- props.setAppId(new AMQShortString(messageProps.getAppId()));
- }
- if(messageProps.hasContentType())
- {
- props.setContentType(messageProps.getContentType());
- }
- if(messageProps.hasCorrelationId())
- {
- props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
- }
- if(messageProps.hasContentEncoding())
- {
- props.setEncoding(messageProps.getContentEncoding());
- }
- if(messageProps.hasMessageId())
- {
- props.setMessageId("ID:" + messageProps.getMessageId().toString());
- }
- if(messageProps.hasReplyTo())
- {
- ReplyTo replyTo = messageProps.getReplyTo();
- String exchangeName = replyTo.getExchange();
- String routingKey = replyTo.getRoutingKey();
- if(exchangeName == null)
- {
- exchangeName = "";
- }
-
- Exchange exchange = vhost.getExchange(exchangeName);
- String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
- props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
-
- }
- if(messageProps.hasUserId())
- {
- props.setUserId(new AMQShortString(messageProps.getUserId()));
- }
-
- if(messageProps.hasApplicationHeaders())
- {
- Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
- if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
- {
- props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
- }
-
- FieldTable ft = new FieldTable();
- for(Map.Entry<String, Object> entry : appHeaders.entrySet())
- {
- try
- {
- ft.put(new AMQShortString(entry.getKey()), entry.getValue());
- }
- catch(AMQPInvalidClassException e)
- {
- // TODO
- // log here, but ignore - just can;t convert
- }
- }
- props.setHeaders(ft);
-
- }
- }
-
-
-
-
-
-
-
- return props;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 7d0e3e3dbb..28d212c93d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -376,329 +376,6 @@ class Subscription_1_0 implements Subscription
}
- private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage)
- {
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
-
- return convertServerMessage(metaData, serverMessage);
-
- }
-
- private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage)
- {
- List<Section> sections = new ArrayList<Section>(3);
- final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
- final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
-
- Header header = new Header();
- if(deliveryProps != null)
- {
- header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
- if(deliveryProps.hasPriority())
- {
- header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue()));
- }
- if(deliveryProps.hasTtl())
- {
- header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
- }
- sections.add(header);
- }
-
- Properties props = new Properties();
- if(msgProps != null)
- {
- // props.setAbsoluteExpiryTime();
- if(msgProps.hasContentEncoding())
- {
- props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
- }
-
- if(msgProps.hasCorrelationId())
- {
- props.setCorrelationId(msgProps.getCorrelationId());
- }
- // props.setCreationTime();
- // props.setGroupId();
- // props.setGroupSequence();
- if(msgProps.hasMessageId())
- {
- props.setMessageId(msgProps.getMessageId());
- }
- if(msgProps.hasReplyTo())
- {
- props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey());
- }
- if(msgProps.hasContentType())
- {
- props.setContentType(Symbol.valueOf(msgProps.getContentType()));
-
- // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
- if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
- {
- props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
- }
- }
- // props.setReplyToGroupId();
- props.setSubject(serverMessage.getRoutingKey());
- // props.setTo();
- if(msgProps.hasUserId())
- {
- props.setUserId(new Binary(msgProps.getUserId()));
- }
-
- sections.add(props);
-
- if(msgProps.getApplicationHeaders() != null)
- {
- sections.add(new ApplicationProperties(msgProps.getApplicationHeaders()));
- }
- }
- return new MessageMetaData_1_0(sections, _sectionEncoder);
- }
-
- private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage)
- {
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
-
- return convertServerMessage(metaData, serverMessage);
-
-
- }
-
- private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final ServerMessage serverMessage)
- {
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
- Section bodySection = convertMessageBody(mimeType, data);
-
- final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection);
-
- return new StoredMessage<MessageMetaData_1_0>()
- {
- @Override
- public MessageMetaData_1_0 getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMessage.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- int size;
- if(dst.remaining()<buf.remaining())
- {
- buf.limit(dst.remaining());
- size = dst.remaining();
- }
- else
- {
- size = buf.remaining();
- }
- dst.put(buf);
- return size;
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- if(size < buf.remaining())
- {
- buf.limit(size);
- }
- return buf;
- }
-
- @Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void remove()
- {
- serverMessage.getStoredMessage().remove();
- }
- };
- }
-
- private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection)
- {
- int headerSize = (int) metaData.getStorableSize();
-
- _sectionEncoder.reset();
- _sectionEncoder.encodeObject(bodySection);
- Binary dataEncoding = _sectionEncoder.getEncoding();
-
- final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(0,allData);
- allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
- return allData;
- }
-
- private static Section convertMessageBody(String mimeType, byte[] data)
- {
- if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
- {
- String text = new String(data);
- return new AmqpValue(text);
- }
- else if("jms/map-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- LinkedHashMap map = new LinkedHashMap();
- final int entries = reader.readIntImpl();
- for (int i = 0; i < entries; i++)
- {
- try
- {
- String propName = reader.readStringImpl();
- Object value = reader.readObject();
- map.put(propName, value);
- }
- catch (EOFException e)
- {
- throw new IllegalArgumentException(e);
- }
- catch (TypedBytesFormatException e)
- {
- throw new IllegalArgumentException(e);
- }
-
- }
-
- return new AmqpValue(map);
-
- }
- else if("amqp/map".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- return new AmqpValue(decoder.readMap());
-
- }
- else if("amqp/list".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- return new AmqpValue(decoder.readList());
- }
- else if("jms/stream-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- List list = new ArrayList();
- while (reader.remaining() != 0)
- {
- try
- {
- list.add(reader.readObject());
- }
- catch (TypedBytesFormatException e)
- {
- throw new RuntimeException(e); // TODO - Implement
- }
- catch (EOFException e)
- {
- throw new RuntimeException(e); // TODO - Implement
- }
- }
- return new AmqpValue(list);
- }
- else
- {
- return new Data(new Binary(data));
-
- }
- }
-
- private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage)
- {
-
- List<Section> sections = new ArrayList<Section>(3);
-
- Header header = new Header();
-
- header.setDurable(serverMessage.isPersistent());
-
- BasicContentHeaderProperties contentHeader =
- (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
-
- header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
- final long expiration = serverMessage.getExpiration();
- final long arrivalTime = serverMessage.getArrivalTime();
-
- if(expiration > arrivalTime)
- {
- header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
- }
- sections.add(header);
-
-
- Properties props = new Properties();
-
- props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
-
- props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
-
- // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
- if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
- {
- props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
- }
-
- final AMQShortString correlationId = contentHeader.getCorrelationId();
- if(correlationId != null)
- {
- props.setCorrelationId(new Binary(correlationId.getBytes()));
- }
- // props.setCreationTime();
- // props.setGroupId();
- // props.setGroupSequence();
- final AMQShortString messageId = contentHeader.getMessageId();
- if(messageId != null)
- {
- props.setMessageId(new Binary(messageId.getBytes()));
- }
- props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
-
- // props.setReplyToGroupId();
- props.setSubject(serverMessage.getRoutingKey());
- // props.setTo();
- if(contentHeader.getUserId() != null)
- {
- props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
- }
- sections.add(props);
-
- sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
-
- return new MessageMetaData_1_0(sections, _sectionEncoder);
- }
-
public void queueDeleted(final AMQQueue queue)
{
//TODO