package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use Carp qw(croak);
use namespace::clean;

# Stores tweets, "people" (user objects) and friend/follower snapshots
# in PostgreSQL. The schema is created / migrated automatically when
# an object is built (see BUILD and _ensure_schema).

# Connection parameters, passed verbatim to DBI->connect.
has [qw(dsn user pass)] => (
    is       => 'ro',
    required => 1,
    isa      => Str,
);

# Database handle, connected lazily on first use (see _build_dbh).
has dbh => (
    is  => 'lazy',
    isa => InstanceOf['DBI::db'],
);

# Connects with RaiseError on, so every failing DBI call dies.
sub _build_dbh($self) {
    return DBI->connect(
        $self->dsn, $self->user, $self->pass,
        {
            PrintError => 0,
            AutoCommit => 1,
            RaiseError => 1,
        },
    );
}

# Bring the schema up to date as soon as the object exists.
sub BUILD($self,@) {
    $self->_ensure_schema;
}

# Parser for the `created_at` format '%a %b %d %T %z %Y',
# e.g. "Wed Aug 27 13:08:45 +0000 2008".
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
# Renders DateTime objects in a form PostgreSQL accepts.
my $dt_printer = DateTime::Format::Pg->new();
# Serialiser for the JSONB columns; ASCII-only, compact, and tolerant of
# blessed / unknown values.
my $json_printer = JSON::MaybeXS->new(
    ascii           => 1,
    pretty          => 0,
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);

# Returns the highest tweet id stored (as the stored string), or undef
# when the table is empty.
#
# `tweets.id` is a VARCHAR, so MAX(id) would compare ids as strings and
# rank e.g. "99" above "100". For digit-only ids without leading zeros,
# ordering by length first and then by byte order ("C" collation) gives
# numeric order while still returning the original string.
sub latest_tweet_id($self) {
    my ($id) = $self->dbh->selectrow_array(<<'SQL');
SELECT id FROM tweets
ORDER BY LENGTH(id) DESC, id COLLATE "C" DESC
LIMIT 1
SQL
    return $id;
}

# Inserts (or replaces, keyed on `id_str`) a single tweet.
#
# $tweet is a hashref; `id_str`, `created_at` and the whole structure
# (as JSONB) are stored. Dies if `created_at` does not match the
# expected format.
sub store_tweet($self,$tweet) {
    # yes, the source most probably decoded this from a string; we have
    # to serialise it again so that PostgreSQL can parse it *again*
    my $tweet_str = $json_printer->encode($tweet);

    my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
    unless (defined $created_at) {
        # without this, format_datetime(undef) fails with an obscure error
        croak sprintf "can't parse created_at %s of tweet %s: %s",
            $tweet->{created_at} // '(undef)',
            $tweet->{id_str}     // '(undef)',
            $dt_parser->errmsg   // 'unknown error';
    }

    $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
  created_at = EXCLUDED.created_at,
  data = EXCLUDED.data
SQL
                   $tweet->{id_str},
                   $dt_printer->format_datetime($created_at),
                   $tweet_str,
               );
}

# Upserts every person in the arrayref $people into `people` and returns
# an arrayref of their `people.id`s, in the same order.
#
# Rows are de-duplicated via the unique index on people_details(data):
# a person whose details match an existing row updates that row instead
# of creating a new one.
sub _store_people($self,$people) {
    return [] unless $people->@*;

    my $dbh = $self->dbh;
    # same statement for every person: prepare it once
    my $sth = $dbh->prepare(<<'SQL');
INSERT INTO people(data) VALUES (?)
ON CONFLICT (people_details(data)) DO UPDATE SET data=EXCLUDED.data
RETURNING id
SQL

    my @ids;
    for my $person ($people->@*) {
        my ($id) = $dbh->selectrow_array(
            $sth, {}, $json_printer->encode($person),
        );
        push @ids, $id;
    }
    return \@ids;
}

# Stores the people in $people and records the list of their ids as a
# new snapshot row in $table (taken_at defaults to now).
#
# $table is interpolated into the SQL: only call this with the internal
# table names below, never with external input.
sub _store_snapshot($self,$table,$people) {
    my $ids = $self->_store_people($people);
    return $self->dbh->do(
        "INSERT INTO $table(users) VALUES(?)",
        {},
        $json_printer->encode($ids),
    );
}

