summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
commit95fc93485ab66966713611a4e1429d917dabde64 (patch)
tree09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java/common/src/main
parent28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff)
downloadqpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java143
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java105
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java77
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java2
14 files changed, 383 insertions, 1 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
index 5048193cac..9bdc1dd889 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
@@ -209,6 +209,9 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
case 0x003c0048:
BasicGetEmptyBody.process(in, channelMethodProcessor);
break;
+ case 0x003c0050:
+ BasicAckBody.process(in, channelMethodProcessor);
+ break;
case 0x003c0065:
if(!channelMethodProcessor.ignoreAllButCloseOk())
{
@@ -221,6 +224,18 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
channelMethodProcessor.receiveBasicRecoverSyncOk();
}
break;
+ case 0x003c0078:
+ BasicNackBody.process(in, channelMethodProcessor);
+ break;
+
+ // CONFIRM CLASS:
+
+ case 0x0055000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveConfirmSelectOk();
+ }
+ break;
// TX_CLASS:
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
index 3b138ba278..32a45da60c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
@@ -197,6 +197,15 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
case 0x003c006e:
BasicRecoverSyncBody.process(in, channelMethodProcessor);
break;
+ case 0x003c0078:
+ BasicNackBody.process(in, channelMethodProcessor);
+ break;
+
+ // CONFIRM CLASS:
+
+ case 0x0055000a:
+ ConfirmSelectBody.process(in, channelMethodProcessor);
+ break;
// TX_CLASS:
@@ -219,6 +228,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
}
break;
+
default:
throw newUnknownMethodException(classId, methodId,
methodProcessor.getProtocolVersion());
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index 68782231fe..01ca6f95f1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -113,7 +113,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
}
public static void process(final MarkableDataInput buffer,
- final ServerChannelMethodProcessor dispatcher) throws IOException
+ final ChannelMethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
new file mode 100644
index 0000000000..33ccb10f39
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 60;
+ public static final int METHOD_ID = 120;
+
+ // Fields declared in specification
+ private final long _deliveryTag; // [deliveryTag]
+ private final byte _bitfield0; // [multiple]
+
+ // Constructor
+ public BasicNackBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
+ {
+ _deliveryTag = buffer.readLong();
+ _bitfield0 = buffer.readByte();
+ }
+
+ public BasicNackBody(
+ long deliveryTag,
+ boolean multiple,
+ boolean requeue
+ )
+ {
+ _deliveryTag = deliveryTag;
+ byte bitfield0 = (byte)0;
+ if( multiple )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | 1);
+
+ }
+ if( requeue )
+ {
+ bitfield0 = (byte) (((int) bitfield0) | 2);
+ }
+ _bitfield0 = bitfield0;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+ public final long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public final boolean getMultiple()
+ {
+ return (((int)(_bitfield0)) & 1) != 0;
+ }
+
+ public final boolean getRequeue()
+ {
+ return (((int)(_bitfield0)) & 2 ) != 0;
+ }
+
+ protected int getBodySize()
+ {
+ int size = 9;
+ return size;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ writeLong( buffer, _deliveryTag );
+ writeBitfield( buffer, _bitfield0 );
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchBasicNack(this, channelId);
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[BasicNackBodyImpl: ");
+ buf.append( "deliveryTag=" );
+ buf.append( getDeliveryTag() );
+ buf.append( ", " );
+ buf.append( "multiple=" );
+ buf.append( getMultiple() );
+ buf.append( ", " );
+ buf.append( "requeue=" );
+ buf.append( getRequeue() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static void process(final MarkableDataInput buffer,
+ final ChannelMethodProcessor dispatcher) throws IOException
+ {
+
+ long deliveryTag = buffer.readLong();
+ byte bitfield = buffer.readByte();
+ boolean multiple = (bitfield & 0x01) != 0;
+ boolean requeue = (bitfield & 0x02) != 0;
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveBasicNack(deliveryTag, multiple, requeue);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
index 84cd1e13c2..75fbb15629 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
@@ -35,4 +35,8 @@ public interface ChannelMethodProcessor
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
boolean ignoreAllButCloseOk();
+
+ void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue);
+
+ void receiveBasicAck(long deliveryTag, boolean multiple);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
index bef143e39b..289a284d6b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
@@ -75,4 +75,5 @@ public interface ClientChannelMethodProcessor extends ChannelMethodProcessor
void receiveTxRollbackOk();
+ void receiveConfirmSelectOk();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
index 97de0ac487..4f1b6b917e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
@@ -72,4 +72,6 @@ public interface ClientMethodDispatcher
throws AMQException;
boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int channelId) throws AMQException;
+
+ boolean dispatchConfirmSelectOk(ConfirmSelectOkBody confirmSelectOkBody, int channelId) throws AMQException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
new file mode 100644
index 0000000000..7f9e56caa6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 85;
+ public static final int METHOD_ID = 10;
+
+ // Fields declared in specification
+ private final boolean _nowait; // [active]
+
+ // Constructor
+ public ConfirmSelectBody(MarkableDataInput buffer) throws AMQFrameDecodingException, IOException
+ {
+ _nowait = (buffer.readByte() & 0x01) == 0x01;
+ }
+
+ public ConfirmSelectBody(boolean nowait)
+ {
+ _nowait = nowait;
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+ public final boolean getNowait()
+ {
+ return _nowait;
+ }
+
+ protected int getBodySize()
+ {
+ return 1;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 );
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchConfirmSelect(this, channelId);
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("[ConfirmSelectBody: ");
+ buf.append( "active=" );
+ buf.append( getNowait() );
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public static void process(final MarkableDataInput buffer,
+ final ServerChannelMethodProcessor dispatcher)
+ throws IOException
+ {
+ boolean nowait = (buffer.readByte() & 0x01) == 0x01;
+ if(!dispatcher.ignoreAllButCloseOk())
+ {
+ dispatcher.receiveConfirmSelect(nowait);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
new file mode 100644
index 0000000000..d0e3bd093d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP version:
+ * 8-0
+ */
+
+package org.apache.qpid.framing;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.qpid.AMQException;
+
+public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
+{
+
+ public static final int CLASS_ID = 85;
+ public static final int METHOD_ID = 11;
+
+ public static final ConfirmSelectOkBody INSTANCE = new ConfirmSelectOkBody();
+
+ private ConfirmSelectOkBody()
+ {
+ }
+
+ public int getClazz()
+ {
+ return CLASS_ID;
+ }
+
+ public int getMethod()
+ {
+ return METHOD_ID;
+ }
+
+
+ protected int getBodySize()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(DataOutput buffer) throws IOException
+ {
+ }
+
+ public boolean execute(MethodDispatcher dispatcher, int channelId) throws AMQException
+ {
+ return dispatcher.dispatchConfirmSelectOk(this, channelId);
+ }
+
+ public String toString()
+ {
+ return "[ConfirmSelectOkBody]";
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index 19b091a359..0d160a73d5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -358,6 +358,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
}
@Override
+ public void receiveConfirmSelectOk()
+ {
+ _processedMethods.add(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE));
+ }
+
+ @Override
public void receiveAccessRequest(final AMQShortString realm,
final boolean exclusive,
final boolean passive,
@@ -564,6 +570,12 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
}
@Override
+ public void receiveConfirmSelect(final boolean nowait)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new ConfirmSelectBody(nowait)));
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
_processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active)));
@@ -607,5 +619,11 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
{
return false;
}
+
+ @Override
+ public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
+ {
+ _processedMethods.add(new AMQFrame(_channelId, new BasicNackBody(deliveryTag, multiple, requeue)));
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
index 03b122a7a7..a485397a5e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
@@ -32,4 +32,6 @@ package org.apache.qpid.framing;
public interface MethodDispatcher extends
ClientMethodDispatcher, ServerMethodDispatcher
{
+
+ boolean dispatchBasicNack(BasicNackBody basicNackBody, int channelId);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
index 89b75c2d2f..6d43accc96 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
@@ -89,4 +89,5 @@ public interface ServerChannelMethodProcessor extends ChannelMethodProcessor
void receiveTxRollback();
+ void receiveConfirmSelect(boolean nowait);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
index f4ab67dad4..d3961a1a59 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
@@ -68,4 +68,6 @@ public interface ServerMethodDispatcher
boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int channelId) throws AMQException;
boolean dispatchBasicRecoverSync(BasicRecoverSyncBody basicRecoverSyncBody, int channelId) throws AMQException;
+
+ boolean dispatchConfirmSelect(ConfirmSelectBody confirmSelectBody, int channelId) throws AMQException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
index 8f1a1d0be0..4f88fe7071 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
@@ -60,6 +60,8 @@ public class ConnectionStartProperties
public static final String SESSION_FLOW = "qpid.session_flow";
+ public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported";
+
public static int _pid;
public static final String _platformInfo;