LinkpimpClient.pl handles most of the work in this system. A feed can be PubSub-enabled via XML-RPC, SOAP, or HTTP POST, or not at all. A fully spec-compliant Publish and Subscribe client should be able to deal with all of these cases, but XML-RPC is the only one in use at the moment, so it is the only one we will discuss here.
So, onward! Example 12-1 shows the entire program listing, but first let's step through the interesting bits to illustrate a few points.
After setting up the necessary variables and loading the modules, we get to the section of the program that deals with the current subscriptions. When the program is run for the first time, this section is irrelevant, but since we'll probably run this script every hour from cron, it soon becomes necessary. Subscriptions last for 25 hours, so we need to make sure they are renewed in time.
```perl
logger("\n Program started. Let's go!\n");

# First we need to trim the PubSub records and remove all the old entries.
# Work out the time 25 hours ago (1 hour = 3600 seconds, so 25 hours = 90000 seconds).
my $oldestpossibletime = time() - 90000;
logger("The oldest allowable subscription cannot have been made earlier than $oldestpossibletime");

# Read the existing subscription list (written by there_is_a_cloud on earlier runs) into an array.
# On the very first run the file does not exist yet, so the array simply stays empty.
my @lines;
if (open(PUBSUBLIST, "<", $pubsublog)) {
    @lines = <PUBSUBLIST>;
    close(PUBSUBLIST);
    # Clear the subscriber list.
    unlink($pubsublog) or die "Can't delete the data file\n";
    logger("Old Subscription list deleted");
}

# We need to prevent the file being empty, even if there are no subscriptions, so:
open(NEWPUBSUBLIST, ">", $pubsublog) or die "Can't write $pubsublog: $!\n";
print NEWPUBSUBLIST "This holds the details of all your subscriptions , 0000001\n";

# Go through each line, splitting it back into the right variables.
foreach my $line (@lines) {
    chomp $line;    # the line already ends in "\n"; drop it so we don't write blank lines
    my ($rssUrl, $time) = split(/,/, $line);
    next unless defined $time;
    # If the time the notification request was made ($time) is later than 25 hours ago
    # ($oldestpossibletime), then stick that line back into the data file.
    if ($time > $oldestpossibletime) {
        print NEWPUBSUBLIST "$line\n";
    }
}
logger("New PubSublist written");
close(NEWPUBSUBLIST);
```
We're storing the current subscriptions in a file called pubsub.txt. This section loads the file into an array and then deletes it. It then rewrites the file line by line, keeping a line if, and only if, its subscription request was made more recently than the oldest allowable time (i.e., less than 25 hours ago).
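The pruning rule is easy to check in isolation. The following is a minimal sketch rather than the listing's own code: prune_subscriptions is a hypothetical helper, and it assumes every log line has the "url , epoch-seconds" shape that there_is_a_cloud writes.

```perl
use strict;
use warnings;

# rssPleaseNotify subscriptions expire after 25 hours.
use constant SUBSCRIPTION_LIFETIME => 25 * 60 * 60;    # 90000 seconds

# Hypothetical helper: given the raw lines of a pubsub log in the
# "url , epoch-seconds" format and the current time, return only the
# lines (without trailing newlines) whose subscription is still live.
sub prune_subscriptions {
    my ($lines, $now) = @_;
    my $oldest = $now - SUBSCRIPTION_LIFETIME;
    my @kept;
    for my $line (@$lines) {
        chomp(my $copy = $line);
        my (undef, $time) = split /\s*,\s*/, $copy, 2;
        # Skip blank or malformed lines rather than comparing undef.
        next unless defined $time && $time =~ /^\d+$/;
        push @kept, $copy if $time > $oldest;
    }
    return @kept;
}

# Usage: with "now" at 1_000_000 the cut-off is 910_000, so only the
# first entry survives; the 0000001 placeholder line is dropped too.
my @live = prune_subscriptions(
    [ "http://example.com/a.rss , 999000\n",
      "http://example.com/b.rss , 900000\n",
      "This holds the details of all your subscriptions , 0000001\n" ],
    1_000_000,
);
print "$_\n" for @live;
```

Writing the placeholder line with a timestamp of 0000001 means it always falls outside the window, so it never accumulates: each run drops the old copy and writes a fresh one.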
We then reopen the new file and read its contents into a single string, which we will work with later:
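```perl
# Now, we reopen the pubsublog, and load it as a string for use later.
open(PUBSUB, "<", $pubsublog) or die "Can't read $pubsublog: $!\n";
my $content_of_pubsublog = do { local $/; <PUBSUB> };
close(PUBSUB);
```

Notice the local $/; inside the do block. $/ is the input record separator used by the line-input operator. By default it is "\n", so saying my $content_of_pubsublog = <PUBSUB>; would load the file only up to the first newline. Making $/ undefined switches the operator into "slurp" mode, so it scoops up the whole file in one swallow. (Setting $/ = '' is not the same thing: that selects paragraph mode, which stops at the first blank line.) Because $/ is declared with local inside the do block, its normal value comes back as soon as the block ends, so later reads are unaffected.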
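The difference between the two settings of $/ is easy to see with an in-memory filehandle. This sketch is illustrative only; the sample data and the helper names read_paragraph_mode and read_slurp_mode are made up for the demonstration.

```perl
use strict;
use warnings;

# Sample contents with a blank line in the middle, which is what you get
# if each line is printed with an extra "\n".
my $data = "header , 0000001\n\nhttp://example.com/a.rss , 999000\n";

# Paragraph mode: $/ = '' returns everything up to the first blank line.
sub read_paragraph_mode {
    my ($text) = @_;
    open(my $fh, '<', \$text) or die "in-memory open failed: $!";
    local $/ = '';
    my $chunk = <$fh>;
    close $fh;
    return $chunk;
}

# Slurp mode: an undefined $/ returns the whole file in one read.
sub read_slurp_mode {
    my ($text) = @_;
    open(my $fh, '<', \$text) or die "in-memory open failed: $!";
    local $/;
    my $all = <$fh>;
    close $fh;
    return $all;
}

print "paragraph mode read ", length(read_paragraph_mode($data)), " characters\n";
print "slurp mode read     ", length(read_slurp_mode($data)), " characters\n";
```

Because paragraph mode stops at the first blank line, a log containing an empty line would be read only partially, and the "already subscribed" check would miss every URL after it.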
The next section then loads the subscription list from Syndic8 in the manner described in Chapter 10. We then enter a loop, loading the RSS files one-by-one, and examine them for PubSub-related elements:
```perl
# Take the next line from the array of DataURLs.
foreach $url (@edited_subscribed_feeds_list) {
    logger("Now working with $url");

    # Check the feed is not on the list of subscribed-to feeds.
    # \Q...\E stops characters such as "." and "?" in the URL acting as regex syntax.
    if ($content_of_pubsublog =~ m/\Q$url\E/i) {
        logger("Subscription already present, it seems. Excellent. I will get on with the next one.");
        # Nothing more to do here: carry on with the next URL.
    }
    else {
        # Retrieve the RSS feed.
        $retrieved_feed = get($url);
        unless (defined $retrieved_feed) {
            logger("Could not retrieve $url");
            next;
        }
        logger("Retrieved Feed from $url");

        # Examine for <cloud>.
        if ($retrieved_feed =~ m/<cloud/) {
            there_is_a_cloud();
        }
        else {
            logger("There is no cloud element");
            # Stick it through print_html, with an error trap here.
            eval { print_html() };
            logger("The parsing choked on $url with this error\n $@ \n") if $@;
        }
    }
}
```
This section runs a series of tests. First it checks to see if we are already subscribed to the feed. If so, we move straight on to the next one. Why? Because one of the reasons for Publish and Subscribe is to lessen the load on the publisher's server. If we were to retrieve the feed every hour anyway, we would ruin this aspect of the idea.
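Escaping matters for this check because feed URLs are full of regex metacharacters: an unescaped "." matches any character, and an unescaped "?" makes the preceding character optional, so a URL with a query string can fail to match itself. This is a sketch under assumptions, not the listing's code: is_subscribed is a hypothetical helper, and it anchors the match to the start of a log line and requires the "," separator after the URL, which is stricter than the listing's plain substring test.

```perl
use strict;
use warnings;

# Hypothetical helper: return 1 if $url already has a line in the pubsub
# log text, 0 otherwise. \Q...\E makes the URL match literally; the /m
# flag lets ^ match at the start of every line; /i ignores case.
sub is_subscribed {
    my ($log_text, $url) = @_;
    return $log_text =~ m/^\Q$url\E\s*,/mi ? 1 : 0;
}

my $log = "This holds the details of all your subscriptions , 0000001\n"
        . "http://example.com/feed.rss?id=1 , 1700000000\n";

print is_subscribed($log, "http://example.com/feed.rss?id=1") ? "skip\n" : "fetch\n";  # prints skip
print is_subscribed($log, "http://example.com/feed.rss")      ? "skip\n" : "fetch\n";  # prints fetch
```

Requiring the comma after the URL also stops a shorter URL from matching as a prefix of a longer one, so http://example.com/feed.rss is not mistaken for the ?id=1 feed.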
If we are not subscribed, we retrieve the feed and use a regular expression to check it for a cloud element. I realize the mention of regexps will have sent many of you into a swoon. They are here because they fulfill a very simple purpose (in other words: tough). The cloud element appears only when the publisher supports Publish and Subscribe, so if we find one, we spin the program off into the there_is_a_cloud subroutine. If no cloud element is found, we go straight ahead and parse the feed using the print_html subroutine, which works as described in Chapter 8.
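If you want the attribute values as well as a yes/no answer, the same regex approach can pull them out of the element. This is a core-Perl sketch, not the method the program uses (the program parses the whole feed with XML::Simple); find_cloud is a hypothetical name, and the regex assumes a single, flat, self-closing cloud element of the kind RSS 0.92 describes.

```perl
use strict;
use warnings;

# Hypothetical helper: return a hash reference of the <cloud> element's
# attributes, or undef if the feed has none. A regex is adequate for this
# single, flat, self-closing element; it is not a general XML parser.
sub find_cloud {
    my ($feed) = @_;
    return undef unless defined $feed;
    my ($attr_text) = $feed =~ m{<cloud\b([^>]*?)\s*/?>}s
        or return undef;
    my %attr;
    # Collect name="value" (or name='value') pairs.
    while ($attr_text =~ m{(\w+)\s*=\s*(["'])(.*?)\2}gs) {
        $attr{$1} = $3;
    }
    return \%attr;
}

my $feed = '<rss version="0.92"><channel>'
         . '<cloud domain="www.newsisfree.com" port="80" path="/RPC" '
         . 'registerProcedure="hpe.rssPleaseNotify" protocol="xml-rpc" />'
         . '</channel></rss>';

if (my $cloud = find_cloud($feed)) {
    # The endpoint to call is built from domain, port, and path.
    print "http://$cloud->{domain}:$cloud->{port}$cloud->{path}\n";  # prints http://www.newsisfree.com:80/RPC
}
```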
RSS 0.92's Publish and Subscribe standard leaves the choice of protocol, XML-RPC or SOAP, to the whim of the publisher. A fully compliant system must be able to deal with both; ours handles XML-RPC and logs anything else as unsupported. Here it is:
```perl
sub there_is_a_cloud {
    logger("We're not subscribed, so I will attempt to subscribe to $url");

    # First we must parse the <cloud> element within $retrieved_feed. It is in a set format, e.g.:
    # <cloud domain="www.newsisfree.com" port="80" path="/RPC"
    #        registerProcedure="hpe.rssPleaseNotify" protocol="xml-rpc" />
    # We'll do this with XML::Simple, trapping any parse error.
    my $parsed_xml = eval { XMLin($retrieved_feed) };
    unless ($parsed_xml) {
        logger("Could not parse the feed at $url: $@");
        return 1;
    }
    my $cloud_domain            = $parsed_xml->{channel}->{cloud}->{domain};
    my $cloud_port              = $parsed_xml->{channel}->{cloud}->{port};
    my $cloud_path              = $parsed_xml->{channel}->{cloud}->{path};
    my $cloud_registerProcedure = $parsed_xml->{channel}->{cloud}->{registerProcedure};
    my $cloud_protocol          = $parsed_xml->{channel}->{cloud}->{protocol};

    logger("We have retrieved the PubSub data from the RSS 0.92 feed.");
    logger("The cloud domain is $cloud_domain");
    logger("The port is $cloud_port");
    logger("The path is $cloud_path");
    logger("The register procedure is $cloud_registerProcedure");
    logger("The protocol is $cloud_protocol");

    # The protocol is all important. We need to differentiate between SOAP users,
    # those who like XML-RPC, and the big men of HTTP-POST.
    if ($cloud_protocol eq "xml-rpc") {
        # Marvellous. That done, we spawn a new XML-RPC client.
        my $pubsub_call = Frontier::Client->new(
            url         => "http://$cloud_domain:$cloud_port$cloud_path",
            debug       => 0,
            use_objects => 1,
        );
        # Then call the remote procedure as per the spec: our procedure name, port,
        # path, protocol, and an array of the feed URLs we want to watch.
        eval {
            $pubsub_call->call($cloud_registerProcedure, $pubsub_listening_procedure,
                               $pubsub_port, $pubsub_path, $pubsub_protocol, [$url]);
        };
        if ($@) {
            logger("The subscription request to $cloud_domain failed: $@");
            return 1;
        }
        logger("I've asked for the subscription");
    }
    else {
        logger("The protocol requested is not yet supported");
        return 1;
    }

    # Now add the URL and the time the request was made to the pubsublog.
    open(PUBSUBLOG, ">>", $pubsublog) or die "Can't append to $pubsublog: $!\n";
    my $time = time();
    print PUBSUBLOG "$url , $time\n";
    close PUBSUBLOG;
    # That's it: return to the next one in the list.
    return 1;
}
```
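In this subroutine, we parse the cloud element, check its protocol attribute, and react accordingly. If the protocol is XML-RPC, we ask the publisher's cloud to notify our listener of future updates and record the URL and the time of the request in the pubsub log; otherwise, we log that the protocol is not yet supported. Either way, control then returns to the main loop, which moves on to the next URL.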
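Frontier::Client builds the XML-RPC request for us, but it can help to see what the subscription call actually sends. This hand-built version is for illustration only: please_notify_xml and xml_escape are hypothetical helpers, and the parameter order (procedureName, port, path, protocol, urlList, with urlList as an array of feed URLs) follows the RSS 0.92 description of the cloud interface.

```perl
use strict;
use warnings;

# Escape the characters that are special in XML text content.
sub xml_escape {
    my ($s) = @_;
    $s =~ s/&/&amp;/g;
    $s =~ s/</&lt;/g;
    $s =~ s/>/&gt;/g;
    return $s;
}

# Hypothetical helper: build the XML-RPC <methodCall> for
#   registerProcedure(procedureName, port, path, protocol, urlList)
# where urlList is an <array> of feed URLs.
sub please_notify_xml {
    my ($method, $procedure, $port, $path, $protocol, @urls) = @_;
    my $urls = join '',
        map { '<value><string>' . xml_escape($_) . '</string></value>' } @urls;
    return join "\n",
        '<?xml version="1.0"?>',
        '<methodCall>',
        '<methodName>' . xml_escape($method) . '</methodName>',
        '<params>',
        '<param><value><string>' . xml_escape($procedure) . '</string></value></param>',
        '<param><value><i4>' . int($port) . '</i4></value></param>',
        '<param><value><string>' . xml_escape($path) . '</string></value></param>',
        '<param><value><string>' . xml_escape($protocol) . '</string></value></param>',
        "<param><value><array><data>$urls</data></array></value></param>",
        '</params>',
        '</methodCall>';
}

# Example values: the listener's method name, port, and path.
print please_notify_xml('hpe.rssPleaseNotify', 'updateFeed', 8888, '/RPC2',
                        'xml-rpc', 'http://example.com/index.rss'), "\n";
```

Sending the URL list as an array rather than a single string is what lets one request cover several feeds hosted by the same cloud.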
The complete listing is shown in Example 12-1.
```perl
#!/usr/bin/perl -w
use diagnostics;
use warnings;
use XML::RSS;
use XML::Simple;
use LWP::Simple;
use Frontier::Client;
use Frontier::RPC2;
use File::Copy;
use SOAP::Lite;
use LWP::UserAgent;

# User changeable variables
my $logging = "1";
my $logfile = "logfile.txt";
my $pubsublog = "pubsub.txt";
my $includefile = "feeds.shtml";
my $tempincludefile = "feeds.shtml.tmp";
my $syndic8_userid = "bhammersley";
my $syndic8_password = "joe90";
my $syndic8_list_id = "0";
my $syndic8_XMLRPC_path = "http://www.syndic8.com:80/xmlrpc.php";
# The procedure name and port must match the listener (Example 12-2).
my $pubsub_listening_procedure = "updateFeed";
my $pubsub_port = "8888";
my $pubsub_path = "/RPC2";
my $pubsub_protocol = "xml-rpc";
my $content;
my $file;
my $line;
our $url;
our $retrieved_feed;
our $feed_spec;

#####################################################

logger("\n Program started. Let's go!\n");

# First we need to trim the pubsub records and remove all the old entries.
# Work out the time 25 hours ago (1 hour = 3600 seconds, so 25 hours = 90000 seconds).
my $oldestpossibletime = time() - 90000;
logger("The oldest allowable subscription cannot have been made earlier than $oldestpossibletime");

# Read the existing subscription list (written by there_is_a_cloud on earlier runs) into an array.
# On the very first run the file does not exist yet, so the array simply stays empty.
my @lines;
if (open(PUBSUBLIST, "<", $pubsublog)) {
    @lines = <PUBSUBLIST>;
    close(PUBSUBLIST);
    # Clear the subscriber list.
    unlink($pubsublog) or die "Can't delete the data file\n";
    logger("Old Subscription list deleted");
}

# We need to prevent the file being empty, even if there are no subscriptions, so:
open(NEWPUBSUBLIST, ">", $pubsublog) or die "Can't write $pubsublog: $!\n";
print NEWPUBSUBLIST "This holds the details of all your subscriptions , 0000001\n";

# Go through each line, splitting it back into the right variables.
foreach $line (@lines) {
    chomp $line;    # the line already ends in "\n"; drop it so we don't write blank lines
    my ($rssUrl, $time) = split(/,/, $line);
    next unless defined $time;
    # If the time the notification request was made ($time) is later than 25 hours ago
    # ($oldestpossibletime), then stick that line back into the data file.
    if ($time > $oldestpossibletime) {
        print NEWPUBSUBLIST "$line\n";
    }
}
logger("New PubSublist written");
close(NEWPUBSUBLIST);

# Now, we reopen the pubsublog, and load it as a string for use later.
# "local $/" (slurp mode) makes <PUBSUB> return the whole file at once.
open(PUBSUB, "<", $pubsublog) or die "Can't read $pubsublog: $!\n";
my $content_of_pubsublog = do { local $/; <PUBSUB> };
# and we finally close the filehandle.
close(PUBSUB);

##########
# Use XML-RPC to ask Syndic8 for the list of feeds, and create an object from the result.
my $syndic8_xmlrpc_call = Frontier::Client->new(
    url         => $syndic8_XMLRPC_path,
    debug       => 0,
    use_objects => 1,
);
my $syndic8_xmlrpc_returned_subscriber_list =
    $syndic8_xmlrpc_call->call('syndic8.GetSubscribed', $syndic8_userid,
                               $syndic8_password, $syndic8_list_id)
    or die "Cannot retrieve Syndic8 list";
logger("Retrieved Syndic8 subscription list");

# Place the dataurls from the subscriber list into an array.
my @edited_subscribed_feeds_list =
    map { $_->{dataurl} } @$syndic8_xmlrpc_returned_subscriber_list;

# Take the next line from the array of DataURLs.
foreach $url (@edited_subscribed_feeds_list) {
    logger("Now working with $url");

    # Check the feed is not on the list of subscribed-to feeds.
    # \Q...\E stops characters such as "." and "?" in the URL acting as regex syntax.
    if ($content_of_pubsublog =~ m/\Q$url\E/i) {
        logger("Subscription already present, it seems. Excellent. I will get on with the next one.");
        # Nothing more to do here: carry on with the next URL.
    }
    else {
        # Retrieve the RSS feed.
        $retrieved_feed = get($url);
        unless (defined $retrieved_feed) {
            logger("Could not retrieve $url");
            next;
        }
        logger("Retrieved Feed from $url");

        # Examine for <cloud>.
        if ($retrieved_feed =~ m/<cloud/) {
            there_is_a_cloud();
        }
        else {
            logger("There is no cloud element");
            # Stick it through print_html, with an error trap here.
            eval { print_html() };
            logger("The parsing choked on $url with this error\n $@ \n") if $@;
        }
    }
}

### Replace the include file with the temporary one, and do it fast!
move($tempincludefile, $includefile);

### Clean up and exit the program.
logger("We're all done here for now. Exiting Program.\n\n");
exit;

######
## THE SUBROUTINES
######

sub there_is_a_cloud {
    logger("We're not subscribed, so I will attempt to subscribe to $url");

    # First we must parse the <cloud> element within $retrieved_feed. It is in a set format, e.g.:
    # <cloud domain="www.newsisfree.com" port="80" path="/RPC"
    #        registerProcedure="hpe.rssPleaseNotify" protocol="xml-rpc" />
    # We'll do this with XML::Simple, trapping any parse error.
    my $parsed_xml = eval { XMLin($retrieved_feed) };
    unless ($parsed_xml) {
        logger("Could not parse the feed at $url: $@");
        return 1;
    }
    my $cloud_domain            = $parsed_xml->{channel}->{cloud}->{domain};
    my $cloud_port              = $parsed_xml->{channel}->{cloud}->{port};
    my $cloud_path              = $parsed_xml->{channel}->{cloud}->{path};
    my $cloud_registerProcedure = $parsed_xml->{channel}->{cloud}->{registerProcedure};
    my $cloud_protocol          = $parsed_xml->{channel}->{cloud}->{protocol};

    logger("We have retrieved the PubSub data from the RSS 0.92 feed.");
    logger("The cloud domain is $cloud_domain");
    logger("The port is $cloud_port");
    logger("The path is $cloud_path");
    logger("The register procedure is $cloud_registerProcedure");
    logger("The protocol is $cloud_protocol");

    # The protocol is all important. We need to differentiate between SOAP users,
    # those who like XML-RPC, and the big men of HTTP-POST.
    if ($cloud_protocol eq "xml-rpc") {
        # Marvellous. That done, we spawn a new XML-RPC client.
        my $pubsub_call = Frontier::Client->new(
            url         => "http://$cloud_domain:$cloud_port$cloud_path",
            debug       => 0,
            use_objects => 1,
        );
        # Then call the remote procedure as per the spec: our procedure name, port,
        # path, protocol, and an array of the feed URLs we want to watch.
        eval {
            $pubsub_call->call($cloud_registerProcedure, $pubsub_listening_procedure,
                               $pubsub_port, $pubsub_path, $pubsub_protocol, [$url]);
        };
        if ($@) {
            logger("The subscription request to $cloud_domain failed: $@");
            return 1;
        }
        logger("I've asked for the subscription");
    }
    else {
        logger("The protocol requested is not yet supported");
        return 1;
    }

    # Now add the URL, and the time the request was made, to the pubsublog.
    open(PUBSUBLOG, ">>", $pubsublog) or die "Can't append to $pubsublog: $!\n";
    my $time = time();
    print PUBSUBLOG "$url , $time\n";
    close PUBSUBLOG;
    # That's it: return to the next one in the list.
    return 1;
}

######
######

sub logger {
    if ($logging eq "1") {
        open(LOG, ">>", $logfile) or die "Can't write to $logfile: $!\n";
        print LOG @_, "\n";
        close LOG;
    }
    return 1;
}

######
######

sub includefile {
    my ($outputfile) = @_;
    ## In order to prevent a race condition, or duplicate feeds, we can't just append directly
    ## to the include file itself, so we create a temporary include file, and then replace
    ## the real one with the temporary one right at the end of the program.
    open(INCLUDEFILE, ">>", $tempincludefile) or die "Can't write $tempincludefile: $!\n";
    print INCLUDEFILE '<!--#include file="' . $outputfile . '" -->' . "\n" . "<br/>" . "\n";
    close INCLUDEFILE;
    return 1;
}

#######
#######

sub print_html {
    # Create new instance of XML::RSS.
    my $rss = XML::RSS->new;

    # Parse the $url and stick it in $rss.
    logger("Now trying to parse $url");
    my $feed_to_parse = get($url);
    die "Could not fetch $url\n" unless defined $feed_to_parse;
    $rss->parse($feed_to_parse);

    # Decide on name for outputfile.
    my $outputfile = "$rss->{'channel'}->{'title'}.html";
    $outputfile =~ s/ /_/g;

    # Open the output file.
    logger("I'm going to call the output file $outputfile");
    open(OUTPUTFILE, ">", $outputfile) or die "Can't write $outputfile: $!\n";

    # Print the channel title.
    print OUTPUTFILE '<div class="channel_link">' . "\n" . '<a href="';
    print OUTPUTFILE "$rss->{'channel'}->{'link'}";
    print OUTPUTFILE '">';
    print OUTPUTFILE "$rss->{'channel'}->{'title'}</a>\n</div>\n";

    # Print channel image, checking first if it exists.
    if ($rss->{'image'}->{'link'}) {
        print OUTPUTFILE '<div class="channel_image">' . "\n" . '<a href="';
        print OUTPUTFILE "$rss->{'image'}->{'link'}";
        print OUTPUTFILE '">' . "\n";
        print OUTPUTFILE '<img src="';
        print OUTPUTFILE "$rss->{'image'}->{'url'}";
        print OUTPUTFILE '" alt="';
        print OUTPUTFILE "$rss->{'image'}->{'title'}";
        print OUTPUTFILE '"/>' . "\n</a>\n</div>";
        print OUTPUTFILE "\n";
    }

    # Print the channel items.
    print OUTPUTFILE '<div class="linkentries">' . "\n" . "<ul>";
    print OUTPUTFILE "\n";
    foreach my $item (@{$rss->{'items'}}) {
        next unless defined($item->{'title'}) && defined($item->{'link'});
        print OUTPUTFILE '<li><a href="';
        print OUTPUTFILE "$item->{'link'}";
        print OUTPUTFILE '">';
        print OUTPUTFILE "$item->{'title'}</a></li>\n";
    }
    print OUTPUTFILE "</ul>\n</div>\n";

    # Close the OUTPUTFILE.
    close(OUTPUTFILE);
    logger("and lo $outputfile has been written.");

    # Add to the include file.
    includefile($outputfile);
}
```
The other half of a Publish and Subscribe system is the listener. All the listener does is sit on a port (in this case it defaults to port 8888, but you can change that) and wait for an update notification. When a notification arrives, it retrieves the refreshed feed, parses it, and saves it to disk, where the web server can pick it up the next time someone requests the page. The complete listing is shown in Example 12-2.
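A notification is simply an XML-RPC call whose one parameter is the URL of the feed that changed, and the listener dispatches it by method name through a table of handlers. The following sketch imitates that flow in core Perl so it can be run without a server. parse_notification, handle_notification, and %methods here are hypothetical, and the regex decoding is only a stand-in for Frontier::RPC2, which handles XML-RPC types and entities properly.

```perl
use strict;
use warnings;

# Pull the method name and the first parameter (the feed URL) out of an
# incoming XML-RPC <methodCall>. Returns an empty list if either is missing.
sub parse_notification {
    my ($xml) = @_;
    my ($method) = $xml =~ m{<methodName>\s*([^<]+?)\s*</methodName>};
    my ($url) = $xml =~
        m{<param>\s*<value>\s*(?:<string>)?\s*([^<]+?)\s*(?:</string>)?\s*</value>}s;
    return unless defined $method && defined $url;
    # Undo the basic XML escapes (&amp; last, so "&amp;lt;" is not double-decoded).
    $url =~ s/&lt;/</g;
    $url =~ s/&gt;/>/g;
    $url =~ s/&amp;/&/g;
    return ($method, $url);
}

# Dispatch table keyed on method name; this handler only reports what it would do.
my %methods = (updateFeed => sub { my ($url) = @_; return "would re-fetch $url" });

sub handle_notification {
    my ($xml) = @_;
    my ($method, $url) = parse_notification($xml) or return "malformed call";
    my $handler = $methods{$method} or return "unknown method $method";
    return $handler->($url);
}

my $example_call = '<?xml version="1.0"?><methodCall><methodName>updateFeed</methodName>'
                 . '<params><param><value><string>http://example.com/index.rss</string>'
                 . '</value></param></params></methodCall>';
print handle_notification($example_call), "\n";  # prints would re-fetch http://example.com/index.rss
```

A call whose method name is not a key in the table is rejected, which is why the procedure name the client sends in its subscription request has to match the listener's key exactly.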
```perl
#!/usr/bin/perl -w
use strict;
use HTTP::Daemon;
use HTTP::Response;
use HTTP::Date;
use Frontier::RPC2;
use XML::RSS;
use LWP::Simple;

# ------USER CHANGEABLE VARIABLES HERE -------
my $listeningport = "8888";
# -------------------------------------------

# The XML-RPC method name the cloud will call, mapped to the subroutine that handles it.
my $methods = {'updateFeed' => \&updateFeed};
our $host = "";

# Frontier::RPC2 decodes incoming XML-RPC calls and encodes our replies.
my $coder = Frontier::RPC2->new;

# --------------- Start the server up ------------------------
my $listen_socket = HTTP::Daemon->new(
    LocalPort => $listeningport,
    Listen    => 20,
    Proto     => 'tcp',
    Reuse     => 1,
);
die "Can't create a listening socket: $@" unless $listen_socket;

while (my $connection = $listen_socket->accept) {
    $host = $connection->peerhost;
    interact($connection);
    $connection->close;
}

# ------------- The interact subroutine, as called when a peer connects
sub interact {
    my $sock = shift;
    my $req;
    eval { $req = $sock->get_request; };
    return unless $req;

    # Check to see if the contact is both XML and to the right path.
    my $content_type = $req->header('Content-Type') || '';
    unless ($content_type =~ m{^text/xml} && $req->uri->path eq '/RPC2') {
        $sock->send_error(404);
        return;
    }

    # Decode the call, run the matching subroutine from $methods,
    # and encode its return value as the XML-RPC response.
    my $res_xml = $coder->serve($req->content, $methods);

    my $res = HTTP::Response->new(200, 'OK');
    $res->header(
        Date         => time2str(),
        Server       => 'PubSubServer',
        Content_Type => 'text/xml',
    );
    $res->content($res_xml);
    $sock->send_response($res);
}

# ---------------------------------------------------------------------
# ---- updateFeed -----
sub updateFeed {
    my ($url) = @_;

    # Create new instance of XML::RSS.
    my $rss = XML::RSS->new;

    # Parse the $url and stick it in $rss.
    my $feed_to_parse = get($url);
    die "Could not fetch $url\n" unless defined $feed_to_parse;
    $rss->parse($feed_to_parse);

    # Decide on name for outputfile.
    my $outputfile = "$rss->{'channel'}->{'title'}.html";
    $outputfile =~ s/ /_/g;

    # Open the output file.
    open(OUTPUTFILE, ">", $outputfile) or die "Can't write $outputfile: $!\n";

    # Print the channel title.
    print OUTPUTFILE '<div id="channel_link"><a href="';
    print OUTPUTFILE "$rss->{'channel'}->{'link'}";
    print OUTPUTFILE '">';
    print OUTPUTFILE "$rss->{'channel'}->{'title'}</a></div>\n";

    # Print channel image, checking first if it exists.
    if ($rss->{'image'}->{'link'}) {
        print OUTPUTFILE '<div id="channel_image"><a href="';
        print OUTPUTFILE "$rss->{'image'}->{'link'}";
        print OUTPUTFILE '">';
        print OUTPUTFILE '<img src="';
        print OUTPUTFILE "$rss->{'image'}->{'url'}";
        print OUTPUTFILE '" alt="';
        print OUTPUTFILE "$rss->{'image'}->{'title'}";
        print OUTPUTFILE '"/></a></div>';
        print OUTPUTFILE "\n";
    }

    # Print the channel items.
    print OUTPUTFILE '<div id="linkentries"><ul>';
    print OUTPUTFILE "\n";
    foreach my $item (@{$rss->{'items'}}) {
        next unless defined($item->{'title'}) && defined($item->{'link'});
        print OUTPUTFILE '<li><a href="';
        print OUTPUTFILE "$item->{'link'}";
        print OUTPUTFILE '">';
        print OUTPUTFILE "$item->{'title'}</a></li>\n";
    }
    print OUTPUTFILE "</ul></div>\n";

    # If there's a textinput element...
    if ($rss->{'textinput'}->{'title'}) {
        print OUTPUTFILE '<div id="textinput">';
        print OUTPUTFILE '<form method="get" action="';
        print OUTPUTFILE "$rss->{'textinput'}->{'link'}";
        print OUTPUTFILE '">';
        print OUTPUTFILE "$rss->{'textinput'}->{'description'}<br/>\n";
        print OUTPUTFILE '<input type="text" name="';
        print OUTPUTFILE "$rss->{'textinput'}->{'name'}";
        print OUTPUTFILE '"><br/>' . "\n";
        print OUTPUTFILE '<input type="submit" value="';
        print OUTPUTFILE "$rss->{'textinput'}->{'title'}";
        print OUTPUTFILE '"></form>';
        print OUTPUTFILE '</div>';
    }

    # If there's a copyright element...
    if ($rss->{'channel'}->{'copyright'}) {
        print OUTPUTFILE '<div id="copyright">';
        print OUTPUTFILE "$rss->{'channel'}->{'copyright'}</div>";
    }

    # Close the OUTPUTFILE.
    close(OUTPUTFILE);

    # Tell the cloud the notification was handled.
    return 1;
}
```