diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-20 10:52:26 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-20 10:52:26 +0000 |
| commit | 959cffd1298dc79218182cbcf73d8e494b2b06e3 (patch) | |
| tree | 83bbfbd944accfd5ec017fdb7ee6bda4be10ce3a /java/broker/src | |
| parent | 43350455ee022494c324080993fdbd37d0788426 (diff) | |
| download | qpid-python-959cffd1298dc79218182cbcf73d8e494b2b06e3.tar.gz | |
Merged revisions 1-447993,447995-448007,448009-448141,448143-448157,448161-448194,448196-448210,448212-448218,448220-448223,448225-448233,448235,448237-448241,448243-448596,448598-448623,448625-448850,448852-448880,448882-448982,448984-449635,449637-449639,449641-449642,449644-449645,449647-449674,449676-449719,449721-449749,449751-449762,449764-449933,449935-449941,449943-450383,450385,450387-450400,450402-450433,450435-450503,450505-450555,450557-450860,450862-451024,451026-451149,451151-451316,451318-451931,451933-452139,452141-452162,452164-452320,452322,452324-452325,452327-452333,452335-452429,452431-452528,452530-452545,452547-453192,453194-453195,453197-453536,453538,453540-453656,453658-454676,454678-454735,454737,454739-454781,454783-462728,462730-462819,462821-462833,462835-462839,462841-463071,463073-463178,463180-463308,463310-463362,463364-463375,463377-463396,463398-463402,463404-463409,463411-463661,463663-463670,463672-463673,463675-464493,464495-464502,464504-464576,464578-464613,464615-464628,464630,464632-464866,464868-464899,464901-464942,464944-464949,464951-465004,465006-465016,465018-465053,465055-465165,465167-465321,465323-465406,465408-465427,465429-465431,465433-465548,465550-466044,466047-466075,466077,466079-466081,466083-466099,466101-466112,466114-466126,466128-466240,466242-466971,466973-466978,466980-467309,467311-467312,467316-467328,467330-467485,467487-467588,467590-467604,467606-467699,467701-467706,467708-467749,467751-468069,468071-468537,468539-469241,469244-469246,469248-469318,469320-469421,469423,469425-469429,469431-469435,469437-469462,469464-469469,469472-469477,469479-469490,469492-469503,469505-469529,469531-469598,469600-469624,469626-469737,469739-469752,469754-469806,469808-469928,469930-469953,469955-470011,470013-470109,470111-470335,470338-470339,470341-470379,470381,470383-470399,470401-470446,470448-470741,470743-470758,470760-470809,470811-470817,470819-470993,470995-471001,471003-471788,471790-471792,471794-472028,472030-472032,472034-472036,472038,472040,472043,472045-472059,472061,472063,472065-472066,472068,472070-472072,472074-472080,472082,472084-472092,472094-472107,472109-472123,472125-472158,472160-472165,472167-472172,472174-472457,472459-472460,472462-472464,472466-472470,472472-472483,472486-472491,472493-472494,472496-472497,472499,472501-472503,472505-472512,472514-472544,472546-472556,472558-472560,472562-472572,472574-472587,472589-472591,472593-472605,472607,472609-472731,472733-472786,472788-472843,472845-472849,472851-472859,472861-472878,472880-472903,472905,472907-472988,472990-472991,472993-473071,473073-473086,473088-473090,473093,473095-473096,473098-473106,473108-473110,473112-473185,473187-473260,473262,473268-473270,473275-473279,473281,473284-473287,473289-473295,473297-473306,473308-473330,473332-473335,473337,473339-473344,473346-473351,473353-473355,473357-473358,473361-473471,473473-473497,473499-473535,473537-473567,473569-473888,473890-474451,474454-474492,474494-474563,474565-474843,474845-474865,474867-474932,474934-475035,475037-475144,475146-475180,475182-475265,475267-475285,475287,475289-475293,475295-475296,475298-475302,475304-475631,475633-475649,475651-475748,475750-475752,475754-476107,476109-476302,476304-476413,476415-476430,476432-476700,476702-476868,476870-477147,477149-477213,477215-477263,477265-477340,477342-477635,477637-477789,477791-477825,477827-477841,477843,477846-477852,477854,477856,477858-477865,477867-477894,477896-478022,478024-478182,478184-478211,478213-478233,478235-478236,478238-478241,478243-478252,478254-478259,478261-478263,478265,478267-478269,478271-478286,478288-478342,478344-478379,478381-478412,478414-478443,478445-478636,478639-478658,478660-478821,478823-478853,478855-478922,478924-478962,478965-478974,478976-479029,479031-479049,479051-479210,479212-479214,479216-479407,479409-479415,479417-479425,479427-479559,479561-479639,479641-479676,479678-479685,479687-480030,480033-480086,480091-480093,480095-480118,480120-480139,480141,480143-480148,480150-480156,480158-480163,480165-480177,480179-480189,480191-480193,480195-480198,480200-480220,480222-480282,480284-480292,480294-480308,480310-480317,480320-480422,480424,480426-480581,480583-480656,480658-480692,480695-480702,480704,480706-480710,480712-480910,480913-480933,480935-480945,480947-480972,480974-480993,480995-481034,481036-481158,481161-481174,481176-481220,481222-481234,481236-481260,481263-481264,481266-481296,481298-481304,481306-481311,481313-481332,481334,481336-481380,481382-481441,481443-482144,482146-482180,482182-482193,482195-482232,482234-482236,482239,482241-482242,482244-482247,482250-482251,482253,482256-482261,482264-482288,482290-482364,482366,482368,482370-482554,482556,482558-482569,482572-482636,482638,482640-482696,482698-482722,482724-482732,482734-482771,482774-482957,482959-483045,483047-483105,483108,483110-483115,483117,483119-483127,483130-483134,483136-483148,483150-483158,483160-483164,483166-483178,483180-483391,483393-483400,483402-483403,483405-483418,483420-483421,483425-483436,483438-483470,483472-483502,483504-483558,483560-483599,483601-483637,483639-483644,483646-483659,483661-483670,483672-483878,483880-483910,483912-483915,483917-483940,483942,483944-483968,483970-483972,483974-483976,483978,483980-484612,484614-484657,484659-484693,484695-484718,484720-484842,484844-484847,484849-484986,484988-485019,485021-485489,485491-485544,485546-485591,485593,485595-485697,485699-485729,485731-485734,485736-485779,485781-485787,485789-485851,485853,485855-486007,486009,486011-486020,486022-486083,486085-486097,486099-486117,486120-486131,486133-486148,486150-486161,486163-486164,486166-486197,486199-486205,486208-486247,486249-486253,486256-486427,486429-486431,486433-486554,486556-486573,486575-486593,486595,486597-486609,486611-486619,486622,486625,486627-486641,486643-486645,486649-486687,486689-486721,486723-486730,486732-486746,486748-486759,486761,486763-486777,486779-486782,486784-486788,486790,486792,486794-486796,486798-487175,487178,487180-487213,487215,487217-487267,487269-487284,487286-487298,487300-487358,487360-487367,487369-487382,487384-487434,487436-487480,487482-487547,487549-487561,487563-487565,487567-487578,487580-487615,487617-487622,487624,487626,487628,487630-487635,487637-487703,487705-487777,487780-487781,487783-487800,487802-487803,487805-487820,487822-487848,487850-487902,487904-488103,488105-488133,488135-488158,488160-488163,488165-488187,488189-488216,488218-488248,488250-488278,488280,488282-488303,488305-488313,488315-488342,488344-488351,488353-488376,488378-488449,488451-488593,488595,488597-488623,488625-488700,488702-488704,488706-488710,488714,488716-488725,488727-488744,488746-488770,488772-488798,488800,488802-488807,488809,488811-488829,488831-488843,488845-488851,488853-489069,489071-489077,489079-489081,489084-489102,489104-489105,489107-489109,489111-489112,489114-489139,489141-489178,489181-489203,489205-489211,489213,489216-489329,489332-489402,489404-489417,489419-489421,489423-489643,489645-489690,489692-489703,489705-489714,489716-489747,489749-489753,489755-489803,489805-489904,489906-490372,490374-490504,490506-490604,490606-490707,490710-490733,490735-490871,490873-490984,490986-491028,491030,491032-491071,491073-491119,491121-491576,491578-491672,491674-491800,491802-491838,491840-491878,491880-492183,492185-492279,492281-492317,492319-492513,492515-492584,492586-492587,492589-492601,492603-492635,492637-492640,492642-492717,492719-492723,492725-492729,492731-492755,492757-492901,492903-492955,492957-492962,492964-492997,492999-493002,493004-493041,493043-493059,493062-493063,493065-493086,493088-493125,493127-493139,493141-493150,493152-493871,493873-494017,494019-494030,494032-494041,494043-494091,494093-494120,494122-494354,494356-494436,494438-494539,494541-494552,494554-494586,494588-494649,494651,494653-494654,494656-494657,494659-494764,494766-494768,494770-494796,494798-494799,494802,494804-494860,494862-494903,494905-494906,494908-495019,495021-495160,495162-495168,495171-495188,495190-495229,495231-495254,495256-495303,495305-495313,495315-495336,495338-495372,495374-495379,495381-495454,495457-495459,495462-495516,495518-495524,495526-495531,495533-495548,495551-495553,495555,495557-495558,495560,495562-495573,495575-495583,495585-495594,495596-495628,495630-495638,495640-495651,495653-495660,495662-495753,495755-496259,496261-496262,496264-496269,496271-496275,496277-496301,496303-496316,496318-496383,496385-496413,496415-496495,496497-496625,496627-496636,496638-496640,496642-496647,496650-496657,496659-496660,496663-496664,496666-496677,496679-496681,496683-496730,496732-496750,496752,496754-496784,496786-496832,496834-496840,496842-496990,496992-496995,496997-497340,497343-497351,497353-497403,497405-497424,497426-497438,497440-497481,497483-497497,497499-497765,497767-497769,497771-497775,497777-497778,497780,497782-497783,497785,497787-497812,497814-497871,497873-497877,497879-498573,498575-498588,498590,498592,498594-498636,498638-498669,498671-498686,498688-498689,498691-498719,498721-498964,498966-498969,498971-498973,498975-498982,498985-499035,499037-499040,499042,499044-499048,499050-499082,499084-499086,499088-499164,499167-499169,499171-499355,499357-499370,499372-499373,499375-499391,499393,499395-499425,499428,499430-499445,499447-499455,499457-499460,499462-499465,499467,499469-499489,499491-499492,499494-499531,499533-499562,499566-499627,499629-499715,499717-499732,499734-499755,499758-499763,499765-499780,499782-499795,499797-499802,499804-499844,499846,499848-499850,499852-499863,499865-499873,499875-499974,499976-499978,499980-500263,500265-500283,500285-500309,500311-501000,501002,501012-501057,501059-501095,501097-501390,501392-501410,501413-501447,501449-501454,501456,501458-501464,501466-501471,501473-501803,501805-501913,501915-501916,501918-501919,501921-501944,501946-502171,502173-502177,502181,502183-502247,502250-502252,502254-502260,502262-502267,502270,502272,502274-502575,502577-502609,502611-502619,502621-502626,502628-502654,502656-503592,503594-503603,503605-503608,503610-503636,503638-503645,503647-503705,503707-503789,503791-504024,504026-504111,504113-504506,504508-504735,504737-504863,504865-504867,504869-504914,504916-505241,505243-505254,505257-505267,505269-505354,505356-505891,505893-505971,505973-506400,506402-506404,506407-506438,506440-506516,506518-506541,506543-506966,506968-506971,506973-507095,507097-507108,507111-507454,507456,507459-507471,507473-507556,507558,507560-507581,507585-507594,507597,507599-507608,507610-507728,507730-507893,507895-507937,507940-508234,508236-508350,508352-508365,508367-508380,508383,508386-508415,508417-508648,508650-508941,508943-509146,509148-509171,509173-509175,509179-509201,509203-509207,509209-509215,509217-509222,509224-509477,509480-509627,509629-509634,509636-509641,509643-509736,509738-509931,509933-510059,510061-510075,510077-510158,510161-510896,510898-510938,510940-511388,511390-511922,511924-512287,512289-512698,512702-512813,512815-512817,512819-513359,513361-513370,513372-514702,514704-514886,514888-514902,514904-515126,515129-515141,515143-515516,515518-515534,515536-515538,515540-515648,515650-515651,515653-516070,516072-516411,516413-516448,516450,516452-517637,517639-517647,517649-517659,517661-517663,517665-517677,517679-517682,517684-517744,517746-518085,518087-518175,518177-518558,518560-518568,518571-518666,518668,518670-518699,518701-518987,518990-518992,518994-519908,519910-519932,519934-520414,520416-520842,520844-520937,520939-521362,521364-521681,521683-521704,521706-521709,521711-521714,521716-521781,521783-521792,521794-522462,522464-522527,522529-522534,522536-522566,522568-522958,522960,522962-522966,522968-522976,522978-522980,522982-522988,522992-522993,522995-523244,523246-523746,523748-524049,524051-524738,524741-524742,524744-524762,524764,524766,524768-525486,525488-525530,525532,525534,525537-525552,525554-525765,525767-525776,525778-525784,525789-525803,525805-525816,525818-525828,525830-525861,525863-525866,525868-526090,526092-526112,526114-526116,526119-526121,526123-526149,526151-526153,526155-526156,526160-526165,526167-526186,526188-526193,526196-526197,526200-526665,526667-526682,526686-526690,526693,526695-526708,526710-526713,526715-526775,526777-526802,526804-526806,526808-527048,527051-527052,527054-527181,527183-527486,527488-527492,527494-527498,527500-527508,527510-527517,527519-527536,527538-527555,527559-527802,527804-527842,527844-527847,527849-527875,527877-527940,527942-527958,527960-527971,527973-528002,528004,528006-528423,528425-529232,529234-529245,529247-529296,529298-529634,529636-529658,529660-529665,529667-529668,529670-530033,530035-530036,530038-530040,530045-530046,530050-530051,530053-530696,530698-530735 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r530432 | ritchiem | 2007-04-19 15:42:53 +0100 (Thu, 19 Apr 2007) | 8 lines
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
AMQChannel remove comment around setPublisher - this is used by noLocal implementation.
Subscription - rename of hasFilters to filtersMessages
AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages
SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check.
TopicSessionTest - Additional testing for NoLocal to ensure.
........
r530437 | ritchiem | 2007-04-19 15:52:49 +0100 (Thu, 19 Apr 2007) | 5 lines
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
TopicSessionTest - Additional testing for NoLocal to ensure.
Forgot to include the file in the commit.
........
r530438 | ritchiem | 2007-04-19 15:53:32 +0100 (Thu, 19 Apr 2007) | 1 line
Added Test logging to aid in diagnosing problems
........
r530441 | ritchiem | 2007-04-19 16:07:54 +0100 (Thu, 19 Apr 2007) | 5 lines
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
ConcurrentSelectorDeliveryManager - method changes from hasFilter to filtersMessages.
Forgot to include the file in the commit.
........
r530442 | ritchiem | 2007-04-19 16:08:04 +0100 (Thu, 19 Apr 2007) | 5 lines
QPID-455 Pre-fetched messages can cause problems with client tools. Set IMMEDIATE_PREFETCH="true" for previous behaviour.
Inverted check now setting System proprety IMMEDIATE_PREFETCH="true" will cause existing messages to be immediately pre-fetched to the newly registered consumer.
Solved out standing broker issues see QPID-458 and QPID-459.
........
r530444 | ritchiem | 2007-04-19 16:10:10 +0100 (Thu, 19 Apr 2007) | 3 lines
QPID-454 - Message 'taken' notion is per message. Adjusted to be per message per queue.
Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change.
........
r530447 | ritchiem | 2007-04-19 16:15:25 +0100 (Thu, 19 Apr 2007) | 1 line
Committed test with localhost as the broker
........
r530449 | ritchiem | 2007-04-19 16:16:32 +0100 (Thu, 19 Apr 2007) | 1 line
Additional test to ensure connection closure doesn't cause problems.
........
r530683 | ritchiem | 2007-04-20 09:11:05 +0100 (Fri, 20 Apr 2007) | 2 lines
Reinstated the two consumer receive test.
Added additional test class to cover the IMMEDIATE_PREFETCHs.
........
r530685 | ritchiem | 2007-04-20 09:13:03 +0100 (Fri, 20 Apr 2007) | 1 line
Made IMMEDIATE_PREFETCH to allow access from additional test class.
........
r530686 | ritchiem | 2007-04-20 09:14:57 +0100 (Fri, 20 Apr 2007) | 1 line
Moved string literal 'Qpid_HOME' to constant QPID_HOME
........
r530734 | bhupendrab | 2007-04-20 11:42:52 +0100 (Fri, 20 Apr 2007) | 1 line
QPID-445 : md5 hashed password will be sent from management console to Qpid
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
14 files changed, 148 insertions, 87 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2e1653e69d..97c95dae5e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,6 @@ public class AMQChannel _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); - // TODO: used in clustering only I think (RG) _currentMessage.setPublisher(publisher); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 146d0566ce..14aa919356 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -70,7 +70,7 @@ public class Main private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - + public static final String QPID_HOME = "QPID_HOME"; private static final int IPV4_ADDRESS_LENGTH = 4; private static final char IPV4_LITERAL_SEPARATOR = '.'; @@ -204,7 +204,7 @@ public class Main protected void startup() throws InitException, ConfigurationException, Exception { - final String QpidHome = System.getProperty("QPID_HOME"); + final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); if (!configFile.exists()) @@ -213,7 +213,7 @@ public class Main if (QpidHome == null) { - error = error + "\nNote: Qpid_HOME is not set."; + error = error + "\nNote: "+QPID_HOME+" is not set."; } throw new InitException(error, null); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b2046efee3..e19038504f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -80,18 +80,17 @@ public class AMQMessage */ private boolean _immediate; - private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + + private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - public boolean isTaken(AMQQueue queue) - { - return _taken.get(); - } private final int hashcode = System.identityHashCode(this); @@ -206,7 +205,7 @@ public class AMQMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); - _taken = new AtomicBoolean(false); +// _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { @@ -326,7 +325,6 @@ public class AMQMessage for (AMQQueue q : _transientMessageData.getDestinationQueues()) { - _takenMap.put(q, new AtomicBoolean(false)); _messageHandle.enqueue(storeContext, _messageId, q); } @@ -459,17 +457,53 @@ public class AMQMessage return _deliveredToConsumer; } - - public boolean taken(AMQQueue queue, Subscription sub) + public boolean isTaken(AMQQueue queue) { - if (_taken.getAndSet(true)) + //return _taken.get(); + + synchronized (this) { - return true; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + _takenMap.put(queue, taken); + } + + return taken.get(); } - else + } + + public boolean taken(AMQQueue queue, Subscription sub) + { +// if (_taken.getAndSet(true)) +// { +// return true; +// } +// else +// { +// _takenBySubcription = sub; +// return false; +// } + + synchronized (this) { - _takenBySubcription = sub; - return false; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + + if (taken.getAndSet(true)) + { + return true; + } + else + { + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, sub); + return false; + } } } @@ -479,8 +513,26 @@ public class AMQMessage { _log.trace("Releasing Message:" + debugIdentity()); } - _taken.set(false); - _takenBySubcription = null; + +// _taken.set(false); +// _takenBySubcription = null; + + + synchronized (this) + { + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + else + { + taken.set(false); + } + + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, null); + } } public boolean checkToken(Object token) @@ -833,16 +885,20 @@ public class AMQMessage public String toString() { - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - _taken + " by :" + _takenBySubcription; +// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + +// _taken + " by :" + _takenBySubcription; -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + -// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { - return _takenBySubcription; +// return _takenBySubcription; + synchronized (this) + { + return _takenBySubcriptionMap.get(queue); + } } public void reject(Subscription subscription) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a418bb8f8a..65d5906d05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable /** * Returns messages within the given range of message Ids + * * @param fromMessageId * @param toMessageId + * * @return List of messages */ public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) @@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId + * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (stop the async delivery) - send these - * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, - * remove messages from this queue - remove locks from both queues and start async delivery + * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup + * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue + * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove + * locks from both queues and start async delivery * * @param fromMessageId * @param toMessageId @@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); - if (subscription.hasFilters()) + if (subscription.filtersMessages()) { if (_deliveryMgr.hasQueuedMessages()) { @@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable { _totalMessagesReceived.incrementAndGet(); } - + try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 979f692361..1f92cee1df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.Executor; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicInteger; @@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended() && sub.hasFilters()) + if (!sub.isSuspended() && sub.filtersMessages()) { Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue(); for (AMQMessage msg : messageList) @@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processingThreadName = Thread.currentThread().getName(); } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Running process Queue." + currentStatus()); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } } + + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Done process Queue." + currentStatus()); + } + } // private void sendNextMessage(Subscription sub) // { -// if (sub.hasFilters()) +// if (sub.filtersMessages()) // { // sendNextMessage(sub, sub.getPreDeliveryQueue()); // if (sub.isAutoClose()) @@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Executing Async process."); + } executor.execute(asyncDelivery); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index e9f209839a..e6d5d0c88d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -32,7 +32,7 @@ public interface Subscription void queueDeleted(AMQQueue queue) throws AMQException; - boolean hasFilters(); + boolean filtersMessages(); boolean hasInterest(AMQMessage msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e3944954f3..3bce950ba9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription } - if (_filters != null) + if (filtersMessages()) { _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); } @@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } - public boolean hasFilters() + public boolean filtersMessages() { - return _filters != null; + return _filters != null || _noLocal; } public boolean hasInterest(AMQMessage msg) @@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription // return false; } - if (_noLocal) + final AMQProtocolSession publisher = msg.getPublisher(); + + //todo - client id should be recoreded and this test removed but handled below + if (_noLocal && publisher != null) { // We don't want local messages so check to see if message is one we sent Object localInstance; @@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription if ((protocolSession.getClientProperties() != null) && (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if ((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || localInstance.equals(msgInstance)) { @@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription } else { + localInstance = protocolSession.getClientIdentifier(); - msgInstance = msg.getPublisher().getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here + + msgInstance = publisher.getClientIdentifier(); if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { if (_logger.isTraceEnabled()) @@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription } return false; } - } @@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription return _resendQueue; } - if (_filters != null) + if (filtersMessages()) { if (isAutoClose()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 26b040aae0..b500247fa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager //FIXME the queue could be full of sent messages. // Either need to clean all PDQs after sending a message // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) { return subscription; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java index a43474559d..20f123179f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.security.access; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanOperationParameter; import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; @@ -107,8 +106,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return UserManagement.TYPE; } - public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password) + public boolean setPassword(String username, char[] password) { try { @@ -122,10 +120,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana } } - public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + public boolean setRights(String username, boolean read, boolean write, boolean admin) { if (_accessRights.get(username) == null) @@ -179,11 +174,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return true; } - public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin) { if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) { @@ -195,7 +186,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return false; } - public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username) + public boolean deleteUser(String username) { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java index 6381213398..ce5e9fa4a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java @@ -45,7 +45,7 @@ public interface UserManagement */ @MBeanOperation(name = "setPassword", description = "Set password for user.") boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password); + @MBeanOperationParameter(name = "password", description = "Password")char[] password); /** * set rights for users with given details @@ -76,7 +76,7 @@ public interface UserManagement */ @MBeanOperation(name = "createUser", description = "Create new user from system.") boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password, + @MBeanOperationParameter(name = "password", description = "Password")char[] password, @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java index 956db64d90..cd0a371b48 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java @@ -176,7 +176,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase } } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { User user = _users.get(principal.getName()); @@ -187,13 +187,10 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase try { - - char[] passwd = convertPassword(password); - try { _userUpdate.lock(); - user.setPassword(passwd); + user.setPassword(password); try { @@ -215,7 +212,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase } } } - catch (UnsupportedEncodingException e) + catch (Exception e) { return false; } @@ -237,23 +234,14 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase return passwd; } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { if (_users.get(principal.getName()) != null) { return false; } - User user; - try - { - user = new User(principal.getName(), convertPassword(password)); - } - catch (UnsupportedEncodingException e) - { - _logger.warn("Unable to encode password:" + e); - return false; - } + User user = new User(principal.getName(), password); try { @@ -598,8 +586,13 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException { - Base64 b64 = new Base64(); - _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING)); + byte[] byteArray = new byte[_password.length]; + int index = 0; + for (char c : _password) + { + byteArray[index++] = (byte)c; + } + _encodedPassword = (new Base64()).encode(byteArray); } public boolean isModified() diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java index 3f6794aaaf..90d08c963e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java @@ -151,12 +151,12 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase return passwd; } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { return false; // updates denied } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { return false; // updates denied } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java index 8073fcc3c6..494d8e0bf4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java @@ -65,7 +65,7 @@ public interface PrincipalDatabase * @return True if change was successful * @throws AccountNotFoundException If the given principal doesn't exist in the Database */ - boolean updatePassword(Principal principal, String password) + boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException; /** @@ -74,7 +74,7 @@ public interface PrincipalDatabase * @param password The password to set for the principal * @return True on a successful creation */ - boolean createPrincipal(Principal principal, String password); + boolean createPrincipal(Principal principal, char[] password); /** * Delete a principal diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java index b1ac0e1f00..74c330f606 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java @@ -93,12 +93,12 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase } } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { return false; // updates denied } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { return false; // updates denied } |
