diff options
Diffstat (limited to 'cpp/bindings/qpid/examples/perl')
-rw-r--r-- | cpp/bindings/qpid/examples/perl/README | 26 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/client.pl | 72 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/drain.pl | 136 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/hello_world.pl | 30 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/hello_xml.pl | 35 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/map_receiver.pl | 28 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/map_sender.pl | 46 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/server.pl | 53 | ||||
-rwxr-xr-x[-rw-r--r--] | cpp/bindings/qpid/examples/perl/spout.pl | 132 |
9 files changed, 337 insertions, 221 deletions
diff --git a/cpp/bindings/qpid/examples/perl/README b/cpp/bindings/qpid/examples/perl/README deleted file mode 100644 index 1e113f1fa0..0000000000 --- a/cpp/bindings/qpid/examples/perl/README +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -The examples in this directory are written against the raw Perl -binding ("cqpid"). This binding is identical to the C++ messaging (in -namespace qpid::messaging). - -It is desired that a layer will be written over this interface (called -"qpid") that provides a more Perl-specific API. When this occurs, -these examples will be changed to use the new Perl API. - diff --git a/cpp/bindings/qpid/examples/perl/client.pl b/cpp/bindings/qpid/examples/perl/client.pl index 19d9d3f14f..586beb787e 100644..100755 --- a/cpp/bindings/qpid/examples/perl/client.pl +++ b/cpp/bindings/qpid/examples/perl/client.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,47 +20,59 @@ use strict; use warnings; -use cqpid_perl; +use qpid; -my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; +my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; - -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# creates a new connection instance +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { -$connection->open(); -my $session = $connection->createSession(); + # open the connection and create a session for interacting with it + $connection->open(); -my $sender = $session->createSender("service_queue"); + my $session = $connection->create_session(); + my $sender = $session->create_sender("service_queue"); -#create temp queue & receiver... -my $responseQueue = new cqpid_perl::Address("#response-queue; {create:always, delete:always}"); -my $receiver = $session->createReceiver($responseQueue); + # create an address and receiver for incoming messages + # the queue will be created always, and will be deleted + # when the receive disconnects + my $responseQueue = new qpid::messaging::Address( + "#response-queue; {create:always, delete:always}"); + my $receiver = $session->create_receiver($responseQueue); -#Now send some messages... + # Now send some messages... -my @s = ( - "Twas brillig, and the slithy toves", - "Did gire and gymble in the wabe.", - "All mimsy were the borogroves,", - "And the mome raths outgrabe." - ); + my @s = ( + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." + ); -my $request = new cqpid_perl::Message(); -$request->setReplyTo($responseQueue); -for (my $i=0; $i<4; $i++) { - $request->setContent($s[$i]); - $sender->send($request); - my $response = $receiver->fetch(); - print $request->getContent() . " -> " . $response->getContent() . "\n"; -} + # create the message object, and set a reply-to address + # so that the server knows where to send responses + # the message object will be reused to send each line + my $request = new qpid::messaging::Message(); + $request->set_reply_to($responseQueue); + for ( my $i = 0 ; $i < 4 ; $i++ ) { + $request->set_content( $s[$i] ); + $sender->send($request); + + # wait for the response to the last line sent + # the message will be taken directly from the + # broker's queue rather than waiting for it + # to be queued locally + my $response = $receiver->fetch(); + print $request->get_content() . " -> " + . $response->get_content() . "\n"; + } -$connection->close(); + # close the connection + $connection->close(); }; if ($@) { die $@; } - - diff --git a/cpp/bindings/qpid/examples/perl/drain.pl b/cpp/bindings/qpid/examples/perl/drain.pl index 60ac0c50ed..f7a710c485 100644..100755 --- a/cpp/bindings/qpid/examples/perl/drain.pl +++ b/cpp/bindings/qpid/examples/perl/drain.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,79 +20,135 @@ use strict; use warnings; -use cqpid_perl; +use qpid; use Getopt::Long; +use Pod::Usage; -my $url = "127.0.0.1"; -my $timeout = 60; -my $forever = 0; -my $count = 1; +my $url = "127.0.0.1"; +my $timeout = 0; +my $forever = 0; +my $count = 0; my $connectionOptions = ""; -my $address = "amq.direct"; +my $address = "amq.direct"; +my $help; my $result = GetOptions( - "broker|b=s" => \ $url, - "timeout|t=i" => \ $timeout, - "forever|f" => \ $forever, - "connection-options=s" => \ $connectionOptions, - "count|c=i" => \ $count, -); - -if (! $result) { - print "Usage: perl drain.pl [OPTIONS]\n"; -} + "broker|b=s" => \$url, + "timeout|t=i" => \$timeout, + "forever|f" => \$forever, + "connection-options=s" => \$connectionOptions, + "count|c=i" => \$count, + "help|h" => \$help +) || pod2usage( -verbose => 0 ); + +pod2usage( -verbose => 1 ) if $help; -if ($#ARGV ge 0) { - $address = $ARGV[0] +if ( $#ARGV ge 0 ) { + $address = $ARGV[0]; } sub getTimeout { - return ($forever) ? $cqpid_perl::Duration::FOREVER : new cqpid_perl::Duration($timeout*1000); + + # returns either the named duration FOREVER if the + # forever cmdline argument was used, otherwise creates + # a new Duration of the specified length + return ($forever) + ? qpid::messaging::Duration::FOREVER + : new qpid::messaging::Duration( $timeout * 1000 ); } +sub printProperties { + my $h = shift(); + return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}]; +} -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { + # open the connection, then create a session and receiver $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); my $timeout = getTimeout(); + my $message = new qpid::messaging::Message(); + my $i = 0; + + for ( ; ; ) { + eval { $message = $receiver->fetch($timeout); }; - my $message = new cqpid_perl::Message(); - my $i = 0; + if ($@) { + last; + } + + # check if the message was on that was redelivered + my $redelivered = + ( $message->get_redelivered ) ? "redelivered=True, " : ""; + print "Message(" + . $redelivered + . "properties=" + . printProperties( $message->get_properties() ) + . ", content='"; - while($receiver->fetch($message, $timeout)) { - print "Message(properties=" . $message->getProperties() . ",content='"; - if ($message->getContentType() eq "amqp/map") { - my $content = cqpid_perl::decodeMap($message); - map{ print "\n$_ => $content->{$_}"; } keys %{$content}; + # if the message content was a map, then we will print + # it out as a series of name => value pairs + if ( $message->get_content_type() eq "amqp/map" ) { + my $content = $message->get_content(); + map { print "\n$_ => $content->{$_}"; } keys %{$content}; } else { - print $message->getContent(); + # it's not a map, so just print the content as a string + print $message->get_content(); } print "')\n"; - - my $replyto = $message->getReplyTo(); - if ($replyto->getName()) { - print "Replying to " . $message->getReplyTo()->str() . "...\n"; - my $sender = $session->createSender($replyto); - my $response = new cqpid_perl::Message("received by the server."); + + # if the message had a reply-to address, then we'll send a + # response back letting the send know the message was processed + my $replyto = $message->get_reply_to(); + if ( $replyto->get_name() ) { + print "Replying to " . $message->get_reply_to()->str() . "...\n"; + + # create a temporary sender for the specified queue + my $sender = $session->create_sender($replyto); + my $response = + new qpid::messaging::Message("received by the server."); $sender->send($response); } + + # acknowledge all messages received on this queue so far $session->acknowledge(); - if ($count and (++$i ==$count)) { + if ( $count and ( ++$i == $count ) ) { last; } } + + # close everything to clean up $receiver->close(); $session->close(); $connection->close(); }; if ($@) { - $connection->close(); - die $@; + $connection->close(); + die $@; } +__END__ + +=head1 NAME + +drain - Drains messages from the specified address + +=head1 SYNOPSIS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE timeout in seconds to wait before exiting + -f, --forever ignore timeout and wait forever + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + -c VALUE, --count VALUE number of messages to read before exiting + +=cut + diff --git a/cpp/bindings/qpid/examples/perl/hello_world.pl b/cpp/bindings/qpid/examples/perl/hello_world.pl index a96b98a002..6ec7d52f1f 100644..100755 --- a/cpp/bindings/qpid/examples/perl/hello_world.pl +++ b/cpp/bindings/qpid/examples/perl/hello_world.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,35 +21,35 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; my $address = ( @ARGV > 1 ) ? $ARGV[0] : "amq.topic"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($broker, $connectionOptions); +# create a connection +my $connection = new qpid::messaging::Connection( $broker, $connectionOptions ); eval { + # open the connection and create a session, and both a sender a receive $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); - my $sender = $session->createSender($address); + my $session = $connection->create_session(); - $sender->send(new cqpid_perl::Message("Hello world!")); + my $receiver = $session->create_receiver($address); + my $sender = $session->create_sender($address); - #my $duration = new cqpid_perl::Duration(1000); - #print ">>>" . $duration->getMilliseconds() . "\n"; + # send a simple message + $sender->send( new qpid::messaging::Message("Hello world!") ); - my $message = $receiver->fetch($cqpid_perl::Duration::SECOND); + # receive the message, fetching it directly from the broker + my $message = $receiver->fetch(qpid::messaging::Duration::SECOND); - #$message->setDurable(1); - #print "Durable: " . $message->getDurable() . "\n"; - #print Dumper($message->getProperties()); - - print $message->getContent() . "\n"; + # output the message content, then acknowledge it + print $message->get_content() . "\n"; $session->acknowledge(); + # close the connection $connection->close(); }; diff --git a/cpp/bindings/qpid/examples/perl/hello_xml.pl b/cpp/bindings/qpid/examples/perl/hello_xml.pl index cebf2ceee6..8d77c4b2b8 100644..100755 --- a/cpp/bindings/qpid/examples/perl/hello_xml.pl +++ b/cpp/bindings/qpid/examples/perl/hello_xml.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,7 +20,7 @@ use strict; use warnings; -use cqpid_perl; +use qpid; my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; @@ -36,23 +36,25 @@ END my $address = <<END; xml-exchange; { -create: always, +create: always, node: { type: topic, x-declare: { type: xml } }, link: { x-bindings: [{ exchange: xml-exchange, key: weather, arguments: { xquery:" $query" } }] }} END - -my $connection = new cqpid_perl::Connection($broker, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $broker, $connectionOptions ); eval { + # open the connection, then create from it a session + # from the session, create a receiver to handle incoming messages $connection->open(); - my $session = $connection->createSession(); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); - my $receiver = $session->createReceiver($address); - - my $message = new cqpid_perl::Message(); + # create a message and set its contentn + my $message = new qpid::messaging::Message(); my $content = <<END; <weather> @@ -62,14 +64,19 @@ eval { <dewpoint>35</dewpoint> </weather> END - - $message->setContent($content); - my $sender = $session->createSender('xml-exchange/weather'); + + $message->set_content($content); + + # create a sender for the xml-exchange/weater topic + # then send the message + my $sender = $session->create_sender('xml-exchange/weather'); $sender->send($message); - + + # wait for the response and then output it to the screen my $response = $receiver->fetch(); - print $response->getContent() . "\n"; + print $response->get_content() . "\n"; + # close the connection $connection->close(); }; diff --git a/cpp/bindings/qpid/examples/perl/map_receiver.pl b/cpp/bindings/qpid/examples/perl/map_receiver.pl index 2e2611e38f..a538adf380 100644..100755 --- a/cpp/bindings/qpid/examples/perl/map_receiver.pl +++ b/cpp/bindings/qpid/examples/perl/map_receiver.pl @@ -1,4 +1,4 @@ -#! /usr/bin/perl5 +#! /usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,25 +21,33 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; -my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $address = ( @ARGV > 1 ) ? $ARGV[0] : "message_queue; {create: always}"; +my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[0] : "message_queue; {create: always}"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { + # open the connection, then create a session from it $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); + my $session = $connection->create_session(); - my $content = cqpid_perl::decodeMap($receiver->fetch()); - #my $content = cqpid_perl::decodeList($receiver->fetch()); - + # create a receiver for the session, subscribed the the specified queue + my $receiver = $session->create_receiver($address); + # wait for a message to appear in the queue + my $message = $receiver->fetch(); + + # display the content of the message + my $content = $message->get_content(); print Dumper($content); + # acknowledge the message, removing it from the queue $session->acknowledge(); + + # close everything, cleaning up $receiver->close(); $connection->close(); }; diff --git a/cpp/bindings/qpid/examples/perl/map_sender.pl b/cpp/bindings/qpid/examples/perl/map_sender.pl index 4107cd48b9..27063ef780 100644..100755 --- a/cpp/bindings/qpid/examples/perl/map_sender.pl +++ b/cpp/bindings/qpid/examples/perl/map_sender.pl @@ -1,4 +1,4 @@ -#! /usr/bin/perl5 +#! /usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,29 +21,39 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; -my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}"; +my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a new connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { - $connection->open(); - - my $session = $connection->createSession(); - my $sender = $session->createSender($address); - - my $message = new cqpid_perl::Message(); - my $content = { id => 987654321, - name => "Widget", - percent => sprintf("%.2f", 0.99), - colours => [ qw (red green white) ], - }; - cqpid_perl::encode($content, $message); - $sender->send($message, 1); + # open the connection and create a session + $connection->open(); + my $session = $connection->create_session(); + + # create a sender and connect it to the supplied address string + my $sender = $session->create_sender($address); + + # create a message and set the content to be a map of values + my $message = new qpid::messaging::Message(); + my $content = { + id => 987654321, + name => "Widget", + percent => sprintf( "%.2f", 0.99 ), + colours => [qw (red green white)], + }; + $message->set_content($content); + + # send the message + $sender->send( $message, 1 ); + + # close the connection and session + $session->close(); $connection->close(); }; diff --git a/cpp/bindings/qpid/examples/perl/server.pl b/cpp/bindings/qpid/examples/perl/server.pl index b14da565b9..be43655aeb 100644..100755 --- a/cpp/bindings/qpid/examples/perl/server.pl +++ b/cpp/bindings/qpid/examples/perl/server.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,43 +20,64 @@ use strict; use warnings; -use cqpid_perl; +use qpid; -my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; +my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; - -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { + + # connect to the broker and create a session $connection->open(); - my $session = $connection->createSession(); + my $session = $connection->create_session(); - my $receiver = $session->createReceiver("service_queue; {create: always}"); + # create a receiver for accepting incoming messages + my $receiver = $session->create_receiver("service_queue; {create: always}"); + # go into an infinite loop to receive messages and process them while (1) { + + # wait for the next message to be processed my $request = $receiver->fetch(); - my $address = $request->getReplyTo(); + + + # get the address for sending replies + # if no address was supplised then we can't really respond, so + # only process when one is present + my $address = $request->get_reply_to(); if ($address) { - my $sender = $session->createSender($address); - my $s = $request->getContent(); + + # a temporary sender for sending to the response queue + my $sender = $session->create_sender($address); + my $s = $request->get_content(); $s = uc($s); - my $response = new cqpid_perl::Message($s); + + # create the response message and send it + my $response = new qpid::messaging::Message($s); $sender->send($response); - print "Processed request: " . $request->getContent() . " -> " . $response->getContent() . "\n"; + print "Processed request: " + . $request->get_content() . " -> " + . $response->get_content() . "\n"; + + # acknowledge the message since it was processed $session->acknowledge(); } else { - print "Error: no reply address specified for request: " . $request->getContent() . "\n"; + print "Error: no reply address specified for request: " + . $request->get_content() . "\n"; $session->reject($request); } } -$connection->close(); + # close connections to clean up + $session->close(); + $connection->close(); }; if ($@) { die $@; } - diff --git a/cpp/bindings/qpid/examples/perl/spout.pl b/cpp/bindings/qpid/examples/perl/spout.pl index 7365e732bf..d8ac860143 100644..100755 --- a/cpp/bindings/qpid/examples/perl/spout.pl +++ b/cpp/bindings/qpid/examples/perl/spout.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,117 +20,145 @@ use strict; use warnings; -use cqpid_perl; +use qpid; use Getopt::Long; +use Pod::Usage; use Time::Local; -my $url = "127.0.0.1"; +my $url = "127.0.0.1"; my $timeout = 0; my $count = 1; my $id = ""; my $replyto = ""; my @properties; my @entries; -my $content = ""; +my $content = ""; my $connectionOptions = ""; -my $address = "amq.direct"; +my $address = "amq.direct"; +my $help; my $result = GetOptions( - "broker|b=s" => \ $url, - "timeout|t=i" => \ $timeout, - "count|c=i" => \ $count, - "id|i=s" => \ $id, - "replyto=s" => \ $replyto, - "property|p=s@" => \ @properties, - "map|m=s@" => \ @entries, - "content=s" => \ $content, - "connection-options=s" => \ $connectionOptions, -); - - -if (! $result) { - print "Usage: perl drain.pl [OPTIONS]\n"; + "broker|b=s" => \$url, + "timeout|t=i" => \$timeout, + "count|c=i" => \$count, + "id|i=s" => \$id, + "replyto=s" => \$replyto, + "property|p=s@" => \@properties, + "map|m=s@" => \@entries, + "content=s" => \$content, + "connection-options=s" => \$connectionOptions, + "help|h" => \$help +) || pod2usage( -verbose => 0 ); + +pod2usage( -verbose => 1 ) if $help; + +if ( $#ARGV ge 0 ) { + $address = $ARGV[0]; } - -if ($#ARGV ge 0) { - $address = $ARGV[0] -} - - sub setEntries { my ($content) = @_; foreach (@entries) { - my ($name, $value) = split("=", $_); + my ( $name, $value ) = split( "=", $_ ); $content->{$name} = $value; } } - sub setProperties { my ($message) = @_; foreach (@properties) { - my ($name, $value) = split("=", $_); - $message->getProperties()->{$name} = $value; + my ( $name, $value ) = split( "=", $_ ); + $message->setProperty( $name, $value ); } } -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +# create a connection object +my $connection = new qpid::messaging::Connection( $url, $connectionOptions ); eval { + # open the connection, create a session and then a sender $connection->open(); - my $session = $connection->createSession(); - my $sender = $session->createSender($address); + my $session = $connection->create_session(); + my $sender = $session->create_sender($address); - my $message = new cqpid_perl::Message(); + # create a message to be sent + my $message = new qpid::messaging::Message(); setProperties($message) if (@properties); if (@entries) { my $content = {}; setEntries($content); - cqpid_perl::encode($content, $message); + $message->set_content($content); } elsif ($content) { - $message->setContent($content); - $message->setContentType("text/plain"); + $message->set_content($content); + $message->set_content_type("text/plain"); } + # if a reply-to address was supplied, then create a receiver from the + # session and wait for a response to be sent my $receiver; if ($replyto) { - my $responseQueue = new cqpid_perl::Address($replyto); - $receiver = $session->createReceiver($responseQueue); - $message->setReplyTo($responseQueue); + my $responseQueue = new qpid::messaging::Address($replyto); + $receiver = $session->create_receiver($responseQueue); + $message->set_reply_to($responseQueue); } my $start = localtime; - my @s = split(/[:\s]/, $start); - my $s = "$s[3]$s[4]$s[5]"; - my $n = $s; - - for (my $i = 0; - ($i < $count || $count == 0) and - ($timeout == 0 || abs($n - $s) < $timeout); - $i++) { + my @s = split( /[:\s]/, $start ); + my $s = "$s[3]$s[4]$s[5]"; + my $n = $s; + + for ( + my $i = 0 ; + ( $i < $count || $count == 0 ) + and ( $timeout == 0 || abs( $n - $s ) < $timeout ) ; + $i++ + ) + { $sender->send($message); if ($receiver) { + print "Waiting for a response.\n"; my $response = $receiver->fetch(); - print "$i -> " . $response->getContent() . "\n"; + print "$i -> " . $response->get_content() . "\n"; } my $now = localtime; - my @n = split(/[:\s]/, $now); - my $n = "$n[3]$n[4]$n[5]"; + my @n = split( /[:\s]/, $now ); + my $n = "$n[3]$n[4]$n[5]"; } $session->sync(); $connection->close(); }; if ($@) { - $connection->close(); - die $@; + $connection->close(); + die $@; } +__END__ + +=head1 NAME + +spout - Send messages to the specified address + +=head1 SYNOPSIS + + Usage: spout [OPTIONS] ADDRESS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE exit after the specified time + -c VALUE, --count VALUE stop after count messageshave been sent, zero disables + -i VALUE, --id VALUE use the supplied id instead of generating one + --replyto VALUE specify reply-to value + -P VALUE, --property VALUE specify message property + -M VALUE, --map VALUE specify entry for map content + --content VALUE specify textual content + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} +=cut |