diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-18 18:20:41 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-18 18:20:41 +0000 |
| commit | 53d03c43b4e00ce525d5e30c4cbe64c05a1b0222 (patch) | |
| tree | a6e4d9de842ef6d5a49a8599ac3c3879b4126cc6 /qpid/java/common/src | |
| parent | e5021e250fed784435d5d00ad5120f94fe33df5b (diff) | |
| download | qpid-python-53d03c43b4e00ce525d5e30c4cbe64c05a1b0222.tar.gz | |
Copied remotely
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@488382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
80 files changed, 11094 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java new file mode 100644 index 0000000000..cd8b40c6da --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQChannelClosedException extends AMQException +{ + public AMQChannelClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java new file mode 100644 index 0000000000..4d604f8c0b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQFrame; + +public class AMQChannelException extends AMQException +{ + private final int _classId; + private final int _methodId; + + public AMQChannelException(int errorCode, String msg, int classId, int methodId, Throwable t) + { + super(errorCode, msg, t); + _classId = classId; + _methodId = methodId; + } + + public AMQChannelException(int errorCode, String msg, int classId, int methodId) + { + super(errorCode, msg); + _classId = classId; + _methodId = methodId; + } + + public AMQFrame getCloseFrame(int channel) + { + return ChannelCloseBody.createAMQFrame(channel, getErrorCode(), getMessage(), _classId, _methodId); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java new file mode 100644 index 0000000000..6ec18bad20 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ channel closed exception. + */ +public class AMQConnectionClosedException extends AMQException +{ + public AMQConnectionClosedException(int errorCode, String msg) + { + super(errorCode, msg); + } +} + + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java new file mode 100644 index 0000000000..6254d80f32 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid; + +public class AMQConnectionException extends AMQException +{ + public AMQConnectionException(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java new file mode 100644 index 0000000000..b142eea73c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +/** + * AMQ disconnected exception. + */ +public class AMQDisconnectedException extends AMQException +{ + public AMQDisconnectedException(String msg) + { + super(msg); + } +} + + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java new file mode 100644 index 0000000000..93c31e4fa8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import org.apache.log4j.Logger; + +/** + * Generic AMQ exception. + */ +public class AMQException extends Exception +{ + private int _errorCode; + + public AMQException(String message) + { + super(message); + } + + public AMQException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java new file mode 100644 index 0000000000..883e13e5e6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + + +public class AMQPInvalidClassException extends RuntimeException +{ + public AMQPInvalidClassException(String s) + { + super(s); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java new file mode 100644 index 0000000000..4944ccc371 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +/** + * Generic AMQ exception. + */ +public class AMQUndeliveredException extends AMQException +{ + private Object _bounced; + + public AMQUndeliveredException(int errorCode, String msg, Object bounced) + { + super(errorCode, msg); + + _bounced = bounced; + } + + public Object getUndeliveredMessage() + { + return _bounced; + } + +} + + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java new file mode 100644 index 0000000000..2201903ded --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +public class AMQUnresolvedAddressException extends AMQException +{ + String _broker; + + public AMQUnresolvedAddressException(String message, String broker) + { + super(message); + _broker = broker; + } + + public String toString() + { + return super.toString() + " Broker, \"" + _broker +"\""; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java new file mode 100644 index 0000000000..d7f1edbc30 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolCodecFactory; +import org.apache.mina.filter.codec.ProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolEncoder; + +public class AMQCodecFactory implements ProtocolCodecFactory +{ + private AMQEncoder _encoder = new AMQEncoder(); + + private AMQDecoder _frameDecoder; + + /** + * @param expectProtocolInitiation true if the first frame received is going to be + * a protocol initiation frame, false if it is going to be a standard AMQ data block. + * The former case is used for the broker, which always expects to received the + * protocol initiation first from a newly connected client. + */ + public AMQCodecFactory(boolean expectProtocolInitiation) + { + _frameDecoder = new AMQDecoder(expectProtocolInitiation); + } + + public ProtocolEncoder getEncoder() + { + return _encoder; + } + + public ProtocolDecoder getDecoder() + { + return _frameDecoder; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java new file mode 100644 index 0000000000..bb981a242f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.ProtocolInitiation; + +/** + * There is one instance of this class per session. Any changes or configuration done + * at run time to the encoders or decoders only affects decoding/encoding of the + * protocol session data to which is it bound. + * + */ +public class AMQDecoder extends CumulativeProtocolDecoder +{ + private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); + + private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder(); + + private boolean _expectProtocolInitiation; + + public AMQDecoder(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } + + protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + if (_expectProtocolInitiation) + { + return doDecodePI(session, in, out); + } + else + { + return doDecodeDataBlock(session, in, out); + } + } + + protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + int pos = in.position(); + boolean enoughData = _dataBlockDecoder.decodable(session, in); + in.position(pos); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _dataBlockDecoder.decode(session, in, out); + return true; + } + } + + private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + boolean enoughData = _piDecoder.decodable(session, in); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _piDecoder.decode(session, in, out); + return true; + } + } + + public void setExpectProtocolInitiation(boolean expectProtocolInitiation) + { + _expectProtocolInitiation = expectProtocolInitiation; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java new file mode 100644 index 0000000000..ad252aec35 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQEncoder.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.codec; + +import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.common.IoSession; +import org.apache.qpid.framing.AMQDataBlockEncoder; + +public class AMQEncoder implements ProtocolEncoder +{ + private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder(); + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + _dataBlockEncoder.encode(session, message, out); + } + + public void dispose(IoSession session) throws Exception + { + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java new file mode 100644 index 0000000000..2160dd6295 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/Configured.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.configuration; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * Marks a field as being "configured" externally. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface Configured +{ + /** + * The Commons Configuration path to the configuration element + */ + String path(); + + /** + * The default value to use should the path not be found in the configuration source + */ + String defaultValue(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java new file mode 100644 index 0000000000..12eebbf2b0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.configuration; + +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +/** + * Indicates an error parsing a property expansion. + */ +public class PropertyException extends AMQException +{ + public PropertyException(String message) + { + super(message); + } + + public PropertyException(String msg, Throwable t) + { + super(msg, t); + } + + public PropertyException(int errorCode, String msg, Throwable t) + { + super(errorCode, msg, t); + } + + public PropertyException(int errorCode, String msg) + { + super(errorCode, msg); + } + + public PropertyException(Logger logger, String msg, Throwable t) + { + super(logger, msg, t); + } + + public PropertyException(Logger logger, String msg) + { + super(logger, msg); + } + + public PropertyException(Logger logger, int errorCode, String msg) + { + super(logger, errorCode, msg); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java new file mode 100644 index 0000000000..37d8af2501 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.configuration; + +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Based on code in Apache Ant, this utility class handles property expansion. This + * is most useful in config files and so on. + */ +public class PropertyUtils +{ + /** + * Replaces <code>${xxx}</code> style constructions in the given value + * with the string value of the corresponding data types. Replaces only system + * properties + * + * @param value The string to be scanned for property references. + * May be <code>null</code>, in which case this + * method returns immediately with no effect. + * @return the original string with the properties replaced, or + * <code>null</code> if the original string is <code>null</code>. + * @throws PropertyException if the string contains an opening + * <code>${</code> without a closing + * <code>}</code> + */ + public static String replaceProperties(String value) throws PropertyException + { + if (value == null) + { + return null; + } + + ArrayList<String> fragments = new ArrayList<String>(); + ArrayList<String> propertyRefs = new ArrayList<String>(); + parsePropertyString(value, fragments, propertyRefs); + + StringBuffer sb = new StringBuffer(); + Iterator j = propertyRefs.iterator(); + + for (String fragment : fragments) + { + if (fragment == null) + { + String propertyName = (String) j.next(); + + // try to get it from the project or keys + // Backward compatibility + String replacement = System.getProperty(propertyName); + + if (replacement == null) + { + throw new PropertyException("Property ${" + propertyName + + "} has not been set"); + } + fragment = replacement; + } + sb.append(fragment); + } + + return sb.toString(); + } + + /** + * Default parsing method. Parses the supplied value for properties which are specified + * using ${foo} syntax. $X is left as is, and $$ specifies a single $. + * @param value the property string to parse + * @param fragments is populated with the string fragments. A null means "insert a + * property value here. The number of nulls in the list when populated is equal to the + * size of the propertyRefs list + * @param propertyRefs populated with the property names to be added into the final + * String. + */ + private static void parsePropertyString(String value, ArrayList<String> fragments, + ArrayList<String> propertyRefs) + throws PropertyException + { + int prev = 0; + int pos; + //search for the next instance of $ from the 'prev' position + while ((pos = value.indexOf("$", prev)) >= 0) + { + + //if there was any text before this, add it as a fragment + if (pos > 0) + { + fragments.add(value.substring(prev, pos)); + } + //if we are at the end of the string, we tack on a $ + //then move past it + if (pos == (value.length() - 1)) + { + fragments.add("$"); + prev = pos + 1; + } + else if (value.charAt(pos + 1) != '{') + { + //peek ahead to see if the next char is a property or not + //not a property: insert the char as a literal + if (value.charAt(pos + 1) == '$') + { + // two $ map to one $ + fragments.add("$"); + prev = pos + 2; + } + else + { + // $X maps to $X for all values of X!='$' + fragments.add(value.substring(pos, pos + 2)); + prev = pos + 2; + } + } + else + { + // property found, extract its name or bail on a typo + int endName = value.indexOf('}', pos); + if (endName < 0) + { + throw new PropertyException("Syntax error in property: " + + value); + } + String propertyName = value.substring(pos + 2, endName); + fragments.add(null); + propertyRefs.add(propertyName); + prev = endName + 1; + } + } + //no more $ signs found + //if there is any tail to the file, append it + if (prev < value.length()) + { + fragments.add(value.substring(prev)); + } + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java new file mode 100644 index 0000000000..e67a5ba7fe --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.exchange; + +public class ExchangeDefaults +{ + public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + + public final static String TOPIC_EXCHANGE_CLASS = "topic"; + + public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + + public final static String DIRECT_EXCHANGE_CLASS = "direct"; + + public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + + public final static String HEADERS_EXCHANGE_CLASS = "headers"; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java new file mode 100644 index 0000000000..d829144b11 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQBody +{ + protected abstract byte getFrameType(); + + /** + * Get the size of the body + * @return unsigned short + */ + protected abstract int getSize(); + + protected abstract void writePayload(ByteBuffer buffer); + + protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java new file mode 100644 index 0000000000..9155c4024f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * A data block represents something that has a size in bytes and the ability to write itself to a byte + * buffer (similar to a byte array). + */ +public abstract class AMQDataBlock implements EncodableAMQDataBlock +{ + /** + * Get the size of buffer needed to store the byte representation of this + * frame. + * @return unsigned integer + */ + public abstract long getSize(); + + /** + * Writes the datablock to the specified buffer. + * @param buffer + */ + public abstract void writePayload(ByteBuffer buffer); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java new file mode 100644 index 0000000000..438bfa8d82 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +import java.util.HashMap; +import java.util.Map; + +public class AMQDataBlockDecoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + + private final Map _supportedBodies = new HashMap(); + + public AMQDataBlockDecoder() + { + _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); + _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); + _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); + } + + public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + { + // type, channel, body size and end byte + if (in.remaining() < (1 + 2 + 4 + 1)) + { + return false; + } + + final byte type = in.get(); + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if (type <= 0 || channel < 0 || bodySize < 0) + { + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + if (in.remaining() < (bodySize + 1)) + { + return false; + } + return true; + } + + private boolean isSupportedFrameType(byte frameType) + { + final boolean result = _supportedBodies.containsKey(new Byte(frameType)); + + if (!result) + { + _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + } + + return result; + } + + protected Object createAndPopulateFrame(ByteBuffer in) + throws AMQFrameDecodingException + { + final byte type = in.get(); + if (!isSupportedFrameType(type)) + { + throw new AMQFrameDecodingException("Unsupported frame type: " + type); + } + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); + if (bodyFactory == null) + { + throw new AMQFrameDecodingException("Unsupported body type: " + type); + } + AMQFrame frame = new AMQFrame(); + + frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + } + return frame; + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + out.write(createAndPopulateFrame(in)); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java new file mode 100644 index 0000000000..3446563d35 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.demux.MessageEncoder; + +import java.util.HashSet; +import java.util.Set; + +public class AMQDataBlockEncoder implements MessageEncoder +{ + Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + + private Set _messageTypes; + + public AMQDataBlockEncoder() + { + _messageTypes = new HashSet(); + _messageTypes.add(EncodableAMQDataBlock.class); + } + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + final AMQDataBlock frame = (AMQDataBlock) message; + int frameSize = (int)frame.getSize(); + final ByteBuffer buffer = ByteBuffer.allocate(frameSize); + //buffer.setAutoExpand(true); + frame.writePayload(buffer); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); + } + + buffer.flip(); + out.write(buffer); + } + + public Set getMessageTypes() + { + return _messageTypes; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java new file mode 100644 index 0000000000..e75f37d623 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock +{ + public int channel; + + public AMQBody bodyFrame; + + public AMQFrame() + { + } + + public AMQFrame(int channel, AMQBody bodyFrame) + { + this.channel = channel; + this.bodyFrame = bodyFrame; + } + + public long getSize() + { + return 1 + 2 + 4 + bodyFrame.getSize() + 1; + } + + public void writePayload(ByteBuffer buffer) + { + buffer.put(bodyFrame.getFrameType()); + // TODO: how does channel get populated + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); + bodyFrame.writePayload(buffer); + buffer.put((byte) 0xCE); + } + + /** + * + * @param buffer + * @param channel unsigned short + * @param bodySize unsigned integer + * @param bodyFactory + * @throws AMQFrameDecodingException + */ + public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) + throws AMQFrameDecodingException + { + this.channel = channel; + bodyFrame = bodyFactory.createBody(buffer); + bodyFrame.populateFromBuffer(buffer, bodySize); + } + + public String toString() + { + return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java new file mode 100644 index 0000000000..a24bd6aaa9 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + +public class AMQFrameDecodingException extends AMQException +{ + public AMQFrameDecodingException(String message) + { + super(message); + } + + public AMQFrameDecodingException(String message, Throwable t) + { + super(message, t); + } + + public AMQFrameDecodingException(Logger log, String message) + { + super(log, message); + } + + public AMQFrameDecodingException(Logger log, String message, Throwable t) + { + super(log, message, t); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java new file mode 100644 index 0000000000..6659b4ff8f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; + +public abstract class AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** unsigned short */ + protected abstract int getBodySize(); + + /** + * @return unsigned short + */ + protected abstract int getClazz(); + + /** + * @return unsigned short + */ + protected abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + protected byte getFrameType() + { + return TYPE; + } + + protected int getSize() + { + return 2 + 2 + getBodySize(); + } + + protected void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + populateMethodBodyFromBuffer(buffer); + } + + public String toString() + { + StringBuffer buf = new StringBuffer(getClass().toString()); + buf.append(" Class: ").append(getClazz()); + buf.append(" Method: ").append(getMethod()); + return buf.toString(); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception + * should include the class and method ids of the body it resulted from). + */ + public AMQChannelException getChannelException(int code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod()); + } + + public AMQChannelException getChannelException(int code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), cause); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java new file mode 100644 index 0000000000..107af67dc7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class AMQMethodBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); + + public static AMQMethodBodyFactory getInstance() + { + return _instance; + } + + private AMQMethodBodyFactory() + { + _log.debug("Creating method body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort()); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java new file mode 100644 index 0000000000..e6acad502f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolClassException extends AMQProtocolHeaderException +{ + public AMQProtocolClassException(String message) + { + super(message); + } +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java new file mode 100644 index 0000000000..888ed14faf --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; + +public class AMQProtocolHeaderException extends AMQException +{ + public AMQProtocolHeaderException(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java new file mode 100644 index 0000000000..c58979f876 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public class AMQProtocolInstanceException extends AMQProtocolHeaderException +{ + public AMQProtocolInstanceException(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java new file mode 100644 index 0000000000..7b326a0dc4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Exception that is thrown when the client and server differ on expected protocol version (header) information. + * + */ +public class AMQProtocolVersionException extends AMQProtocolHeaderException +{ + public AMQProtocolVersionException(String message) + { + super(message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java new file mode 100644 index 0000000000..61837f65cc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -0,0 +1,624 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class BasicContentHeaderProperties implements ContentHeaderProperties +{ + private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); + + /** + * We store the encoded form when we decode the content header so that if we need to + * write it out without modifying it we can do so without incurring the expense of + * reencoding it + */ + private byte[] _encodedForm; + + /** + * Flag indicating whether the entire content header has been decoded yet + */ + private boolean _decoded = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker + * for routing in some cases so we can decode that separately. + */ + private boolean _decodedHeaders = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The content type is used by all + * clients to determine the message type + */ + private boolean _decodedContentType = true; + + private String _contentType; + + private String _encoding; + + private FieldTable _headers; + + private JMSPropertyFieldTable _jmsHeaders; + + private byte _deliveryMode; + + private byte _priority; + + private String _correlationId; + + private String _replyTo; + + private long _expiration; + + private String _messageId; + + private long _timestamp; + + private String _type; + + private String _userId; + + private String _appId; + + private String _clusterId; + + private int _propertyFlags = 0; + + public BasicContentHeaderProperties() + { + } + + public int getPropertyListSize() + { + if (_encodedForm != null) + { + return _encodedForm.length; + } + else + { + int size = 0; + + if ((_propertyFlags & (1 << 15)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + size += EncodingUtils.encodedFieldTableLength(_headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 11)) > 0) + { + size += 1; + } + if ((_propertyFlags & (1 << 10)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + size += 8; + } + if ((_propertyFlags & (1 << 5)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_clusterId); + } + return size; + } + } + + private void clearEncodedForm() + { + if (!_decoded && _encodedForm != null) + { + //decode(); + } + _encodedForm = null; + } + + public void setPropertyFlags(int propertyFlags) + { + clearEncodedForm(); + _propertyFlags = propertyFlags; + } + + public int getPropertyFlags() + { + return _propertyFlags; + } + + public void writePropertyListPayload(ByteBuffer buffer) + { + if (_encodedForm != null) + { + buffer.put(_encodedForm); + } + else + { + if ((_propertyFlags & (1 << 15)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _contentType); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + buffer.put(_deliveryMode); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + buffer.put(_priority); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); + } + } + } + + public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException + { + _propertyFlags = propertyFlags; + + if (_logger.isDebugEnabled()) + { + _logger.debug("Property flags: " + _propertyFlags); + } + decode(buffer); + /*_encodedForm = new byte[size]; + buffer.get(_encodedForm, 0, size); + _decoded = false; + _decodedHeaders = false; + _decodedContentType = false;*/ + } + + private void decode(ByteBuffer buffer) + { + //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int pos = buffer.position(); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + _encoding = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + setJMSHeaders(); + } + if ((_propertyFlags & (1 << 12)) > 0) + { + _deliveryMode = buffer.get(); + } + if ((_propertyFlags & (1 << 11)) > 0) + { + _priority = buffer.get(); + } + if ((_propertyFlags & (1 << 10)) > 0) + { + _correlationId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 9)) > 0) + { + _replyTo = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 8)) > 0) + { + _expiration = Long.parseLong(EncodingUtils.readShortString(buffer)); + } + if ((_propertyFlags & (1 << 7)) > 0) + { + _messageId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 6)) > 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } + if ((_propertyFlags & (1 << 5)) > 0) + { + _type = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 4)) > 0) + { + _userId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 3)) > 0) + { + _appId = EncodingUtils.readShortString(buffer); + } + if ((_propertyFlags & (1 << 2)) > 0) + { + _clusterId = EncodingUtils.readShortString(buffer); + } + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + + final int endPos = buffer.position(); + buffer.position(pos); + final int len = endPos - pos; + _encodedForm = new byte[len]; + final int limit = buffer.limit(); + buffer.limit(endPos); + buffer.get(_encodedForm, 0, len); + buffer.limit(limit); + buffer.position(endPos); + _decoded = true; + } + + + private void decodeUpToHeaders() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + try + { + if ((_propertyFlags & (1 << 15)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 14)) > 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + if ((_propertyFlags & (1 << 13)) > 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + setJMSHeaders(); + + } + _decodedHeaders = true; + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e); + } + } + + private void decodeUpToContentType() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + + if ((_propertyFlags & (1 << 15)) > 0) + { + _contentType = EncodingUtils.readShortString(buffer); + } + + _decodedContentType = true; + } + + private void decodeIfNecessary() + { + if (!_decoded) + { + //decode(); + } + } + + private void decodeHeadersIfNecessary() + { + if (!_decoded && !_decodedHeaders) + { + decodeUpToHeaders(); + } + } + + private void decodeContentTypeIfNecessary() + { + if (!_decoded && !_decodedContentType) + { + decodeUpToContentType(); + } + } + + public String getContentType() + { + decodeContentTypeIfNecessary(); + return _contentType; + } + + public void setContentType(String contentType) + { + clearEncodedForm(); + _propertyFlags |= (1 << 15); + _contentType = contentType; + } + + public String getEncoding() + { + decodeIfNecessary(); + return _encoding; + } + + public void setEncoding(String encoding) + { + clearEncodedForm(); + _propertyFlags |= (1 << 14); + _encoding = encoding; + } + + public FieldTable getHeaders() + { + decodeHeadersIfNecessary(); + + if (_headers == null) + { + setHeaders(FieldTableFactory.newFieldTable()); + } + + return _headers; + } + + public void setHeaders(FieldTable headers) + { + clearEncodedForm(); + _propertyFlags |= (1 << 13); + _headers = headers; + setJMSHeaders(); + } + + private void setJMSHeaders() + { + if (_jmsHeaders == null) + { + _jmsHeaders = new JMSPropertyFieldTable(_headers); + } + else + { + _jmsHeaders.setFieldTable(_headers); + } + } + + public JMSPropertyFieldTable getJMSHeaders() + { + //This will ensure we have a blank header + getHeaders(); + return _jmsHeaders; + } + + public byte getDeliveryMode() + { + decodeIfNecessary(); + return _deliveryMode; + } + + public void setDeliveryMode(byte deliveryMode) + { + clearEncodedForm(); + _propertyFlags |= (1 << 12); + _deliveryMode = deliveryMode; + } + + public byte getPriority() + { + decodeIfNecessary(); + return _priority; + } + + public void setPriority(byte priority) + { + clearEncodedForm(); + _propertyFlags |= (1 << 11); + _priority = priority; + } + + public String getCorrelationId() + { + decodeIfNecessary(); + return _correlationId; + } + + public void setCorrelationId(String correlationId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 10); + _correlationId = correlationId; + } + + public String getReplyTo() + { + decodeIfNecessary(); + return _replyTo; + } + + public void setReplyTo(String replyTo) + { + clearEncodedForm(); + _propertyFlags |= (1 << 9); + _replyTo = replyTo; + } + + public long getExpiration() + { + decodeIfNecessary(); + return _expiration; + } + + public void setExpiration(long expiration) + { + clearEncodedForm(); + _propertyFlags |= (1 << 8); + _expiration = expiration; + } + + + public String getMessageId() + { + decodeIfNecessary(); + return _messageId; + } + + public void setMessageId(String messageId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 7); + _messageId = messageId; + } + + public long getTimestamp() + { + decodeIfNecessary(); + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + clearEncodedForm(); + _propertyFlags |= (1 << 6); + _timestamp = timestamp; + } + + public String getType() + { + decodeIfNecessary(); + return _type; + } + + public void setType(String type) + { + clearEncodedForm(); + _propertyFlags |= (1 << 5); + _type = type; + } + + public String getUserId() + { + decodeIfNecessary(); + return _userId; + } + + public void setUserId(String userId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 4); + _userId = userId; + } + + public String getAppId() + { + decodeIfNecessary(); + return _appId; + } + + public void setAppId(String appId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 3); + _appId = appId; + } + + public String getClusterId() + { + decodeIfNecessary(); + return _clusterId; + } + + public void setClusterId(String clusterId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 2); + _clusterId = clusterId; + } + + public String toString() + { + return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java new file mode 100644 index 0000000000..cf5708d993 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. + */ +public interface BodyFactory +{ + AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java new file mode 100644 index 0000000000..5ec62ede93 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + private ByteBuffer _encodedBlock; + + private AMQDataBlock[] _blocks; + + public CompositeAMQDataBlock(AMQDataBlock[] blocks) + { + _blocks = blocks; + } + + /** + * The encoded block will be logically first before the AMQDataBlocks which are encoded + * into the buffer afterwards. + * @param encodedBlock already-encoded data + * @param blocks some blocks to be encoded. + */ + public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks) + { + this(blocks); + _encodedBlock = encodedBlock; + } + + public AMQDataBlock[] getBlocks() + { + return _blocks; + } + + public ByteBuffer getEncodedBlock() + { + return _encodedBlock; + } + + public long getSize() + { + long frameSize = 0; + for (int i = 0; i < _blocks.length; i++) + { + frameSize += _blocks[i].getSize(); + } + if (_encodedBlock != null) + { + _encodedBlock.rewind(); + frameSize += _encodedBlock.remaining(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + if (_encodedBlock != null) + { + buffer.put(_encodedBlock); + } + for (int i = 0; i < _blocks.length; i++) + { + _blocks[i].writePayload(buffer); + } + } + + public String toString() + { + if (_blocks == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{encodedBlock=").append(_encodedBlock); + for (int i = 0 ; i < _blocks.length; i++) + { + buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); + } + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java new file mode 100644 index 0000000000..e5feeec2a4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface Content +{ + // TODO: New Content class required for AMQP 0-9. +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java new file mode 100644 index 0000000000..3a2e4b3b3c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentBody extends AMQBody +{ + public static final byte TYPE = 3; + + public ByteBuffer payload; + + protected byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return (payload == null ? 0 : payload.limit()); + } + + public void writePayload(ByteBuffer buffer) + { + if (payload != null) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public void reduceBufferToFit() + { + if (payload != null && (payload.remaining() < payload.capacity() / 2)) + { + int size = payload.limit(); + ByteBuffer newPayload = ByteBuffer.allocate(size); + + newPayload.put(payload); + newPayload.flip(); + + //reduce reference count on payload + payload.release(); + + payload = newPayload; + } + } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java new file mode 100644 index 0000000000..22af331ab7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentBodyFactory _instance = new ContentBodyFactory(); + + public static ContentBodyFactory getInstance() + { + return _instance; + } + + private ContentBodyFactory() + { + _log.debug("Creating content body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new ContentBody(); + } +} + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java new file mode 100644 index 0000000000..a59869b1d8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBody extends AMQBody +{ + public static final byte TYPE = 2; + + public int classId; + + public int weight; + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long bodySize; + + /** must never be null */ + public ContentHeaderProperties properties; + + public ContentHeaderBody() + { + } + + public ContentHeaderBody(ContentHeaderProperties props, int classId) + { + properties = props; + this.classId = classId; + } + + public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) + { + this(props, classId); + this.weight = weight; + this.bodySize = bodySize; + } + + protected byte getFrameType() + { + return TYPE; + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + } + + /** + * Helper method that is used currently by the persistence layer (by BDB at the moment). + * @param buffer + * @param size + * @return + * @throws AMQFrameDecodingException + */ + public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + ContentHeaderBody body = new ContentHeaderBody(); + body.populateFromBuffer(buffer, size); + return body; + } + + public int getSize() + { + return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, classId); + EncodingUtils.writeUnsignedShort(buffer, weight); + buffer.putLong(bodySize); + EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); + properties.writePropertyListPayload(buffer); + } + + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + long bodySize) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); + return frame; + } + + public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) + { + final AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java new file mode 100644 index 0000000000..ddf63f8aa3 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderBodyFactory implements BodyFactory +{ + private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); + + public static ContentHeaderBodyFactory getInstance() + { + return _instance; + } + + private ContentHeaderBodyFactory() + { + _log.debug("Creating content header body factory"); + } + + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + // all content headers are the same - it is only the properties that differ. + // the content header body further delegates construction of properties + return new ContentHeaderBody(); + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java new file mode 100644 index 0000000000..561d7852fd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * There will be an implementation of this interface for each content type. All content types have associated + * header properties and this provides a way to encode and decode them. + */ +public interface ContentHeaderProperties +{ + /** + * Writes the property list to the buffer, in a suitably encoded form. + * @param buffer The buffer to write to + */ + void writePropertyListPayload(ByteBuffer buffer); + + /** + * Populates the properties from buffer. + * @param buffer The buffer to read from. + * @param propertyFlags he property flags. + * @throws AMQFrameDecodingException when the buffer does not contain valid data + */ + void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException; + + /** + * @return the size of the encoded property list in bytes. + */ + int getPropertyListSize(); + + /** + * Gets the property flags. Property flags indicate which properties are set in the list. The + * position and meaning of each flag is defined in the protocol specification for the particular + * content type with which these properties are associated. + * @return flags + */ + int getPropertyFlags(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java new file mode 100644 index 0000000000..cec413cb9d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class ContentHeaderPropertiesFactory +{ + private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); + + public static ContentHeaderPropertiesFactory getInstance() + { + return _instance; + } + + private ContentHeaderPropertiesFactory() + { + } + + public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, + ByteBuffer buffer, int size) + throws AMQFrameDecodingException + { + ContentHeaderProperties properties; + switch (classId) + { + case BasicConsumeBody.CLASS_ID: + properties = new BasicContentHeaderProperties(); + break; + default: + throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); + } + properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); + return properties; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java new file mode 100644 index 0000000000..9cf96e698c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Marker interface to indicate to MINA that a data block should be encoded with the + * single encoder/decoder that we have defined. + * + * Note that due to a bug in MINA all classes must directly implement this interface, even if + * a superclass implements it. + * TODO: fix MINA so that this is not necessary + * + */ +public interface EncodableAMQDataBlock +{ + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java new file mode 100644 index 0000000000..46dff9ffa8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -0,0 +1,630 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import java.nio.charset.Charset; + +public class EncodingUtils +{ + private static final Logger _logger = Logger.getLogger(EncodingUtils.class); + + private static final String STRING_ENCODING = "iso8859-15"; + + private static final Charset _charset = Charset.forName("iso8859-15"); + + public static final int SIZEOF_UNSIGNED_SHORT = 2; + public static final int SIZEOF_UNSIGNED_INT = 4; + + public static int encodedShortStringLength(String s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length(); + } + } + + public static int encodedLongStringLength(char[] s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length; + } + } + + public static int encodedLongstrLength(byte[] bytes) + { + if (bytes == null) + { + return 4; + } + else + { + return 4 + bytes.length; + } + } + + public static int encodedFieldTableLength(FieldTable table) + { + if (table == null) + { + // size is encoded as 4 octets + return 4; + } + else + { + // size of the table plus 4 octets for the size + return (int) table.getEncodedSize() + 4; + } + } + + public static int encodedContentLength(Content table) + { + // TODO: New Content class required for AMQP 0-9. + return 0; + } + + public static void writeShortStringBytes(ByteBuffer buffer, String s) + { + if (s != null) + { + byte[] encodedString = new byte[s.length()]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + writeBytes(buffer, encodedString); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, String s) + { + assert s == null || s.length() <= 0xFFFE; + if (s != null) + { + int len = s.length(); + writeUnsignedInteger(buffer, s.length()); + byte[] encodedString = new byte[len]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + { + assert s == null || s.length <= 0xFFFE; + if (s != null) + { + int len = s.length; + writeUnsignedInteger(buffer, s.length); + byte[] encodedString = new byte[len]; + for (int i = 0; i < s.length; i++) + { + encodedString[i] = (byte) s[i]; + } + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + { + assert bytes == null || bytes.length <= 0xFFFE; + if (bytes != null) + { + writeUnsignedInteger(buffer, bytes.length); + buffer.put(bytes); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeUnsignedByte(ByteBuffer buffer, short b) + { + byte bv = (byte) b; + buffer.put(bv); + } + + public static void writeUnsignedShort(ByteBuffer buffer, int s) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short) s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte) (0xFF & sv)); + } + } + + + public static long unsignedIntegerLength() + { + return 4; + } + + public static void writeUnsignedInteger(ByteBuffer buffer, long l) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int) l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. + buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8))); + buffer.put((byte) (0xFF & iv)); + } + } + + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + { + if (table != null) + { + table.writeToBuffer(buffer); + } + else + { + EncodingUtils.writeUnsignedInteger(buffer, 0); + } + } + + public static void writeContentBytes(ByteBuffer buffer, Content content) + { + // TODO: New Content class required for AMQP 0-9. + } + + public static void writeBooleans(ByteBuffer buffer, boolean[] values) + { + byte packedValue = 0; + for (int i = 0; i < values.length; i++) + { + if (values[i]) + { + packedValue = (byte) (packedValue | (1 << i)); + } + } + + buffer.put(packedValue); + } + + /** + * This is used for writing longstrs. + * + * @param buffer + * @param data + */ + public static void writeLongstr(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + writeUnsignedInteger(buffer, data.length); + buffer.put(data); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeTimestamp(ByteBuffer buffer, long timestamp) + { + writeUnsignedInteger(buffer, 0/*timestamp msb*/); + writeUnsignedInteger(buffer, timestamp); + } + + public static boolean[] readBooleans(ByteBuffer buffer) + { + byte packedValue = buffer.get(); + boolean[] result = new boolean[8]; + + for (int i = 0; i < 8; i++) + { + result[i] = ((packedValue & (1 << i)) != 0); + } + return result; + } + + public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + return FieldTableFactory.newFieldTable(buffer, length); + } + } + + public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException + { + // TODO: New Content class required for AMQP 0-9. + return null; + } + + public static String readShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[length]; + buffer.get(stringBytes, 0, length); + char[] stringChars = new char[length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + + return new String(stringChars); + } + } + + public static String readLongString(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return ""; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[(int) length]; + buffer.get(stringBytes, 0, (int) length); + char[] stringChars = new char[(int) length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + return new String(stringChars); + } + } + + public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + byte[] result = new byte[(int) length]; + buffer.get(result); + return result; + } + } + + public static long readTimestamp(ByteBuffer buffer) + { + // Discard msb from AMQ timestamp + buffer.getUnsignedInt(); + return buffer.getUnsignedInt(); + } + + + static byte[] hexToByteArray(String id) + { + // Should check param for null, long enough for this check, upper-case and trailing char + String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x + + int len = s.length(); + int byte_len = len / 2; + byte[] b = new byte[byte_len]; + + for (int i = 0; i < byte_len; i++) + { + // fixme: refine these repetitive subscript calcs. + int ch = i * 2; + + byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16); + byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16); + + b[i] = (byte) (b1 * 16 + b2); + } + + return (b); + } + + public static char[] convertToHexCharArray(byte[] from) + { + int length = from.length; + char[] result_buff = new char[length * 2 + 2]; + + result_buff[0] = '0'; + result_buff[1] = 'x'; + + int bite; + int dest = 2; + + for (int i = 0; i < length; i++) + { + bite = from[i]; + + if (bite < 0) + { + bite += 256; + } + + result_buff[dest++] = hex_chars[bite >> 4]; + result_buff[dest++] = hex_chars[bite & 0x0f]; + } + + return (result_buff); + } + + public static String convertToHexString(byte[] from) + { + return (new String(convertToHexCharArray(from))); + } + + public static String convertToHexString(ByteBuffer bb) + { + int size = bb.limit(); + + byte[] from = new byte[size]; + + // Is this not the same. + //bb.get(from, 0, size); + for (int i = 0; i < size; i++) + { + from[i] = bb.get(i); + } + + return (new String(convertToHexCharArray(from))); + } + + private static char hex_chars[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + + //**** new methods + + // AMQP_BOOLEAN_PROPERTY_PREFIX + + public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean) + { + buffer.put((byte) (aBoolean ? 1 : 0)); + } + + public static Boolean readBoolean(ByteBuffer buffer) + { + byte packedValue = buffer.get(); + return (packedValue == 1); + } + + public static int encodedBooleanLength() + { + return 1; + } + + // AMQP_BYTE_PROPERTY_PREFIX + public static void writeByte(ByteBuffer buffer, Byte aByte) + { + buffer.put(aByte); + } + + public static Byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + public static int encodedByteLength() + { + return 1; + } + + + // AMQP_SHORT_PROPERTY_PREFIX + public static void writeShort(ByteBuffer buffer, Short aShort) + { + buffer.putShort(aShort); + } + + public static Short readShort(ByteBuffer buffer) + { + return buffer.getShort(); + } + + public static int encodedShortLength() + { + return 2; + } + + // INTEGER_PROPERTY_PREFIX + public static void writeInteger(ByteBuffer buffer, Integer aInteger) + { + buffer.putInt(aInteger); + } + + public static Integer readInteger(ByteBuffer buffer) + { + return buffer.getInt(); + } + + public static int encodedIntegerLength() + { + return 4; + } + + // AMQP_LONG_PROPERTY_PREFIX + public static void writeLong(ByteBuffer buffer, Long aLong) + { + buffer.putLong(aLong); + } + + public static Long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + public static int encodedLongLength() + { + return 8; + } + + // Float_PROPERTY_PREFIX + public static void writeFloat(ByteBuffer buffer, Float aFloat) + { + buffer.putFloat(aFloat); + } + + public static Float readFloat(ByteBuffer buffer) + { + return buffer.getFloat(); + } + + public static int encodedFloatLength() + { + return 4; + } + + + // Double_PROPERTY_PREFIX + public static void writeDouble(ByteBuffer buffer, Double aDouble) + { + buffer.putDouble(aDouble); + } + + public static Double readDouble(ByteBuffer buffer) + { + return buffer.getDouble(); + } + + public static int encodedDoubleLength() + { + return 8; + } + + + public static byte[] readBytes(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + byte[] dataBytes = new byte[length]; + buffer.get(dataBytes, 0, length); + + return dataBytes; + } + } + + public static void writeBytes(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short) data.length); + buffer.put(data); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + //CHAR_PROPERTY + public static int encodedCharLength() + { + return encodedByteLength(); + } + + public static char readChar(ByteBuffer buffer) + { + //This is valid as we know that the Character is ASCII 0..127 + return (char) buffer.get(); + } + + public static void writeChar(ByteBuffer buffer, char character) + { + //This is valid as we know that the Character is ASCII 0..127 + writeByte(buffer, (byte) character); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java new file mode 100644 index 0000000000..193c7adf1c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Map; +import java.util.Enumeration; + +public interface FieldTable extends Map +{ + void writeToBuffer(ByteBuffer buffer); + + void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException; + + byte[] getDataAsBytes(); + + public long getEncodedSize(); + + Object put(Object key, Object value); + + Object remove(Object key); + + + public Enumeration getPropertyNames(); + + public boolean propertyExists(String propertyName); + + //Getters + + public Boolean getBoolean(String string); + + public Byte getByte(String string); + + public Short getShort(String string); + + public Integer getInteger(String string); + + public Long getLong(String string); + + public Float getFloat(String string); + + public Double getDouble(String string); + + public String getString(String string); + + public Character getCharacter(String string); + + public byte[] getBytes(String string); + + public Object getObject(String string); + + // Setters + public Object setBoolean(String string, boolean b); + + public Object setByte(String string, byte b); + + public Object setShort(String string, short i); + + public Object setInteger(String string, int i); + + public Object setLong(String string, long l); + + public Object setFloat(String string, float v); + + public Object setDouble(String string, double v); + + public Object setString(String string, String string1); + + public Object setChar(String string, char c); + + public Object setBytes(String string, byte[] bytes); + + public Object setBytes(String string, byte[] bytes, int start, int length); + + public Object setObject(String string, Object object); + + public boolean isNullStringValue(String name); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java new file mode 100644 index 0000000000..b1fcd8a20b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class FieldTableFactory +{ + public static FieldTable newFieldTable() + { + return new PropertyFieldTable(); + } + + public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException + { + return new PropertyFieldTable(byteBuffer, length); + } + + public static FieldTable newFieldTable(String text) + { + return new PropertyFieldTable(text); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java new file mode 100644 index 0000000000..7a160ef471 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBody extends AMQBody +{ + public static final byte TYPE = 8; + public static AMQFrame FRAME = new HeartbeatBody().toFrame(); + + protected byte getFrameType() + { + return TYPE; + } + + protected int getSize() + { + return 0;//heartbeats we generate have no payload + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public AMQFrame toFrame() + { + return new AMQFrame(0, this); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java new file mode 100644 index 0000000000..97bd3d9253 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBodyFactory implements BodyFactory +{ + public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + { + return new HeartbeatBody(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java new file mode 100644 index 0000000000..142a689a01 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; + +import javax.jms.MessageFormatException; +import javax.jms.JMSException; +import java.util.Enumeration; + + +public class JMSPropertyFieldTable +{ + private FieldTable _fieldtable; + + public JMSPropertyFieldTable() + { + _fieldtable = new PropertyFieldTable(); + } + + public JMSPropertyFieldTable(FieldTable table) + { + _fieldtable = table; + } + + public JMSPropertyFieldTable(ByteBuffer buffer, long length) throws JMSException + { + try + { + _fieldtable = new PropertyFieldTable(buffer, length); + } + catch (AMQFrameDecodingException e) + { + JMSException error = new JMSException(e.getMessage()); + error.setLinkedException(e); + throw error; + } + } + + private void checkPropertyName(String propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if ("".equals(propertyName)) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(String propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// – Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, “Null Values.” +// – The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// – Identifiers are case sensitive. +// – Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } + + // MapMessage Interface + public boolean getBoolean(String string) throws JMSException + { + Boolean b = _fieldtable.getBoolean(string); + + if (b == null) + { + if (_fieldtable.containsKey(string)) + { + Object str = _fieldtable.getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(String string) throws JMSException + { + Character c = _fieldtable.getCharacter(string); + + if (c == null) + { + if (_fieldtable.isNullStringValue(string)) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(String string) throws JMSException + { + byte[] bs = _fieldtable.getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(String string) throws JMSException + { + Byte b = _fieldtable.getByte(string); + if (b == null) + { + if (_fieldtable.containsKey(string)) + { + Object str = _fieldtable.getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf((String) str); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(String string) throws JMSException + { + Short s = _fieldtable.getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(String string) throws JMSException + { + Integer i = _fieldtable.getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(String string) throws JMSException + { + Long l = _fieldtable.getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(String string) throws JMSException + { + Float f = _fieldtable.getFloat(string); + + if (f == null) + { + if (_fieldtable.containsKey(string)) + { + Object str = _fieldtable.getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf((String) str); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(String string) throws JMSException + { + Double d = _fieldtable.getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public String getString(String string) throws JMSException + { + String s = _fieldtable.getString(string); + + if (s == null) + { + if (_fieldtable.containsKey(string)) + { + Object o = _fieldtable.getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = String.valueOf(o); + } + } + } + } + + return s; + } + + public Object getObject(String string) throws JMSException + { + return _fieldtable.getObject(string); + } + + public void setBoolean(String string, boolean b) throws JMSException + { + checkPropertyName(string); + _fieldtable.setBoolean(string, b); + } + + public void setChar(String string, char c) throws JMSException + { + checkPropertyName(string); + _fieldtable.setChar(string, c); + } + + public Object setBytes(String string, byte[] bytes) + { + return _fieldtable.setBytes(string, bytes, 0, bytes.length); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + return _fieldtable.setBytes(string, bytes, start, length); + } + + public void setByte(String string, byte b) throws JMSException + { + checkPropertyName(string); + _fieldtable.setByte(string, b); + } + + public void setShort(String string, short i) throws JMSException + { + checkPropertyName(string); + _fieldtable.setShort(string, i); + } + + public void setInteger(String string, int i) throws JMSException + { + checkPropertyName(string); + _fieldtable.setInteger(string, i); + } + + public void setLong(String string, long l) throws JMSException + { + checkPropertyName(string); + _fieldtable.setLong(string, l); + } + + public void setFloat(String string, float v) throws JMSException + { + checkPropertyName(string); + _fieldtable.setFloat(string, v); + } + + public void setDouble(String string, double v) throws JMSException + { + checkPropertyName(string); + _fieldtable.setDouble(string, v); + } + + public void setString(String string, String string1) throws JMSException + { + checkPropertyName(string); + _fieldtable.setString(string, string1); + } + + public void setObject(String string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + _fieldtable.setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + } + } + + public boolean itemExists(String string) throws JMSException + { + return _fieldtable.containsKey(string); + } + + public void setFieldTable(FieldTable headers) + { + _fieldtable = headers; + } + + public Enumeration getPropertyNames() + { + return _fieldtable.getPropertyNames(); + } + + public void clear() + { + _fieldtable.clear(); + } + + public boolean propertyExists(String propertyName) + { + return _fieldtable.propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + return _fieldtable.put(key, value); + } + + public Object remove(String propertyName) + { + return _fieldtable.remove(propertyName); + } + + public boolean isEmpty() + { + return _fieldtable.isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + _fieldtable.writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java new file mode 100644 index 0000000000..1292ff2f6e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java @@ -0,0 +1,1280 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; + +import java.util.Collection; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.Vector; +import java.util.HashMap; + +//extends FieldTable +public class PropertyFieldTable implements FieldTable +{ + private static final Logger _logger = Logger.getLogger(PropertyFieldTable.class); + + private static final String BOOLEAN = "boolean"; + private static final String BYTE = "byte"; + private static final String BYTES = "bytes"; + private static final String SHORT = "short"; + private static final String INT = "int"; + private static final String LONG = "long"; + private static final String FLOAT = "float"; + private static final String DOUBLE = "double"; + private static final String STRING = "string"; + private static final String NULL_STRING = "nullstring"; + private static final String CHAR = "char"; + private static final String UNKNOWN = "unknown type"; + + private static final String PROPERTY_FIELD_TABLE_CLOSE_XML = "</PropertyFieldTable>"; + private static final String PROPERTY_FIELD_TABLE_OPEN_XML = "<PropertyFieldTable>"; + private static final String BYTES_CLOSE_XML = "</" + BYTES + ">"; + private static final String BYTES_OPEN_XML_START = "<" + BYTES; + + public static enum Prefix + { + //AMQP FieldTable Wire Types + AMQP_DECIMAL_PROPERTY_PREFIX('D'), + AMQP_UNSIGNED_SHORT_PROPERTY_PREFIX('S'), + AMQP_UNSIGNED_INT_PROPERTY_PREFIX('I'), + AMQP_UNSIGNED_LONG_PROPERTY_PREFIX('L'), + AMQP_DOUBLE_EXTTENDED_PROPERTY_PREFIX('D'), + + AMQP_TIMESTAMP_PROPERTY_PREFIX('T'), + AMQP_BINARY_PROPERTY_PREFIX('x'), + + //Strings + AMQP_ASCII_STRING_PROPERTY_PREFIX('c'), + AMQP_WIDE_STRING_PROPERTY_PREFIX('C'), + AMQP_NULL_STRING_PROPERTY_PREFIX('n'), + + //Java Primative Types + AMQP_BOOLEAN_PROPERTY_PREFIX('t'), + AMQP_BYTE_PROPERTY_PREFIX('b'), + AMQP_ASCII_CHARACTER_PROPERTY_PREFIX('k'), + AMQP_SHORT_PROPERTY_PREFIX('s'), + AMQP_INT_PROPERTY_PREFIX('i'), + AMQP_LONG_PROPERTY_PREFIX('l'), + AMQP_FLOAT_PROPERTY_PREFIX('f'), + AMQP_DOUBLE_PROPERTY_PREFIX('d'); + + private final char _identifier; + + Prefix(char identifier) + { + _identifier = identifier; + //_reverseTypeMap.put(identifier, this); + } + + public final char identifier() + { + return _identifier; + } + + } + + public static Map<Character, Prefix> _reverseTypeMap = new HashMap<Character, Prefix>(); + + static + { + for (Prefix p : Prefix.values()) + { + _reverseTypeMap.put(p.identifier(), p); + } + } + + private LinkedHashMap<String, Object> _properties; + private LinkedHashMap<String, Prefix> _propertyNamesTypeMap; + private long _encodedSize = 0; + + public PropertyFieldTable() + { + super(); + _properties = new LinkedHashMap<String, Object>(); + _propertyNamesTypeMap = new LinkedHashMap<String, Prefix>(); + } + + public PropertyFieldTable(String textFormat) + { + this(); + try + { + parsePropertyFieldTable(textFormat); + } + catch (Exception e) + { + _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat, e); + throw new IllegalArgumentException("Unable to decode PropertyFieldTable format:" + textFormat); + } + } + + /** + * Construct a new field table. + * + * @param buffer the buffer from which to read data. The length byte must be read already + * @param length the length of the field table. Must be > 0. + * @throws AMQFrameDecodingException if there is an error decoding the table + */ + public PropertyFieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException + { + this(); + setFromBuffer(buffer, length); + } + + // ************ Getters + private Object get(String propertyName, Prefix prefix) + { + //Retrieve the type associated with this name + Prefix type = _propertyNamesTypeMap.get(propertyName); + + if (type == null) + { + return null; + } + + if (type.equals(prefix)) + { + return _properties.get(propertyName); + } + else + { + return null; + } + } + + public Boolean getBoolean(String string) + { + Object o = get(string, Prefix.AMQP_BOOLEAN_PROPERTY_PREFIX); + if (o != null && o instanceof Boolean) + { + return (Boolean) o; + } + else + { + return null; + } + } + + public Byte getByte(String string) + { + Object o = get(string, Prefix.AMQP_BYTE_PROPERTY_PREFIX); + if (o != null) + { + return (Byte) o; + } + else + { + return null; + } + } + + public Short getShort(String string) + { + Object o = get(string, Prefix.AMQP_SHORT_PROPERTY_PREFIX); + if (o != null) + { + return (Short) o; + } + else + { + return null; + } + } + + public Integer getInteger(String string) + { + Object o = get(string, Prefix.AMQP_INT_PROPERTY_PREFIX); + if (o != null) + { + return (Integer) o; + } + else + { + return null; + } + } + + public Long getLong(String string) + { + Object o = get(string, Prefix.AMQP_LONG_PROPERTY_PREFIX); + if (o != null) + { + return (Long) o; + } + else + { + return null; + } + } + + public Float getFloat(String string) + { + Object o = get(string, Prefix.AMQP_FLOAT_PROPERTY_PREFIX); + if (o != null) + { + return (Float) o; + } + else + { + return null; + } + } + + public Double getDouble(String string) + { + Object o = get(string, Prefix.AMQP_DOUBLE_PROPERTY_PREFIX); + if (o != null) + { + return (Double) o; + } + else + { + return null; + } + } + + public String getString(String string) + { + Object o = get(string, Prefix.AMQP_ASCII_STRING_PROPERTY_PREFIX); + if (o != null) + { + return (String) o; + } + else + { + o = get(string, Prefix.AMQP_WIDE_STRING_PROPERTY_PREFIX); + if (o != null) + { + return (String) o; + } + else + { + + Prefix type = _propertyNamesTypeMap.get(string); + + if (type == null || type.equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX)) + { + return null; + } + else + { + switch (type) + { + case AMQP_ASCII_STRING_PROPERTY_PREFIX: + case AMQP_WIDE_STRING_PROPERTY_PREFIX: + case AMQP_BINARY_PROPERTY_PREFIX: + return null; + default: + case AMQP_BYTE_PROPERTY_PREFIX: + case AMQP_BOOLEAN_PROPERTY_PREFIX: + case AMQP_SHORT_PROPERTY_PREFIX: + case AMQP_INT_PROPERTY_PREFIX: + case AMQP_LONG_PROPERTY_PREFIX: + case AMQP_FLOAT_PROPERTY_PREFIX: + case AMQP_DOUBLE_PROPERTY_PREFIX: + return String.valueOf(_properties.get(string)); + case AMQP_ASCII_CHARACTER_PROPERTY_PREFIX: + Object value = _properties.get(string); + if (value == null) + { + throw new NullPointerException("null char cannot be converted to String"); + } + else + { + return String.valueOf(value); + } + } + } + } + } + } + + public Character getCharacter(String string) + { + Object o = get(string, Prefix.AMQP_ASCII_CHARACTER_PROPERTY_PREFIX); + if (o != null) + { + return (Character) o; + } + else + { + return null; + } + } + + public byte[] getBytes(String string) + { + Object o = get(string, Prefix.AMQP_BINARY_PROPERTY_PREFIX); + if (o != null) + { + return (byte[]) o; + } + else + { + return null; + } + } + + public Object getObject(String string) + { + return _properties.get(string); + } + + // ************ Setters + + public Object setBoolean(String string, boolean b) + { + return put(Prefix.AMQP_BOOLEAN_PROPERTY_PREFIX, string, b); + } + + public Object setByte(String string, byte b) + { + return put(Prefix.AMQP_BYTE_PROPERTY_PREFIX, string, b); + } + + public Object setShort(String string, short i) + { + return put(Prefix.AMQP_SHORT_PROPERTY_PREFIX, string, i); + } + + public Object setInteger(String string, int i) + { + return put(Prefix.AMQP_INT_PROPERTY_PREFIX, string, i); + } + + public Object setLong(String string, long l) + { + return put(Prefix.AMQP_LONG_PROPERTY_PREFIX, string, l); + } + + public Object setFloat(String string, float v) + { + return put(Prefix.AMQP_FLOAT_PROPERTY_PREFIX, string, v); + } + + public Object setDouble(String string, double v) + { + return put(Prefix.AMQP_DOUBLE_PROPERTY_PREFIX, string, v); + } + + public Object setString(String string, String string1) + { + if (string1 == null) + { + return put(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX, string, null); + } + else + { + //FIXME: determine string encoding and set either WIDE or ASCII string +// if () + { + return put(Prefix.AMQP_WIDE_STRING_PROPERTY_PREFIX, string, string1); + } +// else +// { +// return put(Prefix.AMQP_ASCII_STRING_PROPERTY_PREFIX, string, string1); +// } + } + } + + public Object setChar(String string, char c) + { + return put(Prefix.AMQP_ASCII_CHARACTER_PROPERTY_PREFIX, string, c); + } + + public Object setBytes(String string, byte[] bytes) + { + return setBytes(string, bytes, 0, bytes.length); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + return put(Prefix.AMQP_BINARY_PROPERTY_PREFIX, string, sizeByteArray(bytes, start, length)); + } + + private byte[] sizeByteArray(byte[] bytes, int start, int length) + { + byte[] resized = new byte[length]; + int newIndex = 0; + for (int oldIndex = start; oldIndex < length; oldIndex++) + { + resized[newIndex] = bytes[oldIndex]; + newIndex++; + } + + return resized; + } + + + public Object setObject(String string, Object object) + { + if (object instanceof Boolean) + { + return setBoolean(string, (Boolean) object); + } + else if (object instanceof Byte) + { + return setByte(string, (Byte) object); + } + else if (object instanceof Short) + { + return setShort(string, (Short) object); + } + else if (object instanceof Integer) + { + return setInteger(string, (Integer) object); + } + else if (object instanceof Long) + { + return setLong(string, (Long) object); + } + else if (object instanceof Float) + { + return setFloat(string, (Float) object); + } + else if (object instanceof Double) + { + return setDouble(string, (Double) object); + } + else if (object instanceof String) + { + return setString(string, (String) object); + } + else if (object instanceof Character) + { + return setChar(string, (Character) object); + } + else if (object instanceof byte[]) + { + return setBytes(string, (byte[]) object); + } + + throw new AMQPInvalidClassException("Only Primatives objects allowed Object is:" + object.getClass()); + } + + + public boolean isNullStringValue(String name) + { + return _properties.containsKey(name) && (_properties.get(name) == null) && + _propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX); + + + } + + // ***** Methods + + public Enumeration getPropertyNames() + { + Vector<String> names = new Vector<String>(); + + Iterator keys = _properties.keySet().iterator(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + names.add(key); + } + + return names.elements(); + } + + public boolean propertyExists(String propertyName) + { + return itemExists(propertyName); + } + + public boolean itemExists(String string) + { + return _properties.containsKey(string); + } + + public String toString() + { + return valueOf(this); + } + + public static String valueOf(PropertyFieldTable table) + { + StringBuffer buf = new StringBuffer(PROPERTY_FIELD_TABLE_OPEN_XML); + + final Iterator it = table._properties.entrySet().iterator(); + + while (it.hasNext()) + { + final Map.Entry entry = (Map.Entry) it.next(); + final String propertyName = (String) entry.getKey(); + + buf.append('\n'); + buf.append(valueAsXML(table._propertyNamesTypeMap.get(propertyName), propertyName, entry.getValue())); + } + buf.append("\n"); + buf.append(PROPERTY_FIELD_TABLE_CLOSE_XML); + + return buf.toString(); + } + + private static String valueAsXML(Prefix type, String propertyName, Object value) + { + StringBuffer buf = new StringBuffer(); + // Start Tag + buf.append(propertyXML(type, propertyName, true)); + + // Value + if (type.equals(Prefix.AMQP_BINARY_PROPERTY_PREFIX)) + { + //remove '>' + buf.deleteCharAt(buf.length() - 1); + + byte[] bytes = (byte[]) value; + buf.append(" length='").append(bytes.length).append("'>"); + + buf.append(byteArrayToXML(propertyName, bytes)); + } + else + { + if (!type.equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX)) + { + buf.append(String.valueOf(value)); + } + } + //End Tag + buf.append(propertyXML(type, propertyName, false)); + + return buf.toString(); + } + + private void checkPropertyName(String propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if ("".equals(propertyName)) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + + protected static void checkIdentiferFormat(String propertyName) + { +// AMQP Spec: 4.2.5.5 Field Tables +// Guidelines for implementers: +// * Field names MUST start with a letter, '$' or '#' and may continue with +// letters, '$' or '#', digits, or underlines, to a maximum length of 128 +// characters. +// * The server SHOULD validate field names and upon receiving an invalid +// field name, it SHOULD signal a connection exception with reply code +// 503 (syntax error). Conformance test: amq_wlp_table_01. +// * A peer MUST handle duplicate fields by using only the first instance. + + // AMQP length limit + if (propertyName.length() > 128) + { + throw new IllegalArgumentException("AMQP limits property names to 128 characters"); + } + + // AMQ start character + if (!(Character.isLetter(propertyName.charAt(0)) + || propertyName.charAt(0) == '$' + || propertyName.charAt(0) == '#')) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character"); + } + } + + private static String propertyXML(Prefix type, String propertyName, boolean start) + { + StringBuffer buf = new StringBuffer(); + + if (start) + { + buf.append("<"); + } + else + { + buf.append("</"); + } + + switch (type) + { + case AMQP_BOOLEAN_PROPERTY_PREFIX: + buf.append(BOOLEAN); + break; + case AMQP_BYTE_PROPERTY_PREFIX: + buf.append(BYTE); + break; + case AMQP_BINARY_PROPERTY_PREFIX: + buf.append(BYTES); + break; + case AMQP_SHORT_PROPERTY_PREFIX: + buf.append(SHORT); + break; + case AMQP_INT_PROPERTY_PREFIX: + buf.append(INT); + break; + case AMQP_LONG_PROPERTY_PREFIX: + buf.append(LONG); + break; + case AMQP_FLOAT_PROPERTY_PREFIX: + buf.append(FLOAT); + break; + case AMQP_DOUBLE_PROPERTY_PREFIX: + buf.append(DOUBLE); + break; + case AMQP_NULL_STRING_PROPERTY_PREFIX: + buf.append(NULL_STRING); + break; + case AMQP_ASCII_STRING_PROPERTY_PREFIX: + case AMQP_WIDE_STRING_PROPERTY_PREFIX: + buf.append(STRING); + break; + case AMQP_ASCII_CHARACTER_PROPERTY_PREFIX: + buf.append(CHAR); + break; + default: + buf.append(UNKNOWN + " (identifier ").append(type.identifier()).append(")"); + break; + } + + if (start) + { + buf.append(" name='").append(propertyName).append("'"); + } + + buf.append(">"); + + return buf.toString(); + } + + private static String byteArrayToXML(String propertyName, byte[] bytes) + { + StringBuffer buf = new StringBuffer(); + + for (int index = 0; index < bytes.length; index++) + { + buf.append("\n"); + buf.append(propertyXML(Prefix.AMQP_BYTE_PROPERTY_PREFIX, propertyName + "[" + index + "]", true)); + buf.append(bytes[index]); + buf.append(propertyXML(Prefix.AMQP_BYTE_PROPERTY_PREFIX, propertyName + "[" + index + "]", false)); + } + buf.append("\n"); + return buf.toString(); + } + + private void processBytesXMLLine(String xmlline) + { + String propertyName = xmlline.substring(xmlline.indexOf('\'') + 1, + xmlline.indexOf('\'', xmlline.indexOf('\'') + 1)); + String value = xmlline.substring(xmlline.indexOf(">") + 1, + xmlline.indexOf("</")); + + Integer index = Integer.parseInt(propertyName.substring(propertyName.lastIndexOf("[") + 1, + propertyName.lastIndexOf("]"))); + propertyName = propertyName.substring(0, propertyName.lastIndexOf("[")); + + getBytes(propertyName)[index] = Byte.parseByte(value); + } + + private void parsePropertyFieldTable(String textFormat) + { + StringTokenizer tokenizer = new StringTokenizer(textFormat, "\n"); + + boolean finished = false; + boolean processing = false; + + boolean processing_bytes = false; + + if (!tokenizer.hasMoreTokens()) + { + throw new IllegalArgumentException("XML has no tokens to parse."); + } + + while (tokenizer.hasMoreTokens()) + { + String token = tokenizer.nextToken(); + + if (token.equals(PROPERTY_FIELD_TABLE_CLOSE_XML)) + { + processing = false; + finished = true; + } + if (token.equals(BYTES_CLOSE_XML)) + { + processing = false; + } + + if (token.equals(BYTES_CLOSE_XML)) + { + processing_bytes = false; + } + + if (processing) + { + processXMLLine(token); + } + else if (processing_bytes) + { + processBytesXMLLine(token); + } + + if (token.startsWith(BYTES_OPEN_XML_START)) + { + processing_bytes = true; + processing = false; + } + + if (token.equals(PROPERTY_FIELD_TABLE_OPEN_XML) || + token.equals(BYTES_CLOSE_XML)) + { + processing = true; + } + } + + if (!finished) + { + throw new IllegalArgumentException("XML was not in a valid format."); + } + + } + + private void processXMLLine(String xmlline) + { + // <<type> name='<property>'><value></<type>> + // <string name='message' >Message 99</string > + + String type = xmlline.substring(1, xmlline.indexOf(" ")); + + String propertyName = xmlline.substring(xmlline.indexOf('\'') + 1, + xmlline.indexOf('\'', xmlline.indexOf('\'') + 1)); + + String value = ""; + + if (!type.equals(BYTES)) + { + value = xmlline.substring(xmlline.indexOf(">") + 1, + xmlline.indexOf("</")); + } + + if (type.equals(BOOLEAN)) + { + setBoolean(propertyName, Boolean.parseBoolean(value)); + } + if (type.equals(BYTE)) + { + setByte(propertyName, Byte.parseByte(value)); + } + if (type.equals(BYTES)) + { + int headerEnd = xmlline.indexOf('>'); + String bytesHeader = xmlline.substring(0, headerEnd); + + //Extract length value + Integer length = Integer.parseInt(bytesHeader.substring( + bytesHeader.lastIndexOf("=") + 2 + , bytesHeader.lastIndexOf("'"))); + + + byte[] bytes = new byte[length]; + setBytes(propertyName, bytes); + + //Check if the line contains all the byte values + // This is needed as the XMLLine sent across the wire is the bytes value + + int byteStart = xmlline.indexOf('<', headerEnd); + + //Don't think this is required. + if (byteStart > 0) + { + while (!xmlline.startsWith(BYTES_CLOSE_XML, byteStart)) + { + //This should be the next byte line + int bytePrefixEnd = xmlline.indexOf('>', byteStart) + 1; + int byteEnd = xmlline.indexOf('>', bytePrefixEnd) + 1; + + String byteline = xmlline.substring(byteStart, byteEnd); + + processBytesXMLLine(byteline); + + byteStart = xmlline.indexOf('<', byteEnd); + } + } + + } + if (type.equals(SHORT)) + { + setShort(propertyName, Short.parseShort(value)); + } + if (type.equals(INT)) + { + setInteger(propertyName, Integer.parseInt(value)); + } + if (type.equals(LONG)) + { + setLong(propertyName, Long.parseLong(value)); + } + if (type.equals(FLOAT)) + { + setFloat(propertyName, Float.parseFloat(value)); + } + if (type.equals(DOUBLE)) + { + setDouble(propertyName, Double.parseDouble(value)); + } + if (type.equals(STRING) || type.equals(NULL_STRING)) + { + if (type.equals(NULL_STRING)) + { + value = null; + } + setString(propertyName, value); + } + if (type.equals(CHAR)) + { + setChar(propertyName, value.charAt(0)); + } + if (type.equals(UNKNOWN)) + { + _logger.warn("Ignoring unknown property value:" + xmlline); + } + } + + // ************************* Byte Buffer Processing + + public void writeToBuffer(ByteBuffer buffer) + { + final boolean trace = _logger.isTraceEnabled(); + + if (trace) + { + _logger.trace("FieldTable::writeToBuffer: Writing encoded size of " + _encodedSize + "..."); + } + + EncodingUtils.writeUnsignedInteger(buffer, _encodedSize); + + putDataInBuffer(buffer); + } + + public byte[] getDataAsBytes() + { + final ByteBuffer buffer = ByteBuffer.allocate((int) _encodedSize); // FIXME XXX: Is cast a problem? + + putDataInBuffer(buffer); + + final byte[] result = new byte[(int) _encodedSize]; + buffer.flip(); + buffer.get(result); + buffer.release(); + return result; + } + + + public int size() + { + return _properties.size(); + } + + public boolean isEmpty() + { + return _properties.isEmpty(); + } + + public boolean containsKey(Object key) + { + return _properties.containsKey(key); + } + + public boolean containsValue(Object value) + { + return _properties.containsValue(value); + } + + public Object get(Object key) + { + return _properties.get(key); + } + + + public Object put(Object key, Object value) + { + return setObject(key.toString(), value); + } + + protected Object put(Prefix type, String propertyName, Object value) + { + checkPropertyName(propertyName); + + //remove the previous value + Object previous = remove(propertyName); + + + if (_logger.isTraceEnabled()) + { + int valueSize = 0; + if (value != null) + { + valueSize = getEncodingSize(type, value); + } + _logger.trace("Put:" + propertyName + + " encoding size Now:" + _encodedSize + + " name size= " + EncodingUtils.encodedShortStringLength(propertyName) + + " value size= " + valueSize); + } + + //Add the size of the propertyName plus one for the type identifier + _encodedSize += EncodingUtils.encodedShortStringLength(propertyName) + 1; + + if (value != null) + { + //Add the size of the content + _encodedSize += getEncodingSize(type, value); + } + + //Store new values + _propertyNamesTypeMap.put(propertyName, type); + _properties.put(propertyName, value); + + return previous; + } + + public Object remove(Object key) + { + if (_properties.containsKey(key)) + { + final Object value = _properties.remove(key); + Prefix type = _propertyNamesTypeMap.remove(key); + // plus one for the type + _encodedSize -= EncodingUtils.encodedShortStringLength(((String) key)) + 1; + + // This check is, for now, unnecessary (we don't store null values). + if (value != null) + { + _encodedSize -= getEncodingSize(type, value); + } + return value; + } + else + { + return null; + } + } + + public void putAll(Map t) + { + Iterator it = t.keySet().iterator(); + + while (it.hasNext()) + { + Object key = it.next(); + put(key, t.get(key)); + } + } + + public void clear() + { + _encodedSize = 0; + _properties.clear(); + _propertyNamesTypeMap.clear(); + } + + public Set keySet() + { + return _properties.keySet(); + } + + public Collection values() + { + return _properties.values(); + } + + public Set entrySet() + { + return _properties.entrySet(); + } + + public long getEncodedSize() + { + return _encodedSize; + } + + + private void putDataInBuffer(ByteBuffer buffer) + { + + final Iterator it = _properties.entrySet().iterator(); + + //If there are values then write out the encoded Size... could check _encodedSize != 0 + // write out the total length, which we have kept up to date as data is added + + + while (it.hasNext()) + { + Map.Entry me = (Map.Entry) it.next(); + String propertyName = (String) me.getKey(); + + //The type value + Prefix type = _propertyNamesTypeMap.get(propertyName); + + Object value = me.getValue(); + try + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Writing Property:" + propertyName + + " Type:" + type + + " Value:" + value); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } + + //Write the actual parameter name + EncodingUtils.writeShortStringBytes(buffer, propertyName); + + switch (type) + { + case AMQP_BOOLEAN_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_BOOLEAN_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeBoolean(buffer, (Boolean) value); + break; + case AMQP_BYTE_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_BYTE_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeByte(buffer, (Byte) value); + break; + case AMQP_SHORT_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_SHORT_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeShort(buffer, (Short) value); + break; + case AMQP_INT_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_INT_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeInteger(buffer, (Integer) value); + break; + case AMQP_UNSIGNED_INT_PROPERTY_PREFIX: // Currently we don't create these + buffer.put((byte) Prefix.AMQP_UNSIGNED_INT_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeUnsignedInteger(buffer, (Long) value); + break; + case AMQP_LONG_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_LONG_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeLong(buffer, (Long) value); + break; + case AMQP_FLOAT_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_FLOAT_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeFloat(buffer, (Float) value); + break; + case AMQP_DOUBLE_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_DOUBLE_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeDouble(buffer, (Double) value); + break; + case AMQP_NULL_STRING_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX.identifier()); + break; + case AMQP_WIDE_STRING_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_WIDE_STRING_PROPERTY_PREFIX.identifier()); + // FIXME: use proper charset encoder + EncodingUtils.writeLongStringBytes(buffer, (String) value); + break; + case AMQP_ASCII_STRING_PROPERTY_PREFIX: + //This is a simple ASCII string + buffer.put((byte) Prefix.AMQP_ASCII_STRING_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeLongStringBytes(buffer, (String) value); + break; + case AMQP_ASCII_CHARACTER_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_ASCII_CHARACTER_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeChar(buffer, (Character) value); + break; + case AMQP_BINARY_PROPERTY_PREFIX: + buffer.put((byte) Prefix.AMQP_BINARY_PROPERTY_PREFIX.identifier()); + EncodingUtils.writeBytes(buffer, (byte[]) value); + break; + default: + { + // Should never get here + throw new IllegalArgumentException("Key '" + propertyName + "': Unsupported type in field table, type: " + ((value == null) ? "null-object" : value.getClass())); + } + } + } + catch (Exception e) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Exception thrown:" + e); + _logger.trace("Writing Property:" + propertyName + + " Type:" + type + + " Value:" + value); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } + throw new RuntimeException(e); + } + } + } + + + public void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + { + final boolean trace = _logger.isTraceEnabled(); + + int sizeRead = 0; + while (sizeRead < length) + { + int sizeRemaining = buffer.remaining(); + final String key = EncodingUtils.readShortString(buffer); + + byte iType = buffer.get(); + + Character mapKey = new Character((char) iType); + Prefix type = _reverseTypeMap.get(mapKey); + + if (type == null) + { + String msg = "Field '" + key + "' - unsupported field table type: " + type + "."; + //some extra trace information... + msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; + throw new AMQFrameDecodingException(msg); + } + Object value; + + switch (type) + { + case AMQP_BOOLEAN_PROPERTY_PREFIX: + value = EncodingUtils.readBoolean(buffer); + break; + case AMQP_BYTE_PROPERTY_PREFIX: + value = EncodingUtils.readByte(buffer); + break; + case AMQP_SHORT_PROPERTY_PREFIX: + value = EncodingUtils.readShort(buffer); + break; + case AMQP_INT_PROPERTY_PREFIX: + value = EncodingUtils.readInteger(buffer); + break; + case AMQP_UNSIGNED_INT_PROPERTY_PREFIX:// This will only fit in a long + //Change this type for java lookups + type = Prefix.AMQP_LONG_PROPERTY_PREFIX; + case AMQP_LONG_PROPERTY_PREFIX: + value = EncodingUtils.readLong(buffer); + break; + case AMQP_FLOAT_PROPERTY_PREFIX: + value = EncodingUtils.readFloat(buffer); + break; + case AMQP_DOUBLE_PROPERTY_PREFIX: + value = EncodingUtils.readDouble(buffer); + break; + case AMQP_WIDE_STRING_PROPERTY_PREFIX: + // FIXME: use proper charset encoder + case AMQP_ASCII_STRING_PROPERTY_PREFIX: + value = EncodingUtils.readLongString(buffer); + break; + case AMQP_NULL_STRING_PROPERTY_PREFIX: + value = null; + break; + case AMQP_ASCII_CHARACTER_PROPERTY_PREFIX: + value = EncodingUtils.readChar((buffer)); + break; + case AMQP_BINARY_PROPERTY_PREFIX: + value = EncodingUtils.readBytes(buffer); + break; + default: + String msg = "Internal error, the following type identifier is not handled: " + type; + throw new AMQFrameDecodingException(msg); + } + + sizeRead += (sizeRemaining - buffer.remaining()); + + if (trace) + { + _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + type + "', key '" + key + "', value '" + value + "' (now read " + sizeRead + " of " + length + " encoded bytes)..."); + } + + put(type, key, value); + } + + if (trace) + { + _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done."); + } + } + + /** + * @param type the type to calucluate encoding for + * @param value the property value + * @return integer + */ + private static int getEncodingSize(Prefix type, Object value) + { + int encodingSize = 0; + + switch (type) + { + case AMQP_BOOLEAN_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedBooleanLength(); + break; + case AMQP_BYTE_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedByteLength(); + break; + case AMQP_SHORT_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedShortLength(); + break; + case AMQP_INT_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedIntegerLength(); + break; + case AMQP_LONG_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedLongLength(); + break; + case AMQP_FLOAT_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedFloatLength(); + break; + case AMQP_DOUBLE_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedDoubleLength(); + break; + case AMQP_WIDE_STRING_PROPERTY_PREFIX: + // FIXME: use proper charset encoder + case AMQP_ASCII_STRING_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedLongStringLength((String) value); + break; +// This is not required as this method is never called if the value is null +// case AMQP_NULL_STRING_PROPERTY_PREFIX: +// // There is no need for additional size beyond the prefix +// break; + case AMQP_ASCII_CHARACTER_PROPERTY_PREFIX: + encodingSize = EncodingUtils.encodedCharLength(); + break; + case AMQP_BINARY_PROPERTY_PREFIX: + encodingSize = 1 + ((byte[]) value).length; + break; + default: + throw new IllegalArgumentException("Unsupported type in field table: " + value.getClass()); + } + + // the extra byte for the type indicator is calculated in the name + return encodingSize; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java new file mode 100644 index 0000000000..f0d5489527 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.AMQException; + +public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock +{ + public char[] header = new char[]{'A','M','Q','P'}; + // TODO: generate these constants automatically from the xml protocol spec file + + private static byte CURRENT_PROTOCOL_CLASS = 1; + private static final int CURRENT_PROTOCOL_INSTANCE = 1; + + public byte protocolClass = CURRENT_PROTOCOL_CLASS; + public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE; + public byte protocolMajor; + public byte protocolMinor; + +// public ProtocolInitiation() {} + + public ProtocolInitiation(byte major, byte minor) + { + protocolMajor = major; + protocolMinor = minor; + } + + public long getSize() + { + return 4 + 1 + 1 + 1 + 1; + } + + public void writePayload(ByteBuffer buffer) + { + for (int i = 0; i < header.length; i++) + { + buffer.put((byte) header[i]); + } + buffer.put(protocolClass); + buffer.put(protocolInstance); + buffer.put(protocolMajor); + buffer.put(protocolMinor); + } + + public void populateFromBuffer(ByteBuffer buffer) throws AMQException + { + throw new AMQException("Method not implemented"); + } + + public boolean equals(Object o) + { + if (!(o instanceof ProtocolInitiation)) + { + return false; + } + + ProtocolInitiation pi = (ProtocolInitiation) o; + if (pi.header == null) + { + return false; + } + + if (header.length != pi.header.length) + { + return false; + } + + for (int i = 0; i < header.length; i++) + { + if (header[i] != pi.header[i]) + { + return false; + } + } + + return (protocolClass == pi.protocolClass && + protocolInstance == pi.protocolInstance && + protocolMajor == pi.protocolMajor && + protocolMinor == pi.protocolMinor); + } + + public static class Decoder //implements MessageDecoder + { + /** + * + * @param session + * @param in + * @return true if we have enough data to decode the PI frame fully, false if more + * data is required + */ + public boolean decodable(IoSession session, ByteBuffer in) + { + return (in.remaining() >= 8); + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + throws Exception + { + byte[] theHeader = new byte[4]; + in.get(theHeader); + ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0); + pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]}; + String stringHeader = new String(pi.header); + if (!"AMQP".equals(stringHeader)) + { + throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader); + } + pi.protocolClass = in.get(); + pi.protocolInstance = in.get(); + pi.protocolMajor = in.get(); + pi.protocolMinor = in.get(); + out.write(pi); + } + } + + public void checkVersion(ProtocolVersionList pvl) throws AMQException + { + if (protocolClass != CURRENT_PROTOCOL_CLASS) + { + throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + + protocolClass); + } + if (protocolInstance != CURRENT_PROTOCOL_INSTANCE) + { + throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " + + protocolInstance); + } + /* + if (protocolMajor != CURRENT_PROTOCOL_VERSION_MAJOR) + { + throw new AMQProtocolVersionException("Protocol major version " + CURRENT_PROTOCOL_VERSION_MAJOR + + " was expected; received " + protocolMajor); + } + if (protocolMinor != CURRENT_PROTOCOL_VERSION_MINOR) + { + throw new AMQProtocolVersionException("Protocol minor version " + CURRENT_PROTOCOL_VERSION_MINOR + + " was expected; received " + protocolMinor); + } + */ + + /* Look through list of available protocol versions */ + boolean found = false; + for (int i=0; i<pvl.pv.length; i++) + { + if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor && + pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor) + { + found = true; + } + } + if (!found) + { + // TODO: add list of available versions in list to msg... + throw new AMQProtocolVersionException("Protocol version " + + protocolMajor + "." + protocolMinor + " not found in protocol version list."); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java new file mode 100644 index 0000000000..7364b9293a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoFilter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IdleStatus; + +/** + * Represents an operation on IoFilter. + */ +enum EventType +{ + OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION +} + +class Event +{ + private static final Logger _log = Logger.getLogger(Event.class); + + private final EventType type; + private final IoFilter.NextFilter nextFilter; + private final Object data; + + public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) + { + this.type = type; + this.nextFilter = nextFilter; + this.data = data; + if (type == EventType.EXCEPTION) + { + _log.error("Exception event constructed: " + data, (Throwable) data); + } + } + + public Object getData() + { + return data; + } + + + public IoFilter.NextFilter getNextFilter() + { + return nextFilter; + } + + + public EventType getType() + { + return type; + } + + void process(IoSession session) + { + if (_log.isDebugEnabled()) + { + _log.debug("Processing " + this); + } + if (type == EventType.RECEIVED) + { + nextFilter.messageReceived(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.SENT) + { + nextFilter.messageSent(session, data); + //ByteBufferUtil.releaseIfPossible( data ); + } + else if (type == EventType.EXCEPTION) + { + nextFilter.exceptionCaught(session, (Throwable) data); + } + else if (type == EventType.IDLE) + { + nextFilter.sessionIdle(session, (IdleStatus) data); + } + else if (type == EventType.OPENED) + { + nextFilter.sessionOpened(session); + } + else if (type == EventType.WRITE) + { + nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + } + else if (type == EventType.CLOSED) + { + nextFilter.sessionClosed(session); + } + } + + public String toString() + { + return "Event: type " + type + ", data: " + data; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java new file mode 100644 index 0000000000..b9673cd48f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoSession; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Holds events for a session that will be processed asynchronously by + * the thread pool in PoolingFilter. + */ +class Job implements Runnable +{ + private final int _maxEvents; + private final IoSession _session; + private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final AtomicBoolean _active = new AtomicBoolean(); + private final AtomicInteger _refCount = new AtomicInteger(); + private final JobCompletionHandler _completionHandler; + + Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) + { + _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + } + + void acquire() + { + _refCount.incrementAndGet(); + } + + void release() + { + _refCount.decrementAndGet(); + } + + boolean isReferenced() + { + return _refCount.get() > 0; + } + + void add(Event evt) + { + _eventQueue.add(evt); + } + + void processAll() + { + //limit the number of events processed in one run + for (int i = 0; i < _maxEvents; i++) + { + Event e = _eventQueue.poll(); + if (e == null) + { + break; + } + else + { + e.process(_session); + } + } + } + + boolean isComplete() + { + return _eventQueue.peek() == null; + } + + boolean activate() + { + return _active.compareAndSet(false, true); + } + + void deactivate() + { + _active.set(false); + } + + public void run() + { + processAll(); + deactivate(); + _completionHandler.completed(_session, this); + } + + + static interface JobCompletionHandler + { + public void completed(IoSession session, Job job); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java new file mode 100644 index 0000000000..38cfa68c78 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoSession; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler +{ + private static final Logger _logger = Logger.getLogger(PoolingFilter.class); + public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED)); + public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE)); + + private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); + private final ReferenceCountingExecutorService _poolReference; + private final Set<EventType> _asyncTypes; + + private final String _name; + private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name) + { + _poolReference = refCountingPool; + _asyncTypes = asyncTypes; + _name = name; + } + + private void fireEvent(IoSession session, Event event) + { + if (_asyncTypes.contains(event.getType())) + { + Job job = getJobForSession(session); + job.acquire(); //prevents this job being removed from _jobs + job.add(event); + + //Additional checks on pool to check that it hasn't shutdown. + // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) + { + _poolReference.getPool().execute(job); + } + } + else + { + event.process(session); + } + } + + private Job getJobForSession(IoSession session) + { + Job job = _jobs.get(session); + return job == null ? createJobForSession(session) : job; + } + + private Job createJobForSession(IoSession session) + { + return addJobForSession(session, new Job(session, this, _maxEvents)); + } + + private Job addJobForSession(IoSession session, Job job) + { + //atomic so ensures all threads agree on the same job + Job existing = _jobs.putIfAbsent(session, job); + return existing == null ? job : existing; + } + + //Job.JobCompletionHandler + public void completed(IoSession session, Job job) + { + if (job.isComplete()) + { + job.release(); + if (!job.isReferenced()) + { + _jobs.remove(session); + } + } + else + { + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) + { + _poolReference.getPool().execute(job); + } + } + } + + //IoFilter methods that are processed by threads on the pool + + public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + } + + public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + } + + public void sessionIdle(NextFilter nextFilter, IoSession session, + IdleStatus status) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + } + + public void exceptionCaught(NextFilter nextFilter, IoSession session, + Throwable cause) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + } + + public void messageReceived(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + } + + public void messageSent(NextFilter nextFilter, IoSession session, + Object message) throws Exception + { + //ByteBufferUtil.acquireIfPossible( message ); + fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + } + + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + } + + //IoFilter methods that are processed on current thread (NOT on pooled thread) + + public void filterClose(NextFilter nextFilter, IoSession session) throws Exception + { + nextFilter.filterClose(session); + } + + public void sessionCreated(NextFilter nextFilter, IoSession session) + { + nextFilter.sessionCreated(session); + } + + public String toString() + { + return _name; + } + + // LifeCycle methods + + public void init() + { + _logger.info("Init called on PoolingFilter " + toString()); + // called when the filter is initialised in the chain. If the reference count is + // zero this acquire will initialise the pool + _poolReference.acquireExecutorService(); + } + + public void destroy() + { + _logger.info("Destroy called on PoolingFilter " + toString()); + // when the reference count gets to zero we release the executor service + _poolReference.releaseExecutorService(); + } +} + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java new file mode 100644 index 0000000000..d4dbf1309a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.filter.ReferenceCountingIoFilter; +import org.apache.mina.common.ThreadModel; + +public class ReadWriteThreadModel implements ThreadModel +{ + public void buildFilterChain(IoFilterChain chain) throws Exception + { + ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, + "AsynchronousReadFilter"); + PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, + "AsynchronousWriteFilter"); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java new file mode 100644 index 0000000000..637464f247 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * We share the executor service among several PoolingFilters. This class reference counts + * how many filter chains are using the executor service and destroys the service, thus + * freeing up its threads, when the count reaches zero. It recreates the service when + * the count is incremented. + * + * This is particularly important on the client where failing to destroy the executor + * service prevents the JVM from shutting down due to the existence of non-daemon threads. + * + */ +public class ReferenceCountingExecutorService +{ + private static final int MINIMUM_POOL_SIZE = 4; + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE); + + /** + * We need to be able to check the current reference count and if necessary + * create the executor service atomically. + */ + private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); + + private final Object _lock = new Object(); + + private ExecutorService _pool; + + private int _refCount = 0; + + private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + + public static ReferenceCountingExecutorService getInstance() + { + return _instance; + } + + private ReferenceCountingExecutorService() + { + } + + ExecutorService acquireExecutorService() + { + synchronized (_lock) + { + if (_refCount++ == 0) + { + _pool = Executors.newFixedThreadPool(_poolSize); + } + return _pool; + } + } + + void releaseExecutorService() + { + synchronized (_lock) + { + if (--_refCount == 0) + { + _pool.shutdownNow(); + } + } + } + + /** + * The filters that use the executor service should call this method to get access + * to the service. Note that this method does not alter the reference count. + * + * @return the underlying executor service + */ + public ExecutorService getPool() + { + return _pool; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java new file mode 100644 index 0000000000..fc83c0726d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import java.util.Map; +import java.util.HashMap; + +public final class AMQConstant +{ + private int _code; + + private String _name; + + private static Map _codeMap = new HashMap(); + + private AMQConstant(int code, String name, boolean map) + { + _code = code; + _name = name; + if (map) + { + _codeMap.put(new Integer(code), this); + } + } + + public String toString() + { + return _code + ": " + _name; + } + + public int getCode() + { + return _code; + } + + public String getName() + { + return _name; + } + + public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); + + public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true); + + public static final AMQConstant REPLY_SUCCESS = new AMQConstant(200, "reply success", true); + + public static final AMQConstant NOT_DELIVERED = new AMQConstant(310, "not delivered", true); + + public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true); + + public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true); + + public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true); + + public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); + + public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true); + + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); + + public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); + + public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); + + public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); + + public static final AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true); + + public static final AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true); + + public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); + + public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + + public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); + + public static final AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true); + + public static AMQConstant getConstant(int code) + { + AMQConstant c = (AMQConstant) _codeMap.get(new Integer(code)); + if (c == null) + { + c = new AMQConstant(code, "unknown code", false); + } + return c; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java new file mode 100644 index 0000000000..fee02c9d93 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusSSLContextFactory.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.security.GeneralSecurityException; +import java.security.KeyStore; + +/** + * Factory to create a bogus SSLContext. This means that it is easy to test SSL but this + * cannot be used in a production environment. + * <p/> + * This is based on the sample that comes with MINA, written by Trustin Lee + */ +public class BogusSSLContextFactory +{ + /** + * Protocol to use. + */ + private static final String PROTOCOL = "TLS"; + + /** + * Bougus Server certificate keystore file name. + */ + private static final String BOGUS_KEYSTORE = "qpid.cert"; + + // NOTE: The keystore was generated using keytool: + // keytool -genkey -alias qpid -keysize 512 -validity 3650 + // -keyalg RSA -dname "CN=amqp.org" -keypass qpidpw + // -storepass qpidpw -keystore qpid.cert + + private static final char[] BOGUS_KEYSTORE_PASSWORD = {'q', 'p', 'i', 'd', 'p', 'w'}; + + private static SSLContext serverInstance = null; + + private static SSLContext clientInstance = null; + + /** + * Get SSLContext singleton. + * + * @return SSLContext + * @throws java.security.GeneralSecurityException + */ + public static SSLContext getInstance(boolean server) + throws GeneralSecurityException + { + SSLContext retInstance; + if (server) + { + // FIXME: looks like double-checking locking + if (serverInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (serverInstance == null) + { + try + { + serverInstance = createBougusServerSSLContext(); + } + catch (Exception ioe) + { + throw new GeneralSecurityException( + "Can't create Server SSLContext:" + ioe); + } + } + } + } + retInstance = serverInstance; + } + else + { + // FIXME: looks like double-checking locking + if (clientInstance == null) + { + synchronized (BogusSSLContextFactory.class) + { + if (clientInstance == null) + { + clientInstance = createBougusClientSSLContext(); + } + } + } + retInstance = clientInstance; + } + return retInstance; + } + + private static SSLContext createBougusServerSSLContext() + throws GeneralSecurityException, IOException + { + // Create keystore + KeyStore ks = KeyStore.getInstance("JKS"); + InputStream in = null; + try + { + in = BogusSSLContextFactory.class.getResourceAsStream(BOGUS_KEYSTORE); + if (in == null) + { + throw new IOException("Unable to load keystore resource: " + BOGUS_KEYSTORE); + } + ks.load(in, BOGUS_KEYSTORE_PASSWORD); + } + finally + { + if (in != null) + { + //noinspection EmptyCatchBlock + try + { + in.close(); + } + catch (IOException ignored) + { + } + } + } + + // Set up key manager factory to use our key store + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, BOGUS_KEYSTORE_PASSWORD); + + // Initialize the SSLContext to work with our key managers. + SSLContext sslContext = SSLContext.getInstance(PROTOCOL); + sslContext.init(kmf.getKeyManagers(), BogusTrustManagerFactory.X509_MANAGERS, null); + + return sslContext; + } + + private static SSLContext createBougusClientSSLContext() + throws GeneralSecurityException + { + SSLContext context = SSLContext.getInstance(PROTOCOL); + context.init(null, BogusTrustManagerFactory.X509_MANAGERS, null); + return context; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java new file mode 100644 index 0000000000..4fb6f75b8d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ssl/BogusTrustManagerFactory.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactorySpi; +import javax.net.ssl.X509TrustManager; +import java.security.InvalidAlgorithmParameterException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +/** + * Bogus trust manager factory. Used to make testing SSL simpler - i.e no need to + * mess about with keystores. + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +class BogusTrustManagerFactory extends TrustManagerFactorySpi +{ + + static final X509TrustManager X509 = new X509TrustManager() + { + public void checkClientTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public void checkServerTrusted(X509Certificate[] x509Certificates, + String s) throws CertificateException + { + } + + public X509Certificate[] getAcceptedIssuers() + { + return new X509Certificate[ 0 ]; + } + }; + + static final TrustManager[] X509_MANAGERS = new TrustManager[]{X509}; + + public BogusTrustManagerFactory() + { + } + + protected TrustManager[] engineGetTrustManagers() + { + return X509_MANAGERS; + } + + protected void engineInit(KeyStore keystore) throws KeyStoreException + { + // noop + } + + protected void engineInit( + ManagerFactoryParameters managerFactoryParameters) + throws InvalidAlgorithmParameterException + { + // noop + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java new file mode 100644 index 0000000000..90b3589752 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLServerSocketFactory.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.ServerSocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.security.GeneralSecurityException; + +/** + * Simple Server Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes) + * <p/> + * This is based on the example that comes with MINA, written by Trustin Lee. + */ +public class SSLServerSocketFactory extends javax.net.ServerSocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ServerSocketFactory sslFactory = null; + + private static ServerSocketFactory factory = null; + + public SSLServerSocketFactory() + { + super(); + } + + public ServerSocket createServerSocket(int port) throws IOException + { + return new ServerSocket(port); + } + + public ServerSocket createServerSocket(int port, int backlog) + throws IOException + { + return new ServerSocket(port, backlog); + } + + public ServerSocket createServerSocket(int port, int backlog, + InetAddress ifAddress) + throws IOException + { + return new ServerSocket(port, backlog, ifAddress); + } + + public static javax.net.ServerSocketFactory getServerSocketFactory() + throws IOException + { + if (isSslEnabled()) + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(true) + .getServerSocketFactory(); + } + catch (GeneralSecurityException e) + { + IOException ioe = new IOException( + "could not create SSL socket"); + ioe.initCause(e); + throw ioe; + } + } + return sslFactory; + } + else + { + if (factory == null) + { + factory = new SSLServerSocketFactory(); + } + return factory; + } + + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java new file mode 100644 index 0000000000..ef9820f067 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLSocketFactory.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ssl; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.GeneralSecurityException; + +/** + * Simple Socket factory to create sockets with or without SSL enabled. + * If SSL enabled a "bogus" SSL Context is used (suitable for test purposes). + * <p/> + * This is based on an example that comes with MINA, written by Trustin Lee. + */ +public class SSLSocketFactory extends SocketFactory +{ + private static boolean sslEnabled = false; + + private static javax.net.ssl.SSLSocketFactory sslFactory = null; + + private static javax.net.SocketFactory factory = null; + + public SSLSocketFactory() + { + super(); + } + + public Socket createSocket(String arg1, int arg2) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(String arg1, int arg2, InetAddress arg3, + int arg4) throws IOException, + UnknownHostException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public Socket createSocket(InetAddress arg1, int arg2) + throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2); + } + else + { + return new Socket(arg1, arg2); + } + } + + public Socket createSocket(InetAddress arg1, int arg2, InetAddress arg3, + int arg4) throws IOException + { + if (isSslEnabled()) + { + return getSSLFactory().createSocket(arg1, arg2, arg3, arg4); + } + else + { + return new Socket(arg1, arg2, arg3, arg4); + } + } + + public static javax.net.SocketFactory getSocketFactory() + { + if (factory == null) + { + factory = new SSLSocketFactory(); + } + return factory; + } + + private javax.net.ssl.SSLSocketFactory getSSLFactory() + { + if (sslFactory == null) + { + try + { + sslFactory = BogusSSLContextFactory.getInstance(false) + .getSocketFactory(); + } + catch (GeneralSecurityException e) + { + throw new RuntimeException("could not create SSL socket", e); + } + } + return sslFactory; + } + + public static boolean isSslEnabled() + { + return sslEnabled; + } + + public static void setSslEnabled(boolean newSslEnabled) + { + sslEnabled = newSslEnabled; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java new file mode 100644 index 0000000000..b6a0bd500a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.url; + +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.exchange.ExchangeDefaults; + +import java.util.HashMap; +import java.net.URI; +import java.net.URISyntaxException; + +public class AMQBindingURL implements BindingURL +{ + String _url; + String _exchangeClass; + String _exchangeName; + String _destinationName; + String _queueName; + private HashMap<String, String> _options; + + + public AMQBindingURL(String url) throws URLSyntaxException + { + //format: + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + + _url = url; + _options = new HashMap<String, String>(); + + parseBindingURL(); + } + + private void parseBindingURL() throws URLSyntaxException + { + try + { + URI connection = new URI(_url); + + String exchangeClass = connection.getScheme(); + + if (exchangeClass == null) + { + _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + + ExchangeDefaults.DIRECT_EXCHANGE_NAME + "//" + _url; + //URLHelper.parseError(-1, "Exchange Class not specified.", _url); + parseBindingURL(); + return; + } + else + { + setExchangeClass(exchangeClass); + } + + String exchangeName = connection.getHost(); + + if (exchangeName == null) + { + URLHelper.parseError(-1, "Exchange Name not specified.", _url); + } + else + { + setExchangeName(exchangeName); + } + + if (connection.getPath() == null || + connection.getPath().equals("")) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination or Queue requried", _url); + } + else + { + int slash = connection.getPath().indexOf("/", 1); + if (slash == -1) + { + URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + "Destination requried", _url); + } + else + { + String path = connection.getPath(); + setDestinationName(path.substring(1, slash)); + + setQueueName(path.substring(slash + 1)); + + } + } + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + + } + } + + private void processOptions() + { + //this is where we would parse any options that needed more than just storage. + } + + public String getURL() + { + return _url; + } + + public String getExchangeClass() + { + return _exchangeClass; + } + + public void setExchangeClass(String exchangeClass) + { + _exchangeClass = exchangeClass; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public void setExchangeName(String name) + { + _exchangeName = name; + + if (name.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) + { + setOption(BindingURL.OPTION_EXCLUSIVE, "true"); + } + } + + public String getDestinationName() + { + return _destinationName; + } + + public void setDestinationName(String name) + { + _destinationName = name; + } + + public String getQueueName() + { + if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) + { + if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + return getDestinationName(); + } + } + else + { + return getDestinationName(); + } + } + else + { + return _queueName; + } + } + + public void setQueueName(String name) + { + _queueName = name; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public boolean containsOption(String key) + { + return _options.containsKey(key); + } + + public String getRoutingKey() + { + if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return getQueueName(); + } + + if (containsOption(BindingURL.OPTION_ROUTING_KEY)) + { + return getOption(OPTION_ROUTING_KEY); + } + + return getDestinationName(); + } + + public void setRoutingKey(String key) + { + setOption(OPTION_ROUTING_KEY, key); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); + sb.append('/'); + sb.append(_destinationName); + sb.append('/'); + sb.append(_queueName); + + sb.append(URLHelper.printOptions(_options)); + return sb.toString(); + } + + public static void main(String args[]) throws URLSyntaxException + { + String url = "exchangeClass://exchangeName/Destination/Queue?option='value',option2='value2'"; + + AMQBindingURL dest = new AMQBindingURL(url); + + System.out.println(url); + System.out.println(dest); + + } + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java new file mode 100644 index 0000000000..76690b3230 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.url; + +/* + Binding URL format: + <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* +*/ +public interface BindingURL +{ + public static final String OPTION_EXCLUSIVE = "exclusive"; + public static final String OPTION_AUTODELETE = "autodelete"; + public static final String OPTION_DURABLE = "durable"; + public static final String OPTION_CLIENTID = "clientid"; + public static final String OPTION_SUBSCRIPTION = "subscription"; + public static final String OPTION_ROUTING_KEY = "routingkey"; + + + String getURL(); + + String getExchangeClass(); + + void setExchangeClass(String exchangeClass); + + String getExchangeName(); + + void setExchangeName(String name); + + String getDestinationName(); + + void setDestinationName(String name); + + String getQueueName(); + + void setQueueName(String name); + + String getOption(String key); + + void setOption(String key, String value); + + boolean containsOption(String key); + + String getRoutingKey(); + + void setRoutingKey(String key); + + String toString(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java new file mode 100644 index 0000000000..2121346c02 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.url; + +import java.util.HashMap; + +public class URLHelper +{ + public static char DEFAULT_OPTION_SEPERATOR = '&'; + public static char ALTERNATIVE_OPTION_SEPARATOR = ','; + public static char BROKER_SEPARATOR = ';'; + + public static void parseOptions(HashMap<String, String> optionMap, String options) throws URLSyntaxException + { + //options looks like this + //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + + if (options == null || options.indexOf('=') == -1) + { + return; + } + + int optionIndex = options.indexOf('='); + + String option = options.substring(0, optionIndex); + + int length = options.length(); + + int nestedQuotes = 0; + + // to store index of final "'" + int valueIndex = optionIndex; + + //Walk remainder of url. + while (nestedQuotes > 0 || valueIndex < length) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options.charAt(valueIndex) == '\'') + { + if (valueIndex + 1 < options.length()) + { + if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR || + options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR || + options.charAt(valueIndex + 1) == BROKER_SEPARATOR || + options.charAt(valueIndex + 1) == '\'') + { + nestedQuotes--; +// System.out.println( +// options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + if (nestedQuotes == 0) + { + //We've found the value of an option + break; + } + } + else + { + nestedQuotes++; +// System.out.println( +// options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1)); + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options.charAt(valueIndex) == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + + if (nestedQuotes != 0 || valueIndex < (optionIndex + 2)) + { + int sepIndex = 0; + + //Try and identify illegal separator character + if (nestedQuotes > 1) + { + for (int i = 0; i < nestedQuotes; i++) + { + sepIndex = options.indexOf('\'', sepIndex); + sepIndex++; + } + } + + if (sepIndex >= options.length() || sepIndex == 0) + { + parseError(valueIndex, "Unterminated option", options); + } + else + { + parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + options.charAt(sepIndex) + "'", options); + } + } + + // optionIndex +2 to skip "='" + String value = options.substring(optionIndex + 2, valueIndex); + + optionMap.put(option, value); + + if (valueIndex < (options.length() - 1)) + { + //Recurse to get remaining options + parseOptions(optionMap, options.substring(valueIndex + 2)); + } + } + + + public static void parseError(int index, String error, String url) throws URLSyntaxException + { + parseError(index, 1, error, url); + } + + public static void parseError(int index, int length, String error, String url) throws URLSyntaxException + { + throw new URLSyntaxException(url, error, index, length); + } + + public static String printOptions(HashMap<String, String> options) + { + if (options.isEmpty()) + { + return ""; + } + else + { + StringBuffer sb = new StringBuffer(); + sb.append('?'); + for (String key : options.keySet()) + { + sb.append(key); + + sb.append("='"); + + sb.append(options.get(key)); + + sb.append("'"); + sb.append(DEFAULT_OPTION_SEPERATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return sb.toString(); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java b/qpid/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java new file mode 100644 index 0000000000..3ff7195794 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.url; + +import java.net.URISyntaxException; + +public class URLSyntaxException extends URISyntaxException +{ + private int _length; + + public URLSyntaxException(String url, String error, int index, int length) + { + super(url, error, index); + + _length = length; + } + + private static String getPositionString(int index, int length) + { + StringBuffer sb = new StringBuffer(index + 1); + + for (int i = 0; i < index; i++) + { + sb.append(" "); + } + + if (length > -1) + { + for (int i = 0; i < length; i++) + { + sb.append('^'); + } + } + + return sb.toString(); + } + + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(getReason()); + + if (getIndex() > -1) + { + if (_length != -1) + { + sb.append(" between indicies "); + sb.append(getIndex()); + sb.append(" and "); + sb.append(_length); + } + else + { + sb.append(" at index "); + sb.append(getIndex()); + } + } + + sb.append(" "); + if (getIndex() != -1) + { + sb.append("\n"); + } + + sb.append(getInput()); + + if (getIndex() != -1) + { + sb.append("\n"); + sb.append(getPositionString(getIndex(), _length)); + } + + return sb.toString(); + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java new file mode 100644 index 0000000000..c2d758611d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E> +{ + AtomicInteger _size = new AtomicInteger(0); + + public int size() + { + return _size.get(); + } + + public boolean offer(E o) + { + + if (super.offer(o)) + { + _size.incrementAndGet(); + return true; + } + + return false; + } + + public E poll() + { + E e = super.poll(); + + if (e != null) + { + _size.decrementAndGet(); + } + + return e; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java new file mode 100644 index 0000000000..1f168345a1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E> +{ + public int size() + { + if (isEmpty()) + { + return 0; + } + else + { + return 1; + } + } +} diff --git a/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert b/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert Binary files differnew file mode 100644 index 0000000000..e6702108e6 --- /dev/null +++ b/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert diff --git a/qpid/java/common/src/main/versions/ProtocolVersionList.java.tmpl b/qpid/java/common/src/main/versions/ProtocolVersionList.java.tmpl new file mode 100644 index 0000000000..f0e202dac9 --- /dev/null +++ b/qpid/java/common/src/main/versions/ProtocolVersionList.java.tmpl @@ -0,0 +1,40 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This class is autogenerated, do not modify. + */ + +package org.apache.qpid.framing; + +/** + * NOTE: Don't remove the line containing the token VER or VER1 - these are + * markers for code generation. + */ + +public interface ProtocolVersionList +{ + public final int PROTOCOL_MAJOR = 0; + public final int PROTOCOL_MINOR = 1; + public final byte pv[][] = { + // !VER1! + }; +} diff --git a/qpid/java/common/src/main/xsl/cluster.asl b/qpid/java/common/src/main/xsl/cluster.asl new file mode 100644 index 0000000000..40ca937904 --- /dev/null +++ b/qpid/java/common/src/main/xsl/cluster.asl @@ -0,0 +1,59 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> + +<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80"> + +<class name = "cluster" index = "101"> + +<doc> + An extension that allows brokers to communicate in order to + provide a clustered service to clients. +</doc> + +<method name = "join"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "membership"> + <field name = "members" type = "longstr" /> +</method> + +<method name = "synch"> +</method> + +<method name = "leave"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "suspect"> + <field name = "broker" type = "shortstr" /> +</method> + +<method name = "ping"> + <field name = "broker" type = "shortstr" /> + <field name = "load" type = "long" /> + <field name = "response required" type = "bit" /> +</method> + +</class> + +</amqp> diff --git a/qpid/java/common/src/main/xsl/framing.xsl b/qpid/java/common/src/main/xsl/framing.xsl new file mode 100644 index 0000000000..b8ae20aaf5 --- /dev/null +++ b/qpid/java/common/src/main/xsl/framing.xsl @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="prepare1.xsl"/> +<xsl:import href="prepare2.xsl"/> +<xsl:import href="prepare3.xsl"/> +<xsl:import href="java.xsl"/> + +<xsl:output indent="yes"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:variable name="prepare1"> + <xsl:apply-templates mode="prepare1" select="."/> + </xsl:variable> + + <xsl:variable name="prepare2"> + <xsl:apply-templates mode="prepare2" select="$prepare1"/> + </xsl:variable> + + <xsl:variable name="model"> + <xsl:apply-templates mode="prepare3" select="$prepare2"/> + </xsl:variable> + + <xsl:apply-templates mode="generate-multi" select="$model"/> + <xsl:apply-templates mode="list-registry" select="$model"/> + + <!-- dump out the intermediary files for debugging --> + <!-- + <xsl:result-document href="prepare1.out"> + <xsl:copy-of select="$prepare1"/> + </xsl:result-document> + + <xsl:result-document href="prepare2.out"> + <xsl:copy-of select="$prepare2"/> + </xsl:result-document> + + <xsl:result-document href="model.out"> + <xsl:copy-of select="$model"/> + </xsl:result-document> + --> +</xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/java.xsl b/qpid/java/common/src/main/xsl/java.xsl new file mode 100644 index 0000000000..948415fc18 --- /dev/null +++ b/qpid/java/common/src/main/xsl/java.xsl @@ -0,0 +1,248 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<!-- this class contains the templates for generating java source code for a given framing model --> +<xsl:import href="utils.xsl"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:param name="major"/> +<xsl:param name="minor"/> +<xsl:param name="registry_name"/> +<xsl:param name="version_list_name"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-multi" select="frames"/> + <xsl:apply-templates mode="generate-registry" select="frames"/> +</xsl:template> + +<!-- processes all frames outputting the classes in a single stream --> +<!-- (useful for debugging etc) --> +<xsl:template match="frame" mode="generate-single"> + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> +</xsl:template> + +<!-- generates seperate file for each class/frame --> +<xsl:template match="frame" mode="generate-multi"> + <xsl:variable name="uri" select="concat(@name, '.java')"/> + wrote <xsl:value-of select="$uri"/> + <xsl:result-document href="{$uri}" format="textFormat"> + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> + </xsl:result-document> +</xsl:template> + +<!-- main class generation template --> +<xsl:template name="generate-class"> + <xsl:param name="f"/> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.EncodableAMQDataBlock; + +/** + * This class is autogenerated, do not modify. [From <xsl:value-of select="$f/parent::frames/@protocol"/>] + */ +public class <xsl:value-of select="$f/@name"/> extends AMQMethodBody implements EncodableAMQDataBlock +{ + public static final int CLASS_ID = <xsl:value-of select="$f/@class-id"/>; + public static final int METHOD_ID = <xsl:value-of select="$f/@method-id"/>; + + <xsl:for-each select="$f/field"> + <xsl:text>public </xsl:text><xsl:value-of select="@java-type"/> + <xsl:text> </xsl:text> + <xsl:value-of select="@name"/>; + </xsl:for-each> + + protected int getClazz() + { + return <xsl:value-of select="$f/@class-id"/>; + } + + protected int getMethod() + { + return <xsl:value-of select="$f/@method-id"/>; + } + + protected int getBodySize() + { + <xsl:choose> + <xsl:when test="$f/field"> + return + <xsl:for-each select="$f/field"> + <xsl:if test="position() != 1">+ + </xsl:if> + <xsl:value-of select="amq:field-length(.)"/> + </xsl:for-each> + ; + </xsl:when> + <xsl:otherwise>return 0;</xsl:otherwise> + </xsl:choose> + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + <xsl:for-each select="$f/field"> + <xsl:if test="@type != 'bit'"> + <xsl:value-of select="amq:encoder(.)"/>; + </xsl:if> + <xsl:if test="@type = 'bit' and @boolean-index = 1"> + <xsl:text>EncodingUtils.writeBooleans(buffer, new boolean[]{</xsl:text> + <xsl:value-of select="$f/field[@type='bit']/@name" separator=", "/>}); + </xsl:if> + </xsl:for-each> + } + + public void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + <xsl:for-each select="$f/field"> + <xsl:value-of select="amq:decoder(.)"/>; + </xsl:for-each> + } + + public String toString() + { + StringBuffer buf = new StringBuffer(super.toString()); + <xsl:for-each select="$f/field"> + <xsl:text>buf.append(" </xsl:text><xsl:value-of select="@name"/>: ").append(<xsl:value-of select="@name"/>); + </xsl:for-each> + return buf.toString(); + } + + public static AMQFrame createAMQFrame(int channelId<xsl:if test="$f/field">, </xsl:if><xsl:value-of select="$f/field/concat(@java-type, ' ', @name)" separator=", "/>) + { + <xsl:value-of select="@name"/> body = new <xsl:value-of select="@name"/>(); + <xsl:for-each select="$f/field"> + <xsl:value-of select="concat('body.', @name, ' = ', @name)"/>; + </xsl:for-each> + AMQFrame frame = new AMQFrame(); + frame.channel = channelId; + frame.bodyFrame = body; + return frame; + } +} +</xsl:template> + +<xsl:template match="/" mode="generate-registry"> + <xsl:text>Matching root for registry mode!</xsl:text> + <xsl:value-of select="."/> + <xsl:apply-templates select="frames" mode="generate-registry"/> +</xsl:template> + +<xsl:template match="registries" mode="generate-registry"> +Wrote MethodBodyDecoderRegistry.java + <xsl:result-document href="MethodBodyDecoderRegistry.java" format="textFormat"> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import java.util.Map; +import java.util.HashMap; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is autogenerated, do not modify. + */ +public final class MethodBodyDecoderRegistry +{ + private static final Logger _log = Logger.getLogger(MethodBodyDecoderRegistry.class); + + private static final Map _classMethodProductToMethodBodyMap = new HashMap(); + + static + { + <xsl:for-each select="registry"> + <xsl:value-of select="concat(@name, '.register(_classMethodProductToMethodBodyMap)')"/>; + </xsl:for-each> + } + + public static AMQMethodBody get(int clazz, int method) throws AMQFrameDecodingException + { + Class bodyClass = (Class) _classMethodProductToMethodBodyMap.get(new Integer(clazz * 1000 + method)); + if (bodyClass != null) + { + try + { + return (AMQMethodBody) bodyClass.newInstance(); + } + catch (Exception e) + { + throw new AMQFrameDecodingException(_log, + "Unable to instantiate body class for class " + clazz + " and method " + method + ": " + e, e); + } + } + else + { + throw new AMQFrameDecodingException(_log, + "Unable to find a suitable decoder for class " + clazz + " and method " + method); + } + } +} +</xsl:result-document> +</xsl:template> + +<xsl:template match="frames" mode="list-registry"> + <xsl:if test="$registry_name"> + + <xsl:variable name="file" select="concat($registry_name, '.java')"/> + wrote <xsl:value-of select="$file"/> + <xsl:result-document href="{$file}" format="textFormat"> + <xsl:value-of select="amq:copyright()"/> +<!-- package org.apache.qpid.framing_<xsl:value-of select="$major"/>_<xsl:value-of select="$minor"/>; --> +package org.apache.qpid.framing; + +import java.util.Map; + +/** + * This class is autogenerated, do not modify. [From <xsl:value-of select="@protocol"/>] + */ +class <xsl:value-of select="$registry_name"/> +{ + static void register(Map map) + { + <xsl:for-each select="frame"> + <xsl:text>map.put(new Integer(</xsl:text> + <xsl:value-of select="@class-id"/> + <xsl:text> * 1000 + </xsl:text> + <xsl:value-of select="@method-id"/> + <xsl:text>), </xsl:text> + <xsl:value-of select="concat(@name, '.class')"/>); + </xsl:for-each> + } +} + </xsl:result-document> + + </xsl:if> +</xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/prepare1.xsl b/qpid/java/common/src/main/xsl/prepare1.xsl new file mode 100644 index 0000000000..03e1fa7634 --- /dev/null +++ b/qpid/java/common/src/main/xsl/prepare1.xsl @@ -0,0 +1,114 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> +<xsl:param name="asl_base"/> + +<!-- pre-process, phase 1 --> + +<xsl:template match="/"> + <xsl:apply-templates select="protocol" mode="prepare1"/> +</xsl:template> + +<xsl:template match="amqp" mode="prepare1"> + <frames> + <xsl:attribute name="protocol"> + <xsl:value-of select="@comment"/> + <xsl:text> (</xsl:text> + <xsl:text>major=</xsl:text><xsl:value-of select="@major"/> + <xsl:text>, minor=</xsl:text><xsl:value-of select="@minor"/> + <xsl:text>)</xsl:text> + </xsl:attribute> + <xsl:attribute name="major"> + <xsl:value-of select="@major"/> + </xsl:attribute> + <xsl:attribute name="minor"> + <xsl:value-of select="@minor"/> + </xsl:attribute> + <xsl:apply-templates mode="prepare1" select="inherit"/> + <xsl:apply-templates mode="prepare1" select="include"/> + <xsl:apply-templates mode="prepare1" select="domain"/> + <xsl:apply-templates mode="prepare1" select="class"/> + </frames> +</xsl:template> + +<xsl:template match="include" mode="prepare1"> + <xsl:if test="@filename != 'asl_constants.asl'"> + <!-- skip asl_constants.asl, we don't need it and it is not well formed so causes error warnings --> + <xsl:apply-templates select="document(@filename)" mode="prepare1"/> + </xsl:if> +</xsl:template> + +<xsl:template match="inherit" mode="prepare1"> + <xsl:variable name="ibase" select="concat('file:///', $asl_base, '/', @name, '.asl')"/> + <xsl:choose> + <xsl:when test="document($ibase)"> + <xsl:apply-templates select="document($ibase)" mode="prepare1"/> + </xsl:when> + <xsl:otherwise> + <xsl:message> + Could not inherit from <xsl:value-of select="$ibase"/>; file not found. + </xsl:message> + </xsl:otherwise> + </xsl:choose> +</xsl:template> + +<xsl:template match="class[@index]" mode="prepare1"> + <xsl:apply-templates select="method" mode="prepare1"/> +</xsl:template> + +<xsl:template match="method" mode="prepare1"> + <xsl:if test="parent::class[@index]"><!-- there is a template class that has no index, which we want to skip --> + <frame> + <xsl:attribute name="name"><xsl:value-of select="amq:class-name(parent::class/@name, @name)"/></xsl:attribute> + <xsl:attribute name="class-id"><xsl:value-of select="parent::class/@index"/></xsl:attribute> + <xsl:if test="@index"> + <xsl:attribute name="method-id"><xsl:value-of select="@index"/></xsl:attribute> + </xsl:if> + <xsl:if test="not(@index)"> + <xsl:attribute name="method-id"><xsl:number count="method"/></xsl:attribute> + </xsl:if> + + <xsl:apply-templates select="field" mode="prepare1"/> + </frame> + </xsl:if> +</xsl:template> + +<xsl:template match="domain" mode="prepare1"> + <domain> + <name><xsl:value-of select="@name"/></name> + <type><xsl:value-of select="@type"/></type> + </domain> +</xsl:template> + +<xsl:template match="field" mode="prepare1"> + <field> + <xsl:copy-of select="@name"/> + <xsl:copy-of select="@type"/> + <xsl:copy-of select="@domain"/> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/prepare2.xsl b/qpid/java/common/src/main/xsl/prepare2.xsl new file mode 100644 index 0000000000..14f4f33841 --- /dev/null +++ b/qpid/java/common/src/main/xsl/prepare2.xsl @@ -0,0 +1,69 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- pre-process, phase 2 --> + +<xsl:key name="domain-lookup" match="domain" use="name"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare2" select="frames"/> +</xsl:template> + +<xsl:template match="field[@domain]" mode="prepare2"> + <field> + <xsl:variable name="t1" select="key('domain-lookup', @domain)/type"/> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="$t1"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="field[@type]" mode="prepare2"> + <field> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="frames" mode="prepare2"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:copy-of select="@major"/> + <xsl:copy-of select="@minor"/> + <xsl:apply-templates mode="prepare2"/> + </frames> +</xsl:template> + +<xsl:template match="frame" mode="prepare2"> + <xsl:element name="{name()}"> + <xsl:copy-of select="@*"/> + <xsl:apply-templates mode="prepare2" select="field"/> + </xsl:element> +</xsl:template> + +<xsl:template match="domain" mode="prepare2"></xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/prepare3.xsl b/qpid/java/common/src/main/xsl/prepare3.xsl new file mode 100644 index 0000000000..f8cf0c8932 --- /dev/null +++ b/qpid/java/common/src/main/xsl/prepare3.xsl @@ -0,0 +1,65 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- final preparation of the model --> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare3"/> +</xsl:template> + +<xsl:template match="frames" mode="prepare3"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:copy-of select="@major"/> + <xsl:copy-of select="@minor"/> + <xsl:apply-templates mode="prepare3"/> + </frames> +</xsl:template> + +<xsl:template match="frame" mode="prepare3"> + <xsl:element name="frame"> + <xsl:copy-of select="@*"/> + <xsl:if test="field[@type='bit']"><xsl:attribute name="has-bit-field">true</xsl:attribute></xsl:if> + <xsl:apply-templates mode="prepare3"/> + </xsl:element> +</xsl:template> + + +<xsl:template match="field" mode="prepare3"> + <field> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + <!-- ensure the field name is processed to be a valid java name --> + <xsl:attribute name="name"><xsl:value-of select="amq:field-name(@name)"/></xsl:attribute> + <!-- add some attributes to make code generation easier --> + <xsl:attribute name="java-type"><xsl:value-of select="amq:java-type(@type)"/></xsl:attribute> + <xsl:if test="@type='bit'"> + <xsl:attribute name="boolean-index"><xsl:number count="field[@type='bit']"/></xsl:attribute> + </xsl:if> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/readme.txt b/qpid/java/common/src/main/xsl/readme.txt new file mode 100644 index 0000000000..b373055df9 --- /dev/null +++ b/qpid/java/common/src/main/xsl/readme.txt @@ -0,0 +1,52 @@ +This directory contains the xsl stylesheets used to generate the code from the +AMQP protocol specification. They require an XSLT2.0 processor, currently +Saxon 8 is used. + +The generation process is controlled by the framing.xsl stylesheet. This performs +several phases of transformation, using the other stylesheets. The transformation +in each phase is defined in a separate file, and these are designed to also allow +then to be run individually. + +The generation takes the amq.asl as input, it also requires that the path to the +directory where the base asl definitions reside (those definitions that the main +amq.asl defintion inherits from) be passed in via a paramter called asl_base. + +The files involved are as follows: + + framing.xsl The control file for the entire generation process + + prepare1.xsl Resolves the separate files that make up the protocol + definition, building a single tree containing all the + information as a set of 'frame' elements, each of which + has attributes for its name, and ids for the class and + method it refers to and contains zero or more field + elements. + + A method id is generated based on the order of the + method elements within the class elements in the original + specification. The class id is taken from the enclosing + class element. + + prepare2.xsl Resolves domains into their corresponding types. (This is + much easier when all the information is in a single tree, + hence the separate frame). + + prepare3.xsl Converts names into valid java names and augments the + tree to include information that makes the subsequent + generation phase simpler e.g. the index of boolean + fields as several boolean flags are combined into a + single byte. (This is easier once the domains have been + resolved, hence the separate phase). + + java.xsl Generates java classes for each frame, and a registry of + all the frames to a 'magic' number generated from their + class and method id. + + utils.xsl Contains some utility methods for e.g. producing valid + java names. + +For debugging the framing.xsl can output the intermediary files. This can be +enabled by uncommenting the relevant lines (a comment explaining this is +provided inline). + +
\ No newline at end of file diff --git a/qpid/java/common/src/main/xsl/registry.template b/qpid/java/common/src/main/xsl/registry.template new file mode 100644 index 0000000000..87c5afcb7b --- /dev/null +++ b/qpid/java/common/src/main/xsl/registry.template @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<registries> + <registry name="MainRegistry"/> + <registry name="ClusterRegistry"/> +</registries> diff --git a/qpid/java/common/src/main/xsl/registry.xsl b/qpid/java/common/src/main/xsl/registry.xsl new file mode 100644 index 0000000000..c70dbe21a5 --- /dev/null +++ b/qpid/java/common/src/main/xsl/registry.xsl @@ -0,0 +1,32 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<xsl:import href="java.xsl"/> + +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-registry" select="registries"/> +</xsl:template> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/main/xsl/utils.xsl b/qpid/java/common/src/main/xsl/utils.xsl new file mode 100644 index 0000000000..95e15c6e38 --- /dev/null +++ b/qpid/java/common/src/main/xsl/utils.xsl @@ -0,0 +1,207 @@ +<?xml version='1.0'?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amq="http://amq.org"> + +<!-- This file contains functions that are used in the generation of the java classes for framing --> + +<!-- create copyright notice for generated files --> +<xsl:function name="amq:copyright">/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +</xsl:function> + +<!-- retrieve the java type of a given amq type --> +<xsl:function name="amq:java-type"> + <xsl:param name="t"/> + <xsl:choose> + <xsl:when test="$t='char'">char</xsl:when> + <xsl:when test="$t='octet'">short</xsl:when> + <xsl:when test="$t='short'">int</xsl:when> + <xsl:when test="$t='shortstr'">String</xsl:when> + <xsl:when test="$t='longstr'">byte[]</xsl:when> + <xsl:when test="$t='bit'">boolean</xsl:when> + <xsl:when test="$t='long'">long</xsl:when> + <xsl:when test="$t='longlong'">long</xsl:when> + <xsl:when test="$t='table'">FieldTable</xsl:when> + <xsl:otherwise>Object /*WARNING: undefined type*/</xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to get the field size of a given amq type --> +<xsl:function name="amq:field-length"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit' and $f/@boolean-index=1"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='bit' and $f/@boolean-index > 1"> + <xsl:value-of select="concat('0 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('2 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('4 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('8 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('EncodingUtils.encodedShortStringLength(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('4 + (', $f/@name, ' == null ? 0 : ', $f/@name, '.length)')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('EncodingUtils.encodedFieldTableLength(', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE FIELD SIZE */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to encode a field of a given amq type --> +<!-- Note: + This method will not provide an encoder for a bit field. + Bit fields should be encoded together separately. --> + +<xsl:function name="amq:encoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat('EncodingUtils.writeChar(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedByte(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedShort(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('EncodingUtils.writeUnsignedInteger(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('buffer.putLong(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('EncodingUtils.writeShortStringBytes(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('EncodingUtils.writeLongstr(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('EncodingUtils.writeFieldTableBytes(buffer, ', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE ENCODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to decode a field of a given amq type --> +<xsl:function name="amq:decoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit'"> + <xsl:if test="$f/@boolean-index = 1"> + <xsl:text>boolean[] bools = EncodingUtils.readBooleans(buffer);</xsl:text> + </xsl:if> + <xsl:value-of select="concat($f/@name, ' = bools[', $f/@boolean-index - 1 , ']')"/> + </xsl:when> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getChar()')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsigned()')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsignedShort()')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getUnsignedInt()')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getLong()')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readShortString(buffer)')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readLongstr(buffer)')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat($f/@name, ' = EncodingUtils.readFieldTable(buffer)')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE DECODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- create the class name for a frame, based on class and method (passed in) --> +<xsl:function name="amq:class-name"> + <xsl:param name="class"/> + <xsl:param name="method"/> + <xsl:value-of select="concat(amq:upper-first($class),amq:upper-first(amq:field-name($method)), 'Body')"/> +</xsl:function> + +<!-- get a valid field name, processing spaces and '-'s where appropriate --> +<xsl:function name="amq:field-name"> + <xsl:param name="name"/> + <xsl:choose> + <xsl:when test="contains($name, ' ')"> + <xsl:value-of select="concat(substring-before($name, ' '), amq:upper-first(substring-after($name, ' ')))"/> + </xsl:when> + <xsl:when test="contains($name, '-')"> + <xsl:value-of select="concat(substring-before($name, '-'), amq:upper-first(substring-after($name, '-')))"/> + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="$name"/> + </xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- convert the first character of the input to upper-case --> +<xsl:function name="amq:upper-first"> + <xsl:param name="in"/> + <xsl:value-of select="concat(upper-case(substring($in, 1, 1)), substring($in, 2))"/> +</xsl:function> + +</xsl:stylesheet> diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java new file mode 100644 index 0000000000..66dd1b10ef --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import java.util.HashMap; + +import junit.framework.TestCase; + + +public class BasicContentHeaderPropertiesTest extends TestCase +{ + + BasicContentHeaderProperties _testProperties; + PropertyFieldTable _testTable; + String _testString = "This is a test string"; + int _testint = 666; + + /** + * Currently only test setting/getting String, int and boolean props + */ + public BasicContentHeaderPropertiesTest() + { + _testProperties = new BasicContentHeaderProperties(); + } + + public void setUp() + { + HashMap _testMap = new HashMap(10); + _testMap.put("TestString", _testString); + _testMap.put("Testint", _testint); + _testTable = new PropertyFieldTable(); + _testTable.putAll(_testMap); + _testProperties = new BasicContentHeaderProperties(); + _testProperties.setHeaders(_testTable); + } + + public void testGetPropertyListSize() + { + //needs a better test but at least we're exercising the code ! + // FT size is encoded in an int + int expectedSize = EncodingUtils.encodedIntegerLength(); + + expectedSize += EncodingUtils.encodedShortStringLength("TestInt"); + // 1 is for the Encoding Letter. here an 'i' + expectedSize += 1 + EncodingUtils.encodedIntegerLength(); + + expectedSize += EncodingUtils.encodedShortStringLength("TestString"); + // 1 is for the Encoding Letter. here an 'S' + expectedSize += 1 + EncodingUtils.encodedLongStringLength(_testString); + + + int size = _testProperties.getPropertyListSize(); + + assertEquals(expectedSize, size); + } + + public void testGetSetPropertyFlags() + { + _testProperties.setPropertyFlags(99); + assertEquals(99, _testProperties.getPropertyFlags()); + } + + public void testWritePropertyListPayload() + { + ByteBuffer buf = ByteBuffer.allocate(300); + _testProperties.writePropertyListPayload(buf); + } + + public void testPopulatePropertiesFromBuffer() throws Exception + { + ByteBuffer buf = ByteBuffer.allocate(300); + _testProperties.populatePropertiesFromBuffer(buf, 99, 99); + } + + public void testSetGetContentType() + { + String contentType = "contentType"; + _testProperties.setContentType(contentType); + assertEquals(contentType, _testProperties.getContentType()); + } + + public void testSetGetEncoding() + { + String encoding = "encoding"; + _testProperties.setEncoding(encoding); + assertEquals(encoding, _testProperties.getEncoding()); + } + + public void testSetGetHeaders() + { + _testProperties.setHeaders(_testTable); + assertEquals(_testTable, _testProperties.getHeaders()); + } + + public void testSetGetDeliveryMode() + { + byte deliveryMode = 1; + _testProperties.setDeliveryMode(deliveryMode); + assertEquals(deliveryMode, _testProperties.getDeliveryMode()); + } + + public void testSetGetPriority() + { + byte priority = 1; + _testProperties.setPriority(priority); + assertEquals(priority, _testProperties.getPriority()); + } + + public void testSetGetCorrelationId() + { + String correlationId = "correlationId"; + _testProperties.setCorrelationId(correlationId); + assertEquals(correlationId, _testProperties.getCorrelationId()); + } + + public void testSetGetReplyTo() + { + String replyTo = "replyTo"; + _testProperties.setReplyTo(replyTo); + assertEquals(replyTo, _testProperties.getReplyTo()); + } + + public void testSetGetExpiration() + { + long expiration = 999999999; + _testProperties.setExpiration(expiration); + assertEquals(expiration, _testProperties.getExpiration()); + } + + public void testSetGetMessageId() + { + String messageId = "messageId"; + _testProperties.setMessageId(messageId); + assertEquals(messageId, _testProperties.getMessageId()); + } + + public void testSetGetTimestamp() + { + long timestamp = 999999999; + _testProperties.setTimestamp(timestamp); + assertEquals(timestamp, _testProperties.getTimestamp()); + } + + public void testSetGetType() + { + String type = "type"; + _testProperties.setType(type); + assertEquals(type, _testProperties.getType()); + } + + public void testSetGetUserId() + { + String userId = "userId"; + _testProperties.setUserId(userId); + assertEquals(userId, _testProperties.getUserId()); + } + + public void testSetGetAppId() + { + String appId = "appId"; + _testProperties.setAppId(appId); + assertEquals(appId, _testProperties.getAppId()); + } + + public void testSetGetClusterId() + { + String clusterId = "clusterId"; + _testProperties.setClusterId(clusterId); + assertEquals(clusterId, _testProperties.getClusterId()); + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java new file mode 100644 index 0000000000..9cad31766b --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java @@ -0,0 +1,1016 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import java.util.Enumeration; + +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +public class JMSPropertyFieldTableTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(JMSPropertyFieldTableTest.class); + + + public void setUp() + { + System.getProperties().setProperty("strict-jms", "true"); + } + + public void tearDown() + { + System.getProperties().remove("strict-jms"); + } + + /** + * Test that setting a similar named value replaces any previous value set on that name + */ + public void testReplacement() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + //Set a boolean value + table1.setBoolean("value", true); + + // reset value to an integer + table1.setInteger("value", Integer.MAX_VALUE); + + //Check boolean value is null + try + { + table1.getBoolean("value"); + } + catch (MessageFormatException mfe) + { + //normal execution + } + // ... and integer value is good + Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value")); + } + + public void testRemoval() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + //Set a boolean value + table1.setBoolean("value", true); + + Assert.assertTrue(table1.getBoolean("value")); + + table1.remove("value"); + + //Check boolean value is null + try + { + table1.getBoolean("value"); + } + catch (MessageFormatException mfe) + { + //normal execution + } + } + + + /** + * Set a boolean and check that we can only get it back as a boolean and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testBoolean() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setBoolean("value", true); + Assert.assertTrue(table1.propertyExists("value")); + + //Test Getting right value back + Assert.assertEquals(true, table1.getBoolean("value")); + + //Check we don't get anything back for other gets + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getShort("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getDouble("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getInteger("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getLong("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + //except value as a string + Assert.assertEquals("true", table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value will return false + Assert.assertFalse(table1.getBoolean("Rubbish")); + } + + /** + * Set a byte and check that we can only get it back as a byte and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testByte() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setByte("value", Byte.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getDouble("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getShort("value")); + Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getInteger("value")); + Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getLong("value")); + Assert.assertEquals(Byte.MAX_VALUE, table1.getByte("value")); + //... and a the string value of it. + Assert.assertEquals("" + Byte.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + try + { + table1.getByte("Rubbish"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException mfs) + { + //normal Execution + } + + } + + + /** + * Set a short and check that we can only get it back as a short and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testShort() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setShort("value", Short.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + try + { + table1.getDouble("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + + Assert.assertEquals(Short.MAX_VALUE, (short) table1.getLong("value")); + Assert.assertEquals(Short.MAX_VALUE, (short) table1.getInteger("value")); + Assert.assertEquals(Short.MAX_VALUE, table1.getShort("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Short.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + try + { + table1.getShort("Rubbish"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException mfe) + { + //normal path + } + } + + + /** + * Set a double and check that we can only get it back as a double + * Check that attempting to lookup a non existent value returns null + */ + public void testDouble() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setDouble("value", Double.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getShort("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getInteger("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getLong("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + Assert.assertEquals(Double.MAX_VALUE, table1.getDouble("value")); + //... and a the string value of it. + Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + try + { + table1.getDouble("Rubbish"); + fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString"); + } + catch (NullPointerException mfe) + { + //normal path + } + + } + + + /** + * Set a float and check that we can only get it back as a float + * Check that attempting to lookup a non existent value returns null + */ + public void testFloat() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setFloat("value", Float.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getShort("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getInteger("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getLong("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + + Assert.assertEquals(Float.MAX_VALUE, table1.getFloat("value")); + Assert.assertEquals(Float.MAX_VALUE, (float) table1.getDouble("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Float.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + try + { + table1.getFloat("Rubbish"); + fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString"); + } + catch (NullPointerException mfe) + { + //normal path + } + } + + + /** + * Set an int and check that we can only get it back as an int + * Check that attempting to lookup a non existent value returns null + */ + public void testInt() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setInteger("value", Integer.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getShort("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getDouble("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + + Assert.assertEquals(Integer.MAX_VALUE, table1.getLong("value")); + + Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Integer.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + try + { + table1.getInteger("Rubbish"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException mfe) + { + //normal path + } + } + + + /** + * Set a long and check that we can only get it back as a long + * Check that attempting to lookup a non existent value returns null + */ + public void testLong() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setLong("value", Long.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + try + { + table1.getBoolean("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getByte("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getShort("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getDouble("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + try + { + table1.getInteger("value"); + fail("Should throw MessageFormatException"); + } + catch (MessageFormatException mfs) + { + //normal Execution + } + + + Assert.assertEquals(Long.MAX_VALUE, table1.getLong("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Long.MAX_VALUE, table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value + try + { + table1.getLong("Rubbish"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException mfs) + { + //normal Execution + } + + } + + + /** + * Calls all methods that can be used to check the table is empty + * - getEncodedSize + * - isEmpty + * - size + * + * @param table to check is empty + */ + private void checkEmpty(JMSPropertyFieldTable table) + { + Assert.assertFalse(table.getPropertyNames().hasMoreElements()); + } + + + /** + * Set a String and check that we can only get it back as a String + * Check that attempting to lookup a non existent value returns null + */ + public void testString() throws JMSException + { + JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(); + table1.setString("value", "Hello"); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(false, table1.getBoolean("value")); + + try + { + table1.getByte("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + try + { + table1.getShort("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + try + { + table1.getDouble("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + try + { + table1.getFloat("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + try + { + table1.getInteger("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + try + { + table1.getLong("value"); + fail("Should throw NumberFormatException"); + } + catch (NumberFormatException nfs) + { + //normal Execution + } + + Assert.assertEquals("Hello", table1.getString("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.propertyExists("value")); + + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getString("Rubbish")); + + //Additional Test that haven't been covered for string + table1.setObject("value", "Hello"); + //Check that it was set correctly + Assert.assertEquals("Hello", table1.getString("value")); + } + + + public void testValues() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + table.setBoolean("bool", true); + table.setDouble("double", Double.MAX_VALUE); + table.setFloat("float", Float.MAX_VALUE); + table.setInteger("int", Integer.MAX_VALUE); + table.setLong("long", Long.MAX_VALUE); + table.setShort("short", Short.MAX_VALUE); + table.setString("string", "Hello"); + table.setString("nullstring", null); + + table.setObject("objectbool", true); + table.setObject("objectdouble", Double.MAX_VALUE); + table.setObject("objectfloat", Float.MAX_VALUE); + table.setObject("objectint", Integer.MAX_VALUE); + table.setObject("objectlong", Long.MAX_VALUE); + table.setObject("objectshort", Short.MAX_VALUE); + table.setObject("objectstring", "Hello"); + + + Assert.assertEquals(true, table.getBoolean("bool")); + + Assert.assertEquals(Double.MAX_VALUE, table.getDouble("double")); + Assert.assertEquals(Float.MAX_VALUE, table.getFloat("float")); + Assert.assertEquals(Integer.MAX_VALUE, table.getInteger("int")); + Assert.assertEquals(Long.MAX_VALUE, table.getLong("long")); + Assert.assertEquals(Short.MAX_VALUE, table.getShort("short")); + Assert.assertEquals("Hello", table.getString("string")); + Assert.assertEquals(null, table.getString("null-string")); + + Assert.assertEquals(true, table.getObject("objectbool")); + Assert.assertEquals(Double.MAX_VALUE, table.getObject("objectdouble")); + Assert.assertEquals(Float.MAX_VALUE, table.getObject("objectfloat")); + Assert.assertEquals(Integer.MAX_VALUE, table.getObject("objectint")); + Assert.assertEquals(Long.MAX_VALUE, table.getObject("objectlong")); + Assert.assertEquals(Short.MAX_VALUE, table.getObject("objectshort")); + Assert.assertEquals("Hello", table.getObject("objectstring")); + } + + /** + * Additional test checkPropertyName doesn't accept Null + */ + public void testCheckPropertyNameasNull() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + try + { + table.setObject(null, "String"); + fail("Null property name is not allowed"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + checkEmpty(table); + } + + + /** + * Additional test checkPropertyName doesn't accept an empty String + */ + public void testCheckPropertyNameasEmptyString() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + try + { + table.setObject("", "String"); + fail("empty property name is not allowed"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + checkEmpty(table); + } + + + /** + * Additional test checkPropertyName doesn't accept an empty String + */ + public void testCheckPropertyNamehasMaxLength() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + StringBuffer longPropertyName = new StringBuffer(129); + + for (int i = 0; i < 129; i++) + { + longPropertyName.append("x"); + } + + try + { + table.setObject(longPropertyName.toString(), "String"); + fail("property name must be < 128 characters"); + } + catch (IllegalArgumentException iae) + { + _logger.warn("JMS requires infinite property names AMQP limits us to 128 characters"); + } + + checkEmpty(table); + } + + + /** + * Additional test checkPropertyName starts with a letter + */ + public void testCheckPropertyNameStartCharacterIsLetter() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + //Try a name that starts with a number + try + { + table.setObject("1", "String"); + fail("property name must start with a letter"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + + checkEmpty(table); + } + + /** + * Additional test checkPropertyName starts with a letter + */ + public void testCheckPropertyNameContainsInvalidCharacter() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + //Try a name that starts with a number + try + { + table.setObject("hello there", "String"); + fail("property name cannot contain spaces"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + + checkEmpty(table); + } + + + /** + * Additional test checkPropertyName starts with a letter + */ + public void testCheckPropertyNameIsInvalid() throws JMSException + { + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + //Try a name that starts with a number + try + { + table.setObject("ESCAPE", "String"); + fail("property name must not contains spaces"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + + checkEmpty(table); + } + + /** + * Additional test checkPropertyName starts with a hash or a dollar + */ + public void testCheckPropertyNameStartCharacterIsHashorDollar() throws JMSException + { + _logger.warn("Test:testCheckPropertyNameStartCharacterIsHashorDollar will fail JMS compilance as # and $ are not valid in a jms identifier"); +// JMSPropertyFieldTable table = new JMSPropertyFieldTable(); +// +// //Try a name that starts with a number +// try +// { +// table.setObject("#", "String"); +// table.setObject("$", "String"); +// } +// catch (IllegalArgumentException iae) +// { +// fail("property name are allowed to start with # and $s in AMQP"); +// } + } + + /** + * Test the contents of the sets + */ + public void testSets() + { + + JMSPropertyFieldTable table = new JMSPropertyFieldTable(); + + table.put("n1", "1"); + table.put("n2", "2"); + table.put("n3", "3"); + + Enumeration enumerator = table.getPropertyNames(); + Assert.assertEquals("n1", enumerator.nextElement()); + Assert.assertEquals("n2", enumerator.nextElement()); + Assert.assertEquals("n3", enumerator.nextElement()); + Assert.assertFalse(enumerator.hasMoreElements()); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(JMSPropertyFieldTableTest.class); + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java new file mode 100644 index 0000000000..5256c62054 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -0,0 +1,1162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import java.util.Enumeration; +import java.util.Iterator; +import java.util.Map; +import java.util.HashMap; + +import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQPInvalidClassException; + +public class PropertyFieldTableTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(PropertyFieldTableTest.class); + + /** + * Test that modifying a byte[] after setting property doesn't change property + */ + public void testByteModification() + { + PropertyFieldTable table = new PropertyFieldTable(); + byte[] bytes = {99, 98, 97, 96, 95}; + table.setBytes("bytes", bytes); + bytes[0] = 1; + bytes[1] = 2; + bytes[2] = 3; + bytes[3] = 4; + bytes[4] = 5; + + assertBytesNotEqual(bytes, table.getBytes("bytes")); + } + + /** + * Test that setting a similar named value replaces any previous value set on that name + */ + public void testReplacement() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + //Set a boolean value + table1.setBoolean("value", true); + //Check size of table is correct (<Value length> + <type> + <Boolean length>) + int size = EncodingUtils.encodedShortStringLength("value") + 1 + EncodingUtils.encodedBooleanLength(); + Assert.assertEquals(size, table1.getEncodedSize()); + + // reset value to an integer + table1.setInteger("value", Integer.MAX_VALUE); + + // Check the size has changed accordingly (<Value length> + <type> + <Integer length>) + size = EncodingUtils.encodedShortStringLength("value") + 1 + EncodingUtils.encodedIntegerLength(); + Assert.assertEquals(size, table1.getEncodedSize()); + + //Check boolean value is null + Assert.assertEquals(null, table1.getBoolean("value")); + // ... and integer value is good + Assert.assertEquals((Integer) Integer.MAX_VALUE, table1.getInteger("value")); + } + + + /** + * Set a boolean and check that we can only get it back as a boolean and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testBoolean() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setBoolean("value", true); + Assert.assertTrue(table1.propertyExists("value")); + + //Test Getting right value back + Assert.assertEquals((Boolean) true, table1.getBoolean("value")); + + //Check we don't get anything back for other gets + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //except value as a string + Assert.assertEquals("true", table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_BOOLEAN_PROPERTY_PREFIX, "value", null); + + // Should be able to get the null back + Assert.assertEquals(null, table1.getBoolean("value")); + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getBoolean("Rubbish")); + } + + /** + * Set a byte and check that we can only get it back as a byte and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testByte() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setByte("value", Byte.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Byte.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_BYTE_PROPERTY_PREFIX, "value", null); + + // Should be able to get the null back + Assert.assertEquals(null, table1.getByte("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getByte("Rubbish")); + } + + /** + * Set a short and check that we can only get it back as a short and a string + * Check that attempting to lookup a non existent value returns null + */ + public void testShort() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setShort("value", Short.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(Short.MAX_VALUE, (short) table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Short.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_SHORT_PROPERTY_PREFIX, "value", null); + + // Should be able to get the null back + Assert.assertEquals(null, table1.getShort("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getShort("Rubbish")); + } + + + /** + * Set a char and check that we can only get it back as a char + * Check that attempting to lookup a non existent value returns null + */ + public void testChar() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setChar("value", 'c'); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals('c', (char) table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("c", table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_ASCII_CHARACTER_PROPERTY_PREFIX, "value", null); + + try + { + table1.getString("value"); + fail("Should throw NullPointerException"); + } + catch (NullPointerException npe) + { + //Normal Path + } + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getCharacter("Rubbish")); + } + + + /** + * Set a double and check that we can only get it back as a double + * Check that attempting to lookup a non existent value returns null + */ + public void testDouble() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setDouble("value", Double.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(Double.MAX_VALUE, (double) table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_DOUBLE_PROPERTY_PREFIX, "value", null); + + Assert.assertEquals(null, table1.getDouble("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getDouble("Rubbish")); + } + + + /** + * Set a float and check that we can only get it back as a float + * Check that attempting to lookup a non existent value returns null + */ + public void testFloat() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setFloat("value", Float.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(Float.MAX_VALUE, (float) table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Float.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_FLOAT_PROPERTY_PREFIX, "value", null); + + Assert.assertEquals(null, table1.getFloat("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getFloat("Rubbish")); + } + + + /** + * Set an int and check that we can only get it back as an int + * Check that attempting to lookup a non existent value returns null + */ + public void testInt() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setInteger("value", Integer.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(Integer.MAX_VALUE, (int) table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Integer.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_INT_PROPERTY_PREFIX, "value", null); + + Assert.assertEquals(null, table1.getInteger("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getInteger("Rubbish")); + } + + + /** + * Set a long and check that we can only get it back as a long + * Check that attempting to lookup a non existent value returns null + */ + public void testLong() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setLong("value", Long.MAX_VALUE); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(Long.MAX_VALUE, (long) table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + + //... and a the string value of it. + Assert.assertEquals("" + Long.MAX_VALUE, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_LONG_PROPERTY_PREFIX, "value", null); + + Assert.assertEquals(null, table1.getLong("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getLong("Rubbish")); + } + + + /** + * Set a double and check that we can only get it back as a double + * Check that attempting to lookup a non existent value returns null + */ + public void testBytes() + { + byte[] bytes = {99, 98, 97, 96, 95}; + + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setBytes("value", bytes); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + assertBytesEqual(bytes, table1.getBytes("value")); + + //... and a the string value of it is null + Assert.assertEquals(null, table1.getString("value")); + + //Try setting a null value and read it back + table1.put(PropertyFieldTable.Prefix.AMQP_BINARY_PROPERTY_PREFIX, "value", null); + + Assert.assertEquals(null, table1.getBytes("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + // Table should now have zero size for encoding + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getBytes("Rubbish")); + } + + /** + * Calls all methods that can be used to check the table is empty + * - getEncodedSize + * - isEmpty + * - size + * + * @param table to check is empty + */ + private void checkEmpty(PropertyFieldTable table) + { + Assert.assertEquals(0, table.getEncodedSize()); + Assert.assertTrue(table.isEmpty()); + Assert.assertEquals(0, table.size()); + + Assert.assertEquals(0, table.keySet().size()); + Assert.assertEquals(0, table.values().size()); + Assert.assertEquals(0, table.entrySet().size()); + } + + + /** + * Set a String and check that we can only get it back as a String + * Check that attempting to lookup a non existent value returns null + */ + public void testString() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setString("value", "Hello"); + Assert.assertTrue(table1.propertyExists("value")); + + //Tets lookups we shouldn't get anything back for other gets + //we should get right value back for this type .... + Assert.assertEquals(null, table1.getBoolean("value")); + Assert.assertEquals(null, table1.getByte("value")); + Assert.assertEquals(null, table1.getShort("value")); + Assert.assertEquals(null, table1.getCharacter("value")); + Assert.assertEquals(null, table1.getDouble("value")); + Assert.assertEquals(null, table1.getFloat("value")); + Assert.assertEquals(null, table1.getInteger("value")); + Assert.assertEquals(null, table1.getLong("value")); + Assert.assertEquals(null, table1.getBytes("value")); + Assert.assertEquals("Hello", table1.getString("value")); + + //Try setting a null value and read it back + table1.setString("value", null); + + Assert.assertEquals(null, table1.getString("value")); + + //but still contains the value + Assert.assertTrue(table1.containsKey("value")); + + table1.remove("value"); + //but after a remove it doesn't + Assert.assertFalse(table1.containsKey("value")); + + checkEmpty(table1); + + //Looking up an invalid value returns null + Assert.assertEquals(null, table1.getString("Rubbish")); + + //Additional Test that haven't been covered for string + table1.setObject("value", "Hello"); + //Check that it was set correctly + Assert.assertEquals("Hello", table1.getString("value")); + } + + + /** + * Test that the generated XML can be used to create a field table with the same values. + */ + public void testValidXML() + { + PropertyFieldTable table1 = new PropertyFieldTable(); + table1.setBoolean("bool", true); + table1.setByte("byte", Byte.MAX_VALUE); + byte[] bytes = {99, 98, 97, 96, 95}; + table1.setBytes("bytes", bytes); + table1.setChar("char", 'c'); + table1.setDouble("double", Double.MAX_VALUE); + table1.setFloat("float", Float.MAX_VALUE); + table1.setInteger("int", Integer.MAX_VALUE); + table1.setLong("long", Long.MAX_VALUE); + table1.setShort("short", Short.MAX_VALUE); + table1.setString("string", "Hello"); + table1.setString("null-string", null); + + table1.setObject("object-bool", true); + table1.setObject("object-byte", Byte.MAX_VALUE); + table1.setObject("object-bytes", bytes); + table1.setObject("object-char", 'c'); + table1.setObject("object-double", Double.MAX_VALUE); + table1.setObject("object-float", Float.MAX_VALUE); + table1.setObject("object-int", Integer.MAX_VALUE); + table1.setObject("object-long", Long.MAX_VALUE); + table1.setObject("object-short", Short.MAX_VALUE); + table1.setObject("object-string", "Hello"); + + Assert.assertEquals(21, table1.size()); + + String table1XML = table1.toString(); + + PropertyFieldTable table2 = new PropertyFieldTable(table1XML); + + Assert.assertEquals(table1XML, table2.toString()); + + //Check that when bytes is written out as a string with no new line between items that it is read in ok. + + } + + /** + * Test that invalid input throws the correct Exception + */ + public void testInvalidXML() + { + try + { + _logger.warn("Testing Invalid XML expecting IllegalArgumentException"); + new PropertyFieldTable("Rubbish"); + fail("IllegalArgumentException expected"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + try + { + _logger.warn("Testing Invalid XML expecting IllegalArgumentException"); + new PropertyFieldTable(""); + fail("IllegalArgumentException expected"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + } + + + public void testKeyEnumeration() + { + PropertyFieldTable table = new PropertyFieldTable(); + table.setLong("one", 1L); + table.setLong("two", 2L); + table.setLong("three", 3L); + table.setLong("four", 4L); + table.setLong("five", 5L); + + Enumeration e = table.getPropertyNames(); + + Assert.assertTrue("one".equals(e.nextElement())); + Assert.assertTrue("two".equals(e.nextElement())); + Assert.assertTrue("three".equals(e.nextElement())); + Assert.assertTrue("four".equals(e.nextElement())); + Assert.assertTrue("five".equals(e.nextElement())); + } + + public void testValues() + { + PropertyFieldTable table = new PropertyFieldTable(); + table.setBoolean("bool", true); + table.setByte("byte", Byte.MAX_VALUE); + byte[] bytes = {99, 98, 97, 96, 95}; + table.setBytes("bytes", bytes); + table.setChar("char", 'c'); + table.setDouble("double", Double.MAX_VALUE); + table.setFloat("float", Float.MAX_VALUE); + table.setInteger("int", Integer.MAX_VALUE); + table.setLong("long", Long.MAX_VALUE); + table.setShort("short", Short.MAX_VALUE); + table.setString("string", "Hello"); + table.setString("null-string", null); + + table.setObject("object-bool", true); + table.setObject("object-byte", Byte.MAX_VALUE); + table.setObject("object-bytes", bytes); + table.setObject("object-char", 'c'); + table.setObject("object-double", Double.MAX_VALUE); + table.setObject("object-float", Float.MAX_VALUE); + table.setObject("object-int", Integer.MAX_VALUE); + table.setObject("object-long", Long.MAX_VALUE); + table.setObject("object-short", Short.MAX_VALUE); + table.setObject("object-string", "Hello"); + + + Assert.assertEquals((Boolean) true, table.getBoolean("bool")); + Assert.assertEquals((Byte) Byte.MAX_VALUE, table.getByte("byte")); + assertBytesEqual(bytes, table.getBytes("bytes")); + Assert.assertEquals((Character) 'c', table.getCharacter("char")); + Assert.assertEquals(Double.MAX_VALUE, table.getDouble("double")); + Assert.assertEquals(Float.MAX_VALUE, table.getFloat("float")); + Assert.assertEquals((Integer) Integer.MAX_VALUE, table.getInteger("int")); + Assert.assertEquals((Long) Long.MAX_VALUE, table.getLong("long")); + Assert.assertEquals((Short) Short.MAX_VALUE, table.getShort("short")); + Assert.assertEquals("Hello", table.getString("string")); + Assert.assertEquals(null, table.getString("null-string")); + + Assert.assertEquals(true, table.getObject("object-bool")); + Assert.assertEquals(Byte.MAX_VALUE, table.getObject("object-byte")); + assertBytesEqual(bytes, (byte[]) table.getObject("object-bytes")); + Assert.assertEquals('c', table.getObject("object-char")); + Assert.assertEquals(Double.MAX_VALUE, table.getObject("object-double")); + Assert.assertEquals(Float.MAX_VALUE, table.getObject("object-float")); + Assert.assertEquals(Integer.MAX_VALUE, table.getObject("object-int")); + Assert.assertEquals(Long.MAX_VALUE, table.getObject("object-long")); + Assert.assertEquals(Short.MAX_VALUE, table.getObject("object-short")); + Assert.assertEquals("Hello", table.getObject("object-string")); + } + + + public void testwriteBuffer() + { + byte[] bytes = {99, 98, 97, 96, 95}; + + PropertyFieldTable table = new PropertyFieldTable(); + table.setBoolean("bool", true); + table.setByte("byte", Byte.MAX_VALUE); + + table.setBytes("bytes", bytes); + table.setChar("char", 'c'); + table.setDouble("double", Double.MAX_VALUE); + table.setFloat("float", Float.MAX_VALUE); + table.setInteger("int", Integer.MAX_VALUE); + table.setLong("long", Long.MAX_VALUE); + table.setShort("short", Short.MAX_VALUE); + table.setString("string", "hello"); + table.setString("null-string", null); + + + final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize()); // FIXME XXX: Is cast a problem? + + table.writeToBuffer(buffer); + + buffer.flip(); + + long length = buffer.getUnsignedInt(); + + try + { + PropertyFieldTable table2 = new PropertyFieldTable(buffer, length); + + Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); + Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); + assertBytesEqual(bytes, table2.getBytes("bytes")); + Assert.assertEquals((Character) 'c', table2.getCharacter("char")); + Assert.assertEquals(Double.MAX_VALUE, table2.getDouble("double")); + Assert.assertEquals(Float.MAX_VALUE, table2.getFloat("float")); + Assert.assertEquals((Integer) Integer.MAX_VALUE, table2.getInteger("int")); + Assert.assertEquals((Long) Long.MAX_VALUE, table2.getLong("long")); + Assert.assertEquals((Short) Short.MAX_VALUE, table2.getShort("short")); + Assert.assertEquals("hello", table2.getString("string")); + Assert.assertEquals(null, table2.getString("null-string")); + + } + catch (AMQFrameDecodingException e) + { + e.printStackTrace(); + fail("PFT should be instantiated from bytes." + e.getCause()); + } + } + + public void testEncodingSize() + { + PropertyFieldTable result = new PropertyFieldTable(); + int size = 0; + + result.setBoolean("boolean", true); + size += 1 + EncodingUtils.encodedShortStringLength("boolean") + EncodingUtils.encodedBooleanLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + + result.setByte("byte", (byte) Byte.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("byte") + EncodingUtils.encodedByteLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + + byte[] _bytes = {99, 98, 97, 96, 95}; + + result.setBytes("bytes", _bytes); + size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length; + Assert.assertEquals(size, result.getEncodedSize()); + + result.setChar("char", (char) 'c'); + size += 1 + EncodingUtils.encodedShortStringLength("char") + EncodingUtils.encodedCharLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setDouble("double", (double) Double.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("double") + EncodingUtils.encodedDoubleLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setFloat("float", (float) Float.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("float") + EncodingUtils.encodedFloatLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setInteger("int", (int) Integer.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("int") + EncodingUtils.encodedIntegerLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + + result.setLong("long", (long) Long.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("long") + EncodingUtils.encodedLongLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setShort("short", (short) Short.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("short") + EncodingUtils.encodedShortLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setString("result", "Hello"); + size += 1 + EncodingUtils.encodedShortStringLength("result") + EncodingUtils.encodedLongStringLength("Hello"); + Assert.assertEquals(size, result.getEncodedSize()); + + + result.setObject("object-bool", true); + size += 1 + EncodingUtils.encodedShortStringLength("object-bool") + EncodingUtils.encodedBooleanLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-byte", Byte.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-byte") + EncodingUtils.encodedByteLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-bytes", _bytes); + size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length; + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-char", 'c'); + size += 1 + EncodingUtils.encodedShortStringLength("object-char") + EncodingUtils.encodedCharLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-double", Double.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-double") + EncodingUtils.encodedDoubleLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-float", Float.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-float") + EncodingUtils.encodedFloatLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-int", Integer.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-int") + EncodingUtils.encodedIntegerLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + + result.setObject("object-long", Long.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-long") + EncodingUtils.encodedLongLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + result.setObject("object-short", Short.MAX_VALUE); + size += 1 + EncodingUtils.encodedShortStringLength("object-short") + EncodingUtils.encodedShortLength(); + Assert.assertEquals(size, result.getEncodedSize()); + + } + +// public void testEncodingSize1() +// { +// PropertyFieldTable table = new PropertyFieldTable(); +// int size = 0; +// result.put("one", 1L); +// size = EncodingUtils.encodedShortStringLength("one"); +// size += 1 + EncodingUtils.encodedLongLength(); +// assertEquals(size, result.getEncodedSize()); +// +// result.put("two", 2L); +// size += EncodingUtils.encodedShortStringLength("two"); +// size += 1 + EncodingUtils.encodedLongLength(); +// assertEquals(size, result.getEncodedSize()); +// +// result.put("three", 3L); +// size += EncodingUtils.encodedShortStringLength("three"); +// size += 1 + EncodingUtils.encodedLongLength(); +// assertEquals(size, result.getEncodedSize()); +// +// result.put("four", 4L); +// size += EncodingUtils.encodedShortStringLength("four"); +// size += 1 + EncodingUtils.encodedLongLength(); +// assertEquals(size, result.getEncodedSize()); +// +// result.put("five", 5L); +// size += EncodingUtils.encodedShortStringLength("five"); +// size += 1 + EncodingUtils.encodedLongLength(); +// assertEquals(size, result.getEncodedSize()); +// +// //fixme should perhaps be expanded to incorporate all types. +// +// final ByteBuffer buffer = ByteBuffer.allocate((int) result.getEncodedSize()); // FIXME XXX: Is cast a problem? +// +// result.writeToBuffer(buffer); +// +// buffer.flip(); +// +// long length = buffer.getUnsignedInt(); +// +// try +// { +// PropertyFieldTable table2 = new PropertyFieldTable(buffer, length); +// +// Assert.assertEquals((Long) 1L, table2.getLong("one")); +// Assert.assertEquals((Long) 2L, table2.getLong("two")); +// Assert.assertEquals((Long) 3L, table2.getLong("three")); +// Assert.assertEquals((Long) 4L, table2.getLong("four")); +// Assert.assertEquals((Long) 5L, table2.getLong("five")); +// } +// catch (AMQFrameDecodingException e) +// { +// e.printStackTrace(); +// fail("PFT should be instantiated from bytes." + e.getCause()); +// } +// +// } + + + /** + * Additional test for setObject + */ + public void testSetObject() + { + PropertyFieldTable table = new PropertyFieldTable(); + + //Try setting a non primative object + + try + { + table.setObject("value", this); + fail("Only primative values allowed in setObject"); + } + catch (AMQPInvalidClassException iae) + { + //normal path + } + // so size should be zero + Assert.assertEquals(0, table.getEncodedSize()); + } + + /** + * Additional test checkPropertyName doesn't accept Null + */ + public void testCheckPropertyNameasNull() + { + PropertyFieldTable table = new PropertyFieldTable(); + + try + { + table.setObject(null, "String"); + fail("Null property name is not allowed"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + // so size should be zero + Assert.assertEquals(0, table.getEncodedSize()); + } + + + /** + * Additional test checkPropertyName doesn't accept an empty String + */ + public void testCheckPropertyNameasEmptyString() + { + PropertyFieldTable table = new PropertyFieldTable(); + + try + { + table.setObject("", "String"); + fail("empty property name is not allowed"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + // so size should be zero + Assert.assertEquals(0, table.getEncodedSize()); + } + + + /** + * Additional test checkPropertyName doesn't accept an empty String + */ + public void testCheckPropertyNamehasMaxLength() + { + PropertyFieldTable table = new PropertyFieldTable(); + + StringBuffer longPropertyName = new StringBuffer(129); + + for (int i = 0; i < 129; i++) + { + longPropertyName.append("x"); + } + + try + { + table.setObject(longPropertyName.toString(), "String"); + fail("property name must be < 128 characters"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + // so size should be zero + Assert.assertEquals(0, table.getEncodedSize()); + } + + + /** + * Additional test checkPropertyName starts with a letter + */ + public void testCheckPropertyNameStartCharacterIsLetter() + { + PropertyFieldTable table = new PropertyFieldTable(); + + //Try a name that starts with a number + try + { + table.setObject("1", "String"); + fail("property name must start with a letter"); + } + catch (IllegalArgumentException iae) + { + //normal path + } + // so size should be zero + Assert.assertEquals(0, table.getEncodedSize()); + } + + + /** + * Additional test checkPropertyName starts with a hash or a dollar + */ + public void testCheckPropertyNameStartCharacterIsHashorDollar() + { + PropertyFieldTable table = new PropertyFieldTable(); + + //Try a name that starts with a number + try + { + table.setObject("#", "String"); + table.setObject("$", "String"); + } + catch (IllegalArgumentException iae) + { + fail("property name are allowed to start with # and $s"); + } + } + + + /** + * Additional test to test the contents of the table + */ + public void testContents() + { + PropertyFieldTable table = new PropertyFieldTable(); + + table.put("StringProperty", "String"); + + Assert.assertTrue(table.containsValue("String")); + + Assert.assertEquals("String", table.get("StringProperty")); + + //Test Clear + + table.clear(); + + checkEmpty(table); + } + + /** + * Test the contents of the sets + */ + public void testSets() + { + + PropertyFieldTable table = new PropertyFieldTable(); + + table.put("n1", "1"); + table.put("n2", "2"); + table.put("n3", "3"); + + Iterator iterator = table.keySet().iterator(); + Assert.assertEquals("n1", iterator.next()); + Assert.assertEquals("n2", iterator.next()); + Assert.assertEquals("n3", iterator.next()); + Assert.assertFalse(iterator.hasNext()); + + + iterator = table.values().iterator(); + Assert.assertEquals("1", iterator.next()); + Assert.assertEquals("2", iterator.next()); + Assert.assertEquals("3", iterator.next()); + Assert.assertFalse(iterator.hasNext()); + + + iterator = table.entrySet().iterator(); + Map.Entry entry = (Map.Entry) iterator.next(); + Assert.assertEquals("n1", entry.getKey()); + Assert.assertEquals("1", entry.getValue()); + entry = (Map.Entry) iterator.next(); + Assert.assertEquals("n2", entry.getKey()); + Assert.assertEquals("2", entry.getValue()); + entry = (Map.Entry) iterator.next(); + Assert.assertEquals("n3", entry.getKey()); + Assert.assertEquals("3", entry.getValue()); + Assert.assertFalse(iterator.hasNext()); + + + } + + + /** + * Test that all the values are preserved after a putAll + */ + public void testPutAll() + { + Map map = new HashMap(); + + map.put("char", 'c'); + map.put("double", Double.MAX_VALUE); + map.put("float", Float.MAX_VALUE); + map.put("int", Integer.MAX_VALUE); + map.put("long", Long.MAX_VALUE); + map.put("short", Short.MAX_VALUE); + + PropertyFieldTable table = new PropertyFieldTable(); + + table.putAll(map); + + Assert.assertEquals(6, table.size()); + + Assert.assertTrue(table.containsKey("char")); + Assert.assertEquals('c', (char) table.getCharacter("char")); + Assert.assertTrue(table.containsKey("double")); + Assert.assertEquals(Double.MAX_VALUE, table.getDouble("double")); + Assert.assertTrue(table.containsKey("float")); + Assert.assertEquals(Float.MAX_VALUE, table.getFloat("float")); + Assert.assertTrue(table.containsKey("int")); + Assert.assertEquals(Integer.MAX_VALUE, (int) table.getInteger("int")); + Assert.assertTrue(table.containsKey("long")); + Assert.assertEquals(Long.MAX_VALUE, (long) table.getLong("long")); + Assert.assertTrue(table.containsKey("short")); + Assert.assertEquals(Short.MAX_VALUE, (short) table.getShort("short")); + Assert.assertEquals(Short.MAX_VALUE, (short) table.getShort("short")); + } + + + private void assertBytesEqual(byte[] expected, byte[] actual) + { + Assert.assertEquals(expected.length, actual.length); + + for (int index = 0; index < expected.length; index++) + { + Assert.assertEquals(expected[index], actual[index]); + } + } + + private void assertBytesNotEqual(byte[] expected, byte[] actual) + { + Assert.assertEquals(expected.length, actual.length); + + for (int index = 0; index < expected.length; index++) + { + Assert.assertFalse(expected[index] == actual[index]); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(PropertyFieldTableTest.class); + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java new file mode 100644 index 0000000000..972a935257 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.pool; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.qpid.session.TestSession; +import org.apache.mina.common.IoFilter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IdleStatus; + +import java.util.concurrent.RejectedExecutionException; + +public class PoolingFilterTest extends TestCase +{ + private PoolingFilter _pool; + ReferenceCountingExecutorService _executorService; + + public void setUp() + { + //Create Pool + _executorService = ReferenceCountingExecutorService.getInstance(); + _executorService.acquireExecutorService(); + _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS, + "AsynchronousWriteFilter"); + + } + + public void testRejectedExecution() throws Exception + { + _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message")); + + //Shutdown the pool + _executorService.getPool().shutdownNow(); + + try + { + //prior to fix for QPID-172 this would throw RejectedExecutionException + _pool.filterWrite(null, new TestSession(), null); + } + catch (RejectedExecutionException rje) + { + Assert.fail("RejectedExecutionException should not occur after pool has shutdown:" + rje); + } + } + + private static class NoOpFilter implements IoFilter.NextFilter + { + + public void sessionOpened(IoSession session) + { + } + + public void sessionClosed(IoSession session) + { + } + + public void sessionIdle(IoSession session, IdleStatus status) + { + } + + public void exceptionCaught(IoSession session, Throwable cause) + { + } + + public void messageReceived(IoSession session, Object message) + { + } + + public void messageSent(IoSession session, Object message) + { + } + + public void filterWrite(IoSession session, IoFilter.WriteRequest writeRequest) + { + } + + public void filterClose(IoSession session) + { + } + + public void sessionCreated(IoSession session) + { + } + } +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java new file mode 100644 index 0000000000..f10d55e9d0 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java @@ -0,0 +1,273 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.session; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +public class TestSession implements IoSession +{ + public TestSession() + { + } + + public IoService getService() + { + return null; //TODO + } + + public IoServiceConfig getServiceConfig() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} |
