summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-20 10:52:26 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-20 10:52:26 +0000
commit959cffd1298dc79218182cbcf73d8e494b2b06e3 (patch)
tree83bbfbd944accfd5ec017fdb7ee6bda4be10ce3a /java/broker/src
parent43350455ee022494c324080993fdbd37d0788426 (diff)
downloadqpid-python-959cffd1298dc79218182cbcf73d8e494b2b06e3.tar.gz
Merged revisions 1-447993,447995-448007,448009-448141,448143-448157,448161-448194,448196-448210,448212-448218,448220-448223,448225-448233,448235,448237-448241,448243-448596,448598-448623,448625-448850,448852-448880,448882-448982,448984-449635,449637-449639,449641-449642,449644-449645,449647-449674,449676-449719,449721-449749,449751-449762,449764-449933,449935-449941,449943-450383,450385,450387-450400,450402-450433,450435-450503,450505-450555,450557-450860,450862-451024,451026-451149,451151-451316,451318-451931,451933-452139,452141-452162,452164-452320,452322,452324-452325,452327-452333,452335-452429,452431-452528,452530-452545,452547-453192,453194-453195,453197-453536,453538,453540-453656,453658-454676,454678-454735,454737,454739-454781,454783-462728,462730-462819,462821-462833,462835-462839,462841-463071,463073-463178,463180-463308,463310-463362,463364-463375,463377-463396,463398-463402,463404-463409,463411-463661,463663-463670,463672-463673,463675-464493,464495-464502,464504-464576,464578-464613,464615-464628,464630,464632-464866,464868-464899,464901-464942,464944-464949,464951-465004,465006-465016,465018-465053,465055-465165,465167-465321,465323-465406,465408-465427,465429-465431,465433-465548,465550-466044,466047-466075,466077,466079-466081,466083-466099,466101-466112,466114-466126,466128-466240,466242-466971,466973-466978,466980-467309,467311-467312,467316-467328,467330-467485,467487-467588,467590-467604,467606-467699,467701-467706,467708-467749,467751-468069,468071-468537,468539-469241,469244-469246,469248-469318,469320-469421,469423,469425-469429,469431-469435,469437-469462,469464-469469,469472-469477,469479-469490,469492-469503,469505-469529,469531-469598,469600-469624,469626-469737,469739-469752,469754-469806,469808-469928,469930-469953,469955-470011,470013-470109,470111-470335,470338-470339,470341-470379,470381,470383-470399,470401-470446,470448-470741,470743-470758,470760-470809,470811-470817,470819-470993,470995-471001,471003-471788,471790-471792,471794-472028,472030-472032,472034-472036,472038,472040,472043,472045-472059,472061,472063,472065-472066,472068,472070-472072,472074-472080,472082,472084-472092,472094-472107,472109-472123,472125-472158,472160-472165,472167-472172,472174-472457,472459-472460,472462-472464,472466-472470,472472-472483,472486-472491,472493-472494,472496-472497,472499,472501-472503,472505-472512,472514-472544,472546-472556,472558-472560,472562-472572,472574-472587,472589-472591,472593-472605,472607,472609-472731,472733-472786,472788-472843,472845-472849,472851-472859,472861-472878,472880-472903,472905,472907-472988,472990-472991,472993-473071,473073-473086,473088-473090,473093,473095-473096,473098-473106,473108-473110,473112-473185,473187-473260,473262,473268-473270,473275-473279,473281,473284-473287,473289-473295,473297-473306,473308-473330,473332-473335,473337,473339-473344,473346-473351,473353-473355,473357-473358,473361-473471,473473-473497,473499-473535,473537-473567,473569-473888,473890-474451,474454-474492,474494-474563,474565-474843,474845-474865,474867-474932,474934-475035,475037-475144,475146-475180,475182-475265,475267-475285,475287,475289-475293,475295-475296,475298-475302,475304-475631,475633-475649,475651-475748,475750-475752,475754-476107,476109-476302,476304-476413,476415-476430,476432-476700,476702-476868,476870-477147,477149-477213,477215-477263,477265-477340,477342-477635,477637-477789,477791-477825,477827-477841,477843,477846-477852,477854,477856,477858-477865,477867-477894,477896-478022,478024-478182,478184-478211,478213-478233,478235-478236,478238-478241,478243-478252,478254-478259,478261-478263,478265,478267-478269,478271-478286,478288-478342,478344-478379,478381-478412,478414-478443,478445-478636,478639-478658,478660-478821,478823-478853,478855-478922,478924-478962,478965-478974,478976-479029,479031-479049,479051-479210,479212-479214,479216-479407,479409-479415,479417-479425,479427-479559,479561-479639,479641-479676,479678-479685,479687-480030,480033-480086,480091-480093,480095-480118,480120-480139,480141,480143-480148,480150-480156,480158-480163,480165-480177,480179-480189,480191-480193,480195-480198,480200-480220,480222-480282,480284-480292,480294-480308,480310-480317,480320-480422,480424,480426-480581,480583-480656,480658-480692,480695-480702,480704,480706-480710,480712-480910,480913-480933,480935-480945,480947-480972,480974-480993,480995-481034,481036-481158,481161-481174,481176-481220,481222-481234,481236-481260,481263-481264,481266-481296,481298-481304,481306-481311,481313-481332,481334,481336-481380,481382-481441,481443-482144,482146-482180,482182-482193,482195-482232,482234-482236,482239,482241-482242,482244-482247,482250-482251,482253,482256-482261,482264-482288,482290-482364,482366,482368,482370-482554,482556,482558-482569,482572-482636,482638,482640-482696,482698-482722,482724-482732,482734-482771,482774-482957,482959-483045,483047-483105,483108,483110-483115,483117,483119-483127,483130-483134,483136-483148,483150-483158,483160-483164,483166-483178,483180-483391,483393-483400,483402-483403,483405-483418,483420-483421,483425-483436,483438-483470,483472-483502,483504-483558,483560-483599,483601-483637,483639-483644,483646-483659,483661-483670,483672-483878,483880-483910,483912-483915,483917-483940,483942,483944-483968,483970-483972,483974-483976,483978,483980-484612,484614-484657,484659-484693,484695-484718,484720-484842,484844-484847,484849-484986,484988-485019,485021-485489,485491-485544,485546-485591,485593,485595-485697,485699-485729,485731-485734,485736-485779,485781-485787,485789-485851,485853,485855-486007,486009,486011-486020,486022-486083,486085-486097,486099-486117,486120-486131,486133-486148,486150-486161,486163-486164,486166-486197,486199-486205,486208-486247,486249-486253,486256-486427,486429-486431,486433-486554,486556-486573,486575-486593,486595,486597-486609,486611-486619,486622,486625,486627-486641,486643-486645,486649-486687,486689-486721,486723-486730,486732-486746,486748-486759,486761,486763-486777,486779-486782,486784-486788,486790,486792,486794-486796,486798-487175,487178,487180-487213,487215,487217-487267,487269-487284,487286-487298,487300-487358,487360-487367,487369-487382,487384-487434,487436-487480,487482-487547,487549-487561,487563-487565,487567-487578,487580-487615,487617-487622,487624,487626,487628,487630-487635,487637-487703,487705-487777,487780-487781,487783-487800,487802-487803,487805-487820,487822-487848,487850-487902,487904-488103,488105-488133,488135-488158,488160-488163,488165-488187,488189-488216,488218-488248,488250-488278,488280,488282-488303,488305-488313,488315-488342,488344-488351,488353-488376,488378-488449,488451-488593,488595,488597-488623,488625-488700,488702-488704,488706-488710,488714,488716-488725,488727-488744,488746-488770,488772-488798,488800,488802-488807,488809,488811-488829,488831-488843,488845-488851,488853-489069,489071-489077,489079-489081,489084-489102,489104-489105,489107-489109,489111-489112,489114-489139,489141-489178,489181-489203,489205-489211,489213,489216-489329,489332-489402,489404-489417,489419-489421,489423-489643,489645-489690,489692-489703,489705-489714,489716-489747,489749-489753,489755-489803,489805-489904,489906-490372,490374-490504,490506-490604,490606-490707,490710-490733,490735-490871,490873-490984,490986-491028,491030,491032-491071,491073-491119,491121-491576,491578-491672,491674-491800,491802-491838,491840-491878,491880-492183,492185-492279,492281-492317,492319-492513,492515-492584,492586-492587,492589-492601,492603-492635,492637-492640,492642-492717,492719-492723,492725-492729,492731-492755,492757-492901,492903-492955,492957-492962,492964-492997,492999-493002,493004-493041,493043-493059,493062-493063,493065-493086,493088-493125,493127-493139,493141-493150,493152-493871,493873-494017,494019-494030,494032-494041,494043-494091,494093-494120,494122-494354,494356-494436,494438-494539,494541-494552,494554-494586,494588-494649,494651,494653-494654,494656-494657,494659-494764,494766-494768,494770-494796,494798-494799,494802,494804-494860,494862-494903,494905-494906,494908-495019,495021-495160,495162-495168,495171-495188,495190-495229,495231-495254,495256-495303,495305-495313,495315-495336,495338-495372,495374-495379,495381-495454,495457-495459,495462-495516,495518-495524,495526-495531,495533-495548,495551-495553,495555,495557-495558,495560,495562-495573,495575-495583,495585-495594,495596-495628,495630-495638,495640-495651,495653-495660,495662-495753,495755-496259,496261-496262,496264-496269,496271-496275,496277-496301,496303-496316,496318-496383,496385-496413,496415-496495,496497-496625,496627-496636,496638-496640,496642-496647,496650-496657,496659-496660,496663-496664,496666-496677,496679-496681,496683-496730,496732-496750,496752,496754-496784,496786-496832,496834-496840,496842-496990,496992-496995,496997-497340,497343-497351,497353-497403,497405-497424,497426-497438,497440-497481,497483-497497,497499-497765,497767-497769,497771-497775,497777-497778,497780,497782-497783,497785,497787-497812,497814-497871,497873-497877,497879-498573,498575-498588,498590,498592,498594-498636,498638-498669,498671-498686,498688-498689,498691-498719,498721-498964,498966-498969,498971-498973,498975-498982,498985-499035,499037-499040,499042,499044-499048,499050-499082,499084-499086,499088-499164,499167-499169,499171-499355,499357-499370,499372-499373,499375-499391,499393,499395-499425,499428,499430-499445,499447-499455,499457-499460,499462-499465,499467,499469-499489,499491-499492,499494-499531,499533-499562,499566-499627,499629-499715,499717-499732,499734-499755,499758-499763,499765-499780,499782-499795,499797-499802,499804-499844,499846,499848-499850,499852-499863,499865-499873,499875-499974,499976-499978,499980-500263,500265-500283,500285-500309,500311-501000,501002,501012-501057,501059-501095,501097-501390,501392-501410,501413-501447,501449-501454,501456,501458-501464,501466-501471,501473-501803,501805-501913,501915-501916,501918-501919,501921-501944,501946-502171,502173-502177,502181,502183-502247,502250-502252,502254-502260,502262-502267,502270,502272,502274-502575,502577-502609,502611-502619,502621-502626,502628-502654,502656-503592,503594-503603,503605-503608,503610-503636,503638-503645,503647-503705,503707-503789,503791-504024,504026-504111,504113-504506,504508-504735,504737-504863,504865-504867,504869-504914,504916-505241,505243-505254,505257-505267,505269-505354,505356-505891,505893-505971,505973-506400,506402-506404,506407-506438,506440-506516,506518-506541,506543-506966,506968-506971,506973-507095,507097-507108,507111-507454,507456,507459-507471,507473-507556,507558,507560-507581,507585-507594,507597,507599-507608,507610-507728,507730-507893,507895-507937,507940-508234,508236-508350,508352-508365,508367-508380,508383,508386-508415,508417-508648,508650-508941,508943-509146,509148-509171,509173-509175,509179-509201,509203-509207,509209-509215,509217-509222,509224-509477,509480-509627,509629-509634,509636-509641,509643-509736,509738-509931,509933-510059,510061-510075,510077-510158,510161-510896,510898-510938,510940-511388,511390-511922,511924-512287,512289-512698,512702-512813,512815-512817,512819-513359,513361-513370,513372-514702,514704-514886,514888-514902,514904-515126,515129-515141,515143-515516,515518-515534,515536-515538,515540-515648,515650-515651,515653-516070,516072-516411,516413-516448,516450,516452-517637,517639-517647,517649-517659,517661-517663,517665-517677,517679-517682,517684-517744,517746-518085,518087-518175,518177-518558,518560-518568,518571-518666,518668,518670-518699,518701-518987,518990-518992,518994-519908,519910-519932,519934-520414,520416-520842,520844-520937,520939-521362,521364-521681,521683-521704,521706-521709,521711-521714,521716-521781,521783-521792,521794-522462,522464-522527,522529-522534,522536-522566,522568-522958,522960,522962-522966,522968-522976,522978-522980,522982-522988,522992-522993,522995-523244,523246-523746,523748-524049,524051-524738,524741-524742,524744-524762,524764,524766,524768-525486,525488-525530,525532,525534,525537-525552,525554-525765,525767-525776,525778-525784,525789-525803,525805-525816,525818-525828,525830-525861,525863-525866,525868-526090,526092-526112,526114-526116,526119-526121,526123-526149,526151-526153,526155-526156,526160-526165,526167-526186,526188-526193,526196-526197,526200-526665,526667-526682,526686-526690,526693,526695-526708,526710-526713,526715-526775,526777-526802,526804-526806,526808-527048,527051-527052,527054-527181,527183-527486,527488-527492,527494-527498,527500-527508,527510-527517,527519-527536,527538-527555,527559-527802,527804-527842,527844-527847,527849-527875,527877-527940,527942-527958,527960-527971,527973-528002,528004,528006-528423,528425-529232,529234-529245,529247-529296,529298-529634,529636-529658,529660-529665,529667-529668,529670-530033,530035-530036,530038-530040,530045-530046,530050-530051,530053-530696,530698-530735 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r530432 | ritchiem | 2007-04-19 15:42:53 +0100 (Thu, 19 Apr 2007) | 8 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. AMQChannel remove comment around setPublisher - this is used by noLocal implementation. Subscription - rename of hasFilters to filtersMessages AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check. TopicSessionTest - Additional testing for NoLocal to ensure. ........ r530437 | ritchiem | 2007-04-19 15:52:49 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. TopicSessionTest - Additional testing for NoLocal to ensure. Forgot to include the file in the commit. ........ r530438 | ritchiem | 2007-04-19 15:53:32 +0100 (Thu, 19 Apr 2007) | 1 line Added Test logging to aid in diagnosing problems ........ r530441 | ritchiem | 2007-04-19 16:07:54 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. ConcurrentSelectorDeliveryManager - method changes from hasFilter to filtersMessages. Forgot to include the file in the commit. ........ r530442 | ritchiem | 2007-04-19 16:08:04 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-455 Pre-fetched messages can cause problems with client tools. Set IMMEDIATE_PREFETCH="true" for previous behaviour. Inverted check now setting System proprety IMMEDIATE_PREFETCH="true" will cause existing messages to be immediately pre-fetched to the newly registered consumer. Solved out standing broker issues see QPID-458 and QPID-459. ........ r530444 | ritchiem | 2007-04-19 16:10:10 +0100 (Thu, 19 Apr 2007) | 3 lines QPID-454 - Message 'taken' notion is per message. Adjusted to be per message per queue. Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change. ........ r530447 | ritchiem | 2007-04-19 16:15:25 +0100 (Thu, 19 Apr 2007) | 1 line Committed test with localhost as the broker ........ r530449 | ritchiem | 2007-04-19 16:16:32 +0100 (Thu, 19 Apr 2007) | 1 line Additional test to ensure connection closure doesn't cause problems. ........ r530683 | ritchiem | 2007-04-20 09:11:05 +0100 (Fri, 20 Apr 2007) | 2 lines Reinstated the two consumer receive test. Added additional test class to cover the IMMEDIATE_PREFETCHs. ........ r530685 | ritchiem | 2007-04-20 09:13:03 +0100 (Fri, 20 Apr 2007) | 1 line Made IMMEDIATE_PREFETCH to allow access from additional test class. ........ r530686 | ritchiem | 2007-04-20 09:14:57 +0100 (Fri, 20 Apr 2007) | 1 line Moved string literal 'Qpid_HOME' to constant QPID_HOME ........ r530734 | bhupendrab | 2007-04-20 11:42:52 +0100 (Fri, 20 Apr 2007) | 1 line QPID-445 : md5 hashed password will be sent from management console to Qpid ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java4
14 files changed, 148 insertions, 87 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2e1653e69d..97c95dae5e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -197,7 +197,6 @@ public class AMQChannel
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
- // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 146d0566ce..14aa919356 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -70,7 +70,7 @@ public class Main
private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-
+ public static final String QPID_HOME = "QPID_HOME";
private static final int IPV4_ADDRESS_LENGTH = 4;
private static final char IPV4_LITERAL_SEPARATOR = '.';
@@ -204,7 +204,7 @@ public class Main
protected void startup() throws InitException, ConfigurationException, Exception
{
- final String QpidHome = System.getProperty("QPID_HOME");
+ final String QpidHome = System.getProperty(QPID_HOME);
final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
if (!configFile.exists())
@@ -213,7 +213,7 @@ public class Main
if (QpidHome == null)
{
- error = error + "\nNote: Qpid_HOME is not set.";
+ error = error + "\nNote: "+QPID_HOME+" is not set.";
}
throw new InitException(error, null);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b2046efee3..e19038504f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,18 +80,17 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
+
private Set<Subscription> _rejectedBy = null;
+
+
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken(AMQQueue queue)
- {
- return _taken.get();
- }
private final int hashcode = System.identityHashCode(this);
@@ -206,7 +205,7 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+// _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -326,7 +325,6 @@ public class AMQMessage
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -459,17 +457,53 @@ public class AMQMessage
return _deliveredToConsumer;
}
-
- public boolean taken(AMQQueue queue, Subscription sub)
+ public boolean isTaken(AMQQueue queue)
{
- if (_taken.getAndSet(true))
+ //return _taken.get();
+
+ synchronized (this)
{
- return true;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ _takenMap.put(queue, taken);
+ }
+
+ return taken.get();
}
- else
+ }
+
+ public boolean taken(AMQQueue queue, Subscription sub)
+ {
+// if (_taken.getAndSet(true))
+// {
+// return true;
+// }
+// else
+// {
+// _takenBySubcription = sub;
+// return false;
+// }
+
+ synchronized (this)
{
- _takenBySubcription = sub;
- return false;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+
+ if (taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
@@ -479,8 +513,26 @@ public class AMQMessage
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+
+// _taken.set(false);
+// _takenBySubcription = null;
+
+
+ synchronized (this)
+ {
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+ else
+ {
+ taken.set(false);
+ }
+
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -833,16 +885,20 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- _taken + " by :" + _takenBySubcription;
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+// _taken + " by :" + _takenBySubcription;
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+// return _takenBySubcription;
+ synchronized (this)
+ {
+ return _takenBySubcriptionMap.get(queue);
+ }
}
public void reject(Subscription subscription)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a418bb8f8a..65d5906d05 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable
/**
* Returns messages within the given range of message Ids
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable
/**
* moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message
- * id range - setup the other queue for moving messages (stop the async delivery) - send these
- * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
- * remove messages from this queue - remove locks from both queues and start async delivery
+ * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
+ * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
+ * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
+ * locks from both queues and start async delivery
*
* @param fromMessageId
* @param toMessageId
@@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
- if (subscription.hasFilters())
+ if (subscription.filtersMessages())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable
{
_totalMessagesReceived.incrementAndGet();
}
-
+
try
{
_managedObject.checkForNotification(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 979f692361..1f92cee1df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.hasFilters())
+ if (!sub.isSuspended() && sub.filtersMessages())
{
Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processingThreadName = Thread.currentThread().getName();
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
}
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
+ }
+
}
// private void sendNextMessage(Subscription sub)
// {
-// if (sub.hasFilters())
+// if (sub.filtersMessages())
// {
// sendNextMessage(sub, sub.getPreDeliveryQueue());
// if (sub.isAutoClose())
@@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
executor.execute(asyncDelivery);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index e9f209839a..e6d5d0c88d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -32,7 +32,7 @@ public interface Subscription
void queueDeleted(AMQQueue queue) throws AMQException;
- boolean hasFilters();
+ boolean filtersMessages();
boolean hasInterest(AMQMessage msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e3944954f3..3bce950ba9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription
}
- if (_filters != null)
+ if (filtersMessages())
{
_messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
@@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
- return _filters != null;
+ return _filters != null || _noLocal;
}
public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription
// return false;
}
- if (_noLocal)
+ final AMQProtocolSession publisher = msg.getPublisher();
+
+ //todo - client id should be recoreded and this test removed but handled below
+ if (_noLocal && publisher != null)
{
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription
if ((protocolSession.getClientProperties() != null) &&
(localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if ((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
@@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription
}
else
{
+
localInstance = protocolSession.getClientIdentifier();
- msgInstance = msg.getPublisher().getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
+
+ msgInstance = publisher.getClientIdentifier();
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription
}
return false;
}
-
}
@@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription
return _resendQueue;
}
- if (_filters != null)
+ if (filtersMessages())
{
if (isAutoClose())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 26b040aae0..b500247fa4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
//FIXME the queue could be full of sent messages.
// Either need to clean all PDQs after sending a message
// OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
{
return subscription;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
index a43474559d..20f123179f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.security.access;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanOperationParameter;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
@@ -107,8 +106,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return UserManagement.TYPE;
}
- public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password)
+ public boolean setPassword(String username, char[] password)
{
try
{
@@ -122,10 +120,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
}
}
- public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean setRights(String username, boolean read, boolean write, boolean admin)
{
if (_accessRights.get(username) == null)
@@ -179,11 +174,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return true;
}
- public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
{
if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password))
{
@@ -195,7 +186,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return false;
}
- public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username)
+ public boolean deleteUser(String username)
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
index 6381213398..ce5e9fa4a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
@@ -45,7 +45,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "setPassword", description = "Set password for user.")
boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password);
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password);
/**
* set rights for users with given details
@@ -76,7 +76,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "createUser", description = "Create new user from system.")
boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password,
@MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
@MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
@MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
index 956db64d90..cd0a371b48 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
@@ -176,7 +176,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
User user = _users.get(principal.getName());
@@ -187,13 +187,10 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
try
{
-
- char[] passwd = convertPassword(password);
-
try
{
_userUpdate.lock();
- user.setPassword(passwd);
+ user.setPassword(password);
try
{
@@ -215,7 +212,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
}
- catch (UnsupportedEncodingException e)
+ catch (Exception e)
{
return false;
}
@@ -237,23 +234,14 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
if (_users.get(principal.getName()) != null)
{
return false;
}
- User user;
- try
- {
- user = new User(principal.getName(), convertPassword(password));
- }
- catch (UnsupportedEncodingException e)
- {
- _logger.warn("Unable to encode password:" + e);
- return false;
- }
+ User user = new User(principal.getName(), password);
try
{
@@ -598,8 +586,13 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException
{
- Base64 b64 = new Base64();
- _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING));
+ byte[] byteArray = new byte[_password.length];
+ int index = 0;
+ for (char c : _password)
+ {
+ byteArray[index++] = (byte)c;
+ }
+ _encodedPassword = (new Base64()).encode(byteArray);
}
public boolean isModified()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
index 3f6794aaaf..90d08c963e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
@@ -151,12 +151,12 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
index 8073fcc3c6..494d8e0bf4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
@@ -65,7 +65,7 @@ public interface PrincipalDatabase
* @return True if change was successful
* @throws AccountNotFoundException If the given principal doesn't exist in the Database
*/
- boolean updatePassword(Principal principal, String password)
+ boolean updatePassword(Principal principal, char[] password)
throws AccountNotFoundException;
/**
@@ -74,7 +74,7 @@ public interface PrincipalDatabase
* @param password The password to set for the principal
* @return True on a successful creation
*/
- boolean createPrincipal(Principal principal, String password);
+ boolean createPrincipal(Principal principal, char[] password);
/**
* Delete a principal
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
index b1ac0e1f00..74c330f606 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
@@ -93,12 +93,12 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}