package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use Carp qw(croak);
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;

# Stores tweets and follower/friend snapshots in a PostgreSQL database,
# creating or upgrading the schema the first time the handle is used.

# DBI connection parameters; all three are mandatory strings.
has [qw(dsn user pass)] => (
    is       => 'ro',
    required => 1,
    isa      => Str,
);

# Database handle; connected (and schema-checked) lazily on first access.
has dbh => (
    is  => 'lazy',
    isa => InstanceOf['DBI::db'],
);

# Connects with RaiseError on (so every DB error dies) and AutoCommit on,
# then brings the schema up to the latest version before handing the
# handle out.
sub _build_dbh($self) {
    my $dbh = DBI->connect(
        $self->dsn, $self->user, $self->pass,
        {
            PrintError => 0,
            AutoCommit => 1,
            RaiseError => 1,
        },
    );
    $self->_ensure_schema($dbh);
    return $dbh;
}

# Parses the textual created_at of a tweet, in the shape
# "%a %b %d %T %z %Y" (weekday, month, day, time, offset, year).
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
# Renders DateTime objects as PostgreSQL timestamp literals.
my $dt_printer = DateTime::Format::Pg->new();
# Serialises Perl data back to compact, ASCII-only JSON for the JSONB
# columns; blessed/unknown values are tolerated rather than fatal.
my $json_printer = JSON::MaybeXS->new(
    ascii           => 1,
    pretty          => 0,
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);

# Returns the id (as stored, a string) of the newest tweet, or undef when
# the table is empty.
sub latest_tweet_id($self) {
    # ids are VARCHAR, so a plain MAX(id) compares them as text and
    # "99999999" would beat "1234567890123456789"; for digit strings,
    # ordering by length first and then byte-wise gives numeric order
    my ($id) = $self->dbh->selectrow_array(<<'SQL');
SELECT id FROM tweets
ORDER BY LENGTH(id) DESC, id COLLATE "C" DESC
LIMIT 1
SQL
    return $id;
}

# Inserts a tweet (a decoded-JSON hashref), or replaces created_at/data
# of an existing row with the same id_str. Returns DBI's do() result.
# Croaks if created_at is missing or does not match $dt_parser's pattern.
sub store_tweet($self,$tweet) {
    # yes, the source most probably decoded this from a string, we
    # have to serialise it again, so that PostgreSQL can parse it
    # *again*
    my $tweet_str = $json_printer->encode($tweet);

    # parse_datetime returns undef on failure; catch it here instead of
    # handing undef to format_datetime and failing far from the cause
    my $created_at = $dt_parser->parse_datetime($tweet->{created_at} // '')
        or croak sprintf(
            q{store_tweet: cannot parse created_at '%s' of tweet %s},
            $tweet->{created_at} // '',
            $tweet->{id_str}     // '(no id_str)',
        );

    return $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE
SET created_at = EXCLUDED.created_at,
    data = EXCLUDED.data
SQL
        $tweet->{id_str},
        $dt_printer->format_datetime($created_at),
        $tweet_str,
    );
}

# Records a snapshot of the friends list (serialised to JSON) in the
# friends table; taken_at is filled by the column default.
sub store_friends($self,$friends) {
    my $friends_str = $json_printer->encode($friends);
    return $self->dbh->do(<<'SQL', {}, $friends_str);
INSERT INTO friends(users) VALUES(?)
SQL
}

# Records a snapshot of the followers list (serialised to JSON) in the
# followers table; taken_at is filled by the column default.
sub store_followers($self,$followers) {
    my $followers_str = $json_printer->encode($followers);
    return $self->dbh->do(<<'SQL', {}, $followers_str);
INSERT INTO followers(users) VALUES(?)
SQL
}

# Applies migration number $next_version, if a _schema_deploy_N method
# exists for it, and records it in meta. Returns 1 if a migration was
# applied, 0 if there is none for that version. Dies (after rolling
# back) if the migration fails.
sub _schema_deploy($self,$dbh,$next_version) {
    my $method = $self->can("_schema_deploy_${next_version}")
        or return 0;

    # run the DDL and the meta bookkeeping as one transaction, so a
    # half-applied migration can never be left behind without its meta
    # row (which would make every later start trip over existing tables)
    $dbh->begin_work;
    my $ok = eval {
        $self->$method($dbh);
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $dbh->commit;
        1;
    };
    if (!$ok) {
        my $error = $@ || "schema migration $next_version failed";
        # the rollback itself may fail (e.g. connection lost); the
        # original error is the one worth reporting
        eval { $dbh->rollback };
        die $error;
    }
    return 1;
}

# Simple-minded schema version management: makes sure the meta table
# exists, then applies _schema_deploy_N for every N above the highest
# recorded version, stopping at the first N without such a method.
sub _ensure_schema($self,$dbh) {
    {
        # silence the 'NOTICE: relation "meta" already exists, skipping'
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
  version INTEGER PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    }

    my ($current_version) = $dbh->selectrow_array(
        q{SELECT max(version) FROM meta},
    );
    my $next_version = ($current_version // 0) + 1;

    while ($self->_schema_deploy($dbh,$next_version)) {
        ++$next_version;
    }
    return;
}

# Migration 1: the tweets table, one JSONB document per tweet id.
sub _schema_deploy_1($self,$dbh) {
    $dbh->do(<<'SQL');
CREATE TABLE tweets (
  id VARCHAR(255) PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  data JSONB NOT NULL
)
SQL
}

# Migration 2: snapshot tables for followers and friends, keyed on the
# time the snapshot was taken.
sub _schema_deploy_2($self,$dbh) {
    $dbh->do(<<'SQL');
CREATE TABLE followers (
  taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
  users JSONB NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE TABLE friends (
  taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
  users JSONB NOT NULL
)
SQL
}

1;