package Dakkar::TweetArchive::Store;
use v5.36;
use Moo;
use DBI;
use Carp qw(croak);
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;

# Persists Twitter data (tweets, people, friends/followers snapshots)
# and Misskey data (notes, people, following/followers snapshots) into
# PostgreSQL. Constructing an object connects to the database and
# brings the schema up to the latest version (see _ensure_schema).

# connection parameters, passed straight to DBI->connect
has [qw(dsn user pass)] => (
    is       => 'ro',
    required => 1,
    isa      => Str,
);

has dbh => (
    is  => 'lazy',
    isa => InstanceOf['DBI::db'],
);

sub _build_dbh($self) {
    # RaiseError: every DBI failure below dies instead of returning false
    return DBI->connect(
        $self->dsn, $self->user, $self->pass,
        {
            PrintError => 0,
            AutoCommit => 1,
            RaiseError => 1,
        },
    );
}

sub BUILD($self,@) {
    $self->_ensure_schema;
}

# parses Twitter's created_at, e.g. "Wed Oct 10 20:19:24 +0000 2018"
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
# parses Misskey's createdAt, e.g. "2023-01-31T12:34:56.789Z"
my $dt_parser_iso = DateTime::Format::Strptime->new(pattern => '%Y-%m-%dT%H:%M:%S.%3NZ', time_zone=>'UTC');
my $dt_printer = DateTime::Format::Pg->new();
my $json_printer = JSON::MaybeXS->new(
    ascii           => 1,
    pretty          => 0,
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);

