diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-09 07:24:02 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-09 07:24:02 +0000 |
| commit | c1e0112bc308edb406ac8aba3c8e47fbf710752c (patch) | |
| tree | 67a5504ea691cf0f68c34ee508476917d18adfdf /java | |
| parent | ce1474162e775b693383ed55ebb6dacf562baaa5 (diff) | |
| download | qpid-python-c1e0112bc308edb406ac8aba3c8e47fbf710752c.tar.gz | |
implemented the Connection and Session API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564124 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
32 files changed, 1336 insertions, 305 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java index 3da3a12aa5..2ea6db8943 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java @@ -28,6 +28,17 @@ import org.apache.qpidity.QpidException; */ public interface Connection { + /** + * Establish the connection using the given parameters + * + * @param host + * @param port + * @param username + * @param password + * @throws QpidException + */ + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; + /** * Establish the connection with the broker identified by the provided URL. * @@ -52,7 +63,7 @@ public interface Connection * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. * @return A Newly created (suspended) session. */ - public Session createSession(int expiryInSeconds); + public Session createSession(long expiryInSeconds); /** * Create a DtxSession for this connection. diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 633c77ecac..def7b5ca41 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -17,7 +17,9 @@ */ package org.apache.qpidity.client; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; + +import org.apache.qpidity.Struct; /** * Assembles message parts. @@ -31,21 +33,21 @@ import org.apache.qpidity.Header; * are transferred. */ public interface MessagePartListener -{ +{ /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> */ - public void messageHeaders(Header... headers); + public void messageHeaders(Struct... headers); /** * Add the following byte array to the content of the message being received * * @param data Data to be added or streamed. */ - public void addData(byte[] data); + public void addData(ByteBuffer src); /** * Indicates that the message has been fully received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 0178ef6d3b..84de268232 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -18,12 +18,14 @@ */
package org.apache.qpidity.client;
+import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.UUID;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
/**
* <p>A session is associated with a connection.
@@ -32,10 +34,12 @@ import org.apache.qpidity.RangeSet; */
public interface Session
{
- public static final short ACQUIRE_MODE_NO_ACQUIRE = 0;
- public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1;
- public static final short CONFIRM_MODE_REQUIRED = 1;
- public static final short CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+ public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+ public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
@@ -54,21 +58,21 @@ public interface Session /**
* Close this session and any associated resources.
*/
- public void close();
+ public void sessionClose();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
*/
- public void suspend();
+ public void sessionSuspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
*/
- public void resume();
+ public void sessionResume(UUID sessionId);
//------------------------------------------------------
// Messaging methods
@@ -92,7 +96,7 @@ public interface Session * @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -117,27 +121,39 @@ public interface Session * </ul>
* @param exchange The exchange the message is being sent.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
- * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ * @param headers are Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers);
+ public void headers(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
*
* @param data Data to be added.
- * @param off Offset from which to start reading data
- * @param len Number of bytes to be read
*/
- public void addData(byte[] data, int off, int len);
+ public void data(byte[] data);
+
+ /**
+ * Add the following ByteBuffer to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(ByteBuffer buf);
/**
+ * Add the following String to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(String str);
+
+ /**
* Signals the end of data for the message.
*/
public void endData();
@@ -258,8 +274,6 @@ public interface Session * @param destination The destination to call flush on.
*/
public void messageFlush(String destination);
-
- public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -286,8 +300,12 @@ public interface Session * and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
+ * @param code TODO
+ * @param text TODO
*/
- public void messageReject(RangeSet ranges);
+ public void messageReject(RangeSet ranges, int code, String text);
+
+ public RangeSet getRejectedMessages();
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -296,10 +314,10 @@ public interface Session * message acquisition can fail.
* The outcome of the acquisition is returned as an array of ranges of qcquired messages.
* <p> This method should only be called on non-acquired messages.
- *
+ * @param mode TODO
* @param range Ranges of messages to be acquired.
*/
- public void messageAcquire(RangeSet ranges);
+ public void messageAcquire(RangeSet ranges, short mode);
public RangeSet getAccquiredMessages();
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java new file mode 100644 index 0000000000..13acff1ea6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/Client.java @@ -0,0 +1,112 @@ +package org.apache.qpidity.impl; + +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpidity.Channel; +import org.apache.qpidity.Connection; +import org.apache.qpidity.ConnectionClose; +import org.apache.qpidity.ConnectionDelegate; +import org.apache.qpidity.MinaHandler; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.DtxSession; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; + + +public class Client implements org.apache.qpidity.client.Connection +{ + private AtomicInteger _channelNo = new AtomicInteger(); + private Connection _conn; + private ExceptionListener _exceptionListner; + private final Lock _lock = new ReentrantLock(); + + public static org.apache.qpidity.client.Connection createConnection() + { + return new Client(); + } + + public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException + { + Condition negotiationComplete = _lock.newCondition(); + _lock.lock(); + + ConnectionDelegate connectionDelegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ClientSessionDelegate(); + } + + @Override public void connectionClose(Channel context, ConnectionClose struct) + { + _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null)); + } + }; + + connectionDelegate.setCondition(_lock,negotiationComplete); + connectionDelegate.setUsername(username); + connectionDelegate.setPassword(password); + connectionDelegate.setVirtualHost(virtualHost); + + _conn = MinaHandler.connect(host, port,connectionDelegate); + + _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer()); + + try + { + negotiationComplete.await(); + } + catch (Exception e) + { + // + } + finally + { + _lock.unlock(); + } + } + + /* + * Until the dust settles with the URL disucssion + * I am not going to implement this. + */ + public void connect(URL url) throws QpidException + { + throw new UnsupportedOperationException(); + } + + public void close() throws QpidException + { + Channel ch = _conn.getChannel(0); + ch.connectionClose(0, "client is closing", 0, 0); + //need to close the connection underneath as well + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = _conn.getChannel(_channelNo.incrementAndGet()); + ClientSession ssn = new ClientSession(); + ssn.attach(ch); + ssn.sessionOpen(expiryInSeconds); + + return ssn; + } + + public DtxSession createDTXSession(int expiryInSeconds) + { + // TODO Auto-generated method stub + return null; + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index a0111a86f4..627829556c 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -1,216 +1,101 @@ package org.apache.qpidity.impl; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.ExceptionListener; import org.apache.qpidity.client.MessagePartListener; -import org.apache.qpidity.*; /** * Implements a Qpid Sesion. */ -public class ClientSession implements org.apache.qpidity.client.Session +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session { - - Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); - - public void addData(byte[] data, int off, int len) - { - // TODO Auto-generated method stub - - } - - public void addMessageHeaders(Header... headers) - { - // TODO Auto-generated method stub - - } - - public void close() - { - // TODO Auto-generated method stub - - } - - public void endData() - { - // TODO Auto-generated method stub - - } - - public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void exchangeDelete(String exchangeName, Option... options) + private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + private Map<String,List<RangeSet>> _unackedMessages = new HashMap<String,List<RangeSet>>(); + + @Override public void sessionClose() { - // TODO Auto-generated method stub - + // release all unacked messages and then issues a close + super.sessionClose(); } - + public void messageAcknowledge(RangeSet ranges) { - // TODO Auto-generated method stub - - } - - public void messageAcquire(RangeSet ranges) - { - // TODO Auto-generated method stub - } - - public void messageCancel(String destination) - { - // TODO Auto-generated method stub - - } - - public void messageFlow(String destination, short unit, long value) - { - // TODO Auto-generated method stub - - } - - public void messageFlowMode(String destination, short mode) - { - // TODO Auto-generated method stub - - } - - public void messageFlush(String destination) - { - // TODO Auto-generated method stub - } - - public void messageReject(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageRelease(RangeSet ranges) - { - // TODO Auto-generated method stub - - } - - public void messageStop(String destination) - { - // TODO Auto-generated method stub - + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } } public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options) { - // TODO Auto-generated method stub - + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) { - // TODO Auto-generated method stub - + // need to break it down into small pieces + super.messageTransfer(exchange, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + // super.data(bytes); * + // super.endData() } - - public void messageTransfer(String exchange, short confirmMode, short acquireMode) - { - // TODO Auto-generated method stub - - } - - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - { - // TODO Auto-generated method stub - - } - - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queueDelete(String queueName, Option... options) - { - // TODO Auto-generated method stub - - } - - public void queuePurge(String queueName) - { - // TODO Auto-generated method stub - - } - - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) - { - // TODO Auto-generated method stub - - } - - public void resume() + + + public RangeSet getAccquiredMessages() { - // TODO Auto-generated method stub - + return _acquiredMessages; } - public void setExceptionListener(ExceptionListener exceptionListner) + public RangeSet getRejectedMessages() { - // TODO Auto-generated method stub - + return _rejectedMessages; } - + public void setMessageListener(String destination, MessagePartListener listener) { - // TODO Auto-generated method stub - + _messageListeners.put(destination, listener); } - - public void suspend() - { - // TODO Auto-generated method stub - - } - - public void sync() - { - // TODO Auto-generated method stub - - } - - public void txCommit() throws IllegalStateException + + public void setExceptionListener(ExceptionListener exceptionListner) { - // TODO Auto-generated method stub - - } - - public void txRollback() throws IllegalStateException + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) { - // TODO Auto-generated method stub - + _acquiredMessages = acquiredMessages; } - - public void txSelect() + + void setRejectedMessages(RangeSet rejectedMessages) { - // TODO Auto-generated method stub - + _rejectedMessages = rejectedMessages; } - - public RangeSet getAccquiredMessages() + + void notifyException(QpidException ex) { - // TODO Auto-generated method stub - return null; + _exceptionListner.onException(ex); } - - public int getNoOfUnAckedMessages() + + Map<String,MessagePartListener> getMessageListerners() { - // TODO Auto-generated method stub - return 0; + return _messageListeners; } - - } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java index ea9e4c067b..70d1019a06 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -1,45 +1,79 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.client.Session; +import java.nio.ByteBuffer; +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; -public class ClientSessionDelegate extends CommonSessionDelegate -{ - - /*@Override public void messageTransfer(Session context, MessageTransfer struct) - { - MessagePartListener l = context.messagListeners.get(struct.getDestination()); - l.messageTransfer(struct.getDestination(),new Option[0]); - }*/ - - // --------------------------------------------------------------- - // Non generated methods - but would like if they are also generated. - // These methods should be called from Body and Header Handlers. - // If these methods are generated as part of the delegate then - // I can call these methods from the BodyHandler and HeaderHandler - // in a generic way - // I have used destination to indicate my intent of receiving - // some form of correlation to know which consumer this data belongs to. - // It can be anything as long as I can make the right correlation - // ---------------------------------------------------------------- - /* public void data(Session context,String destination,byte[] src) throws QpidException +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void data(Session ssn, Frame frame) { - MessagePartListener l = context.messagListeners.get(destination); - l.data(src); + for (ByteBuffer b : frame) + { + _currentMessageListener.addData(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + } - public void endData(Session context,String destination) throws QpidException + @Override public void headers(Session ssn, Struct... headers) { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); + _currentMessageListener.messageHeaders(headers); } - public void messageHeaders(Session context,String destination,Header... headers) throws QpidException - { - MessagePartListener l = context.messagListeners.get(destination); - l.endData(); - }*/ + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java new file mode 100644 index 0000000000..d325054bee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java @@ -0,0 +1,45 @@ +package org.apache.qpidity.impl; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.Connection; + +public class DemoClient +{ + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("Queue1", null, null); + ssn.sync(); + + ssn.messageTransfer("Queue1", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties(), + new MessageProperties()); + ssn.data("this is the data"); + ssn.endData(); + + ssn.messageTransfer("Queue2", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java index f05f2c0e76..5d3f6a6e3e 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java @@ -1,33 +1,109 @@ package org.apache.qpidity.impl; -import org.apache.qpidity.Header; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.Struct; import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; +/** + * + * Will call onMessage method as soon as data is avialable + * The client can then start to process the data while + * the rest of the data is read. + * + */ public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; Message _currentMsg; + DeliveryProperties _currentDeliveryProps; + MessageProperties _currentMessageProps; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - _currentMsg = null; - } + + // temp solution. + _currentMsg = new Message() + { + Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); + ByteBuffer _readBuffer; + private int dataSize; + + public void appendData(byte[] src) + { + appendData(ByteBuffer.wrap(src)); + } - public void addData(byte[] src) - { - _currentMsg.appendData(src); - } + public void appendData(ByteBuffer src) + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } - public void messageHeaders(Header... headers) + // since we provide the message only after completion + // we can assume that when this method is called we have + // received all data. + public void readData(byte[] target) + { + if (_readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + private void buildReadBuffer() + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + }; + } + + public void addData(ByteBuffer src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Struct... headers) { - //_currentMsg add the headers + for(Struct struct: headers) + { + if(struct instanceof DeliveryProperties) + { + _currentDeliveryProps = (DeliveryProperties)struct; + } + else if (struct instanceof MessageProperties) + { + _currentMessageProps = (MessageProperties)struct; + } + } } - + public void messageReceived() { - _adaptee.onMessage(_currentMsg); + _adaptee.onMessage(_currentMsg); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java index 1dc35b5609..98767106ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -34,7 +34,7 @@ public class ExceptionHelper { if (exception instanceof QpidException) { - jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode())); } else { @@ -51,7 +51,7 @@ public class ExceptionHelper static public XAException convertQpidExceptionToXAException(QpidException exception) { - String qpidErrorCode = exception.getErrorCode(); + String qpidErrorCode = String.valueOf(exception.getErrorCode()); // todo map this error to an XA code int xaCode = XAException.XAER_PROTO; return new XAException(xaCode); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index b9922de296..3869c2a793 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages - _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, + _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); if (_messageSelector != null) { @@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. @@ -591,7 +591,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - getSession().getQpidSession().messageAcquire(ranges); + getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java index ad85f33aea..8d707e4ccc 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser // this is a queue we expect that this queue exists getSession().getQpidSession() .messageSubscribe(queue.getQueueName(), getMessageActorID(), - org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We do not acquire those messages - org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null); } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index d5342fb464..5f2e7cbda7 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -384,7 +384,7 @@ public class SessionImpl implements Session _incomingAsynchronousMessages.notifyAll(); } // close the underlaying QpidSession - _qpidSession.close(); + _qpidSession.sessionClose(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index f20c65e467..483b5e7f21 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -35,30 +35,31 @@ import static org.apache.qpidity.Functions.*; * @author Rafael H. Schloming */ -class Channel extends Invoker implements Handler<Frame> +public class Channel extends Invoker implements Handler<Frame> { final private Connection connection; final private int channel; final private TrackSwitch<Channel> tracks; final private Delegate<Channel> delegate; - + final private SessionDelegate sessionDelegate; // session may be null private Session session; private Method method = null; private List<ByteBuffer> data = null; private int dataSize; - + public Channel(Connection connection, int channel, SessionDelegate delegate) { this.connection = connection; this.channel = channel; this.delegate = new ChannelDelegate(); - + this.sessionDelegate = delegate; + tracks = new TrackSwitch<Channel>(); tracks.map(L1, new MethodHandler<Channel> - (getMajor(), getMinor(), this.delegate)); + (getMajor(), getMinor(), connection.getConnectionDelegate())); tracks.map(L2, new MethodHandler<Channel> (getMajor(), getMinor(), this.delegate)); tracks.map(L3, new SessionResolver<Frame> diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index 9171208a28..c387a38b17 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -36,7 +36,8 @@ import java.nio.ByteBuffer; * short instead of Short */ -class Connection implements ProtocolActions +// RA making this public until we sort out the package issues +public class Connection implements ProtocolActions { final private Handler<ByteBuffer> input; @@ -58,6 +59,11 @@ class Connection implements ProtocolActions this.delegate = delegate; } + public ConnectionDelegate getConnectionDelegate() + { + return delegate; + } + public Connection(Handler<ByteBuffer> output, ConnectionDelegate delegate) { @@ -103,6 +109,9 @@ class Connection implements ProtocolActions output.handle(header.toByteBuffer()); // XXX: how do we close the connection? } + + // not sure if this is the right place + getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8"); } public Channel getChannel(int number) diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 9df264561c..537a7ef586 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -20,6 +20,17 @@ */ package org.apache.qpidity; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + /** * ConnectionDelegate @@ -27,9 +38,240 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -public interface ConnectionDelegate +/** + * Currently only implemented client specific methods + * the server specific methods are dummy impls for testing + * + * the connectionClose is kind of different for both sides + */ +public abstract class ConnectionDelegate extends Delegate<Channel> { + private String _username; + private String _password; + private String _mechanism; + private String _virtualHost; + private SaslClient saslClient; + private SaslServer saslServer; + private String _locale = "utf8"; + private int maxFrame = 64*1024; + private Condition _negotiationComplete; + private Lock _negotiationCompleteLock; + + public abstract SessionDelegate getSessionDelegate(); + + public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) + { + _negotiationComplete = negotiationComplete; + _negotiationCompleteLock = negotiationCompleteLock; + } + + // ---------------------------------------------- + // Client side + //----------------------------------------------- + @Override public void connectionStart(Channel context, ConnectionStart struct) + { + System.out.println("The broker has sent connection-start"); + + String mechanism = null; + String response = null; + try + { + mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); + saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, + SecurityHelper.createCallbackHandler(mechanism,_username,_password )); + response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + + Map<String,?> props = new HashMap<String,String>(); + context.connectionStartOk(props, mechanism, response, _locale); + } + + @Override public void connectionSecure(Channel context, ConnectionSecure struct) + { + System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge()); + + try + { + String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + context.connectionSecureOk(response); + } + catch (UnsupportedEncodingException e) + { + // need error handling + } + catch (SaslException e) + { + // need error handling + } + } + + @Override public void connectionTune(Channel context, ConnectionTune struct) + { + System.out.println("The broker has sent connection-tune " + struct.toString()); + + // should update the channel max given by the broker. + context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionOpen(_virtualHost, null, Option.INSIST); + } + + + @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) + { + String knownHosts = struct.getKnownHosts(); + System.out.println("The broker has opened the connection for use"); + System.out.println("The broker supplied the following hosts for failover " + knownHosts); + _negotiationCompleteLock.lock(); + try + { + _negotiationComplete.signalAll(); + } + finally + { + _negotiationCompleteLock.unlock(); + } + } + + public void connectionRedirect(Channel context, ConnectionRedirect struct) + { + // not going to bother at the moment + } + + // ---------------------------------------------- + // Server side + //----------------------------------------------- + @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + { + //set the client side locale on the server side + _locale = struct.getLocale(); + _mechanism = struct.getMechanism(); + + System.out.println("The client has sent connection-start-ok"); + + //try + //{ + //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + byte[] challenge = null; + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + /*} + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + }*/ + } + + @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) + { + System.out.println("The client has excepted the tune params"); + } + + @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) + { + System.out.println("The client has sent connection-secure-ok"); + try + { + saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + if ( challenge == null) + { + System.out.println("Authentication sucessfull"); + context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + } + else + { + System.out.println("Authentication failed"); + try + { + context.connectionSecure(new String(challenge,_locale)); + } + catch(Exception e) + { + + } + } + + + } + catch (SaslException e) + { + // need error handling + } + catch (QpidException e) + { + // need error handling + } + } + + + @Override public void connectionOpen(Channel context, ConnectionOpen struct) + { + String hosts = "amqp:1223243232325"; + System.out.println("The client has sent connection-open-ok"); + context.connectionOpenOk(hosts); + } + + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } - SessionDelegate getSessionDelegate(); + public String getVirtualHost() + { + return _virtualHost; + } + public void setVirtualHost(String host) + { + _virtualHost = host; + } } diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java index d5076e0ef0..89e7579cb3 100644 --- a/java/common/src/main/java/org/apache/qpidity/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/Frame.java @@ -35,7 +35,8 @@ import static org.apache.qpidity.Functions.*; * @author Rafael H. Schloming */ -class Frame implements Iterable<ByteBuffer> +// RA: changed it to public until we sort the package issues +public class Frame implements Iterable<ByteBuffer> { public static final int HEADER_SIZE = 12; diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java index a40753ed91..f255b56d0b 100644 --- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java @@ -42,8 +42,8 @@ import org.apache.mina.transport.socket.nio.SocketConnector; * * @author Rafael H. Schloming */ - -class MinaHandler implements IoHandler +//RA making this public until we sort out the package issues +public class MinaHandler implements IoHandler { private final ConnectionDelegate delegate; @@ -124,7 +124,8 @@ class MinaHandler implements IoHandler { IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(new InetSocketAddress(host, port), - new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); + new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); + } public static final Connection connect(String host, int port, diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java index 56c73d1f00..140d5ecbe3 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java @@ -29,7 +29,9 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -class ProtocolHeader +//RA making this public until we sort out the package issues + +public class ProtocolHeader { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; diff --git a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java b/java/common/src/main/java/org/apache/qpidity/QpidConfig.java new file mode 100644 index 0000000000..b5aad12f10 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/QpidConfig.java @@ -0,0 +1,90 @@ +package org.apache.qpidity; + +/** + * API to configure the Security parameters of the client. + * The user can choose to pick the config from any source + * and set it using this class. + * + */ +public class QpidConfig +{ + private static QpidConfig _instance = new QpidConfig(); + + private SecurityMechanism[] securityMechanisms = + new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpidity.security.UsernamePasswordCallbackHandler"), + new SecurityMechanism("CRAM_MD5","org.apache.qpidity.security.UsernamePasswordCallbackHandler")}; + + private SaslClientFactory[] saslClientFactories = + new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpidity.security.amqplain.AmqPlainSaslClientFactory")}; + + private QpidConfig(){} + + public static QpidConfig get() + { + return _instance; + } + + public void setSecurityMechanisms(SecurityMechanism... securityMechanisms) + { + this.securityMechanisms = securityMechanisms; + } + + public SecurityMechanism[] getSecurityMechanisms() + { + return securityMechanisms; + } + + public void setSaslClientFactories(SaslClientFactory... saslClientFactories) + { + this.saslClientFactories = saslClientFactories; + } + + public SaslClientFactory[] getSaslClientFactories() + { + return saslClientFactories; + } + + public class SecurityMechanism + { + String type; + String handler; + + SecurityMechanism(String type,String handler) + { + this.type = type; + this.handler = handler; + } + + public String getHandler() + { + return handler; + } + + public String getType() + { + return type; + } + } + + public class SaslClientFactory + { + String type; + String factoryClass; + + SaslClientFactory(String type,String factoryClass) + { + this.type = type; + this.factoryClass = factoryClass; + } + + public String getFactoryClass() + { + return factoryClass; + } + + public String getType() + { + return type; + } + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/QpidException.java b/java/common/src/main/java/org/apache/qpidity/QpidException.java index 4ab99b677f..5b3671cebd 100644 --- a/java/common/src/main/java/org/apache/qpidity/QpidException.java +++ b/java/common/src/main/java/org/apache/qpidity/QpidException.java @@ -25,12 +25,9 @@ package org.apache.qpidity; public class QpidException extends Exception { /** - * This exception error code. - * <p> This error code is used for internationalisation purpose. - * <p> This error code is set from the AMQP ones. - * <TODO> So we may want to use the AMQP error code directly. + * AMQP error code */ - private String _errorCode; + private int _errorCode; /** * Constructor for a Qpid Exception. @@ -38,20 +35,27 @@ public class QpidException extends Exception * they are unknown. * @param message A description of the reason of this exception . * @param errorCode A string specifyin the error code of this exception. - * @param cause The linked Execption. + * @param cause The linked Execption. * + * */ - public QpidException(String message, String errorCode, Throwable cause) + public QpidException(String message, int errorCode, Throwable cause) { super(message, cause); _errorCode = errorCode; } + + //hack to get rid of a compile error from a generated class + public QpidException(String message, String errorCode, Throwable cause) + { + + } /** * Get this execption error code. * * @return This exception error code. */ - public String getErrorCode() + public int getErrorCode() { return _errorCode; } diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java new file mode 100644 index 0000000000..474e2f7e8f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpidity.security.AMQPCallbackHandler; +import org.apache.qpidity.security.CallbackHandlerRegistry; + +public class SecurityHelper +{ + public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException + { + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, String username,String password) + throws QpidException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(username,password); + return cbh; + } + catch (Exception e) + { + throw new QpidException("Unable to create callback handler: " + e,0, e.getCause()); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 3b33fde2df..d3a24ffd8a 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -42,11 +42,12 @@ public class Session extends Invoker // completed incoming commands private final RangeSet processed = new RangeSet(); private Range syncPoint = null; - + // outgoing command count private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; + public Map<Long,Method> getOutstandingCommands() { @@ -231,7 +232,6 @@ public class Session extends Invoker } future.set(result); } - protected <T> Future<T> invoke(Method m, Class<T> klass) { long command = commandsOut; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 651241f63c..683008fe8a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -190,13 +190,20 @@ class ToyBroker extends SessionDelegate { final Map<String,Queue<Message>> queues = new HashMap<String,Queue<Message>>(); - MinaHandler.accept("0.0.0.0", 5672, new ConnectionDelegate() - { - public SessionDelegate getSessionDelegate() - { - return new ToyBroker(queues); - } - }); + + ConnectionDelegate delegate = new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ToyBroker(queues); + } + }; + + //hack + delegate.setUsername("guest"); + delegate.setPassword("guest"); + + MinaHandler.accept("0.0.0.0", 5672, delegate); } } diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index 2305027556..ccad3577f0 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -1,5 +1,7 @@ package org.apache.qpidity.api; +import java.nio.ByteBuffer; + import org.apache.qpidity.MessageProperties; import org.apache.qpidity.DeliveryProperties; @@ -43,6 +45,8 @@ public interface Message */ public void appendData(byte[] src); + public void appendData(ByteBuffer src); + /** * This will abstract the underlying message data. * The Message implementation may not hold all message diff --git a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java index 7db5cd5e11..ac3888d5bb 100644 --- a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java +++ b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java @@ -56,7 +56,7 @@ public class PropertyExpression implements Expression } catch (Exception e) { - throw new QpidException("cannot evaluate property ", "message selector", e); + throw new QpidException("cannot evaluate property ", 0, e); } } return result; diff --git a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java index cd9d31b1c2..2e7afa1b87 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,28 +18,11 @@ * under the License. * */ -package org.apache.qpidity; +package org.apache.qpidity.security; -/** - * CommonSessionDelegate - */ +import javax.security.auth.callback.CallbackHandler; -public class CommonSessionDelegate extends Delegate<Session> +public interface AMQPCallbackHandler extends CallbackHandler { - - @Override public void sessionAttached(Session session, SessionAttached struct) {} - - @Override public void sessionFlow(Session session, SessionFlow struct) {} - - @Override public void sessionFlowOk(Session session, SessionFlowOk struct) {} - - @Override public void sessionClose(Session session, SessionClose struct) {} - - @Override public void sessionClosed(Session session, SessionClosed struct) {} - - @Override public void sessionResume(Session session, SessionResume struct) {} - - @Override public void sessionSuspend(Session session, SessionSuspend struct) {} - - @Override public void sessionDetached(Session session, SessionDetached struct) {} + void initialise(String username,String password); } diff --git a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java b/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java new file mode 100644 index 0000000000..624b015c69 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpidity.QpidConfig; + +public class CallbackHandlerRegistry +{ + + private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + + private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>(); + + private StringBuilder _mechanisms; + + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public Class getCallbackHandlerClass(String mechanism) + { + return _mechanismToHandlerClassMap.get(mechanism); + } + + public String getMechanisms() + { + return _mechanisms.toString(); + } + + private CallbackHandlerRegistry() + { + // first we register any Sasl client factories + DynamicSaslRegistrar.registerSaslProviders(); + registerMechanisms(); + } + + private void registerMechanisms() + { + for (QpidConfig.SecurityMechanism securityMechanism: QpidConfig.get().getSecurityMechanisms() ) + { + Class clazz = null; + try + { + clazz = Class.forName(securityMechanism.getHandler()); + if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) + { + System.out.println("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + + ". Skipping"); + continue; + } + _mechanismToHandlerClassMap.put(securityMechanism.getType(), clazz); + if (_mechanisms == null) + { + + _mechanisms = new StringBuilder(); + _mechanisms.append(securityMechanism.getType()); + } + else + { + _mechanisms.append(" " + securityMechanism.getType()); + } + } + catch (ClassNotFoundException ex) + { + System.out.println("Unable to load class " + securityMechanism.getHandler() + ". Skipping that SASL provider"); + continue; + } + } + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java b/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java new file mode 100644 index 0000000000..52dacc6985 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security; + +import java.security.Security; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.qpidity.QpidConfig; + +public class DynamicSaslRegistrar +{ + public static void registerSaslProviders() + { + Map<String, Class> factories = registerSaslClientFactories(); + if (factories.size() > 0) + { + Security.addProvider(new JCAProvider(factories)); + System.out.println("Dynamic SASL provider added as a security provider"); + } + } + + private static Map<String, Class> registerSaslClientFactories() + { + TreeMap<String, Class> factoriesToRegister = + new TreeMap<String, Class>(); + + for (QpidConfig.SaslClientFactory factory: QpidConfig.get().getSaslClientFactories()) + { + String className = factory.getFactoryClass(); + try + { + Class clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + System.out.println("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(factory.getType(), clazz); + } + catch (Exception ex) + { + System.out.println("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + return factoriesToRegister; + } + + +} diff --git a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java b/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java new file mode 100644 index 0000000000..c775171a5f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security; + +import java.security.Provider; +import java.security.Security; +import java.util.Map; + +public class JCAProvider extends Provider +{ + public JCAProvider(Map<String, Class> providerMap) + { + super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + + "AMQ SASL providers that want to be registered"); + register(providerMap); + Security.addProvider(this); + } + + private void register(Map<String, Class> providerMap) + { + for (Map.Entry<String, Class> me :providerMap.entrySet()) + { + put("SaslClientFactory." + me.getKey(), me.getValue().getName()); + } + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java b/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java new file mode 100644 index 0000000000..0fd647e015 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security; + +import java.io.IOException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; + +public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler +{ + private String _username; + private String _password; + + public void initialise(String username,String password) + { + _username = username; + _password = password; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_username); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword((_password).toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java new file mode 100644 index 0000000000..6e4a0218d2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security.amqplain; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.Callback; + +/** + * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd. + * + */ +public class AmqPlainSaslClient implements SaslClient +{ + /** + * The name of this mechanism + */ + public static final String MECHANISM = "AMQPLAIN"; + + private CallbackHandler _cbh; + + public AmqPlainSaslClient(CallbackHandler cbh) + { + _cbh = cbh; + } + + public String getMechanismName() + { + return "AMQPLAIN"; + } + + public boolean hasInitialResponse() + { + return true; + } + + public byte[] evaluateChallenge(byte[] challenge) throws SaslException + { + // we do not care about the prompt or the default name + NameCallback nameCallback = new NameCallback("prompt", "defaultName"); + PasswordCallback pwdCallback = new PasswordCallback("prompt", false); + Callback[] callbacks = new Callback[]{nameCallback, pwdCallback}; + try + { + _cbh.handle(callbacks); + } + catch (Exception e) + { + throw new SaslException("Error handling SASL callbacks: " + e, e); + } + FieldTable table = FieldTableFactory.newFieldTable(); + table.setString("LOGIN", nameCallback.getName()); + table.setString("PASSWORD", new String(pwdCallback.getPassword())); + return table.getDataAsBytes(); + } + + public boolean isComplete() + { + return true; + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public Object getNegotiatedProperty(String propName) + { + return null; + } + + public void dispose() throws SaslException + { + _cbh = null; + } +} diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java new file mode 100644 index 0000000000..abc881f433 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.security.amqplain; + +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import javax.security.auth.callback.CallbackHandler; +import java.util.Map; + +public class AmqPlainSaslClientFactory implements SaslClientFactory +{ + public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException + { + for (int i = 0; i < mechanisms.length; i++) + { + if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM)) + { + if (cbh == null) + { + throw new SaslException("CallbackHandler must not be null"); + } + return new AmqPlainSaslClient(cbh); + } + } + return null; + } + + public String[] getMechanismNames(Map props) + { + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) + { + // returned array must be non null according to interface documentation + return new String[0]; + } + else + { + return new String[]{AmqPlainSaslClient.MECHANISM}; + } + } +} |
