package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;
# Connection settings passed to DBI->connect by _build_dbh.
# All three are mandatory, read-only strings.
for my $attribute (qw(dsn user pass)) {
    has $attribute => (
        is       => 'ro',
        isa      => Str,
        required => 1,
    );
}
# Database handle. Declared lazy, so it is created by _build_dbh the
# first time it is read.
has dbh => (
    isa => InstanceOf['DBI::db'],
    is  => 'lazy',
);
# Builder for the dbh attribute: connects with the configured
# dsn/user/pass.
#
# Errors are raised as exceptions (RaiseError) instead of being printed
# (PrintError off), and every statement auto-commits unless a transaction
# is started explicitly.
sub _build_dbh($self) {
    my %connect_attrs = (
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 0,
    );

    return DBI->connect(
        $self->dsn,
        $self->user,
        $self->pass,
        \%connect_attrs,
    );
}
# Construction hook: brings the database schema up to date as soon as
# the object exists. Constructor arguments are accepted but not used.
sub BUILD($self,@) {
    return $self->_ensure_schema();
}
# Shared, file-private helpers used by the storage methods below.

# Parses timestamps shaped like "Wed Aug 27 13:08:45 +0000 2008".
my $dt_parser = DateTime::Format::Strptime->new(
    pattern => '%a %b %d %T %z %Y',
);

# Renders DateTime objects in the form PostgreSQL expects.
my $dt_printer = DateTime::Format::Pg->new;

