summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-09 07:24:02 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-09 07:24:02 +0000
commitc1e0112bc308edb406ac8aba3c8e47fbf710752c (patch)
tree67a5504ea691cf0f68c34ee508476917d18adfdf /java
parentce1474162e775b693383ed55ebb6dacf562baaa5 (diff)
downloadqpid-python-c1e0112bc308edb406ac8aba3c8e47fbf710752c.tar.gz
implemented the Connection and Session API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564124 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Connection.java13
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java60
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/Client.java112
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java229
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java98
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java98
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java11
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java11
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java246
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Frame.java3
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MinaHandler.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/QpidConfig.java90
-rw-r--r--java/common/src/main/java/org/apache/qpidity/QpidException.java20
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SecurityHelper.java71
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/api/Message.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java (renamed from java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java)29
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java92
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java70
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java44
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java60
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java105
-rw-r--r--java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java62
32 files changed, 1336 insertions, 305 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Connection.java b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
index 3da3a12aa5..2ea6db8943 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Connection.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Connection.java
@@ -28,6 +28,17 @@ import org.apache.qpidity.QpidException;
*/
public interface Connection
{
+ /**
+ * Establish the connection using the given parameters
+ *
+ * @param host
+ * @param port
+ * @param username
+ * @param password
+ * @throws QpidException
+ */
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
+
/**
* Establish the connection with the broker identified by the provided URL.
*
@@ -52,7 +63,7 @@ public interface Connection
* @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
* @return A Newly created (suspended) session.
*/
- public Session createSession(int expiryInSeconds);
+ public Session createSession(long expiryInSeconds);
/**
* Create a DtxSession for this connection.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index 633c77ecac..def7b5ca41 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -17,7 +17,9 @@
*/
package org.apache.qpidity.client;
-import org.apache.qpidity.Header;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.Struct;
/**
* Assembles message parts.
@@ -31,21 +33,21 @@ import org.apache.qpidity.Header;
* are transferred.
*/
public interface MessagePartListener
-{
+{
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
*
* @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
*/
- public void messageHeaders(Header... headers);
+ public void messageHeaders(Struct... headers);
/**
* Add the following byte array to the content of the message being received
*
* @param data Data to be added or streamed.
*/
- public void addData(byte[] data);
+ public void addData(ByteBuffer src);
/**
* Indicates that the message has been fully received.
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 0178ef6d3b..84de268232 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -18,12 +18,14 @@
*/
package org.apache.qpidity.client;
+import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.UUID;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
/**
* <p>A session is associated with a connection.
@@ -32,10 +34,12 @@ import org.apache.qpidity.RangeSet;
*/
public interface Session
{
- public static final short ACQUIRE_MODE_NO_ACQUIRE = 0;
- public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1;
- public static final short CONFIRM_MODE_REQUIRED = 1;
- public static final short CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+ public static final short ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+ public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
@@ -54,21 +58,21 @@ public interface Session
/**
* Close this session and any associated resources.
*/
- public void close();
+ public void sessionClose();
/**
* Suspend this session resulting in interrupting the traffic with the broker.
* <p> The session timer will start to tick in suspend.
* <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
*/
- public void suspend();
+ public void sessionSuspend();
/**
* This will resume an existing session
* <p> Upon resume the session is attached with an underlying channel
* hence making operation on this session available.
*/
- public void resume();
+ public void sessionResume(UUID sessionId);
//------------------------------------------------------
// Messaging methods
@@ -92,7 +96,7 @@ public interface Session
* @param exchange The exchange the message is being sent.
* @param msg The Message to be sent
*/
- public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode);
/**
* Declare the beginning of a message transfer operation. This operation must
@@ -117,27 +121,39 @@ public interface Session
* </ul>
* @param exchange The exchange the message is being sent.
*/
- public void messageTransfer(String exchange, short confirmMode, short acquireMode);
+ public void messageTransfer(String destination, short confirmMode, short acquireMode);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or to the message being sent.
*
- * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ * @param headers are Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
* @see org.apache.qpidity.DeliveryProperties
*/
- public void addMessageHeaders(Header... headers);
+ public void headers(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
*
* @param data Data to be added.
- * @param off Offset from which to start reading data
- * @param len Number of bytes to be read
*/
- public void addData(byte[] data, int off, int len);
+ public void data(byte[] data);
+
+ /**
+ * Add the following ByteBuffer to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(ByteBuffer buf);
/**
+ * Add the following String to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(String str);
+
+ /**
* Signals the end of data for the message.
*/
public void endData();
@@ -258,8 +274,6 @@ public interface Session
* @param destination The destination to call flush on.
*/
public void messageFlush(String destination);
-
- public int getNoOfUnAckedMessages();
/**
* On receipt of this method, the brokers MUST set his credit to zero for the given
@@ -286,8 +300,12 @@ public interface Session
* and may be either discarded or moved to the broker dead letter queue.
*
* @param ranges Range of rejected messages.
+ * @param code TODO
+ * @param text TODO
*/
- public void messageReject(RangeSet ranges);
+ public void messageReject(RangeSet ranges, int code, String text);
+
+ public RangeSet getRejectedMessages();
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -296,10 +314,10 @@ public interface Session
* message acquisition can fail.
* The outcome of the acquisition is returned as an array of ranges of qcquired messages.
* <p> This method should only be called on non-acquired messages.
- *
+ * @param mode TODO
* @param range Ranges of messages to be acquired.
*/
- public void messageAcquire(RangeSet ranges);
+ public void messageAcquire(RangeSet ranges, short mode);
public RangeSet getAccquiredMessages();
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/Client.java b/java/client/src/main/java/org/apache/qpidity/impl/Client.java
new file mode 100644
index 0000000000..13acff1ea6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/impl/Client.java
@@ -0,0 +1,112 @@
+package org.apache.qpidity.impl;
+
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.Channel;
+import org.apache.qpidity.Connection;
+import org.apache.qpidity.ConnectionClose;
+import org.apache.qpidity.ConnectionDelegate;
+import org.apache.qpidity.MinaHandler;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.client.DtxSession;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.Session;
+
+
+public class Client implements org.apache.qpidity.client.Connection
+{
+ private AtomicInteger _channelNo = new AtomicInteger();
+ private Connection _conn;
+ private ExceptionListener _exceptionListner;
+ private final Lock _lock = new ReentrantLock();
+
+ public static org.apache.qpidity.client.Connection createConnection()
+ {
+ return new Client();
+ }
+
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
+ {
+ Condition negotiationComplete = _lock.newCondition();
+ _lock.lock();
+
+ ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ClientSessionDelegate();
+ }
+
+ @Override public void connectionClose(Channel context, ConnectionClose struct)
+ {
+ _exceptionListner.onException(new QpidException("Server closed the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null));
+ }
+ };
+
+ connectionDelegate.setCondition(_lock,negotiationComplete);
+ connectionDelegate.setUsername(username);
+ connectionDelegate.setPassword(password);
+ connectionDelegate.setVirtualHost(virtualHost);
+
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+
+ _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
+
+ try
+ {
+ negotiationComplete.await();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /*
+ * Until the dust settles with the URL disucssion
+ * I am not going to implement this.
+ */
+ public void connect(URL url) throws QpidException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws QpidException
+ {
+ Channel ch = _conn.getChannel(0);
+ ch.connectionClose(0, "client is closing", 0, 0);
+ //need to close the connection underneath as well
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ ClientSession ssn = new ClientSession();
+ ssn.attach(ch);
+ ssn.sessionOpen(expiryInSeconds);
+
+ return ssn;
+ }
+
+ public DtxSession createDTXSession(int expiryInSeconds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
index a0111a86f4..627829556c 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
@@ -1,216 +1,101 @@
package org.apache.qpidity.impl;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
-import org.apache.qpidity.*;
/**
* Implements a Qpid Sesion.
*/
-public class ClientSession implements org.apache.qpidity.client.Session
+public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session
{
-
- Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>();
-
- public void addData(byte[] data, int off, int len)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void addMessageHeaders(Header... headers)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void close()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void endData()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void exchangeDelete(String exchangeName, Option... options)
+ private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
+ private ExceptionListener _exceptionListner;
+ private RangeSet _acquiredMessages;
+ private RangeSet _rejectedMessages;
+ private Map<String,List<RangeSet>> _unackedMessages = new HashMap<String,List<RangeSet>>();
+
+ @Override public void sessionClose()
{
- // TODO Auto-generated method stub
-
+ // release all unacked messages and then issues a close
+ super.sessionClose();
}
-
+
public void messageAcknowledge(RangeSet ranges)
{
- // TODO Auto-generated method stub
-
- }
-
- public void messageAcquire(RangeSet ranges)
- {
- // TODO Auto-generated method stub
- }
-
- public void messageCancel(String destination)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlow(String destination, short unit, long value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlowMode(String destination, short mode)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageFlush(String destination)
- {
- // TODO Auto-generated method stub
- }
-
- public void messageReject(RangeSet ranges)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageRelease(RangeSet ranges)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageStop(String destination)
- {
- // TODO Auto-generated method stub
-
+ for (Range range : ranges)
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("Acknowleding message for : " + super.getCommand((int) l));
+ super.processed(l);
+ }
+ }
}
public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
{
- // TODO Auto-generated method stub
-
+ setMessageListener(destination,listener);
+ super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
}
public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode)
{
- // TODO Auto-generated method stub
-
+ // need to break it down into small pieces
+ super.messageTransfer(exchange, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ // super.data(bytes); *
+ // super.endData()
}
-
- public void messageTransfer(String exchange, short confirmMode, short acquireMode)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueDelete(String queueName, Option... options)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queuePurge(String queueName)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void resume()
+
+
+ public RangeSet getAccquiredMessages()
{
- // TODO Auto-generated method stub
-
+ return _acquiredMessages;
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public RangeSet getRejectedMessages()
{
- // TODO Auto-generated method stub
-
+ return _rejectedMessages;
}
-
+
public void setMessageListener(String destination, MessagePartListener listener)
{
- // TODO Auto-generated method stub
-
+ _messageListeners.put(destination, listener);
}
-
- public void suspend()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void sync()
- {
- // TODO Auto-generated method stub
-
- }
-
- public void txCommit() throws IllegalStateException
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
{
- // TODO Auto-generated method stub
-
- }
-
- public void txRollback() throws IllegalStateException
+ _exceptionListner = exceptionListner;
+ }
+
+ // ugly but nessacery
+
+ void setAccquiredMessages(RangeSet acquiredMessages)
{
- // TODO Auto-generated method stub
-
+ _acquiredMessages = acquiredMessages;
}
-
- public void txSelect()
+
+ void setRejectedMessages(RangeSet rejectedMessages)
{
- // TODO Auto-generated method stub
-
+ _rejectedMessages = rejectedMessages;
}
-
- public RangeSet getAccquiredMessages()
+
+ void notifyException(QpidException ex)
{
- // TODO Auto-generated method stub
- return null;
+ _exceptionListner.onException(ex);
}
-
- public int getNoOfUnAckedMessages()
+
+ Map<String,MessagePartListener> getMessageListerners()
{
- // TODO Auto-generated method stub
- return 0;
+ return _messageListeners;
}
-
-
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
index ea9e4c067b..70d1019a06 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java
@@ -1,45 +1,79 @@
package org.apache.qpidity.impl;
-import org.apache.qpidity.CommonSessionDelegate;
-import org.apache.qpidity.client.Session;
+import java.nio.ByteBuffer;
+import org.apache.qpidity.Frame;
+import org.apache.qpidity.MessageAcquired;
+import org.apache.qpidity.MessageReject;
+import org.apache.qpidity.MessageTransfer;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Session;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.client.MessagePartListener;
-public class ClientSessionDelegate extends CommonSessionDelegate
-{
-
- /*@Override public void messageTransfer(Session context, MessageTransfer struct)
- {
- MessagePartListener l = context.messagListeners.get(struct.getDestination());
- l.messageTransfer(struct.getDestination(),new Option[0]);
- }*/
-
- // ---------------------------------------------------------------
- // Non generated methods - but would like if they are also generated.
- // These methods should be called from Body and Header Handlers.
- // If these methods are generated as part of the delegate then
- // I can call these methods from the BodyHandler and HeaderHandler
- // in a generic way
- // I have used destination to indicate my intent of receiving
- // some form of correlation to know which consumer this data belongs to.
- // It can be anything as long as I can make the right correlation
- // ----------------------------------------------------------------
- /* public void data(Session context,String destination,byte[] src) throws QpidException
+public class ClientSessionDelegate extends SessionDelegate
+{
+ private MessageTransfer _currentTransfer;
+ private MessagePartListener _currentMessageListener;
+
+ @Override public void data(Session ssn, Frame frame)
{
- MessagePartListener l = context.messagListeners.get(destination);
- l.data(src);
+ for (ByteBuffer b : frame)
+ {
+ _currentMessageListener.addData(b);
+ }
+ if (frame.isLastSegment() && frame.isLastFrame())
+ {
+ _currentMessageListener.messageReceived();
+ }
+
}
- public void endData(Session context,String destination) throws QpidException
+ @Override public void headers(Session ssn, Struct... headers)
{
- MessagePartListener l = context.messagListeners.get(destination);
- l.endData();
+ _currentMessageListener.messageHeaders(headers);
}
- public void messageHeaders(Session context,String destination,Header... headers) throws QpidException
- {
- MessagePartListener l = context.messagListeners.get(destination);
- l.endData();
- }*/
+ @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
+ {
+ _currentTransfer = currentTransfer;
+ _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+
+ //a better way is to tell the broker to stop the transfer
+ if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
+ {
+ RangeSet transfers = new RangeSet();
+ transfers.add(_currentTransfer.getId());
+ session.messageRelease(transfers);
+ }
+ }
+
+ // --------------------------------------------
+ // Message methods
+ // --------------------------------------------
+
+
+ @Override public void messageReject(Session session, MessageReject struct)
+ {
+ for (Range range : struct.getTransfers())
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("message rejected: " +
+ session.getCommand((int) l));
+ }
+ }
+ ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+ ((ClientSession)session).notifyException(new QpidException("Message Rejected",0,null));
+ }
+
+ @Override public void messageAcquired(Session session, MessageAcquired struct)
+ {
+ ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
new file mode 100644
index 0000000000..d325054bee
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/impl/DemoClient.java
@@ -0,0 +1,45 @@
+package org.apache.qpidity.impl;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.Connection;
+
+public class DemoClient
+{
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("Queue1", null, null);
+ ssn.sync();
+
+ ssn.messageTransfer("Queue1", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties(),
+ new MessageProperties());
+ ssn.data("this is the data");
+ ssn.endData();
+
+ ssn.messageTransfer("Queue2", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
index f05f2c0e76..5d3f6a6e3e 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java
@@ -1,33 +1,109 @@
package org.apache.qpidity.impl;
-import org.apache.qpidity.Header;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.Struct;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
+/**
+ *
+ * Will call onMessage method as soon as data is avialable
+ * The client can then start to process the data while
+ * the rest of the data is read.
+ *
+ */
public class MessagePartListenerAdapter implements MessagePartListener
{
MessageListener _adaptee;
Message _currentMsg;
+ DeliveryProperties _currentDeliveryProps;
+ MessageProperties _currentMessageProps;
public MessagePartListenerAdapter(MessageListener listener)
{
_adaptee = listener;
- _currentMsg = null;
- }
+
+ // temp solution.
+ _currentMsg = new Message()
+ {
+ Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ ByteBuffer _readBuffer;
+ private int dataSize;
+
+ public void appendData(byte[] src)
+ {
+ appendData(ByteBuffer.wrap(src));
+ }
- public void addData(byte[] src)
- {
- _currentMsg.appendData(src);
- }
+ public void appendData(ByteBuffer src)
+ {
+ _data.offer(src);
+ dataSize += src.remaining();
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _currentDeliveryProps;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _currentMessageProps;
+ }
- public void messageHeaders(Header... headers)
+ // since we provide the message only after completion
+ // we can assume that when this method is called we have
+ // received all data.
+ public void readData(byte[] target)
+ {
+ if (_readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ _readBuffer.get(target);
+ }
+
+ private void buildReadBuffer()
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
+ }
+ };
+ }
+
+ public void addData(ByteBuffer src)
+ {
+ _currentMsg.appendData(src);
+ }
+
+ public void messageHeaders(Struct... headers)
{
- //_currentMsg add the headers
+ for(Struct struct: headers)
+ {
+ if(struct instanceof DeliveryProperties)
+ {
+ _currentDeliveryProps = (DeliveryProperties)struct;
+ }
+ else if (struct instanceof MessageProperties)
+ {
+ _currentMessageProps = (MessageProperties)struct;
+ }
+ }
}
-
+
public void messageReceived()
{
- _adaptee.onMessage(_currentMsg);
+ _adaptee.onMessage(_currentMsg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
index 1dc35b5609..98767106ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
@@ -34,7 +34,7 @@ public class ExceptionHelper
{
if (exception instanceof QpidException)
{
- jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode());
+ jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode()));
}
else
{
@@ -51,7 +51,7 @@ public class ExceptionHelper
static public XAException convertQpidExceptionToXAException(QpidException exception)
{
- String qpidErrorCode = exception.getErrorCode();
+ String qpidErrorCode = String.valueOf(exception.getErrorCode());
// todo map this error to an XA code
int xaCode = XAException.XAER_PROTO;
return new XAException(xaCode);
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index b9922de296..3869c2a793 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -131,9 +131,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// this is a queue we expect that this queue exists
getSession().getQpidSession()
.messageSubscribe(destination.getName(), getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// When the message selctor is set we do not acquire the messages
- _messageSelector != null ? org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE,
+ _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
if (_messageSelector != null)
{
@@ -167,9 +167,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We always acquire the messages
- org.apache.qpidity.client.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
_noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
// Request exclusive subscription access, meaning only this subscription
// can access the queue.
@@ -591,7 +591,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- getSession().getQpidSession().messageAcquire(ranges);
+ getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE);
RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
index ad85f33aea..8d707e4ccc 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
@@ -94,9 +94,9 @@ public class QueueBrowserImpl extends MessageActor implements QueueBrowser
// this is a queue we expect that this queue exists
getSession().getQpidSession()
.messageSubscribe(queue.getQueueName(), getMessageActorID(),
- org.apache.qpidity.client.Session.CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We do not acquire those messages
- org.apache.qpidity.client.Session.ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, messageAssembler, null);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index d5342fb464..5f2e7cbda7 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -384,7 +384,7 @@ public class SessionImpl implements Session
_incomingAsynchronousMessages.notifyAll();
}
// close the underlaying QpidSession
- _qpidSession.close();
+ _qpidSession.sessionClose();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index f20c65e467..483b5e7f21 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -35,30 +35,31 @@ import static org.apache.qpidity.Functions.*;
* @author Rafael H. Schloming
*/
-class Channel extends Invoker implements Handler<Frame>
+public class Channel extends Invoker implements Handler<Frame>
{
final private Connection connection;
final private int channel;
final private TrackSwitch<Channel> tracks;
final private Delegate<Channel> delegate;
-
+ final private SessionDelegate sessionDelegate;
// session may be null
private Session session;
private Method method = null;
private List<ByteBuffer> data = null;
private int dataSize;
-
+
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
this.connection = connection;
this.channel = channel;
this.delegate = new ChannelDelegate();
-
+ this.sessionDelegate = delegate;
+
tracks = new TrackSwitch<Channel>();
tracks.map(L1, new MethodHandler<Channel>
- (getMajor(), getMinor(), this.delegate));
+ (getMajor(), getMinor(), connection.getConnectionDelegate()));
tracks.map(L2, new MethodHandler<Channel>
(getMajor(), getMinor(), this.delegate));
tracks.map(L3, new SessionResolver<Frame>
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
index 9171208a28..c387a38b17 100644
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/Connection.java
@@ -36,7 +36,8 @@ import java.nio.ByteBuffer;
* short instead of Short
*/
-class Connection implements ProtocolActions
+// RA making this public until we sort out the package issues
+public class Connection implements ProtocolActions
{
final private Handler<ByteBuffer> input;
@@ -58,6 +59,11 @@ class Connection implements ProtocolActions
this.delegate = delegate;
}
+ public ConnectionDelegate getConnectionDelegate()
+ {
+ return delegate;
+ }
+
public Connection(Handler<ByteBuffer> output,
ConnectionDelegate delegate)
{
@@ -103,6 +109,9 @@ class Connection implements ProtocolActions
output.handle(header.toByteBuffer());
// XXX: how do we close the connection?
}
+
+ // not sure if this is the right place
+ getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
}
public Channel getChannel(int number)
diff --git a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
index 9df264561c..537a7ef586 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpidity;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
/**
* ConnectionDelegate
@@ -27,9 +38,240 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-public interface ConnectionDelegate
+/**
+ * Currently only implemented client specific methods
+ * the server specific methods are dummy impls for testing
+ *
+ * the connectionClose is kind of different for both sides
+ */
+public abstract class ConnectionDelegate extends Delegate<Channel>
{
+ private String _username;
+ private String _password;
+ private String _mechanism;
+ private String _virtualHost;
+ private SaslClient saslClient;
+ private SaslServer saslServer;
+ private String _locale = "utf8";
+ private int maxFrame = 64*1024;
+ private Condition _negotiationComplete;
+ private Lock _negotiationCompleteLock;
+
+ public abstract SessionDelegate getSessionDelegate();
+
+ public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
+ {
+ _negotiationComplete = negotiationComplete;
+ _negotiationCompleteLock = negotiationCompleteLock;
+ }
+
+ // ----------------------------------------------
+ // Client side
+ //-----------------------------------------------
+ @Override public void connectionStart(Channel context, ConnectionStart struct)
+ {
+ System.out.println("The broker has sent connection-start");
+
+ String mechanism = null;
+ String response = null;
+ try
+ {
+ mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
+ saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
+ SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
+ response = new String(saslClient.evaluateChallenge(new byte[0]),_locale);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // need error handling
+ }
+ catch (SaslException e)
+ {
+ // need error handling
+ }
+ catch (QpidException e)
+ {
+ // need error handling
+ }
+
+ Map<String,?> props = new HashMap<String,String>();
+ context.connectionStartOk(props, mechanism, response, _locale);
+ }
+
+ @Override public void connectionSecure(Channel context, ConnectionSecure struct)
+ {
+ System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge());
+
+ try
+ {
+ String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
+ context.connectionSecureOk(response);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // need error handling
+ }
+ catch (SaslException e)
+ {
+ // need error handling
+ }
+ }
+
+ @Override public void connectionTune(Channel context, ConnectionTune struct)
+ {
+ System.out.println("The broker has sent connection-tune " + struct.toString());
+
+ // should update the channel max given by the broker.
+ context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+ context.connectionOpen(_virtualHost, null, Option.INSIST);
+ }
+
+
+ @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
+ {
+ String knownHosts = struct.getKnownHosts();
+ System.out.println("The broker has opened the connection for use");
+ System.out.println("The broker supplied the following hosts for failover " + knownHosts);
+ _negotiationCompleteLock.lock();
+ try
+ {
+ _negotiationComplete.signalAll();
+ }
+ finally
+ {
+ _negotiationCompleteLock.unlock();
+ }
+ }
+
+ public void connectionRedirect(Channel context, ConnectionRedirect struct)
+ {
+ // not going to bother at the moment
+ }
+
+ // ----------------------------------------------
+ // Server side
+ //-----------------------------------------------
+ @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
+ {
+ //set the client side locale on the server side
+ _locale = struct.getLocale();
+ _mechanism = struct.getMechanism();
+
+ System.out.println("The client has sent connection-start-ok");
+
+ //try
+ //{
+ //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
+ //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
+ byte[] challenge = null;
+ if ( challenge == null)
+ {
+ System.out.println("Authentication sucessfull");
+ context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ }
+ else
+ {
+ System.out.println("Authentication failed");
+ try
+ {
+ context.connectionSecure(new String(challenge,_locale));
+ }
+ catch(Exception e)
+ {
+
+ }
+ }
+
+
+ /*}
+ catch (SaslException e)
+ {
+ // need error handling
+ }
+ catch (QpidException e)
+ {
+ // need error handling
+ }*/
+ }
+
+ @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
+ {
+ System.out.println("The client has excepted the tune params");
+ }
+
+ @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
+ {
+ System.out.println("The client has sent connection-secure-ok");
+ try
+ {
+ saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
+ byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
+ if ( challenge == null)
+ {
+ System.out.println("Authentication sucessfull");
+ context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ }
+ else
+ {
+ System.out.println("Authentication failed");
+ try
+ {
+ context.connectionSecure(new String(challenge,_locale));
+ }
+ catch(Exception e)
+ {
+
+ }
+ }
+
+
+ }
+ catch (SaslException e)
+ {
+ // need error handling
+ }
+ catch (QpidException e)
+ {
+ // need error handling
+ }
+ }
+
+
+ @Override public void connectionOpen(Channel context, ConnectionOpen struct)
+ {
+ String hosts = "amqp:1223243232325";
+ System.out.println("The client has sent connection-open-ok");
+ context.connectionOpenOk(hosts);
+ }
+
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(String username)
+ {
+ _username = username;
+ }
- SessionDelegate getSessionDelegate();
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+ public void setVirtualHost(String host)
+ {
+ _virtualHost = host;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java
index d5076e0ef0..89e7579cb3 100644
--- a/java/common/src/main/java/org/apache/qpidity/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/Frame.java
@@ -35,7 +35,8 @@ import static org.apache.qpidity.Functions.*;
* @author Rafael H. Schloming
*/
-class Frame implements Iterable<ByteBuffer>
+// RA: changed it to public until we sort the package issues
+public class Frame implements Iterable<ByteBuffer>
{
public static final int HEADER_SIZE = 12;
diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
index a40753ed91..f255b56d0b 100644
--- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
@@ -42,8 +42,8 @@ import org.apache.mina.transport.socket.nio.SocketConnector;
*
* @author Rafael H. Schloming
*/
-
-class MinaHandler implements IoHandler
+//RA making this public until we sort out the package issues
+public class MinaHandler implements IoHandler
{
private final ConnectionDelegate delegate;
@@ -124,7 +124,8 @@ class MinaHandler implements IoHandler
{
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(new InetSocketAddress(host, port),
- new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
+ new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
+
}
public static final Connection connect(String host, int port,
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
index 56c73d1f00..140d5ecbe3 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
@@ -29,7 +29,9 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-class ProtocolHeader
+//RA making this public until we sort out the package issues
+
+public class ProtocolHeader
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
diff --git a/java/common/src/main/java/org/apache/qpidity/QpidConfig.java b/java/common/src/main/java/org/apache/qpidity/QpidConfig.java
new file mode 100644
index 0000000000..b5aad12f10
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/QpidConfig.java
@@ -0,0 +1,90 @@
+package org.apache.qpidity;
+
+/**
+ * API to configure the Security parameters of the client.
+ * The user can choose to pick the config from any source
+ * and set it using this class.
+ *
+ */
+public class QpidConfig
+{
+ private static QpidConfig _instance = new QpidConfig();
+
+ private SecurityMechanism[] securityMechanisms =
+ new SecurityMechanism[]{new SecurityMechanism("PLAIN","org.apache.qpidity.security.UsernamePasswordCallbackHandler"),
+ new SecurityMechanism("CRAM_MD5","org.apache.qpidity.security.UsernamePasswordCallbackHandler")};
+
+ private SaslClientFactory[] saslClientFactories =
+ new SaslClientFactory[]{new SaslClientFactory("AMQPLAIN","org.apache.qpidity.security.amqplain.AmqPlainSaslClientFactory")};
+
+ private QpidConfig(){}
+
+ public static QpidConfig get()
+ {
+ return _instance;
+ }
+
+ public void setSecurityMechanisms(SecurityMechanism... securityMechanisms)
+ {
+ this.securityMechanisms = securityMechanisms;
+ }
+
+ public SecurityMechanism[] getSecurityMechanisms()
+ {
+ return securityMechanisms;
+ }
+
+ public void setSaslClientFactories(SaslClientFactory... saslClientFactories)
+ {
+ this.saslClientFactories = saslClientFactories;
+ }
+
+ public SaslClientFactory[] getSaslClientFactories()
+ {
+ return saslClientFactories;
+ }
+
+ public class SecurityMechanism
+ {
+ String type;
+ String handler;
+
+ SecurityMechanism(String type,String handler)
+ {
+ this.type = type;
+ this.handler = handler;
+ }
+
+ public String getHandler()
+ {
+ return handler;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+
+ public class SaslClientFactory
+ {
+ String type;
+ String factoryClass;
+
+ SaslClientFactory(String type,String factoryClass)
+ {
+ this.type = type;
+ this.factoryClass = factoryClass;
+ }
+
+ public String getFactoryClass()
+ {
+ return factoryClass;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/QpidException.java b/java/common/src/main/java/org/apache/qpidity/QpidException.java
index 4ab99b677f..5b3671cebd 100644
--- a/java/common/src/main/java/org/apache/qpidity/QpidException.java
+++ b/java/common/src/main/java/org/apache/qpidity/QpidException.java
@@ -25,12 +25,9 @@ package org.apache.qpidity;
public class QpidException extends Exception
{
/**
- * This exception error code.
- * <p> This error code is used for internationalisation purpose.
- * <p> This error code is set from the AMQP ones.
- * <TODO> So we may want to use the AMQP error code directly.
+ * AMQP error code
*/
- private String _errorCode;
+ private int _errorCode;
/**
* Constructor for a Qpid Exception.
@@ -38,20 +35,27 @@ public class QpidException extends Exception
* they are unknown.
* @param message A description of the reason of this exception .
* @param errorCode A string specifyin the error code of this exception.
- * @param cause The linked Execption.
+ * @param cause The linked Execption. *
+ *
*/
- public QpidException(String message, String errorCode, Throwable cause)
+ public QpidException(String message, int errorCode, Throwable cause)
{
super(message, cause);
_errorCode = errorCode;
}
+
+ //hack to get rid of a compile error from a generated class
+ public QpidException(String message, String errorCode, Throwable cause)
+ {
+
+ }
/**
* Get this execption error code.
*
* @return This exception error code.
*/
- public String getErrorCode()
+ public int getErrorCode()
{
return _errorCode;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
new file mode 100644
index 0000000000..474e2f7e8f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
+import org.apache.qpidity.security.AMQPCallbackHandler;
+import org.apache.qpidity.security.CallbackHandlerRegistry;
+
+public class SecurityHelper
+{
+ public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException
+ {
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ HashSet mechanismSet = new HashSet();
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+
+ String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+ StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+ while (prefTokenizer.hasMoreTokens())
+ {
+ String mech = prefTokenizer.nextToken();
+ if (mechanismSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+ return null;
+ }
+
+ public static AMQPCallbackHandler createCallbackHandler(String mechanism, String username,String password)
+ throws QpidException
+ {
+ Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
+ try
+ {
+ Object instance = mechanismClass.newInstance();
+ AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance;
+ cbh.initialise(username,password);
+ return cbh;
+ }
+ catch (Exception e)
+ {
+ throw new QpidException("Unable to create callback handler: " + e,0, e.getCause());
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 3b33fde2df..d3a24ffd8a 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -42,11 +42,12 @@ public class Session extends Invoker
// completed incoming commands
private final RangeSet processed = new RangeSet();
private Range syncPoint = null;
-
+
// outgoing command count
private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
+
public Map<Long,Method> getOutstandingCommands()
{
@@ -231,7 +232,6 @@ public class Session extends Invoker
}
future.set(result);
}
-
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
long command = commandsOut;
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 651241f63c..683008fe8a 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -190,13 +190,20 @@ class ToyBroker extends SessionDelegate
{
final Map<String,Queue<Message>> queues =
new HashMap<String,Queue<Message>>();
- MinaHandler.accept("0.0.0.0", 5672, new ConnectionDelegate()
- {
- public SessionDelegate getSessionDelegate()
- {
- return new ToyBroker(queues);
- }
- });
+
+ ConnectionDelegate delegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ToyBroker(queues);
+ }
+ };
+
+ //hack
+ delegate.setUsername("guest");
+ delegate.setPassword("guest");
+
+ MinaHandler.accept("0.0.0.0", 5672, delegate);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java
index 2305027556..ccad3577f0 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java
@@ -1,5 +1,7 @@
package org.apache.qpidity.api;
+import java.nio.ByteBuffer;
+
import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.DeliveryProperties;
@@ -43,6 +45,8 @@ public interface Message
*/
public void appendData(byte[] src);
+ public void appendData(ByteBuffer src);
+
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
diff --git a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java
index 7db5cd5e11..ac3888d5bb 100644
--- a/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java
+++ b/java/common/src/main/java/org/apache/qpidity/filter/PropertyExpression.java
@@ -56,7 +56,7 @@ public class PropertyExpression implements Expression
}
catch (Exception e)
{
- throw new QpidException("cannot evaluate property ", "message selector", e);
+ throw new QpidException("cannot evaluate property ", 0, e);
}
}
return result;
diff --git a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java
index cd9d31b1c2..2e7afa1b87 100644
--- a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/security/AMQPCallbackHandler.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,28 +18,11 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.security;
-/**
- * CommonSessionDelegate
- */
+import javax.security.auth.callback.CallbackHandler;
-public class CommonSessionDelegate extends Delegate<Session>
+public interface AMQPCallbackHandler extends CallbackHandler
{
-
- @Override public void sessionAttached(Session session, SessionAttached struct) {}
-
- @Override public void sessionFlow(Session session, SessionFlow struct) {}
-
- @Override public void sessionFlowOk(Session session, SessionFlowOk struct) {}
-
- @Override public void sessionClose(Session session, SessionClose struct) {}
-
- @Override public void sessionClosed(Session session, SessionClosed struct) {}
-
- @Override public void sessionResume(Session session, SessionResume struct) {}
-
- @Override public void sessionSuspend(Session session, SessionSuspend struct) {}
-
- @Override public void sessionDetached(Session session, SessionDetached struct) {}
+ void initialise(String username,String password);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java b/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java
new file mode 100644
index 0000000000..624b015c69
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/CallbackHandlerRegistry.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpidity.QpidConfig;
+
+public class CallbackHandlerRegistry
+{
+
+ private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+
+ private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>();
+
+ private StringBuilder _mechanisms;
+
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
+
+ public Class getCallbackHandlerClass(String mechanism)
+ {
+ return _mechanismToHandlerClassMap.get(mechanism);
+ }
+
+ public String getMechanisms()
+ {
+ return _mechanisms.toString();
+ }
+
+ private CallbackHandlerRegistry()
+ {
+ // first we register any Sasl client factories
+ DynamicSaslRegistrar.registerSaslProviders();
+ registerMechanisms();
+ }
+
+ private void registerMechanisms()
+ {
+ for (QpidConfig.SecurityMechanism securityMechanism: QpidConfig.get().getSecurityMechanisms() )
+ {
+ Class clazz = null;
+ try
+ {
+ clazz = Class.forName(securityMechanism.getHandler());
+ if (!AMQPCallbackHandler.class.isAssignableFrom(clazz))
+ {
+ System.out.println("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class +
+ ". Skipping");
+ continue;
+ }
+ _mechanismToHandlerClassMap.put(securityMechanism.getType(), clazz);
+ if (_mechanisms == null)
+ {
+
+ _mechanisms = new StringBuilder();
+ _mechanisms.append(securityMechanism.getType());
+ }
+ else
+ {
+ _mechanisms.append(" " + securityMechanism.getType());
+ }
+ }
+ catch (ClassNotFoundException ex)
+ {
+ System.out.println("Unable to load class " + securityMechanism.getHandler() + ". Skipping that SASL provider");
+ continue;
+ }
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java b/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java
new file mode 100644
index 0000000000..52dacc6985
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/DynamicSaslRegistrar.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security;
+
+import java.security.Security;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
+
+import org.apache.qpidity.QpidConfig;
+
+public class DynamicSaslRegistrar
+{
+ public static void registerSaslProviders()
+ {
+ Map<String, Class> factories = registerSaslClientFactories();
+ if (factories.size() > 0)
+ {
+ Security.addProvider(new JCAProvider(factories));
+ System.out.println("Dynamic SASL provider added as a security provider");
+ }
+ }
+
+ private static Map<String, Class> registerSaslClientFactories()
+ {
+ TreeMap<String, Class> factoriesToRegister =
+ new TreeMap<String, Class>();
+
+ for (QpidConfig.SaslClientFactory factory: QpidConfig.get().getSaslClientFactories())
+ {
+ String className = factory.getFactoryClass();
+ try
+ {
+ Class clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ System.out.println("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(factory.getType(), clazz);
+ }
+ catch (Exception ex)
+ {
+ System.out.println("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
+ return factoriesToRegister;
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java b/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java
new file mode 100644
index 0000000000..c775171a5f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/JCAProvider.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security;
+
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+public class JCAProvider extends Provider
+{
+ public JCAProvider(Map<String, Class> providerMap)
+ {
+ super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+ "AMQ SASL providers that want to be registered");
+ register(providerMap);
+ Security.addProvider(this);
+ }
+
+ private void register(Map<String, Class> providerMap)
+ {
+ for (Map.Entry<String, Class> me :providerMap.entrySet())
+ {
+ put("SaslClientFactory." + me.getKey(), me.getValue().getName());
+ }
+ }
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java b/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java
new file mode 100644
index 0000000000..0fd647e015
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/UsernamePasswordCallbackHandler.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler
+{
+ private String _username;
+ private String _password;
+
+ public void initialise(String username,String password)
+ {
+ _username = username;
+ _password = password;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_username);
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword((_password).toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java
new file mode 100644
index 0000000000..6e4a0218d2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClient.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
+ *
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+ /**
+ * The name of this mechanism
+ */
+ public static final String MECHANISM = "AMQPLAIN";
+
+ private CallbackHandler _cbh;
+
+ public AmqPlainSaslClient(CallbackHandler cbh)
+ {
+ _cbh = cbh;
+ }
+
+ public String getMechanismName()
+ {
+ return "AMQPLAIN";
+ }
+
+ public boolean hasInitialResponse()
+ {
+ return true;
+ }
+
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+ {
+ // we do not care about the prompt or the default name
+ NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+ PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+ Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+ try
+ {
+ _cbh.handle(callbacks);
+ }
+ catch (Exception e)
+ {
+ throw new SaslException("Error handling SASL callbacks: " + e, e);
+ }
+ FieldTable table = FieldTableFactory.newFieldTable();
+ table.setString("LOGIN", nameCallback.getName());
+ table.setString("PASSWORD", new String(pwdCallback.getPassword()));
+ return table.getDataAsBytes();
+ }
+
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return null;
+ }
+
+ public void dispose() throws SaslException
+ {
+ _cbh = null;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java
new file mode 100644
index 0000000000..abc881f433
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/security/amqplain/AmqPlainSaslClientFactory.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+ {
+ for (int i = 0; i < mechanisms.length; i++)
+ {
+ if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
+ {
+ if (cbh == null)
+ {
+ throw new SaslException("CallbackHandler must not be null");
+ }
+ return new AmqPlainSaslClient(cbh);
+ }
+ }
+ return null;
+ }
+
+ public String[] getMechanismNames(Map props)
+ {
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
+ {
+ // returned array must be non null according to interface documentation
+ return new String[0];
+ }
+ else
+ {
+ return new String[]{AmqPlainSaslClient.MECHANISM};
+ }
+ }
+}