# Parse $value with $parser; croak with a readable message (naming
# $what) instead of returning undef when the string doesn't match.
# Strptime returns undef on mismatch, which would otherwise surface
# later as an obscure method-call-on-undef error in the printer.
sub _parse_timestamp($parser, $value, $what) {
    my $dt = defined $value ? $parser->parse_datetime($value) : undef;
    return $dt if $dt;
    croak sprintf q{can't parse %s timestamp '%s'}, $what, $value // '(undef)';
}

# Largest id in $table, or undef when the table is empty.
# $table is only ever a literal table name from this package.
sub _max_id($self, $table) {
    my ($max) = $self->dbh->selectrow_array(qq{SELECT MAX(id) FROM ${table}});
    return $max;
}

# Insert a post (tweet / note) or overwrite the existing row with the
# same id. $created_at is a DateTime; $data is the decoded JSON
# structure. Returns whatever DBI's do() returns.
sub _upsert_post($self, $table, $id, $created_at, $data) {
    # yes, the source most probably decoded this from a string, we
    # have to serialise it again, so that PostgreSQL can parse it
    # *again*
    my $data_str = $json_printer->encode($data);

    my $sql = <<~"SQL";
        INSERT INTO ${table}(id,created_at,data) VALUES(?,?,?)
        ON CONFLICT (id) DO UPDATE
        SET created_at = EXCLUDED.created_at,
            data = EXCLUDED.data
        SQL

    return $self->dbh->do(
        $sql, {},
        $id,
        $dt_printer->format_datetime($created_at),
        $data_str,
    );
}

# Upsert each person (a decoded JSON structure) into $table, using the
# unique index on $details_function(data) to detect duplicates.
# Returns an arrayref of the rows' ids, in the same order as $people.
sub _store_people($self, $table, $details_function, $people) {
    my $dbh = $self->dbh;

    # same statement for every person: prepare it once
    my $sth = $dbh->prepare_cached(<<~"SQL");
        INSERT INTO ${table}(data) VALUES (?)
        ON CONFLICT (${details_function}(data))
        DO UPDATE SET data=EXCLUDED.data
        RETURNING id
        SQL

    my @ids;
    for my $person ($people->@*) {
        # selectrow_array executes, fetches the single RETURNING row,
        # and finishes the handle so it can be re-executed
        my ($id) = $dbh->selectrow_array(
            $sth, {}, $json_printer->encode($person),
        );
        push @ids, $id;
    }
    return \@ids;
}

# Order-insensitive comparison of two arrayrefs of numbers.
sub _array_num_equal($a1,$a2) {
    my ($a1_str, $a2_str) = map {
        join "\0", sort { $a <=> $b } $_->@*
    } $a1, $a2;
    return $a1_str eq $a2_str;
}

# Store $people via $store_method, then record the resulting ids as a
# new snapshot in $ids_table, unless the latest snapshot already holds
# the same set of ids. $ids_table is only ever a literal table name
# from this package, so interpolating it is safe.
sub _store_people_and_ids($self,$store_method,$ids_table,$people) {
    my $ids = $self->$store_method($people);

    my ($last_ids) = $self->dbh->selectrow_array(<<~"SQL");
        SELECT users FROM ${ids_table} ORDER BY taken_at DESC LIMIT 1
        SQL

    return if $last_ids && _array_num_equal($ids, $last_ids);

    $self->dbh->do(<<~"SQL", {}, $ids);
        INSERT INTO ${ids_table}(users) VALUES(?)
        SQL
}

# tweets

sub latest_tweet_id($self) {
    return $self->_max_id('tweets');
}

sub store_tweet($self,$tweet) {
    my $created_at = _parse_timestamp($dt_parser, $tweet->{created_at}, 'tweet');
    return $self->_upsert_post('tweets', $tweet->{id_str}, $created_at, $tweet);
}

sub _store_twitter_people($self,$people) {
    return $self->_store_people('twitter_people', 'twitter_people_details', $people);
}

sub store_twitter_friends($self,$friends) {
    $self->_store_people_and_ids('_store_twitter_people','twitter_friends',$friends);
}

sub store_twitter_followers($self,$followers) {
    $self->_store_people_and_ids('_store_twitter_people','twitter_followers',$followers);
}

# misskey notes

sub latest_note_id($self) {
    return $self->_max_id('notes');
}

sub store_note($self,$note) {
    my $created_at = _parse_timestamp($dt_parser_iso, $note->{createdAt}, 'note');
    return $self->_upsert_post('notes', $note->{id}, $created_at, $note);
}

sub _store_misskey_people($self,$people) {
    return $self->_store_people('misskey_people', 'misskey_people_details', $people);
}

sub store_misskey_following($self,$following) {
    $self->_store_people_and_ids('_store_misskey_people','misskey_following',$following);
}

sub store_misskey_followers($self,$followers) {
    $self->_store_people_and_ids('_store_misskey_people','misskey_followers',$followers);
}

# schema

# Apply migration _schema_deploy_$next_version, if it exists, inside a
# transaction, recording the version in `meta`. Returns 1 when a
# migration was applied, 0 when there is no such migration. If the
# migration fails, the transaction is rolled back explicitly (instead
# of leaving the handle inside an aborted transaction) and the
# original error is rethrown.
sub _schema_deploy($self,$next_version) {
    my $method = $self->can("_schema_deploy_${next_version}")
        or return 0;

    my $dbh = $self->dbh;
    $dbh->begin_work;

    my $ok = eval {
        $self->$method();
        $dbh->do(<<~'SQL', {}, $next_version);
            INSERT INTO meta(version) VALUES (?)
            SQL
        $dbh->commit;
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error during schema deploy';
        # best effort: the original error matters more than a failed rollback
        eval { $dbh->rollback };
        die $error;
    }

    return 1;
}

# Make sure the `meta` table exists, then apply every migration after
# the highest recorded version, in order, until one is missing.
sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    # simple-minded schema version management
    {
        # silence the 'NOTICE: relation "meta" already exists, skipping'
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<~'SQL');
            CREATE TABLE IF NOT EXISTS meta (
              version INTEGER PRIMARY KEY,
              created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            SQL
    }

    my ($current_version) = $dbh->selectrow_array(
        q{SELECT max(version) FROM meta},
    );
    my $next_version = ($current_version // 0) + 1;

    while ($self->_schema_deploy($next_version)) {
        ++$next_version;
    }
    return;
}

sub _schema_deploy_1($self) {
    $self->dbh->do(<<~'SQL');
        CREATE TABLE tweets (
          id VARCHAR(255) PRIMARY KEY,
          created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
          data JSONB NOT NULL
        )
        SQL
}

sub _schema_deploy_2($self) {
    $self->dbh->do(<<~'SQL');
        CREATE TABLE followers (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users JSONB NOT NULL
        )
        SQL
    $self->dbh->do(<<~'SQL');
        CREATE TABLE friends (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users JSONB NOT NULL
        )
        SQL
}

# Move people out of the JSONB snapshots into a deduplicated `people`
# table; friends/followers become arrays of people ids.
sub _schema_deploy_3($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<~'SQL');
        CREATE TABLE people (
          id SERIAL PRIMARY KEY,
          data JSONB NOT NULL
        )
        SQL
    $dbh->do(<<~'SQL');
        CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
          SELECT array[
            (data->>'id_str'),
            (data->>'url'),
            (data->>'name'),
            (data->>'location'),
            (data->>'description'),
            (data->>'screen_name'),
            (data->>'profile_image_url'),
            (data->>'profile_background_image_url')
          ];
        $$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
        SQL
    $dbh->do(<<~'SQL');
        CREATE UNIQUE INDEX idx_people ON people (people_details(data))
        SQL
    $dbh->do(<<~'SQL');
        CREATE TABLE new_followers (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users integer[] NOT NULL
        )
        SQL
    $dbh->do(<<~'SQL');
        CREATE TABLE new_friends (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users integer[] NOT NULL
        )
        SQL
    # only the most recent snapshot of each table is migrated.
    # NOTE(review): PL/pgSQL RAISE uses a bare % placeholder, so the
    # '%s' below prints a stray trailing 's'; cosmetic only
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION migrate_people() RETURNS VOID AS $$
        DECLARE
          f RECORD;
          p RECORD;
          ids INTEGER[];
        BEGIN
          FOR f IN SELECT * FROM friends ORDER BY taken_at DESC LIMIT 1 LOOP
            RAISE NOTICE 'migrating friends at %s', f.taken_at;
            ids := '{}';
            FOR p IN INSERT INTO people(data)
              SELECT v FROM jsonb_array_elements(f.users) AS d(v)
              on CONFLICT (people_details(data))
              DO UPDATE SET data=EXCLUDED.data
              RETURNING id
            LOOP
              ids := array_append(ids,p.id);
            END LOOP;
            INSERT INTO new_friends(taken_at,users) VALUES (f.taken_at,ids);
          END LOOP;
          FOR f IN SELECT * FROM followers ORDER BY taken_at DESC LIMIT 1 LOOP
            RAISE NOTICE 'migrating followers at %s', f.taken_at;
            ids := '{}';
            FOR p IN INSERT INTO people(data)
              SELECT v FROM jsonb_array_elements(f.users) AS d(v)
              ON CONFLICT (people_details(data))
              DO UPDATE SET data=EXCLUDED.data
              RETURNING id
            LOOP
              ids := array_append(ids,p.id);
            END LOOP;
            INSERT INTO new_followers(taken_at,users) VALUES (f.taken_at,ids);
          END LOOP;
        END;
        $$ LANGUAGE PLPGSQL;
        SQL

    $dbh->selectall_arrayref(q{SELECT migrate_people()});
    $dbh->do(q{DROP FUNCTION migrate_people()});
    $dbh->do(q{DROP TABLE friends});
    $dbh->do(q{DROP TABLE followers});
    $dbh->do(q{ALTER TABLE new_friends RENAME TO friends});
    $dbh->do(q{ALTER TABLE new_followers RENAME TO followers});
}

