diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:22:55 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:22:55 +0000 |
| commit | 6d96b69a7f6d1c64db6181fe2de6b1c9cc29c2d1 (patch) | |
| tree | 0f80cde196433845be6cce584285d4d731b3e474 | |
| parent | 152aa820ef2b024f6481b378b4bf7f87ff1f0565 (diff) | |
| download | qpid-python-6d96b69a7f6d1c64db6181fe2de6b1c9cc29c2d1.tar.gz | |
QPID-4027 Created an AbstractMessageFactory which delegates the creation
of version and/or implementation specific delegates to a concrete
implementation.
Created a concrete implementation for 0-10 and the c++/jni implementations. For the time
being the c++/jni implementation assumes 0-10 only, hence directly extending the 0-10 impl.
Added MessageInternal interface, intended for API impelementors.
The MessageFactories implementations provides String, Map and List
message support in addition to creating version/implementation specific
delegates.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350709 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 893 insertions, 385 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Connection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Connection.java index 04241cd468..5f95ed8b24 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Connection.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Connection.java @@ -67,6 +67,7 @@ public interface Connection /** * Returns a reference to the message factory for this connection. * @return MessageFactory + * @exception If the connection is not valid. */ - public MessageFactory getMessageFactory(); + public MessageFactory getMessageFactory() throws MessagingException; } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Message.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Message.java index 8f61b72da8..0b825e4794 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Message.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/Message.java @@ -49,7 +49,7 @@ public interface Message { public final static String QPID_SUBJECT = "qpid.subject"; - public ByteBuffer getContent(); + public ByteBuffer getContent() throws MessagingException; public String getMessageId() throws MessagingException; diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/MessageFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/MessageFactory.java index 204f7cc35f..81561aa414 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/MessageFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/MessageFactory.java @@ -32,19 +32,48 @@ import java.util.Map; */ public interface MessageFactory { - Message createMessage(String text) throws MessageEncodingException; + /** + * Supported Message Types. + * Use + */ + public enum MessageType {BINARY, STRING, MAP, LIST} - Message createMessage(byte[] bytes) throws MessageEncodingException; + public Message createMessage(String text) throws MessageEncodingException; - Message createMessage(ByteBuffer buf) throws MessageEncodingException; + public Message createMessage(byte[] bytes) throws MessageEncodingException; - Message createMessage(Map<String,Object> map) throws MessageEncodingException; + public Message createMessage(ByteBuffer buf) throws MessageEncodingException; - Message createMessage(List<Object> list) throws MessageEncodingException; + public Message createMessage(Map<String,Object> map) throws MessageEncodingException; - String getContentAsString(Message m) throws MessageEncodingException; + public Message createMessage(List<Object> list) throws MessageEncodingException; - Map<String,Object> getContentAsMap(Message m) throws MessageEncodingException; + public String getContentAsString(Message m) throws MessageEncodingException; - List<Object> getContentAsList(Message m) throws MessageEncodingException; + public Map<String,Object> getContentAsMap(Message m) throws MessageEncodingException; + + public List<Object> getContentAsList(Message m) throws MessageEncodingException; + + /** + * You could use this method to map your custom content-type to one + * of the supported MessageType's (@see MessageType), provided the content + * of the message conforms to the expected type. + * + * Ex. foo/bar -> STRING, will tell the client to treat any message that has + * the content-type foo/bar to be treated as a STRING Message. + * + * Currently supported content types are as follows. + * default - BINARY + * application/octet-stream - BINARY + * text/plain - STRING + * text/xml - STRING + * amqp/map - MAP + * amqp-0-10/map - MAP + * amqp/list - LIST + * amqp-0-10/list - LIST + * + * @param contentType The content type you want to register. + * @param type The MessageType @see MessageType + */ + public void registerContentType(String contentType, MessageType type); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java index 4f163d7f6b..1916346e2a 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java @@ -27,14 +27,15 @@ public class CppConnectionFactory extends ConnectionFactory { private static final Logger _logger = LoggerFactory.getLogger(CppConnectionFactory.class); - static + static { + System.setProperty("qpid.allocate-direct","true"); System.loadLibrary("cqpid_java"); _logger.info("native qpid library was loaded sucessfully"); } public CppConnectionFactory() - { + { } public Connection createConnection(String url) diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java new file mode 100644 index 0000000000..4fddcaa326 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java @@ -0,0 +1,240 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.cpp; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.ListMessage; +import org.apache.qpid.messaging.MapMessage; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageEncodingException; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.StringMessage; +import org.apache.qpid.messaging.cpp.jni.Address; +import org.apache.qpid.messaging.cpp.jni.Duration; +import org.apache.qpid.messaging.util.MessageFactory_AMQP_0_10; + +/** + * For the time being 0-10 is assumed. + */ +public class CppMessageFactory extends MessageFactory_AMQP_0_10 +{ + @Override + protected Class<? extends MessageFactory> getFactoryClass() + { + return CppMessageFactory.class; + } + + @Override + protected Message createFactorySpecificMessageDelegate() + { + return new CppMessageDelegate(); + } + + public Message createMessage(org.apache.qpid.messaging.cpp.jni.Message m) throws MessagingException + { + return createMessage(new CppMessageDelegate(m), m.getContentAsByteBuffer()); + } + + class CppMessageDelegate implements Message + { + private org.apache.qpid.messaging.cpp.jni.Message _cppMessage; + + public CppMessageDelegate() + { + this(new org.apache.qpid.messaging.cpp.jni.Message()); + } + + public CppMessageDelegate(org.apache.qpid.messaging.cpp.jni.Message msg) + { + _cppMessage = msg; + } + + @Override + public String getMessageId() + { + return _cppMessage.getMessageId(); + } + + @Override + public void setMessageId(String messageId) + { + _cppMessage.setMessageId(messageId); + + } + + @Override + public String getSubject() + { + return _cppMessage.getSubject(); + } + + @Override + public void setSubject(String subject) + { + _cppMessage.setMessageId(subject); + } + + @Override + public String getContentType() + { + return _cppMessage.getContentType(); + } + + @Override + public void setContentType(String contentType) + { + _cppMessage.setContentType(contentType); + } + + @Override + public String getCorrelationId() + { + return _cppMessage.getCorrelationId(); + } + + @Override + public void setCorrelationId(String correlationId) + { + _cppMessage.setCorrelationId(correlationId); + } + + @Override + public String getReplyTo() + { + return _cppMessage.getReplyTo().toString(); + } + + @Override + public void setReplyTo(String replyTo) + { + _cppMessage.setReplyTo(new Address(replyTo)); + } + + @Override + public String getUserId() + { + return _cppMessage.getUserId(); + } + + @Override + public void setUserId(String userId) + { + _cppMessage.setUserId(userId); + } + + @Override + public boolean isDurable() + { + return _cppMessage.getDurable(); + } + + @Override + public void setDurable(boolean durable) + { + _cppMessage.setDurable(durable); + } + + @Override + public boolean isRedelivered() + { + return _cppMessage.getRedelivered(); + } + + @Override + public void setRedelivered(boolean redelivered) + { + _cppMessage.setRedelivered(redelivered); + } + + @Override + public int getPriority() + { + return _cppMessage.getPriority(); + } + + @Override + public void setPriority(int priority) + { + _cppMessage.setPriority((byte)priority); + } + + @Override + public long getTtl() + { + return _cppMessage.getTtl().getMilliseconds(); + } + + @Override + public void setTtl(long ttl) + { + _cppMessage.setTtl(new Duration(ttl)); + } + + @Override + public long getTimestamp() + { + return 0; + } + + @Override + public void setTimestamp(long timestamp) + { + //ignore the c++ client will set it when sending + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, Object> getProperties() + { + return _cppMessage.getProperties(); + } + + @Override + public void setProperty(String key, Object value) + { + _cppMessage.setProperty(key, value); + } + + protected org.apache.qpid.messaging.cpp.jni.Message getCppMessage() + { + return _cppMessage; + } + + @Override + public String toString() + { + return _cppMessage.toString(); + } + + @Override + public ByteBuffer getContent() throws MessagingException + { + return null; // The delegate is only for the headers + } + + public org.apache.qpid.messaging.cpp.jni.Message getNativeMessage() + { + return _cppMessage; + } + } + +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/MessageInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/MessageInternal.java new file mode 100644 index 0000000000..30fefd6835 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/MessageInternal.java @@ -0,0 +1,52 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.cpp.CppMessageFactory; + +public interface MessageInternal extends Message +{ + /** + * Allows Internal objects to determine if a message + * was created by a compatible message factory without + * having to resort to casting to find out. + * + * Class was used, in order to allow reuse of compatible + * MessageFactories between implementations, instead of locking into + * the exact instance that created it. + * + * @return The Class that created this Message. + */ + public Class<? extends MessageFactory> getMessageFactoryClass(); + + /** + * Provides a reference to the Factory specific Message delegate + * that was used when creating a concrete instance of this message. + * You cannot assume the immediate delegate of a message + * object to be the native format. There could be several + * adapters and/or decorators around the native message. + * + * The calling Object should know how to cast the generic object + * to the required type. + * Ex @see {@link CppMessageFactory#CppMessageDelegate} + * and @see {@link CppSender + */ + public Object getFactorySpecificMessageDelegate(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java new file mode 100644 index 0000000000..91009dc8b6 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java @@ -0,0 +1,474 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.ListMessage; +import org.apache.qpid.messaging.MapMessage; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageEncodingException; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.StringMessage; + +/** + * A generic message factory that has abstract methods for + * protocol or implementation specific message delegates and codecs. + * + * @see MessageFactory_AMQP_0_10 @see CppMessageFactory + */ +public abstract class AbstractMessageFactory implements MessageFactory +{ + protected static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); + protected static boolean ALLOCATE_DIRECT = Boolean.getBoolean("qpid.allocate-direct"); + protected static final ByteBuffer EMPTY_BYTE_BUFFER = ALLOCATE_DIRECT ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0); + + protected final Map<String, MessageType> _contentTypeToMsgTypeMap = new HashMap<String, MessageType>(); + + protected AbstractMessageFactory() + { + _contentTypeToMsgTypeMap.put("text/plain", MessageType.STRING); + _contentTypeToMsgTypeMap.put("text/xml", MessageType.STRING); + _contentTypeToMsgTypeMap.put("amqp/map", MessageType.MAP); + _contentTypeToMsgTypeMap.put("amqp-0-10/map", MessageType.MAP); + _contentTypeToMsgTypeMap.put("amqp/list", MessageType.LIST); + _contentTypeToMsgTypeMap.put("amqp-0-10/list", MessageType.LIST); + _contentTypeToMsgTypeMap.put("application/octet-stream", MessageType.BINARY); + } + + // ----------- Methods for public API -------------------------- + @Override + public Message createMessage(String text) throws MessageEncodingException + { + return createStringMessage(createFactorySpecificMessageDelegate(), text); + } + + @Override + public Message createMessage(byte[] bytes) throws MessageEncodingException + { + ByteBuffer b; + if (ALLOCATE_DIRECT) + { + b = ByteBuffer.allocateDirect(bytes.length); + b.put(bytes); + } + else + { + b = ByteBuffer.wrap(bytes); + } + return createMessage(b); + } + + @Override + public Message createMessage(ByteBuffer buf) throws MessageEncodingException + { + if (ALLOCATE_DIRECT) + { + if (buf.isDirect()) + { + return createMessage(buf); + } + else + { + // Silently copying the data to a direct buffer is not a good thing as it can + // add a perf overhead. So an exception is a more reasonable option. + throw new MessageEncodingException("The ByteBuffer needs to be direct allocated"); + } + } + else + { + return createMessage(buf); + } + } + + @Override + public Message createMessage(Map<String, Object> map) throws MessageEncodingException + { + return createMapMessage(createFactorySpecificMessageDelegate(), map); + } + + @Override + public Message createMessage(List<Object> list) throws MessageEncodingException + { + return createListMessage(createFactorySpecificMessageDelegate(), list); + } + + @Override + public String getContentAsString(Message m) throws MessageEncodingException + { + if (m instanceof StringMessage) + { + return ((StringMessage)m).getString(); + } + else + { + try + { + return decodeAsString(m.getContent()); + } + catch (MessagingException e) + { + throw new MessageEncodingException("Unable to access content",e); + } + } + } + + @Override + public Map<String, Object> getContentAsMap(Message m) throws MessageEncodingException + { + if (m instanceof MapMessage) + { + return ((MapMessage)m).getMap(); + } + else + { + try + { + return decodeAsMap(m.getContent()); + } + catch (MessagingException e) + { + throw new MessageEncodingException("Unable to access content",e); + } + } + } + + @Override + public List<Object> getContentAsList(Message m) throws MessageEncodingException + { + if (m instanceof ListMessage) + { + return ((ListMessage)m).getList(); + } + else + { + try + { + return decodeAsList(m.getContent()); + } + catch (MessagingException e) + { + throw new MessageEncodingException("Unable to access content",e); + } + } + } + + public void registerContentType(String contentType, MessageType type) + { + _contentTypeToMsgTypeMap.put(contentType, type); + } + + // ----------- Methods for internal API -------------------------- + + protected abstract Message createFactorySpecificMessageDelegate(); + + protected abstract Class<? extends MessageFactory> getFactoryClass(); + + protected abstract Map<String,Object> decodeAsMap(ByteBuffer buf) throws MessageEncodingException; + + protected abstract ByteBuffer encodeMap(Map<String,Object> map) throws MessageEncodingException; + + protected abstract List<Object> decodeAsList(ByteBuffer buf) throws MessageEncodingException; + + protected abstract ByteBuffer encodeList(List<Object> list) throws MessageEncodingException; + + protected Message makeMessageReadOnly(Message m) + { + return new ReadOnlyMessageAdapter(m); + } + + protected Message createDefaultMessage(Message delegate, ByteBuffer data) + { + return new DefaultMessageImpl(delegate, data); + } + + protected StringMessage createStringMessage(Message delegate, String str) throws MessageEncodingException + { + return new StringMessageImpl(delegate, str); + } + + protected StringMessage createStringMessage(Message delegate, ByteBuffer buf) + { + return new StringMessageImpl(delegate, buf); + } + + protected MapMessage createMapMessage(Message delegate, Map<String, Object> map) throws MessageEncodingException + { + return new MapMessageImpl(delegate, map); + } + + protected MapMessage createMapMessage(Message delegate, ByteBuffer buf) + { + return new MapMessageImpl(delegate, buf); + } + + protected ListMessage createListMessage(Message delegate, List<Object> list) throws MessageEncodingException + { + return new ListMessageImpl(delegate, list); + } + + protected ListMessage createListMessage(Message delegate, ByteBuffer buf) throws MessageEncodingException + { + return new ListMessageImpl(delegate, buf); + } + + protected Message createMessage(Message delegate, ByteBuffer buf) throws MessagingException + { + MessageType type = _contentTypeToMsgTypeMap.containsKey(delegate.getContentType())? + _contentTypeToMsgTypeMap.get(delegate.getContentType()) : MessageType.BINARY; + + switch (type) + { + case STRING: + return createStringMessage(delegate,buf); + + case MAP: + return createMapMessage(delegate,buf); + + case LIST: + return createListMessage(delegate,buf); + + default: + return createDefaultMessage(delegate,buf); + } + } + + protected class DefaultMessageImpl extends GenericMessageAdapter + { + protected ByteBuffer _rawData; + + public DefaultMessageImpl(Message delegate, ByteBuffer data) + { + super(delegate); + _rawData = (ByteBuffer) data.rewind(); + } + + public DefaultMessageImpl(Message delegate) + { + super(delegate); + _rawData = EMPTY_BYTE_BUFFER; + } + + @Override + public ByteBuffer getContent() + { + return _rawData; + } + + @Override + public Class<? extends MessageFactory> getMessageFactoryClass() + { + return getFactoryClass(); + } + + @Override + public Object getFactorySpecificMessageDelegate() + { + return super.getDelegate(); + } + } + + protected class StringMessageImpl extends DefaultMessageImpl implements StringMessage + { + private String _str; + private MessageEncodingException _exception; + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + public StringMessageImpl(Message delegate, ByteBuffer data) + { + super(delegate, data); + try + { + _str = decodeAsString(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public StringMessageImpl(Message delegate, String str) throws MessageEncodingException + { + super(delegate); + _str = str; + if(str != null && !str.isEmpty()) + { + _rawData = encodeString(str); + } + } + + @Override + public String getString() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _str; + } + } + } + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + protected class MapMessageImpl extends DefaultMessageImpl implements MapMessage + { + private Map<String,Object> _map; + private MessageEncodingException _exception; + + public MapMessageImpl(Message delegate, ByteBuffer data) + { + super(delegate, data); + try + { + _map = decodeAsMap(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public MapMessageImpl(Message delegate, Map<String,Object> map) throws MessageEncodingException + { + super(delegate); + _map = map; + if(map != null && !map.isEmpty()) + { + _rawData = encodeMap(map); + } + } + + @SuppressWarnings("unchecked") + @Override + public Map<String,Object> getMap() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _map == null ? Collections.EMPTY_MAP : _map; + } + } + } + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + protected class ListMessageImpl extends DefaultMessageImpl implements ListMessage + { + private List<Object> _list; + private MessageEncodingException _exception; + + public ListMessageImpl(Message delegate, ByteBuffer data) + { + super(delegate, data); + try + { + _list = decodeAsList(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public ListMessageImpl(Message delegate, List<Object> list) throws MessageEncodingException + { + super(delegate); + _list = list; + if(list != null && !list.isEmpty()) + { + _rawData = encodeList(list); + } + } + + @SuppressWarnings("unchecked") + @Override + public List<Object> getList() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _list == null ? Collections.EMPTY_LIST : _list; + } + } + } + + protected String decodeAsString(ByteBuffer buf) throws MessageEncodingException + { + final CharsetDecoder decoder = DEFAULT_CHARSET.newDecoder(); + try + { + return decoder.decode(buf).toString(); + } + catch (CharacterCodingException e) + { + throw new MessageEncodingException("Error decoding content as String using UTF-8",e); + } + + } + + protected ByteBuffer encodeString(String str) throws MessageEncodingException + { + final CharsetEncoder encoder = DEFAULT_CHARSET.newEncoder(); + ByteBuffer b; + try + { + b = encoder.encode(CharBuffer.wrap(str)); + b.flip(); + } + catch (CharacterCodingException e) + { + throw new MessageEncodingException("Cannot encode string in UFT-8: " + str,e); + } + if (ALLOCATE_DIRECT) + { + // unfortunately this extra copy is required as it does not seem possible + // to create a CharSetEncoder that returns a buffer allocated directly. + ByteBuffer direct = ByteBuffer.allocateDirect(b.remaining()); + direct.put(b); + direct.flip(); + return direct; + } + else + { + return b; + } + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java index 4928f1df38..6bd19a44c2 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java @@ -22,8 +22,10 @@ import java.util.Collections; import java.util.Map; import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageFactory; import org.apache.qpid.messaging.MessageNotWritableException; import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.ext.MessageInternal; /** * A generic message adapter that simply delegates @@ -34,7 +36,7 @@ import org.apache.qpid.messaging.MessagingException; * @see StringMessage_AMQP_0_10 * @see MapMessage_AMQP_0_10 */ -public class GenericMessageAdapter implements Message +public class GenericMessageAdapter implements MessageInternal { private Message _delegate; @@ -188,8 +190,41 @@ public class GenericMessageAdapter implements Message } @Override - public ByteBuffer getContent() + public ByteBuffer getContent() throws MessagingException { return _delegate.getContent(); } + + @Override + public Class<? extends MessageFactory> getMessageFactoryClass() + { + if (_delegate instanceof MessageInternal) + { + return ((MessageInternal)_delegate).getMessageFactoryClass(); + } + else + { + throw new UnsupportedOperationException( + "This Adapter nor it's delegate have the required info"); + } + } + + @Override + public Object getFactorySpecificMessageDelegate() + { + if (_delegate instanceof MessageInternal) + { + return ((MessageInternal)_delegate).getFactorySpecificMessageDelegate(); + } + else + { + throw new UnsupportedOperationException( + "This Adapter nor it's delegate have the required info"); + } + } + + public Message getDelegate() + { + return _delegate; + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java index 330d74cd53..cbe18796f6 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java @@ -1,22 +1,14 @@ package org.apache.qpid.messaging.util; import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.messaging.ListMessage; -import org.apache.qpid.messaging.MapMessage; import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessageEncodingException; import org.apache.qpid.messaging.MessageFactory; import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.StringMessage; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; @@ -30,139 +22,96 @@ import org.apache.qpid.util.UUIDs; * A generic message factory that is based on the AMQO 0-10 encoding. * */ -public class MessageFactory_AMQP_0_10 implements MessageFactory +public class MessageFactory_AMQP_0_10 extends AbstractMessageFactory { - private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - private static boolean ALLOCATE_DIRECT = Boolean.getBoolean("qpid.allocate-direct"); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ALLOCATE_DIRECT ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0); - - @Override - public Message createMessage(String text) throws MessageEncodingException + protected Class<? extends MessageFactory> getFactoryClass() { - return new StringMessage_AMQP_0_10(new Mesage_AMQP_0_10(), text); + return MessageFactory_AMQP_0_10.class; } - @Override - public Message createMessage(byte[] bytes) throws MessageEncodingException + protected Map<String,Object> decodeAsMap(ByteBuffer buf) throws MessageEncodingException { - ByteBuffer b; - if (ALLOCATE_DIRECT) + try { - b = ByteBuffer.allocateDirect(bytes.length); - b.put(bytes); + BBDecoder decorder = new BBDecoder(); + decorder.init(buf); + return decorder.readMap(); } - else + catch (Exception e) { - b = ByteBuffer.wrap(bytes); + throw new MessageEncodingException("Error decoding content as Map",e); } - return new Mesage_AMQP_0_10(b); } - @Override - public Message createMessage(ByteBuffer buf) throws MessageEncodingException + protected ByteBuffer encodeMap(Map<String,Object> map) throws MessageEncodingException { - if (ALLOCATE_DIRECT) + try { - if (buf.isDirect()) - { - return new Mesage_AMQP_0_10(buf); - } - else - { - // Silently copying the data to a direct buffer is not a good thing as it can - // add a perf overhead. So an exception is a more reasonable option. - throw new MessageEncodingException("The ByteBuffer needs to be direct allocated"); - } + //need to investigate the capacity here. + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap(map); + return (ByteBuffer)encoder.buffer().flip(); } - else + catch (Exception e) { - return new Mesage_AMQP_0_10(buf); + throw new MessageEncodingException("Cannot encode Map" ,e); } } - @Override - public Message createMessage(Map<String, Object> map) throws MessageEncodingException - { - return new MapMessage_AMQP_0_10(new Mesage_AMQP_0_10(), map); - } - - @Override - public Message createMessage(List<Object> list) throws MessageEncodingException - { - return new ListMessage_AMQP_0_10(new Mesage_AMQP_0_10(), list); - } - - @Override - public String getContentAsString(Message m) throws MessageEncodingException + protected List<Object> decodeAsList(ByteBuffer buf) throws MessageEncodingException { - if (m instanceof StringMessage) + try { - return ((StringMessage)m).getString(); + BBDecoder decorder = new BBDecoder(); + decorder.init(buf); + return decorder.readList(); } - else + catch (Exception e) { - return decodeAsString(m.getContent()); + throw new MessageEncodingException("Error decoding content as List",e); } } - @Override - public Map<String, Object> getContentAsMap(Message m) throws MessageEncodingException + protected ByteBuffer encodeList(List<Object> list) throws MessageEncodingException { - if (m instanceof MapMessage) + try { - return ((MapMessage)m).getMap(); + //need to investigate the capacity here. + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(list); + return (ByteBuffer)encoder.buffer().flip(); } - else + catch (Exception e) { - return decodeAsMap(m.getContent()); + throw new MessageEncodingException("Cannot encode List" ,e); } } @Override - public List<Object> getContentAsList(Message m) throws MessageEncodingException + protected Message createFactorySpecificMessageDelegate() { - if (m instanceof ListMessage) - { - return ((ListMessage)m).getList(); - } - else - { - return decodeAsList(m.getContent()); - } + return new Mesage_AMQP_0_10_Delegate(); } - class Mesage_AMQP_0_10 implements Message + class Mesage_AMQP_0_10_Delegate implements Message { private MessageProperties _messageProps; private DeliveryProperties _deliveryProps; - private ByteBuffer _data; private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); - protected Mesage_AMQP_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps) + // creating a new message for sending + protected Mesage_AMQP_0_10_Delegate() { - this(messageProps, deliveryProps,EMPTY_BYTE_BUFFER); + this(new MessageProperties(),new DeliveryProperties()); } - protected Mesage_AMQP_0_10(MessageProperties messageProps, - DeliveryProperties deliveryProps, - ByteBuffer buf) + + // Message received with data. + protected Mesage_AMQP_0_10_Delegate(MessageProperties messageProps, + DeliveryProperties deliveryProps) { _messageProps = messageProps; _deliveryProps = deliveryProps; - _data = buf; - } - - protected Mesage_AMQP_0_10() - { - _messageProps = new MessageProperties(); - _deliveryProps = new DeliveryProperties(); - } - - protected Mesage_AMQP_0_10(ByteBuffer buf) - { - _messageProps = new MessageProperties(); - _deliveryProps = new DeliveryProperties(); - _data = buf; } @Override @@ -343,284 +292,11 @@ public class MessageFactory_AMQP_0_10 implements MessageFactory } } + // noop, this is for the headers only. @Override - public ByteBuffer getContent() - { - return _data; - } - } - - class StringMessage_AMQP_0_10 extends GenericMessageAdapter implements StringMessage - { - private String _str; - private ByteBuffer _rawData; - private MessageEncodingException _exception; - - /** - * @param data The ByteBuffer passed will be read from position zero. - */ - public StringMessage_AMQP_0_10(MessageProperties messageProps, - DeliveryProperties deliveryProps, - ByteBuffer data) - { - super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); - _rawData = (ByteBuffer) data.rewind(); - try - { - _str = decodeAsString(_rawData.duplicate()); - } - catch (MessageEncodingException e) - { - _exception = e; - } - } - - public StringMessage_AMQP_0_10(Message delegate, String str) throws MessageEncodingException - { - super(delegate); - if(_str == null || _str.isEmpty()) - { - _rawData = EMPTY_BYTE_BUFFER; - } - else - { - _rawData = encodeString(str); - } - } - - @Override - public String getString() throws MessageEncodingException - { - if (_exception != null) - { - throw _exception; - } - else - { - return _str; - } - } - - @Override - public ByteBuffer getContent() - { - return _rawData; - } - } - - /** - * @param data The ByteBuffer passed will be read from position zero. - */ - class MapMessage_AMQP_0_10 extends GenericMessageAdapter implements MapMessage - { - private Map<String,Object> _map; - private ByteBuffer _rawData; - private MessageEncodingException _exception; - - public MapMessage_AMQP_0_10(MessageProperties messageProps, - DeliveryProperties deliveryProps, - ByteBuffer data) - { - super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); - _rawData = (ByteBuffer) data.rewind(); - try - { - _map = decodeAsMap(_rawData.duplicate()); - } - catch (MessageEncodingException e) - { - _exception = e; - } - } - - public MapMessage_AMQP_0_10(Message delegate, Map<String,Object> map) throws MessageEncodingException - { - super(delegate); - if(map == null || map.isEmpty()) - { - _rawData = EMPTY_BYTE_BUFFER; - } - else - { - _rawData = encodeMap(map); - } - } - - @Override - public Map<String,Object> getMap() throws MessageEncodingException - { - if (_exception != null) - { - throw _exception; - } - else - { - return _map; - } - } - - @Override - public ByteBuffer getContent() - { - return _rawData; - } - } - - /** - * @param data The ByteBuffer passed will be read from position zero. - */ - class ListMessage_AMQP_0_10 extends GenericMessageAdapter implements ListMessage - { - private List<Object> _list; - private ByteBuffer _rawData; - private MessageEncodingException _exception; - - public ListMessage_AMQP_0_10(MessageProperties messageProps, - DeliveryProperties deliveryProps, - ByteBuffer data) - { - super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); - _rawData = (ByteBuffer) data.rewind(); - try - { - _list = decodeAsList(_rawData.duplicate()); - } - catch (MessageEncodingException e) - { - _exception = e; - } - } - - public ListMessage_AMQP_0_10(Message delegate, List<Object> list) throws MessageEncodingException - { - super(delegate); - if(list == null || list.isEmpty()) - { - _rawData = EMPTY_BYTE_BUFFER; - } - else - { - _rawData = encodeList(list); - } - } - - @Override - public List<Object> getList() throws MessageEncodingException - { - if (_exception != null) - { - throw _exception; - } - else - { - return _list; - } - } - - @Override - public ByteBuffer getContent() - { - return _rawData; - } - } - - protected static String decodeAsString(ByteBuffer buf) throws MessageEncodingException - { - final CharsetDecoder decoder = DEFAULT_CHARSET.newDecoder(); - try + public ByteBuffer getContent() throws MessagingException { - return decoder.decode(buf).toString(); - } - catch (CharacterCodingException e) - { - throw new MessageEncodingException("Error decoding content as String using UTF-8",e); - } - - } - - protected static ByteBuffer encodeString(String str) throws MessageEncodingException - { - final CharsetEncoder encoder = DEFAULT_CHARSET.newEncoder(); - ByteBuffer b; - try - { - b = encoder.encode(CharBuffer.wrap(str)); - b.flip(); - } - catch (CharacterCodingException e) - { - throw new MessageEncodingException("Cannot encode string in UFT-8: " + str,e); - } - if (ALLOCATE_DIRECT) - { - // unfortunately this extra copy is required as it does not seem possible - // to create a CharSetEncoder that returns a buffer allocated directly. - ByteBuffer direct = ByteBuffer.allocateDirect(b.remaining()); - direct.put(b); - direct.flip(); - return direct; - } - else - { - return b; - } - } - - protected static Map<String,Object> decodeAsMap(ByteBuffer buf) throws MessageEncodingException - { - try - { - BBDecoder decorder = new BBDecoder(); - decorder.init(buf); - return decorder.readMap(); - } - catch (Exception e) - { - throw new MessageEncodingException("Error decoding content as Map",e); - } - } - - protected static ByteBuffer encodeMap(Map<String,Object> map) throws MessageEncodingException - { - try - { - //need to investigate the capacity here. - BBEncoder encoder = new BBEncoder(1024); - encoder.writeMap(map); - return (ByteBuffer)encoder.buffer().flip(); - } - catch (Exception e) - { - throw new MessageEncodingException("Cannot encode Map" ,e); - } - } - - protected static List<Object> decodeAsList(ByteBuffer buf) throws MessageEncodingException - { - try - { - BBDecoder decorder = new BBDecoder(); - decorder.init(buf); - return decorder.readList(); - } - catch (Exception e) - { - throw new MessageEncodingException("Error decoding content as List",e); - } - } - - protected static ByteBuffer encodeList(List<Object> list) throws MessageEncodingException - { - try - { - //need to investigate the capacity here. - BBEncoder encoder = new BBEncoder(1024); - encoder.writeList(list); - return (ByteBuffer)encoder.buffer().flip(); - } - catch (Exception e) - { - throw new MessageEncodingException("Cannot encode List" ,e); + throw new MessagingException("Empty!"); } } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java index 1c93680a7f..2b90f30d68 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java @@ -114,7 +114,7 @@ public class ReadOnlyMessageAdapter extends GenericMessageAdapter } @Override - public ByteBuffer getContent() + public ByteBuffer getContent() throws MessagingException { return super.getContent().asReadOnlyBuffer(); } |
