package Dakkar::TweetArchive::Store;
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;

has [qw(dsn user pass)] => (
    is => 'ro',
    required => 1,
    isa => Str,
);

has dbh => (
    is => 'lazy',
    isa => InstanceOf['DBI::db'],
);

sub _build_dbh($self) {
    return DBI->connect(
        $self->dsn, $self->user, $self->pass,
        {
            PrintError => 0,
            AutoCommit => 1,
            RaiseError => 1,
        },
    );
}

sub BUILD($self,@) { $self->_ensure_schema; }

my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
my $dt_printer = DateTime::Format::Pg->new();
my $json_printer = JSON::MaybeXS->new(
    ascii => 1,
    pretty => 0,
    allow_nonref => 1,
    allow_unknown => 1,
    allow_blessed => 1,
    convert_blessed => 1,
);

sub latest_tweet_id($self) {
    return $self->dbh->selectall_arrayref(
        q{SELECT MAX(id) FROM tweets},
    )->[0][0];
}

sub store_tweet($self,$tweet) {
    # yes, the source most probably decoded this from a string, we
    # have to serialise it again, so that PostgreSQL can parse it
    # *again*
    my $tweet_str = $json_printer->encode($tweet);
    my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
    $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
    created_at = EXCLUDED.created_at,
    data = EXCLUDED.data
SQL
        $tweet->{id_str},
        $dt_printer->format_datetime($created_at),
        $tweet_str,
    );
}
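# Usage sketch (illustrative only; the DSN, credentials, and
# $decoded_tweet are placeholders, not values used by this module):
#
#   my $store = Dakkar::TweetArchive::Store->new(
#       dsn  => 'dbi:Pg:dbname=tweets',
#       user => 'archive',
#       pass => 'secret',
#   );
#   my $since_id = $store->latest_tweet_id;  # undef on an empty archive
#   $store->store_tweet($decoded_tweet);     # hashref decoded from the API JSON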
sub _store_people($self,$people) {
    my @ids;
    for my $person ($people->@*) {
        my $person_str = $json_printer->encode($person);
        push @ids, $self->dbh->selectall_arrayref(<<'SQL',{},$person_str)->[0][0];
INSERT INTO people(data) VALUES (?)
ON CONFLICT (people_details(data)) DO UPDATE SET data=EXCLUDED.data
RETURNING id
SQL
    }
    return \@ids;
}

sub store_friends($self,$friends) {
    my $ids = $self->_store_people($friends);
    $self->dbh->do(<<"SQL", {}, $ids);
INSERT INTO friends(users) VALUES(?)
SQL
}

sub store_followers($self,$followers) {
    my $ids = $self->_store_people($followers);
    $self->dbh->do(<<"SQL", {}, $ids);
INSERT INTO followers(users) VALUES(?)
SQL
}

sub _schema_deploy($self,$next_version) {
    my $method_name = "_schema_deploy_${next_version}";
    if (my $method = $self->can($method_name)) {
        $self->dbh->begin_work;
        $self->$method();
        $self->dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $self->dbh->commit;
        return 1;
    }
    else {
        return 0;
    }
}

sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    # simple-minded schema version management
    {
        # silence the 'NOTICE: relation "meta" already exists, skipping'
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
    version INTEGER PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    }

    my $current_version = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    )->[0][0];
    $current_version //= 0;

    my $next_version = $current_version + 1;
    while ($self->_schema_deploy($next_version)) {
        ++$next_version;
    }
}

sub _schema_deploy_1($self) {
    $self->dbh->do(<<'SQL');
CREATE TABLE tweets (
    id VARCHAR(255) PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    data JSONB NOT NULL
)
SQL
}

sub _schema_deploy_2($self) {
    $self->dbh->do(<<'SQL');
CREATE TABLE followers (
    taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
    users JSONB NOT NULL
)
SQL
    $self->dbh->do(<<'SQL');
CREATE TABLE friends (
    taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
    users JSONB NOT NULL
)
SQL
}

sub _schema_deploy_3($self) {
    my $dbh = $self->dbh;
    $dbh->do(<<'SQL');
CREATE TABLE people (
    id SERIAL PRIMARY KEY,
    data JSONB NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
SELECT array[
    (data->>'id_str'),
    (data->>'url'),
    (data->>'name'),
    (data->>'location'),
    (data->>'description'),
    (data->>'screen_name'),
    (data->>'profile_image_url'),
    (data->>'profile_background_image_url')
];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
    $dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_people ON people (people_details(data))
SQL
    $dbh->do(<<'SQL');
CREATE TABLE new_followers (
    taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
    users integer[] NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE TABLE new_friends (
    taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
    users integer[] NOT NULL
)
SQL
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION migrate_people() RETURNS VOID AS $$
DECLARE
    f RECORD;
    p RECORD;
    ids INTEGER[];
BEGIN
    FOR f IN SELECT * FROM friends ORDER BY taken_at DESC LIMIT 1 LOOP
        RAISE NOTICE 'migrating friends at %', f.taken_at;
        ids := '{}';
        FOR p IN
            INSERT INTO people(data)
            SELECT v FROM jsonb_array_elements(f.users) AS d(v)
            ON CONFLICT (people_details(data)) DO UPDATE SET data=EXCLUDED.data
            RETURNING id
        LOOP
            ids := array_append(ids,p.id);
        END LOOP;
        INSERT INTO new_friends(taken_at,users) VALUES (f.taken_at,ids);
    END LOOP;

    FOR f IN SELECT * FROM followers ORDER BY taken_at DESC LIMIT 1 LOOP
        RAISE NOTICE 'migrating followers at %', f.taken_at;
        ids := '{}';
        FOR p IN
            INSERT INTO people(data)
            SELECT v FROM jsonb_array_elements(f.users) AS d(v)
            ON CONFLICT (people_details(data)) DO UPDATE SET data=EXCLUDED.data
            RETURNING id
        LOOP
            ids := array_append(ids,p.id);
        END LOOP;
        INSERT INTO new_followers(taken_at,users) VALUES (f.taken_at,ids);
    END LOOP;
END;
$$ LANGUAGE PLPGSQL;
SQL
    $dbh->selectall_arrayref(q{SELECT migrate_people()});
    $dbh->do(q{DROP FUNCTION migrate_people()});
    $dbh->do(q{DROP TABLE friends});
    $dbh->do(q{DROP TABLE followers});
    $dbh->do(q{ALTER TABLE new_friends RENAME TO friends});
    $dbh->do(q{ALTER TABLE new_followers RENAME TO followers});
}
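# Illustration only (not used by this module): after the migration above,
# a friends/followers snapshot can be expanded back into the stored
# profiles with a query along these lines:
#
#   SELECT f.taken_at, p.data->>'screen_name'
#     FROM friends f
#     JOIN people p ON p.id = ANY(f.users)
#    ORDER BY f.taken_at DESC;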
sub _schema_deploy_4($self) {
    my $dbh = $self->dbh;

    # let's get some sensible ordering when saying "order by id"
    $dbh->do(<<'SQL');
ALTER TABLE tweets ALTER COLUMN id SET DATA TYPE bigint USING id::bigint
SQL

    # get the text of a tweet
    #
    # we don't actually get extended_tweet, probably Twitter only
    # gives it to "recognised" clients
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_text(IN t JSONB) RETURNS text
LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
AS $$
SELECT CASE t ? 'extended_tweet'
    WHEN true THEN t->'extended_tweet'->>'text'
    ELSE t->>'text'
END
$$;
SQL

    # get text, including RTs and quotes
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_text_recursive(IN t JSONB) RETURNS text
LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
AS $$
SELECT tweet_text(t)
    || ' ' || COALESCE( tweet_text_recursive(t->'retweeted_status'), '')
    || ' ' || COALESCE( tweet_text_recursive(t->'quoted_status'), '')
$$;
SQL

    # map a tweet to a full-text search language configuration
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_language(IN t JSONB) RETURNS regconfig
LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
AS $$
SELECT CASE t->>'lang'
    WHEN 'it' THEN 'pg_catalog.italian'::regconfig
    WHEN 'fr' THEN 'pg_catalog.french'::regconfig
    WHEN 'de' THEN 'pg_catalog.german'::regconfig
    WHEN 'es' THEN 'pg_catalog.spanish'::regconfig
    WHEN 'nl' THEN 'pg_catalog.dutch'::regconfig
    ELSE 'pg_catalog.english'::regconfig
END
$$;
SQL

    # add the full-text search data
    $dbh->do(<<'SQL');
ALTER TABLE tweets ADD COLUMN fts tsvector;
SQL
    $dbh->do(<<'SQL');
UPDATE tweets SET fts = to_tsvector(tweet_language(data),tweet_text_recursive(data));
SQL
    $dbh->do(<<'SQL');
CREATE INDEX tweets_fts ON tweets USING GIN (fts);
SQL

    # and make sure it stays up-to-date
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweets_fts_trigger() RETURNS trigger
LANGUAGE plpgsql
AS $$
begin
    new.fts := to_tsvector(tweet_language(new.data),tweet_text_recursive(new.data));
    return new;
end
$$;
SQL
    $dbh->do(<<'SQL');
CREATE TRIGGER tweets_fts_update BEFORE INSERT OR UPDATE
ON tweets FOR EACH ROW EXECUTE PROCEDURE tweets_fts_trigger();
SQL
}

1;
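# Illustration only (not used by this module): once schema version 4 is
# deployed, the archive can be searched via the fts column, e.g.
#
#   SELECT id, tweet_text(data)
#     FROM tweets
#    WHERE fts @@ plainto_tsquery('english', 'some words')
#    ORDER BY id DESC;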