diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
| commit | 95fc93485ab66966713611a4e1429d917dabde64 (patch) | |
| tree | 09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java/common/src/main | |
| parent | 28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff) | |
| download | qpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz | |
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
14 files changed, 383 insertions, 1 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java index 5048193cac..9bdc1dd889 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java @@ -209,6 +209,9 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl case 0x003c0048: BasicGetEmptyBody.process(in, channelMethodProcessor); break; + case 0x003c0050: + BasicAckBody.process(in, channelMethodProcessor); + break; case 0x003c0065: if(!channelMethodProcessor.ignoreAllButCloseOk()) { @@ -221,6 +224,18 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl channelMethodProcessor.receiveBasicRecoverSyncOk(); } break; + case 0x003c0078: + BasicNackBody.process(in, channelMethodProcessor); + break; + + // CONFIRM CLASS: + + case 0x0055000b: + if(!channelMethodProcessor.ignoreAllButCloseOk()) + { + channelMethodProcessor.receiveConfirmSelectOk(); + } + break; // TX_CLASS: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java index 3b138ba278..32a45da60c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java @@ -197,6 +197,15 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se case 0x003c006e: BasicRecoverSyncBody.process(in, channelMethodProcessor); break; + case 0x003c0078: + BasicNackBody.process(in, channelMethodProcessor); + break; + + // CONFIRM CLASS: + + case 0x0055000a: + ConfirmSelectBody.process(in, channelMethodProcessor); + break; // TX_CLASS: @@ -219,6 +228,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se } break; + default: throw newUnknownMethodException(classId, methodId, methodProcessor.getProtocolVersion()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java index 68782231fe..01ca6f95f1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java @@ -113,7 +113,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB } public static void process(final MarkableDataInput buffer, - final ServerChannelMethodProcessor dispatcher) throws IOException + final ChannelMethodProcessor dispatcher) throws IOException { long deliveryTag = buffer.readLong(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java new file mode 100644 index 0000000000..33ccb10f39 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java @@ -0,0 +1,143 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + +public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 60; + public static final int METHOD_ID = 120; + + // Fields declared in specification + private final long _deliveryTag; // [deliveryTag] + private final byte _bitfield0; // [multiple] + + // Constructor + public BasicNackBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException + { + _deliveryTag = buffer.readLong(); + _bitfield0 = buffer.readByte(); + } + + public BasicNackBody( + long deliveryTag, + boolean multiple, + boolean requeue + ) + { + _deliveryTag = deliveryTag; + byte bitfield0 = (byte)0; + if( multiple ) + { + bitfield0 = (byte) (((int) bitfield0) | 1); + + } + if( requeue ) + { + bitfield0 = (byte) (((int) bitfield0) | 2); + } + _bitfield0 = bitfield0; + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + public final long getDeliveryTag() + { + return _deliveryTag; + } + + public final boolean getMultiple() + { + return (((int)(_bitfield0)) & 1) != 0; + } + + public final boolean getRequeue() + { + return (((int)(_bitfield0)) & 2 ) != 0; + } + + protected int getBodySize() + { + int size = 9; + return size; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + writeLong( buffer, _deliveryTag ); + writeBitfield( buffer, _bitfield0 ); + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchBasicNack(this, channelId); + } + + public String toString() + { + StringBuilder buf = new StringBuilder("[BasicNackBodyImpl: "); + buf.append( "deliveryTag=" ); + buf.append( getDeliveryTag() ); + buf.append( ", " ); + buf.append( "multiple=" ); + buf.append( getMultiple() ); + buf.append( ", " ); + buf.append( "requeue=" ); + buf.append( getRequeue() ); + buf.append("]"); + return buf.toString(); + } + + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor dispatcher) throws IOException + { + + long deliveryTag = buffer.readLong(); + byte bitfield = buffer.readByte(); + boolean multiple = (bitfield & 0x01) != 0; + boolean requeue = (bitfield & 0x02) != 0; + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveBasicNack(deliveryTag, multiple, requeue); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java index 84cd1e13c2..75fbb15629 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java @@ -35,4 +35,8 @@ public interface ChannelMethodProcessor void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); boolean ignoreAllButCloseOk(); + + void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue); + + void receiveBasicAck(long deliveryTag, boolean multiple); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java index bef143e39b..289a284d6b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java @@ -75,4 +75,5 @@ public interface ClientChannelMethodProcessor extends ChannelMethodProcessor void receiveTxRollbackOk(); + void receiveConfirmSelectOk(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java index 97de0ac487..4f1b6b917e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java @@ -72,4 +72,6 @@ public interface ClientMethodDispatcher throws AMQException; boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int channelId) throws AMQException; + + boolean dispatchConfirmSelectOk(ConfirmSelectOkBody confirmSelectOkBody, int channelId) throws AMQException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java new file mode 100644 index 0000000000..7f9e56caa6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + +public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 85; + public static final int METHOD_ID = 10; + + // Fields declared in specification + private final boolean _nowait; // [active] + + // Constructor + public ConfirmSelectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException + { + _nowait = (buffer.readByte() & 0x01) == 0x01; + } + + public ConfirmSelectBody(boolean nowait) + { + _nowait = nowait; + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + public final boolean getNowait() + { + return _nowait; + } + + protected int getBodySize() + { + return 1; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 ); + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchConfirmSelect(this, channelId); + } + + public String toString() + { + StringBuilder buf = new StringBuilder("[ConfirmSelectBody: "); + buf.append( "active=" ); + buf.append( getNowait() ); + buf.append("]"); + return buf.toString(); + } + + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) + throws IOException + { + boolean nowait = (buffer.readByte() & 0x01) == 0x01; + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveConfirmSelect(nowait); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java new file mode 100644 index 0000000000..d0e3bd093d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. + * Supported AMQP version: + * 8-0 + */ + +package org.apache.qpid.framing; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.qpid.AMQException; + +public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody +{ + + public static final int CLASS_ID = 85; + public static final int METHOD_ID = 11; + + public static final ConfirmSelectOkBody INSTANCE = new ConfirmSelectOkBody(); + + private ConfirmSelectOkBody() + { + } + + public int getClazz() + { + return CLASS_ID; + } + + public int getMethod() + { + return METHOD_ID; + } + + + protected int getBodySize() + { + return 0; + } + + public void writeMethodPayload(DataOutput buffer) throws IOException + { + } + + public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException + { + return dispatcher.dispatchConfirmSelectOk(this, channelId); + } + + public String toString() + { + return "[ConfirmSelectOkBody]"; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java index 19b091a359..0d160a73d5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java @@ -358,6 +358,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati } @Override + public void receiveConfirmSelectOk() + { + _processedMethods.add(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE)); + } + + @Override public void receiveAccessRequest(final AMQShortString realm, final boolean exclusive, final boolean passive, @@ -564,6 +570,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati } @Override + public void receiveConfirmSelect(final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new ConfirmSelectBody(nowait))); + } + + @Override public void receiveChannelFlow(final boolean active) { _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active))); @@ -607,5 +619,11 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati { return false; } + + @Override + public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicNackBody(deliveryTag, multiple, requeue))); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java index 03b122a7a7..a485397a5e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java @@ -32,4 +32,6 @@ package org.apache.qpid.framing; public interface MethodDispatcher extends ClientMethodDispatcher, ServerMethodDispatcher { + + boolean dispatchBasicNack(BasicNackBody basicNackBody, int channelId); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java index 89b75c2d2f..6d43accc96 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java @@ -89,4 +89,5 @@ public interface ServerChannelMethodProcessor extends ChannelMethodProcessor void receiveTxRollback(); + void receiveConfirmSelect(boolean nowait); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java index f4ab67dad4..d3961a1a59 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java @@ -68,4 +68,6 @@ public interface ServerMethodDispatcher boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int channelId) throws AMQException; boolean dispatchBasicRecoverSync(BasicRecoverSyncBody basicRecoverSyncBody, int channelId) throws AMQException; + + boolean dispatchConfirmSelect(ConfirmSelectBody confirmSelectBody, int channelId) throws AMQException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index 8f1a1d0be0..4f88fe7071 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -60,6 +60,8 @@ public class ConnectionStartProperties public static final String SESSION_FLOW = "qpid.session_flow"; + public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported"; + public static int _pid; public static final String _platformInfo; |
