summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-20 10:52:26 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-20 10:52:26 +0000
commit959cffd1298dc79218182cbcf73d8e494b2b06e3 (patch)
tree83bbfbd944accfd5ec017fdb7ee6bda4be10ce3a /java
parent43350455ee022494c324080993fdbd37d0788426 (diff)
downloadqpid-python-959cffd1298dc79218182cbcf73d8e494b2b06e3.tar.gz
Merged revisions 1-447993,447995-448007,448009-448141,448143-448157,448161-448194,448196-448210,448212-448218,448220-448223,448225-448233,448235,448237-448241,448243-448596,448598-448623,448625-448850,448852-448880,448882-448982,448984-449635,449637-449639,449641-449642,449644-449645,449647-449674,449676-449719,449721-449749,449751-449762,449764-449933,449935-449941,449943-450383,450385,450387-450400,450402-450433,450435-450503,450505-450555,450557-450860,450862-451024,451026-451149,451151-451316,451318-451931,451933-452139,452141-452162,452164-452320,452322,452324-452325,452327-452333,452335-452429,452431-452528,452530-452545,452547-453192,453194-453195,453197-453536,453538,453540-453656,453658-454676,454678-454735,454737,454739-454781,454783-462728,462730-462819,462821-462833,462835-462839,462841-463071,463073-463178,463180-463308,463310-463362,463364-463375,463377-463396,463398-463402,463404-463409,463411-463661,463663-463670,463672-463673,463675-464493,464495-464502,464504-464576,464578-464613,464615-464628,464630,464632-464866,464868-464899,464901-464942,464944-464949,464951-465004,465006-465016,465018-465053,465055-465165,465167-465321,465323-465406,465408-465427,465429-465431,465433-465548,465550-466044,466047-466075,466077,466079-466081,466083-466099,466101-466112,466114-466126,466128-466240,466242-466971,466973-466978,466980-467309,467311-467312,467316-467328,467330-467485,467487-467588,467590-467604,467606-467699,467701-467706,467708-467749,467751-468069,468071-468537,468539-469241,469244-469246,469248-469318,469320-469421,469423,469425-469429,469431-469435,469437-469462,469464-469469,469472-469477,469479-469490,469492-469503,469505-469529,469531-469598,469600-469624,469626-469737,469739-469752,469754-469806,469808-469928,469930-469953,469955-470011,470013-470109,470111-470335,470338-470339,470341-470379,470381,470383-470399,470401-470446,470448-470741,470743-470758,470760-470809,470811-470817,470819-470993,470995-471001,471003-471788,471790-471792,471794-472028,472030-472032,472034-472036,472038,472040,472043,472045-472059,472061,472063,472065-472066,472068,472070-472072,472074-472080,472082,472084-472092,472094-472107,472109-472123,472125-472158,472160-472165,472167-472172,472174-472457,472459-472460,472462-472464,472466-472470,472472-472483,472486-472491,472493-472494,472496-472497,472499,472501-472503,472505-472512,472514-472544,472546-472556,472558-472560,472562-472572,472574-472587,472589-472591,472593-472605,472607,472609-472731,472733-472786,472788-472843,472845-472849,472851-472859,472861-472878,472880-472903,472905,472907-472988,472990-472991,472993-473071,473073-473086,473088-473090,473093,473095-473096,473098-473106,473108-473110,473112-473185,473187-473260,473262,473268-473270,473275-473279,473281,473284-473287,473289-473295,473297-473306,473308-473330,473332-473335,473337,473339-473344,473346-473351,473353-473355,473357-473358,473361-473471,473473-473497,473499-473535,473537-473567,473569-473888,473890-474451,474454-474492,474494-474563,474565-474843,474845-474865,474867-474932,474934-475035,475037-475144,475146-475180,475182-475265,475267-475285,475287,475289-475293,475295-475296,475298-475302,475304-475631,475633-475649,475651-475748,475750-475752,475754-476107,476109-476302,476304-476413,476415-476430,476432-476700,476702-476868,476870-477147,477149-477213,477215-477263,477265-477340,477342-477635,477637-477789,477791-477825,477827-477841,477843,477846-477852,477854,477856,477858-477865,477867-477894,477896-478022,478024-478182,478184-478211,478213-478233,478235-478236,478238-478241,478243-478252,478254-478259,478261-478263,478265,478267-478269,478271-478286,478288-478342,478344-478379,478381-478412,478414-478443,478445-478636,478639-478658,478660-478821,478823-478853,478855-478922,478924-478962,478965-478974,478976-479029,479031-479049,479051-479210,479212-479214,479216-479407,479409-479415,479417-479425,479427-479559,479561-479639,479641-479676,479678-479685,479687-480030,480033-480086,480091-480093,480095-480118,480120-480139,480141,480143-480148,480150-480156,480158-480163,480165-480177,480179-480189,480191-480193,480195-480198,480200-480220,480222-480282,480284-480292,480294-480308,480310-480317,480320-480422,480424,480426-480581,480583-480656,480658-480692,480695-480702,480704,480706-480710,480712-480910,480913-480933,480935-480945,480947-480972,480974-480993,480995-481034,481036-481158,481161-481174,481176-481220,481222-481234,481236-481260,481263-481264,481266-481296,481298-481304,481306-481311,481313-481332,481334,481336-481380,481382-481441,481443-482144,482146-482180,482182-482193,482195-482232,482234-482236,482239,482241-482242,482244-482247,482250-482251,482253,482256-482261,482264-482288,482290-482364,482366,482368,482370-482554,482556,482558-482569,482572-482636,482638,482640-482696,482698-482722,482724-482732,482734-482771,482774-482957,482959-483045,483047-483105,483108,483110-483115,483117,483119-483127,483130-483134,483136-483148,483150-483158,483160-483164,483166-483178,483180-483391,483393-483400,483402-483403,483405-483418,483420-483421,483425-483436,483438-483470,483472-483502,483504-483558,483560-483599,483601-483637,483639-483644,483646-483659,483661-483670,483672-483878,483880-483910,483912-483915,483917-483940,483942,483944-483968,483970-483972,483974-483976,483978,483980-484612,484614-484657,484659-484693,484695-484718,484720-484842,484844-484847,484849-484986,484988-485019,485021-485489,485491-485544,485546-485591,485593,485595-485697,485699-485729,485731-485734,485736-485779,485781-485787,485789-485851,485853,485855-486007,486009,486011-486020,486022-486083,486085-486097,486099-486117,486120-486131,486133-486148,486150-486161,486163-486164,486166-486197,486199-486205,486208-486247,486249-486253,486256-486427,486429-486431,486433-486554,486556-486573,486575-486593,486595,486597-486609,486611-486619,486622,486625,486627-486641,486643-486645,486649-486687,486689-486721,486723-486730,486732-486746,486748-486759,486761,486763-486777,486779-486782,486784-486788,486790,486792,486794-486796,486798-487175,487178,487180-487213,487215,487217-487267,487269-487284,487286-487298,487300-487358,487360-487367,487369-487382,487384-487434,487436-487480,487482-487547,487549-487561,487563-487565,487567-487578,487580-487615,487617-487622,487624,487626,487628,487630-487635,487637-487703,487705-487777,487780-487781,487783-487800,487802-487803,487805-487820,487822-487848,487850-487902,487904-488103,488105-488133,488135-488158,488160-488163,488165-488187,488189-488216,488218-488248,488250-488278,488280,488282-488303,488305-488313,488315-488342,488344-488351,488353-488376,488378-488449,488451-488593,488595,488597-488623,488625-488700,488702-488704,488706-488710,488714,488716-488725,488727-488744,488746-488770,488772-488798,488800,488802-488807,488809,488811-488829,488831-488843,488845-488851,488853-489069,489071-489077,489079-489081,489084-489102,489104-489105,489107-489109,489111-489112,489114-489139,489141-489178,489181-489203,489205-489211,489213,489216-489329,489332-489402,489404-489417,489419-489421,489423-489643,489645-489690,489692-489703,489705-489714,489716-489747,489749-489753,489755-489803,489805-489904,489906-490372,490374-490504,490506-490604,490606-490707,490710-490733,490735-490871,490873-490984,490986-491028,491030,491032-491071,491073-491119,491121-491576,491578-491672,491674-491800,491802-491838,491840-491878,491880-492183,492185-492279,492281-492317,492319-492513,492515-492584,492586-492587,492589-492601,492603-492635,492637-492640,492642-492717,492719-492723,492725-492729,492731-492755,492757-492901,492903-492955,492957-492962,492964-492997,492999-493002,493004-493041,493043-493059,493062-493063,493065-493086,493088-493125,493127-493139,493141-493150,493152-493871,493873-494017,494019-494030,494032-494041,494043-494091,494093-494120,494122-494354,494356-494436,494438-494539,494541-494552,494554-494586,494588-494649,494651,494653-494654,494656-494657,494659-494764,494766-494768,494770-494796,494798-494799,494802,494804-494860,494862-494903,494905-494906,494908-495019,495021-495160,495162-495168,495171-495188,495190-495229,495231-495254,495256-495303,495305-495313,495315-495336,495338-495372,495374-495379,495381-495454,495457-495459,495462-495516,495518-495524,495526-495531,495533-495548,495551-495553,495555,495557-495558,495560,495562-495573,495575-495583,495585-495594,495596-495628,495630-495638,495640-495651,495653-495660,495662-495753,495755-496259,496261-496262,496264-496269,496271-496275,496277-496301,496303-496316,496318-496383,496385-496413,496415-496495,496497-496625,496627-496636,496638-496640,496642-496647,496650-496657,496659-496660,496663-496664,496666-496677,496679-496681,496683-496730,496732-496750,496752,496754-496784,496786-496832,496834-496840,496842-496990,496992-496995,496997-497340,497343-497351,497353-497403,497405-497424,497426-497438,497440-497481,497483-497497,497499-497765,497767-497769,497771-497775,497777-497778,497780,497782-497783,497785,497787-497812,497814-497871,497873-497877,497879-498573,498575-498588,498590,498592,498594-498636,498638-498669,498671-498686,498688-498689,498691-498719,498721-498964,498966-498969,498971-498973,498975-498982,498985-499035,499037-499040,499042,499044-499048,499050-499082,499084-499086,499088-499164,499167-499169,499171-499355,499357-499370,499372-499373,499375-499391,499393,499395-499425,499428,499430-499445,499447-499455,499457-499460,499462-499465,499467,499469-499489,499491-499492,499494-499531,499533-499562,499566-499627,499629-499715,499717-499732,499734-499755,499758-499763,499765-499780,499782-499795,499797-499802,499804-499844,499846,499848-499850,499852-499863,499865-499873,499875-499974,499976-499978,499980-500263,500265-500283,500285-500309,500311-501000,501002,501012-501057,501059-501095,501097-501390,501392-501410,501413-501447,501449-501454,501456,501458-501464,501466-501471,501473-501803,501805-501913,501915-501916,501918-501919,501921-501944,501946-502171,502173-502177,502181,502183-502247,502250-502252,502254-502260,502262-502267,502270,502272,502274-502575,502577-502609,502611-502619,502621-502626,502628-502654,502656-503592,503594-503603,503605-503608,503610-503636,503638-503645,503647-503705,503707-503789,503791-504024,504026-504111,504113-504506,504508-504735,504737-504863,504865-504867,504869-504914,504916-505241,505243-505254,505257-505267,505269-505354,505356-505891,505893-505971,505973-506400,506402-506404,506407-506438,506440-506516,506518-506541,506543-506966,506968-506971,506973-507095,507097-507108,507111-507454,507456,507459-507471,507473-507556,507558,507560-507581,507585-507594,507597,507599-507608,507610-507728,507730-507893,507895-507937,507940-508234,508236-508350,508352-508365,508367-508380,508383,508386-508415,508417-508648,508650-508941,508943-509146,509148-509171,509173-509175,509179-509201,509203-509207,509209-509215,509217-509222,509224-509477,509480-509627,509629-509634,509636-509641,509643-509736,509738-509931,509933-510059,510061-510075,510077-510158,510161-510896,510898-510938,510940-511388,511390-511922,511924-512287,512289-512698,512702-512813,512815-512817,512819-513359,513361-513370,513372-514702,514704-514886,514888-514902,514904-515126,515129-515141,515143-515516,515518-515534,515536-515538,515540-515648,515650-515651,515653-516070,516072-516411,516413-516448,516450,516452-517637,517639-517647,517649-517659,517661-517663,517665-517677,517679-517682,517684-517744,517746-518085,518087-518175,518177-518558,518560-518568,518571-518666,518668,518670-518699,518701-518987,518990-518992,518994-519908,519910-519932,519934-520414,520416-520842,520844-520937,520939-521362,521364-521681,521683-521704,521706-521709,521711-521714,521716-521781,521783-521792,521794-522462,522464-522527,522529-522534,522536-522566,522568-522958,522960,522962-522966,522968-522976,522978-522980,522982-522988,522992-522993,522995-523244,523246-523746,523748-524049,524051-524738,524741-524742,524744-524762,524764,524766,524768-525486,525488-525530,525532,525534,525537-525552,525554-525765,525767-525776,525778-525784,525789-525803,525805-525816,525818-525828,525830-525861,525863-525866,525868-526090,526092-526112,526114-526116,526119-526121,526123-526149,526151-526153,526155-526156,526160-526165,526167-526186,526188-526193,526196-526197,526200-526665,526667-526682,526686-526690,526693,526695-526708,526710-526713,526715-526775,526777-526802,526804-526806,526808-527048,527051-527052,527054-527181,527183-527486,527488-527492,527494-527498,527500-527508,527510-527517,527519-527536,527538-527555,527559-527802,527804-527842,527844-527847,527849-527875,527877-527940,527942-527958,527960-527971,527973-528002,528004,528006-528423,528425-529232,529234-529245,529247-529296,529298-529634,529636-529658,529660-529665,529667-529668,529670-530033,530035-530036,530038-530040,530045-530046,530050-530051,530053-530696,530698-530735 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r530432 | ritchiem | 2007-04-19 15:42:53 +0100 (Thu, 19 Apr 2007) | 8 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. AMQChannel remove comment around setPublisher - this is used by noLocal implementation. Subscription - rename of hasFilters to filtersMessages AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check. TopicSessionTest - Additional testing for NoLocal to ensure. ........ r530437 | ritchiem | 2007-04-19 15:52:49 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. TopicSessionTest - Additional testing for NoLocal to ensure. Forgot to include the file in the commit. ........ r530438 | ritchiem | 2007-04-19 15:53:32 +0100 (Thu, 19 Apr 2007) | 1 line Added Test logging to aid in diagnosing problems ........ r530441 | ritchiem | 2007-04-19 16:07:54 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. ConcurrentSelectorDeliveryManager - method changes from hasFilter to filtersMessages. Forgot to include the file in the commit. ........ r530442 | ritchiem | 2007-04-19 16:08:04 +0100 (Thu, 19 Apr 2007) | 5 lines QPID-455 Pre-fetched messages can cause problems with client tools. Set IMMEDIATE_PREFETCH="true" for previous behaviour. Inverted check now setting System proprety IMMEDIATE_PREFETCH="true" will cause existing messages to be immediately pre-fetched to the newly registered consumer. Solved out standing broker issues see QPID-458 and QPID-459. ........ r530444 | ritchiem | 2007-04-19 16:10:10 +0100 (Thu, 19 Apr 2007) | 3 lines QPID-454 - Message 'taken' notion is per message. Adjusted to be per message per queue. Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change. ........ r530447 | ritchiem | 2007-04-19 16:15:25 +0100 (Thu, 19 Apr 2007) | 1 line Committed test with localhost as the broker ........ r530449 | ritchiem | 2007-04-19 16:16:32 +0100 (Thu, 19 Apr 2007) | 1 line Additional test to ensure connection closure doesn't cause problems. ........ r530683 | ritchiem | 2007-04-20 09:11:05 +0100 (Fri, 20 Apr 2007) | 2 lines Reinstated the two consumer receive test. Added additional test class to cover the IMMEDIATE_PREFETCHs. ........ r530685 | ritchiem | 2007-04-20 09:13:03 +0100 (Fri, 20 Apr 2007) | 1 line Made IMMEDIATE_PREFETCH to allow access from additional test class. ........ r530686 | ritchiem | 2007-04-20 09:14:57 +0100 (Fri, 20 Apr 2007) | 1 line Moved string literal 'Qpid_HOME' to constant QPID_HOME ........ r530734 | bhupendrab | 2007-04-20 11:42:52 +0100 (Fri, 20 Apr 2007) | 1 line QPID-445 : md5 hashed password will be sent from management console to Qpid ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java46
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java70
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java72
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java81
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java28
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java118
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java2
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java46
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java22
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java2
24 files changed, 549 insertions, 173 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2e1653e69d..97c95dae5e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -197,7 +197,6 @@ public class AMQChannel
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
- // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 146d0566ce..14aa919356 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -70,7 +70,7 @@ public class Main
private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-
+ public static final String QPID_HOME = "QPID_HOME";
private static final int IPV4_ADDRESS_LENGTH = 4;
private static final char IPV4_LITERAL_SEPARATOR = '.';
@@ -204,7 +204,7 @@ public class Main
protected void startup() throws InitException, ConfigurationException, Exception
{
- final String QpidHome = System.getProperty("QPID_HOME");
+ final String QpidHome = System.getProperty(QPID_HOME);
final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
if (!configFile.exists())
@@ -213,7 +213,7 @@ public class Main
if (QpidHome == null)
{
- error = error + "\nNote: Qpid_HOME is not set.";
+ error = error + "\nNote: "+QPID_HOME+" is not set.";
}
throw new InitException(error, null);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b2046efee3..e19038504f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,18 +80,17 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
+
private Set<Subscription> _rejectedBy = null;
+
+
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken(AMQQueue queue)
- {
- return _taken.get();
- }
private final int hashcode = System.identityHashCode(this);
@@ -206,7 +205,7 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+// _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -326,7 +325,6 @@ public class AMQMessage
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -459,17 +457,53 @@ public class AMQMessage
return _deliveredToConsumer;
}
-
- public boolean taken(AMQQueue queue, Subscription sub)
+ public boolean isTaken(AMQQueue queue)
{
- if (_taken.getAndSet(true))
+ //return _taken.get();
+
+ synchronized (this)
{
- return true;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ _takenMap.put(queue, taken);
+ }
+
+ return taken.get();
}
- else
+ }
+
+ public boolean taken(AMQQueue queue, Subscription sub)
+ {
+// if (_taken.getAndSet(true))
+// {
+// return true;
+// }
+// else
+// {
+// _takenBySubcription = sub;
+// return false;
+// }
+
+ synchronized (this)
{
- _takenBySubcription = sub;
- return false;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+
+ if (taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
@@ -479,8 +513,26 @@ public class AMQMessage
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+
+// _taken.set(false);
+// _takenBySubcription = null;
+
+
+ synchronized (this)
+ {
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+ else
+ {
+ taken.set(false);
+ }
+
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -833,16 +885,20 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- _taken + " by :" + _takenBySubcription;
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+// _taken + " by :" + _takenBySubcription;
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+// return _takenBySubcription;
+ synchronized (this)
+ {
+ return _takenBySubcriptionMap.get(queue);
+ }
}
public void reject(Subscription subscription)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a418bb8f8a..65d5906d05 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable
/**
* Returns messages within the given range of message Ids
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable
/**
* moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message
- * id range - setup the other queue for moving messages (stop the async delivery) - send these
- * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
- * remove messages from this queue - remove locks from both queues and start async delivery
+ * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
+ * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
+ * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
+ * locks from both queues and start async delivery
*
* @param fromMessageId
* @param toMessageId
@@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
- if (subscription.hasFilters())
+ if (subscription.filtersMessages())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable
{
_totalMessagesReceived.incrementAndGet();
}
-
+
try
{
_managedObject.checkForNotification(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 979f692361..1f92cee1df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.hasFilters())
+ if (!sub.isSuspended() && sub.filtersMessages())
{
Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processingThreadName = Thread.currentThread().getName();
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
}
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
+ }
+
}
// private void sendNextMessage(Subscription sub)
// {
-// if (sub.hasFilters())
+// if (sub.filtersMessages())
// {
// sendNextMessage(sub, sub.getPreDeliveryQueue());
// if (sub.isAutoClose())
@@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
executor.execute(asyncDelivery);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index e9f209839a..e6d5d0c88d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -32,7 +32,7 @@ public interface Subscription
void queueDeleted(AMQQueue queue) throws AMQException;
- boolean hasFilters();
+ boolean filtersMessages();
boolean hasInterest(AMQMessage msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e3944954f3..3bce950ba9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription
}
- if (_filters != null)
+ if (filtersMessages())
{
_messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
@@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
- return _filters != null;
+ return _filters != null || _noLocal;
}
public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription
// return false;
}
- if (_noLocal)
+ final AMQProtocolSession publisher = msg.getPublisher();
+
+ //todo - client id should be recoreded and this test removed but handled below
+ if (_noLocal && publisher != null)
{
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription
if ((protocolSession.getClientProperties() != null) &&
(localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if ((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
@@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription
}
else
{
+
localInstance = protocolSession.getClientIdentifier();
- msgInstance = msg.getPublisher().getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
+
+ msgInstance = publisher.getClientIdentifier();
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription
}
return false;
}
-
}
@@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription
return _resendQueue;
}
- if (_filters != null)
+ if (filtersMessages())
{
if (isAutoClose())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 26b040aae0..b500247fa4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
//FIXME the queue could be full of sent messages.
// Either need to clean all PDQs after sending a message
// OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
{
return subscription;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
index a43474559d..20f123179f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.security.access;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanOperationParameter;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
@@ -107,8 +106,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return UserManagement.TYPE;
}
- public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password)
+ public boolean setPassword(String username, char[] password)
{
try
{
@@ -122,10 +120,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
}
}
- public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean setRights(String username, boolean read, boolean write, boolean admin)
{
if (_accessRights.get(username) == null)
@@ -179,11 +174,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return true;
}
- public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
{
if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password))
{
@@ -195,7 +186,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return false;
}
- public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username)
+ public boolean deleteUser(String username)
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
index 6381213398..ce5e9fa4a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
@@ -45,7 +45,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "setPassword", description = "Set password for user.")
boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password);
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password);
/**
* set rights for users with given details
@@ -76,7 +76,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "createUser", description = "Create new user from system.")
boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password,
@MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
@MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
@MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
index 956db64d90..cd0a371b48 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
@@ -176,7 +176,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
User user = _users.get(principal.getName());
@@ -187,13 +187,10 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
try
{
-
- char[] passwd = convertPassword(password);
-
try
{
_userUpdate.lock();
- user.setPassword(passwd);
+ user.setPassword(password);
try
{
@@ -215,7 +212,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
}
- catch (UnsupportedEncodingException e)
+ catch (Exception e)
{
return false;
}
@@ -237,23 +234,14 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
if (_users.get(principal.getName()) != null)
{
return false;
}
- User user;
- try
- {
- user = new User(principal.getName(), convertPassword(password));
- }
- catch (UnsupportedEncodingException e)
- {
- _logger.warn("Unable to encode password:" + e);
- return false;
- }
+ User user = new User(principal.getName(), password);
try
{
@@ -598,8 +586,13 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException
{
- Base64 b64 = new Base64();
- _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING));
+ byte[] byteArray = new byte[_password.length];
+ int index = 0;
+ for (char c : _password)
+ {
+ byteArray[index++] = (byte)c;
+ }
+ _encodedPassword = (new Base64()).encode(byteArray);
}
public boolean isModified()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
index 3f6794aaaf..90d08c963e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
@@ -151,12 +151,12 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
index 8073fcc3c6..494d8e0bf4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
@@ -65,7 +65,7 @@ public interface PrincipalDatabase
* @return True if change was successful
* @throws AccountNotFoundException If the given principal doesn't exist in the Database
*/
- boolean updatePassword(Principal principal, String password)
+ boolean updatePassword(Principal principal, char[] password)
throws AccountNotFoundException;
/**
@@ -74,7 +74,7 @@ public interface PrincipalDatabase
* @param password The password to set for the principal
* @return True on a successful creation
*/
- boolean createPrincipal(Principal principal, String password);
+ boolean createPrincipal(Principal principal, char[] password);
/**
* Delete a principal
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
index b1ac0e1f00..74c330f606 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
@@ -93,12 +93,12 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8bb5b622f7..7b65f279be 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -199,11 +199,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final Object _suspensionLock = new Object();
- /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+ /** System property to enable immediate message prefetching */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+ /** Immediate message prefetch default */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
- private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -1932,20 +1939,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
-// if (!connectionStopped)
+ // We do this now if this is the first call on a started connection
+ if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
{
- if (isSuspended() && _firstDispatcher.getAndSet(false))
+ try
{
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
}
}
}
@@ -1998,11 +2004,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- // The dispatcher will be null if we have just created this session
- // so suspend the channel before we register our consumer so that we don't
- // start prefetching until a receive/mListener is set.
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
if (_dispatcher == null)
{
if (!isSuspended())
@@ -2010,6 +2017,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
suspendChannel(true);
+ _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
{
@@ -2018,6 +2026,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
}
+ else
+ {
+ _logger.info("Immediately prefetching existing messages to new consumer.");
+ }
try
{
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
new file mode 100644
index 0000000000..9e48914431
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+
+
+ protected void setUp() throws Exception
+ {
+
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+ super.setUp();
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 794fd5c8c1..c9407d8ff6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -62,7 +62,8 @@ public class MessageListenerMultiConsumerTest extends TestCase
private Connection _clientConnection;
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
-
+ private Session _clientSession1;
+ private Queue _queue;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
@@ -76,25 +77,25 @@ public class MessageListenerMultiConsumerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName());
_context = factory.getInitialContext(env);
- Queue queue = (Queue) _context.lookup("queue");
+ _queue = (Queue) _context.lookup("queue");
//Create Client 1
_clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
_clientConnection.start();
- Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer1 = clientSession1.createConsumer(queue);
+ _consumer1 = _clientSession1.createConsumer(_queue);
//Create Client 2
Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer2 = clientSession2.createConsumer(queue);
+ _consumer2 = clientSession2.createConsumer(_queue);
//Create Producer
Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
@@ -104,7 +105,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer = producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -123,20 +124,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
-// public void testRecieveC1thenC2() throws Exception
-// {
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-//
-// assertTrue(_consumer1.receive() != null);
-// }
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-// assertTrue(_consumer2.receive() != null);
-// }
-// }
public void testRecieveInterleaved() throws Exception
{
@@ -206,10 +193,12 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
- public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ public void testRecieveC2Only() throws Exception
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
{
+ _logger.info("Performing Receive only on C2");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
@@ -218,6 +207,43 @@ public class MessageListenerMultiConsumerTest extends TestCase
}
}
+ public void testRecieveBoth() throws Exception
+ {
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only with two consumers on one session ");
+
+ MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(consumer2.receive() != null);
+ }
+ }
+ else
+ {
+ _logger.info("Performing Receive only on both C1 and C2");
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
new file mode 100644
index 0000000000..505af361bc
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.basic.close;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+public class CloseTests extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(CloseTests.class);
+
+
+ private static final String BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.killVMBroker(1);
+ }
+
+
+ public void testCloseQueueReceiver() throws AMQException, URLSyntaxException, JMSException
+ {
+ AMQConnection connection = new AMQConnection(BROKER, "guest", "guest", this.getName(), "test");
+
+ Session session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);
+
+ connection.start();
+
+ _logger.info("About to close consumer");
+
+ consumer.close();
+
+ _logger.info("Closed Consumer");
+
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 190b3861f0..15cb9678e4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -36,9 +36,11 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
public class DurableSubscriptionTest extends TestCase
{
+ private static final Logger _logger = Logger.getLogger(DurableSubscriptionTest.class);
protected void setUp() throws Exception
{
@@ -55,41 +57,59 @@ public class DurableSubscriptionTest extends TestCase
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
+ _logger.info("Create Session 1");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Consumer on Session 1");
MessageConsumer consumer1 = session1.createConsumer(topic);
+ _logger.info("Create Producer on Session 1");
MessageProducer producer = session1.createProducer(topic);
+ _logger.info("Create Session 2");
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Durable Subscriber on Session 2");
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ _logger.info("Starting connection");
con.start();
+ _logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer1.receive();
assertEquals("A", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer2.receive();
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(1000);
+ _logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+ _logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
+ _logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive();
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting null");
msg = consumer2.receive(1000);
assertEquals(null, msg);
+ _logger.info("Close connection");
con.close();
}
@@ -97,7 +117,7 @@ public class DurableSubscriptionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -129,13 +149,17 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
assertEquals(null, msg);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fe7efb4e88..a19687b07c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -38,11 +38,11 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
-/**
- * @author Apache Software Foundation
- */
+/** @author Apache Software Foundation */
public class TopicSessionTest extends TestCase
{
+ private static final String BROKER = "vm://:1";
+
protected void setUp() throws Exception
{
super.setUp();
@@ -53,17 +53,16 @@ public class TopicSessionTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
public void testTopicSubscriptionUnsubscription() throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic");
+ AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
- TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
+ TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
con.start();
@@ -81,11 +80,11 @@ public class TopicSessionTest extends TestCase
session1.unsubscribe("not a subscription");
fail("expected InvalidDestinationException when unsubscribing from unknown subscription");
}
- catch(InvalidDestinationException e)
+ catch (InvalidDestinationException e)
{
; // PASS
}
- catch(Exception e)
+ catch (Exception e)
{
fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e);
}
@@ -106,8 +105,8 @@ public class TopicSessionTest extends TestCase
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown));
- AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown));
+ AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
@@ -145,7 +144,7 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con1,"MyTopic3");
+ AMQTopic topic = new AMQTopic(con1, "MyTopic3");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
@@ -176,7 +175,7 @@ public class TopicSessionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic4");
+ AMQTopic topic = new AMQTopic(con, "MyTopic4");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -226,11 +225,11 @@ public class TopicSessionTest extends TestCase
producer.send(sentMessage);
TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
producer.send(sentMessage);
receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
conn.close();
@@ -248,14 +247,14 @@ public class TopicSessionTest extends TestCase
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
- assertEquals("hello",tm.getText());
+ assertEquals("hello", tm.getText());
try
{
topic.delete();
fail("Expected JMSException : should not be able to delete while there are active consumers");
}
- catch(JMSException je)
+ catch (JMSException je)
{
; //pass
}
@@ -266,7 +265,7 @@ public class TopicSessionTest extends TestCase
{
topic.delete();
}
- catch(JMSException je)
+ catch (JMSException je)
{
fail("Unexpected Exception: " + je.getMessage());
}
@@ -283,11 +282,92 @@ public class TopicSessionTest extends TestCase
}
-
conn.close();
}
+ public void testNoLocal() throws Exception
+ {
+
+ AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+
+ AMQTopic topic = new AMQTopic(con, "testNoLocal");
+
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber noLocal = session1.createDurableSubscriber(topic, "noLocal", "", true);
+ TopicSubscriber select = session1.createDurableSubscriber(topic, "select", "Selector = 'select'", false);
+ TopicSubscriber normal = session1.createDurableSubscriber(topic, "normal");
+
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+ TextMessage m;
+ TextMessage message;
+
+ //send message to all consumers
+ publisher.publish(session1.createTextMessage("hello-new2"));
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber doesn't message
+ m = (TextMessage) select.receive(1000);
+ assertNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(1000);
+ if (m != null)
+ {
+ System.out.println("Message:" + m.getText());
+ }
+ assertNull(m);
+
+ //send message to all consumers
+ message = session1.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(100);
+ assertNull(m);
+
+ AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test");
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicPublisher publisher2 = session2.createPublisher(topic);
+
+
+ message = session2.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher2.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber does message
+ m = (TextMessage) noLocal.receive(100);
+ assertNotNull(m);
+
+
+ con.close();
+ con2.close();
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TopicSessionTest.class);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index a5ace41752..42412bebae 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -102,7 +102,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
}
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
index 2ac037e4f0..60d8f7920d 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
@@ -31,6 +31,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularDataSupport;
import static org.apache.qpid.management.ui.Constants.*;
+
import org.apache.qpid.management.ui.ApplicationRegistry;
import org.apache.qpid.management.ui.ManagedBean;
import org.apache.qpid.management.ui.jmx.MBeanUtility;
@@ -337,10 +338,17 @@ public class OperationTabControl extends TabControl
// display the parameter data type next to the text field
if (valueInCombo)
+ {
label = _toolkit.createLabel(_paramsComposite, "");
+ }
+ else if (PASSWORD.equalsIgnoreCase(param.getName()))
+ {
+ label = _toolkit.createLabel(_paramsComposite, "(String)");
+ }
else
{
- String str = param.getType() ;
+ String str = param.getType();
+
if (param.getType().lastIndexOf(".") != -1)
str = param.getType().substring(1 + param.getType().lastIndexOf("."));
@@ -581,34 +589,32 @@ public class OperationTabControl extends TabControl
}
// End of custom code
-
- // customized for passwords
- if (PASSWORD.equalsIgnoreCase(param.getName()))
+ ViewUtility.popupInfoMessage(_form.getText(), "Please select the " + ViewUtility.getDisplayText(param.getName()));
+ return;
+ }
+
+ // customized for passwords
+ String securityMechanism = ApplicationRegistry.getSecurityMechanism();
+ if ((MECH_CRAMMD5.equals(securityMechanism)) && PASSWORD.equalsIgnoreCase(param.getName()))
+ {
+ try
{
- try
- {
- param.setValueFromString(ViewUtility.getHashedString(param.getValue()));
- }
- catch (Exception ex)
- {
- MBeanUtility.handleException(_mbean, ex);
- return;
- }
+ param.setValue(ViewUtility.getMD5HashedCharArray(param.getValue()));
+ }
+ catch (Exception ex)
+ {
+ MBeanUtility.handleException(_mbean, ex);
+ return;
}
- // end of customization
- ViewUtility.popupInfoMessage(_form.getText(),
- "Please select the " + ViewUtility.getDisplayText(param.getName()));
-
- return;
}
+ // end of customization
}
}
if (_opData.getImpact() == OPERATION_IMPACT_ACTION)
{
String bean = _mbean.getName() == null ? _mbean.getType() : _mbean.getName();
- int response = ViewUtility.popupConfirmationMessage(bean,
- "Do you want to " + _form.getText()+ " ?");
+ int response = ViewUtility.popupConfirmationMessage(bean, "Do you want to " + _form.getText()+ " ?");
if (response == SWT.YES)
{
executeAndShowResults();
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
index 9b5cddd342..89ab360937 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
@@ -560,10 +560,26 @@ public class ViewUtility
}
}
- public static String getHashedString(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException
+ public static char[] getMD5HashedCharArray(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException
{
- char[] chars = getHash((String)text);
- return new String(chars);
+ byte[] data = ((String)text).getBytes("utf-8");
+
+ MessageDigest md = MessageDigest.getInstance("MD5");
+
+ for (byte b : data)
+ {
+ md.update(b);
+ }
+
+ byte[] digest = md.digest();
+
+ char[] byteArray = new char[digest.length];
+ int index = 0;
+ for (byte b : digest)
+ {
+ byteArray[index++] = (char)b;
+ }
+ return byteArray;
}
public static char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 01eb2ba6a2..1a0a341bbf 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -87,7 +87,7 @@ public class SubscriptionTestHelper implements Subscription
{
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}