summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-31 15:53:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-31 15:53:37 +0000
commitd38b1e9610db7b6b84c2830a004e7032b04b1a0b (patch)
treeee8295a834c7ac296c57b8aeb67c4d9619e5d5b4 /java/broker
parent81d0ea9cdc425348d19651eab8a6013a1e34ea39 (diff)
downloadqpid-python-d38b1e9610db7b6b84c2830a004e7032b04b1a0b.tar.gz
Merged revisions 1-447993,447995-448007,448009-448141,448143-448157,448161-448194,448196-448210,448212-448218,448220-448223,448225-448233,448235,448237-448241,448243-448596,448598-448623,448625-448850,448852-448880,448882-448982,448984-449635,449637-449639,449641-449642,449644-449645,449647-449674,449676-449719,449721-449749,449751-449762,449764-449933,449935-449941,449943-450383,450385,450387-450400,450402-450433,450435-450503,450505-450555,450557-450860,450862-451024,451026-451149,451151-451316,451318-451931,451933-452139,452141-452162,452164-452320,452322,452324-452325,452327-452333,452335-452429,452431-452528,452530-452545,452547-453192,453194-453195,453197-453536,453538,453540-453656,453658-454676,454678-454735,454737,454739-454781,454783-462728,462730-462819,462821-462833,462835-462839,462841-463071,463073-463178,463180-463308,463310-463362,463364-463375,463377-463396,463398-463402,463404-463409,463411-463661,463663-463670,463672-463673,463675-464493,464495-464502,464504-464576,464578-464613,464615-464628,464630,464632-464866,464868-464899,464901-464942,464944-464949,464951-465004,465006-465016,465018-465053,465055-465165,465167-465321,465323-465406,465408-465427,465429-465431,465433-465548,465550-466044,466047-466075,466077,466079-466081,466083-466099,466101-466112,466114-466126,466128-466240,466242-466971,466973-466978,466980-467309,467311-467312,467316-467328,467330-467485,467487-467588,467590-467604,467606-467699,467701-467706,467708-467749,467751-468069,468071-468537,468539-469241,469244-469246,469248-469318,469320-469421,469423,469425-469429,469431-469435,469437-469462,469464-469469,469472-469477,469479-469490,469492-469503,469505-469529,469531-469598,469600-469624,469626-469737,469739-469752,469754-469806,469808-469928,469930-469953,469955-470011,470013-470109,470111-470335,470338-470339,470341-470379,470381,470383-470399,470401-470446,470448-470741,470743-470758,470760-470809,470811-470817,470819-470993,470995-471001,471003-471788,471790-471792,471794-472028,472030-472032,472034-472036,472038,472040,472043,472045-472059,472061,472063,472065-472066,472068,472070-472072,472074-472080,472082,472084-472092,472094-472107,472109-472123,472125-472158,472160-472165,472167-472172,472174-472457,472459-472460,472462-472464,472466-472470,472472-472483,472486-472491,472493-472494,472496-472497,472499,472501-472503,472505-472512,472514-472544,472546-472556,472558-472560,472562-472572,472574-472587,472589-472591,472593-472605,472607,472609-472731,472733-472786,472788-472843,472845-472849,472851-472859,472861-472878,472880-472903,472905,472907-472988,472990-472991,472993-473071,473073-473086,473088-473090,473093,473095-473096,473098-473106,473108-473110,473112-473185,473187-473260,473262,473268-473270,473275-473279,473281,473284-473287,473289-473295,473297-473306,473308-473330,473332-473335,473337,473339-473344,473346-473351,473353-473355,473357-473358,473361-473471,473473-473497,473499-473535,473537-473567,473569-473888,473890-474451,474454-474492,474494-474563,474565-474843,474845-474865,474867-474932,474934-475035,475037-475144,475146-475180,475182-475265,475267-475285,475287,475289-475293,475295-475296,475298-475302,475304-475631,475633-475649,475651-475748,475750-475752,475754-476107,476109-476302,476304-476413,476415-476430,476432-476700,476702-476868,476870-477147,477149-477213,477215-477263,477265-477340,477342-477635,477637-477789,477791-477825,477827-477841,477843,477846-477852,477854,477856,477858-477865,477867-477894,477896-478022,478024-478182,478184-478211,478213-478233,478235-478236,478238-478241,478243-478252,478254-478259,478261-478263,478265,478267-478269,478271-478286,478288-478342,478344-478379,478381-478412,478414-478443,478445-478636,478639-478658,478660-478821,478823-478853,478855-478922,478924-478962,478965-478974,478976-479029,479031-479049,479051-479210,479212-479214,479216-479407,479409-479415,479417-479425,479427-479559,479561-479639,479641-479676,479678-479685,479687-480030,480033-480086,480091-480093,480095-480118,480120-480139,480141,480143-480148,480150-480156,480158-480163,480165-480177,480179-480189,480191-480193,480195-480198,480200-480220,480222-480282,480284-480292,480294-480308,480310-480317,480320-480422,480424,480426-480581,480583-480656,480658-480692,480695-480702,480704,480706-480710,480712-480910,480913-480933,480935-480945,480947-480972,480974-480993,480995-481034,481036-481158,481161-481174,481176-481220,481222-481234,481236-481260,481263-481264,481266-481296,481298-481304,481306-481311,481313-481332,481334,481336-481380,481382-481441,481443-482144,482146-482180,482182-482193,482195-482232,482234-482236,482239,482241-482242,482244-482247,482250-482251,482253,482256-482261,482264-482288,482290-482364,482366,482368,482370-482554,482556,482558-482569,482572-482636,482638,482640-482696,482698-482722,482724-482732,482734-482771,482774-482957,482959-483045,483047-483105,483108,483110-483115,483117,483119-483127,483130-483134,483136-483148,483150-483158,483160-483164,483166-483178,483180-483391,483393-483400,483402-483403,483405-483418,483420-483421,483425-483436,483438-483470,483472-483502,483504-483558,483560-483599,483601-483637,483639-483644,483646-483659,483661-483670,483672-483878,483880-483910,483912-483915,483917-483940,483942,483944-483968,483970-483972,483974-483976,483978,483980-484612,484614-484657,484659-484693,484695-484718,484720-484842,484844-484847,484849-484986,484988-485019,485021-485489,485491-485544,485546-485591,485593,485595-485697,485699-485729,485731-485734,485736-485779,485781-485787,485789-485851,485853,485855-486007,486009,486011-486020,486022-486083,486085-486097,486099-486117,486120-486131,486133-486148,486150-486161,486163-486164,486166-486197,486199-486205,486208-486247,486249-486253,486256-486427,486429-486431,486433-486554,486556-486573,486575-486593,486595,486597-486609,486611-486619,486622,486625,486627-486641,486643-486645,486649-486687,486689-486721,486723-486730,486732-486746,486748-486759,486761,486763-486777,486779-486782,486784-486788,486790,486792,486794-486796,486798-487175,487178,487180-487213,487215,487217-487267,487269-487284,487286-487298,487300-487358,487360-487367,487369-487382,487384-487434,487436-487480,487482-487547,487549-487561,487563-487565,487567-487578,487580-487615,487617-487622,487624,487626,487628,487630-487635,487637-487703,487705-487777,487780-487781,487783-487800,487802-487803,487805-487820,487822-487848,487850-487902,487904-488103,488105-488133,488135-488158,488160-488163,488165-488187,488189-488216,488218-488248,488250-488278,488280,488282-488303,488305-488313,488315-488342,488344-488351,488353-488376,488378-488449,488451-488593,488595,488597-488623,488625-488700,488702-488704,488706-488710,488714,488716-488725,488727-488744,488746-488770,488772-488798,488800,488802-488807,488809,488811-488829,488831-488843,488845-488851,488853-489069,489071-489077,489079-489081,489084-489102,489104-489105,489107-489109,489111-489112,489114-489139,489141-489178,489181-489203,489205-489211,489213,489216-489329,489332-489402,489404-489417,489419-489421,489423-489643,489645-489690,489692-489703,489705-489714,489716-489747,489749-489753,489755-489803,489805-489904,489906-490372,490374-490504,490506-490604,490606-490707,490710-490733,490735-490871,490873-490984,490986-491028,491030,491032-491071,491073-491119,491121-491576,491578-491672,491674-491800,491802-491838,491840-491878,491880-492183,492185-492279,492281-492317,492319-492513,492515-492584,492586-492587,492589-492601,492603-492635,492637-492640,492642-492717,492719-492723,492725-492729,492731-492755,492757-492901,492903-492955,492957-492962,492964-492997,492999-493002,493004-493041,493043-493059,493062-493063,493065-493086,493088-493125,493127-493139,493141-493150,493152-493871,493873-494017,494019-494030,494032-494041,494043-494091,494093-494120,494122-494354,494356-494436,494438-494539,494541-494552,494554-494586,494588-494649,494651,494653-494654,494656-494657,494659-494764,494766-494768,494770-494796,494798-494799,494802,494804-494860,494862-494903,494905-494906,494908-495019,495021-495160,495162-495168,495171-495188,495190-495229,495231-495254,495256-495303,495305-495313,495315-495336,495338-495372,495374-495379,495381-495454,495457-495459,495462-495516,495518-495524,495526-495531,495533-495548,495551-495553,495555,495557-495558,495560,495562-495573,495575-495583,495585-495594,495596-495628,495630-495638,495640-495651,495653-495660,495662-495753,495755-496259,496261-496262,496264-496269,496271-496275,496277-496301,496303-496316,496318-496383,496385-496413,496415-496495,496497-496625,496627-496636,496638-496640,496642-496647,496650-496657,496659-496660,496663-496664,496666-496677,496679-496681,496683-496730,496732-496750,496752,496754-496784,496786-496832,496834-496840,496842-496990,496992-496995,496997-497340,497343-497351,497353-497403,497405-497424,497426-497438,497440-497481,497483-497497,497499-497765,497767-497769,497771-497775,497777-497778,497780,497782-497783,497785,497787-497812,497814-497871,497873-497877,497879-498573,498575-498588,498590,498592,498594-498636,498638-498669,498671-498686,498688-498689,498691-498719,498721-498964,498966-498969,498971-498973,498975-498982,498985-499035,499037-499040,499042,499044-499048,499050-499082,499084-499086,499088-499164,499167-499169,499171-499355,499357-499370,499372-499373,499375-499391,499393,499395-499425,499428,499430-499445,499447-499455,499457-499460,499462-499465,499467,499469-499489,499491-499492,499494-499531,499533-499562,499566-499627,499629-499715,499717-499732,499734-499755,499758-499763,499765-499780,499782-499795,499797-499802,499804-499844,499846,499848-499850,499852-499863,499865-499873,499875-499974,499976-499978,499980-500263,500265-500283,500285-500309,500311-501000,501002,501012-501057,501059-501095,501097-501390,501392-501410,501413-501447,501449-501454,501456,501458-501464,501466-501471,501473-501803,501805-501913,501915-501916,501918-501919,501921-501944,501946-502171,502173-502177,502181,502183-502247,502250-502252,502254-502260,502262-502267,502270,502272,502274-502575,502577-502609,502611-502619,502621-502626,502628-502654,502656-503592,503594-503603,503605-503608,503610-503636,503638-503645,503647-503705,503707-503789,503791-504024,504026-504111,504113-504506,504508-504735,504737-504863,504865-504867,504869-504914,504916-505241,505243-505254,505257-505267,505269-505354,505356-505891,505893-505971,505973-506400,506402-506404,506407-506438,506440-506516,506518-506541,506543-506966,506968-506971,506973-507095,507097-507108,507111-507454,507456,507459-507471,507473-507556,507558,507560-507581,507585-507594,507597,507599-507608,507610-507728,507730-507893,507895-507937,507940-508234,508236-508350,508352-508365,508367-508380,508383,508386-508415,508417-508648,508650-508941,508943-509146,509148-509171,509173-509175,509179-509201,509203-509207,509209-509215,509217-509222,509224-509477,509480-509627,509629-509634,509636-509641,509643-509736,509738-509931,509933-510059,510061-510075,510077-510158,510161-510896,510898-510938,510940-511388,511390-511922,511924-512287,512289-512698,512702-512813,512815-512817,512819-513359,513361-513370,513372-514702,514704-514886,514888-514902,514904-515126,515129-515141,515143-515516,515518-515534,515536-515538,515540-515648,515650-515651,515653-516070,516072-516411,516413-516448,516450,516452-517637,517639-517647,517649-517659,517661-517663,517665-517677,517679-517682,517684-517744,517746-518085,518087-518175,518177-518558,518560-518568,518571-518666,518668,518670-518699,518701-518987,518990-518992,518994-519908,519910-519932,519934-520414,520416-520842,520844-520937,520939-521362,521364-521681,521683-521704,521706-521709,521711-521714,521716-521781,521783-521792,521794-522462,522464-522527,522529-522534,522536-522566,522568-522958,522960,522962-522966,522968-522976,522978-522980,522982-522988,522992-522993,522995-523244,523246-523746,523748-524049,524051-524738,524741-524742,524744-524762,524764,524766,524768-525486,525488-525530,525532,525534,525537-525552,525554-525765,525767-525776,525778-525784,525789-525803,525805-525816,525818-525828,525830-525861,525863-525866,525868-526090,526092-526112,526114-526116,526119-526121,526123-526149,526151-526153,526155-526156,526160-526165,526167-526186,526188-526193,526196-526197,526200-526665,526667-526682,526686-526690,526693,526695-526708,526710-526713,526715-526775,526777-526802,526804-526806,526808-527048,527051-527052,527054-527181,527183-527486,527488-527492,527494-527498,527500-527508,527510-527517,527519-527536,527538-527555,527559-527802,527804-527842,527844-527847,527849-527875,527877-527940,527942-527958,527960-527971,527973-528002,528004,528006-528423,528425-529232,529234-529245,529247-529296,529298-529634,529636-529658,529660-529665,529667-529668,529670-530033,530035-530036,530038-530040,530045-530046,530050-530051,530053-530431,530433-530436,530439-530440,530443,530445-530446,530448,530450-530682,530684,530687-530696,530698-530733,530735-530776,530778-530795,530799,530801-530811,530813-530818,530820-530837,530839-531436,531438-531455,531457,531459-531511,531514,531516,531519-531523,531525,531528-531858,531860-531864,531866-531907,531909-531916,531918-531936,531938-531988,531990-532001,532003-532371,532373-532465,532467-532727,532729-532765,532767-532785,532788-532790,532792-532793,532795-533064,533066-533074,533076,533080-533130,533132-533139,533142-533703,533705-533720,533722-533763,533766-533818,533820-533839,533841-533859,533862-534035,534037-534112,534114-534116,534118-534472,534474-534477,534479-534762,534764-534896,534898-534902,534904-535253,535255-535308,535310-535808,535810-535873,535875-536007,536009-536140,536142-536162,536165-536242,536244-536252,536254-536278,536280-536338,536340-536448,536450-536479,536481-536482,536484-536485,536487-536495,536497,536500-536505,536507-536561,536563-536570,536572,536574-536583,536586-536823,536825-537014,537016-537018,537020-537025,537027-537028,537030-537160,537162-537170,537172-537672,537674-537781,537783-537833,537836-537840,537842-537844,537846-537953,537955-538034,538036-538078,538080-538083,538085-538097,538099-538108,538110-538239,538241-538881,538883-538906,538908-538911,538913-538921,538923-539177,539179-539190,539192-539469,539471-539475,539477-539480,539482-539483,539485-539500,539502-539593,539595-539782,539784-539787,539789-540106,540108-540168,540170-540510,540512-541246,541248-542483,542485-542788,542790-543495,543497-544108,544110-544421,544423-544507,544509-544865,544867-545145,545147-546095,546097-546189,546191-546440,546442-546457,546459-547177,547179-547626,547628-548275,548277-548278,548280-548301,548303-548307,548309-548311,548313-548314,548316,548318,548320-548380,548382-549010,549012-549529,549531-549848,549850-550508,550510-550747,550749-550772,550774-550848,550850-551116,551122-553446,553448-561282 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r541920 | tomasr | 2007-05-26 18:35:51 +0100 (Sat, 26 May 2007) | 1 line QPID-136 Initial Prefetch Implementation ........ r549112 | arnaudsimon | 2007-06-20 15:11:03 +0100 (Wed, 20 Jun 2007) | 1 line changed setText to use UTF8 as default encoder ........ r551167 | arnaudsimon | 2007-06-27 15:08:50 +0100 (Wed, 27 Jun 2007) | 1 line added public void declareAndBind(AMQDestination amqd) ........ r551174 | ritchiem | 2007-06-27 15:23:21 +0100 (Wed, 27 Jun 2007) | 3 lines Caused each of these tests to run 10 times to help identify any race conditions that were occuring. Updated the CommitRollbackTest to be more robust in the detection of failure. ........ r551175 | ritchiem | 2007-06-27 15:23:52 +0100 (Wed, 27 Jun 2007) | 1 line Allowed more of the constants to be set via system properties. ........ r551176 | ritchiem | 2007-06-27 15:25:13 +0100 (Wed, 27 Jun 2007) | 1 line renamed the passwd programme qpid-passwd to match the change in bin directory. ........ r552441 | rupertlssmith | 2007-07-02 10:23:54 +0100 (Mon, 02 Jul 2007) | 1 line Added log4j as slfj logger for perftests. ........ r552499 | rupertlssmith | 2007-07-02 15:17:45 +0100 (Mon, 02 Jul 2007) | 1 line Added some documentation. ........ r553172 | rupertlssmith | 2007-07-04 12:11:04 +0100 (Wed, 04 Jul 2007) | 1 line Messages moved by management console now commited on the message store. ........ r553248 | ritchiem | 2007-07-04 17:05:55 +0100 (Wed, 04 Jul 2007) | 6 lines Addition of the MessageStore Tool. Small changes to the Exchanges to allow the extraction of currently listed items. Extracted initial broker configuration mechanism to a reusable class. Have modified broker to use it. Move the Passwd.java to new tools package structure on the broker. ........ r553265 | ritchiem | 2007-07-04 17:42:59 +0100 (Wed, 04 Jul 2007) | 1 line Tidied up some extranious logging. ........ r553432 | rupertlssmith | 2007-07-05 10:28:33 +0100 (Thu, 05 Jul 2007) | 1 line Fixed test state carrying over to mandatory message test from immediate. Also added in-vm clean up to other tests. ........ r553480 | ritchiem | 2007-07-05 13:40:50 +0100 (Thu, 05 Jul 2007) | 2 lines Minor changes and tidy up when running via command line. Added Copy command. ........ r553482 | ritchiem | 2007-07-05 13:44:42 +0100 (Thu, 05 Jul 2007) | 2 lines Forgot to compile before committing. Missed a method change in the Select command. ........ r554964 | rupertlssmith | 2007-07-10 15:40:04 +0100 (Tue, 10 Jul 2007) | 1 line Added message copy method. ........ r555249 | rupertlssmith | 2007-07-11 12:52:39 +0100 (Wed, 11 Jul 2007) | 1 line Update perftests to center better around current performance. ........ r556011 | rupertlssmith | 2007-07-13 15:24:03 +0100 (Fri, 13 Jul 2007) | 1 line Moved test framework into its own package and cleaned it up. ........ r556024 | rupertlssmith | 2007-07-13 16:02:06 +0100 (Fri, 13 Jul 2007) | 1 line Completed javadoc for test framework. ........ r556628 | rgodfrey | 2007-07-16 14:50:57 +0100 (Mon, 16 Jul 2007) | 1 line QPID-537 : Make AMQMessage.incrementReference public ........ r556675 | cctrieloff | 2007-07-16 18:36:21 +0100 (Mon, 16 Jul 2007) | 2 lines added notice entries ........ r556680 | cctrieloff | 2007-07-16 18:56:40 +0100 (Mon, 16 Jul 2007) | 2 lines clean up ........ r556682 | cctrieloff | 2007-07-16 18:58:37 +0100 (Mon, 16 Jul 2007) | 2 lines removed optional cppunit as not in distributed packages ........ r556845 | ritchiem | 2007-07-17 09:26:33 +0100 (Tue, 17 Jul 2007) | 3 lines Additional logging in case of broker failure at startup. Use broker logger at error level as well as System.out ........ r556847 | ritchiem | 2007-07-17 09:35:35 +0100 (Tue, 17 Jul 2007) | 3 lines Update to the MessageStore Tool to provide Move and Purge functionality. Updated to remove the AMQExceptions that will be removed from the Exchange class. ........ r556861 | ritchiem | 2007-07-17 10:26:47 +0100 (Tue, 17 Jul 2007) | 2 lines QPID-538 Check to ensure a duplicate binding is not created. ........ r556868 | ritchiem | 2007-07-17 10:55:56 +0100 (Tue, 17 Jul 2007) | 1 line Addition of simple pub/sub examples. ........ r556869 | ritchiem | 2007-07-17 10:56:17 +0100 (Tue, 17 Jul 2007) | 1 line QPID-540 Prevent NPE when purging message from the main _message queue in the ConcurrentSelectorDeliveryManager that have been delivered via a Subscribers _messageQueue. Ensuring that any expired messages are still correctly handled. i.e. the Queue size/depth is reduced and the message correctly dequeued from the underlying store. ........ r556871 | ritchiem | 2007-07-17 10:57:35 +0100 (Tue, 17 Jul 2007) | 1 line White space & code formatting change ........ r556872 | ritchiem | 2007-07-17 10:58:35 +0100 (Tue, 17 Jul 2007) | 3 lines Added additional information to hard-error logging in exceptionReceived. Fully expanded imports ........ r556888 | ritchiem | 2007-07-17 12:33:08 +0100 (Tue, 17 Jul 2007) | 1 line Change to allow the management port to be specified on the command line, via -m or --mport ........ r556890 | ritchiem | 2007-07-17 12:38:10 +0100 (Tue, 17 Jul 2007) | 4 lines QPID-541 A large portion of memory was being wasted in 32k ByteBuffers being held by the AMQShortStrings. Patch submitted by Robert Godfrey to intern() the AMQSSs to reduce memory usage. Current implementation *will* impact performance due to the usage of a static Map for storage. However, a thread local implementation is in the works. ........ r556898 | rgodfrey | 2007-07-17 13:00:57 +0100 (Tue, 17 Jul 2007) | 1 line QPID-541 : Change to use threadlocal maps for intern for the common case to avoid excessive synchronization. In the uncommon case will require more lookup. ........ r556958 | rupertlssmith | 2007-07-17 17:22:16 +0100 (Tue, 17 Jul 2007) | 1 line Refactored the distributed test clients and coordinator to support different distribution and sequencing engines. ........ r556967 | rupertlssmith | 2007-07-17 17:40:14 +0100 (Tue, 17 Jul 2007) | 1 line Removed unused package. ........ r556968 | rupertlssmith | 2007-07-17 17:42:00 +0100 (Tue, 17 Jul 2007) | 1 line Retired old interop tests. ........ r556969 | rupertlssmith | 2007-07-17 17:43:49 +0100 (Tue, 17 Jul 2007) | 1 line Properties file not needed any more. Test properties all driven from MessagingTestConfigProperties. ........ r557276 | ritchiem | 2007-07-18 15:36:11 +0100 (Wed, 18 Jul 2007) | 1 line Updates to pom files and Licensing/Notice files for M2 release. ........ r557279 | ritchiem | 2007-07-18 15:51:42 +0100 (Wed, 18 Jul 2007) | 1 line This is left over from ANT ........ r557281 | ritchiem | 2007-07-18 15:54:06 +0100 (Wed, 18 Jul 2007) | 1 line updated comment to refelect property values ........ r557286 | ritchiem | 2007-07-18 16:02:22 +0100 (Wed, 18 Jul 2007) | 1 line Set default mvn build to assembly:assembly ........ r557288 | ritchiem | 2007-07-18 16:09:07 +0100 (Wed, 18 Jul 2007) | 1 line Ensure the top level release-docs directory is included in the builds ........ r557306 | ritchiem | 2007-07-18 17:01:58 +0100 (Wed, 18 Jul 2007) | 1 line Update fix incorrect license headers. ........ r557312 | ritchiem | 2007-07-18 17:07:01 +0100 (Wed, 18 Jul 2007) | 1 line added license ........ r557314 | ritchiem | 2007-07-18 17:11:17 +0100 (Wed, 18 Jul 2007) | 1 line added license ........ r557452 | aconway | 2007-07-19 03:03:02 +0100 (Thu, 19 Jul 2007) | 14 lines * lib/broker/Daemon.cpp, .h - Rewrote to remove libdaemon dependency. - PID file stored in /var/run if root, /tmp otherwise. * src/qpidd.cpp: Use new Daemon.cpp. - lock files stored in /var/run (for root) or /tmp. - updated to trunk daemon flag behavior. * lib/broker/Makefile.am (libqpidbroker_la_LIBADD): - Daemon.cpp now needs -lboost_iostreams * NOTICE, README: Removed mention of libdaemon. ........ r558027 | ritchiem | 2007-07-20 17:08:05 +0100 (Fri, 20 Jul 2007) | 1 line Added a logger but only used to control the toString inclusion of password. If in debug mode it will include password otherwise the password is "********". ........ r558072 | astitcher | 2007-07-20 18:49:41 +0100 (Fri, 20 Jul 2007) | 2 lines Fixed the license from the "old" apache copyright notice ........ r558083 | aconway | 2007-07-20 19:29:08 +0100 (Fri, 20 Jul 2007) | 2 lines Remove -ldaemon, we no longer require libdaemon. ........ r558099 | aconway | 2007-07-20 20:20:01 +0100 (Fri, 20 Jul 2007) | 2 lines Ignore QPID_ env variables that don't correspond to known options. ........ r558108 | cctrieloff | 2007-07-20 20:55:40 +0100 (Fri, 20 Jul 2007) | 2 lines typo fix ........ r558114 | rajith | 2007-07-20 21:11:03 +0100 (Fri, 20 Jul 2007) | 1 line added release notes ........ r558115 | rajith | 2007-07-20 21:12:20 +0100 (Fri, 20 Jul 2007) | 1 line Checking in the release notes ........ r558116 | aconway | 2007-07-20 21:16:20 +0100 (Fri, 20 Jul 2007) | 3 lines Removed .txt from RELEASE_NOTES Added RELEASE_NOTES to EXTRA_DIST in Makefile.am ........ r558168 | rajith | 2007-07-20 23:03:42 +0100 (Fri, 20 Jul 2007) | 1 line added release notes ........ r558170 | rajith | 2007-07-20 23:04:11 +0100 (Fri, 20 Jul 2007) | 1 line added release notes ........ r558630 | gsim | 2007-07-23 08:21:49 +0100 (Mon, 23 Jul 2007) | 3 lines Revised release notes: removed bug fixed on this branch, removed outstanding feature lists as it is not terribly accurate or helpful. ........ r559419 | rupertlssmith | 2007-07-25 13:17:59 +0100 (Wed, 25 Jul 2007) | 1 line Refactored interop tests into general distributed test framework. Moved framework under systests from integrationtests. ........ r559427 | ritchiem | 2007-07-25 13:40:24 +0100 (Wed, 25 Jul 2007) | 2 lines AMQMessage - added //todo-s and removed unused parameter StoreContext from expired() method call. ConcurrentSelectorDeliveryManager - Update to reflect expired() call change. Created new _reaperContextStore to be used when performing reaper operations such as message dequeue due to expiration. Removed old commented code. ........ r559455 | rupertlssmith | 2007-07-25 14:40:16 +0100 (Wed, 25 Jul 2007) | 1 line Added to comments. ........ r559456 | rupertlssmith | 2007-07-25 14:41:21 +0100 (Wed, 25 Jul 2007) | 1 line Removed redundant method. ........ r559458 | rupertlssmith | 2007-07-25 14:57:21 +0100 (Wed, 25 Jul 2007) | 1 line Refactored packaging of test framework. ........ r559461 | rupertlssmith | 2007-07-25 15:00:16 +0100 (Wed, 25 Jul 2007) | 1 line Removed redundant packages. ........ r559943 | rhs | 2007-07-26 20:15:17 +0100 (Thu, 26 Jul 2007) | 1 line adding missing ack ........ r559944 | rhs | 2007-07-26 20:15:44 +0100 (Thu, 26 Jul 2007) | 1 line removed old script ........ r560198 | ritchiem | 2007-07-27 12:30:34 +0100 (Fri, 27 Jul 2007) | 1 line Added files to ignore list ........ r560225 | ritchiem | 2007-07-27 14:33:50 +0100 (Fri, 27 Jul 2007) | 1 line Converted namespaces from Qpid.* to Apache.Qpid.* ........ r560471 | tomasr | 2007-07-28 03:35:41 +0100 (Sat, 28 Jul 2007) | 1 line Removed using directives causing compilation failure in .NET 1.1 ........ r561278 | ritchiem | 2007-07-31 10:07:57 +0100 (Tue, 31 Jul 2007) | 8 lines Changes to POMs. Client pom now builds a single jar with all dependancies included in the single bundle. systests/pom.xml adjusted to include only *Test.class items. This will fix the current Error on OptOutTestCase management/eclipse-plugin/pom.xml - editied to include there required MANIFEST.MF to identify plugin to eclipse distribution/src/main/assembly/management-eclipse-plugin.xml editied to include there required MANIFEST.MF to identify the plugin distribution/pom.xml - white space Also updated log4j.xml default to create an alert.log file from the AMQQueue alerting. Added a debug.log4j.xml that gives example of debugging the broker via log4j. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561365 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rwxr-xr-xjava/broker/bin/msTool.sh56
-rw-r--r--java/broker/bin/qpid-passwd2
-rw-r--r--java/broker/bin/qpid-server6
-rw-r--r--java/broker/distribution/src/main/assembly/broker-bin.xml4
-rw-r--r--java/broker/etc/debug.log4j.xml114
-rw-r--r--java/broker/etc/log4j.xml36
-rw-r--r--java/broker/etc/messagestoretool-log4j.xml53
-rw-r--r--java/broker/etc/mstool-log4j.xml54
-rw-r--r--java/broker/pom.xml19
-rw-r--r--java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java188
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java61
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java250
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java124
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java262
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java648
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java85
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java299
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java98
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java314
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java94
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java166
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java233
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java513
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java)2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java90
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java121
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java363
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java75
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java352
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java116
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java25
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java25
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java25
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java25
72 files changed, 5604 insertions, 396 deletions
diff --git a/java/broker/bin/msTool.sh b/java/broker/bin/msTool.sh
new file mode 100755
index 0000000000..4d55ee7811
--- /dev/null
+++ b/java/broker/bin/msTool.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_TOOLS/lib/qpid-incubating.jar
+
+die() {
+ if [[ $1 = -usage ]]; then
+ shift
+ usage=true
+ else
+ usage=false
+ fi
+ echo "$@"
+ $usage && echo
+ $usage && usage
+ exit 1
+}
+
+cygwin=false
+if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then
+ cygwin=true
+fi
+
+if $cygwin; then
+ QPID_TOOLS=$(cygpath -w $QPID_TOOLS)
+fi
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_OPTS=-Dlog4j.configuration=file:$QPID_TOOLS/etc/mstool-log4j.xml \
+ QPID_CLASSPATH=$QPID_LIBS
+
+if [ -z "$QPID_TOOLS" ]; then
+ die "QPID_TOOLS be set"
+fi
+
+. qpid-run org.apache.qpid.tools.messagestore.MessageStoreTool "$@"
diff --git a/java/broker/bin/qpid-passwd b/java/broker/bin/qpid-passwd
index 6e64af6e70..f046252522 100644
--- a/java/broker/bin/qpid-passwd
+++ b/java/broker/bin/qpid-passwd
@@ -27,4 +27,4 @@ export JAVA=java \
JAVA_MEM=-Xmx1024m \
QPID_CLASSPATH=$QPID_LIBS
-. qpid-run org.apache.qpid.server.security.Passwd "$@"
+. qpid-run org.apache.qpid.tools.security.Passwd "$@"
diff --git a/java/broker/bin/qpid-server b/java/broker/bin/qpid-server
index 76d0ad786d..fabf7e941c 100644
--- a/java/broker/bin/qpid-server
+++ b/java/broker/bin/qpid-server
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
export JAVA=java \
JAVA_VM=-server \
JAVA_MEM=-Xmx1024m \
- JAVA_GC=-XX:-UseConcMarkSweepGC
+ JAVA_GC=-XX:-UseConcMarkSweepGC \
QPID_CLASSPATH=$QPID_LIBS
. qpid-run org.apache.qpid.server.Main "$@"
diff --git a/java/broker/distribution/src/main/assembly/broker-bin.xml b/java/broker/distribution/src/main/assembly/broker-bin.xml
index 4b32630771..e66190a3f4 100644
--- a/java/broker/distribution/src/main/assembly/broker-bin.xml
+++ b/java/broker/distribution/src/main/assembly/broker-bin.xml
@@ -114,9 +114,9 @@
<fileMode>473</fileMode>
</file>
<file>
- <source>../bin/passwd</source>
+ <source>../bin/qpid-passwd</source>
<outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
- <destName>passwd</destName>
+ <destName>qpid-passwd</destName>
<fileMode>473</fileMode>
</file>
<file>
diff --git a/java/broker/etc/debug.log4j.xml b/java/broker/etc/debug.log4j.xml
new file mode 100644
index 0000000000..e8fd7e119d
--- /dev/null
+++ b/java/broker/etc/debug.log4j.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="ArchivingFileAppender" class="org.apache.log4j.QpidCompositeRollingAppender">
+ <!-- Ensure that logs allways have the dateFormat set-->
+ <param name="StaticLogFileName" value="false"/>
+ <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
+ <param name="Append" value="false"/>
+ <!-- Change the direction so newer files have bigger numbers -->
+ <!-- So log.1 is written then log.2 etc This prevents a lot of file renames at log rollover -->
+ <param name="CountDirection" value="1"/>
+ <!-- Use default 10MB -->
+ <!--param name="MaxFileSize" value="100000"/-->
+ <param name="DatePattern" value="'.'yyyy-MM-dd-HH-mm"/>
+ <!-- Unlimited number of backups -->
+ <param name="MaxSizeRollBackups" value="-1"/>
+ <!-- Compress(gzip) the backup files-->
+ <param name="CompressBackupFiles" value="true"/>
+ <!-- Compress the backup files using a second thread -->
+ <param name="CompressAsync" value="true"/>
+ <!-- Start at zero numbered files-->
+ <param name="ZeroBased" value="true"/>
+ <!-- Backup Location -->
+ <param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FileAppender" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="AlertFile" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_WORK}/log/alert.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="Qpid.Broker">
+ <priority value="debug"/>
+ <appender-ref ref="AlertFile"/>
+ <!--appender-ref ref="STDOUT"/-->
+ </category>
+
+
+ <category name="org.apache.qpid.server.queue.AMQQueueMBean">
+ <priority value="info"/>
+ <appender-ref ref="AlertFile"/>
+ </category>
+
+
+ <!-- Provide warnings to standard output -->
+ <!--category name="org.apache.qpid">
+ <priority value="warn"/>
+ <appender-ref ref="STDOUT"/>
+ </category-->
+
+
+ <!-- Additional level settings for debugging -->
+ <!-- Each class in the Broker is a category that can have its logging level adjusted. -->
+ <!-- This will provide more details if available about that classes processing. -->
+ <!--category name="org.apache.qpid.server.txn">
+ <priority value="debug"/>
+ </category>-->
+
+ <!--<category name="org.apache.qpid.server.store">
+ <priority value="debug"/>
+ </category-->
+
+ <!-- Log all info events to file -->
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ <appender-ref ref="FileAppender"/>
+ </root>
+
+</log4j:configuration>
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml
index 2fb7b80c96..2060246b7f 100644
--- a/java/broker/etc/log4j.xml
+++ b/java/broker/etc/log4j.xml
@@ -44,7 +44,7 @@
<param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
</layout>
</appender>
@@ -57,6 +57,15 @@
</layout>
</appender>
+ <appender name="AlertFile" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_WORK}/log/alert.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
<appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
@@ -64,15 +73,27 @@
</layout>
</appender>
- <category name="Qpid.Broker">
+ <category name="Qpid.Broker">
+ <priority value="debug"/>
+ <appender-ref ref="AlertFile"/>
+ </category>
+
+ <category name="org.apache.qpid.server.queue.AMQQueueMBean">
<priority value="info"/>
+ <appender-ref ref="AlertFile"/>
+ </category>
+
+ <!-- Provide warnings to standard output -->
+ <category name="org.apache.qpid">
+ <priority value="warn"/>
+ <appender-ref ref="STDOUT"/>
</category>
- <!--<category name="org.apache.qpid.server.store">
- <priority value="debug"/>
- </category-->
- <!--category name="org.apache.qpid.server.queue">
+ <!-- Examples of additional logging settings -->
+ <!-- Used to generate extra debug. See debug.log4j.xml -->
+
+ <!--<category name="org.apache.qpid.server.store">
<priority value="debug"/>
</category-->
@@ -80,10 +101,11 @@
<priority value="debug"/>
</category>-->
+ <!-- Log all info events to file -->
<root>
<priority value="info"/>
- <appender-ref ref="STDOUT"/>
<appender-ref ref="FileAppender"/>
<!--appender-ref ref="ArchivingFileAppender"/-->
</root>
+
</log4j:configuration>
diff --git a/java/broker/etc/messagestoretool-log4j.xml b/java/broker/etc/messagestoretool-log4j.xml
new file mode 100644
index 0000000000..0313de4f95
--- /dev/null
+++ b/java/broker/etc/messagestoretool-log4j.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.tools">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid.server.security">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid.server.management">
+ <priority value="error"/>
+ </category>
+
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+</log4j:configuration>
diff --git a/java/broker/etc/mstool-log4j.xml b/java/broker/etc/mstool-log4j.xml
new file mode 100644
index 0000000000..8c46010e2d
--- /dev/null
+++ b/java/broker/etc/mstool-log4j.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/-->
+ <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.tools">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid">
+ <priority value="error"/>
+ </category>
+
+ <category name="org.apache.qpid.server.security">
+ <priority value="error"/>
+ </category>
+
+ <category name="org.apache.qpid.server.management">
+ <priority value="error"/>
+ </category>
+
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+</log4j:configuration>
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index 81c5d22b22..35c29d504f 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -101,6 +101,25 @@
<build>
<plugins>
+
+
+ <!--plugin>
+ <artifactId>minijar-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>minijars</goal>
+ </goals>
+ <configuration>
+ <stripUnusedClasses>true</stripUnusedClasses>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin-->
+
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
diff --git a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
index 931c15a664..7e0c4defe1 100644
--- a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
+++ b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.apache.log4j.helpers.CountingQuietWriter;
@@ -39,8 +38,6 @@ import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.helpers.OptionConverter;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.qpid.framing.FieldTable;
-
/**
* <p>CompositeRollingAppender combines RollingFileAppender and DailyRollingFileAppender<br> It can function as either
* or do both at the same time (making size based rolling files like RollingFileAppender until a data/time boundary is
diff --git a/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
new file mode 100644
index 0000000000..40ff590a0a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class Configuration
+{
+ public static final String QPID_HOME = "QPID_HOME";
+
+ final String QPIDHOME = System.getProperty(QPID_HOME);
+
+ private static Logger _devlog = LoggerFactory.getLogger(Configuration.class);
+
+ public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+ protected final Options _options = new Options();
+ protected CommandLine _commandLine;
+ protected File _configFile;
+
+
+ public Configuration()
+ {
+
+ }
+
+ public void processCommandline(String[] args) throws InitException
+ {
+ try
+ {
+ _commandLine = new PosixParser().parse(_options, args);
+ }
+ catch (ParseException e)
+ {
+ throw new InitException("Unable to parse commmandline", e);
+ }
+
+ final File defaultConfigFile = new File(QPIDHOME, DEFAULT_CONFIG_FILE);
+ setConfig(new File(_commandLine.getOptionValue("c", defaultConfigFile.getPath())));
+ }
+
+ public void setConfig(File file)
+ {
+ _configFile = file;
+ }
+
+ /**
+ * @param option The option to set.
+ */
+ public void setOption(Option option)
+ {
+ _options.addOption(option);
+ }
+
+ /**
+ * getOptionValue from the configuration
+ * @param option variable argument, first string is option to get, second if present is the default value.
+ * @return the String for the given option or null if not present (if default value not specified)
+ */
+ public String getOptionValue(String... option)
+ {
+ if (option.length == 1)
+ {
+ return _commandLine.getOptionValue(option[0]);
+ }
+ else if (option.length == 2)
+ {
+ return _commandLine.getOptionValue(option[0], option[1]);
+ }
+ return null;
+ }
+
+ public void loadConfig(File file) throws InitException
+ {
+ setConfig(file);
+ loadConfig();
+ }
+
+ private void loadConfig() throws InitException
+ {
+ if (!_configFile.exists())
+ {
+ String error = "File " + _configFile + " could not be found. Check the file exists and is readable.";
+
+ if (QPIDHOME == null)
+ {
+ error = error + "\nNote: " + QPID_HOME + " is not set.";
+ }
+
+ throw new InitException(error, null);
+ }
+ else
+ {
+ _devlog.debug("Using configuration file " + _configFile.getAbsolutePath());
+ }
+
+// String logConfig = _commandLine.getOptionValue("l");
+// String logWatchConfig = _commandLine.getOptionValue("w", "0");
+// if (logConfig != null)
+// {
+// File logConfigFile = new File(logConfig);
+// configureLogging(logConfigFile, logWatchConfig);
+// }
+// else
+// {
+// File configFileDirectory = _configFile.getParentFile();
+// File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+// configureLogging(logConfigFile, logWatchConfig);
+// }
+ }
+
+
+// private void configureLogging(File logConfigFile, String logWatchConfig)
+// {
+// int logWatchTime = 0;
+// try
+// {
+// logWatchTime = Integer.parseInt(logWatchConfig);
+// }
+// catch (NumberFormatException e)
+// {
+// _devlog.error("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+// + "a non-negative integer. Using default of zero (no watching configured");
+// }
+//
+// if (logConfigFile.exists() && logConfigFile.canRead())
+// {
+// _devlog.info("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
+// if (logWatchTime > 0)
+// {
+// _devlog.info("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+// + logWatchTime + " seconds");
+// // log4j expects the watch interval in milliseconds
+// DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
+// }
+// else
+// {
+// DOMConfigurator.configure(logConfigFile.getAbsolutePath());
+// }
+// }
+// else
+// {
+// System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+// System.err.println("Using basic log4j configuration");
+// BasicConfigurator.configure();
+// }
+// }
+
+ public File getConfigFile()
+ {
+ return _configFile;
+ }
+
+
+ public class InitException extends Exception
+ {
+ InitException(String msg, Throwable cause)
+ {
+ super(msg, cause);
+ }
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 28a9e85489..7f14946834 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -582,9 +582,9 @@ public class AMQChannel
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
- if (_log.isInfoEnabled())
+ if (_log.isDebugEnabled())
{
- _log.info("unacked map Size:" + _unacknowledgedMessageMap.size());
+ _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
}
// Process the Unacked-Map.
@@ -640,15 +640,15 @@ public class AMQChannel
});
// Process Messages to Resend
- if (_log.isInfoEnabled())
+ if (_log.isDebugEnabled())
{
if (!msgToResend.isEmpty())
{
- _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
+ _log.debug("Preparing (" + msgToResend.size() + ") message to resend.");
}
else
{
- _log.info("No message to resend.");
+ _log.debug("No message to resend.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 29ea69caf7..dd6546585f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -34,6 +34,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
@@ -48,6 +49,7 @@ import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.management.JMXManagedObjectRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQPProtocolProvider;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -55,11 +57,19 @@ import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.url.URLSyntaxException;
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
/**
* Main entry point for AMQPD.
*
*/
-@SuppressWarnings({ "AccessStaticViaInstance" })
+@SuppressWarnings({"AccessStaticViaInstance"})
public class Main
{
/** Used for debugging. */
@@ -133,6 +143,12 @@ public class Main
OptionBuilder.withArgName("port").hasArg()
.withDescription("listen on the specified port. Overrides any value in the config file")
.withLongOpt("port").create("p");
+ Option mport =
+ OptionBuilder.withArgName("mport").hasArg()
+ .withDescription("listen on the specified management port. Overrides any value in the config file")
+ .withLongOpt("mport").create("m");
+
+
Option bind =
OptionBuilder.withArgName("bind").hasArg()
.withDescription("bind to the specified address. Overrides any value in the config file")
@@ -153,6 +169,7 @@ public class Main
options.addOption(logconfig);
options.addOption(logwatchconfig);
options.addOption(port);
+ options.addOption(mport);
options.addOption(bind);
}
@@ -203,15 +220,19 @@ public class Main
catch (InitException e)
{
System.out.println(e.getMessage());
+ _brokerLogger.error("Initialisation Error : " + e.getMessage());
+
}
catch (ConfigurationException e)
{
System.out.println("Error configuring message broker: " + e);
+ _brokerLogger.error("Error configuring message broker: " + e);
e.printStackTrace();
}
catch (Exception e)
{
System.out.println("Error intialising message broker: " + e);
+ _brokerLogger.error("Error intialising message broker: " + e);
e.printStackTrace();
}
}
@@ -260,7 +281,15 @@ public class Main
configureLogging(logConfigFile, logWatchConfig);
}
- ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(configFile));
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+
+
+ updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m"));
+
+
+
+ ApplicationRegistry.initialise(config);
+
// fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
// that are causing the broker build to pick up the wrong properties file and hence say
@@ -318,6 +347,29 @@ public class Main
bind(port, connectorConfig);
}
+ /**
+ * Update the configuration data with the management port.
+ * @param configuration
+ * @param managementPort The string from the command line
+ */
+ private void updateManagementPort(Configuration configuration, String managementPort)
+ {
+ if (managementPort != null)
+ {
+ int mport;
+ int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH);
+ try
+ {
+ mport = Integer.parseInt(managementPort);
+ configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e);
+ }
+ }
+ }
+
protected void setupVirtualHosts(String configFileParent, String configFilePath)
throws ConfigurationException, AMQException, URLSyntaxException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 868ac31a54..9ebb893362 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -38,9 +38,13 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.List;
+import java.util.Map;
+
public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
@@ -189,6 +193,8 @@ public abstract class AbstractExchange implements Exchange, Managable
}
}
+ abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+
public String toString()
{
return getClass().getName() + "[" + getName() +"]";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index f3bdecc32e..377a73dd31 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -20,18 +20,20 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.exception.InternalErrorException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
public class DefaultExchangeRegistry implements ExchangeRegistry
{
private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
@@ -64,7 +66,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange) throws AMQException
{
_exchangeMap.put(exchange.getName(), exchange);
- if(exchange.isDurable())
+ if (exchange.isDurable())
{
try
{
@@ -86,13 +88,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
return _defaultExchange;
}
+ public Collection<AMQShortString> getExchangeNames()
+ {
+ return _exchangeMap.keySet();
+ }
+
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
if (e != null)
{
- if(e.isDurable())
+ if (e.isDurable())
{
try
{
@@ -112,7 +119,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public Exchange getExchange(AMQShortString name)
{
- if((name == null) || name.length() == 0)
+ if ((name == null) || name.length() == 0)
{
return getDefaultExchange();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 0dcceaddbb..c24f9a37e1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -26,22 +26,16 @@ import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -222,4 +216,9 @@ public class DestNameExchange extends AbstractExchange
{
return !_index.getBindingsMap().isEmpty();
}
+
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return _index.getBindingsMap();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index f6a95b5e55..e1a3a24d3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -21,11 +21,9 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -35,17 +33,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -59,7 +51,7 @@ public class DestWildExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
- new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
@@ -92,7 +84,7 @@ public class DestWildExchange extends AbstractExchange
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
+ Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -311,6 +303,11 @@ public class DestWildExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return _routingKey2queues;
+ }
+
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -358,8 +355,8 @@ public class DestWildExchange extends AbstractExchange
if (queueList.size() > (depth + queueskip))
{ // a hash and it is the last entry
matching =
- queueList.get(depth + queueskip).equals(AMQP_HASH)
- && (queueList.size() == (depth + queueskip + 1));
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
else if (routingkeyList.size() > (depth + routingskip))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index a5f77cc2a4..37cd85a8f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -27,9 +27,13 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.List;
+import java.util.Map;
+
public interface Exchange
{
AMQShortString getName();
+
AMQShortString getType();
void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
@@ -51,6 +55,17 @@ public interface Exchange
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
+ * @param routingKey
+ * @param arguments
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key
* @param routingKey
@@ -58,22 +73,25 @@ public interface Exchange
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+ boolean isBound(AMQShortString routingKey, AMQQueue queue);
/**
- * Determines whether a message is routing to any queue using a specific routing key
+ * Determines whether a message is routing to any queue using a specific _routing key
* @param routingKey
* @return
* @throws AMQException
*/
- boolean isBound(AMQShortString routingKey) throws AMQException;
+ boolean isBound(AMQShortString routingKey);
- boolean isBound(AMQQueue queue) throws AMQException;
+ boolean isBound(AMQQueue queue);
/**
* Returns true if this exchange has at least one binding associated with it.
* @return
* @throws AMQException
*/
- boolean hasBindings() throws AMQException;
+ boolean hasBindings();
+
+ Map<AMQShortString, List<AMQQueue>> getBindings();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index d3a466565f..fe3b19e74e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import java.util.Collection;
+
public interface ExchangeRegistry extends MessageRouter
{
@@ -43,5 +45,7 @@ public interface ExchangeRegistry extends MessageRouter
Exchange getDefaultExchange();
+ Collection<AMQShortString> getExchangeNames();
+
void initialise() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index bf00eeb9d3..1a705248c1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -34,17 +33,13 @@ import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -79,7 +74,7 @@ public class FanoutExchange extends AbstractExchange
{
String queueName = queue.getName().toString();
- Object[] bindingItemValues = { queueName, new String[] { queueName } };
+ Object[] bindingItemValues = {queueName, new String[]{queueName}};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -120,6 +115,11 @@ public class FanoutExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return null;
+ }
+
public AMQShortString getType()
{
return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
@@ -181,24 +181,29 @@ public class FanoutExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return (_queues != null) && !_queues.isEmpty();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
return _queues.contains(queue);
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_queues.isEmpty();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index e86094e26f..9bb1ee6a62 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -20,23 +20,6 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.JMException;
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -50,6 +33,23 @@ import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import javax.management.JMException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
/**
* An exchange that binds queues based on a set of required headers and header values
* and routes messages to these queues by matching the headers of the message against
@@ -91,13 +91,13 @@ public class HeadersExchange extends AbstractExchange
private final class HeadersExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ Headers exchange")
- public HeadersExchangeMBean() throws JMException
+ public HeadersExchangeMBean() throws JMException
{
super();
_exchangeType = "headers";
init();
}
-
+
/**
* initialises the OpenType objects.
*/
@@ -113,7 +113,7 @@ public class HeadersExchange extends AbstractExchange
_bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings",
_bindingItemNames, _bindingItemNames, _bindingItemTypes);
_bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
- _bindingDataType, _bindingItemIndexNames);
+ _bindingDataType, _bindingItemIndexNames);
}
public TabularData bindings() throws OpenDataException
@@ -169,7 +169,7 @@ public class HeadersExchange extends AbstractExchange
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
}
- String[] bindings = binding.split(",");
+ String[] bindings = binding.split(",");
FieldTable bindingMap = new FieldTable();
for (int i = 0; i < bindings.length; i++)
{
@@ -241,17 +241,23 @@ public class HeadersExchange extends AbstractExchange
}
}
- public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ //fixme isBound here should take the arguements in to consideration.
+ return isBound(routingKey, queue);
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(queue);
}
- public boolean isBound(AMQShortString routingKey) throws AMQException
+ public boolean isBound(AMQShortString routingKey)
{
return hasBindings();
}
- public boolean isBound(AMQQueue queue) throws AMQException
+ public boolean isBound(AMQQueue queue)
{
for (Registration r : _bindings)
{
@@ -263,7 +269,7 @@ public class HeadersExchange extends AbstractExchange
return false;
}
- public boolean hasBindings() throws AMQException
+ public boolean hasBindings()
{
return !_bindings.isEmpty();
}
@@ -288,6 +294,11 @@ public class HeadersExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return null;
+ }
+
private static class Registration
{
private final HeadersBinding binding;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 9346eecbb2..ab4f2c4e64 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -100,6 +100,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+
+ if (body.consumerTag != null)
+ {
+ body.consumerTag = body.consumerTag.intern();
+ }
+
try
{
AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
@@ -138,15 +144,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
// If the above doesn't work then perhaps this is wrong too.
// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
// "Non-unique consumer tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (ExistingExclusiveSubscriptionException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 67ade0a744..68d4483df3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -67,6 +67,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
+ else
+ {
+ body.exchange = body.exchange.intern();
+ }
VirtualHost vHost = session.getVirtualHost();
Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
// if the exchange does not exist we raise a channel exception
@@ -86,6 +90,11 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if(body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
channel.setPublishFrame(info, session);
}
@@ -93,3 +102,4 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
}
+
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 5dbd1b18de..cd0d0e3b76 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -72,7 +73,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
SaslServer ss = null;
try
- {
+ {
ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
if (ss == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index f24c96f87f..0ff19bdf9e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.handler;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 855d1a2add..f0f6fde08c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -83,7 +83,9 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
try
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+ exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
+ body.type == null ? null : body.type.intern(),
+ body.durable,
body.passive, body.ticket);
exchangeRegistry.registerExchange(exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 4dc67b1970..3e68069838 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,7 +37,6 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -77,7 +77,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
-
+
if (body.routingKey == null)
{
body.routingKey = queue.getName();
@@ -97,9 +97,18 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
+
+ if (body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
try
- {
- queue.bind(body.routingKey, body.arguments, exch);
+ {
+ if (!exch.isBound(body.routingKey, body.arguments, queue))
+ {
+ queue.bind(body.routingKey, body.arguments, exch);
+ }
}
catch (AMQInvalidRoutingKeyException rke)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index f9e94af697..28967841a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -93,8 +93,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
+
+
if (((queue = queueRegistry.getQueue(body.queue)) == null))
{
+ if(body.queue != null)
+ {
+ body.queue = body.queue.intern();
+ }
+
if (body.passive)
{
String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
index f277398b50..4caae2b26f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.server.management;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
import javax.management.JMException;
import javax.management.MBeanServer;
@@ -43,17 +43,14 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.AuthorizeCallback;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
-import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
/**
* This class starts up an MBeanserver. If out of the box agent is being used then there are no security features
@@ -68,6 +65,9 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
private final MBeanServer _mbeanServer;
private Registry _rmiRegistry;
private JMXServiceURL _jmxURL;
+
+ public static final String MANAGEMENT_PORT_CONFIG_PATH = "management.jmxport";
+ public static final int MANAGEMENT_PORT_DEFAULT = 8999;
public JMXManagedObjectRegistry() throws AMQException
{
@@ -95,7 +95,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", false);
- int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
+ int port = appRegistry.getConfiguration().getInt(MANAGEMENT_PORT_CONFIG_PATH, MANAGEMENT_PORT_DEFAULT);
if (security)
{
@@ -144,13 +144,13 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
cs.setMBeanServerForwarder(mbsf);
cs.start();
- _log.warn("JMX: Started JMXConnector server with SASL");
+ _log.warn("JMX: Started JMXConnector server on port '" + port + "' with SASL");
}
else
{
startJMXConnectorServer(port);
- _log.warn("JMX: Started JMXConnector server with security disabled");
+ _log.warn("JMX: Started JMXConnector server on port '" + port + "' with security disabled");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
index 3ab23e8b46..4fb260472d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.management;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 84ec91d569..5c88edf86f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -44,6 +44,9 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.ssl.SSLContextFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
/**
* The protocol handler handles "protocol events" for all connections. The state
* associated with an individual connection is accessed through the protocol session.
@@ -80,7 +83,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
createSession(protocolSession, _applicationRegistry, codecFactory);
- _logger.info("Protocol session created");
+ _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -127,12 +130,12 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
public void sessionOpened(IoSession protocolSession) throws Exception
{
- _logger.info("Session opened");
+ _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
}
public void sessionClosed(IoSession protocolSession) throws Exception
{
- _logger.info("Protocol Session closed");
+ _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
// fixme -- this can be null
if (amqProtocolSession != null)
@@ -143,7 +146,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
{
- _logger.debug("Protocol Session [" + this + "] idle: " + status);
+ _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
if (IdleStatus.WRITER_IDLE.equals(status))
{
// write heartbeat frame:
@@ -167,7 +170,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
protocolSession.close();
- _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
+ _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
else if (throwable instanceof IOException)
{
@@ -178,13 +181,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
// Be aware of possible changes to parameter order as versions change.
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(),
- session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- 200, // replyCode
- new AMQShortString(throwable.getMessage()) // replyText
- ));
+ protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ session.getProtocolMajorVersion(),
+ session.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ 200, // replyCode
+ new AMQShortString(throwable.getMessage()) // replyText
+ ));
protocolSession.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 95f75fdb36..44634eb877 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -53,6 +53,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
*/
public class AMQMessage implements StorableMessage
{
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
// The ordered list of queues into which this message is enqueued.
@@ -76,19 +77,16 @@ public class AMQMessage implements StorableMessage
private AMQMessageHandle _messageHandle;
- // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+ /** Holds the transactional context in which this message is being processed. */
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
- * messages published with the 'immediate' flag.
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
- /**
- * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
- * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
- * removed from the store.
- */
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
// private Subscription _takenBySubcription;
@@ -494,7 +492,7 @@ public class AMQMessage implements StorableMessage
*/
public AMQMessage takeReference()
{
- _referenceCount.incrementAndGet();
+ incrementReference(); // _referenceCount.incrementAndGet();
return this;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 6273ac997b..0c2fd294d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
@@ -46,11 +45,11 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
-
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -163,22 +162,22 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
+ new SubscriptionSet(), new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
+ VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
+ new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
+ VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+ SubscriptionFactory subscriptionFactory) throws AMQException
{
if (name == null)
{
@@ -260,7 +259,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
/**
- * Returns messages within the given range of message Ids
+ * Returns messages within the given range of message Ids.
*
* @param fromMessageId
* @param toMessageId
@@ -292,38 +291,220 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
/**
- * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
- * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
- * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
- * locks from both queues and start async delivery
+ * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
*
- * @param fromMessageId
- * @param toMessageId
- * @param queueName
- * @param storeContext
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
+ StoreContext storeContext)
{
- // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
- AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
+ }
+
try
{
+ // Obtain locks to prevent activity on the queues being moved between.
startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
- // move messages to another queue
- anotherQueue.startMovingMessages();
- anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext)
+ {
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
- // moving is successful, now remove from original queue
- _deliveryMgr.removeMovedMessages(foundMessagesList);
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
}
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ message.takeReference();
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
+ * on the queues being moved between is suspended during the remove.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ {
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // remove the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // remove the messages on the in-memory queues.
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
finally
{
- // remove the lock and start the async delivery
- anotherQueue.stopMovingMessages();
stopMovingMessages();
}
}
@@ -458,7 +639,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
{
if (incrementSubscriberCount() > 1)
{
@@ -481,13 +662,12 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (_logger.isDebugEnabled())
{
- _logger.debug(MessageFormat.format(
- "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel,
- consumerTag, this));
+ _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and "
+ + "consumer tag {2} with {3}", ps, channel, consumerTag, this));
}
Subscription subscription =
- _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+ _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
if (subscription.filtersMessages())
{
@@ -525,8 +705,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel,
- consumerTag, this));
+ "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this));
}
Subscription removedSubscription;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index bbaa7379f6..07872d7644 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -18,30 +18,23 @@
* under the License.
*
*/
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
package org.apache.qpid.server.queue;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -60,30 +53,25 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
/**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
+ * AMQQueueMBean is the management bean for an {@link AMQQueue}.
+ *
+ * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+ /** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 2aa759b35d..907d68b733 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -20,19 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
@@ -42,8 +29,21 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.MessageQueue;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
/** Manages delivery of messages on behalf of a queue */
@@ -87,6 +87,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
+
+ /** Used by any reaping thread to purge messages */
+ private StoreContext _reapingStoreContext = new StoreContext();
+
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -453,12 +457,31 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
while (purgeMessage(message, sub))
{
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+// boolean alreadyTaken = message.taken(_queue, sub);
+
//remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
- _totalMessageSize.addAndGet(-message.getSize());
+ // if the message expired then the _totalMessageSize needs adjusting
+ if (message.expired(_queue))
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+
+ // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
+ message.dequeue(_reapingStoreContext, _queue);
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
+ }
+ }
+
+ //else the clean up is not required as the message has already been taken for this queue therefore
+ // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
if (_log.isTraceEnabled())
{
@@ -473,7 +496,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- *
+ * This method will return true if the message is to be purged from the queue.
+ *
+ *
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
* @param message
* @param sub
* @return
@@ -493,15 +519,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// if the message is null then don't purge as we have no messagse.
if (message != null)
{
+ // Check that the message hasn't expired.
+ if (message.expired(_queue))
+ {
+ return true;
+ }
+
// if we have a subscriber perform message checks
if (sub != null)
{
- // Check that the message hasn't expired.
- if (message.expired(sub.getChannel().getStoreContext(), _queue))
- {
- return true;
- }
-
// if we have a queue browser(we don't purge) so check mark the message as taken
purge = ((!sub.isBrowser() || message.isTaken(_queue)));
}
@@ -606,7 +632,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isInfoEnabled())
{
- _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+ //fixme - we should do the clean up as the message remains on the _message queue
+ // this is resulting in the next consumer receiving the message and then attempting to purge it
+ //
+ _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
}
}
@@ -617,7 +646,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
catch (AMQException e)
{
- message.release(_queue);
+ if (message != null)
+ {
+ message.release(_queue);
+ }
+ else
+ {
+ _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
+ }
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -696,25 +732,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
-// private void sendNextMessage(Subscription sub)
-// {
-// if (sub.filtersMessages())
-// {
-// sendNextMessage(sub, sub.getPreDeliveryQueue());
-// if (sub.isAutoClose())
-// {
-// if (sub.getPreDeliveryQueue().isEmpty())
-// {
-// sub.close();
-// }
-// }
-// }
-// else
-// {
-// sendNextMessage(sub, _messages);
-// }
-// }
-
public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
@@ -723,8 +740,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
}
- // This shouldn't be done here.
-// msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -800,7 +815,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (debugEnabled)
{
_log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+ "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
}
}
}
@@ -810,7 +825,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (debugEnabled)
{
_log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
- " Subscriber:" + System.identityHashCode(s));
+ " Subscriber:" + System.identityHashCode(s));
}
deliver(context, name, msg, deliverFirst);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index c0f1e7f40c..cbe9246f09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
public class DefaultQueueRegistry implements QueueRegistry
{
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
@@ -57,4 +58,14 @@ public class DefaultQueueRegistry implements QueueRegistry
{
return _queueMap.get(name);
}
+
+ public Collection<AMQShortString> getQueueNames()
+ {
+ return _queueMap.keySet();
+ }
+
+ public Collection<AMQQueue> getQueues()
+ {
+ return _queueMap.values();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
index a8247aa2db..60c1a8f574 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
@@ -46,7 +46,7 @@ class ExchangeBindings
ExchangeBinding(AMQShortString routingKey, Exchange exchange)
{
- this(routingKey, exchange,EMPTY_ARGUMENTS);
+ this(routingKey, exchange, EMPTY_ARGUMENTS);
}
ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
@@ -80,7 +80,10 @@ class ExchangeBindings
public boolean equals(Object o)
{
- if (!(o instanceof ExchangeBinding)) return false;
+ if (!(o instanceof ExchangeBinding))
+ {
+ return false;
+ }
ExchangeBinding eb = (ExchangeBinding) o;
return _exchange.equals(eb._exchange)
&& _routingKey.equals(eb._routingKey)
@@ -104,16 +107,16 @@ class ExchangeBindings
*/
void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
{
- _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+ _bindings.add(new ExchangeBinding(routingKey, exchange, arguments));
}
public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
{
- _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
+ _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
}
-
+
/**
* Deregisters this queue from any exchange it has been bound to
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
index 285f05fb20..6118a4c11f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
@@ -1,18 +1,22 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
package org.apache.qpid.server.queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 00ccffdea1..6b3d65661f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
index 9554d34f00..959ca03c80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
@@ -1,22 +1,26 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.queue;
+
public interface QueueNotificationListener
{
void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index ed2101fd75..13c150f82b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.messageStore.StorableQueue;
+import java.util.Collection;
public interface QueueRegistry
{
@@ -35,4 +36,9 @@ public interface QueueRegistry
void unregisterQueue(AMQShortString name) throws AMQException;
AMQQueue getQueue(AMQShortString name);
+
+ Collection<AMQShortString> getQueueNames();
+
+ Collection<AMQQueue> getQueues();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
index 7c8064789e..79ee6b93a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 8becaf52b9..d9500429af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.txn;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index 88451e2fca..fee25c07df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -28,24 +28,144 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoreContext;
/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed.
+ * Different levels of transactional support for the delivery of messages may be provided by different implementations
+ * of this interface.
+ *
+ * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'.
+ * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage}
+ * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Explicitly accept a transaction start notification.
+ * <tr><td> Commit all pending operations in a transaction.
+ * <tr><td> Rollback all pending operations in a transaction.
+ * <tr><td> Deliver a message to a queue as part of a transaction.
+ * <tr><td> Redeliver a message to a queue as part of a transaction.
+ * <tr><td> Mark a message as acknowledged as part of a transaction.
+ * <tr><td> Accept notification that a message has been completely received as part of a transaction.
+ * <tr><td> Accept notification that a message has been fully processed as part of a transaction.
+ * <tr><td> Associate a message store context with this transaction context.
+ * </table>
+ *
+ * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional
+ * context. They are non-transactional operations, used to trigger other side-effects. Consider moving them
+ * somewhere else, a seperate interface for example.
+ *
+ * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides
+ * transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any
+ * queue implementation could be made transactional by wrapping it as a transactional queue. This would mean
+ * that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be
+ * conceptually neater.
+ *
+ * For example:
+ * <pre>
+ * public interface Transactional
+ * {
+ * public void commit();
+ * public void rollback();
+ * }
+ *
+ * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E>
+ * {}
+ *
+ * public class Queues
+ * {
+ * ...
+ * // For transactional messaging, take a transactional view onto the queue.
+ * public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... }
+ *
+ * // For non-transactional messaging, take a non-transactional view onto the queue.
+ * public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
+ * }
+ * </pre>
*/
public interface TransactionalContext
{
+ /**
+ * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback}
+ * should automatically begin the next transaction in the chain.
+ *
+ * @throws AMQException If the transaction cannot be started for any reason.
+ */
void beginTranIfNecessary() throws AMQException;
+ /**
+ * Makes all pending operations on the transaction permanent and visible.
+ *
+ * @throws AMQException If the transaction cannot be committed for any reason.
+ */
void commit() throws AMQException;
+ /**
+ * Erases all pending operations on the transaction.
+ *
+ * @throws AMQException If the transaction cannot be committed for any reason.
+ */
void rollback() throws AMQException;
+ /**
+ * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
+ * redelivery, and should be placed on the front of the queue.
+ *
+ * <p/>This is an 'enqueue' operation.
+ *
+ * @param message The message to deliver.
+ * @param queue The queue to deliver the message to.
+ * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
+ * for normal FIFO message ordering.
+ *
+ * @throws AMQException If the message cannot be delivered for any reason.
+ */
void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+ /**
+ * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
+ * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple'
+ * flag is set, in which case an acknowledgement up to the latest delivered message should be done.
+ *
+ * <p/>This is a 'dequeue' operation.
+ *
+ * @param deliveryTag The id of the message to acknowledge, or zero, if using multiple acknowledgement
+ * up to the latest message.
+ * @param lastDeliveryTag The latest message delivered.
+ * @param multiple <tt>true</tt> if all message ids up the acknowledged one or latest delivered, are
+ * to be acknowledged, <tt>false</tt> otherwise.
+ * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message
+ * from.
+ *
+ * @throws AMQException If the message cannot be acknowledged for any reason.
+ */
void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
- UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been fully received. The actual message that was received
+ * is not specified. This event may be used to trigger a process related to the receipt of the message, for example,
+ * flushing its data to disk.
+ *
+ * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise.
+ *
+ * @throws AMQException If the fully received event cannot be processed for any reason.
+ */
void messageFullyReceived(boolean persistent) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been delivered, succesfully or otherwise. The actual
+ * message that was delivered is not specified. This event may be used to trigger a process related to the
+ * outcome of the delivery of the message, for example, cleaning up failed deliveries.
+ *
+ * @param protocolSession The protocol session of the deliverable message.
+ *
+ * @throws AMQException If the message processed event cannot be handled for any reason.
+ */
void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+ /**
+ * Gets the message store context associated with this transactional context.
+ *
+ * @return The message store context associated with this transactional context.
+ */
StoreContext getStoreContext();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 6638689299..235555c58b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -1,3 +1,4 @@
+<<<<<<< .working
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -286,3 +287,264 @@ public class VirtualHost implements Accessable
}
}
+=======
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+
+public class VirtualHost implements Accessable
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private AuthenticationManager _authenticationManager;
+
+ private AccessManager _accessManager;
+
+
+ public void setAccessableName(String name)
+ {
+ _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+ + name + ") ignored remains :" + getAccessableName());
+ }
+
+ public String getAccessableName()
+ {
+ return _name;
+ }
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+ /**
+ * Used for testing only
+ * @param name
+ * @param store
+ * @throws Exception
+ */
+ public VirtualHost(String name, MessageStore store) throws Exception
+ {
+ this(name, null, store);
+ }
+
+ /**
+ * Normal Constructor
+ * @param name
+ * @param hostConfig
+ * @throws Exception
+ */
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ this(name, hostConfig, null);
+ }
+
+ private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ // This isn't needed to be registered
+ //_virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ if (store != null)
+ {
+ _messageStore = store;
+ }
+ else
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+ initialiseMessageStore(hostConfig);
+ }
+
+ _exchangeRegistry.initialise();
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+ _accessManager = new AccessManagerImpl(name, hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public AccessManager getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public void close() throws Exception
+ {
+ if (_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
+>>>>>>> .merge-right.r553432
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
new file mode 100644
index 0000000000..afa7916074
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.commands.Clear;
+import org.apache.qpid.tools.messagestore.commands.Command;
+import org.apache.qpid.tools.messagestore.commands.Copy;
+import org.apache.qpid.tools.messagestore.commands.Dump;
+import org.apache.qpid.tools.messagestore.commands.Help;
+import org.apache.qpid.tools.messagestore.commands.List;
+import org.apache.qpid.tools.messagestore.commands.Load;
+import org.apache.qpid.tools.messagestore.commands.Quit;
+import org.apache.qpid.tools.messagestore.commands.Select;
+import org.apache.qpid.tools.messagestore.commands.Show;
+import org.apache.qpid.tools.utils.CommandParser;
+import org.apache.qpid.tools.utils.Console;
+import org.apache.qpid.tools.utils.SimpleCommandParser;
+import org.apache.qpid.tools.utils.SimpleConsole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+/**
+ * MessageStoreTool.
+ */
+public class MessageStoreTool
+{
+ /** Text outputted at the start of each console.*/
+ private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances";
+
+ /** I/O Wrapper. */
+ protected Console _console;
+
+ /** Batch mode flag. */
+ protected boolean _batchMode;
+
+ /** Internal State object. */
+ private State _state = new State();
+
+ private HashMap<String, Command> _commands = new HashMap<String, Command>();
+
+ /** SLF4J Logger. */
+ private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class);
+
+ /** Loaded configuration file. */
+ private Configuration _config;
+
+ /** Control used for main run loop. */
+ private boolean _running = true;
+ private boolean _initialised = false;
+
+ //---------------------------------------------------------------------------------------------------/
+
+ public static void main(String[] args) throws Configuration.InitException
+ {
+
+ MessageStoreTool tool = new MessageStoreTool(args);
+
+ tool.start();
+ }
+
+
+ public MessageStoreTool(String[] args) throws Configuration.InitException
+ {
+ this(args, System.in, System.out);
+ }
+
+ public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException
+ {
+ BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in));
+ BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out));
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+ _batchMode = false;
+
+ _console = new SimpleConsole(consoleWriter, consoleReader);
+
+ _config = new Configuration();
+
+ setOptions();
+ _config.processCommandline(args);
+ }
+
+
+ private void setOptions()
+ {
+ Option help = new Option("h", "help", false, "print this message");
+ Option version = new Option("v", "version", false, "print the version information and exit");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg()
+ .withDescription("use given configuration file By "
+ + "default looks for a file named "
+ + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+ .withLongOpt("config")
+ .create("c");
+
+ _config.setOption(help);
+ _config.setOption(version);
+ _config.setOption(configFile);
+ }
+
+ public State getState()
+ {
+ return _state;
+ }
+
+ public Map<String, Command> getCommands()
+ {
+ return _commands;
+ }
+
+ public void setConfigurationFile(String configfile) throws Configuration.InitException
+ {
+ _config.loadConfig(new File(configfile));
+ setup();
+ }
+
+ public Console getConsole()
+ {
+ return _console;
+ }
+
+ public void setConsole(Console console)
+ {
+ _console = console;
+ }
+
+ /**
+ * Simple ShutdownHook to cleanly shutdown the databases
+ */
+ class ShutdownHook implements Runnable
+ {
+ MessageStoreTool _tool;
+
+ ShutdownHook(MessageStoreTool messageStoreTool)
+ {
+ _tool = messageStoreTool;
+ }
+
+ public void run()
+ {
+ _tool.quit();
+ }
+ }
+
+ public void quit()
+ {
+ _running = false;
+
+ if (_initialised)
+ {
+ ApplicationRegistry.remove(1);
+ }
+
+ _console.println("...exiting");
+
+ _console.close();
+ }
+
+ public void setBatchMode(boolean batchmode)
+ {
+ _batchMode = batchmode;
+ }
+
+ /**
+ * Main loop
+ */
+ protected void start()
+ {
+ setup();
+
+ if (!_initialised)
+ {
+ System.exit(1);
+ }
+
+ _console.println("");
+
+ _console.println(BOILER_PLATE);
+
+ runCLI();
+ }
+
+ private void setup()
+ {
+ loadDefaultVirtualHosts();
+
+ loadCommands();
+
+ _state.clearAll();
+ }
+
+ private void loadCommands()
+ {
+ _commands.clear();
+ //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands
+ _commands.put("close", new Clear(this));
+ _commands.put("copy", new Copy(this));
+ _commands.put("dump", new Dump(this));
+ _commands.put("help", new Help(this));
+ _commands.put("list", new List(this));
+ _commands.put("load", new Load(this));
+ _commands.put("quit", new Quit(this));
+ _commands.put("select", new Select(this));
+ _commands.put("show", new Show(this));
+ }
+
+ private void loadDefaultVirtualHosts()
+ {
+ final File configFile = _config.getConfigFile();
+
+ loadVirtualHosts(configFile);
+ }
+
+ private void loadVirtualHosts(File configFile)
+ {
+
+ if (!configFile.exists())
+ {
+ _devlog.error("Config file not found:" + configFile.getAbsolutePath());
+ return;
+ }
+ else
+ {
+ _devlog.debug("using config file :" + configFile.getAbsolutePath());
+ }
+
+ try
+ {
+ ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+
+ ApplicationRegistry.remove(1);
+
+ ApplicationRegistry.initialise(registry);
+
+ checkMessageStores();
+ _initialised = true;
+ }
+ catch (ConfigurationException e)
+ {
+ _console.println("Unable to load configuration due to configuration error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ catch (Exception e)
+ {
+ _console.println("Unable to load configuration due to: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+
+ }
+
+ private void checkMessageStores()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ boolean warning = false;
+ for (VirtualHost vhost : vhosts)
+ {
+ if (vhost.getMessageStore() instanceof MemoryMessageStore)
+ {
+ _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
+ + "Changes will not persist.");
+ warning = true;
+ }
+ }
+
+ if (warning)
+ {
+ _console.println("");
+ _console.println("Please ensure you are using the correct config file currently using '"
+ + _config.getConfigFile().getAbsolutePath() + "'");
+ _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline.");
+ _console.println("");
+ }
+ }
+
+ private void runCLI()
+ {
+ while (_running)
+ {
+ if (!_batchMode)
+ {
+ printPrompt();
+ }
+
+ String[] args = _console.readCommand();
+
+ while (args != null)
+ {
+ exec(args);
+
+ if (_running)
+ {
+ if (!_batchMode)
+ {
+ printPrompt();
+ }
+
+ args = _console.readCommand();
+ }
+ }
+ }
+ }
+
+ private void printPrompt()
+ {
+ _console.print(prompt());
+ }
+
+
+ /**
+ * Execute a script (batch mode).
+ *
+ * @param script The file script
+ */
+ protected void runScripts(String script)
+ {
+ //Store Current State
+ boolean oldBatch = _batchMode;
+ CommandParser oldParser = _console.getCommandParser();
+ setBatchMode(true);
+
+ try
+ {
+ _devlog.debug("Running script '" + script + "'");
+
+ _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script))));
+
+ start();
+ }
+ catch (java.io.FileNotFoundException e)
+ {
+ _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage());
+ }
+
+ //Restore previous state
+ _console.setCommandParser(oldParser);
+ setBatchMode(oldBatch);
+ }
+
+ public String prompt()
+ {
+ String state = _state.toString();
+ if (state != null && state.length() != 0)
+ {
+ return state + ":bdb$ ";
+ }
+ else
+ {
+ return "bdb$ ";
+ }
+ }
+
+ /**
+ * Execute the command.
+ *
+ * @param args [command, arg0, arg1...].
+ */
+ protected void exec(String[] args)
+ {
+ // Comment lines start with a #
+ if (args.length == 0 || args[0].startsWith("#"))
+ {
+ return;
+ }
+
+ final String command = args[0];
+
+ Command cmd = _commands.get(command);
+
+ if (cmd == null)
+ {
+ _console.println("Command not understood: " + command);
+ }
+ else
+ {
+ cmd.execute(args);
+ }
+ }
+
+
+ /**
+ * Displays usage info.
+ */
+ protected static void help()
+ {
+ System.out.println(BOILER_PLATE);
+ System.out.println("Usage: java " + MessageStoreTool.class + " [Options]");
+ System.out.println(" [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+ }
+
+
+ /**
+ * This class is used to store the current state of the tool.
+ *
+ * This is then interrogated by the various commands to augment their behaviour.
+ *
+ *
+ */
+ public class State
+ {
+ private VirtualHost _vhost = null;
+ private AMQQueue _queue = null;
+ private Exchange _exchange = null;
+ private java.util.List<Long> _msgids = null;
+
+ public State()
+ {
+ }
+
+ public void setQueue(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public void setVhost(VirtualHost vhost)
+ {
+ _vhost = vhost;
+ }
+
+ public VirtualHost getVhost()
+ {
+ return _vhost;
+ }
+
+ public Exchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public String toString()
+ {
+ StringBuilder status = new StringBuilder();
+
+ if (_vhost != null)
+ {
+ status.append(_vhost.getName());
+
+ if (_exchange != null)
+ {
+ status.append("[");
+ status.append(_exchange.getName());
+ status.append("]");
+
+ if (_queue != null)
+ {
+ status.append("->'");
+ status.append(_queue.getName());
+ status.append("'");
+
+ if (_msgids != null)
+ {
+ status.append(printMessages());
+ }
+ }
+ }
+ }
+
+ return status.toString();
+ }
+
+
+ public String printMessages()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ Long previous = null;
+
+ Long start = null;
+ for (Long id : _msgids)
+ {
+ if (previous != null)
+ {
+ if (id == previous + 1)
+ {
+ if (start == null)
+ {
+ start = previous;
+ }
+ }
+ else
+ {
+ if (start != null)
+ {
+ sb.append(",");
+ sb.append(start);
+ sb.append("-");
+ sb.append(id);
+ start = null;
+ }
+ else
+ {
+ sb.append(",");
+ sb.append(previous);
+ }
+ }
+ }
+
+ previous = id;
+ }
+
+ if (start != null)
+ {
+ sb.append(",");
+ sb.append(start);
+ sb.append("-");
+ sb.append(_msgids.get(_msgids.size() - 1));
+ }
+ else
+ {
+ sb.append(",");
+ sb.append(previous);
+ }
+
+ // surround list in ()
+ sb.replace(0, 1, "(");
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void clearAll()
+ {
+ _vhost = null;
+ clearExchange();
+ }
+
+ public void clearExchange()
+ {
+ _exchange = null;
+ clearQueue();
+ }
+
+ public void clearQueue()
+ {
+ _queue = null;
+ clearMessages();
+ }
+
+ public void clearMessages()
+ {
+ _msgids = null;
+ }
+
+ /**
+ * A common location to provide parsing of the message id string
+ * utilised by a number of the commands.
+ * The String is comma separated list of ids that can be individual ids
+ * or a range (4-10)
+ *
+ * @param msgString string of msg ids to parse 1,2,4-10
+ */
+ public void setMessages(String msgString)
+ {
+ StringTokenizer tok = new StringTokenizer(msgString, ",");
+
+ if (tok.hasMoreTokens())
+ {
+ _msgids = new LinkedList<Long>();
+ }
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+ if (next.contains("-"))
+ {
+ Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+ Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+ if (end >= start)
+ {
+ for (long l = start; l <= end; l++)
+ {
+ _msgids.add(l);
+ }
+ }
+ }
+ else
+ {
+ _msgids.add(Long.parseLong(next));
+ }
+ }
+
+ }
+
+ public void setMessages(java.util.List<Long> msgids)
+ {
+ _msgids = msgids;
+ }
+
+ public java.util.List<Long> getMessages()
+ {
+ return _msgids;
+ }
+ }//Class State
+
+}//Class MessageStoreTool
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
new file mode 100644
index 0000000000..5444197cb4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+public abstract class AbstractCommand implements Command
+{
+ protected Console _console;
+ protected MessageStoreTool _tool;
+
+ public AbstractCommand(MessageStoreTool tool)
+ {
+ _console = tool.getConsole();
+ _tool = tool;
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ protected void commandError(String message, String[] args)
+ {
+ _console.print(getCommand() + " : " + message);
+
+ if (args != null)
+ {
+ for (int i = 1; i < args.length; i++)
+ {
+ _console.print(args[i]);
+ }
+ }
+ _console.println("");
+ _console.println(help());
+ }
+
+
+ public abstract String help();
+
+ public abstract String usage();
+
+ public abstract String getCommand();
+
+
+ public abstract void execute(String... args);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
new file mode 100644
index 0000000000..b0006b3fe6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Clear extends AbstractCommand
+{
+ public Clear(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Clears any selection.";
+ }
+
+ public String usage()
+ {
+ return "clear [ all | virtualhost | exchange | queue | msgs ]";
+ }
+
+ public String getCommand()
+ {
+ return "clear";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 1)
+ {
+ doClose("all");
+ }
+ else
+ {
+ doClose(args[1]);
+ }
+ }
+
+ private void doClose(String type)
+ {
+ if (type.equals("virtualhost")
+ || type.equals("all"))
+ {
+ _tool.getState().clearAll();
+ }
+
+ if (type.equals("exchange"))
+ {
+ _tool.getState().clearExchange();
+ }
+
+ if (type.equals("queue"))
+ {
+ _tool.getState().clearQueue();
+ }
+
+ if (type.equals("msgs"))
+ {
+ _tool.getState().clearMessages();
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
new file mode 100644
index 0000000000..bfa775a34a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.utils.Console;
+
+public interface Command
+{
+ public void setOutput(Console out);
+
+ public String help();
+
+ public abstract String usage();
+
+ String getCommand();
+
+ public void execute(String... args);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
new file mode 100644
index 0000000000..96ecb36952
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Copy extends Move
+{
+ public Copy(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Copy messages between queues.\n" +
+ "The currently selected message set will be copied to the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "copy to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "copy";
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext)
+ {
+ fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), storeContext);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
new file mode 100644
index 0000000000..eea53252c6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class Dump extends Show
+{
+ private static final int LINE_SIZE = 8;
+ private static final String DEFAULT_ENCODING = "utf-8";
+ private static final boolean SPACE_BYTES = true;
+ private static final String BYTE_SPACER = " ";
+ private static final String NON_PRINTING_ASCII_CHAR = "?";
+
+ protected boolean _content = true;
+
+ public Dump(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Dump selected message content. Default: show=content";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[_amqHeaders],[routing],[content]] [id=<msgid e.g. 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "dump";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _content = arg.contains("content") || arg.contains("all");
+ }
+ }
+
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ List<List> display = new LinkedList<List>();
+
+ List<String> hex = new LinkedList<String>();
+ List<String> ascii = new LinkedList<String>();
+ display.add(hex);
+ display.add(ascii);
+
+ for (AMQMessage msg : messages)
+ {
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ //Add divider between messages
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ // Show general message information
+ hex.add(Show.Columns.ID.name());
+ ascii.add(msg.getMessageId().toString());
+
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ if (showRouting)
+ {
+ addShowInformation(hex, ascii, msg, "Routing Details", true, false, false);
+ }
+ if (showHeaders)
+ {
+ addShowInformation(hex, ascii, msg, "Headers", false, true, false);
+ }
+ if (showMessageHeaders)
+ {
+ addShowInformation(hex, ascii, msg, null, false, false, true);
+ }
+
+ // Add Content Body seciont
+ hex.add("Content Body");
+ ascii.add("");
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ Iterator bodies = msg.getContentBodyIterator();
+ if (bodies.hasNext())
+ {
+
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
+
+
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
+
+ while (bodies.hasNext())
+ {
+ ContentChunk chunk = (ContentChunk) bodies.next();
+
+ //Duplicate so we don't destroy original data :)
+ ByteBuffer hexBuffer = chunk.getData().duplicate();
+
+ ByteBuffer charBuffer = hexBuffer.duplicate();
+
+ Hex hexencoder = new Hex();
+
+ while (hexBuffer.hasRemaining())
+ {
+ byte[] line = new byte[LINE_SIZE];
+
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
+ {
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
+
+ byte[] encoded = hexencoder.encode(line);
+
+ try
+ {
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
+
+ int strKength = encStr.length();
+ for (int c = 0; c < strKength; c++)
+ {
+ hexLine += encStr.charAt(c);
+
+ if (c % 2 == 1 && SPACE_BYTES)
+ {
+ hexLine += BYTE_SPACER;
+ }
+ }
+
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _console.println(e.getMessage());
+ return null;
+ }
+ }
+
+ while (charBuffer.hasRemaining())
+ {
+ String asciiLine = "";
+
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
+ {
+ byte ch = charBuffer.get();
+
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
+ }
+ else
+ {
+ asciiLine += NON_PRINTING_ASCII_CHAR;
+ }
+
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ ascii.add(asciiLine);
+ }
+ }
+ }
+ else
+ {
+ List<String> result = new LinkedList<String>();
+
+ display.add(result);
+ result.add("No ContentBodies");
+ }
+ }
+
+ // if hex is empty then we have no data to display
+ if (hex.size() == 0)
+ {
+ return null;
+ }
+
+ return display;
+ }
+
+ private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
+ String title, boolean routing, boolean headers, boolean messageHeaders)
+ {
+ List<AMQMessage> single = new LinkedList<AMQMessage>();
+ single.add(msg);
+
+ List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
+
+ //Reformat data
+ if (title != null)
+ {
+ column1.add(title);
+ column2.add("");
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ // look at all columns in the routing Data
+ for (List item : routingData)
+ {
+ // the item should be:
+ // Title
+ // *divider
+ // value
+ // otherwise we can't reason about the correct value
+ if (item.size() == 3)
+ {
+ //Filter out the columns we are not interested in.
+
+ String columnName = item.get(0).toString();
+
+ if (!(columnName.equals(Show.Columns.ID.name())
+ || columnName.equals(Show.Columns.Size.name())))
+ {
+ column1.add(columnName);
+ column2.add(item.get(2).toString());
+ }
+ }
+ }
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ private boolean isPrintable(byte c)
+ {
+ return c > 31 && c < 127;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
new file mode 100644
index 0000000000..0f9546541b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+public class Help extends AbstractCommand
+{
+ public Help(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Provides detailed help on commands.";
+ }
+
+ public String getCommand()
+ {
+ return "help";
+ }
+
+ public String usage()
+ {
+ return "help [<command>]";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ Command command = _tool.getCommands().get(args[1]);
+ if (command != null)
+ {
+ _console.println(command.help());
+ _console.println("Usage:" + command.usage());
+ }
+ else
+ {
+ commandError("Command not found: ", args);
+ }
+ }
+ else
+ {
+ java.util.List<java.util.List> data = new LinkedList<java.util.List>();
+
+ java.util.List<String> commandName = new LinkedList<String>();
+ java.util.List<String> commandDescription = new LinkedList<String>();
+
+ data.add(commandName);
+ data.add(commandDescription);
+
+ //Set up Headers
+ commandName.add("Command");
+ commandDescription.add("Description");
+
+ commandName.add(Console.ROW_DIVIDER);
+ commandDescription.add(Console.ROW_DIVIDER);
+
+ //Add current Commands with descriptions
+ Map<String, Command> commands = _tool.getCommands();
+
+ for (Command command : commands.values())
+ {
+ commandName.add(command.getCommand());
+ commandDescription.add(command.help());
+ }
+
+ _console.printMap("Available Commands", data);
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
new file mode 100644
index 0000000000..df8b59ec19
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+public class List extends AbstractCommand
+{
+
+ public List(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ public String help()
+ {
+ return "list available items.";
+ }
+
+ public String usage()
+ {
+ return "list queues [<exchange>] | exchanges | bindings [<exchange>] | all";
+ }
+
+ public String getCommand()
+ {
+ return "list";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ if ((args[1].equals("exchanges"))
+ || (args[1].equals("queues"))
+ || (args[1].equals("bindings"))
+ || (args[1].equals("all")))
+ {
+ if (args.length == 2)
+ {
+ doList(args[1]);
+ }
+ else if (args.length == 3)
+ {
+ doList(args[1], args[2]);
+ }
+ }
+ else
+ {
+ commandError("Unknown options. ", args);
+ }
+ }
+ else if (args.length < 2)
+ {
+ doList("all");
+ }
+ else
+ {
+ doList(args[1]);
+ }
+ }
+
+ private void doList(String... listItem)
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ listVirtualHosts();
+ return;
+ }
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ java.util.List<String> data = null;
+
+ if (listItem[0].equals("queues"))
+ {
+ if (listItem.length > 1)
+ {
+ data = listQueues(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+ data = listQueues(vhost, exchange);
+ }
+ }
+
+ if (listItem[0].equals("exchanges"))
+ {
+ data = listExchanges(vhost);
+ }
+
+ if (listItem[0].equals("bindings"))
+ {
+
+ if (listItem.length > 1)
+ {
+ data = listBindings(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+
+ data = listBindings(vhost, exchange);
+ }
+ }
+
+ if (data != null)
+ {
+ if (data.size() == 1)
+ {
+ _console.println("No '" + listItem[0] + "' to display,");
+ }
+ else
+ {
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+
+ if (listItem[0].equals("all"))
+ {
+
+ boolean displayed = false;
+ Exchange exchange = _tool.getState().getExchange();
+
+ //Do the display here for each one so that they are pretty printed
+ data = listQueues(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (exchange == null)
+ {
+ data = listExchanges(vhost);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+ data = listBindings(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (!displayed)
+ {
+ _console.println("Nothing to list");
+ }
+ }
+ }
+
+ private void listVirtualHosts()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHosts();
+
+ String[] data = new String[vhosts.size() + 1];
+
+ data[0] = "Available VirtualHosts";
+
+ int index = 1;
+ for (VirtualHost vhost : vhosts)
+ {
+ data[index] = vhost.getName();
+ index++;
+ }
+
+ _console.displayList(true, data);
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listBindings(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQShortString> queues = vhost.getQueueRegistry().getQueueNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Current Bindings");
+
+ for (AMQShortString queue : queues)
+ {
+ if (exchange != null)
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.toString());
+ }
+ }
+ else
+ {
+ data.add(queue.toString());
+ }
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listExchanges(VirtualHost vhost)
+ {
+ Collection<AMQShortString> queues = vhost.getExchangeRegistry().getExchangeNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Exchanges");
+
+ for (AMQShortString queue : queues)
+ {
+ data.add(queue.toString());
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listQueues(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQQueue> queues = vhost.getQueueRegistry().getQueues();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Queues");
+
+ for (AMQQueue queue : queues)
+ {
+ if (exchange != null)
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.getName().toString());
+ }
+ }
+ else
+ {
+ data.add(queue.getName().toString());
+ }
+ }
+
+ if (exchange != null)
+ {
+ if (queues.size() == 1)
+ {
+ return null;
+ }
+ }
+
+ return data;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
new file mode 100644
index 0000000000..244a311c30
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Load extends AbstractCommand
+{
+ public Load(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Loads specified broker configuration file.";
+ }
+
+ public String usage()
+ {
+ return "load <configuration file>";
+ }
+
+ public String getCommand()
+ {
+ return "load";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 2)
+ {
+ _console.print("load " + args[1] + ": additional options not understood:");
+ for (int i = 2; i < args.length; i++)
+ {
+ _console.print(args[i] + " ");
+ }
+ _console.println("");
+ }
+ else if (args.length < 2)
+ {
+ _console.println("Enter Configuration file.");
+ String input = _console.readln();
+ if (input != null)
+ {
+ doLoad(input);
+ }
+ else
+ {
+ _console.println("Did not recognise config file.");
+ }
+ }
+ else
+ {
+ doLoad(args[1]);
+ }
+ }
+
+ private void doLoad(String configfile)
+ {
+ _console.println("Loading Configuration:" + configfile);
+
+ try
+ {
+ _tool.setConfigurationFile(configfile);
+ }
+ catch (Configuration.InitException e)
+ {
+ _console.println("Unable to open config file due to: '" + e.getMessage() + "'");
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
new file mode 100644
index 0000000000..a9497fd23e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.List;
+
+public class Move extends AbstractCommand
+{
+
+ /**
+ * Since the Coopy command is not associated with a real channel we can safely create our own store context
+ * for use in the few methods that require one.
+ */
+ private StoreContext _storeContext = new StoreContext();
+
+ public Move(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Move messages between queues.\n" +
+ "The currently selected message set will be moved to the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "move";
+ }
+
+ public void execute(String... args)
+ {
+ AMQQueue toQueue = null;
+ AMQQueue fromQueue = _tool.getState().getQueue();
+ java.util.List<Long> msgids = _tool.getState().getMessages();
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("to="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("from="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("msgids="))
+ {
+ String msgidStr = arg.substring(arg.indexOf("=") + 1);
+
+ // Record the current message selection
+ java.util.List<Long> currentIDs = _tool.getState().getMessages();
+
+ // Use the ToolState class to perform the messasge parsing
+ _tool.getState().setMessages(msgidStr);
+ msgids = _tool.getState().getMessages();
+
+ // Reset the original selection of messages
+ _tool.getState().setMessages(currentIDs);
+ }
+ }
+ }
+
+ if (!checkRequirements(fromQueue, toQueue, msgids))
+ {
+ return;
+ }
+
+ processIDs(fromQueue, toQueue, msgids);
+ }
+
+ private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ Long previous = null;
+ Long start = null;
+
+ for (long id : msgids)
+ {
+ if (previous != null)
+ {
+ if (id == previous + 1)
+ {
+ if (start == null)
+ {
+ start = previous;
+ }
+ }
+ else
+ {
+ if (start != null)
+ {
+ //move a range of ids
+ doCommand(fromQueue, start, id, toQueue, _storeContext);
+ }
+ else
+ {
+ //move a single id
+ doCommand(fromQueue, id, id, toQueue, _storeContext);
+ }
+ }
+ }
+
+ previous = id;
+ }
+ }
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids)
+ {
+ if (toQueue == null)
+ {
+ _console.println("Destination queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue, StoreContext storeContext)
+ {
+ fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
new file mode 100644
index 0000000000..7154159b40
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Purge extends Move
+{
+ public Purge(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Purge messages from a queue.\n" +
+ "The currently selected message set will be purged from the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "purge";
+ }
+
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext)
+ {
+ fromQueue.removeMessagesFromQueue(start, end, storeContext);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
new file mode 100644
index 0000000000..a81bc07c38
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Quit extends AbstractCommand
+{
+ public Quit(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Quit the tool.";
+ }
+
+ public String usage()
+ {
+ return "quit";
+ }
+
+ public String getCommand()
+ {
+ return "quit";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals("quit");
+
+ _tool.quit();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
new file mode 100644
index 0000000000..5e9b7028e9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+
+public class Select extends AbstractCommand
+{
+
+ public Select(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Perform a selection.";
+ }
+
+ public String usage()
+ {
+ return "select virtualhost <name> |exchange <name> |queue <name> | msgs id=<msgids eg. 1,2,4-10>";
+ }
+
+ public String getCommand()
+ {
+ return "select";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 2;
+ assert args[0].equals("select");
+
+ if (args.length < 3)
+ {
+ if (args[1].equals("show"))
+ {
+ doSelect(args[1], null);
+ }
+ else
+ {
+ _console.print("select : unknown command:");
+ _console.println(help());
+ }
+ }
+ else
+ {
+ if (args[1].equals("virtualhost")
+ || args[1].equals("vhost")
+ || args[1].equals("exchange")
+ || args[1].equals("queue")
+ || args[1].equals("msg")
+ )
+ {
+ doSelect(args[1], args[2]);
+ }
+ else
+ {
+ _console.println(help());
+ }
+ }
+ }
+
+ private void doSelect(String type, String item)
+ {
+ if (type.equals("virtualhost"))
+ {
+
+ VirtualHost vhost = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHost(item);
+
+ if (vhost == null)
+ {
+ _console.println("Virtualhost '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setVhost(vhost);
+ }
+ }
+
+ if (type.equals("exchange"))
+ {
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(new AMQShortString(item));
+
+ if (exchange == null)
+ {
+ _console.println("Exchange '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setExchange(exchange);
+ }
+
+ if (_tool.getState().getQueue() != null)
+ {
+ if (!exchange.isBound(_tool.getState().getQueue()))
+ {
+ _tool.getState().setQueue(null);
+ }
+ }
+ }
+
+ if (type.equals("queue"))
+ {
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(item));
+
+ if (queue == null)
+ {
+ _console.println("Queue '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setQueue(queue);
+
+ if (_tool.getState().getExchange() == null)
+ {
+ for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames())
+ {
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ if (exchange.isBound(queue))
+ {
+ _tool.getState().setExchange(exchange);
+ break;
+ }
+ }
+ }
+
+ //remove the message selection
+ _tool.getState().setMessages((java.util.List<Long>) null);
+ }
+ }
+
+ if (type.equals("msg"))
+ {
+ if (item.startsWith("id="))
+ {
+ StringTokenizer tok = new StringTokenizer(item.substring(item.indexOf("=") + 1), ",");
+
+ java.util.List<Long> msgids = null;
+
+ if (tok.hasMoreTokens())
+ {
+ msgids = new LinkedList<Long>();
+ }
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+ if (next.contains("-"))
+ {
+ Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+ Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+ if (end >= start)
+ {
+ for (long l = start; l <= end; l++)
+ {
+ msgids.add(l);
+ }
+ }
+ }
+ else
+ {
+ msgids.add(Long.parseLong(next));
+ }
+ }
+
+ _tool.getState().setMessages(msgids);
+ }
+
+ }
+
+ if (type.equals("show"))
+ {
+ _console.println(_tool.getState().toString());
+ if (_tool.getState().getMessages() != null)
+ {
+ _console.print("Msgs:");
+ for (Long l : _tool.getState().getMessages())
+ {
+ _console.print(" " + l);
+ }
+ _console.println("");
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
new file mode 100644
index 0000000000..5988cdabfc
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+public class Show extends AbstractCommand
+{
+ protected boolean _amqHeaders = false;
+ protected boolean _routing = false;
+ protected boolean _msgHeaders = false;
+
+ public Show(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Shows the messages headers.";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[amqheaders],[routing]] [id=<msgid e.g. 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "show";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 2)
+ {
+ parseArgs("all");
+ }
+ else
+ {
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+ protected void parseArgs(String... args)
+ {
+ List<Long> msgids = null;
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _msgHeaders = arg.contains("msgheaders") || arg.contains("all");
+ _amqHeaders = arg.contains("amqheaders") || arg.contains("all");
+ _routing = arg.contains("routing") || arg.contains("all");
+ }
+
+ if (arg.startsWith("id="))
+ {
+ _tool.getState().setMessages(msgids);
+ }
+ }//for args
+ }// if args > 2
+ }
+
+ protected void performShow()
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost selected. 'DuSelect' a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue _queue = _tool.getState().getQueue();
+
+ List<Long> msgids = _tool.getState().getMessages();
+
+ if (_queue != null)
+ {
+ List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+ if (messages == null || messages.size() == 0)
+ {
+ _console.println("No messages on queue");
+ return;
+ }
+
+ List<List> data = createMessageData(msgids, messages, _amqHeaders, _routing, _msgHeaders);
+ if (data != null)
+ {
+ _console.printMap(null, data);
+ }
+ else
+ {
+ String message = "No data to display.";
+ if (msgids != null)
+ {
+ message += " Is message selection correct? " + _tool.getState().printMessages();
+ }
+ _console.println(message);
+ }
+
+ }
+ else
+ {
+ _console.println("No Queue specified to show.");
+ }
+ }
+
+ /**
+ * Create the list data for display from the messages.
+ *
+ * @param msgids The list of message ids to display
+ * @param messages A list of messages to format and display.
+ * @param showHeaders should the header info be shown
+ * @param showRouting show the routing info be shown
+ * @param showMessageHeaders show the msg headers be shown
+ * @return the formated data lists for printing
+ */
+ protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ // Currenly exposed message properties
+// //Printing the content Body
+// msg.getContentBodyIterator();
+// //Print the Headers
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppIdAsString();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getClusterId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getContentType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getCorrelationId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getDeliveryMode();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getTimestamp();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getUserId();
+//
+// //Print out all the property names
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
+//
+// msg.getMessageId();
+// msg.getSize();
+// msg.getArrivalTime();
+
+// msg.getDeliveredSubscription();
+// msg.getDeliveredToConsumer();
+// msg.getMessageHandle();
+// msg.getMessageId();
+// msg.getMessagePublishInfo();
+// msg.getPublisher();
+
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+// msg.isPersistent();
+// msg.isRedelivered();
+// msg.isRejectedBy();
+// msg.isTaken();
+
+ //Header setup
+
+ List<List> data = new LinkedList<List>();
+
+ List<String> id = new LinkedList<String>();
+ data.add(id);
+ id.add(Columns.ID.name());
+ id.add(Console.ROW_DIVIDER);
+
+ List<String> exchange = new LinkedList<String>();
+ List<String> routingkey = new LinkedList<String>();
+ List<String> immediate = new LinkedList<String>();
+ List<String> mandatory = new LinkedList<String>();
+ if (showRouting)
+ {
+ data.add(exchange);
+ exchange.add(Columns.Exchange.name());
+ exchange.add(Console.ROW_DIVIDER);
+
+ data.add(routingkey);
+ routingkey.add(Columns.RoutingKey.name());
+ routingkey.add(Console.ROW_DIVIDER);
+
+ data.add(immediate);
+ immediate.add(Columns.isImmediate.name());
+ immediate.add(Console.ROW_DIVIDER);
+
+ data.add(mandatory);
+ mandatory.add(Columns.isMandatory.name());
+ mandatory.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> size = new LinkedList<String>();
+ List<String> appid = new LinkedList<String>();
+ List<String> clusterid = new LinkedList<String>();
+ List<String> contenttype = new LinkedList<String>();
+ List<String> correlationid = new LinkedList<String>();
+ List<String> deliverymode = new LinkedList<String>();
+ List<String> encoding = new LinkedList<String>();
+ List<String> arrival = new LinkedList<String>();
+ List<String> expiration = new LinkedList<String>();
+ List<String> priority = new LinkedList<String>();
+ List<String> propertyflag = new LinkedList<String>();
+ List<String> replyto = new LinkedList<String>();
+ List<String> timestamp = new LinkedList<String>();
+ List<String> type = new LinkedList<String>();
+ List<String> userid = new LinkedList<String>();
+ List<String> ispersitent = new LinkedList<String>();
+ List<String> isredelivered = new LinkedList<String>();
+ List<String> isdelivered = new LinkedList<String>();
+
+ data.add(size);
+ size.add(Columns.Size.name());
+ size.add(Console.ROW_DIVIDER);
+
+ if (showHeaders)
+ {
+ data.add(ispersitent);
+ ispersitent.add(Columns.isPersistent.name());
+ ispersitent.add(Console.ROW_DIVIDER);
+
+ data.add(isredelivered);
+ isredelivered.add(Columns.isRedelivered.name());
+ isredelivered.add(Console.ROW_DIVIDER);
+
+ data.add(isdelivered);
+ isdelivered.add(Columns.isDelivered.name());
+ isdelivered.add(Console.ROW_DIVIDER);
+
+ data.add(appid);
+ appid.add(Columns.App_ID.name());
+ appid.add(Console.ROW_DIVIDER);
+
+ data.add(clusterid);
+ clusterid.add(Columns.Cluster_ID.name());
+ clusterid.add(Console.ROW_DIVIDER);
+
+ data.add(contenttype);
+ contenttype.add(Columns.Content_Type.name());
+ contenttype.add(Console.ROW_DIVIDER);
+
+ data.add(correlationid);
+ correlationid.add(Columns.Correlation_ID.name());
+ correlationid.add(Console.ROW_DIVIDER);
+
+ data.add(deliverymode);
+ deliverymode.add(Columns.Delivery_Mode.name());
+ deliverymode.add(Console.ROW_DIVIDER);
+
+ data.add(encoding);
+ encoding.add(Columns.Encoding.name());
+ encoding.add(Console.ROW_DIVIDER);
+
+ data.add(arrival);
+ expiration.add(Columns.Arrival.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(expiration);
+ expiration.add(Columns.Expiration.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(priority);
+ priority.add(Columns.Priority.name());
+ priority.add(Console.ROW_DIVIDER);
+
+ data.add(propertyflag);
+ propertyflag.add(Columns.Property_Flag.name());
+ propertyflag.add(Console.ROW_DIVIDER);
+
+ data.add(replyto);
+ replyto.add(Columns.ReplyTo.name());
+ replyto.add(Console.ROW_DIVIDER);
+
+ data.add(timestamp);
+ timestamp.add(Columns.Timestamp.name());
+ timestamp.add(Console.ROW_DIVIDER);
+
+ data.add(type);
+ type.add(Columns.Type.name());
+ type.add(Console.ROW_DIVIDER);
+
+ data.add(userid);
+ userid.add(Columns.UserID.name());
+ userid.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> msgHeaders = new LinkedList<String>();
+ if (showMessageHeaders)
+ {
+ data.add(msgHeaders);
+ msgHeaders.add(Columns.MsgHeaders.name());
+ msgHeaders.add(Console.ROW_DIVIDER);
+ }
+
+ //Add create the table of data
+ for (AMQMessage msg : messages)
+ {
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ id.add(msg.getMessageId().toString());
+
+ size.add("" + msg.getSize());
+
+ arrival.add("" + msg.getArrivalTime());
+
+ try
+ {
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
+ }
+ catch (AMQException e)
+ {
+ ispersitent.add("n/a");
+ }
+
+ isredelivered.add(msg.isRedelivered() ? "true" : "false");
+
+ isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+
+// msg.getMessageHandle();
+
+ BasicContentHeaderProperties headers = null;
+
+ try
+ {
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ }
+ catch (AMQException e)
+ {
+ //ignore
+// commandError("Unable to read properties for message: " + e.getMessage(), null);
+ }
+
+ if (headers != null)
+ {
+ String appidS = headers.getAppIdAsString();
+ appid.add(appidS == null ? "null" : appidS);
+
+ String clusterS = headers.getClusterIdAsString();
+ clusterid.add(clusterS == null ? "null" : clusterS);
+
+ String contentS = headers.getContentTypeAsString();
+ contenttype.add(contentS == null ? "null" : contentS);
+
+ String correlationS = headers.getCorrelationIdAsString();
+ correlationid.add(correlationS == null ? "null" : correlationS);
+
+ deliverymode.add("" + headers.getDeliveryMode());
+
+ AMQShortString encodeSS = headers.getEncoding();
+ encoding.add(encodeSS == null ? "null" : encodeSS.toString());
+
+ expiration.add("" + headers.getExpiration());
+
+ FieldTable headerFT = headers.getHeaders();
+ msgHeaders.add(headerFT == null ? "none" : "" + headerFT.toString());
+
+ priority.add("" + headers.getPriority());
+ propertyflag.add("" + headers.getPropertyFlags());
+
+ AMQShortString replytoSS = headers.getReplyTo();
+ replyto.add(replytoSS == null ? "null" : replytoSS.toString());
+
+ timestamp.add("" + headers.getTimestamp());
+
+ AMQShortString typeSS = headers.getType();
+ type.add(typeSS == null ? "null" : typeSS.toString());
+
+ AMQShortString useridSS = headers.getUserId();
+ userid.add(useridSS == null ? "null" : useridSS.toString());
+
+ MessagePublishInfo info = null;
+ try
+ {
+ info = msg.getMessagePublishInfo();
+ }
+ catch (AMQException e)
+ {
+ //ignore
+ }
+
+ if (info != null)
+ {
+ AMQShortString exchangeSS = info.getExchange();
+ exchange.add(exchangeSS == null ? "null" : exchangeSS.toString());
+
+ AMQShortString routingkeySS = info.getRoutingKey();
+ routingkey.add(routingkeySS == null ? "null" : routingkeySS.toString());
+
+ immediate.add(info.isImmediate() ? "true" : "false");
+ mandatory.add(info.isMandatory() ? "true" : "false");
+ }
+
+// msg.getPublisher(); -- only used in clustering
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+
+ }// if headers!=null
+
+// need to access internal map and do lookups.
+// msg.isTaken();
+// msg.getDeliveredSubscription();
+// msg.isRejectedBy();
+
+ }
+
+ // if id only had the header and the divider in it then we have no data to display
+ if (id.size() == 2)
+ {
+ return null;
+ }
+ return data;
+ }
+
+ protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
+ {
+ if (msgids == null)
+ {
+ return true;
+ }
+
+ Long msgid = msg.getMessageId();
+
+ boolean found = false;
+
+ if (msgids != null)
+ {
+ //check msgid is in msgids
+ for (Long l : msgids)
+ {
+ if (l.equals(msgid))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ return found;
+ }
+
+ public enum Columns
+ {
+ ID,
+ Size,
+ Exchange,
+ RoutingKey,
+ isImmediate,
+ isMandatory,
+ isPersistent,
+ isRedelivered,
+ isDelivered,
+ App_ID,
+ Cluster_ID,
+ Content_Type,
+ Correlation_ID,
+ Delivery_Mode,
+ Encoding,
+ Arrival,
+ Expiration,
+ Priority,
+ Property_Flag,
+ ReplyTo,
+ Timestamp,
+ Type,
+ UserID,
+ MsgHeaders
+ }
+}
+
+
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java
index f9e093dba7..c27c52eb8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security;
+package org.apache.qpid.tools.security;
import org.apache.commons.codec.binary.Base64;
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java
new file mode 100644
index 0000000000..986fea32cc
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+public interface CommandParser
+{
+ /**
+ * If there is more than one command received on the last parse request.
+ *
+ * Subsequent calls to parse will utilise this input rather than reading new data from the input source
+ * @return boolean
+ */
+ boolean more();
+
+ /**
+ * True if the currently parsed command has been requested as a background operation
+ *
+ * @return boolean
+ */
+ boolean isBackground();
+
+ /**
+ * Parses user commands, and groups tokens in the
+ * String[] format that all Java main's love.
+ *
+ * If more than one command is provided in one input line then the more() method will return true.
+ * A subsequent call to parse() will continue to parse that input line before reading new input.
+ *
+ * @return <code>input</code> split in args[] format; null if eof.
+ * @throws java.io.IOException if there is a problem reading from the input stream
+ */
+ String[] parse() throws java.io.IOException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java
new file mode 100644
index 0000000000..cf457d1ea5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import java.util.List;
+
+public interface Console
+{
+ public enum CellFormat
+ {
+ CENTRED, LEFT, RIGHT
+ }
+
+ public static String ROW_DIVIDER = "*divider";
+
+ public void print(String... message);
+
+ public void println(String... message);
+
+ public String readln();
+
+ /**
+ * Reads and parses the command line.
+ *
+ *
+ * @return The next command or null
+ */
+ public String[] readCommand();
+
+ public CommandParser getCommandParser();
+
+ public void setCommandParser(CommandParser parser);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +-------------+
+ * | Heading |
+ * +-------------+
+ * | Item 1 |
+ * | Item 2 |
+ * | Item 3 |
+ * +-------------+
+ *
+ * @param hasTitle should list[0] be used as a heading
+ * @param list The list of Strings to display
+ */
+ public void displayList(boolean hasTitle, String... list);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * +----------------------------+ (*divider)
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ void printMap(String title, List<List> entries);
+
+
+ public void close();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java
new file mode 100644
index 0000000000..09444ccdd7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+public class SimpleCommandParser implements CommandParser
+{
+ private static final String COMMAND_SEPERATOR = ";";
+
+ /** Input source of commands */
+ protected BufferedReader _reader;
+
+ /** The next list of commands from the command line */
+ private StringBuilder _nextCommand = null;
+
+ public SimpleCommandParser(BufferedReader reader)
+ {
+ _reader = reader;
+ }
+
+ public boolean more()
+ {
+ return _nextCommand != null;
+ }
+
+ public boolean isBackground()
+ {
+ return false;
+ }
+
+ public String[] parse() throws IOException
+ {
+ String[] commands = null;
+
+ String input = null;
+
+ if (_nextCommand == null)
+ {
+ input = _reader.readLine();
+ }
+ else
+ {
+ input = _nextCommand.toString();
+ _nextCommand = null;
+ }
+
+ if (input == null)
+ {
+ return null;
+ }
+
+ StringTokenizer tok = new StringTokenizer(input, " ");
+
+ int tokenCount = tok.countTokens();
+ int index = 0;
+
+ if (tokenCount > 0)
+ {
+ commands = new String[tokenCount];
+ boolean commandComplete = false;
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+
+ if (next.equals(COMMAND_SEPERATOR))
+ {
+ commandComplete = true;
+ _nextCommand = new StringBuilder();
+ continue;
+ }
+
+ if (commandComplete)
+ {
+ _nextCommand.append(next);
+ _nextCommand.append(" ");
+ }
+ else
+ {
+ commands[index] = next;
+ index++;
+ }
+ }
+
+ }
+
+ //Reduce the String[] if not all the tokens were used in this command.
+ // i.e. there is more than one command on the line.
+ if (index != tokenCount)
+ {
+ String[] shortCommands = new String[index];
+ System.arraycopy(commands, 0, shortCommands, 0, index);
+ return shortCommands;
+ }
+ else
+ {
+ return commands;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java
new file mode 100644
index 0000000000..ec080a4611
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimpleConsole implements Console
+{
+ /** SLF4J Logger. */
+ private static Logger _devlog = LoggerFactory.getLogger(SimpleConsole.class);
+
+ /** Console Writer. */
+ protected static BufferedWriter _consoleWriter;
+
+ /** Console Reader. */
+ protected static BufferedReader _consoleReader;
+
+ /** Parser for command-line input. */
+ protected CommandParser _parser;
+
+ public SimpleConsole(BufferedWriter writer, BufferedReader reader)
+ {
+ _consoleWriter = writer;
+ _consoleReader = reader;
+ _parser = new SimpleCommandParser(_consoleReader);
+ }
+
+ public void print(String... message)
+ {
+ try
+ {
+ for (String s : message)
+ {
+ _consoleWriter.write(s);
+ }
+ _consoleWriter.flush();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to write:" + message);
+ }
+
+ }
+
+ public void println(String... message)
+ {
+ print(message);
+ print(System.getProperty("line.separator"));
+ }
+
+
+ public String readln()
+ {
+ try
+ {
+ return _consoleReader.readLine();
+ }
+ catch (IOException e)
+ {
+ _devlog.debug("Unable to read input due to:" + e.getMessage());
+ return null;
+ }
+ }
+
+ public String[] readCommand()
+ {
+ try
+ {
+ return _parser.parse();
+ }
+ catch (IOException e)
+ {
+ _devlog.error("Error reading command:" + e.getMessage());
+ return new String[0];
+ }
+ }
+
+ public CommandParser getCommandParser()
+ {
+ return _parser;
+ }
+
+ public void setCommandParser(CommandParser parser)
+ {
+ _parser = parser;
+ }
+
+ public void displayList(boolean hasTitle, String... list)
+ {
+ java.util.List<java.util.List> data = new LinkedList<List>();
+
+ java.util.List<String> values = new LinkedList<String>();
+
+ data.add(values);
+
+ for (String value : list)
+ {
+ values.add(value);
+ }
+
+ if (hasTitle)
+ {
+ values.add(1, "*divider");
+ }
+
+ printMap(null, data);
+ }
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ public void printMap(String title, java.util.List<java.util.List> entries)
+ {
+ try
+ {
+ int columns = entries.size();
+
+ int[] columnWidth = new int[columns];
+
+ // calculate row count
+ int rowMax = 0;
+
+ //the longest item
+ int itemMax = 0;
+
+ for (int i = 0; i < columns; i++)
+ {
+ int columnIRowMax = entries.get(i).size();
+
+ if (columnIRowMax > rowMax)
+ {
+ rowMax = columnIRowMax;
+ }
+ for (Object values : entries.get(i))
+ {
+ if (values.toString().equals(Console.ROW_DIVIDER))
+ {
+ continue;
+ }
+
+ int itemLength = values.toString().length();
+
+ //note for single width
+ if (itemLength > itemMax)
+ {
+ itemMax = itemLength;
+ }
+
+ //note for mulit width
+ if (itemLength > columnWidth[i])
+ {
+ columnWidth[i] = itemLength;
+ }
+
+ }
+ }
+
+ int tableWidth = 0;
+
+
+ for (int i = 0; i < columns; i++)
+ {
+ // plus 2 for the space padding
+ columnWidth[i] += 2;
+ }
+ for (int size : columnWidth)
+ {
+ tableWidth += size;
+ }
+ tableWidth += (columns - 1);
+
+ if (title != null)
+ {
+ if (title.length() > tableWidth)
+ {
+ tableWidth = title.length();
+ }
+
+ printCellRow("+", "-", tableWidth);
+
+ printCell(CellFormat.CENTRED, "|", tableWidth, " " + title + " ", 0);
+ _consoleWriter.newLine();
+
+ }
+
+ //put top line | or bottom of title
+ printCellRow("+", "-", tableWidth);
+
+ //print the table data
+ int row = 0;
+
+ for (; row < rowMax; row++)
+ {
+ for (int i = 0; i < columns; i++)
+ {
+ java.util.List columnData = entries.get(i);
+
+ String value;
+ // does this column have a value for this row
+ if (columnData.size() > row)
+ {
+ value = " " + columnData.get(row).toString() + " ";
+ }
+ else
+ {
+ value = " ";
+ }
+
+ if (i == 0 && value.equals(" " + Console.ROW_DIVIDER + " "))
+ {
+ printCellRow("+", "-", tableWidth);
+ //move on to the next row
+ break;
+ }
+ else
+ {
+ printCell(CellFormat.LEFT, "|", columnWidth[i], value, i);
+ }
+
+ // if it is the last row then do a new line.
+ if (i == columns - 1)
+ {
+ _consoleWriter.newLine();
+ }
+ }
+ }
+
+ printCellRow("+", "-", tableWidth);
+
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to write.");
+ }
+ }
+
+ public void close()
+ {
+
+ try
+ {
+ _consoleReader.close();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to close reader.");
+ }
+
+ try
+ {
+
+ _consoleWriter.close();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to close writer.");
+ }
+
+ }
+
+ private void printCell(CellFormat format, String edge, int cellWidth, String cell, int column) throws IOException
+ {
+ int pad = cellWidth - cell.length();
+
+ if (column == 0)
+ {
+ _consoleWriter.write(edge);
+ }
+
+ switch (format)
+ {
+ case CENTRED:
+ printPad(" ", pad / 2);
+ break;
+ case RIGHT:
+ printPad(" ", pad);
+ break;
+ }
+
+ _consoleWriter.write(cell);
+
+
+ switch (format)
+ {
+ case CENTRED:
+ // if pad isn't even put the extra one on the right
+ if (pad % 2 == 0)
+ {
+ printPad(" ", pad / 2);
+ }
+ else
+ {
+ printPad(" ", (pad / 2) + 1);
+ }
+ break;
+ case LEFT:
+ printPad(" ", pad);
+ break;
+ }
+
+
+ _consoleWriter.write(edge);
+
+ }
+
+ private void printCellRow(String edge, String mid, int cellWidth) throws IOException
+ {
+ _consoleWriter.write(edge);
+
+ printPad(mid, cellWidth);
+
+ _consoleWriter.write(edge);
+ _consoleWriter.newLine();
+ }
+
+ private void printPad(String padChar, int count) throws IOException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ _consoleWriter.write(padChar);
+ }
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java
new file mode 100644
index 0000000000..6a95529059
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid.utils;
+
+public interface CommandParser
+{
+ /**
+ * If there is more than one command received on the last parse request.
+ *
+ * Subsequent calls to parse will utilise this input rather than reading new data from the input source
+ * @return boolean
+ */
+ boolean more();
+
+ /**
+ * True if the currently parsed command has been requested as a background operation
+ *
+ * @return boolean
+ */
+ boolean isBackground();
+
+ /**
+ * Parses user commands, and groups tokens in the
+ * String[] format that all Java main's love.
+ *
+ * If more than one command is provided in one input line then the more() method will return true.
+ * A subsequent call to parse() will continue to parse that input line before reading new input.
+ *
+ * @return <code>input</code> split in args[] format; null if eof.
+ * @throws java.io.IOException if there is a problem reading from the input stream
+ */
+ String[] parse() throws java.io.IOException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java
new file mode 100644
index 0000000000..892a48254c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid;
+
+import java.util.List;
+
+public interface Console
+{
+ public enum CellFormat
+ {
+ CENTRED, LEFT, RIGHT
+ }
+
+ public static String ROW_DIVIDER = "*divider";
+
+ public void print(String... message);
+
+ public void println(String... message);
+
+ public String[] readln();
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +-------------+
+ * | Heading |
+ * +-------------+
+ * | Item 1 |
+ * | Item 2 |
+ * | Item 3 |
+ * +-------------+
+ *
+ * @param hasTitle should list[0] be used as a heading
+ * @param list The list of Strings to display
+ */
+ public void displayList(boolean hasTitle, String... list);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * +----------------------------+ (*divider)
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ void printMap(String title, List<List> entries);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java
new file mode 100644
index 0000000000..ea4045c917
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java
@@ -0,0 +1,352 @@
+/*
+ * The Java Shell: jsh core -- RELEASE: alpha3
+ * (C)1999 Osvaldo Pinali Doederlein.
+ *
+ * LICENSE
+ * =======
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * CHANGES
+ * =======
+ * 1.0.2 - Added support for non-quoted '\' escape char (Not interpreted by the shell)
+ * 1.0.1 - Added support for arguments in aliases
+ * 1.0.0 - Initial release; split from Shell and enhanced a lot.
+ *
+ * LINKS
+ * =====
+ * Contact: mailto@osvaldo.visionnaire.com.br, mailto@g.collin@appliweb.net
+ * Site #1: http://www.geocities.com/ResearchTriangle/Node/2005/
+ * Site #2: http://www.appliweb.net/jsh
+ */
+
+package com.redhat.etp.qpid.utils;
+
+import java.io.BufferedReader;
+import java.util.Vector;
+
+/**
+ * The Java Shell.
+ * <p>
+ * Provides an environment for launching and controlling Java apps.
+ * <p>
+ * TODO: - Support for applets!
+ * - Support for variable replacement
+ *
+ * @author Osvaldo Pinali Doederlein.
+ */
+public class RSHCommandParser implements CommandParser
+{
+ /** Where commands come from. */
+ protected BufferedReader reader;
+ /** Continuation for command line. */
+ protected String contLine = null;
+ /** Run next command in background? */
+ protected boolean background;
+ protected String[] env; // Commands passed in args.
+
+ public RSHCommandParser(BufferedReader reader)
+ {
+ this(reader, null);
+ }
+
+ public RSHCommandParser(BufferedReader reader, String env[])
+ {
+ this.reader = reader;
+ this.env = env;
+ if (env == null)
+ {
+ this.env = new String[0];
+ }
+ }
+
+
+ public boolean more()
+ {
+ return contLine != null;
+ }
+
+ public boolean isBackground()
+ {
+ return background;
+ }
+
+ /**
+ * Solves and expands aliases in an argument array.
+ * This expansion affects only the first (if any) argument; the
+ * typical thing to do, as we don't want to expand parameters.
+ * We recursively parse the result to be sure we expand everything.
+ * For example, an alias can be expanded to further aliases, or it can contains args.
+ *
+ * @param args Raw arguments.
+ * @return <code>args</code> resolved and expanded.
+ */
+ public static String[] expand(String[] args)
+ {
+ if (args.length > 0)
+ {
+ String expanded = args[0];//Alias.resolve(args[0]);
+
+ // Try to expand recursively the command line
+ if (expanded != args[0])
+ {
+ RSHCommandParser recurse = new RSHCommandParser(new BufferedReader(
+ new java.io.StringReader(expanded)));
+
+ try
+ {
+ String cmdLine[] = recurse.parse();
+ cmdLine = recurse.expand(cmdLine);
+
+ // do we need to handle new arguments and insert them in the command array ?
+ if (cmdLine.length > 1)
+ {
+ String[] newArgs = new String[cmdLine.length + args.length - 1];
+ System.arraycopy(cmdLine, 0, newArgs, 0, cmdLine.length);
+
+ if (args.length > 1)
+ {
+ System.arraycopy(args, 1, newArgs, cmdLine.length, args.length - 1);
+ }
+
+ args = newArgs;
+ }
+ else if (cmdLine.length == 1)
+ {
+ args[0] = cmdLine[0];
+ }
+ }
+ catch (java.io.IOException e)
+ {
+ }
+ }
+ }
+
+ return args;
+ }
+
+ public String[] parse() throws java.io.IOException
+ {
+ final int READ = 0, QUOTE = 1, SKIP = 2, ESCAPE = 3, NONQUOTEDESCAPE = 4, VARIABLE = 5;
+ final int EOF = 0xFFFF;
+ int mode = SKIP;
+ Vector<String> args = new Vector<String>();
+ StringBuffer current = new StringBuffer();
+ StringBuffer varName = null;
+ background = false;
+ String line;
+
+ if (contLine == null)
+ {
+ line = reader.readLine();
+ }
+ else
+ {
+ line = contLine;
+ contLine = null;
+ }
+
+ if (line == null)
+ {
+ reader = null;
+ return null;
+ }
+
+ for (int pos = 0; pos < line.length(); ++pos)
+ {
+ char c = line.charAt(pos);
+
+ switch (mode)
+ {
+ case SKIP:
+ switch (c)
+ {
+ case' ':
+ case'\t':
+ break;
+ case'\"':
+ mode = QUOTE;
+ break;
+ case'&':
+ background = true;
+ case';':
+ contLine = line.substring(pos + 1);
+ pos = line.length();
+ break;
+ default:
+ mode = READ;
+ --pos;
+ }
+ break;
+
+ case READ:
+ switch (c)
+ {
+ case'\"':
+ mode = QUOTE;
+ break;
+ case';':
+ case'&':
+ --pos;
+ case' ':
+ case'\t':
+ mode = SKIP;
+ break;
+ case'\\':
+ mode = NONQUOTEDESCAPE;
+ break;
+ case'$':
+ mode = VARIABLE;
+ varName = new StringBuffer();
+ break;
+ default:
+ current.append(c);
+ }
+ if ((mode != READ) && (mode != NONQUOTEDESCAPE))
+ {
+ args.addElement(current.toString());
+ current = new StringBuffer();
+ }
+ break;
+
+ case QUOTE:
+ switch (c)
+ {
+ case'\"':
+ mode = READ;
+ break;
+ case'\\':
+ mode = ESCAPE;
+ break;
+ default:
+ current.append(c);
+ }
+ break;
+
+ case ESCAPE:
+ switch (c)
+ {
+ case'n':
+ c = '\n';
+ break;
+ case'r':
+ c = '\r';
+ break;
+ case't':
+ c = '\t';
+ break;
+ case'b':
+ c = '\b';
+ break;
+ case'f':
+ c = '\f';
+ break;
+ default:
+ current.append('\\');
+ break;
+ }
+ mode = QUOTE;
+ current.append(c);
+ break;
+ case NONQUOTEDESCAPE:
+ switch (c)
+ {
+ case';':
+ mode = READ;
+ current.append(c);
+ break;
+ default: // This is not a escaped char.
+ mode = READ;
+ current.append('\\');
+ current.append(c);
+ break;
+ }
+ break;
+ case VARIABLE:
+ switch (c)
+ {
+ case'$':
+ {
+// String val = Set.get(new String(varName));
+// if (val != null)
+// {
+// current.append(val);
+// }
+ mode = READ;
+ break;
+ }
+ case'@':
+ {
+ StringBuffer val = new StringBuffer();
+ int i;
+ for (i = 0; i < env.length; i++)
+ {
+ val.append(env[i]);
+ val.append(' ');
+ }
+ current.append(val);
+ mode = READ;
+ break;
+ }
+ case'0':
+ case'1':
+ case'2':
+ case'3':
+ case'4':
+ case'5':
+ case'6':
+ case'7':
+ case'8':
+ case'9':
+ {
+ if (varName.length() == 0)
+ {
+ int value = Integer.parseInt(new String(new char[]{c}));
+ if (env.length > value)
+ {
+ current.append(env[value]);
+ }
+ mode = READ;
+ break;
+ }
+ // else fall back
+ }
+ default:
+ varName.append(c);
+ break;
+ }
+ break;
+ }
+ }
+
+ if (current.length() > 0)
+ {
+ args.addElement(current.toString());
+ }
+ return expand(toArray(args));
+ }
+
+ private String[] toArray(Vector strings)
+ {
+ String[] arr = new String[strings.size()];
+
+ for (int i = 0; i < strings.size(); ++i)
+ {
+ arr[i] = (String) strings.elementAt(i);
+ }
+
+ return arr;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java
new file mode 100644
index 0000000000..98e816554e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+public class SimpleCommandParser implements CommandParser
+{
+ private static final String COMMAND_SEPERATOR = ";";
+
+ /** Input source of commands */
+ protected BufferedReader _reader;
+
+ /** The next list of commands from the command line */
+ private StringBuilder _nextCommand = null;
+
+ public SimpleCommandParser(BufferedReader reader)
+ {
+ _reader = reader;
+ }
+
+ public boolean more()
+ {
+ return _nextCommand != null;
+ }
+
+ public boolean isBackground()
+ {
+ return false;
+ }
+
+ public String[] parse() throws IOException
+ {
+ String[] commands = null;
+
+ String input = null;
+
+ if (_nextCommand == null)
+ {
+ input = _reader.readLine();
+ }
+ else
+ {
+ input = _nextCommand.toString();
+ _nextCommand = null;
+ }
+
+ StringTokenizer tok = new StringTokenizer(input, " ");
+
+ int tokenCount = tok.countTokens();
+ int index = 0;
+
+ if (tokenCount > 0)
+ {
+ commands = new String[tokenCount];
+ boolean commandComplete = false;
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+
+ if (next.equals(COMMAND_SEPERATOR))
+ {
+ commandComplete = true;
+ _nextCommand = new StringBuilder();
+ continue;
+ }
+
+ if (commandComplete)
+ {
+ _nextCommand.append(next);
+ _nextCommand.append(" ");
+ }
+ else
+ {
+ commands[index] = next;
+ index++;
+ }
+ }
+
+ }
+
+ //Reduce the String[] if not all the tokens were used in this command.
+ // i.e. there is more than one command on the line.
+ if (index != tokenCount)
+ {
+ String[] shortCommands = new String[index];
+ System.arraycopy(commands, 0, shortCommands, 0, index);
+ return shortCommands;
+ }
+ else
+ {
+ return commands;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index 9653155a51..3bca0a4545 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.exchange;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
index 89b0e068d9..0c0d8f471e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.protocol;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index a7e5c0d1d0..94d67848c6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.queue;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index acd5e0772f..76b67314e6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.queue;