package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;
# Connection parameters. All three are mandatory strings and are handed
# to DBI->connect, in this order, by _build_dbh.
has [qw(dsn user pass)] => (
    isa      => Str,
    is       => 'ro',
    required => 1,
);

# Database handle. Being lazy, it is created by _build_dbh the first time
# it is read rather than at attribute-initialisation time.
has dbh => (
    isa => InstanceOf['DBI::db'],
    is  => 'lazy',
);
# Builder for the lazy `dbh` attribute.
#
# Connects using the object's dsn/user/pass and returns the resulting
# handle.
sub _build_dbh($self) {
    my %connect_attrs = (
        RaiseError => 1,    # DBI failures are thrown as exceptions...
        PrintError => 0,    # ...so don't additionally warn about them
        AutoCommit => 1,    # no implicit transaction between statements
    );

    return DBI->connect(
        $self->dsn, $self->user, $self->pass,
        \%connect_attrs,
    );
}
# Construction hook: bring the database schema up to date as soon as the
# object exists. Because _ensure_schema reads `dbh`, this also forces the
# lazy connection to be opened during construction.
sub BUILD($self,@) {
    return $self->_ensure_schema;
}
# Parser for the `created_at` string of a tweet (see store_tweet). The
# pattern reads: abbreviated weekday, abbreviated month, day of month,
# HH:MM:SS time, numeric zone offset, year.
my $dt_parser = DateTime::Format::Strptime->new(
    pattern => '%a %b %d %T %z %Y',
);

# Formats DateTime objects as PostgreSQL timestamp literals.
my $dt_printer = DateTime::Format::Pg->new();

# Shared JSON encoder for everything stored in JSONB columns.
my $json_printer = JSON::MaybeXS->new(
    # output shape: compact, 7-bit clean
    pretty          => 0,
    ascii           => 1,
    # be lenient about what may be encoded
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);
# Returns the id of the most recent stored tweet, or undef when the
# `tweets` table is empty.
#
# Ids are stored as VARCHAR (see _schema_deploy_1), so a plain MAX(id)
# compares them as text and ranks e.g. '999' above '1000'. Ordering by
# length first and then by value gives the numerically greatest id.
# NOTE(review): this assumes ids are unsigned decimal strings without
# leading zeros -- confirm against the data source.
sub latest_tweet_id($self) {
    my $rows = $self->dbh->selectall_arrayref(
        q{SELECT id FROM tweets ORDER BY length(id) DESC, id DESC LIMIT 1},
    );

    # No rows at all means nothing has been stored yet.
    return $rows->@* ? $rows->[0][0] : undef;
}
# Inserts a tweet, or overwrites the stored copy when a row with the same
# id already exists.
#
# $tweet is a hashref; this method reads its `id_str` and `created_at`
# keys and stores the whole structure, JSON-encoded, in the `data` column.
#
# Dies when `created_at` is missing or does not match the expected
# timestamp pattern, and (via RaiseError) on any database failure.
sub store_tweet($self,$tweet) {
    my $tweet_str = $json_printer->encode($tweet);

    # The parser returns undef instead of throwing on bad input; catch
    # that here so the failure names the offending tweet rather than
    # surfacing as a method call on an undefined value in the formatter.
    my $raw_created_at = $tweet->{created_at};
    my $created_at = defined $raw_created_at
        ? $dt_parser->parse_datetime($raw_created_at)
        : undef;
    if (!defined $created_at) {
        my $shown_id   = $tweet->{id_str} // '(no id_str)';
        my $shown_date = $raw_created_at  // '(undefined)';
        die "cannot parse created_at '$shown_date' of tweet $shown_id";
    }

    return $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
created_at = EXCLUDED.created_at,
data = EXCLUDED.data
SQL
        $tweet->{id_str},
        $dt_printer->format_datetime($created_at),
        $tweet_str,
    );
}
# Upserts every person in $people (an arrayref) into the `people` table
# and returns an arrayref of their row ids, in input order.
#
# The conflict target is people_details(data), so a person whose detail
# tuple is already stored gets the existing row's id back instead of a
# new row.
sub _store_people($self,$people) {
    my @person_ids = map {
        my $encoded = $json_printer->encode($_);
        my $returned = $self->dbh->selectall_arrayref(<<'SQL',{},$encoded);
INSERT INTO people(data) VALUES (?)
ON CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id
SQL
        $returned->[0][0];
    } $people->@*;

    return \@person_ids;
}
# Records a snapshot of the friends list: each person in $friends (an
# arrayref) is upserted via _store_people, then one `friends` row holding
# the resulting ids is inserted.
sub store_friends($self,$friends) {
    my $person_ids = $self->_store_people($friends);

    return $self->dbh->do(<<'SQL', {}, $person_ids);
INSERT INTO friends(users) VALUES(?)
SQL
}
# Records a snapshot of the followers list: each person in $followers (an
# arrayref) is upserted via _store_people, then one `followers` row
# holding the resulting ids is inserted.
sub store_followers($self,$followers) {
    my $person_ids = $self->_store_people($followers);

    return $self->dbh->do(<<'SQL', {}, $person_ids);
INSERT INTO followers(users) VALUES(?)
SQL
}
# Applies schema migration number $next_version, if one is defined.
#
# A migration is a method named _schema_deploy_<N>. It runs inside a
# transaction together with the insert that records version N in `meta`,
# so a migration is either fully applied and recorded, or not at all.
#
# Returns 1 when the migration was applied, 0 when no such migration
# method exists. Rethrows whatever the migration (or the bookkeeping
# insert / commit) died with, after rolling the transaction back.
sub _schema_deploy($self,$next_version) {
    my $method_name = "_schema_deploy_${next_version}";
    my $method = $self->can($method_name)
        or return 0;

    my $dbh = $self->dbh;
    $dbh->begin_work;

    my $applied = eval {
        $self->$method();
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $dbh->commit;
        1;
    };

    if (!$applied) {
        # Save the error first: the eval below would clobber $@.
        my $error = $@ || "schema migration $next_version failed";

        # Without a rollback the handle would stay inside the failed
        # transaction. The rollback is best-effort: if it fails too
        # (e.g. the connection is gone) the original error is still the
        # one worth reporting.
        eval { $dbh->rollback; 1 };

        die $error;
    }

    return 1;
}
# Brings the database schema up to the newest version this class knows.
#
# Makes sure the `meta` bookkeeping table exists, reads the highest
# version recorded in it (0 when there is none), and then applies
# migrations one after another until _schema_deploy reports that the next
# one does not exist.
sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    {
        # Warnings are switched off just for this statement --
        # presumably to hide the notice PostgreSQL emits when the table
        # already exists.
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
version INTEGER PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    }

    my $version_rows = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    );

    # max() yields NULL on an empty table; treat that as version 0.
    my $next_version = ($version_rows->[0][0] // 0) + 1;

    $next_version++ while $self->_schema_deploy($next_version);
}
# Migration 1: create the `tweets` table, keyed by the tweet id (stored
# as text) with the full tweet kept as JSONB.
sub _schema_deploy_1($self) {
    my $dbh = $self->dbh;

    return $dbh->do(<<'SQL');
CREATE TABLE tweets (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB NOT NULL
)
SQL
}
# Migration 2: create the `followers` and `friends` snapshot tables. Each
# row is one snapshot, keyed by the time it was taken, with the user list
# stored as JSONB.
sub _schema_deploy_2($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<'SQL');
CREATE TABLE followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL

    return $dbh->do(<<'SQL');
