From f4c31024fc3dd0210ce8821e8b63d9f411828c3f Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 9 Dec 2012 13:45:17 +0000 Subject: de-duping --- lib/Feed.pm | 24 ++++++++- lib/Feed/DeDupe.pm | 141 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/Feed/FixDateTime.pm | 4 ++ lib/Feed/Printer.pm | 5 ++ 4 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 lib/Feed/DeDupe.pm (limited to 'lib') diff --git a/lib/Feed.pm b/lib/Feed.pm index d1bb7df..fbcdf10 100644 --- a/lib/Feed.pm +++ b/lib/Feed.pm @@ -5,6 +5,14 @@ use 5.016; use MooseX::Types::URI 'Uri'; with 'MooseX::Traits'; use XML::Feed; +use Log::Log4perl; + +sub log { + my ($self) = @_; + + my $caller = caller(); + return Log::Log4perl->get_logger($caller) +} has '+_trait_namespace' => ( default => __PACKAGE__ ); @@ -19,6 +27,7 @@ has feed => ( is => 'ro', isa => 'XML::Feed', lazy_build => 1, + builder => 'get_feed', ); has _entries => ( @@ -26,6 +35,7 @@ has _entries => ( isa => 'ArrayRef[XML::Feed::Entry]', traits => [ 'Array' ], lazy_build => 1, + builder => 'extract_entries', handles => { entries => 'elements', count_entries => 'count', @@ -35,28 +45,38 @@ has _entries => ( sub process { my ($self) = @_; + $self->log->trace('process - begin'); + for my $entry ($self->entries) { $self->process_entry($entry); } + $self->log->trace('process - end'); + return; } -sub _build_feed { +sub get_feed { my ($self) = @_; + $self->log->trace('get_feed'); + return XML::Feed->parse($self->uri) or die XML::Feed->errstr; } -sub _build__entries { +sub extract_entries { my ($self) = @_; + $self->log->trace('extract_entries'); return [$self->feed->entries]; } sub process_entry { + my ($self) = @_; + + $self->log->trace('process_entry - stub'); } __PACKAGE__->meta->make_immutable; diff --git a/lib/Feed/DeDupe.pm b/lib/Feed/DeDupe.pm new file mode 100644 index 0000000..f617154 --- /dev/null +++ b/lib/Feed/DeDupe.pm @@ -0,0 +1,141 @@ +package Feed::DeDupe; +use Moose::Role; +use 5.016; +use namespace::autoclean -also => ['_maybe_build_schema']; +use DBI; +use Try::Tiny; +use Encode; +use Digest::SHA1 'sha1_base64'; + +requires 'get_feed','process','process_entry'; + +has 'dupe_dsn' => ( + is => 'ro', + isa => 'Str', + required => 1, +); + +has dbh => ( + is => 'ro', + lazy_build => 1, +); + +has ['_find_sth','_insert_sth'] => ( + is => 'ro', + lazy_build => 1, +); + +sub _build_dbh { + my ($self) = @_; + + my $dbh = DBI->connect($self->dupe_dsn,undef,undef,{ + RaiseError => 1, + PrintError => 0, + AutoCommit => 0, + }); + + _maybe_build_schema($dbh); + + return $dbh; +} + +sub _maybe_build_schema { + my ($dbh) = @_; + + try { + $dbh->selectrow_array(q{SELECT * FROM seen_items LIMIT 1}); + $dbh->rollback; + } catch { + $dbh->do(<<'SQL'); +CREATE TABLE seen_items ( + id VARCHAR(255) PRIMARY KEY +) +SQL +$dbh->commit; + }; + + return; +} + +sub _build__find_sth { + my ($self) = @_; + + return $self->dbh->prepare(<<'SQL'); +SELECT COUNT(*) +FROM seen_items +WHERE id=? +SQL +} + +sub _build__insert_sth { + my ($self) = @_; + + return $self->dbh->prepare(<<'SQL'); +INSERT INTO seen_items(id) +VALUES (?) +SQL +} + +after process => sub { + my ($self) = @_; + + $self->log->trace('after process'); + + $self->dbh->commit; +}; + +around process_entry => sub { + my ($orig,$self,$entry) = @_; + + $self->log->trace('around process_entry - begin'); + + return if $self->seen_already($entry); + + $self->log->trace('around process_entry - call original'); + + $self->$orig($entry); + + $self->mark_seen($entry); + + $self->log->trace('around process_entry - end'); + + return; +}; + +sub seen_already { + my ($self,$e) = @_; + + $self->log->trace('seen_already - begin'); + + my $id = $self->_entry_id($e); + + $self->_find_sth->execute($id); + my ($count) = $self->_find_sth->fetchrow_array; + + $self->log->trace("seen_already - end ($count)"); + + return $count; +} + +sub mark_seen { + my ($self,$e) = @_; + + $self->log->trace('mark_seen - begin'); + + my $id = $self->_entry_id($e); + + $self->_insert_sth->execute($id); + + $self->log->trace('mark_seen - end'); +} + +sub _entry_id { + my ($self,$e) = @_; + + my $body = $e->content->body; + my $content_digest = sha1_base64(encode('utf-8',$body)); + my $id = join '-',$e->id,$e->modified->iso8601,$content_digest; + return encode('utf-8',$id); +} + +1; diff --git a/lib/Feed/FixDateTime.pm b/lib/Feed/FixDateTime.pm index f9ebe73..6034931 100644 --- a/lib/Feed/FixDateTime.pm +++ b/lib/Feed/FixDateTime.pm @@ -9,6 +9,8 @@ requires 'process_entry'; before process_entry => sub { my ($self,$entry) = @_; + $self->log->trace('before process_entry - begin'); + for my $f ('issued','modified') { my $date = $entry->$f; if (!$date) { @@ -20,6 +22,8 @@ before process_entry => sub { $entry->$f($date); } + + $self->log->trace('before process_entry - end'); }; 1; diff --git a/lib/Feed/Printer.pm b/lib/Feed/Printer.pm index 5743883..7fd2f65 100644 --- a/lib/Feed/Printer.pm +++ b/lib/Feed/Printer.pm @@ -14,10 +14,15 @@ before process => sub { sub process_entry { my ($self,$entry) = @_; + $self->log->trace('process_entry - begin'); + for my $f (qw(id title link issued modified)) { say " $f:",$entry->$f//''; } + say $entry->content->body; say ''; + + $self->log->trace('process_entry - end'); } 1; -- cgit v1.2.3