diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
| commit | ba09630a4258cded77842e1bd5d746b8fbda0cfe (patch) | |
| tree | da5fd4e29ce839185c6759a1a141fb2d65f0250c /qpid/java/common/src | |
| parent | 1533a95469482a820d3f883c44e7e92fa02c5eb3 (diff) | |
| download | qpid-python-ba09630a4258cded77842e1bd5d746b8fbda0cfe.tar.gz | |
QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
4 files changed, 1090 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java new file mode 100644 index 0000000000..0e12ac65d8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + +public interface TypedBytesCodes +{ + static final byte BOOLEAN_TYPE = (byte) 1; + + static final byte BYTE_TYPE = (byte) 2; + + static final byte BYTEARRAY_TYPE = (byte) 3; + + static final byte SHORT_TYPE = (byte) 4; + + static final byte CHAR_TYPE = (byte) 5; + + static final byte INT_TYPE = (byte) 6; + + static final byte LONG_TYPE = (byte) 7; + + static final byte FLOAT_TYPE = (byte) 8; + + static final byte DOUBLE_TYPE = (byte) 9; + + static final byte STRING_TYPE = (byte) 10; + + static final byte NULL_STRING_TYPE = (byte) 11; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java new file mode 100644 index 0000000000..0ba865f1e6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java @@ -0,0 +1,669 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +public class TypedBytesContentReader implements TypedBytesCodes +{ + + private final ByteBuffer _data; + private final int _position; + private final int _limit; + + + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder(); + + private int _byteArrayRemaining = -1; + + + public TypedBytesContentReader(final ByteBuffer data) + { + _data = data.duplicate(); + _position = _data.position(); + _limit = _data.limit(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + public void checkAvailable(int len) throws EOFException + { + if (_data.remaining() < len) + { + throw new EOFException("Unable to read " + len + " bytes"); + } + } + + public byte readWireType() throws TypedBytesFormatException, EOFException + { + checkAvailable(1); + return _data.get(); + } + + public boolean readBoolean() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public boolean readBooleanImpl() + { + return _data.get() != 0; + } + + public byte readByte() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + public byte readByteImpl() + { + return _data.get(); + } + + public short readShort() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + public short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + public char readChar() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if (wireType == NULL_STRING_TYPE) + { + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public char readCharImpl() + { + return _data.getChar(); + } + + public int readInt() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public int readIntImpl() + { + return _data.getInt(); + } + + public long readLong() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public long readLongImpl() + { + return _data.getLong(); + } + + public float readFloat() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public float readFloatImpl() + { + return _data.getFloat(); + } + + public double readDouble() throws TypedBytesFormatException, EOFException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public double readDoubleImpl() + { + return _data.getDouble(); + } + + public String readString() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public String readStringImpl() throws TypedBytesFormatException + { + try + { + _charsetDecoder.reset(); + ByteBuffer dup = _data.duplicate(); + int pos = _data.position(); + byte b; + while((b = _data.get()) != 0) {}; + dup.limit(_data.position()-1); + return _charsetDecoder.decode(dup).toString(); + + } + catch (CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); + jmse.initCause(e); + throw jmse; + } + } + + public int readBytes(byte[] bytes) throws EOFException, TypedBytesFormatException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new EOFException("Byte array has stated length " + + size + + " but message only contains " + + + _data.remaining() + + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public Object readObject() throws EOFException, TypedBytesFormatException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + public void reset() + { + _byteArrayRemaining = -1; + _data.position(_position); + _data.limit(_limit); + } + + public ByteBuffer getData() + { + ByteBuffer buf = _data.duplicate(); + buf.position(_position); + buf.limit(_limit); + return buf; + } + + public long size() + { + return _limit - _position; + } + + public int remaining() + { + return _data.remaining(); + } + + public void readRawBytes(final byte[] bytes, final int offset, final int count) + { + _data.get(bytes, offset, count); + } + + public String readLengthPrefixedUTF() throws TypedBytesFormatException + { + try + { + short length = readShortImpl(); + if(length == 0) + { + return ""; + } + else + { + _charsetDecoder.reset(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = _charsetDecoder.decode(encodedString); + + return string.toString(); + } + } + catch(CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); + jmse.initCause(e); + throw jmse; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java new file mode 100644 index 0000000000..c7ca2d7df7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java @@ -0,0 +1,366 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.typedmessage; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +public class TypedBytesContentWriter implements TypedBytesCodes +{ + private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); + private final DataOutputStream _data = new DataOutputStream(_baos); + private static final Charset UTF8 = Charset.forName("UTF-8"); + + protected void writeTypeDiscriminator(byte type) + { + try + { + _data.writeByte(type); + } + catch (IOException e) + { + throw handle(e); + } + } + + private RuntimeException handle(final IOException e) + { + RuntimeException jmsEx = new RuntimeException("Unable to write value: " + e.getMessage()); + return jmsEx; + } + + + public void writeBoolean(boolean b) + { + writeTypeDiscriminator(BOOLEAN_TYPE); + writeBooleanImpl(b); + } + + public void writeBooleanImpl(final boolean b) + { + try + { + _data.writeByte(b ? (byte) 1 : (byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeByte(byte b) + { + writeTypeDiscriminator(BYTE_TYPE); + writeByteImpl(b); + } + + public void writeByteImpl(final byte b) + { + try + { + _data.writeByte(b); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeShort(short i) + { + writeTypeDiscriminator(SHORT_TYPE); + writeShortImpl(i); + } + + public void writeShortImpl(final short i) + { + try + { + _data.writeShort(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeChar(char c) + { + writeTypeDiscriminator(CHAR_TYPE); + writeCharImpl(c); + } + + public void writeCharImpl(final char c) + { + try + { + _data.writeChar(c); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeInt(int i) + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + public void writeIntImpl(int i) + { + try + { + _data.writeInt(i); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeLong(long l) + { + writeTypeDiscriminator(LONG_TYPE); + writeLongImpl(l); + } + + public void writeLongImpl(final long l) + { + try + { + _data.writeLong(l); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeFloat(float v) + { + writeTypeDiscriminator(FLOAT_TYPE); + writeFloatImpl(v); + } + + public void writeFloatImpl(final float v) + { + try + { + _data.writeFloat(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeDouble(double v) + { + writeTypeDiscriminator(DOUBLE_TYPE); + writeDoubleImpl(v); + } + + public void writeDoubleImpl(final double v) + { + try + { + _data.writeDouble(v); + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeString(String string) + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + writeNullTerminatedStringImpl(string); + } + } + + public void writeNullTerminatedStringImpl(String string) + + { + try + { + _data.write(string.getBytes(UTF8)); + _data.writeByte((byte) 0); + } + catch (IOException e) + { + throw handle(e); + } + + } + + public void writeBytes(byte[] bytes) + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + public void writeBytes(byte[] bytes, int offset, int length) + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + writeBytesImpl(bytes, offset, length); + } + + public void writeBytesImpl(final byte[] bytes, final int offset, final int length) + { + try + { + if (bytes == null) + { + _data.writeInt(-1); + } + else + { + _data.writeInt(length); + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + public void writeBytesRaw(final byte[] bytes, final int offset, final int length) + { + try + { + if (bytes != null) + { + _data.write(bytes, offset, length); + } + } + catch (IOException e) + { + throw handle(e); + } + } + + + public void writeObject(Object object) throws TypedBytesFormatException + { + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new TypedBytesFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public ByteBuffer getData() + { + return ByteBuffer.wrap(_baos.toByteArray()); + } + + public void writeLengthPrefixedUTF(final String string) throws TypedBytesFormatException + { + try + { + CharsetEncoder encoder = UTF8.newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + writeShortImpl((short) encodedString.limit()); + while(encodedString.hasRemaining()) + { + _data.writeByte(encodedString.get()); + } + } + catch (CharacterCodingException e) + { + TypedBytesFormatException jmse = new TypedBytesFormatException("Unable to encode string: " + e); + jmse.initCause(e); + throw jmse; + } + catch (IOException e) + { + throw handle(e); + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java new file mode 100644 index 0000000000..95e7ea0acc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java @@ -0,0 +1,9 @@ +package org.apache.qpid.typedmessage; + +public class TypedBytesFormatException extends Exception +{ + public TypedBytesFormatException(String s) + { + super(s); + } +} |
