diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 17:14:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 17:14:51 +0000 |
| commit | 1f6298a505e849fb71e7653705b7f106c8ecaeb0 (patch) | |
| tree | 740eb5184b804db07ea1bd87d0a078c8c9154390 /java | |
| parent | 451ea3fb817c3af36682f12cc123a861c9bcad41 (diff) | |
| download | qpid-python-1f6298a505e849fb71e7653705b7f106c8ecaeb0.tar.gz | |
Initial checkin of AMQP 1-0 Java Prototype work
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
12 files changed, 989 insertions, 950 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 449b52d737..41aa22b8ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,18 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.logging.*; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -28,9 +40,28 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; -import org.apache.qpid.server.Broker.InitException; +import org.apache.log4j.PropertyConfigurator; +import org.apache.log4j.xml.QpidLog4JConfigurator; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; +import org.apache.qpid.server.information.management.ServerInformationMBean; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.management.LoggingManagementMBean; +import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; import org.apache.qpid.server.registry.ApplicationRegistry; - +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.transport.QpidAcceptor; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** * Main entry point for AMQPD. @@ -38,41 +69,39 @@ import org.apache.qpid.server.registry.ApplicationRegistry; */ public class Main { - private final Options options = new Options(); - private CommandLine commandLine; + private static Logger _logger; - public static void main(String[] args) + private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; + + public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + public static final String QPID_HOME = "QPID_HOME"; + private static final int IPV4_ADDRESS_LENGTH = 4; + + private static final char IPV4_LITERAL_SEPARATOR = '.'; + private java.util.logging.Logger FRAME_LOGGER; + private java.util.logging.Logger RAW_LOGGER; + + protected static class InitException extends Exception { - //if the -Dlog4j.configuration property has not been set, enable the init override - //to stop Log4J wondering off and picking up the first log4j.xml/properties file it - //finds from the classpath when we get the first Loggers - if(System.getProperty("log4j.configuration") == null) + InitException(String msg, Throwable cause) { - System.setProperty("log4j.defaultInitOverride", "true"); + super(msg, cause); } - - new Main(args); } - public Main(final String[] args) + protected final Options options = new Options(); + protected CommandLine commandLine; + + protected Main(String[] args) { setOptions(options); if (parseCommandline(args)) { - try - { - execute(); - } - catch(Exception e) - { - System.err.println("Exception during startup: " + e); - e.printStackTrace(); - shutdown(1); - } + execute(); } } - protected boolean parseCommandline(final String[] args) + protected boolean parseCommandline(String[] args) { try { @@ -90,7 +119,8 @@ public class Main } } - protected void setOptions(final Options options) + @SuppressWarnings("static-access") + protected void setOptions(Options options) { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); @@ -134,21 +164,16 @@ public class Main Option bind = OptionBuilder.withArgName("bind").hasArg() .withDescription("bind to the specified address. Overrides any value in the config file") - .withLongOpt("bind").create(BrokerOptions.BIND); + .withLongOpt("bind").create("b"); Option logconfig = OptionBuilder.withArgName("logconfig").hasArg() .withDescription("use the specified log4j xml configuration file. By " - + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE - + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG); + + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg() .withDescription("monitor the log file configuration file for changes. Units are seconds. " - + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH); - - Option sslport = - OptionBuilder.withArgName("sslport").hasArg() - .withDescription("SSL port. Overrides any value in the config file") - .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS); + + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); options.addOption(help); options.addOption(version); @@ -162,120 +187,472 @@ public class Main options.addOption(exclude0_8); options.addOption(mport); options.addOption(bind); - options.addOption(sslport); } - protected void execute() throws Exception + protected void execute() { - BrokerOptions options = new BrokerOptions(); - String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG); - if(configFile != null) + // note this understands either --help or -h. If an option only has a long name you can use that but if + // an option has a short name and a long name you must use the short name here. + if (commandLine.hasOption("h")) { - options.setConfigFile(configFile); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Qpid", options, true); } + else if (commandLine.hasOption("v")) + { + String ver = QpidProperties.getVersionString(); + + StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); + + boolean first = true; + for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) + { + if (first) + { + first = false; + } + else + { + protocol.append(", "); + } + + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); + + } - String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH); - if(logWatchConfig != null) + System.out.println(ver + " (" + protocol + ")"); + } + else { - options.setLogWatchFrequency(Integer.parseInt(logWatchConfig)); + try + { + CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); + startup(); + CurrentActor.remove(); + } + catch (InitException e) + { + System.out.println("Initialisation Error : " + e.getMessage()); + shutdown(1); + } + catch (Throwable e) + { + System.out.println("Error initialising message broker: " + e); + e.printStackTrace(); + shutdown(1); + } } + } + + protected void shutdown(int status) + { + ApplicationRegistry.removeAll(); + System.exit(status); + } + + protected void startup() throws Exception + { + + FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log"); + RAW_LOGGER = updateLogger("RAW", "qpid-raw.log"); + + final String QpidHome = System.getProperty(QPID_HOME); + final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); + final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); + if (!configFile.exists()) + { + String error = "File " + configFile + " could not be found. Check the file exists and is readable."; - String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG); - if(logConfig != null) + if (QpidHome == null) + { + error = error + "\nNote: " + QPID_HOME + " is not set."; + } + + throw new InitException(error, null); + } + else { - options.setLogConfigFile(logConfig); + CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); } - String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT); - if(jmxPort != null) + String logConfig = commandLine.getOptionValue("l"); + String logWatchConfig = commandLine.getOptionValue("w", "0"); + + int logWatchTime = 0; + try + { + logWatchTime = Integer.parseInt(logWatchConfig); + } + catch (NumberFormatException e) { - options.setJmxPort(Integer.parseInt(jmxPort)); + System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + + "a non-negative integer. Using default of zero (no watching configured"); } - String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND); - if (bindAddr != null) + File logConfigFile; + if (logConfig != null) + { + logConfigFile = new File(logConfig); + configureLogging(logConfigFile, logWatchTime); + } + else { - options.setBind(bindAddr); + File configFileDirectory = configFile.getParentFile(); + logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME); + configureLogging(logConfigFile, logWatchTime); } - String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS); - if(portStr != null) + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); + ServerConfiguration serverConfig = config.getConfiguration(); + updateManagementPort(serverConfig, commandLine.getOptionValue("m")); + + ApplicationRegistry.initialise(config); + + // We have already loaded the BrokerMessages class by this point so we + // need to refresh the locale setting incase we had a different value in + // the configuration. + BrokerMessages.reload(); + + // AR.initialise() sets and removes its own actor so we now need to set the actor + // for the remainder of the startup, and the default actor if the stack is empty + CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); + CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); + GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); + + + try { - parsePortArray(options, portStr, false); - for(ProtocolExclusion pe : ProtocolExclusion.values()) + configureLoggingManagementMBean(logConfigFile, logWatchTime); + + ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); + configMBean.register(); + + ServerInformationMBean sysInfoMBean = + new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); + sysInfoMBean.register(); + + + String[] portStr = commandLine.getOptionValues("p"); + + Set<Integer> ports = new HashSet<Integer>(); + Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); + Set<Integer> exclude_0_9 = new HashSet<Integer>(); + Set<Integer> exclude_0_8 = new HashSet<Integer>(); + + if(portStr == null || portStr.length == 0) { - parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); + + parsePortList(ports, serverConfig.getPorts()); + parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); + parsePortList(exclude_0_9, serverConfig.getPortExclude09()); + parsePortList(exclude_0_8, serverConfig.getPortExclude08()); + } - } + else + { + parsePortArray(ports, portStr); + parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); + parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); + parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); - String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS); - if(sslPortStr != null) - { - parsePortArray(options, sslPortStr, true); - for(ProtocolExclusion pe : ProtocolExclusion.values()) + } + + + + + String bindAddr = commandLine.getOptionValue("b"); + if (bindAddr == null) + { + bindAddr = serverConfig.getBind(); + } + InetAddress bindAddress = null; + + + + if (bindAddr.equals("wildcard")) { - parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); + bindAddress = new InetSocketAddress(0).getAddress(); } + else + { + bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + } + + String hostName = bindAddress.getCanonicalHostName(); + + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + + if (!serverConfig.getSSLOnly()) + { + + for(int port : ports) + { + + NetworkDriver driver = new MINANetworkDriver(); + + Set<VERSION> supported = EnumSet.allOf(VERSION.class); + + if(exclude_0_10.contains(port)) + { + supported.remove(VERSION.v0_10); + } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } + if(exclude_0_9.contains(port)) + { + supported.remove(VERSION.v0_9); + } + if(exclude_0_8.contains(port)) + { + supported.remove(VERSION.v0_8); + } + + MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); + + + + driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); + + } + + } + + if (serverConfig.getEnableSSL()) + { + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort())); + } + + CurrentActor.get().message(BrokerMessages.READY()); + } - - startBroker(options); + finally + { + // Startup is complete so remove the AR initialised Startup actor + CurrentActor.remove(); + } + + + } - protected void startBroker(final BrokerOptions options) throws Exception + private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException { - Broker broker = new Broker(); - broker.startup(options); + java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType); + logger.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + + return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : logger.getHandlers()) + { + logger.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + + logger.addHandler(handler); + + + handler = new FileHandler(logFileName, true); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + + logger.addHandler(handler); + return logger; } - protected void shutdown(final int status) + private void parsePortArray(Set<Integer> ports, String[] portStr) + throws InitException { - ApplicationRegistry.remove(); - System.exit(status); + if(portStr != null) + { + for(int i = 0; i < portStr.length; i++) + { + try + { + ports.add(Integer.parseInt(portStr[i])); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + portStr[i], e); + } + } + } } - private static void parsePortArray(final BrokerOptions options,final Object[] ports, - final boolean ssl) throws InitException + private void parsePortList(Set<Integer> output, List input) + throws InitException { - if(ports != null) + if(input != null) { - for(int i = 0; i < ports.length; i++) + for(Object port : input) { try { - if(ssl) - { - options.addSSLPort(Integer.parseInt(String.valueOf(ports[i]))); - } - else - { - options.addPort(Integer.parseInt(String.valueOf(ports[i]))); - } + output.add(Integer.parseInt(String.valueOf(port))); } catch (NumberFormatException e) { - throw new InitException("Invalid port: " + ports[i], e); + throw new InitException("Invalid port: " + port, e); } } } } - private static void parsePortArray(final BrokerOptions options, final Object[] ports, - final ProtocolExclusion excludedProtocol) throws InitException + /** + * Update the configuration data with the management port. + * @param configuration + * @param managementPort The string from the command line + */ + private void updateManagementPort(ServerConfiguration configuration, String managementPort) { - if(ports != null) + if (managementPort != null) { - for(int i = 0; i < ports.length; i++) + try + { + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); + } + catch (NumberFormatException e) { + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); + } + } + } + + public static void main(String[] args) + { + //if the -Dlog4j.configuration property has not been set, enable the init override + //to stop Log4J wondering off and picking up the first log4j.xml/properties file it + //finds from the classpath when we get the first Loggers + if(System.getProperty("log4j.configuration") == null) + { + System.setProperty("log4j.defaultInitOverride", "true"); + } + + //now that the override status is know, we can instantiate the Loggers + _logger = Logger.getLogger(Main.class); + + new Main(args); + } + + private byte[] parseIP(String address) throws Exception + { + char[] literalBuffer = address.toCharArray(); + int byteCount = 0; + int currByte = 0; + byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; + for (int i = 0; i < literalBuffer.length; i++) + { + char currChar = literalBuffer[i]; + if ((currChar >= '0') && (currChar <= '9')) + { + currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); + } + + if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) + { + ip[byteCount++] = (byte) currByte; + currByte = 0; + } + } + + if (byteCount != 4) + { + throw new Exception("Invalid IP address: " + address); + } + return ip; + } + + private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException + { + if (logConfigFile.exists() && logConfigFile.canRead()) + { + CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); + + if (logWatchTime > 0) + { + System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + + logWatchTime + " seconds"); + // log4j expects the watch interval in milliseconds try { - options.addExcludedPort(excludedProtocol, - Integer.parseInt(String.valueOf(ports[i]))); + QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); } - catch (NumberFormatException e) + catch (Exception e) + { + throw new InitException(e.getMessage(),e); + } + } + else + { + try + { + QpidLog4JConfigurator.configure(logConfigFile.getPath()); + } + catch (Exception e) { - throw new InitException("Invalid port for exclusion: " + ports[i], e); + throw new InitException(e.getMessage(),e); } } } + else + { + System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); + System.err.println("Using the fallback internal log4j.properties configuration"); + + InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); + if(propsFile == null) + { + throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); + } + else + { + try + { + Properties fallbackProps = new Properties(); + fallbackProps.load(propsFile); + PropertyConfigurator.configure(fallbackProps); + } + finally + { + propsFile.close(); + } + } + } + } + + private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception + { + LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); + + blm.register(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 460ea93509..500a34b4a8 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,44 +20,56 @@ */ package org.apache.qpid.server.protocol; -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.transport.network.NetworkConnection; + +import java.util.Set; +import java.util.Arrays; +import java.util.HashSet; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class); - private static final AtomicLong ID_GENERATOR = new AtomicLong(0); + ; + + + public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }; + + private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); private final IApplicationRegistry _appRegistry; private final String _fqdn; - private final Set<AmqpProtocolVersion> _supported; + private final Set<VERSION> _supported; + public MultiVersionProtocolEngineFactory() { - this("localhost", ALL_VERSIONS); + this(1, "localhost", ALL_VERSIONS); + } + + public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions) + { + this(1, fqdn, versions); } + public MultiVersionProtocolEngineFactory(String fqdn) { - this(fqdn, ALL_VERSIONS); + this(1, fqdn, ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions) + public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions) { - _appRegistry = ApplicationRegistry.getInstance(); + _appRegistry = ApplicationRegistry.getInstance(instance); _fqdn = fqdn; _supported = supportedVersions; } - public ServerProtocolEngine newProtocolEngine(NetworkConnection network) + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a095ef47ea..3a89194eb9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -83,7 +83,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** null means shared */ private final AMQShortString _owner; - private AuthorizationHolder _authorizationHolder; + private PrincipalHolder _prinicpalHolder; private boolean _exclusive = false; private AMQSessionModel _exclusiveOwner; @@ -102,7 +102,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final QueueEntryList _entries; - protected final SubscriptionList _subscriptionList = new SubscriptionList(); + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); + + private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); private volatile Subscription _exclusiveSubscriber; @@ -186,7 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //TODO : persist creation time private long _createTime = System.currentTimeMillis(); private ConfigurationPlugin _queueConfiguration; - + private final boolean _isTopic; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) @@ -232,10 +234,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments; + _arguments = arguments == null ? Collections.EMPTY_MAP : arguments; _id = virtualHost.getConfigStore().createId(); + _isTopic = arguments != null && arguments.containsKey("topic"); + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); @@ -327,7 +331,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } - + public void setExclusive(boolean exclusive) throws AMQException { _exclusive = exclusive; @@ -371,14 +375,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _owner; } - public AuthorizationHolder getAuthorizationHolder() + public PrincipalHolder getPrincipalHolder() { - return _authorizationHolder; + return _prinicpalHolder; } - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) + public void setPrincipalHolder(PrincipalHolder prinicpalHolder) { - _authorizationHolder = authorizationHolder; + _prinicpalHolder = prinicpalHolder; } @@ -402,8 +406,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied"); } - - + + if (hasExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); @@ -433,14 +437,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener subscription.setNoLocal(_nolocal); } _subscriptionList.add(subscription); - + //Increment consumerCountHigh if necessary. (un)registerSubscription are both //synchronized methods so we don't need additional synchronization here if(_counsumerCountHigh.get() < getConsumerCount()) { _counsumerCountHigh.incrementAndGet(); } - + if (isDeleted()) { subscription.queueDeleted(this); @@ -486,6 +490,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // queue. This is because the delete method uses the subscription set which has just been cleared subscription.queueDeleted(this); } + + if(_subscriptionList.size() == 0 && _isTopic) + { + clearQueue(); + } } } @@ -512,10 +521,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener break; } } - + reconfigure(); } - + private void reconfigure() { //Reconfigure the queue for to reflect this new binding. @@ -541,7 +550,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void removeBinding(final Binding binding) { _bindings.remove(binding); - + reconfigure(); } @@ -568,101 +577,104 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); - QueueEntry entry; Subscription exclusiveSub = _exclusiveSubscriber; - - if (exclusiveSub != null) + if(!_isTopic || _subscriptionList.size()!=0) { - exclusiveSub.getSendLock(); + incrementTxnEnqueueStats(message); + incrementQueueCount(); + incrementQueueSize(message); - try - { - entry = _entries.add(message); + QueueEntry entry; - deliverToSubscription(exclusiveSub, entry); - } - finally + if (exclusiveSub != null) { - exclusiveSub.releaseSendLock(); - } - } - else - { - entry = _entries.add(message); - /* - - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + exclusiveSub.getSendLock(); - */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().findNext(); - } - while (nextNode != null) - { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + try { - break; + entry = _entries.add(message); + + deliverToSubscription(exclusiveSub, entry); } - else + finally { - node = _subscriptionList.getMarkedNode(); - nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().findNext(); - } + exclusiveSub.releaseSendLock(); } } + else + { + entry = _entries.add(message); + /* - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; + iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - while (entry.isAvailable() && loops != 0) - { + */ + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if (nextNode == null) { - loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _subscriptionList.getHead().getNext(); } - else + while (nextNode != null) { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + if (_lastSubscriptionNode.compareAndSet(node, nextNode)) + { + break; + } + else + { + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); + if (nextNode == null) + { + nextNode = _subscriptionList.getHead().getNext(); + } + } } - nextNode = nextNode.findNext(); + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; + + while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) + { + if (nextNode == null) + { + loops--; + nextNode = _subscriptionList.getHead(); + } + else + { + // if subscription at end, and active, offer + Subscription sub = nextNode.getSubscription(); + deliverToSubscription(sub, entry); + } + nextNode = nextNode.getNext(); + + } } - } - if (entry.isAvailable()) - { - checkSubscriptionsNotAheadOfDelivery(entry); + if (!(entry.isAcquired() || entry.isDeleted())) + { + checkSubscriptionsNotAheadOfDelivery(entry); - deliverAsync(); - } + deliverAsync(); + } - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } + if(_managedObject != null) + { + _managedObject.checkForNotification(entry.getMessage()); + } - if(action != null) - { - action.onEnqueue(entry); + if(action != null) + { + action.onEnqueue(entry); + } } - } private void deliverToSubscription(final Subscription sub, final QueueEntry entry) @@ -718,20 +730,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { getAtomicQueueCount().incrementAndGet(); } - + private void incrementTxnEnqueueStats(final ServerMessage message) { SessionConfig session = message.getSessionConfig(); - + if(session !=null && session.isTransactional()) { _msgTxnEnqueues.incrementAndGet(); _byteTxnEnqueues.addAndGet(message.getSize()); } } - + private void incrementTxnDequeueStats(QueueEntry entry) - { + { _msgTxnDequeues.incrementAndGet(); _byteTxnDequeues.addAndGet(entry.getSize()); } @@ -745,6 +757,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener incrementUnackedMsgCount(); sub.send(entry); + + if(_isTopic) + { + if(allSubscriptionsAhead(entry) && entry.acquire()) + { + entry.discard(); + } + } + } + + private boolean allSubscriptionsAhead(final QueueEntry entry) + { + SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); + while(subIter.advance() && !entry.isAcquired()) + { + final Subscription subscription = subIter.getNode().getSubscription(); + if(!subscription.isClosed()) + { + QueueContext context = (QueueContext) subscription.getQueueContext(); + if(context != null) + { + QueueEntry subnode = context._lastSeenEntry; + if(subnode.compareTo(entry)<0) + { + return false; + } + } + else + { + return false; + } + } + } + return true; } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException @@ -803,6 +849,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void requeue(QueueEntryImpl entry, Subscription subscription) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + } + public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); @@ -811,7 +875,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.decrementAndGet(); } - + if(sub != null && sub.isSessionTransactional()) { incrementTxnDequeueStats(entry); @@ -868,7 +932,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _subscriptionList.size(); } - + public int getConsumerCountHigh() { return _counsumerCountHigh.get(); @@ -940,7 +1004,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDispensed()) + if (node != null && !node.isDeleted()) { entryList.add(node); } @@ -1044,7 +1108,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDispensed() && filter.accept(node)) + if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); } @@ -1238,6 +1302,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if ((messageId >= fromMessageId) && (messageId <= toMessageId) + && !node.isDeleted() && node.acquire()) { dequeueEntry(node); @@ -1267,7 +1332,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node); noDeletes = false; @@ -1277,7 +1342,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } public long clearQueue() throws AMQException - { + { return clear(0l); } @@ -1288,7 +1353,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: queue " + getName()); } - + QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; @@ -1297,7 +1362,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node, txn); if(++count == request) @@ -1355,7 +1420,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: " + getName()); } - + if (!_deleted.getAndSet(true)) { @@ -1564,7 +1629,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1583,6 +1648,52 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } + + private class Runner implements ReadWriteRunnable + { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + + public void run() + { + String originalName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(_name); + CurrentActor.set(_logActor); + + processQueue(this); + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + Thread.currentThread().setName(originalName); + } + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + + public String toString() + { + return _name; + } + } + public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1603,12 +1714,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { sub.getSendLock(); atTail = attemptDelivery(sub); - if (atTail && sub.isAutoClose()) + if (atTail && getNextAvailableEntry(sub) == null) { - unregisterSubscription(sub); - - sub.confirmAutoClose(); - + sub.queueEmpty(); } else if (!atTail) { @@ -1629,6 +1737,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } + return atTail; } @@ -1651,7 +1760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = getNextAvailableEntry(sub); - if (node != null && node.isAvailable()) + if (node != null && !(node.isAcquired() || node.isDeleted())) { if (sub.hasInterest(node)) { @@ -1712,7 +1821,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) { if (expired) { @@ -1741,40 +1850,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - * @throws AMQException - */ - public void processQueue(QueueRunner runner) throws AMQException + private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; + int extraLoops = 1; + long iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1791,14 +1874,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; + extraLoops = 1; } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + deliveryIncomplete = _subscriptionList.size() != 0; + boolean done; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1808,25 +1889,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) + + done = attemptDelivery(sub); + + if (done) { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) + if (extraLoops == 0) { - unregisterSubscription(sub); + if(getNextAvailableEntry(sub) == null) + { + sub.queueEmpty(); + } + deliveryIncomplete = false; - sub.confirmAutoClose(); + } + else + { + extraLoops--; } } else { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; iterations--; + extraLoops = 1; } } finally @@ -1834,34 +1919,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } - - if(allSubscriptionsDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allSubscriptionsDone) - { - //All subscriptions reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; - lastLoop = true; - } - else - { - //some subscriptions can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - _asynchronousRunner.set(null); } - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit + // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { @@ -1881,8 +1942,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted and not dequeued - if (!node.isDispensed()) + // Only process nodes that are not currently deleted + if (!node.isDeleted()) { // If the node has exired then aquire it if (node.expired() && node.acquire()) @@ -2146,22 +2207,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _dequeueSize.get(); } - + public long getByteTxnEnqueues() { return _byteTxnEnqueues.get(); } - + public long getByteTxnDequeues() { return _byteTxnDequeues.get(); } - + public long getMsgTxnEnqueues() { return _msgTxnEnqueues.get(); } - + public long getMsgTxnDequeues() { return _msgTxnDequeues.get(); @@ -2198,21 +2259,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _unackedMsgCountHigh.get(); } - + public long getUnackedMessageCount() { return _unackedMsgCount.get(); } - + public void decrementUnackedMsgCount() { _unackedMsgCount.decrementAndGet(); } - + private void incrementUnackedMsgCount() { long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - + long unackedMsgCountHigh; while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) { @@ -2222,9 +2283,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } - - public LogActor getLogActor() - { - return _logActor; - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index 4c59c25d84..15651b088b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -20,73 +20,20 @@ */ package org.apache.qpid.server.security.auth.manager; -import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource; import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.security.auth.AuthenticationResult; -/** - * Implementations of the AuthenticationManager are responsible for determining - * the authenticity of a user's credentials. - * - * If the authentication is successful, the manager is responsible for producing a populated - * {@link Subject} containing the user's identity and zero or more principals representing - * groups to which the user belongs. - * <p> - * The {@link #initialise()} method is responsible for registering SASL mechanisms required by - * the manager. The {@link #close()} method must reverse this registration. - * - */ -public interface AuthenticationManager extends Closeable, Plugin +public interface AuthenticationManager extends Closeable, CallbackHanderSource { - /** The name for the required SASL Server mechanisms */ - public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - - /** - * Initialise the authentication plugin. - * - */ - void initialise(); - - /** - * Gets the SASL mechanisms known to this manager. - * - * @return SASL mechanism names, space separated. - */ String getMechanisms(); - /** - * Creates a SASL server for the specified mechanism name for the given - * fully qualified domain name. - * - * @param mechanism mechanism name - * @param localFQDN domain name - * - * @return SASL server - * @throws SaslException - */ SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException; - /** - * Authenticates a user using SASL negotiation. - * - * @param server SASL server - * @param response SASL response to process - * - * @return authentication result - */ AuthenticationResult authenticate(SaslServer server, byte[] response); - /** - * Authenticates a user using their username and password. - * - * @param username username - * @param password password - * - * @return authentication result - */ - AuthenticationResult authenticate(String username, String password); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 1945c2e15f..bbd90b4d53 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -20,65 +20,27 @@ */ package org.apache.qpid.server.security.auth.manager; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.Security; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import javax.security.auth.Subject; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.login.AccountNotFoundException; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslServerFactory; - +import org.apache.log4j.Logger; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.configuration.PropertyException; -import org.apache.qpid.configuration.PropertyUtils; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.security.auth.AuthenticationResult; -import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean; -import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.JCAProvider; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; +import org.apache.qpid.server.security.auth.AuthenticationResult; +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.SaslServerFactory; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import java.util.Map; +import java.util.HashMap; +import java.util.TreeMap; +import java.security.Security; -/** - * Concrete implementation of the AuthenticationManager that determines if supplied - * user credentials match those appearing in a PrincipalDatabase. The implementation - * of the PrincipalDatabase is determined from the configuration. - * - * This implementation also registers the JMX UserManagemement MBean. - * - * This plugin expects configuration such as: - * - * <pre> - * <pd-auth-manager> - * <principal-database> - * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class> - * <attributes> - * <attribute> - * <name>passwordFile</name> - * <value>${conf}/passwd</value> - * </attribute> - * </attributes> - * </principal-database> - * </pd-auth-manager> - * </pre> - */ public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager { private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class); @@ -87,109 +49,55 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private String _mechanisms; /** Maps from the mechanism to the callback handler to use for handling those requests */ - private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); + private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); /** * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for * details of the use of these properties. This map is populated during initialisation of each provider. */ - private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); - - protected PrincipalDatabase _principalDatabase = null; + private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); - protected AMQUserManagementMBean _mbean = null; + private AuthenticationManager _default = null; + /** The name for the required SASL Server mechanisms */ + public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>() + public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception { - public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException - { - final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName()); + _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'") + + " PrincipalDatabase authentication manager."); - // If there is no configuration for this plugin then don't load it. - if (configuration == null) - { - _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager"); - return null; - } + // Fixme This should be done per Vhost but allowing global hack isn't right but ... + // required as authentication is done before Vhost selection - final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager(); - pdam.configure(configuration); - pdam.initialise(); - return pdam; - } + Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); - public Class<PrincipalDatabaseAuthenticationManager> getPluginClass() - { - return PrincipalDatabaseAuthenticationManager.class; - } - public String getPluginName() + if (name == null || hostConfig == null) { - return PrincipalDatabaseAuthenticationManager.class.getName(); + initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases()); } - }; - - public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin { - - public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory() + else { - public List<String> getParentPaths() - { - return Arrays.asList("security.pd-auth-manager"); - } + String databaseName = hostConfig.getAuthenticationDatabase(); - public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException + if (databaseName == null) { - final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration(); - - instance.setConfiguration(path, config); - return instance; - } - }; - - public String[] getElementsProcessed() - { - return new String[] {"principal-database.class", - "principal-database.attributes.attribute.name", - "principal-database.attributes.attribute.value"}; - } - public void validateConfiguration() throws ConfigurationException - { - } - - public String getPrincipalDatabaseClass() - { - return _configuration.getString("principal-database.class"); - } - - public Map<String,String> getPdClassAttributeMap() throws ConfigurationException - { - final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name"); - final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value"); - final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); - - for (int i = 0; i < argumentNames.size(); i++) + _default = ApplicationRegistry.getInstance().getAuthenticationManager(); + return; + } + else { - final String argName = argumentNames.get(i); - final String argValue = argumentValues.get(i); + PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName); - attributes.put(argName, argValue); - } + if (database == null) + { + throw new ConfigurationException("Requested database:" + databaseName + " was not found"); + } - return Collections.unmodifiableMap(attributes); + initialiseAuthenticationMechanisms(providerMap, database); + } } - } - - protected PrincipalDatabaseAuthenticationManager() - { - } - - public void initialise() - { - final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); - - initialiseAuthenticationMechanisms(providerMap, _principalDatabase); if (providerMap.size() > 0) { @@ -202,16 +110,33 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan { _logger.info("Additional SASL providers successfully registered."); } + } else { _logger.warn("No additional SASL providers registered."); } - registerManagement(); } - private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) + + private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception + { + if (databases.size() > 1) + { + _logger.warn("More than one principle database provided currently authentication mechanism will override each other."); + } + + for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet()) + { + // fixme As the database now provide the mechanisms they support, they will ... + // overwrite each other in the map. There should only be one database per vhost. + // But currently we must have authentication before vhost definition. + initialiseAuthenticationMechanisms(providerMap, entry.getValue()); + } + } + + private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception { if (database == null || database.getMechanisms().size() == 0) { @@ -227,6 +152,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser, Map<String, Class<? extends SaslServerFactory>> providerMap) + throws Exception { if (_mechanisms == null) { @@ -247,217 +173,70 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan _logger.info("Initialised " + mechanism + " SASL provider successfully"); } - /** - * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin) - */ - public void configure(final ConfigurationPlugin config) throws ConfigurationException - { - final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config; - final String pdClazz = pdamConfig.getPrincipalDatabaseClass(); - - _logger.info("PrincipalDatabase concrete implementation : " + pdClazz); - - _principalDatabase = createPrincipalDatabaseImpl(pdClazz); - - configPrincipalDatabase(_principalDatabase, pdamConfig); - } - public String getMechanisms() { - return _mechanisms; - } - - public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException - { - return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), - _callbackHandlerMap.get(mechanism)); - } - - /** - * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[]) - */ - public AuthenticationResult authenticate(SaslServer server, byte[] response) - { - try + if (_default != null) { - // Process response from the client - byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]); - - if (server.isComplete()) - { - final Subject subject = new Subject(); - subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID())); - return new AuthenticationResult(subject); - } - else - { - return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE); - } + // Use the default AuthenticationManager if present + return _default.getMechanisms(); } - catch (SaslException e) + else { - return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); + return _mechanisms; } } - /** - * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String) - */ - public AuthenticationResult authenticate(final String username, final String password) + public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException { - try + if (_default != null) { - if (_principalDatabase.verifyPassword(username, password.toCharArray())) - { - final Subject subject = new Subject(); - subject.getPrincipals().add(new UsernamePrincipal(username)); - return new AuthenticationResult(subject); - } - else - { - return new AuthenticationResult(AuthenticationStatus.CONTINUE); - } + // Use the default AuthenticationManager if present + return _default.createSaslServer(mechanism, localFQDN); } - catch (AccountNotFoundException e) + else { - return new AuthenticationResult(AuthenticationStatus.CONTINUE); + return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), + _callbackHandlerMap.get(mechanism)); } - } - public void close() - { - _mechanisms = null; - Security.removeProvider(PROVIDER_NAME); - - unregisterManagement(); } - private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException + public AuthenticationResult authenticate(SaslServer server, byte[] response) { - try - { - return (PrincipalDatabase) Class.forName(pdClazz).newInstance(); - } - catch (InstantiationException ie) + // Use the default AuthenticationManager if present + if (_default != null) { - throw new ConfigurationException("Cannot instantiate " + pdClazz, ie); + return _default.authenticate(server, response); } - catch (IllegalAccessException iae) - { - throw new ConfigurationException("Cannot access " + pdClazz, iae); - } - catch (ClassNotFoundException cnfe) - { - throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe); - } - catch (ClassCastException cce) - { - throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce); - } - } - - private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config) - throws ConfigurationException - { - final Map<String,String> attributes = config.getPdClassAttributeMap(); - for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();) + try { - final Entry<String, String> nameValuePair = iterator.next(); - final String methodName = generateSetterName(nameValuePair.getKey()); - final Method method; - try - { - method = principalDatabase.getClass().getMethod(methodName, String.class); - } - catch (Exception e) - { - throw new ConfigurationException("No method " + methodName + " found in class " - + principalDatabase.getClass() - + " hence unable to configure principal database. The method must be public and " - + "have a single String argument with a void return type", e); - } - try - { - method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue())); - } - catch (IllegalArgumentException e) - { - throw new ConfigurationException(e.getMessage(), e); - } - catch (PropertyException e) - { - throw new ConfigurationException(e.getMessage(), e); - } - catch (IllegalAccessException e) + // Process response from the client + byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]); + + if (server.isComplete()) { - throw new ConfigurationException(e.getMessage(), e); + return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS); } - catch (InvocationTargetException e) + else { - // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective - // method call. Pull out the underlying message and cause to make these more apparent to the user. - throw new ConfigurationException(e.getCause().getMessage(), e.getCause()); + return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE); } } - } - - private String generateSetterName(String argName) throws ConfigurationException - { - if ((argName == null) || (argName.length() == 0)) - { - throw new ConfigurationException("Argument names must have length >= 1 character"); - } - - if (Character.isLowerCase(argName.charAt(0))) + catch (SaslException e) { - argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); + return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); } - - final String methodName = "set" + argName; - return methodName; - } - - protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase) - { - _principalDatabase = principalDatabase; } - protected void registerManagement() + public void close() { - try - { - _logger.info("Registering UserManagementMBean"); - - _mbean = new AMQUserManagementMBean(); - _mbean.setPrincipalDatabase(_principalDatabase); - _mbean.register(); - } - catch (Exception e) - { - _logger.warn("User management disabled as unable to create MBean:", e); - _mbean = null; - } + Security.removeProvider(PROVIDER_NAME); } - protected void unregisterManagement() + public CallbackHandler getHandler(String mechanism) { - try - { - if (_mbean != null) - { - _logger.info("Unregistering UserManagementMBean"); - _mbean.unregister(); - } - } - catch (Exception e) - { - _logger.warn("Failed to unregister User management MBean:", e); - } - finally - { - _mbean = null; - } + return _callbackHandlerMap.get(mechanism); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java index 17d123eb0d..eb463ee722 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java @@ -45,10 +45,9 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java index 8a5ff7df2d..5706cbf49e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java @@ -47,11 +47,10 @@ public class AnonymousSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE) || - props.containsKey(Sasl.POLICY_NOANONYMOUS))) + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE) || + props.containsKey(Sasl.POLICY_NOANONYMOUS))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java index 3144bfbce6..11b0f26e05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java @@ -45,10 +45,9 @@ public class PlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 65790e2e6f..c4e2f1a322 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,25 +23,9 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; - -import javax.security.auth.Subject; +import com.sun.security.auth.UserPrincipal; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { - private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -111,12 +106,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); + + private Principal _principal; private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -129,27 +125,27 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); } - public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) - { - super(connection, delegate, name, expiry); - _connectionConfig = connConfig; - _transaction = new AutoCommitTransaction(this.getMessageStore()); - - _reference = new WeakReference<Session>(this); - _id = getConfigStore().createId(); - getConfigStore().addConfiguredObject(this); - } - protected void setState(State state) { super.setState(state); if (state == State.OPEN) { - _actor.message(ChannelMessages.CREATE()); + _actor.message(ChannelMessages.CREATE()); } } + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) + { + super(connection, delegate, name, expiry); + _connectionConfig = connConfig; + _transaction = new AutoCommitTransaction(this.getMessageStore()); + _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference(this); + _id = getConfigStore().createId(); + getConfigStore().addConfiguredObject(this); + } + private ConfigStore getConfigStore() { return getConnectionConfig().getConfigStore(); @@ -164,8 +160,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - _transaction.enqueue(queues,message, new ServerTransaction.Action() + + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -193,7 +189,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi }); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } @@ -201,7 +196,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi Runnable postIdSettingAction) { invoke(xfr, postIdSettingAction); - getConnectionModel().registerMessageDelivered(xfr.getBodySize()); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -337,7 +331,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - public void removeDispositionListener(Method method) + public void removeDispositionListener(Method method) { _messageDispositionListenerMap.remove(method.getId()); } @@ -357,7 +351,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { task.doTask(this); } - + CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); } @@ -383,7 +377,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi entry.release(); } }); - updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -403,7 +396,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void unregister(Subscription_0_10 sub) { - _subscriptions.remove(sub.getConsumerTag().toString()); + _subscriptions.remove(sub.getName()); try { sub.getSendLock(); @@ -417,14 +410,14 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } finally { sub.releaseSendLock(); } } - + public boolean isTransactional() { // this does not look great but there should only be one "non-transactional" @@ -432,11 +425,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi // theory return !(_transaction instanceof AutoCommitTransaction); } - - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } public void selectTx() { @@ -447,7 +435,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void commit() { _transaction.commit(); - + _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -456,13 +444,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void rollback() { _transaction.rollback(); - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -472,7 +460,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -483,17 +471,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - /** - * Update last transaction activity timestamp - */ - public void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - public Long getTxnStarts() { return _txnStarts.get(); @@ -513,15 +490,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { return _txnCount.get(); } - - public Principal getAuthorizedPrincipal() - { - return ((ServerConnection) getConnection()).getAuthorizedPrincipal(); - } - - public Subject getAuthorizedSubject() + + public Principal getPrincipal() { - return ((ServerConnection) getConnection()).getAuthorizedSubject(); + return _principal; } public void addSessionCloseTask(Task task) @@ -634,47 +606,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return (LogSubject) this; } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); - _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - } - } - } - public String toLogString() { return "[" + MessageFormat.format(CHANNEL_FORMAT, - ((ServerConnection) getConnection()).getConnectionId(), + getConnection().getConnectionId(), getClientID(), ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), getVirtualHost().getName(), getChannel()) + "] "; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 5f3446236c..49678055f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,16 +20,12 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -61,8 +57,7 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -71,7 +66,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; @@ -99,7 +94,7 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - private final AuthenticationManager _authenticationManager; + private AuthenticationManager _authenticationManager; private SecurityManager _securityManager; @@ -111,12 +106,11 @@ public class VirtualHostImpl implements VirtualHost private BrokerConfig _broker; private UUID _id; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; + private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); public IConnectionRegistry getConnectionRegistry() { @@ -163,12 +157,12 @@ public class VirtualHostImpl implements VirtualHost public String getObjectInstanceName() { - return ObjectName.quote(_name); + return _name.toString(); } public String getName() { - return _name; + return _name.toString(); } public VirtualHostImpl getVirtualHost() @@ -177,11 +171,22 @@ public class VirtualHostImpl implements VirtualHost } } - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception + { + this(appRegistry, hostConfig, null); + } + + + public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + { + this(ApplicationRegistry.getInstance(),hostConfig,store); + } + + private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception { if (hostConfig == null) { - throw new IllegalArgumentException("HostConfig cannot be null"); + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); } _appRegistry = appRegistry; @@ -235,13 +240,11 @@ public class VirtualHostImpl implements VirtualHost initialiseMessageStore(hostConfig); } - _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); + _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); - - initialiseStatistics(); } private void initialiseHouseKeeping(long period) @@ -274,30 +277,19 @@ public class VirtualHostImpl implements VirtualHost // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } } } scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + class ForceChannelClosuresTask extends TimerTask + { + public void run() + { + _connectionRegistry.expireClosedChannels(); + } + } + Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -450,57 +442,46 @@ public class VirtualHostImpl implements VirtualHost private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); - String queueName = queue.getName(); if (queue.isDurable()) { getDurableConfigurationStore().createQueue(queue); } - //get the exchange name (returns default exchange name if none was specified) String exchangeName = queueConfiguration.getExchange(); - Exchange exchange = _exchangeRegistry.getExchange(exchangeName); + Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); + + if (exchange == null) + { + exchange = _exchangeRegistry.getDefaultExchange(); + } + if (exchange == null) { - throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); } - Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); - - //get routing keys in configuration (returns empty list if none are defined) - List<?> routingKeys = queueConfiguration.getRoutingKeys(); + List routingKeys = queueConfiguration.getRoutingKeys(); + if (routingKeys == null || routingKeys.isEmpty()) + { + routingKeys = Collections.singletonList(queue.getNameShortString()); + } for (Object routingKeyNameObj : routingKeys) { - String routingKey = String.valueOf(routingKeyNameObj); - - if (exchange.equals(defaultExchange) && !queueName.equals(routingKey)) + AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); + if (_logger.isInfoEnabled()) { - throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + - "' to the default exchange with a key other than the queue name: " + routingKey); + _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); } - - configureBinding(queue, exchange, routingKey); - } - - if (!exchange.equals(defaultExchange)) - { - //bind the queue to the named exchange using its name - configureBinding(queue, exchange, queueName); + _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null); } - //ensure the queue is bound to the default exchange using its name - configureBinding(queue, defaultExchange, queueName); - } - - private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException - { - if (_logger.isInfoEnabled()) + if (exchange != _exchangeRegistry.getDefaultExchange()) { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); + _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null); } - _bindingFactory.addBinding(routingKey, queue, exchange, null); } public String getName() @@ -642,80 +623,6 @@ public class VirtualHostImpl implements VirtualHost { return _bindingFactory; } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _appRegistry.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _appRegistry.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (AMQConnectionModel connection : _connectionRegistry.getConnections()) - { - connection.resetStatistics(); - } - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); - _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); - _messagesReceived = new StatisticsCounter("messages-received-" + getName()); - _dataReceived = new StatisticsCounter("bytes-received-" + getName()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } public void createBrokerConnection(final String transport, final String host, @@ -752,6 +659,17 @@ public class VirtualHostImpl implements VirtualHost } } + public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) + { + LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); + if(linkRegistry == null) + { + linkRegistry = new LinkRegistry(); + _linkRegistry.put(remoteContainerId, linkRegistry); + } + return linkRegistry; + } + public ConfigStore getConfigStore() { return getApplicationRegistry().getConfigStore(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6db1560fb7..8755724cfc 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase return bind(queueName, queueName, getHeadersMap(bindings)); } - + protected void unbind(TestQueue queue, String... bindings) throws AMQException { String queueName = queue.getName(); //TODO - check this exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings))); } - + protected int getCount() { return count; @@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase exchange.onBind(new Binding(null,key, queue, exchange, args)); return queue; } - + protected int route(Message m) throws AMQException { @@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase } } - + static Map<String,Object> getHeadersMap(String... entries) { if(entries == null) { return null; } - + Map<String,Object> headers = new HashMap<String,Object>(); for (String s : entries) @@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase static ContentHeaderBody getContentHeader(FieldTable headers) { ContentHeaderBody header = new ContentHeaderBody(); - header.setProperties(getProperties(headers)); + header.properties = getProperties(headers); return header; } @@ -428,11 +428,21 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase //To change body of implemented methods use File | Settings | File Templates. } - public boolean isRejectedBy(long subscriptionId) + public void reject(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRejectedBy(Subscription subscription) { return false; //To change body of implemented methods use File | Settings | File Templates. } + public void requeue(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. @@ -472,16 +482,6 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } - - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() - { - return false; - } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 422105e410..790511017a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -31,10 +31,10 @@ import org.apache.qpid.server.message.ServerMessage; /** * Mock Server Message allowing its persistent flag to be controlled from test. */ -class MockServerMessage implements ServerMessage +class MockServerMessage implements ServerMessage<MockServerMessage> { /** - * + * */ private final boolean persistent; @@ -46,56 +46,67 @@ class MockServerMessage implements ServerMessage this.persistent = persistent; } + public boolean isPersistent() { return persistent; } - public MessageReference newReference() + + public MessageReference<MockServerMessage> newReference() { throw new NotImplementedException(); } + public boolean isImmediate() { throw new NotImplementedException(); } + public long getSize() { throw new NotImplementedException(); } + public SessionConfig getSessionConfig() { throw new NotImplementedException(); } + public String getRoutingKey() { throw new NotImplementedException(); } + public AMQMessageHeader getMessageHeader() { throw new NotImplementedException(); } + public long getExpiration() { throw new NotImplementedException(); } + public int getContent(ByteBuffer buf, int offset) { throw new NotImplementedException(); } + public long getArrivalTime() { throw new NotImplementedException(); } + public Long getMessageNumber() { return 0L; |
