| field | value | timestamp |
|---|---|---|
| author | dakkar <dakkar@thenautilus.net> | 2017-04-22 13:01:17 +0100 |
| committer | dakkar <dakkar@thenautilus.net> | 2017-04-22 13:49:17 +0100 |
| commit | 05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b (patch) | |
| tree | 5e2c342323686f7050b8a37a5638f2378521e452 /lib/Dakkar/TweetArchive/Store.pm | |
| parent | bump API version (diff) | |
store tweets in postgres
Diffstat (limited to 'lib/Dakkar/TweetArchive/Store.pm')
| mode | file | lines added |
|---|---|---|
| -rw-r--r-- | lib/Dakkar/TweetArchive/Store.pm | 118 |

1 file changed, 118 insertions, 0 deletions
```diff
diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm
new file mode 100644
index 0000000..e4a4f3b
--- /dev/null
+++ b/lib/Dakkar/TweetArchive/Store.pm
@@ -0,0 +1,118 @@
+package Dakkar::TweetArchive::Store;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use DBI;
+use Types::Standard qw(Str InstanceOf);
+use DateTime::Format::Strptime;
+use DateTime::Format::Pg;
+use JSON::MaybeXS;
+use namespace::clean;
+
+use Data::Printer;
+
+has [qw(dsn user pass)] => (
+    is => 'ro',
+    required => 1,
+    isa => Str,
+);
+
+has dbh => (
+    is => 'lazy',
+    isa => InstanceOf['DBI::db'],
+);
+
+sub _build_dbh($self) {
+    my $dbh = DBI->connect(
+        $self->dsn, $self->user, $self->pass,
+        {
+            PrintError => 0,
+            AutoCommit => 1,
+            RaiseError => 1,
+        },
+    );
+    $self->_ensure_schema($dbh);
+    return $dbh;
+}
+
+my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
+my $dt_printer = DateTime::Format::Pg->new();
+
+my $json_printer = JSON::MaybeXS->new(
+    ascii => 1,
+    pretty => 0,
+    allow_nonref => 1,
+    allow_unknown => 1,
+    allow_blessed => 1,
+    convert_blessed => 1,
+);
+
+sub latest_tweet_id($self) {
+    return $self->dbh->selectall_arrayref(
+        q{SELECT MAX(id) FROM tweets},
+    )->[0][0];
+}
+
+sub store_tweet($self,$tweet) {
+    # yes, the source most probably decoded this from a string, we
+    # have to serialise it again, so that PostgreSQL can parse it
+    # *again*
+    my $tweet_str = $json_printer->encode($tweet);
+    my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
+
+    $self->dbh->do(<<'SQL', {},
+INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
+  ON CONFLICT (id) DO UPDATE SET
+    created_at = EXCLUDED.created_at,
+    data = EXCLUDED.data
+SQL
+        $tweet->{id_str},
+        $dt_printer->format_datetime($created_at),
+        $tweet_str,
+    );
+}
+
+sub _schema_deploy($self,$dbh,$next_version) {
+    my $method_name = "_schema_deploy_${next_version}";
+    if (my $method = $self->can($method_name)) {
+        $self->$method($dbh);
+        $dbh->do(<<'SQL', {}, $next_version);
+INSERT INTO meta(version) VALUES (?)
+SQL
+        return 1;
+    }
+    else {
+        return 0;
+    }
+}
+
+sub _ensure_schema($self,$dbh) {
+    # simple-minded schema version management
+    $dbh->do(<<'SQL');
+CREATE TABLE IF NOT EXISTS meta (
+    version INTEGER PRIMARY KEY,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+)
+SQL
+    my $current_version = $dbh->selectall_arrayref(
+        q{SELECT max(version) FROM meta}
+    )->[0][0];
+    $current_version //= 0;
+
+    my $next_version = $current_version + 1;
+    while ($self->_schema_deploy($dbh,$next_version)) {
+        ++$next_version;
+    }
+}
+
+sub _schema_deploy_1($self,$dbh) {
+    $dbh->do(<<'SQL');
+CREATE TABLE tweets (
+    id VARCHAR(255) PRIMARY KEY,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    data JSONB NOT NULL
+)
+SQL
+}
+
+1;
```
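For context, a minimal usage sketch of the store added in this commit, based only on the interface shown above; the DSN, credentials, and example tweet values are placeholders and not part of the commit:

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use Dakkar::TweetArchive::Store;

# Build a store pointing at a local PostgreSQL database.
# dsn/user/pass values here are placeholders.
my $store = Dakkar::TweetArchive::Store->new(
    dsn  => 'dbi:Pg:dbname=tweet_archive',
    user => 'archive',
    pass => 'secret',
);

# Highest tweet id already stored; useful as a since_id for the next fetch.
my $latest_id = $store->latest_tweet_id;

# A decoded tweet hashref in the shape the Twitter API returns: store_tweet()
# only relies on id_str and created_at, the whole structure goes into the
# JSONB column verbatim.
my $tweet = {
    id_str     => '855735905951627264',
    created_at => 'Sat Apr 22 12:00:00 +0000 2017',
    text       => 'example tweet',
};
$store->store_tweet($tweet);   # upserts on the tweet id
```

Note that the schema is created lazily: the first call that touches `dbh` runs `_ensure_schema`, which deploys any `_schema_deploy_N` steps newer than the version recorded in the `meta` table.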