CREATE TABLE friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL
}
# Migration 3: normalise people out of the snapshot tables.
#
# Creates a `people` table (one row per distinct set of profile details)
# and replaces the JSONB `users` column of `friends` / `followers` with an
# integer[] of `people` ids. Only the most recent snapshot of each table
# is carried over (the migration function selects with LIMIT 1); older
# snapshots are dropped together with the old tables.
sub _schema_deploy_3($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<'SQL');
CREATE TABLE people (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
)
SQL

    # The tuple of profile fields that identifies "the same person with
    # the same details". It backs the unique index below, which in turn
    # is the ON CONFLICT target used whenever people are inserted.
    $dbh->do(<<'SQL');
CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
SELECT array[
(data->>'id_str'),
(data->>'url'),
(data->>'name'),
(data->>'location'),
(data->>'description'),
(data->>'screen_name'),
(data->>'profile_image_url'),
(data->>'profile_background_image_url')
];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
    $dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_people ON people (people_details(data))
SQL

    # Replacement snapshot tables, built under temporary names and
    # swapped in at the end.
    $dbh->do(<<'SQL');
CREATE TABLE new_followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE TABLE new_friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL

    # One-off helper that copies the latest snapshot of each old table
    # into `people` plus the new tables. Note that PL/pgSQL's RAISE uses a
    # bare % as its placeholder; writing %s would print a stray literal
    # "s" after the timestamp.
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION migrate_people() RETURNS VOID AS $$
DECLARE
f RECORD;
p RECORD;
ids INTEGER[];
BEGIN
FOR f IN SELECT * FROM friends ORDER BY taken_at DESC LIMIT 1 LOOP
RAISE NOTICE 'migrating friends at %', f.taken_at;
ids := '{}';
FOR p IN INSERT INTO people(data)
SELECT v FROM jsonb_array_elements(f.users) AS d(v)
on CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id LOOP
ids := array_append(ids,p.id);
END LOOP;
INSERT INTO new_friends(taken_at,users) VALUES (f.taken_at,ids);
END LOOP;
FOR f IN SELECT * FROM followers ORDER BY taken_at DESC LIMIT 1 LOOP
RAISE NOTICE 'migrating followers at %', f.taken_at;
ids := '{}';
FOR p IN INSERT INTO people(data)
SELECT v FROM jsonb_array_elements(f.users) AS d(v)
ON CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id LOOP
ids := array_append(ids,p.id);
END LOOP;
INSERT INTO new_followers(taken_at,users) VALUES (f.taken_at,ids);
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
SQL

    # Run the data migration, then discard the helper.
    $dbh->selectall_arrayref(q{SELECT migrate_people()});
    $dbh->do(q{DROP FUNCTION migrate_people()});

    # Swap the new tables in under the original names.
    $dbh->do(q{DROP TABLE friends});
    $dbh->do(q{DROP TABLE followers});
    $dbh->do(q{ALTER TABLE new_friends RENAME TO friends});
    return $dbh->do(q{ALTER TABLE new_followers RENAME TO followers});
}
1;