From 05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b Mon Sep 17 00:00:00 2001 From: dakkar Date: Sat, 22 Apr 2017 13:01:17 +0100 Subject: store tweets in postgres --- .gitignore | 1 - lib/Dakkar/TweetArchive.pm | 52 +++++++++++++++++ lib/Dakkar/TweetArchive/Store.pm | 118 +++++++++++++++++++++++++++++++++++++++ tweet-archive.pl | 54 +++++------------- 4 files changed, 185 insertions(+), 40 deletions(-) create mode 100644 lib/Dakkar/TweetArchive.pm create mode 100644 lib/Dakkar/TweetArchive/Store.pm diff --git a/.gitignore b/.gitignore index 61a96e4..b70bcfa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ /tweet-archive.conf -/elasticsearch-*/ diff --git a/lib/Dakkar/TweetArchive.pm b/lib/Dakkar/TweetArchive.pm new file mode 100644 index 0000000..9d914fd --- /dev/null +++ b/lib/Dakkar/TweetArchive.pm @@ -0,0 +1,52 @@ +package Dakkar::TweetArchive; +use 5.024; +use Moo; +use experimental 'signatures'; +use Net::Twitter; +use Types::Standard qw(Str InstanceOf); +use namespace::clean; + +has [qw(consumer_key consumer_secret access_token access_token_secret)] => ( + is => 'ro', + required => 1, + isa => Str, +); + +has client => ( + is => 'lazy', + isa => InstanceOf['Net::Twitter'], +); + +sub _build_client($self) { + my $nt = Net::Twitter->new( + traits => [ + qw/API::RESTv1_1 OAuth RetryOnError/, + AutoCursor => { + max_calls => 16, + force_cursor => 1, + array_accessor => 'users', + methods => [qw/friends followers/], + }, + ], + consumer_key => $self->consumer_key, + consumer_secret => $self->consumer_secret, + ); + + $nt->access_token($self->access_token); + $nt->access_token_secret($self->access_token_secret); + + return $nt; +} + +sub home_timeline($self,$since_id) { + return $self->client->home_timeline({ + include_entities => 1, + trim_user => 0, + exclude_replies => 0, + ( $since_id ? ( since_id => $since_id ) : () ), + count => 200, + }); +} + +1; + diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm new file mode 100644 index 0000000..e4a4f3b --- /dev/null +++ b/lib/Dakkar/TweetArchive/Store.pm @@ -0,0 +1,118 @@ +package Dakkar::TweetArchive::Store; +use 5.024; +use Moo; +use experimental 'signatures'; +use DBI; +use Types::Standard qw(Str InstanceOf); +use DateTime::Format::Strptime; +use DateTime::Format::Pg; +use JSON::MaybeXS; +use namespace::clean; + +use Data::Printer; + +has [qw(dsn user pass)] => ( + is => 'ro', + required => 1, + isa => Str, +); + +has dbh => ( + is => 'lazy', + isa => InstanceOf['DBI::db'], +); + +sub _build_dbh($self) { + my $dbh = DBI->connect( + $self->dsn, $self->user, $self->pass, + { + PrintError => 0, + AutoCommit => 1, + RaiseError => 1, + }, + ); + $self->_ensure_schema($dbh); + return $dbh; +} + +my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y'); +my $dt_printer = DateTime::Format::Pg->new(); + +my $json_printer = JSON::MaybeXS->new( + ascii => 1, + pretty => 0, + allow_nonref => 1, + allow_unknown => 1, + allow_blessed => 1, + convert_blessed => 1, +); + +sub latest_tweet_id($self) { + return $self->dbh->selectall_arrayref( + q{SELECT MAX(id) FROM tweets}, + )->[0][0]; +} + +sub store_tweet($self,$tweet) { + # yes, the source most probably decoded this from a string, we + # have to serialise it again, so that PostgreSQL can parse it + # *again* + my $tweet_str = $json_printer->encode($tweet); + my $created_at = $dt_parser->parse_datetime($tweet->{created_at}); + + $self->dbh->do(<<'SQL', {}, +INSERT INTO tweets(id,created_at,data) VALUES(?,?,?) + ON CONFLICT (id) DO UPDATE SET + created_at = EXCLUDED.created_at, + data = EXCLUDED.data +SQL + $tweet->{id_str}, + $dt_printer->format_datetime($created_at), + $tweet_str, + ); +} + +sub _schema_deploy($self,$dbh,$next_version) { + my $method_name = "_schema_deploy_${next_version}"; + if (my $method = $self->can($method_name)) { + $self->$method($dbh); + $dbh->do(<<'SQL', {}, $next_version); +INSERT INTO meta(version) VALUES (?) +SQL + return 1; + } + else { + return 0; + } +} + +sub _ensure_schema($self,$dbh) { + # simple-minded schema version management + $dbh->do(<<'SQL'); +CREATE TABLE IF NOT EXISTS meta ( + version INTEGER PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +) +SQL + my $current_version = $dbh->selectall_arrayref( + q{SELECT max(version) FROM meta} + )->[0][0]; + $current_version //= 0; + + my $next_version = $current_version + 1; + while ($self->_schema_deploy($dbh,$next_version)) { + ++$next_version; + } +} + +sub _schema_deploy_1($self,$dbh) { + $dbh->do(<<'SQL'); +CREATE TABLE tweets ( + id VARCHAR(255) PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + data JSONB NOT NULL +) +SQL +} + +1; diff --git a/tweet-archive.pl b/tweet-archive.pl index 3c6c0b0..170c6af 100644 --- a/tweet-archive.pl +++ b/tweet-archive.pl @@ -1,52 +1,28 @@ #!/usr/bin/env perl -use 5.016; +use 5.024; use strict; use warnings; use Path::Class; -use JSON; -use Net::Twitter; -use ElasticSearch; -use DateTime::Format::Strptime; -use Data::Visitor::Callback; -use Data::Printer; +use JSON::MaybeXS; +use Dakkar::TweetArchive; +use Dakkar::TweetArchive::Store; -my $conf = JSON->new->utf8->decode( +my $json_parser = JSON::MaybeXS->new( + utf8 => 1, + relaxed => 1, +); + +my $conf = $json_parser->decode( file(__FILE__)->parent->file('tweet-archive.conf') ->slurp(iomode=>'<:raw') // '{}' ); -my $cb = Data::Visitor::Callback->new( - 'JSON::Boolean' => sub { say "converting a boolean"; return 0+$_ }, -); -my $dt = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y'); - -my $nt = Net::Twitter->new( - traits => [qw/API::RESTv1_1 OAuth/], - consumer_key => $conf->{consumer_key}, - consumer_secret => $conf->{consumer_secret}, -); - -$nt->access_token($conf->{access_token}); -$nt->access_token_secret($conf->{access_token_secret}); +my $client = Dakkar::TweetArchive->new($conf); +my $store = Dakkar::TweetArchive::Store->new($conf); -my $es = ElasticSearch->new(); +my $latest_id = $store->latest_tweet_id; -my $tl = $nt->home_timeline({ - include_entities => 1, - trim_user => 1, - exclude_replies => 0, -}); - -for my $tweet (@$tl) { - $cb->visit($tweet); - say p $tweet; - my $result = $es->index( - index => 'tweet-archive', - type => 'tweet', - id => $tweet->{id_str}, - timestamp => $dt->parse_datetime($tweet->{created_at})->iso8601, - data => $tweet, - ); - say p $result; +for my $tweet ($client->home_timeline($latest_id)->@*) { + $store->store_tweet($tweet); } -- cgit v1.2.3