package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;
use Data::Printer;
has [qw(dsn user pass)] => (
is => 'ro',
required => 1,
isa => Str,
);
has dbh => (
is => 'lazy',
isa => InstanceOf['DBI::db'],
);
sub _build_dbh($self) {
my $dbh = DBI->connect(
$self->dsn, $self->user, $self->pass,
{
PrintError => 0,
AutoCommit => 1,
RaiseError => 1,
},
);
$self->_ensure_schema($dbh);
return $dbh;
}
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
my $dt_printer = DateTime::Format::Pg->new();
my $json_printer = JSON::MaybeXS->new(
ascii => 1,
pretty => 0,
allow_nonref => 1,
allow_unknown => 1,
allow_blessed => 1,
convert_blessed => 1,
);
sub latest_tweet_id($self) {
return $self->dbh->selectall_arrayref(
q{SELECT MAX(id) FROM tweets},
)->[0][0];
}
sub store_tweet($self,$tweet) {
my $tweet_str = $json_printer->encode($tweet);
my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
$self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
created_at = EXCLUDED.created_at,
data = EXCLUDED.data
SQL
$tweet->{id_str},
$dt_printer->format_datetime($created_at),
$tweet_str,
);
}
sub _schema_deploy($self,$dbh,$next_version) {
my $method_name = "_schema_deploy_${next_version}";
if (my $method = $self->can($method_name)) {
$self->$method($dbh);
$dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
return 1;
}
else {
return 0;
}
}
sub _ensure_schema($self,$dbh) {
$dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
version INTEGER PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
my $current_version = $dbh->selectall_arrayref(
q{SELECT max(version) FROM meta}
)->[0][0];
$current_version //= 0;
my $next_version = $current_version + 1;
while ($self->_schema_deploy($dbh,$next_version)) {
++$next_version;
}
}
sub _schema_deploy_1($self,$dbh) {
$dbh->do(<<'SQL');
CREATE TABLE tweets (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB NOT NULL
)
SQL
}
1;