From 7673f4e000757a72f5da72ba05174ac982f4da8a Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Thu, 13 Sep 2007 21:42:57 +0000 Subject: * moved most of the classes in the org.apache.qpidity package to org.apache.qpidity.transport * factored out the network specific pieces into org.apache.qpidity.transport * moved the mina specific code to org.apache.qpidity.transport.network.mina * replaced the handler chain with Sender/Receiver chains that can deal with close request/closed notifications * moved from an anonymous struct[] to a real Header class * removed an excess copy from message data transmit git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@575474 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpidity/BodyHandler.java | 46 --- .../src/main/java/org/apache/qpidity/Channel.java | 220 -------------- .../java/org/apache/qpidity/ChannelDelegate.java | 51 ---- .../java/org/apache/qpidity/CommandDispatcher.java | 53 ---- .../main/java/org/apache/qpidity/Connection.java | 141 --------- .../org/apache/qpidity/ConnectionDelegate.java | 283 ----------------- .../java/org/apache/qpidity/ConsoleOutput.java | 13 +- .../java/org/apache/qpidity/ContentHandler.java | 45 --- .../main/java/org/apache/qpidity/Delegator.java | 35 --- .../src/main/java/org/apache/qpidity/Event.java | 44 --- .../src/main/java/org/apache/qpidity/Frame.java | 167 ---------- .../main/java/org/apache/qpidity/Functions.java | 80 ----- .../src/main/java/org/apache/qpidity/Future.java | 37 --- .../src/main/java/org/apache/qpidity/Handler.java | 35 --- .../src/main/java/org/apache/qpidity/Header.java | 30 -- .../java/org/apache/qpidity/HeaderHandler.java | 66 ---- .../main/java/org/apache/qpidity/InputHandler.java | 230 -------------- .../src/main/java/org/apache/qpidity/Method.java | 64 ---- .../java/org/apache/qpidity/MethodDecoder.java | 62 ---- .../java/org/apache/qpidity/MethodDispatcher.java | 47 --- .../java/org/apache/qpidity/MethodHandler.java | 42 --- .../main/java/org/apache/qpidity/MinaHandler.java | 160 ---------- .../java/org/apache/qpidity/ProtocolActions.java | 39 --- .../java/org/apache/qpidity/ProtocolError.java | 47 --- .../java/org/apache/qpidity/ProtocolHandler.java | 39 --- .../java/org/apache/qpidity/ProtocolHeader.java | 83 ----- .../src/main/java/org/apache/qpidity/Range.java | 85 ------ .../src/main/java/org/apache/qpidity/RangeSet.java | 123 -------- .../src/main/java/org/apache/qpidity/Result.java | 30 -- .../src/main/java/org/apache/qpidity/Segment.java | 74 ----- .../java/org/apache/qpidity/SegmentAssembler.java | 65 ---- .../src/main/java/org/apache/qpidity/Session.java | 317 ------------------- .../java/org/apache/qpidity/SessionDelegate.java | 67 ---- .../java/org/apache/qpidity/SessionResolver.java | 47 --- .../java/org/apache/qpidity/SliceIterator.java | 59 ---- .../src/main/java/org/apache/qpidity/Struct.java | 42 --- .../src/main/java/org/apache/qpidity/Stub.java | 109 ------- .../src/main/java/org/apache/qpidity/Switch.java | 58 ---- .../main/java/org/apache/qpidity/ToyBroker.java | 104 +++---- .../main/java/org/apache/qpidity/ToyClient.java | 21 +- .../main/java/org/apache/qpidity/TrackSwitch.java | 39 --- .../main/java/org/apache/qpidity/TypeSwitch.java | 39 --- .../main/java/org/apache/qpidity/api/Message.java | 49 ++- .../org/apache/qpidity/codec/AbstractDecoder.java | 6 +- .../org/apache/qpidity/codec/AbstractEncoder.java | 8 +- .../java/org/apache/qpidity/codec/Decoder.java | 4 +- .../java/org/apache/qpidity/codec/Encoder.java | 4 +- .../org/apache/qpidity/codec/SegmentEncoder.java | 175 ----------- .../apache/qpidity/transport/AbstractDelegate.java | 40 +++ .../java/org/apache/qpidity/transport/Channel.java | 182 +++++++++++ .../apache/qpidity/transport/ChannelDelegate.java | 51 ++++ .../org/apache/qpidity/transport/Connection.java | 104 +++++++ .../qpidity/transport/ConnectionDelegate.java | 309 +++++++++++++++++++ .../apache/qpidity/transport/ConnectionEvent.java | 51 ++++ .../java/org/apache/qpidity/transport/Data.java | 84 +++++ .../java/org/apache/qpidity/transport/Future.java | 37 +++ .../java/org/apache/qpidity/transport/Header.java | 76 +++++ .../java/org/apache/qpidity/transport/Method.java | 62 ++++ .../apache/qpidity/transport/ProtocolError.java | 72 +++++ .../apache/qpidity/transport/ProtocolEvent.java | 49 +++ .../apache/qpidity/transport/ProtocolHeader.java | 112 +++++++ .../java/org/apache/qpidity/transport/Range.java | 85 ++++++ .../org/apache/qpidity/transport/RangeSet.java | 123 ++++++++ .../org/apache/qpidity/transport/Receiver.java | 36 +++ .../java/org/apache/qpidity/transport/Result.java | 30 ++ .../java/org/apache/qpidity/transport/Sender.java | 36 +++ .../java/org/apache/qpidity/transport/Session.java | 337 +++++++++++++++++++++ .../apache/qpidity/transport/SessionDelegate.java | 65 ++++ .../java/org/apache/qpidity/transport/Struct.java | 42 +++ .../qpidity/transport/network/Assembler.java | 184 +++++++++++ .../qpidity/transport/network/Disassembler.java | 168 ++++++++++ .../apache/qpidity/transport/network/Frame.java | 179 +++++++++++ .../qpidity/transport/network/InputHandler.java | 239 +++++++++++++++ .../qpidity/transport/network/NetworkDelegate.java | 42 +++ .../qpidity/transport/network/NetworkEvent.java | 34 +++ .../qpidity/transport/network/OutputHandler.java | 99 ++++++ .../transport/network/mina/MinaHandler.java | 174 +++++++++++ .../qpidity/transport/network/mina/MinaSender.java | 54 ++++ .../apache/qpidity/transport/util/Functions.java | 80 +++++ .../qpidity/transport/util/SliceIterator.java | 59 ++++ 80 files changed, 3405 insertions(+), 3574 deletions(-) delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/BodyHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Channel.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Connection.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ContentHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Delegator.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Event.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Frame.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Functions.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Future.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Handler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Header.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/InputHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Method.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/MethodHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Range.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Result.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Segment.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Session.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/SessionResolver.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Struct.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Stub.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/Switch.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java (limited to 'qpid/java/common/src') diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/BodyHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/BodyHandler.java deleted file mode 100644 index 26e654d3a7..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/BodyHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * BodyHandler - * - * @author Rafael H. Schloming - */ - -class BodyHandler implements Handler> -{ - - private final SessionDelegate delegate; - - public BodyHandler(SessionDelegate delegate) - { - this.delegate = delegate; - } - - public void handle(Event event) - { - System.out.println("got body frame: " + event.target); - delegate.data(event.context, event.target); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java b/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java deleted file mode 100644 index 3c734ba8f4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.List; -import java.util.ArrayList; - -import org.apache.qpidity.codec.SegmentEncoder; -import org.apache.qpidity.codec.SizeEncoder; - -import static org.apache.qpidity.Frame.*; -import static org.apache.qpidity.Functions.*; - - -/** - * Channel - * - * @author Rafael H. Schloming - */ - -public class Channel extends Invoker implements Handler -{ - - final private Connection connection; - final private int channel; - final private TrackSwitch tracks; - final private Delegate delegate; - final private SessionDelegate sessionDelegate; - // session may be null - private Session session; - - private Method method = null; - private List data = null; - private int dataSize; - - public Channel(Connection connection, int channel, SessionDelegate delegate) - { - this.connection = connection; - this.channel = channel; - this.delegate = new ChannelDelegate(); - this.sessionDelegate = delegate; - - tracks = new TrackSwitch(); - tracks.map(L1, new MethodHandler - (getMajor(), getMinor(), connection.getConnectionDelegate())); - tracks.map(L2, new MethodHandler - (getMajor(), getMinor(), this.delegate)); - tracks.map(L3, new SessionResolver - (new MethodHandler - (getMajor(), getMinor(), delegate))); - tracks.map(L4, new SessionResolver - (new ContentHandler(getMajor(), getMinor(), delegate))); - } - - public byte getMajor() - { - return connection.getMajor(); - } - - public byte getMinor() - { - return connection.getMinor(); - } - - public int getEncodedChannel() { - return channel; - } - - public Session getSession() - { - return session; - } - - void setSession(Session session) - { - this.session = session; - } - - public void handle(Frame frame) - { - tracks.handle(new Event(this, frame)); - } - - private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size) - { - return new SegmentEncoder(getMajor(), - getMinor(), - connection.getOutputHandler(), - connection.getMaxFrame(), - (byte) (flags | VERSION), - track, - type, - channel, - size); - } - - public void method(Method m) - { - SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); - sizer.writeLong(m.getEncodedType()); - m.write(sizer, getMajor(), getMinor()); - sizer.flush(); - int size = sizer.getSize(); - - byte flags = FIRST_SEG; - - if (!m.hasPayload()) - { - flags |= LAST_SEG; - } - - SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(), - m.getSegmentType(), size); - enc.writeLong(m.getEncodedType()); - m.write(enc, getMajor(), getMinor()); - enc.flush(); - - if (m.hasPayload()) - { - method = m; - } - - if (m.getEncodedTrack() != Frame.L4) - { - System.out.println("sent control " + m.getClass().getName()); - } - } - - public void headers(Struct ... headers) - { - if (method == null) - { - throw new IllegalStateException("cannot write headers without method"); - } - - SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); - for (Struct hdr : headers) - { - sizer.writeLongStruct(hdr); - } - - SegmentEncoder enc = newEncoder((byte) 0x0, - method.getEncodedTrack(), - HEADER, - sizer.getSize()); - for (Struct hdr : headers) - { - enc.writeLongStruct(hdr); - enc.flush(); - System.out.println("sent " + hdr); - } - } - - public void data(ByteBuffer buf) - { - if (data == null) - { - data = new ArrayList(); - dataSize = 0; - } - data.add(buf); - dataSize += buf.remaining(); - } - - public void data(String str) - { - data(str.getBytes()); - } - - public void data(byte[] bytes) - { - data(ByteBuffer.wrap(bytes)); - } - - public void end() - { - byte flags = LAST_SEG; - SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(), - BODY, dataSize); - for (ByteBuffer buf : data) - { - enc.put(buf); - System.out.println("sent " + str(buf)); - } - enc.flush(); - data = null; - dataSize = 0; - } - - protected void invoke(Method m) - { - method(m); - } - - protected Future invoke(Method m, Class cls) - { - throw new UnsupportedOperationException(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java deleted file mode 100644 index d486868621..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.UUID; - - -/** - * ChannelDelegate - * - * @author Rafael H. Schloming - */ - -class ChannelDelegate extends Delegate -{ - - public @Override void sessionOpen(Channel channel, SessionOpen open) - { - Session ssn = new Session(); - ssn.attach(channel); - long lifetime = open.getDetachedLifetime(); - System.out.println("Session Opened lifetime = " + lifetime); - ssn.sessionAttached(UUID.randomUUID(), lifetime); - } - - public @Override void sessionAttached(Channel channel, - SessionAttached attached) - { - System.out.println("Session attached: " + attached.getSessionId() + ", " + - attached.getDetachedLifetime()); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java deleted file mode 100644 index c6190dc3d7..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * CommandDispatcher - * - * @author Rafael H. Schloming - */ - -class CommandDispatcher implements Handler> -{ - - private final Delegate delegate; - - public CommandDispatcher(Delegate delegate) - { - this.delegate = delegate; - } - - public void handle(Event event) - { - Session ssn = event.context; - Method method = event.target; - method.setId(ssn.nextCommandId()); - System.out.println("\n Delegating " + method.getClass().getName() + "[" + method.getId() + "] to " + delegate.getClass().getName() + "\n"); - method.delegate(ssn, delegate); - if (!method.hasPayload()) - { - ssn.processed(method); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java b/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java deleted file mode 100644 index b70c8fae18..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.HashMap; -import java.util.Map; - -import java.nio.ByteBuffer; - - -/** - * Connection - * - * @author Rafael H. Schloming - * - * @todo the channels map should probably be replaced with something - * more efficient, e.g. an array or a map implementation that can use - * short instead of Short - */ - -// RA making this public until we sort out the package issues -public class Connection implements ProtocolActions -{ - - final private Handler input; - final private Handler output; - final private ConnectionDelegate delegate; - - final private Map channels = new HashMap(); - // XXX: hardcoded versions - private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10); - // XXX - private int maxFrame = 64*1024; - - public Connection(Handler output, - ConnectionDelegate delegate, - InputHandler.State state) - { - this.input = new InputHandler(this, state); - this.output = output; - this.delegate = delegate; - } - - public ConnectionDelegate getConnectionDelegate() - { - return delegate; - } - - public Connection(Handler output, - ConnectionDelegate delegate) - { - this(output, delegate, InputHandler.State.PROTO_HDR); - } - - public Handler getInputHandler() - { - return input; - } - - public Handler getOutputHandler() - { - return output; - } - - public ProtocolHeader getHeader() - { - return header; - } - - public byte getMajor() - { - return header.getMajor(); - } - - public byte getMinor() - { - return header.getMinor(); - } - - public int getMaxFrame() - { - return maxFrame; - } - - public void init(ProtocolHeader hdr) - { - System.out.println(header); - if (hdr.getMajor() != header.getMajor() && - hdr.getMinor() != header.getMinor()) - { - output.handle(header.toByteBuffer()); - // XXX: how do we close the connection? - } - - // not sure if this is the right place - System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); - - getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); - } - - public Channel getChannel(int number) - { - Channel channel = channels.get(number); - if (channel == null) - { - channel = new Channel(this, number, delegate.getSessionDelegate()); - channels.put(number, channel); - } - return channel; - } - - public void frame(Frame frame) - { - Channel channel = getChannel(frame.getChannel()); - channel.handle(frame); - } - - public void error(ProtocolError error) - { - throw new RuntimeException(error.getMessage()); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java deleted file mode 100644 index ff89567cee..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.io.UnsupportedEncodingException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - - -/** - * ConnectionDelegate - * - * @author Rafael H. Schloming - */ - -/** - * Currently only implemented client specific methods - * the server specific methods are dummy impls for testing - * - * the connectionClose is kind of different for both sides - */ -public abstract class ConnectionDelegate extends Delegate -{ - private String _username = "guest"; - private String _password = "guest";; - private String _mechanism; - private String _virtualHost; - private SaslClient saslClient; - private SaslServer saslServer; - private String _locale = "utf8"; - private int maxFrame = 64*1024; - private Condition _negotiationComplete; - private Lock _negotiationCompleteLock; - - public abstract SessionDelegate getSessionDelegate(); - - public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) - { - _negotiationComplete = negotiationComplete; - _negotiationCompleteLock = negotiationCompleteLock; - } - - // ---------------------------------------------- - // Client side - //----------------------------------------------- - @Override public void connectionStart(Channel context, ConnectionStart struct) - { - System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); - System.out.println("The broker has sent connection-start"); - - String mechanism = null; - String response = null; - try - { - mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); - saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, - SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); - } - catch (UnsupportedEncodingException e) - { - // need error handling - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - - Map props = new HashMap(); - context.connectionStartOk(props, mechanism, response, _locale); - } - - @Override public void connectionSecure(Channel context, ConnectionSecure struct) - { - System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge()); - - try - { - String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); - context.connectionSecureOk(response); - } - catch (UnsupportedEncodingException e) - { - // need error handling - } - catch (SaslException e) - { - // need error handling - } - } - - @Override public void connectionTune(Channel context, ConnectionTune struct) - { - System.out.println("The broker has sent connection-tune " + struct.toString()); - - // should update the channel max given by the broker. - context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); - context.connectionOpen(_virtualHost, null, Option.INSIST); - } - - - @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) - { - String knownHosts = struct.getKnownHosts(); - System.out.println("The broker has opened the connection for use"); - System.out.println("The broker supplied the following hosts for failover " + knownHosts); - if(_negotiationCompleteLock != null) - { - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally - { - _negotiationCompleteLock.unlock(); - } - } - System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); - } - - public void connectionRedirect(Channel context, ConnectionRedirect struct) - { - // not going to bother at the moment - } - - // ---------------------------------------------- - // Server side - //----------------------------------------------- - @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) - { - //set the client side locale on the server side - _locale = struct.getLocale(); - _mechanism = struct.getMechanism(); - - System.out.println("The client has sent connection-start-ok"); - - //try - //{ - //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); - byte[] challenge = null; - if ( challenge == null) - { - System.out.println("Authentication sucessfull"); - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); - } - else - { - System.out.println("Authentication failed"); - try - { - context.connectionSecure(new String(challenge,_locale)); - } - catch(Exception e) - { - - } - } - - - /*} - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - }*/ - } - - @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) - { - System.out.println("The client has excepted the tune params"); - } - - @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) - { - System.out.println("The client has sent connection-secure-ok"); - try - { - saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); - if ( challenge == null) - { - System.out.println("Authentication sucessfull"); - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); - } - else - { - System.out.println("Authentication failed"); - try - { - context.connectionSecure(new String(challenge,_locale)); - } - catch(Exception e) - { - - } - } - - - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - } - - - @Override public void connectionOpen(Channel context, ConnectionOpen struct) - { - String hosts = "amqp:1223243232325"; - System.out.println("The client has sent connection-open"); - context.connectionOpenOk(hosts); - System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); - } - - - public String getPassword() - { - return _password; - } - - public void setPassword(String password) - { - _password = password; - } - - public String getUsername() - { - return _username; - } - - public void setUsername(String username) - { - _username = username; - } - - public String getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(String host) - { - _virtualHost = host; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java index fa35e9332f..4e05aa574c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java @@ -22,7 +22,9 @@ package org.apache.qpidity; import java.nio.ByteBuffer; -import static org.apache.qpidity.Functions.*; +import org.apache.qpidity.transport.Sender; + +import static org.apache.qpidity.transport.util.Functions.*; /** @@ -31,12 +33,17 @@ import static org.apache.qpidity.Functions.*; * @author Rafael H. Schloming */ -class ConsoleOutput implements Handler +public class ConsoleOutput implements Sender { - public void handle(ByteBuffer buf) + public void send(ByteBuffer buf) { System.out.println(str(buf)); } + public void close() + { + System.out.println("CLOSED"); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/ContentHandler.java deleted file mode 100644 index b54e28c85e..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ContentHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * ContentHandler is a stateful handler that aggregates and dispatches - * method and header frames, and passes body frames through to another - * handler. - * - * @author Rafael H. Schloming - */ - -class ContentHandler extends TypeSwitch -{ - - public ContentHandler(byte major, byte minor, SessionDelegate delegate) - { - CommandDispatcher disp = new CommandDispatcher(delegate); - MethodDecoder dec = new MethodDecoder(major, minor, disp); - HeaderHandler hh = new HeaderHandler(major, minor, delegate); - map(Frame.METHOD, new SegmentAssembler(dec)); - map(Frame.HEADER, new SegmentAssembler(hh)); - map(Frame.BODY, new BodyHandler(delegate)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Delegator.java b/qpid/java/common/src/main/java/org/apache/qpidity/Delegator.java deleted file mode 100644 index 00e769e0f2..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Delegator.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Delegator - * - * @author Rafael H. Schloming - */ - -interface Delegator -{ - - void delegate(C context, Delegate delegate); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Event.java b/qpid/java/common/src/main/java/org/apache/qpidity/Event.java deleted file mode 100644 index ae210f4c20..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Event.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Events are a common class of thing to handle. An event has a - * context and a target. This division permits the same target - * instance to be used in a variety of contexts. - * - * @author Rafael H. Schloming - */ - -public class Event -{ - - C context; - T target; - - public Event(C context, T target) - { - this.context = context; - this.target = target; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java b/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java deleted file mode 100644 index 89e7579cb3..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.List; -import java.util.Iterator; - -import static org.apache.qpidity.Functions.*; - - -/** - * Frame - * - * @author Rafael H. Schloming - */ - -// RA: changed it to public until we sort the package issues -public class Frame implements Iterable -{ - public static final int HEADER_SIZE = 12; - - // XXX: enums? - public static final byte L1 = 0; - public static final byte L2 = 1; - public static final byte L3 = 2; - public static final byte L4 = 3; - - public static final byte METHOD = 1; - public static final byte HEADER = 2; - public static final byte BODY = 3; - - public static final byte RESERVED = 0x0; - - public static final byte VERSION = 0x0; - - public static final byte FIRST_SEG = 0x8; - public static final byte LAST_SEG = 0x4; - public static final byte FIRST_FRAME = 0x2; - public static final byte LAST_FRAME = 0x1; - - final private byte flags; - final private byte type; - final private byte track; - final private int channel; - final private List fragments; - private int size; - - public Frame(byte flags, byte type, byte track, int channel) - { - this.flags = flags; - this.type = type; - this.track = track; - this.channel = channel; - this.size = 0; - this.fragments = new ArrayList(); - } - - public void addFragment(ByteBuffer fragment) - { - fragments.add(fragment); - size += fragment.remaining(); - } - - public int getChannel() - { - return channel; - } - - public int getSize() - { - return size; - } - - public byte getType() - { - return type; - } - - public byte getTrack() - { - return track; - } - - private boolean flag(byte mask) - { - return (flags & mask) != 0; - } - - public boolean isFirstSegment() - { - return flag(FIRST_SEG); - } - - public boolean isLastSegment() - { - return flag(LAST_SEG); - } - - public boolean isFirstFrame() - { - return flag(FIRST_FRAME); - } - - public boolean isLastFrame() - { - return flag(LAST_FRAME); - } - - public Iterator getFragments() - { - return new SliceIterator(fragments.iterator()); - } - - public Iterator iterator() - { - return getFragments(); - } - - public String toString() - { - StringBuilder str = new StringBuilder(); - str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), - getTrack(), getType(), - isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, - isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); - - boolean first = true; - for (ByteBuffer buf : this) - { - if (first) - { - first = false; - } - else - { - str.append(" | "); - } - - str.append(str(buf)); - } - - return str.toString(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java b/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java deleted file mode 100644 index 1008966b01..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - - -/** - * Functions - * - * @author Rafael H. Schloming - */ - -public class Functions -{ - - public static final short unsigned(byte b) - { - return (short) ((0x100 + b) & 0xFF); - } - - public static final int unsigned(short s) - { - return (0x10000 + s) & 0xFFFF; - } - - public static final long unsigned(int i) - { - return (0x1000000000L + i) & 0xFFFFFFFFL; - } - - public static final byte lsb(int i) - { - return (byte) (0xFF & i); - } - - public static final byte lsb(long l) - { - return (byte) (0xFF & l); - } - - public static final String str(ByteBuffer buf) - { - return str(buf, buf.limit()); - } - - public static final String str(ByteBuffer buf, int limit) - { - StringBuilder str = new StringBuilder(); - for (int i = 0; i < limit; i++) - { - if (i > 0 && i % 2 == 0) - { - str.append(" "); - } - str.append(String.format("%02x", buf.get(i))); - } - - return str.toString(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Future.java b/qpid/java/common/src/main/java/org/apache/qpidity/Future.java deleted file mode 100644 index 8902446e95..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Future.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Future - * - * @author Rafael H. Schloming - */ - -public interface Future -{ - - T get(); - - boolean isDone(); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Handler.java b/qpid/java/common/src/main/java/org/apache/qpidity/Handler.java deleted file mode 100644 index 0ff949cc1d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Handler.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -/** - * Handler is a basic interface used throughout this library for - * callbacks, listeners, event handlers, etc. - * - * @author Rafael H. Schloming - */ - -public interface Handler -{ - - void handle(E event); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Header.java b/qpid/java/common/src/main/java/org/apache/qpidity/Header.java deleted file mode 100644 index 9b6373df19..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Header.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Header - * - * @author Rafael H. Schloming - */ - -public abstract class Header extends Struct {} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java deleted file mode 100644 index 8737932712..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.ArrayList; -import java.util.Iterator; - -import org.apache.qpidity.codec.FragmentDecoder; - - -/** - * HeaderHandler - * - * @author Rafael H. Schloming - */ - -class HeaderHandler implements Handler> -{ - - private static final Struct[] EMPTY_STRUCT_ARRAY = {}; - - private final byte major; - private final byte minor; - private final SessionDelegate delegate; - - public HeaderHandler(byte major, byte minor, SessionDelegate delegate) - { - this.major = major; - this.minor = minor; - this.delegate = delegate; - } - - public void handle(Event event) - { - System.out.println("got header segment:\n " + event.target); - Iterator fragments = event.target.getFragments(); - FragmentDecoder dec = new FragmentDecoder(major, minor, fragments); - ArrayList headers = new ArrayList(); - while (dec.hasRemaining()) - { - headers.add(dec.readLongStruct()); - } - delegate.headers(event.context, headers.toArray(EMPTY_STRUCT_ARRAY)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/InputHandler.java deleted file mode 100644 index 78cb3d1b60..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/InputHandler.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import static org.apache.qpidity.InputHandler.State.*; - - -/** - * InputHandler - * - * @author Rafael H. Schloming - */ - -class InputHandler implements Handler -{ - - public enum State - { - PROTO_HDR, - PROTO_HDR_M, - PROTO_HDR_Q, - PROTO_HDR_P, - PROTO_HDR_CLASS, - PROTO_HDR_INSTANCE, - PROTO_HDR_MAJOR, - PROTO_HDR_MINOR, - FRAME_HDR, - FRAME_HDR_TYPE, - FRAME_HDR_SIZE1, - FRAME_HDR_SIZE2, - FRAME_HDR_RSVD1, - FRAME_HDR_TRACK, - FRAME_HDR_CH1, - FRAME_HDR_CH2, - FRAME_HDR_RSVD2, - FRAME_HDR_RSVD3, - FRAME_HDR_RSVD4, - FRAME_HDR_RSVD5, - FRAME_PAYLOAD, - FRAME_FRAGMENT, - ERROR; - } - - private final ProtocolActions actions; - private State state; - - private byte instance; - private byte major; - private byte minor; - - private byte flags; - private byte type; - private byte track; - private int channel; - private int size; - private Frame frame; - - public InputHandler(ProtocolActions actions, State state) - { - this.actions = actions; - this.state = state; - } - - public InputHandler(ProtocolActions actions) - { - this(actions, PROTO_HDR); - } - - private void init() - { - actions.init(new ProtocolHeader(instance, major, minor)); - } - - private void frame() - { - assert size == frame.getSize(); - actions.frame(frame); - frame = null; - } - - private void error(String fmt, Object ... args) - { - actions.error(new ProtocolError(fmt, args)); - } - - public void handle(ByteBuffer buf) - { - while (buf.hasRemaining()) - { - state = next(buf); - } - } - - private State next(ByteBuffer buf) - { - switch (state) { - case PROTO_HDR: - return expect(buf, 'A', PROTO_HDR_M); - case PROTO_HDR_M: - return expect(buf, 'M', PROTO_HDR_Q); - case PROTO_HDR_Q: - return expect(buf, 'Q', PROTO_HDR_P); - case PROTO_HDR_P: - return expect(buf, 'P', PROTO_HDR_CLASS); - case PROTO_HDR_CLASS: - return expect(buf, 1, PROTO_HDR_INSTANCE); - case PROTO_HDR_INSTANCE: - instance = buf.get(); - return PROTO_HDR_MAJOR; - case PROTO_HDR_MAJOR: - major = buf.get(); - return PROTO_HDR_MINOR; - case PROTO_HDR_MINOR: - minor = buf.get(); - init(); - return FRAME_HDR; - case FRAME_HDR: - flags = buf.get(); - return FRAME_HDR_TYPE; - case FRAME_HDR_TYPE: - type = buf.get(); - return FRAME_HDR_SIZE1; - case FRAME_HDR_SIZE1: - size = buf.get() << 8; - return FRAME_HDR_SIZE2; - case FRAME_HDR_SIZE2: - size += buf.get(); - size -= 12; - return FRAME_HDR_RSVD1; - case FRAME_HDR_RSVD1: - return expect(buf, 0, FRAME_HDR_TRACK); - case FRAME_HDR_TRACK: - byte b = buf.get(); - if ((b & 0xF0) != 0) { - error("non-zero reserved bits in upper nibble of " + - "frame header byte 5: '%x'", b); - return ERROR; - } else { - track = (byte) (b & 0xF); - return FRAME_HDR_CH1; - } - case FRAME_HDR_CH1: - channel = buf.get() << 8; - return FRAME_HDR_CH2; - case FRAME_HDR_CH2: - channel += buf.get(); - return FRAME_HDR_RSVD2; - case FRAME_HDR_RSVD2: - return expect(buf, 0, FRAME_HDR_RSVD3); - case FRAME_HDR_RSVD3: - return expect(buf, 0, FRAME_HDR_RSVD4); - case FRAME_HDR_RSVD4: - return expect(buf, 0, FRAME_HDR_RSVD5); - case FRAME_HDR_RSVD5: - return expect(buf, 0, FRAME_PAYLOAD); - case FRAME_PAYLOAD: - frame = new Frame(flags, type, track, channel); - if (size > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); - return FRAME_FRAGMENT; - } else { - ByteBuffer payload = buf.slice(); - payload.limit(size); - buf.position(buf.position() + size); - frame.addFragment(payload); - frame(); - return FRAME_HDR; - } - case FRAME_FRAGMENT: - int delta = size - frame.getSize(); - if (delta > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); - return FRAME_FRAGMENT; - } else { - ByteBuffer fragment = buf.slice(); - fragment.limit(delta); - buf.position(buf.position() + delta); - frame.addFragment(fragment); - frame(); - return FRAME_HDR; - } - default: - throw new IllegalStateException(); - } - } - - private State expect(ByteBuffer buf, int expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, char expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, byte expected, State next) - { - byte b = buf.get(); - if (b == expected) { - return next; - } else { - error("expecting '%x', got '%x'", expected, b); - return ERROR; - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Method.java b/qpid/java/common/src/main/java/org/apache/qpidity/Method.java deleted file mode 100644 index 43865d36aa..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Method.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Method - * - * @author Rafael H. Schloming - */ - -public abstract class Method extends Struct -{ - - public static final Method create(int type) - { - // XXX: should generate separate factories for separate - // namespaces - return (Method) Struct.create(type); - } - - // XXX: command subclass? - private long id; - - public final long getId() - { - return id; - } - - void setId(long id) - { - this.id = id; - } - - public abstract boolean hasPayload(); - - public abstract byte getEncodedTrack(); - - // XXX: do we need a segment base type? - public byte getSegmentType() - { - // XXX - return Frame.METHOD; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java deleted file mode 100644 index b01f067381..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.Iterator; - -import org.apache.qpidity.codec.Decoder; -import org.apache.qpidity.codec.FragmentDecoder; - - -/** - * MethodDecoder - * - * @author Rafael H. Schloming - */ - -class MethodDecoder implements Handler> -{ - - private final byte major; - private final byte minor; - private final Handler> handler; - - public MethodDecoder(byte major, byte minor, Handler> handler) - { - this.major = major; - this.minor = minor; - this.handler = handler; - } - - public void handle(Event event) - { - //System.out.println("got method segment:\n " + event.target); - Iterator fragments = event.target.getFragments(); - Decoder dec = new FragmentDecoder(major, minor, fragments); - int type = (int) dec.readLong(); - Method method = Method.create(type); - method.read(dec, major, minor); - handler.handle(new Event(event.context, method)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java deleted file mode 100644 index f5a040166e..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A MethodDispatcher parses and dispatches a method segment. - * - * @author Rafael H. Schloming - */ - -class MethodDispatcher implements Handler> -{ - - final private Delegate delegate; - - public MethodDispatcher(Delegate delegate) - { - this.delegate = delegate; - } - - public void handle(Event event) - { - Method method = event.target; - System.out.println("\nDelegating " + method.getClass().getName() + " to " + delegate.getClass().getName() + "\n"); - method.delegate(event.context, delegate); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/MethodHandler.java deleted file mode 100644 index 86dac241a2..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/MethodHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * MethodHandler is a stateful handler that aggregates frames into - * method segments and dispatches the resulting method. It does not - * accept any segment type other than Frame.METHOD. - * - * @author Rafael H. Schloming - */ - -class MethodHandler extends TypeSwitch -{ - - public MethodHandler(byte major, byte minor, Delegate delegate) - { - MethodDispatcher disp = new MethodDispatcher(delegate); - MethodDecoder dec = new MethodDecoder(major, minor, disp); - map(Frame.METHOD, new SegmentAssembler(dec)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java deleted file mode 100644 index f255b56d0b..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; - -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketConnector; - - -/** - * MinaHandler - * - * @author Rafael H. Schloming - */ -//RA making this public until we sort out the package issues -public class MinaHandler implements IoHandler -{ - - private final ConnectionDelegate delegate; - private final InputHandler.State state; - - public MinaHandler(ConnectionDelegate delegate, InputHandler.State state) - { - this.delegate = delegate; - this.state = state; - } - - public void messageReceived(IoSession ssn, Object obj) - { - Connection conn = (Connection) ssn.getAttachment(); - ByteBuffer buf = (ByteBuffer) obj; - conn.getInputHandler().handle(buf.buf()); - } - - public void messageSent(IoSession ssn, Object obj) - { - // do nothing - } - - public void exceptionCaught(IoSession ssn, Throwable e) - { - e.printStackTrace(); - } - - public void sessionCreated(final IoSession ssn) - { - // do nothing - } - - public void sessionOpened(final IoSession ssn) - { - System.out.println("opened " + ssn); - Connection conn = new Connection(new Handler() - { - public void handle(java.nio.ByteBuffer buf) - { - ssn.write(ByteBuffer.wrap(buf)); - } - }, - delegate, - state); - ssn.setAttachment(conn); - // XXX - synchronized (ssn) - { - ssn.notifyAll(); - } - } - - public void sessionClosed(IoSession ssn) - { - System.out.println("closed " + ssn); - ssn.setAttachment(null); - } - - public void sessionIdle(IoSession ssn, IdleStatus status) - { - System.out.println(status); - } - - public static final void main(String[] args) throws IOException - { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - if (args[0].equals("accept")) { - accept("0.0.0.0", 5672, SessionDelegateStub.source()); - } else if (args[0].equals("connect")) { - connect("0.0.0.0", 5672, SessionDelegateStub.source()); - } - } - - public static final void accept(String host, int port, - ConnectionDelegate delegate) - throws IOException - { - IoAcceptor acceptor = new SocketAcceptor(); - acceptor.bind(new InetSocketAddress(host, port), - new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); - - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - MinaHandler handler = new MinaHandler(delegate, - InputHandler.State.FRAME_HDR); - SocketAddress addr = new InetSocketAddress(host, port); - IoConnector connector = new SocketConnector(); - ConnectFuture cf = connector.connect(addr, handler); - cf.join(); - IoSession ssn = cf.getSession(); - // XXX - synchronized (ssn) - { - while (ssn.getAttachment() == null) - { - try - { - ssn.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - Connection conn = (Connection) ssn.getAttachment(); - return conn; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java deleted file mode 100644 index c658320989..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * ProtocolActions - * - * @author Rafael H. Schloming - */ - -interface ProtocolActions -{ - - void init(ProtocolHeader header); - - void frame(Frame frame); - - void error(ProtocolError error); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java deleted file mode 100644 index a4a83fad35..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * ProtocolError - * - * @author Rafael H. Schloming - */ - -class ProtocolError -{ - - private final String format; - private final Object[] args; - - public ProtocolError(String format, Object ... args) - { - this.format = format; - this.args = args; - } - - public String getMessage() - { - return String.format(format, args); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java deleted file mode 100644 index 526ef50211..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * ProtocolHandler - * - * @author Rafael H. Schloming - */ - -interface ProtocolHandler -{ - - void init(ProtocolHeader header); - - void frame(Frame frame); - - void error(ProtocolError error); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java deleted file mode 100644 index 140d5ecbe3..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - - -/** - * ProtocolHeader - * - * @author Rafael H. Schloming - */ - -//RA making this public until we sort out the package issues - -public class ProtocolHeader -{ - - private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; - private static final byte CLASS = 1; - - final private byte instance; - final private byte major; - final private byte minor; - - public ProtocolHeader(byte instance, byte major, byte minor) - { - this.instance = instance; - this.major = major; - this.minor = minor; - } - - public byte getInstance() - { - return instance; - } - - public byte getMajor() - { - return major; - } - - public byte getMinor() - { - return minor; - } - - public ByteBuffer toByteBuffer() - { - ByteBuffer buf = ByteBuffer.allocate(8); - buf.put(AMQP); - buf.put(CLASS); - buf.put(instance); - buf.put(major); - buf.put(minor); - buf.flip(); - return buf; - } - - public String toString() - { - return String.format("AMQP.%d %d-%d", instance, major, minor); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Range.java b/qpid/java/common/src/main/java/org/apache/qpidity/Range.java deleted file mode 100644 index 9da7112a6d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Range.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import static java.lang.Math.*; - - -/** - * Range - * - * @author Rafael H. Schloming - */ - -public class Range -{ - private final long lower; - private final long upper; - - public Range(long lower, long upper) - { - this.lower = lower; - this.upper = upper; - } - - public long getLower() - { - return lower; - } - - public long getUpper() - { - return upper; - } - - public boolean includes(long value) - { - return lower <= value && value <= upper; - } - - public boolean includes(Range range) - { - return includes(range.lower) && includes(range.upper); - } - - public boolean intersects(Range range) - { - return (includes(range.lower) || includes(range.upper) || - range.includes(lower) || range.includes(upper)); - } - - public boolean touches(Range range) - { - return (includes(range.upper + 1) || includes(range.lower - 1) || - range.includes(upper + 1) || range.includes(lower - 1)); - } - - public Range span(Range range) - { - return new Range(min(lower, range.lower), max(upper, range.upper)); - } - - public String toString() - { - return "[" + lower + ", " + upper + "]"; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java deleted file mode 100644 index 2e79ee8a72..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.Collection; -import java.util.Iterator; -import java.util.ListIterator; -import java.util.LinkedList; - - -/** - * RangeSet - * - * @author Rafael H. Schloming - */ - -public class RangeSet implements Iterable -{ - - private LinkedList ranges = new LinkedList(); - - public int size() - { - return ranges.size(); - } - - public Iterator iterator() - { - return ranges.iterator(); - } - - public boolean includes(Range range) - { - for (Range r : this) - { - if (r.includes(range)) - { - return true; - } - } - - return false; - } - - public void add(Range range) - { - ListIterator it = ranges.listIterator(); - - while (it.hasNext()) - { - Range next = it.next(); - if (range.touches(next)) - { - it.remove(); - range = range.span(next); - } - else if (range.getUpper() < next.getLower()) - { - it.previous(); - it.add(range); - return; - } - } - - it.add(range); - } - - public void add(long lower, long upper) - { - add(new Range(lower, upper)); - } - - public void add(long value) - { - add(value, value); - } - - public void clear() - { - ranges.clear(); - } - - public String toString() - { - return ranges.toString(); - } - - public static final void main(String[] args) - { - RangeSet ranges = new RangeSet(); - ranges.add(5, 10); - System.out.println(ranges); - ranges.add(15, 20); - System.out.println(ranges); - ranges.add(23, 25); - System.out.println(ranges); - ranges.add(12, 14); - System.out.println(ranges); - ranges.add(0, 1); - System.out.println(ranges); - ranges.add(3, 11); - System.out.println(ranges); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Result.java b/qpid/java/common/src/main/java/org/apache/qpidity/Result.java deleted file mode 100644 index 7fe6c869a4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Result.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * Result - * - * @author Rafael H. Schloming - */ - -public abstract class Result extends Struct {} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Segment.java b/qpid/java/common/src/main/java/org/apache/qpidity/Segment.java deleted file mode 100644 index ee9f60fad8..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Segment.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; - -import java.nio.ByteBuffer; - -import static org.apache.qpidity.Functions.*; - - -/** - * Segment - * - * @author Rafael H. Schloming - */ - -class Segment implements Iterable -{ - - private final Collection fragments = new ArrayList(); - - public void add(ByteBuffer fragment) - { - fragments.add(fragment); - } - - public Iterator getFragments() - { - return new SliceIterator(fragments.iterator()); - } - - public Iterator iterator() - { - return getFragments(); - } - - public String toString() - { - StringBuilder str = new StringBuilder(); - String sep = " | "; - - for (ByteBuffer buf : this) - { - str.append(str(buf)); - str.append(sep); - } - - str.setLength(str.length() - sep.length()); - - return str.toString(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java b/qpid/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java deleted file mode 100644 index 0fb90a2f53..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - - -/** - * SegmentAssembler is a stateful handler that aggregates Frame events - * into Segment events. This should only be used where it is necessary - * to assemble a Segment before processing, e.g. for Method and Header - * segments. - * - * @author Rafael H. Schloming - */ - -class SegmentAssembler implements Handler> -{ - - final private Handler> handler; - private Segment segment; - - public SegmentAssembler(Handler> handler) - { - this.handler = handler; - } - - public void handle(Event event) - { - Frame frame = event.target; - if (frame.isFirstFrame()) - { - segment = new Segment(); - } - - for (ByteBuffer fragment : frame) - { - segment.add(fragment); - } - - if (frame.isLastFrame()) - { - handler.handle(new Event(event.context, segment)); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/Session.java deleted file mode 100644 index 6f0bd5c757..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Session.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - - -/** - * Session - * - * @author Rafael H. Schloming - */ - -public class Session extends Invoker -{ - - // channel may be null - Channel channel; - - // incoming command count - private long commandsIn = 0; - // completed incoming commands - private final RangeSet processed = new RangeSet(); - private Range syncPoint = null; - - // outgoing command count - private long commandsOut = 0; - private Map commands = new HashMap(); - private long mark = 0; - - - public Map getOutstandingCommands() - { - return commands; - } - - public long getCommandsOut() - { - return commandsOut; - } - - public long getCommandsIn() - { - return commandsIn; - } - - public long nextCommandId() - { - return commandsIn++; - } - - public RangeSet getProcessed() - { - return processed; - } - - public void processed(Method command) - { - processed(command.getId()); - } - - public void processed(long command) - { - processed(new Range(command, command)); - } - - public void processed(long lower, long upper) - { - processed(new Range(lower, upper)); - } - - public void processed(Range range) - { - boolean flush; - synchronized (processed) - { - processed.add(range); - flush = syncPoint != null && processed.includes(syncPoint); - } - if (flush) - { - flushProcessed(); - } - } - - void flushProcessed() - { - for (Range r: processed) - { - System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); - } - System.out.println("Notifying peer with execution complete"); - executionComplete(0, processed); - } - - void syncPoint() - { - System.out.println("===========Request received to sync=========================="); - - Range range = new Range(0, getCommandsIn() - 1); - boolean flush; - synchronized (processed) - { - flush = processed.includes(range); - if (!flush) - { - syncPoint = range; - } - } - if (flush) - { - flushProcessed(); - } - } - - public void attach(Channel channel) - { - this.channel = channel; - channel.setSession(this); - } - - public Method getCommand(long id) - { - synchronized (commands) - { - return commands.get(id); - } - } - - void complete(long lower, long upper) - { - synchronized (commands) - { - for (long id = lower; id <= upper; id++) - { - commands.remove(id); - } - - if (commands.isEmpty()) - { - System.out.println("\n All outstanding commands are completed !!!! \n"); - commands.notifyAll(); - } - } - } - - void complete(long mark) - { - complete(this.mark, mark); - this.mark = mark; - } - - protected void invoke(Method m) - { - if (m.getEncodedTrack() == Frame.L4) - { - synchronized (commands) - { - System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); - commands.put(commandsOut++, m); - } - } - channel.method(m); - } - - public void headers(Struct ... headers) - { - channel.headers(headers); - } - - public void data(ByteBuffer buf) - { - channel.data(buf); - } - - public void data(String str) - { - channel.data(str); - } - - public void data(byte[] bytes) - { - channel.data(bytes); - } - - public void endData() - { - channel.end(); - } - - public void sync() - { - System.out.println("calling sync()"); - synchronized (commands) - { - if (!commands.isEmpty()) - { - executionSync(); - } - - while (!commands.isEmpty()) - { - try { - System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); - commands.wait(); - System.out.println("\n============sync() got notified=========================================\n"); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - } - - private Map> results = - new HashMap>(); - - void result(long command, Struct result) - { - ResultFuture future; - synchronized (results) - { - future = results.remove(command); - } - future.set(result); - } - protected Future invoke(Method m, Class klass) - { - long command = commandsOut; - ResultFuture future = new ResultFuture(klass); - synchronized (results) - { - results.put(command, future); - } - invoke(m); - return future; - } - - private class ResultFuture implements Future - { - - private final Class klass; - private T result; - - private ResultFuture(Class klass) - { - this.klass = klass; - } - - private void set(Struct result) - { - synchronized (this) - { - this.result = klass.cast(result); - notifyAll(); - } - } - - public T get(long timeout, int nanos) - { - synchronized (this) - { - while (!isDone()) - { - try - { - wait(timeout, nanos); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - return result; - } - - public T get(long timeout) - { - return get(timeout, 0); - } - - public T get() - { - return get(0); - } - - public boolean isDone() - { - return result != null; - } - - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java deleted file mode 100644 index e6c107ced2..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * SessionDelegate - * - * @author Rafael H. Schloming - */ - -public abstract class SessionDelegate extends Delegate -{ - - public abstract void headers(Session ssn, Struct ... headers); - - public abstract void data(Session ssn, Frame frame); - - @Override public void executionResult(Session ssn, ExecutionResult result) - { - ssn.result(result.getCommandId(), result.getData()); - } - - @Override public void executionComplete(Session ssn, ExecutionComplete excmp) - { - RangeSet ranges = excmp.getRangedExecutionSet(); - if (ranges != null) - { - for (Range range : ranges) - { - System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); - ssn.complete(range.getLower(), range.getUpper()); - } - } - ssn.complete(excmp.getCumulativeExecutionMark()); - System.out.println("outstanding commands: " + ssn.getOutstandingCommands()); - } - - @Override public void executionFlush(Session ssn, ExecutionFlush flush) - { - ssn.flushProcessed(); - } - - @Override public void executionSync(Session ssn, ExecutionSync sync) - { - ssn.syncPoint(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/SessionResolver.java b/qpid/java/common/src/main/java/org/apache/qpidity/SessionResolver.java deleted file mode 100644 index 24e52839fd..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/SessionResolver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * SessionResolver is a stateless handler that accepts incoming events - * whose context is a Channel, and produces an event whose context is - * a Session. - * - * @author Rafael H. Schloming - */ - -class SessionResolver implements Handler> -{ - - final private Handler> handler; - - public SessionResolver(Handler> handler) - { - this.handler = handler; - } - - public void handle(Event event) - { - handler.handle(new Event(event.context.getSession(), event.target)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java b/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java deleted file mode 100644 index 9b4a8f90f7..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.nio.ByteBuffer; - -import java.util.Iterator; - - -/** - * SliceIterator - * - * @author Rafael H. Schloming - */ - -class SliceIterator implements Iterator -{ - - final private Iterator iterator; - - public SliceIterator(Iterator iterator) - { - this.iterator = iterator; - } - - public boolean hasNext() - { - return iterator.hasNext(); - } - - public ByteBuffer next() - { - return iterator.next().slice(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java b/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java deleted file mode 100644 index 50da4910ab..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import org.apache.qpidity.codec.Encodable; - - -/** - * Struct - * - * @author Rafael H. Schloming - */ - -public abstract class Struct implements Delegator, Encodable -{ - - public static Struct create(int type) - { - return StructFactory.create(type); - } - - public abstract int getEncodedType(); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Stub.java b/qpid/java/common/src/main/java/org/apache/qpidity/Stub.java deleted file mode 100644 index 7aa5030672..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Stub.java +++ /dev/null @@ -1,109 +0,0 @@ -package org.apache.qpidity; - -import java.util.*; -import java.lang.annotation.*; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.codec.BBEncoder; -import org.apache.qpidity.codec.Encoder; -import org.apache.qpidity.codec.SizeEncoder; - -import static org.apache.qpidity.Option.*; - - -public class Stub { - - private static final byte major = 0; - private static final byte minor = 10; - - private static Connection conn = new Connection(new ConsoleOutput(), - SessionDelegateStub.source()); - - static - { - conn.init(new ProtocolHeader((byte) 1, major, minor)); - } - - private static void frame(byte track, byte type, boolean first, boolean last) { - frame(track, type, first, last, null); - } - - private static void frame(byte track, byte type, boolean first, boolean last, Method m) { - SizeEncoder sizer = new SizeEncoder(major, minor); - if (m != null) { - sizer.writeLong(m.getEncodedType()); - m.write(sizer, major, minor); - sizer.flush(); - } - ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); - if (m != null) { - Encoder enc = new BBEncoder(major, minor, buf); - enc.writeLong(m.getEncodedType()); - m.write(enc, major, minor); - enc.flush(); - } - buf.flip(); - byte flags = Frame.VERSION; - if (first) { flags |= Frame.FIRST_FRAME; } - if (last) { flags |= Frame.LAST_FRAME; } - Frame frame = new Frame(flags, type, track, 0); - frame.addFragment(buf); - conn.frame(frame); - } - - public static final void main(String[] args) { - frame(Frame.L2, Frame.METHOD, true, true, new SessionOpen(0)); - frame(Frame.L4, Frame.METHOD, true, false, - new QueueDeclare("asdf", "alternate", null, DURABLE)); - frame(Frame.L4, Frame.METHOD, false, false); - frame(Frame.L3, Frame.METHOD, true, true, - new ExchangeDeclare("exchange", "type", "alternate", null)); - frame(Frame.L4, Frame.METHOD, false, true); - frame(Frame.L4, Frame.HEADER, true, false); - frame(Frame.L4, Frame.HEADER, false, false); - frame(Frame.L4, Frame.HEADER, false, true); - frame(Frame.L4, Frame.BODY, true, false); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L1, Frame.METHOD, true, true, - new ExchangeDeclare("exchange", "type", "alternate", null)); - frame(Frame.L4, Frame.BODY, false, false); - frame(Frame.L4, Frame.BODY, false, true); - } - -} - -class SessionDelegateStub extends SessionDelegate { - - public static final ConnectionDelegate source() - { - return new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new SessionDelegateStub(); - } - }; - } - - public @Override void queueDeclare(Session session, QueueDeclare qd) { - System.out.println("got a queue declare: " + qd.getQueue()); - } - - public @Override void exchangeDeclare(Session session, ExchangeDeclare ed) { - System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType()); - session.queueDeclare("asdf", "alternate", null); - } - - public void data(Session ssn, Frame frame) - { - System.out.println("got data: " + frame); - } - - public void headers(Session ssn, Struct ... headers) - { - System.out.println("got headers: " + headers); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/Switch.java b/qpid/java/common/src/main/java/org/apache/qpidity/Switch.java deleted file mode 100644 index 166dc33134..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/Switch.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - -import java.util.HashMap; -import java.util.Map; - -/** - * A Switch performs generic event dispatch. - * - * @author Rafael H. Schloming - */ - -abstract class Switch implements Handler -{ - - final private Map> handlers = - new HashMap>(); - - public void map(K key, Handler handler) - { - handlers.put(key, handler); - } - - public void handle(E event) - { - K key = resolve(event); - Handler handler = handlers.get(key); - if (handler == null) - { - throw new IllegalStateException("no such key: " + key + - " this = " + this + - " handlers = " + handlers); - } - handler.handle(event); - } - - abstract K resolve(E event); - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 0a48a0a990..f69b72227c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -20,7 +20,10 @@ */ package org.apache.qpidity; -import static org.apache.qpidity.Functions.str; +import org.apache.qpidity.transport.*; +import org.apache.qpidity.transport.network.mina.MinaHandler; + +import static org.apache.qpidity.transport.util.Functions.str; import java.io.IOException; import java.nio.ByteBuffer; @@ -44,10 +47,10 @@ class ToyBroker extends SessionDelegate private ToyExchange exchange; private MessageTransfer xfr = null; private DeliveryProperties props = null; - private Struct[] headers = null; - private List frames = null; + private Header header = null; + private List body = null; private Map consumers = new ConcurrentHashMap(); - + public ToyBroker(ToyExchange exchange) { this.exchange = exchange; @@ -58,7 +61,7 @@ class ToyBroker extends SessionDelegate exchange.createQueue(qd.getQueue()); System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); } - + @Override public void queueBind(Session ssn, QueueBind qb) { exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); @@ -70,22 +73,22 @@ class ToyBroker extends SessionDelegate QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); ssn.executionResult(qq.getId(), result); } - + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) { Consumer c = new Consumer(); c._queueName = ms.getQueue(); consumers.put(ms.getDestination(),c); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); - } - + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + @Override public void messageFlow(Session ssn,MessageFlow struct) { Consumer c = consumers.get(struct.getDestination()); c._credit = struct.getValue(); System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); } - + @Override public void messageFlush(Session ssn,MessageFlush struct) { System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); @@ -95,47 +98,44 @@ class ToyBroker extends SessionDelegate @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; - frames = new ArrayList(); + body = new ArrayList(); System.out.println("received transfer " + xfr.getDestination()); } - public void headers(Session ssn, Struct ... headers) + @Override public void header(Session ssn, Header header) { - if (xfr == null || frames == null) + if (xfr == null || body == null) { ssn.connectionClose(503, "no method segment", 0, 0); - // XXX: close at our end + ssn.close(); return; } - for (Struct hdr : headers) + props = header.get(DeliveryProperties.class); + if (props != null) { - if (hdr instanceof DeliveryProperties) - { - props = (DeliveryProperties) hdr; - System.out.println("received headers routing_key " + props.getRoutingKey()); - } + System.out.println("received headers routing_key " + props.getRoutingKey()); } - - this.headers = headers; + + this.header = header; } - public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - if (xfr == null || frames == null) + if (xfr == null || body == null) { ssn.connectionClose(503, "no method segment", 0, 0); - // XXX: close at our end + ssn.close(); return; } - frames.add(frame); + body.add(data); - if (frame.isLastSegment() && frame.isLastFrame()) + if (data.isLast()) { String dest = xfr.getDestination(); - Message m = new Message(headers, frames); - + Message m = new Message(header, body); + if (exchange.route(dest,props.getRoutingKey(),m)) { System.out.println("queued " + m); @@ -143,12 +143,12 @@ class ToyBroker extends SessionDelegate } else { - + reject(ssn); } ssn.processed(xfr); xfr = null; - frames = null; + body = null; } } @@ -165,22 +165,22 @@ class ToyBroker extends SessionDelegate ssn.messageReject(ranges, 0, "no such destination"); } } - + private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); ssn.messageTransfer(dest, (short)0, (short)0); - ssn.headers(m.headers); - for (Frame f : m.frames) + ssn.header(m.header); + for (Data d : m.body) { - for (ByteBuffer b : f) + for (ByteBuffer b : d.getFragments()) { ssn.data(b); } } ssn.endData(); } - + private void dispatchMessages(Session ssn) { for (String dest: consumers.keySet()) @@ -188,8 +188,8 @@ class ToyBroker extends SessionDelegate checkAndSendMessagesToConsumer(ssn,dest); } } - - private void checkAndSendMessagesToConsumer(Session ssn,String dest) + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) { Consumer c = consumers.get(dest); LinkedBlockingQueue queue = exchange.getQueue(c._queueName); @@ -204,33 +204,33 @@ class ToyBroker extends SessionDelegate class Message { - private final Struct[] headers; - private final List frames; + private final Header header; + private final List body; - public Message(Struct[] headers, List frames) + public Message(Header header, List body) { - this.headers = headers; - this.frames = frames; + this.header = header; + this.body = body; } public String toString() { StringBuilder sb = new StringBuilder(); - if (headers != null) + if (header != null) { boolean first = true; - for (Struct hdr : headers) + for (Struct st : header.getStructs()) { if (first) { first = false; } else { sb.append(" "); } - sb.append(hdr); + sb.append(st); } } - for (Frame f : frames) + for (Data d : body) { - for (ByteBuffer b : f) + for (ByteBuffer b : d.getFragments()) { sb.append(" | "); sb.append(str(b)); @@ -241,7 +241,7 @@ class ToyBroker extends SessionDelegate } } - + // ugly, but who cares :) // assumes unit is always no of messages, not bytes // assumes it's credit mode and not window @@ -253,7 +253,7 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { - final ToyExchange exchange = new ToyExchange(); + final ToyExchange exchange = new ToyExchange(); ConnectionDelegate delegate = new ConnectionDelegate() { public SessionDelegate getSessionDelegate() @@ -261,11 +261,11 @@ class ToyBroker extends SessionDelegate return new ToyBroker(exchange); } }; - + //hack delegate.setUsername("guest"); delegate.setPassword("guest"); - + MinaHandler.accept("0.0.0.0", 5672, delegate); } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java index e325fb93be..d924c55c7b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -20,6 +20,9 @@ */ package org.apache.qpidity; +import org.apache.qpidity.transport.*; +import org.apache.qpidity.transport.network.mina.MinaHandler; + /** * ToyClient @@ -42,17 +45,17 @@ class ToyClient extends SessionDelegate } } - public void headers(Session ssn, Struct ... headers) + @Override public void header(Session ssn, Header header) { - for (Struct hdr : headers) + for (Struct st : header.getStructs()) { - System.out.println("header: " + hdr); + System.out.println("header: " + st); } } - public void data(Session ssn, Frame frame) + @Override public void data(Session ssn, Data data) { - System.out.println("got data: " + frame); + System.out.println("got data: " + data); } public static final void main(String[] args) @@ -65,7 +68,7 @@ class ToyClient extends SessionDelegate return new ToyClient(); } }); - conn.getOutputHandler().handle(conn.getHeader().toByteBuffer()); + conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); Channel ch = conn.getChannel(0); Session ssn = new Session(); @@ -76,8 +79,8 @@ class ToyClient extends SessionDelegate ssn.sync(); ssn.messageTransfer("asdf", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties(), - new MessageProperties()); + ssn.header(new DeliveryProperties(), + new MessageProperties()); ssn.data("this is the data"); ssn.endData(); @@ -88,6 +91,8 @@ class ToyClient extends SessionDelegate Future future = ssn.queueQuery("asdf"); System.out.println(future.get().getQueue()); + ssn.close(); + conn.close(); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java b/qpid/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java deleted file mode 100644 index 28a7d75f05..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A TrackSwitch sends incoming frames to a different handler based on - * track. - * - * @author Rafael H. Schloming - */ - -class TrackSwitch extends Switch> -{ - - public Byte resolve(Event event) - { - return event.target.getTrack(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java b/qpid/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java deleted file mode 100644 index fc53b0b9b4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A TypeSwitch sends incoming frames to a different handler based on - * type. - * - * @author Rafael H. Schloming - */ - -class TypeSwitch extends Switch> -{ - - public Byte resolve(Event event) - { - return event.target.getType(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java b/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java index ab45ee01aa..afb17547ac 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -3,8 +3,8 @@ package org.apache.qpidity.api; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.transport.MessageProperties; +import org.apache.qpidity.transport.DeliveryProperties; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -14,9 +14,9 @@ import org.apache.qpidity.DeliveryProperties; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,13 +30,13 @@ public interface Message public MessageProperties getMessageProperties(); public DeliveryProperties getDeliveryProperties(); - + /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * - * The appendData function might write data to + * + * The appendData function might write data to *
    *
  • Memory (Ex: ByteBuffer) *
  • To Disk @@ -50,54 +50,54 @@ public interface Message * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * - * The appendData function might write data to + * + * The appendData function might write data to *
      *
    • Memory (Ex: ByteBuffer) *
    • To Disk *
    • To Socket (Stream) *
    * @param src - the data to append - */ + */ public void appendData(ByteBuffer src) throws IOException; - + /** * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * + * * The read function might copy data from *
      *
    • From memory (Ex: ByteBuffer) *
    • From Disk *
    • From Socket as and when it gets streamed *
    - * @param target The target byte[] which the data gets copied to + * @param target The target byte[] which the data gets copied to */ - public void readData(byte[] target) throws IOException; - + public void readData(byte[] target) throws IOException; + /** * * This will abstract the underlying message data. * The Message implementation may not hold all message * data in memory (especially in the case of large messages) - * + * * The read function might copy data from *
      *
    • From memory (Ex: ByteBuffer) *
    • From Disk *
    • From Socket as and when it gets streamed *
    - * + * * @return A ByteBuffer containing data * @throws IOException */ - public ByteBuffer readData() throws IOException; - + public ByteBuffer readData() throws IOException; + /** * This should clear the body of the message. */ public void clearData(); - + /** * The provides access to the command Id assigned to the * message transfer. @@ -107,15 +107,14 @@ public interface Message * you could use this id to accquire it. *
  • For releasing a message. You can use this id to release an acquired * message - *
  • For Acknowledging a message - You need to pass this ID, in order to + *
  • For Acknowledging a message - You need to pass this ID, in order to * acknowledge the message *
  • For Rejecting a message - You need to pass this ID, in order to reject - * the message. + * the message. *
- * + * * @return the message transfer id. */ public long getMessageTransferId(); - -} +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java index 134c9cdf47..cfb9dfbe92 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java @@ -23,10 +23,10 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; -import static org.apache.qpidity.Functions.*; +import static org.apache.qpidity.transport.util.Functions.*; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java index b6f875bc5d..0cc5a4157a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java @@ -25,11 +25,11 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.Range; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; -import static org.apache.qpidity.Functions.*; +import static org.apache.qpidity.transport.util.Functions.*; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java index ec91c877f8..0d224056de 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java @@ -23,8 +23,8 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java index b879ed5cd7..9a79ec70fa 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java @@ -23,8 +23,8 @@ package org.apache.qpidity.codec; import java.util.Map; import java.util.UUID; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Struct; +import org.apache.qpidity.transport.RangeSet; +import org.apache.qpidity.transport.Struct; /** diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java b/qpid/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java deleted file mode 100644 index 8fb396f278..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/codec/SegmentEncoder.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.codec; - -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import org.apache.qpidity.Frame; -import org.apache.qpidity.Handler; - -import static java.lang.Math.*; - -import static org.apache.qpidity.Frame.*; - - -/** - * SegmentEncoder - * - * @author Rafael H. Schloming - */ - -public class SegmentEncoder extends AbstractEncoder -{ - - private final Handler handler; - private final int max; - private final byte flags; - private final byte track; - private final byte type; - private final int channel; - - private int remaining; - private ByteBuffer frame; - private boolean first; - - public SegmentEncoder(byte major, byte minor, Handler handler, - int max, byte flags, byte track, byte type, - int channel, int remaining) - { - super(major, minor); - if (max < HEADER_SIZE + 1) - { - throw new IllegalArgumentException - ("max frame size must be large enough to include header"); - } - - this.handler = handler; - this.max = max; - this.flags = flags; - this.track = track; - this.type = type; - this.channel = channel; - this.remaining = remaining; - this.frame = null; - this.first = true; - } - - private void preWrite() { - if (remaining == 0) - { - throw new BufferOverflowException(); - } - - if (frame == null) - { - frame = ByteBuffer.allocate(min(max, remaining + HEADER_SIZE)); - frame.order(ByteOrder.BIG_ENDIAN); - - byte frameFlags = flags; - if (first) { frameFlags |= FIRST_FRAME; first = false; } - if (remaining <= (frame.remaining() - HEADER_SIZE)) - { - frameFlags |= LAST_FRAME; - } - - frame.put(frameFlags); - frame.put(type); - frame.putShort((short) frame.limit()); - frame.put(RESERVED); - frame.put(track); - frame.putShort((short) channel); - frame.put(RESERVED); - frame.put(RESERVED); - frame.put(RESERVED); - frame.put(RESERVED); - - assert frame.position() == HEADER_SIZE; - } - } - - private void postWrite() { - if (!frame.hasRemaining()) - { - frame.flip(); - handler.handle(frame); - frame = null; - } - } - - @Override public void put(byte b) - { - preWrite(); - frame.put(b); - remaining -= 1; - postWrite(); - } - - @Override public void put(ByteBuffer src) - { - if (src.remaining() > remaining) - { - throw new BufferOverflowException(); - } - - while (src.hasRemaining()) - { - preWrite(); - int limit = src.limit(); - src.limit(src.position() + min(frame.remaining(), src.remaining())); - remaining -= src.remaining(); - frame.put(src); - src.limit(limit); - postWrite(); - } - } - - public static final void main(String[] args) { - ByteBuffer buf = ByteBuffer.allocate(1024); - buf.put("AMQP_PROTOCOL_HEADER".getBytes()); - buf.flip(); - - SegmentEncoder enc = new SegmentEncoder((byte) 0, (byte) 10, - new Handler() - { - public void handle(ByteBuffer frame) - { - System.out.println(frame); - } - }, - 16, - (byte) 0x0, - (byte) Frame.L1, - (byte) Frame.METHOD, - 0, - 7 + buf.remaining()); - enc.put((byte)0); - enc.put((byte)1); - enc.put((byte)2); - enc.put((byte)3); - enc.put((byte)4); - enc.put((byte)5); - enc.put((byte)6); - enc.put(buf); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java new file mode 100644 index 0000000000..a8059d669f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/AbstractDelegate.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * AbstractDelegate + * + */ + +class AbstractDelegate +{ + + public void init(C context, ProtocolHeader header) {} + + public void error(C context, ProtocolError error) {} + + public void header(C context, Header header) {} + + public void data(C context, Data data) {} + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java new file mode 100644 index 0000000000..426f954c17 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.nio.ByteBuffer; + +import java.util.List; +import java.util.ArrayList; + +import static org.apache.qpidity.transport.network.Frame.*; +import static org.apache.qpidity.transport.util.Functions.*; + + +/** + * Channel + * + * @author Rafael H. Schloming + */ + +public class Channel extends Invoker implements Receiver +{ + + final private Connection connection; + final private int channel; + final private Delegate delegate; + final private SessionDelegate sessionDelegate; + // session may be null + private Session session; + + private boolean first = true; + private ByteBuffer data = null; + + public Channel(Connection connection, int channel, SessionDelegate delegate) + { + this.connection = connection; + this.channel = channel; + this.delegate = new ChannelDelegate(); + this.sessionDelegate = delegate; + } + + public Connection getConnection() + { + return connection; + } + + public void received(ProtocolEvent event) + { + switch (event.getEncodedTrack()) + { + case L1: + event.delegate(this, connection.getConnectionDelegate()); + break; + case L2: + event.delegate(this, delegate); + break; + case L3: + event.delegate(session, sessionDelegate); + break; + case L4: + // XXX + if (event instanceof Method) + { + Method method = (Method) event; + method.setId(session.nextCommandId()); + method.delegate(session, sessionDelegate); + if (!method.hasPayload()) + { + session.processed(method); + } + } + else + { + event.delegate(session, sessionDelegate); + } + break; + default: + throw new IllegalStateException + ("unknown track: " + event.getEncodedTrack()); + } + } + + public void closed() + { + System.out.println("channel closed: " + this); + } + + public void close() + { + connection.removeChannel(channel); + } + + public int getEncodedChannel() { + return channel; + } + + public Session getSession() + { + return session; + } + + void setSession(Session session) + { + this.session = session; + } + + private void emit(ProtocolEvent event) + { + connection.send(new ConnectionEvent(channel, event)); + } + + public void method(Method m) + { + emit(m); + + if (m.getEncodedTrack() != L4) + { + System.out.println("sent control " + m.getClass().getName()); + } + } + + public void header(Header header) + { + emit(header); + } + + public void data(ByteBuffer buf) + { + if (data != null) + { + emit(new Data(data, first, false)); + first = false; + } + + data = buf; + } + + public void data(String str) + { + data(str.getBytes()); + } + + public void data(byte[] bytes) + { + data(ByteBuffer.wrap(bytes)); + } + + public void end() + { + emit(new Data(data, first, true)); + first = true; + data = null; + } + + protected void invoke(Method m) + { + method(m); + } + + protected Future invoke(Method m, Class cls) + { + throw new UnsupportedOperationException(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java new file mode 100644 index 0000000000..9d28b1a81e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.util.UUID; + + +/** + * ChannelDelegate + * + * @author Rafael H. Schloming + */ + +class ChannelDelegate extends Delegate +{ + + public @Override void sessionOpen(Channel channel, SessionOpen open) + { + Session ssn = new Session(); + ssn.attach(channel); + long lifetime = open.getDetachedLifetime(); + System.out.println("Session Opened lifetime = " + lifetime); + ssn.sessionAttached(UUID.randomUUID(), lifetime); + } + + public @Override void sessionAttached(Channel channel, + SessionAttached attached) + { + System.out.println("Session attached: " + attached.getSessionId() + ", " + + attached.getDetachedLifetime()); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java new file mode 100644 index 0000000000..2c68b7c38b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.util.HashMap; +import java.util.Map; + +import java.nio.ByteBuffer; + + +/** + * Connection + * + * @author Rafael H. Schloming + * + * @todo the channels map should probably be replaced with something + * more efficient, e.g. an array or a map implementation that can use + * short instead of Short + */ + +// RA making this public until we sort out the package issues +public class Connection + implements Receiver, Sender +{ + + final private Sender sender; + final private ConnectionDelegate delegate; + + final private Map channels = new HashMap(); + + public Connection(Sender sender, + ConnectionDelegate delegate) + { + this.sender = sender; + this.delegate = delegate; + } + + public ConnectionDelegate getConnectionDelegate() + { + return delegate; + } + + public void received(ConnectionEvent event) + { + Channel channel = getChannel(event.getChannel()); + channel.received(event.getProtocolEvent()); + } + + public void send(ConnectionEvent event) + { + sender.send(event); + } + + public Channel getChannel(int number) + { + synchronized (channels) + { + Channel channel = channels.get(number); + if (channel == null) + { + channel = new Channel(this, number, delegate.getSessionDelegate()); + channels.put(number, channel); + } + return channel; + } + } + + void removeChannel(int number) + { + synchronized (channels) + { + channels.remove(number); + } + } + + public void closed() + { + System.out.println("connection closed: " + this); + } + + public void close() + { + sender.close(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java new file mode 100644 index 0000000000..d500cc6b81 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -0,0 +1,309 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.SecurityHelper; +import org.apache.qpidity.QpidException; + +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + + +/** + * ConnectionDelegate + * + * @author Rafael H. Schloming + */ + +/** + * Currently only implemented client specific methods + * the server specific methods are dummy impls for testing + * + * the connectionClose is kind of different for both sides + */ +public abstract class ConnectionDelegate extends Delegate +{ + private String _username = "guest"; + private String _password = "guest";; + private String _mechanism; + private String _virtualHost; + private SaslClient saslClient; + private SaslServer saslServer; + private String _locale = "utf8"; + private int maxFrame = 64*1024; + private Condition _negotiationComplete; + private Lock _negotiationCompleteLock; + + public abstract SessionDelegate getSessionDelegate(); + + public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) + { + _negotiationComplete = negotiationComplete; + _negotiationCompleteLock = negotiationCompleteLock; + } + + @Override public void init(Channel ch, ProtocolHeader hdr) + { + System.out.println(hdr); + // XXX: hardcoded version + if (hdr.getMajor() != 0 && hdr.getMinor() != 10) + { + // XXX + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); + ch.getConnection().close(); + } + else + { + + System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n"); + + ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + } + } + + @Override public void error(Channel ch, ProtocolError error) + { + throw new RuntimeException(error.getMessage()); + } + + // ---------------------------------------------- + // Client side + //----------------------------------------------- + @Override public void connectionStart(Channel context, ConnectionStart struct) + { + System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n"); + System.out.println("The broker has sent connection-start"); + + String mechanism = null; + String response = null; + try + { + mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); + saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, + SecurityHelper.createCallbackHandler(mechanism,_username,_password )); + response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + + Map props = new HashMap(); + context.connectionStartOk(props, mechanism, response, _locale); + } + + @Override public void connectionSecure(Channel context, ConnectionSecure struct) + { + System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge()); + + try + { + String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + context.connectionSecureOk(response); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + } + + @Override public void connectionTune(Channel context, ConnectionTune struct) + { + System.out.println("The broker has sent connection-tune " + struct.toString()); + + // should update the channel max given by the broker. + context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionOpen(_virtualHost, null, Option.INSIST); + } + + + @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) + { + String knownHosts = struct.getKnownHosts(); + System.out.println("The broker has opened the connection for use"); + System.out.println("The broker supplied the following hosts for failover " + knownHosts); + if(_negotiationCompleteLock != null) + { + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } + } + System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n"); + } + + public void connectionRedirect(Channel context, ConnectionRedirect struct) + { + // not going to bother at the moment + } + + // ---------------------------------------------- + // Server side + //----------------------------------------------- + @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + { + //set the client side locale on the server side + _locale = struct.getLocale(); + _mechanism = struct.getMechanism(); + + System.out.println("The client has sent connection-start-ok"); + + //try + //{ + //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + byte[] challenge = null; + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + /*} + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + }*/ + } + + @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) + { + System.out.println("The client has excepted the tune params"); + } + + @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) + { + System.out.println("The client has sent connection-secure-ok"); + try + { + saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + } + + + @Override public void connectionOpen(Channel context, ConnectionOpen struct) + { + String hosts = "amqp:1223243232325"; + System.out.println("The client has sent connection-open"); + context.connectionOpenOk(hosts); + System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n"); + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(String host) + { + _virtualHost = host; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java new file mode 100644 index 0000000000..6ed0df57a7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * ConnectionEvent + * + */ + +public class ConnectionEvent +{ + + private final int channel; + private final ProtocolEvent event; + + public ConnectionEvent(int channel, ProtocolEvent event) + { + this.channel = channel; + this.event = event; + } + + public int getChannel() + { + return channel; + } + + public ProtocolEvent getProtocolEvent() + { + return event; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java new file mode 100644 index 0000000000..55cde84d5e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; + +import java.nio.ByteBuffer; + +import java.util.Collections; + + +/** + * Data + * + */ + +public class Data implements ProtocolEvent +{ + + private final Iterable fragments; + private final boolean first; + private final boolean last; + + public Data(Iterable fragments, boolean first, boolean last) + { + this.fragments = fragments; + this.first = first; + this.last = last; + } + + public Data(ByteBuffer buf, boolean first, boolean last) + { + this(Collections.singletonList(buf), first, last); + } + + public Iterable getFragments() + { + return fragments; + } + + public boolean isFirst() + { + return first; + } + + public boolean isLast() + { + return last; + } + + public byte getEncodedTrack() + { + return Frame.L4; + } + + public void delegate(C context, Delegate delegate) + { + delegate.data(context, this); + } + + public void delegate(C context, Switch sw) + { + sw.data(context, this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java new file mode 100644 index 0000000000..8936f06831 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Future + * + * @author Rafael H. Schloming + */ + +public interface Future +{ + + T get(); + + boolean isDone(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java new file mode 100644 index 0000000000..632dc137c1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; + +import java.util.List; + + +/** + * Header + * + * @author Rafael H. Schloming + */ + +public class Header implements ProtocolEvent { + + private final List structs; + + public Header(List structs) + { + this.structs = structs; + } + + public List getStructs() + { + return structs; + } + + public T get(Class klass) + { + for (Struct st : structs) + { + if (klass.isInstance(st)) + { + return klass.cast(st); + } + } + + return null; + } + + public byte getEncodedTrack() + { + return Frame.L4; + } + + public void delegate(C context, Delegate delegate) + { + delegate.header(context, this); + } + + public void delegate(C context, Switch sw) + { + sw.header(context, this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java new file mode 100644 index 0000000000..edd9116a73 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Method + * + * @author Rafael H. Schloming + */ + +public abstract class Method extends Struct implements ProtocolEvent +{ + + public static final Method create(int type) + { + // XXX: should generate separate factories for separate + // namespaces + return (Method) Struct.create(type); + } + + // XXX: command subclass? + private long id; + + public final long getId() + { + return id; + } + + void setId(long id) + { + this.id = id; + } + + public abstract boolean hasPayload(); + + public abstract byte getEncodedTrack(); + + public void delegate(C context, Switch sw) + { + sw.method(context, this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java new file mode 100644 index 0000000000..cd9fb3b94a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.NetworkDelegate; +import org.apache.qpidity.transport.network.NetworkEvent; + + +/** + * ProtocolError + * + * @author Rafael H. Schloming + */ + +public class ProtocolError implements NetworkEvent, ProtocolEvent +{ + + private final byte track; + private final String format; + private final Object[] args; + + public ProtocolError(byte track, String format, Object ... args) + { + this.track = track; + this.format = format; + this.args = args; + } + + public byte getEncodedTrack() + { + return track; + } + + public String getMessage() + { + return String.format(format, args); + } + + public void delegate(C context, Switch sw) + { + sw.error(context, this); + } + + public void delegate(NetworkDelegate delegate) + { + delegate.error(this); + } + + public void delegate(C context, Delegate delegate) + { + delegate.error(context, this); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java new file mode 100644 index 0000000000..e2adefba9e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * ProtocolEvent + * + */ + +public interface ProtocolEvent +{ + + public interface Switch + { + void init(C context, ProtocolHeader header); + void method(C context, Method method); + void header(C context, Header header); + void data(C context, Data data); + void error(C context, ProtocolError error); + } + + // XXX: could do this switching with cascading defaults for the + // specific dispatch methods + void delegate(C context, Switch sw); + + void delegate(C context, Delegate delegate); + + byte getEncodedTrack(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java new file mode 100644 index 0000000000..50cae51171 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.network.NetworkDelegate; +import org.apache.qpidity.transport.network.NetworkEvent; +import org.apache.qpidity.transport.network.Frame; + + +/** + * ProtocolHeader + * + * @author Rafael H. Schloming + */ + +//RA making this public until we sort out the package issues + +public class ProtocolHeader implements NetworkEvent, ProtocolEvent +{ + + private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; + private static final byte CLASS = 1; + + final private byte instance; + final private byte major; + final private byte minor; + + public ProtocolHeader(byte instance, byte major, byte minor) + { + this.instance = instance; + this.major = major; + this.minor = minor; + } + + public ProtocolHeader(int instance, int major, int minor) + { + this((byte) instance, (byte) major, (byte) minor); + } + + public byte getInstance() + { + return instance; + } + + public byte getMajor() + { + return major; + } + + public byte getMinor() + { + return minor; + } + + public byte getEncodedTrack() + { + return Frame.L1; + } + + public ByteBuffer toByteBuffer() + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.put(AMQP); + buf.put(CLASS); + buf.put(instance); + buf.put(major); + buf.put(minor); + buf.flip(); + return buf; + } + + public void delegate(C context, Switch sw) + { + sw.init(context, this); + } + + public void delegate(NetworkDelegate delegate) + { + delegate.init(this); + } + + public void delegate(C context, Delegate delegate) + { + delegate.init(context, this); + } + + public String toString() + { + return String.format("AMQP.%d %d-%d", instance, major, minor); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java new file mode 100644 index 0000000000..ed745bf5ec --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import static java.lang.Math.*; + + +/** + * Range + * + * @author Rafael H. Schloming + */ + +public class Range +{ + private final long lower; + private final long upper; + + public Range(long lower, long upper) + { + this.lower = lower; + this.upper = upper; + } + + public long getLower() + { + return lower; + } + + public long getUpper() + { + return upper; + } + + public boolean includes(long value) + { + return lower <= value && value <= upper; + } + + public boolean includes(Range range) + { + return includes(range.lower) && includes(range.upper); + } + + public boolean intersects(Range range) + { + return (includes(range.lower) || includes(range.upper) || + range.includes(lower) || range.includes(upper)); + } + + public boolean touches(Range range) + { + return (includes(range.upper + 1) || includes(range.lower - 1) || + range.includes(upper + 1) || range.includes(lower - 1)); + } + + public Range span(Range range) + { + return new Range(min(lower, range.lower), max(upper, range.upper)); + } + + public String toString() + { + return "[" + lower + ", " + upper + "]"; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java new file mode 100644 index 0000000000..dfaec3702c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import java.util.Collection; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.LinkedList; + + +/** + * RangeSet + * + * @author Rafael H. Schloming + */ + +public class RangeSet implements Iterable +{ + + private LinkedList ranges = new LinkedList(); + + public int size() + { + return ranges.size(); + } + + public Iterator iterator() + { + return ranges.iterator(); + } + + public boolean includes(Range range) + { + for (Range r : this) + { + if (r.includes(range)) + { + return true; + } + } + + return false; + } + + public void add(Range range) + { + ListIterator it = ranges.listIterator(); + + while (it.hasNext()) + { + Range next = it.next(); + if (range.touches(next)) + { + it.remove(); + range = range.span(next); + } + else if (range.getUpper() < next.getLower()) + { + it.previous(); + it.add(range); + return; + } + } + + it.add(range); + } + + public void add(long lower, long upper) + { + add(new Range(lower, upper)); + } + + public void add(long value) + { + add(value, value); + } + + public void clear() + { + ranges.clear(); + } + + public String toString() + { + return ranges.toString(); + } + + public static final void main(String[] args) + { + RangeSet ranges = new RangeSet(); + ranges.add(5, 10); + System.out.println(ranges); + ranges.add(15, 20); + System.out.println(ranges); + ranges.add(23, 25); + System.out.println(ranges); + ranges.add(12, 14); + System.out.println(ranges); + ranges.add(0, 1); + System.out.println(ranges); + ranges.add(3, 11); + System.out.println(ranges); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java new file mode 100644 index 0000000000..65edb3a6ec --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Receiver + * + */ + +public interface Receiver +{ + + void received(T msg); + + void closed(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java new file mode 100644 index 0000000000..2126a76a53 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Result + * + * @author Rafael H. Schloming + */ + +public abstract class Result extends Struct {} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java new file mode 100644 index 0000000000..6da8358bd6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Sender + * + */ + +public interface Sender +{ + + void send(T msg); + + void close(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java new file mode 100644 index 0000000000..59e8daae31 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -0,0 +1,337 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.transport.network.Frame; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * Session + * + * @author Rafael H. Schloming + */ + +public class Session extends Invoker +{ + + // channel may be null + Channel channel; + + // incoming command count + private long commandsIn = 0; + // completed incoming commands + private final RangeSet processed = new RangeSet(); + private Range syncPoint = null; + + // outgoing command count + private long commandsOut = 0; + private Map commands = new HashMap(); + private long mark = 0; + + + public Map getOutstandingCommands() + { + return commands; + } + + public long getCommandsOut() + { + return commandsOut; + } + + public long getCommandsIn() + { + return commandsIn; + } + + public long nextCommandId() + { + return commandsIn++; + } + + public RangeSet getProcessed() + { + return processed; + } + + public void processed(Method command) + { + processed(command.getId()); + } + + public void processed(long command) + { + processed(new Range(command, command)); + } + + public void processed(long lower, long upper) + { + processed(new Range(lower, upper)); + } + + public void processed(Range range) + { + boolean flush; + synchronized (processed) + { + processed.add(range); + flush = syncPoint != null && processed.includes(syncPoint); + } + if (flush) + { + flushProcessed(); + } + } + + void flushProcessed() + { + for (Range r: processed) + { + System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" ); + } + System.out.println("Notifying peer with execution complete"); + executionComplete(0, processed); + } + + void syncPoint() + { + System.out.println("===========Request received to sync=========================="); + + Range range = new Range(0, getCommandsIn() - 1); + boolean flush; + synchronized (processed) + { + flush = processed.includes(range); + if (!flush) + { + syncPoint = range; + } + } + if (flush) + { + flushProcessed(); + } + } + + public void attach(Channel channel) + { + this.channel = channel; + channel.setSession(this); + } + + public Method getCommand(long id) + { + synchronized (commands) + { + return commands.get(id); + } + } + + void complete(long lower, long upper) + { + synchronized (commands) + { + for (long id = lower; id <= upper; id++) + { + commands.remove(id); + } + + if (commands.isEmpty()) + { + System.out.println("\n All outstanding commands are completed !!!! \n"); + commands.notifyAll(); + } + } + } + + void complete(long mark) + { + complete(this.mark, mark); + this.mark = mark; + } + + protected void invoke(Method m) + { + if (m.getEncodedTrack() == Frame.L4) + { + synchronized (commands) + { + System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut); + commands.put(commandsOut++, m); + } + } + channel.method(m); + } + + public void header(Header header) + { + channel.header(header); + } + + public void header(List structs) + { + header(new Header(structs)); + } + + public void header(Struct ... structs) + { + header(Arrays.asList(structs)); + } + + public void data(ByteBuffer buf) + { + channel.data(buf); + } + + public void data(String str) + { + channel.data(str); + } + + public void data(byte[] bytes) + { + channel.data(bytes); + } + + public void endData() + { + channel.end(); + } + + public void sync() + { + System.out.println("calling sync()"); + synchronized (commands) + { + if (!commands.isEmpty()) + { + executionSync(); + } + + while (!commands.isEmpty()) + { + try { + System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); + commands.wait(); + System.out.println("\n============sync() got notified=========================================\n"); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + } + + private Map> results = + new HashMap>(); + + void result(long command, Struct result) + { + ResultFuture future; + synchronized (results) + { + future = results.remove(command); + } + future.set(result); + } + protected Future invoke(Method m, Class klass) + { + long command = commandsOut; + ResultFuture future = new ResultFuture(klass); + synchronized (results) + { + results.put(command, future); + } + invoke(m); + return future; + } + + private class ResultFuture implements Future + { + + private final Class klass; + private T result; + + private ResultFuture(Class klass) + { + this.klass = klass; + } + + private void set(Struct result) + { + synchronized (this) + { + this.result = klass.cast(result); + notifyAll(); + } + } + + public T get(long timeout, int nanos) + { + synchronized (this) + { + while (!isDone()) + { + try + { + wait(timeout, nanos); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + return result; + } + + public T get(long timeout) + { + return get(timeout, 0); + } + + public T get() + { + return get(0); + } + + public boolean isDone() + { + return result != null; + } + + } + + public void close() + { + sessionClose(); + channel.close(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java new file mode 100644 index 0000000000..8b3c661075 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * SessionDelegate + * + * @author Rafael H. Schloming + */ + +public abstract class SessionDelegate extends Delegate +{ + + private static final Struct[] EMPTY_STRUCT_ARRAY = {}; + + @Override public void executionResult(Session ssn, ExecutionResult result) + { + ssn.result(result.getCommandId(), result.getData()); + } + + @Override public void executionComplete(Session ssn, ExecutionComplete excmp) + { + RangeSet ranges = excmp.getRangedExecutionSet(); + if (ranges != null) + { + for (Range range : ranges) + { + System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper()); + ssn.complete(range.getLower(), range.getUpper()); + } + } + ssn.complete(excmp.getCumulativeExecutionMark()); + System.out.println("outstanding commands: " + ssn.getOutstandingCommands()); + } + + @Override public void executionFlush(Session ssn, ExecutionFlush flush) + { + ssn.flushProcessed(); + } + + @Override public void executionSync(Session ssn, ExecutionSync sync) + { + ssn.syncPoint(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java new file mode 100644 index 0000000000..f6464780e7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + +import org.apache.qpidity.codec.Encodable; + + +/** + * Struct + * + * @author Rafael H. Schloming + */ + +public abstract class Struct implements Encodable +{ + + public static Struct create(int type) + { + return StructFactory.create(type); + } + + public abstract int getEncodedType(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java new file mode 100644 index 0000000000..e17b37bb36 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.codec.FragmentDecoder; + +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.Method; +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolEvent; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.Struct; + + +/** + * Assembler + * + */ + +public class Assembler implements Receiver, NetworkDelegate +{ + + private final Receiver receiver; + private final byte major; + private final byte minor; + private final Map> segments; + + public Assembler(Receiver receiver, byte major, byte minor) + { + this.receiver = receiver; + this.major = major; + this.minor = minor; + segments = new HashMap>(); + } + + private int segmentKey(Frame frame) + { + // XXX: can this overflow? + return (frame.getTrack() + 1) * frame.getChannel(); + } + + private List getSegment(Frame frame) + { + return segments.get(segmentKey(frame)); + } + + private void setSegment(Frame frame, List segment) + { + int key = segmentKey(frame); + if (segments.containsKey(key)) + { + error(new ProtocolError(Frame.L2, "segment in progress: %s", + frame)); + } + segments.put(segmentKey(frame), segment); + } + + private void clearSegment(Frame frame) + { + segments.remove(segmentKey(frame)); + } + + private void emit(int channel, ProtocolEvent event) + { + receiver.received(new ConnectionEvent(channel, event)); + } + + private void emit(Frame frame, ProtocolEvent event) + { + emit(frame.getChannel(), event); + } + + public void received(NetworkEvent event) + { + event.delegate(this); + } + + public void closed() + { + this.receiver.closed(); + } + + public void init(ProtocolHeader header) + { + emit(0, header); + } + + public void frame(Frame frame) + { + switch (frame.getType()) + { + case Frame.BODY: + emit(frame, new Data(frame, frame.isFirstFrame(), + frame.isLastFrame())); + break; + default: + assemble(frame); + break; + } + } + + public void error(ProtocolError error) + { + emit(0, error); + } + + private void assemble(Frame frame) + { + List segment; + if (frame.isFirstFrame()) + { + segment = new ArrayList(); + setSegment(frame, segment); + } + else + { + segment = getSegment(frame); + } + + for (ByteBuffer buf : frame) + { + segment.add(buf); + } + + if (frame.isLastFrame()) + { + clearSegment(frame); + emit(frame, decode(frame.getType(), segment)); + } + } + + private ProtocolEvent decode(byte type, List segment) + { + FragmentDecoder dec = + new FragmentDecoder(major, minor, segment.iterator()); + + switch (type) + { + case Frame.METHOD: + int methodType = (int) dec.readLong(); + Method method = Method.create(methodType); + method.read(dec, major, minor); + return method; + case Frame.HEADER: + List structs = new ArrayList(); + while (dec.hasRemaining()) + { + structs.add(dec.readLongStruct()); + } + return new Header(structs); + default: + throw new IllegalStateException("unknown frame type: " + type); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java new file mode 100644 index 0000000000..73be9c3492 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.codec.BBEncoder; +import org.apache.qpidity.codec.SizeEncoder; + +import org.apache.qpidity.transport.ConnectionEvent; +import org.apache.qpidity.transport.Data; +import org.apache.qpidity.transport.Header; +import org.apache.qpidity.transport.Method; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolEvent; +import org.apache.qpidity.transport.Sender; +import org.apache.qpidity.transport.Struct; + +import java.nio.ByteBuffer; + +import static org.apache.qpidity.transport.network.Frame.*; + +import static java.lang.Math.*; + + +/** + * Disassembler + * + */ + +public class Disassembler + implements Sender, ProtocolEvent.Switch +{ + + private final Sender sender; + private final int maxFrame; + private final byte major; + private final byte minor; + + public Disassembler(Sender sender, byte major, byte minor, + int maxFrame) + { + this.sender = sender; + this.major = major; + this.minor = minor; + this.maxFrame = maxFrame; + } + + public void send(ConnectionEvent event) + { + event.getProtocolEvent().delegate(event, this); + } + + public void close() + { + sender.close(); + } + + private void fragment(byte flags, byte type, ConnectionEvent event, + ByteBuffer buf, boolean first, boolean last) + { + while (buf.hasRemaining()) + { + ByteBuffer slice = buf.slice(); + slice.limit(min(maxFrame, slice.remaining())); + buf.position(buf.position() + slice.remaining()); + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (last && !buf.hasRemaining()) + { + newflags |= LAST_FRAME; + } + + Frame frame = new Frame(newflags, type, + event.getProtocolEvent().getEncodedTrack(), + event.getChannel()); + frame.addFragment(slice); + sender.send(frame); + } + } + + public void init(ConnectionEvent event, ProtocolHeader header) + { + sender.send(header); + } + + public void method(ConnectionEvent event, Method method) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeLong(method.getEncodedType()); + method.write(sizer, major, minor); + sizer.flush(); + int size = sizer.getSize(); + + ByteBuffer buf = ByteBuffer.allocate(size); + BBEncoder enc = new BBEncoder(major, minor, buf); + enc.writeLong(method.getEncodedType()); + method.write(enc, major, minor); + enc.flush(); + buf.flip(); + + byte flags = FIRST_SEG; + + if (!method.hasPayload()) + { + flags |= LAST_SEG; + } + + fragment(flags, METHOD, event, buf, true, true); + } + + public void header(ConnectionEvent event, Header header) + { + SizeEncoder sizer = new SizeEncoder(major, minor); + for (Struct st : header.getStructs()) + { + sizer.writeLongStruct(st); + } + + ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); + BBEncoder enc = new BBEncoder(major, minor, buf); + for (Struct st : header.getStructs()) + { + enc.writeLongStruct(st); + enc.flush(); + } + buf.flip(); + + fragment((byte) 0x0, HEADER, event, buf, true, true); + } + + public void data(ConnectionEvent event, Data data) + { + for (ByteBuffer buf : data.getFragments()) + { + fragment(LAST_SEG, BODY, event, buf, data.isFirst(), + data.isLast()); + } + } + + public void error(ConnectionEvent event, ProtocolError error) + { + sender.send(error); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java new file mode 100644 index 0000000000..c36b03b104 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.transport.util.SliceIterator; + +import java.nio.ByteBuffer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; + +import static org.apache.qpidity.transport.util.Functions.*; + + +/** + * Frame + * + * @author Rafael H. Schloming + */ + +// RA: changed it to public until we sort the package issues +public class Frame implements NetworkEvent, Iterable +{ + public static final int HEADER_SIZE = 12; + + // XXX: enums? + public static final byte L1 = 0; + public static final byte L2 = 1; + public static final byte L3 = 2; + public static final byte L4 = 3; + + public static final byte METHOD = 1; + public static final byte HEADER = 2; + public static final byte BODY = 3; + + public static final byte RESERVED = 0x0; + + public static final byte VERSION = 0x0; + + public static final byte FIRST_SEG = 0x8; + public static final byte LAST_SEG = 0x4; + public static final byte FIRST_FRAME = 0x2; + public static final byte LAST_FRAME = 0x1; + + final private byte flags; + final private byte type; + final private byte track; + final private int channel; + final private List fragments; + private int size; + + public Frame(byte flags, byte type, byte track, int channel) + { + this.flags = flags; + this.type = type; + this.track = track; + this.channel = channel; + this.size = 0; + this.fragments = new ArrayList(); + } + + public void addFragment(ByteBuffer fragment) + { + fragments.add(fragment); + size += fragment.remaining(); + } + + public byte getFlags() + { + return flags; + } + + public int getChannel() + { + return channel; + } + + public int getSize() + { + return size; + } + + public byte getType() + { + return type; + } + + public byte getTrack() + { + return track; + } + + private boolean flag(byte mask) + { + return (flags & mask) != 0; + } + + public boolean isFirstSegment() + { + return flag(FIRST_SEG); + } + + public boolean isLastSegment() + { + return flag(LAST_SEG); + } + + public boolean isFirstFrame() + { + return flag(FIRST_FRAME); + } + + public boolean isLastFrame() + { + return flag(LAST_FRAME); + } + + public Iterator getFragments() + { + return new SliceIterator(fragments.iterator()); + } + + public Iterator iterator() + { + return getFragments(); + } + + public void delegate(NetworkDelegate delegate) + { + delegate.frame(this); + } + + public String toString() + { + StringBuilder str = new StringBuilder(); + str.append(String.format + ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), + getTrack(), getType(), + isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, + isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); + + boolean first = true; + for (ByteBuffer buf : this) + { + if (first) + { + first = false; + } + else + { + str.append(" | "); + } + + str.append(str(buf)); + } + + return str.toString(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java new file mode 100644 index 0000000000..191f900c02 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Receiver; + +import static org.apache.qpidity.transport.network.InputHandler.State.*; + + +/** + * InputHandler + * + * @author Rafael H. Schloming + */ + +public class InputHandler implements Receiver +{ + + public enum State + { + PROTO_HDR, + PROTO_HDR_M, + PROTO_HDR_Q, + PROTO_HDR_P, + PROTO_HDR_CLASS, + PROTO_HDR_INSTANCE, + PROTO_HDR_MAJOR, + PROTO_HDR_MINOR, + FRAME_HDR, + FRAME_HDR_TYPE, + FRAME_HDR_SIZE1, + FRAME_HDR_SIZE2, + FRAME_HDR_RSVD1, + FRAME_HDR_TRACK, + FRAME_HDR_CH1, + FRAME_HDR_CH2, + FRAME_HDR_RSVD2, + FRAME_HDR_RSVD3, + FRAME_HDR_RSVD4, + FRAME_HDR_RSVD5, + FRAME_PAYLOAD, + FRAME_FRAGMENT, + ERROR; + } + + private final Receiver receiver; + private State state; + + private byte instance; + private byte major; + private byte minor; + + private byte flags; + private byte type; + private byte track; + private int channel; + private int size; + private Frame frame; + + public InputHandler(Receiver receiver, State state) + { + this.receiver = receiver; + this.state = state; + } + + public InputHandler(Receiver receiver) + { + this(receiver, PROTO_HDR); + } + + private void init() + { + receiver.received(new ProtocolHeader(instance, major, minor)); + } + + private void frame() + { + assert size == frame.getSize(); + receiver.received(frame); + frame = null; + } + + private void error(String fmt, Object ... args) + { + receiver.received(new ProtocolError(Frame.L1, fmt, args)); + } + + public void received(ByteBuffer buf) + { + while (buf.hasRemaining()) + { + state = next(buf); + } + } + + private State next(ByteBuffer buf) + { + switch (state) { + case PROTO_HDR: + return expect(buf, 'A', PROTO_HDR_M); + case PROTO_HDR_M: + return expect(buf, 'M', PROTO_HDR_Q); + case PROTO_HDR_Q: + return expect(buf, 'Q', PROTO_HDR_P); + case PROTO_HDR_P: + return expect(buf, 'P', PROTO_HDR_CLASS); + case PROTO_HDR_CLASS: + return expect(buf, 1, PROTO_HDR_INSTANCE); + case PROTO_HDR_INSTANCE: + instance = buf.get(); + return PROTO_HDR_MAJOR; + case PROTO_HDR_MAJOR: + major = buf.get(); + return PROTO_HDR_MINOR; + case PROTO_HDR_MINOR: + minor = buf.get(); + init(); + return FRAME_HDR; + case FRAME_HDR: + flags = buf.get(); + return FRAME_HDR_TYPE; + case FRAME_HDR_TYPE: + type = buf.get(); + return FRAME_HDR_SIZE1; + case FRAME_HDR_SIZE1: + size = buf.get() << 8; + return FRAME_HDR_SIZE2; + case FRAME_HDR_SIZE2: + size += buf.get(); + size -= 12; + return FRAME_HDR_RSVD1; + case FRAME_HDR_RSVD1: + return expect(buf, 0, FRAME_HDR_TRACK); + case FRAME_HDR_TRACK: + byte b = buf.get(); + if ((b & 0xF0) != 0) { + error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: '%x'", b); + return ERROR; + } else { + track = (byte) (b & 0xF); + return FRAME_HDR_CH1; + } + case FRAME_HDR_CH1: + channel = buf.get() << 8; + return FRAME_HDR_CH2; + case FRAME_HDR_CH2: + channel += buf.get(); + return FRAME_HDR_RSVD2; + case FRAME_HDR_RSVD2: + return expect(buf, 0, FRAME_HDR_RSVD3); + case FRAME_HDR_RSVD3: + return expect(buf, 0, FRAME_HDR_RSVD4); + case FRAME_HDR_RSVD4: + return expect(buf, 0, FRAME_HDR_RSVD5); + case FRAME_HDR_RSVD5: + return expect(buf, 0, FRAME_PAYLOAD); + case FRAME_PAYLOAD: + frame = new Frame(flags, type, track, channel); + if (size > buf.remaining()) { + frame.addFragment(buf.slice()); + buf.position(buf.limit()); + return FRAME_FRAGMENT; + } else { + ByteBuffer payload = buf.slice(); + payload.limit(size); + buf.position(buf.position() + size); + frame.addFragment(payload); + frame(); + return FRAME_HDR; + } + case FRAME_FRAGMENT: + int delta = size - frame.getSize(); + if (delta > buf.remaining()) { + frame.addFragment(buf.slice()); + buf.position(buf.limit()); + return FRAME_FRAGMENT; + } else { + ByteBuffer fragment = buf.slice(); + fragment.limit(delta); + buf.position(buf.position() + delta); + frame.addFragment(fragment); + frame(); + return FRAME_HDR; + } + default: + throw new IllegalStateException(); + } + } + + private State expect(ByteBuffer buf, int expected, State next) + { + return expect(buf, (byte) expected, next); + } + + private State expect(ByteBuffer buf, char expected, State next) + { + return expect(buf, (byte) expected, next); + } + + private State expect(ByteBuffer buf, byte expected, State next) + { + byte b = buf.get(); + if (b == expected) { + return next; + } else { + error("expecting '%x', got '%x'", expected, b); + return ERROR; + } + } + + public void closed() + { + receiver.closed(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java new file mode 100644 index 0000000000..48655edd0c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; + + +/** + * NetworkDelegate + * + * @author Rafael H. Schloming + */ + +public interface NetworkDelegate +{ + + void init(ProtocolHeader header); + + void frame(Frame frame); + + void error(ProtocolError error); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java new file mode 100644 index 0000000000..080efee704 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + + +/** + * NetworkEvent + * + */ + +public interface NetworkEvent +{ + + void delegate(NetworkDelegate delegate); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java new file mode 100644 index 0000000000..90bef36790 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.ProtocolError; +import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.Sender; + +import static org.apache.qpidity.transport.network.Frame.*; + + +/** + * OutputHandler + * + */ + +public class OutputHandler implements Sender, NetworkDelegate +{ + + private Sender sender; + private Object lock = new Object(); + + public OutputHandler(Sender sender) + { + this.sender = sender; + } + + public void send(NetworkEvent event) + { + event.delegate(this); + } + + public void close() + { + synchronized (lock) + { + sender.close(); + } + } + + public void init(ProtocolHeader header) + { + synchronized (lock) + { + sender.send(header.toByteBuffer()); + } + } + + public void frame(Frame frame) + { + ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE); + hdr.put(frame.getFlags()); + hdr.put(frame.getType()); + hdr.putShort((short) (frame.getSize() + HEADER_SIZE)); + hdr.put(RESERVED); + hdr.put(frame.getTrack()); + hdr.putShort((short) frame.getChannel()); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.put(RESERVED); + hdr.flip(); + + synchronized (lock) + { + sender.send(hdr); + for (ByteBuffer buf : frame) + { + sender.send(buf); + } + } + } + + public void error(ProtocolError error) + { + throw new IllegalStateException("XXX"); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java new file mode 100644 index 0000000000..ac9dab615d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network.mina; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; + +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketConnector; + +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.Sender; + +import org.apache.qpidity.transport.network.Assembler; +import org.apache.qpidity.transport.network.Disassembler; +import org.apache.qpidity.transport.network.InputHandler; +import org.apache.qpidity.transport.network.OutputHandler; + + +/** + * MinaHandler + * + * @author Rafael H. Schloming + */ +//RA making this public until we sort out the package issues +public class MinaHandler implements IoHandler +{ + + private final ConnectionDelegate delegate; + private final InputHandler.State state; + + public MinaHandler(ConnectionDelegate delegate, InputHandler.State state) + { + this.delegate = delegate; + this.state = state; + } + + public void messageReceived(IoSession ssn, Object obj) + { + Attachment attachment = (Attachment) ssn.getAttachment(); + ByteBuffer buf = (ByteBuffer) obj; + attachment.receiver.received(buf.buf()); + } + + public void messageSent(IoSession ssn, Object obj) + { + // do nothing + } + + public void exceptionCaught(IoSession ssn, Throwable e) + { + e.printStackTrace(); + } + + public void sessionCreated(final IoSession ssn) + { + // do nothing + } + + public void sessionOpened(final IoSession ssn) + { + System.out.println("opened " + ssn); + // XXX: hardcoded version + max-frame + Connection conn = new Connection + (new Disassembler(new OutputHandler(new MinaSender(ssn)), + (byte)0, (byte)10, 64*1024), + delegate); + // XXX: hardcoded version + Receiver receiver = + new InputHandler(new Assembler(conn, (byte)0, (byte)10), state); + ssn.setAttachment(new Attachment(conn, receiver)); + // XXX + synchronized (ssn) + { + ssn.notifyAll(); + } + } + + public void sessionClosed(IoSession ssn) + { + System.out.println("closed " + ssn); + Attachment attachment = (Attachment) ssn.getAttachment(); + attachment.receiver.closed(); + ssn.setAttachment(null); + } + + public void sessionIdle(IoSession ssn, IdleStatus status) + { + // do nothing + } + + private class Attachment + { + + Connection connection; + Receiver receiver; + + Attachment(Connection connection, + Receiver receiver) + { + this.connection = connection; + this.receiver = receiver; + } + } + + public static final void accept(String host, int port, + ConnectionDelegate delegate) + throws IOException + { + IoAcceptor acceptor = new SocketAcceptor(); + acceptor.bind(new InetSocketAddress(host, port), + new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + MinaHandler handler = new MinaHandler(delegate, + InputHandler.State.FRAME_HDR); + SocketAddress addr = new InetSocketAddress(host, port); + SocketConnector connector = new SocketConnector(); + connector.setWorkerTimeout(0); + ConnectFuture cf = connector.connect(addr, handler); + cf.join(); + IoSession ssn = cf.getSession(); + // XXX + synchronized (ssn) + { + while (ssn.getAttachment() == null) + { + try + { + ssn.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + Attachment attachment = (Attachment) ssn.getAttachment(); + return attachment.connection; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java new file mode 100644 index 0000000000..54e9ec28ef --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +import org.apache.qpidity.transport.Sender; + + +/** + * MinaSender + * + */ + +public class MinaSender implements Sender +{ + + private final IoSession session; + + public MinaSender(IoSession session) + { + this.session = session; + } + + public void send(java.nio.ByteBuffer buf) + { + session.write(ByteBuffer.wrap(buf)); + } + + public void close() + { + session.close(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java new file mode 100644 index 0000000000..fb1d4ccddf --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.util; + +import java.nio.ByteBuffer; + + +/** + * Functions + * + * @author Rafael H. Schloming + */ + +public class Functions +{ + + public static final short unsigned(byte b) + { + return (short) ((0x100 + b) & 0xFF); + } + + public static final int unsigned(short s) + { + return (0x10000 + s) & 0xFFFF; + } + + public static final long unsigned(int i) + { + return (0x1000000000L + i) & 0xFFFFFFFFL; + } + + public static final byte lsb(int i) + { + return (byte) (0xFF & i); + } + + public static final byte lsb(long l) + { + return (byte) (0xFF & l); + } + + public static final String str(ByteBuffer buf) + { + return str(buf, buf.limit()); + } + + public static final String str(ByteBuffer buf, int limit) + { + StringBuilder str = new StringBuilder(); + for (int i = 0; i < limit; i++) + { + if (i > 0 && i % 2 == 0) + { + str.append(" "); + } + str.append(String.format("%02x", buf.get(i))); + } + + return str.toString(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java new file mode 100644 index 0000000000..32392a3561 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.util; + +import java.nio.ByteBuffer; + +import java.util.Iterator; + + +/** + * SliceIterator + * + * @author Rafael H. Schloming + */ + +public class SliceIterator implements Iterator +{ + + final private Iterator iterator; + + public SliceIterator(Iterator iterator) + { + this.iterator = iterator; + } + + public boolean hasNext() + { + return iterator.hasNext(); + } + + public ByteBuffer next() + { + return iterator.next().slice(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + +} -- cgit v1.2.1