# Numeric tweet ids plus full-text search over tweets.
sub _schema_deploy_4($self) {
    my $dbh = $self->dbh;

    # let's get some sensible ordering when saying "order by id"
    $dbh->do(<<~'SQL');
        ALTER TABLE tweets ALTER COLUMN id SET DATA TYPE bigint USING id::bigint
        SQL

    # get the text of a tweet
    #
    # we don't actually get extended_tweet, probably Twitter only
    # gives it to "recognised" clients
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION tweet_text(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT CASE t ? 'extended_tweet'
          WHEN true THEN t->'extended_tweet'->>'text'
          ELSE t->>'text'
        END
        $$;
        SQL

    # get text, including RTs and quotes
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION tweet_text_recursive(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT tweet_text(t) || ' ' ||
          COALESCE( tweet_text_recursive(t->'retweeted_status'), '') || ' ' ||
          COALESCE( tweet_text_recursive(t->'quoted_status'), '')
        $$;
        SQL

    # map a tweet to a full-text search language configuration
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION tweet_language(IN t JSONB) RETURNS regconfig
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT CASE t->>'lang'
          WHEN 'it' THEN 'pg_catalog.italian'::regconfig
          WHEN 'fr' THEN 'pg_catalog.french'::regconfig
          WHEN 'de' THEN 'pg_catalog.german'::regconfig
          WHEN 'es' THEN 'pg_catalog.spanish'::regconfig
          WHEN 'nl' THEN 'pg_catalog.dutch'::regconfig
          ELSE 'pg_catalog.english'::regconfig
        END
        $$;
        SQL

    # add the full-text search data
    $dbh->do(<<~'SQL');
        ALTER TABLE tweets ADD COLUMN fts tsvector;
        SQL
    $dbh->do(<<~'SQL');
        UPDATE tweets SET fts = to_tsvector(tweet_language(data),tweet_text_recursive(data));
        SQL
    $dbh->do(<<~'SQL');
        CREATE INDEX tweets_fts ON tweets USING GIN (fts);
        SQL

    # and make sure it stays up-to-date
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION tweets_fts_trigger() RETURNS trigger
        LANGUAGE plpgsql
        AS $$
        begin
          new.fts := to_tsvector(tweet_language(new.data),tweet_text_recursive(new.data));
          return new;
        end
        $$;
        SQL
    $dbh->do(<<~'SQL');
        CREATE TRIGGER tweets_fts_update BEFORE INSERT OR UPDATE
        ON tweets
        FOR EACH ROW EXECUTE PROCEDURE tweets_fts_trigger();
        SQL
}

sub _schema_deploy_5($self) {
    my $dbh = $self->dbh;

    # get *more* text of a tweet
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION tweet_text(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT CASE t ? 'extended_tweet'
          WHEN true THEN COALESCE(t->'extended_tweet'->>'text',t->'extended_tweet'->>'full_text')
          ELSE COALESCE(t->>'text',t->>'full_text')
        END
        $$;
        SQL
}

# Namespace the Twitter people tables, and add the Misskey tables.
sub _schema_deploy_6($self) {
    my $dbh = $self->dbh;

    $dbh->do('ALTER TABLE people RENAME TO twitter_people');
    $dbh->do('ALTER TABLE friends RENAME TO twitter_friends');
    $dbh->do('ALTER TABLE followers RENAME TO twitter_followers');
    $dbh->do('ALTER INDEX idx_people RENAME TO idx_twitter_people');
    $dbh->do('ALTER FUNCTION people_details RENAME TO twitter_people_details');

    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION note_text(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT t->>'text'
        $$;
        SQL
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION note_text_recursive(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT note_text(t) || ' ' ||
          COALESCE( note_text_recursive(t->'renote'), '')
        $$;
        SQL

    # misskey doesn't currently store language, let's pretend it's all
    # English
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION note_language(IN t JSONB) RETURNS regconfig
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT 'pg_catalog.english'::regconfig
        $$;
        SQL

    $dbh->do(<<~'SQL');
        CREATE TABLE notes (
          id VARCHAR(255) PRIMARY KEY,
          created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
          data JSONB NOT NULL,
          fts tsvector
        );
        SQL
    $dbh->do(<<~'SQL');
        CREATE INDEX notes_fts ON notes USING GIN (fts);
        SQL
    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION notes_fts_trigger() RETURNS trigger
        LANGUAGE plpgsql
        AS $$
        begin
          new.fts := to_tsvector(note_language(new.data),note_text_recursive(new.data));
          return new;
        end
        $$;
        SQL
    $dbh->do(<<~'SQL');
        CREATE TRIGGER notes_fts_update BEFORE INSERT OR UPDATE
        ON notes
        FOR EACH ROW EXECUTE PROCEDURE notes_fts_trigger();
        SQL

    $dbh->do(<<~'SQL');
        CREATE TABLE misskey_people (
          id SERIAL PRIMARY KEY,
          data JSONB NOT NULL
        )
        SQL
    $dbh->do(<<~'SQL');
        CREATE FUNCTION misskey_people_details(data jsonb) RETURNS text[] AS $$
          SELECT array[
            (data->>'id'),
            (data->>'username'),
            COALESCE(data->>'host', ''),
            COALESCE(data->>'location', ''),
            COALESCE(data->>'description', ''),
            COALESCE(data->>'name', ''),
            COALESCE(data->>'avatarBlurhash', ''),
            COALESCE(data->>'bannerBlurhash', '')
          ];
        $$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
        SQL
    # NOTE: the index name is misspelled ("miskkey"); renaming it would
    # need a new migration
    $dbh->do(<<~'SQL');
        CREATE UNIQUE INDEX idx_miskkey_people ON misskey_people (misskey_people_details(data))
        SQL
    $dbh->do(<<~'SQL');
        CREATE TABLE misskey_followers (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users integer[] NOT NULL
        )
        SQL
    $dbh->do(<<~'SQL');
        CREATE TABLE misskey_following (
          taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
          users integer[] NOT NULL
        )
        SQL
}

# Notes without text (e.g. pure renotes) no longer NULL out the whole
# search text; rebuild the search vectors accordingly.
sub _schema_deploy_7($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<~'SQL');
        CREATE OR REPLACE FUNCTION note_text_recursive(IN t JSONB) RETURNS text
        LANGUAGE sql IMMUTABLE RETURNS NULL ON NULL INPUT PARALLEL SAFE
        AS $$
        SELECT COALESCE( note_text(t), '') || ' ' ||
          COALESCE( note_text_recursive(t->'renote'), '')
        $$;
        SQL
    $dbh->do(<<~'SQL');
        UPDATE notes SET fts = to_tsvector(note_language(data),note_text_recursive(data));
        SQL
}

1;