# Compact, ASCII-only JSON encoder that tolerates scalars, unknown
# values and blessed objects in the data it is given.
my $json_printer = JSON::MaybeXS->new(
    pretty          => 0,
    ascii           => 1,
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);
# Returns the id of the most recent stored tweet, or undef when the
# tweets table is empty.
#
# tweets.id is a VARCHAR holding a decimal id, so a plain MAX(id) would
# compare as text and rank '999' above '1000'. Ordering by length first
# and then by value gives numeric order for digit strings without a cast
# (a cast would fail outright on any unexpected value). This cannot use
# the primary-key index the way MAX(id) could.
sub latest_tweet_id($self) {
    my $rows = $self->dbh->selectall_arrayref(<<'SQL');
SELECT id FROM tweets
ORDER BY length(id) DESC, id DESC
LIMIT 1
SQL
    return $rows->[0][0];
}
# Inserts a tweet, or refreshes it if a row with the same id exists.
#
# $tweet is a hashref; this method reads its id_str and created_at keys
# and stores the whole structure as JSON in the data column.
#
# Dies if created_at is missing or does not match the expected timestamp
# format, and on any database error.
sub store_tweet($self,$tweet) {
    my $tweet_str = $json_printer->encode($tweet);

    # The parser returns undef rather than dying on bad input; check here
    # so the failure names the offending tweet instead of surfacing as a
    # method call on an undefined value inside the formatter.
    my $raw_created_at = $tweet->{created_at};
    my $created_at = defined $raw_created_at
        ? $dt_parser->parse_datetime($raw_created_at)
        : undef;
    if (!defined $created_at) {
        die sprintf(
            'Cannot store tweet %s: missing or unparseable created_at (%s)',
            $tweet->{id_str} // '(no id_str)',
            $raw_created_at // 'undef',
        );
    }

    my $upsert_sql = <<'SQL';
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
created_at = EXCLUDED.created_at,
data = EXCLUDED.data
SQL

    return $self->dbh->do(
        $upsert_sql, {},
        $tweet->{id_str},
        $dt_printer->format_datetime($created_at),
        $tweet_str,
    );
}
# Upserts each person in $people (an arrayref of data structures) into
# the people table and returns an arrayref of their row ids, in the same
# order as the input.
#
# A person whose people_details() already exist in the table reuses that
# row, with its data column overwritten, instead of creating a new one.
sub _store_people($self,$people) {
    my $upsert_sql = <<'SQL';
INSERT INTO people(data) VALUES (?)
ON CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id
SQL

    my @person_ids;
    foreach my $person ($people->@*) {
        my $person_json = $json_printer->encode($person);
        my $returned    = $self->dbh->selectall_arrayref(
            $upsert_sql, {}, $person_json,
        );
        # RETURNING id yields a single row with a single column.
        push @person_ids, $returned->[0][0];
    }

    return \@person_ids;
}
# Records a snapshot of the friends list: every person is upserted into
# the people table and the resulting list of ids is inserted as one new
# row of the friends table.
sub store_friends($self,$friends) {
    my $person_ids = $self->_store_people($friends);
    my $ids_json   = $json_printer->encode($person_ids);

    return $self->dbh->do(<<'SQL', {}, $ids_json);
INSERT INTO friends(users) VALUES(?)
SQL
}
# Records a snapshot of the followers list: every person is upserted
# into the people table and the resulting list of ids is inserted as one
# new row of the followers table.
sub store_followers($self,$followers) {
    my $person_ids = $self->_store_people($followers);
    my $ids_json   = $json_printer->encode($person_ids);

    return $self->dbh->do(<<'SQL', {}, $ids_json);
INSERT INTO followers(users) VALUES(?)
SQL
}
# Applies schema step $next_version, if this class defines one.
#
# A step is a method named _schema_deploy_<N>. It runs inside a
# transaction together with the insert that records the new version in
# the meta table, so a step is either fully applied and recorded, or not
# applied at all.
#
# Returns 1 when the step was applied, 0 when no such step exists.
# If the step or the bookkeeping insert fails, the transaction is rolled
# back (so the handle is usable again) and the original error is rethrown.
sub _schema_deploy($self,$next_version) {
    my $method = $self->can("_schema_deploy_${next_version}")
        or return 0;

    my $dbh = $self->dbh;
    $dbh->begin_work;

    my $applied = eval {
        $self->$method();
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $dbh->commit;
        1;
    };

    if (!$applied) {
        # Save the error first: the rollback's own eval resets $@.
        my $error = $@ || "schema step $next_version failed";
        eval { $dbh->rollback };
        die $error;
    }

    return 1;
}
# Brings the database schema up to the newest version this class knows.
#
# Creates the meta bookkeeping table when absent, reads the highest
# recorded version (0 if none) and then applies the following steps one
# by one until _schema_deploy reports that no further step exists.
sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    {
        # Warnings are muted for this one statement only; presumably to
        # hide the notice PostgreSQL emits when the table already exists.
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
version INTEGER PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    }

    my $version_rows = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    );
    my $next_version = ($version_rows->[0][0] // 0) + 1;

    $next_version++ while $self->_schema_deploy($next_version);

    return;
}
# Schema step 1: creates the tweets table, keyed by the tweet id held as
# text, with the full tweet kept in a JSONB column.
sub _schema_deploy_1($self) {
    my $create_tweets = <<'SQL';
CREATE TABLE tweets (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB NOT NULL
)
SQL
    return $self->dbh->do($create_tweets);
}
# Schema step 2: creates the followers and friends snapshot tables.
# Both have the same shape: one row per snapshot, keyed by the time it
# was taken, with the list of users in a JSONB column.
sub _schema_deploy_2($self) {
    my $dbh = $self->dbh;

    my $create_followers = <<'SQL';
CREATE TABLE followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL
    my $create_friends = <<'SQL';
CREATE TABLE friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL

    $dbh->do($create_followers);
    return $dbh->do($create_friends);
}
# Schema step 3: moves people out of the snapshot tables.
#
# Creates the people table, the people_details() function and a unique
# index on it (so a person is de-duplicated on the listed profile
# fields), then rewrites every existing friends/followers snapshot so
# that its users column holds people ids instead of the people
# themselves.
#
# The migration uses a server-side cursor, so this must run inside a
# transaction; _schema_deploy provides one.
sub _schema_deploy_3($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<'SQL');
CREATE TABLE people (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
SELECT array[
(data->>'id_str'),
(data->>'url'),
(data->>'name'),
(data->>'location'),
(data->>'description'),
(data->>'screen_name'),
(data->>'profile_image_url'),
(data->>'profile_background_image_url')
];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
    $dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_people ON people (people_details(data))
SQL

    my $json_parser = JSON::MaybeXS->new();

    # Converts every snapshot row of $table, one row at a time.
    my $migrate = sub($table) {
        $dbh->do(<<"SQL");
DECLARE migrate_crs CURSOR FOR SELECT taken_at,users FROM $table
SQL
        my $fetch  = $dbh->prepare(q{FETCH 1 FROM migrate_crs});
        my $update = $dbh->prepare(<<"SQL");
UPDATE $table
SET users = ?
WHERE taken_at = ?
SQL
        # Each execute advances the cursor by one row; the loop ends when
        # a FETCH comes back empty.
        $fetch->execute;
        while (my @row = $fetch->fetchrow_array) {
            my ($taken_at, $users_json) = @row;
            my $people = $json_parser->decode($users_json);
            my $ids    = $self->_store_people($people);
            $update->execute($json_printer->encode($ids), $taken_at);
            $fetch->execute;
        }

        # A cursor lives until it is closed or the transaction ends.
        # Both tables are migrated in the same transaction under the same
        # cursor name, so without this the second DECLARE fails with
        # "cursor already exists".
        $dbh->do(q{CLOSE migrate_crs});
        return;
    };

    $migrate->('friends');
    $migrate->('followers');
    return;
}
1;