# Records a snapshot of the given arrayref of friend (user) objects.
sub store_friends($self,$friends) {
    return $self->_store_snapshot('friends', $friends);
}

# Records a snapshot of the given arrayref of follower (user) objects.
sub store_followers($self,$followers) {
    return $self->_store_snapshot('followers', $followers);
}

# Runs migration number $next_version, if a _schema_deploy_N method
# exists for it, and records it in `meta`.
#
# Everything happens in one transaction; on failure the transaction is
# rolled back (restoring AutoCommit) and the error is rethrown.
# Returns 1 if a migration ran, 0 if there is no such migration.
sub _schema_deploy($self,$next_version) {
    my $method = $self->can("_schema_deploy_${next_version}")
        or return 0;

    my $dbh = $self->dbh;
    $dbh->begin_work;
    my $ok = eval {
        $self->$method();
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $dbh->commit;
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error';
        # best effort: the original error is the one worth reporting
        eval { $dbh->rollback };
        die $error;
    }
    return 1;
}

# Simple-minded schema version management: makes sure the `meta` table
# exists, then applies _schema_deploy_N for every N above the highest
# recorded version, stopping at the first N without a method.
sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    {
        # silence the 'NOTICE: relation "meta" already exists, skipping'
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
  version INTEGER PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    }

    my $current_version = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    )->[0][0];
    $current_version //= 0;

    my $next_version = $current_version + 1;
    while ($self->_schema_deploy($next_version)) {
        ++$next_version;
    }
}

# v1: the tweets table.
sub _schema_deploy_1($self) {
    $self->dbh->do(<<'SQL');
CREATE TABLE tweets (
  id VARCHAR(255) PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  data JSONB NOT NULL
)
SQL
}

# v2: snapshot tables for followers and friends.
sub _schema_deploy_2($self) {
    $self->dbh->do(<<'SQL');
CREATE TABLE followers (
  taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
  users JSONB NOT NULL
)
SQL
    $self->dbh->do(<<'SQL');
CREATE TABLE friends (
  taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
  users JSONB NOT NULL
)
SQL
}

# v3: a de-duplicated `people` table. Existing snapshots, whose `users`
# column held full user objects, are rewritten to hold people.id lists.
sub _schema_deploy_3($self) {
    $self->dbh->do(<<'SQL');
CREATE TABLE people (
  id SERIAL PRIMARY KEY,
  data JSONB NOT NULL
)
SQL
    $self->dbh->do(<<'SQL');
CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
  SELECT array[
    (data->>'id_str'),
    (data->>'url'),
    (data->>'name'),
    (data->>'location'),
    (data->>'description'),
    (data->>'screen_name'),
    (data->>'profile_image_url'),
    (data->>'profile_background_image_url')
  ];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
    $self->dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_people ON people (people_details(data))
SQL

    my $json_parser = JSON::MaybeXS->new();
    # $table is one of the internal table names below, never user input
    my $migrate = sub ($table) {
        my $dbh = $self->dbh;
        # a cursor without WITH HOLD needs a transaction;
        # _schema_deploy runs this whole method inside one
        $dbh->do(<<"SQL");
DECLARE migrate_crs CURSOR FOR SELECT taken_at,users FROM $table
SQL
        my $fetch = $dbh->prepare(q{FETCH 1 FROM migrate_crs});
        my $update = $dbh->prepare(<<"SQL");
UPDATE $table SET users = ? WHERE taken_at = ?
SQL

        $fetch->execute;
        while (my ($taken_at, $users_json) = $fetch->fetchrow_array) {
            my $ids = $self->_store_people($json_parser->decode($users_json));
            $update->execute($json_printer->encode($ids), $taken_at);
            $fetch->execute;
        }

        # the cursor would otherwise stay open until the transaction
        # ends, and the next table's DECLARE of the same name would fail
        $dbh->do(q{CLOSE migrate_crs});
    };

    $migrate->('friends');
    $migrate->('followers');
}

1;