From 05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b Mon Sep 17 00:00:00 2001 From: dakkar Date: Sat, 22 Apr 2017 13:01:17 +0100 Subject: store tweets in postgres --- lib/Dakkar/TweetArchive.pm | 52 +++++++++++++++++ lib/Dakkar/TweetArchive/Store.pm | 118 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 lib/Dakkar/TweetArchive.pm create mode 100644 lib/Dakkar/TweetArchive/Store.pm (limited to 'lib') diff --git a/lib/Dakkar/TweetArchive.pm b/lib/Dakkar/TweetArchive.pm new file mode 100644 index 0000000..9d914fd --- /dev/null +++ b/lib/Dakkar/TweetArchive.pm @@ -0,0 +1,52 @@ +package Dakkar::TweetArchive; +use 5.024; +use Moo; +use experimental 'signatures'; +use Net::Twitter; +use Types::Standard qw(Str InstanceOf); +use namespace::clean; + +has [qw(consumer_key consumer_secret access_token access_token_secret)] => ( + is => 'ro', + required => 1, + isa => Str, +); + +has client => ( + is => 'lazy', + isa => InstanceOf['Net::Twitter'], +); + +sub _build_client($self) { + my $nt = Net::Twitter->new( + traits => [ + qw/API::RESTv1_1 OAuth RetryOnError/, + AutoCursor => { + max_calls => 16, + force_cursor => 1, + array_accessor => 'users', + methods => [qw/friends followers/], + }, + ], + consumer_key => $self->consumer_key, + consumer_secret => $self->consumer_secret, + ); + + $nt->access_token($self->access_token); + $nt->access_token_secret($self->access_token_secret); + + return $nt; +} + +sub home_timeline($self,$since_id) { + return $self->client->home_timeline({ + include_entities => 1, + trim_user => 0, + exclude_replies => 0, + ( $since_id ? ( since_id => $since_id ) : () ), + count => 200, + }); +} + +1; + diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm new file mode 100644 index 0000000..e4a4f3b --- /dev/null +++ b/lib/Dakkar/TweetArchive/Store.pm @@ -0,0 +1,118 @@ +package Dakkar::TweetArchive::Store; +use 5.024; +use Moo; +use experimental 'signatures'; +use DBI; +use Types::Standard qw(Str InstanceOf); +use DateTime::Format::Strptime; +use DateTime::Format::Pg; +use JSON::MaybeXS; +use namespace::clean; + +use Data::Printer; + +has [qw(dsn user pass)] => ( + is => 'ro', + required => 1, + isa => Str, +); + +has dbh => ( + is => 'lazy', + isa => InstanceOf['DBI::db'], +); + +sub _build_dbh($self) { + my $dbh = DBI->connect( + $self->dsn, $self->user, $self->pass, + { + PrintError => 0, + AutoCommit => 1, + RaiseError => 1, + }, + ); + $self->_ensure_schema($dbh); + return $dbh; +} + +my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y'); +my $dt_printer = DateTime::Format::Pg->new(); + +my $json_printer = JSON::MaybeXS->new( + ascii => 1, + pretty => 0, + allow_nonref => 1, + allow_unknown => 1, + allow_blessed => 1, + convert_blessed => 1, +); + +sub latest_tweet_id($self) { + return $self->dbh->selectall_arrayref( + q{SELECT MAX(id) FROM tweets}, + )->[0][0]; +} + +sub store_tweet($self,$tweet) { + # yes, the source most probably decoded this from a string, we + # have to serialise it again, so that PostgreSQL can parse it + # *again* + my $tweet_str = $json_printer->encode($tweet); + my $created_at = $dt_parser->parse_datetime($tweet->{created_at}); + + $self->dbh->do(<<'SQL', {}, +INSERT INTO tweets(id,created_at,data) VALUES(?,?,?) + ON CONFLICT (id) DO UPDATE SET + created_at = EXCLUDED.created_at, + data = EXCLUDED.data +SQL + $tweet->{id_str}, + $dt_printer->format_datetime($created_at), + $tweet_str, + ); +} + +sub _schema_deploy($self,$dbh,$next_version) { + my $method_name = "_schema_deploy_${next_version}"; + if (my $method = $self->can($method_name)) { + $self->$method($dbh); + $dbh->do(<<'SQL', {}, $next_version); +INSERT INTO meta(version) VALUES (?) +SQL + return 1; + } + else { + return 0; + } +} + +sub _ensure_schema($self,$dbh) { + # simple-minded schema version management + $dbh->do(<<'SQL'); +CREATE TABLE IF NOT EXISTS meta ( + version INTEGER PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +) +SQL + my $current_version = $dbh->selectall_arrayref( + q{SELECT max(version) FROM meta} + )->[0][0]; + $current_version //= 0; + + my $next_version = $current_version + 1; + while ($self->_schema_deploy($dbh,$next_version)) { + ++$next_version; + } +} + +sub _schema_deploy_1($self,$dbh) { + $dbh->do(<<'SQL'); +CREATE TABLE tweets ( + id VARCHAR(255) PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + data JSONB NOT NULL +) +SQL +} + +1; -- cgit v1.2.3