package Dakkar::TweetArchive::Store;
use v5.36;
use Moo;
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;
# Connection parameters, passed unchanged (in this order) to DBI->connect
# by _build_dbh. All three must be supplied to the constructor.
has [qw(dsn user pass)] => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

# The database handle. 'lazy' means it is created on first access by the
# _build_dbh builder.
has dbh => (
    is  => 'lazy',
    isa => InstanceOf['DBI::db'],
);
# Builder for the lazy "dbh" attribute: opens the DBI connection.
#
# Errors are raised as exceptions (RaiseError) rather than printed
# (PrintError off), and every statement auto-commits unless a transaction
# is opened explicitly with begin_work.
sub _build_dbh($self) {
    my %attributes = (
        AutoCommit => 1,
        PrintError => 0,
        RaiseError => 1,
    );
    return DBI->connect(
        ( map { $self->$_ } qw(dsn user pass) ),
        \%attributes,
    );
}
# Moo construction hook: runs once the attributes are set and brings the
# database schema up to the latest known version. Constructor arguments
# are accepted but not used.
sub BUILD($self, @) {
    return $self->_ensure_schema;
}
# Parser for tweet "created_at" values, which this pattern expects in the
# form "Wed Oct 10 20:19:24 +0000 2018".
my $dt_parser = DateTime::Format::Strptime->new(
    pattern => '%a %b %d %T %z %Y',
);

# Parser for note "createdAt" values: ISO 8601 with milliseconds and a
# literal trailing "Z", interpreted as UTC.
my $dt_parser_iso = DateTime::Format::Strptime->new(
    time_zone => 'UTC',
    pattern   => '%Y-%m-%dT%H:%M:%S.%3NZ',
);

# Turns DateTime objects into PostgreSQL timestamp literals.
my $dt_printer = DateTime::Format::Pg->new();

# JSON encoder for the JSONB "data" columns. Output is compact and
# ASCII-only. Non-reference top-level values are allowed. Blessed objects
# are converted via TO_JSON when they provide it (null otherwise), and
# values JSON cannot represent are encoded as null instead of dying.
my $json_printer = JSON::MaybeXS->new(
    pretty          => 0,
    ascii           => 1,
    allow_nonref    => 1,
    allow_unknown   => 1,
    allow_blessed   => 1,
    convert_blessed => 1,
);
# Return the highest tweet id stored so far, or undef when the tweets
# table is empty.
sub latest_tweet_id($self) {
    my ($max_id) = $self->dbh->selectrow_array(
        q{SELECT MAX(id) FROM tweets},
    );
    return $max_id;
}
# Insert a tweet, or overwrite the stored row with the same id.
#
# $tweet is a hashref. It is stored whole, JSON-encoded, in the "data"
# column. Its "id_str" field is the primary key, and its "created_at"
# field is parsed with $dt_parser and stored as a timestamp.
#
# Dies, naming the tweet and the offending value, if "created_at" cannot
# be parsed. Without this check the undef parse result would be passed to
# the timestamp formatter, which fails with an unhelpful message.
# Returns the result of the DBI "do" call.
sub store_tweet($self,$tweet) {
    my $tweet_str = $json_printer->encode($tweet);
    my $created_at = $dt_parser->parse_datetime($tweet->{created_at})
        // die sprintf(
            "store_tweet: cannot parse created_at '%s' of tweet %s",
            $tweet->{created_at} // '<undef>',
            $tweet->{id_str}     // '<undef>',
        );

    $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
created_at = EXCLUDED.created_at,
data = EXCLUDED.data
SQL
        $tweet->{id_str},
        $dt_printer->format_datetime($created_at),
        $tweet_str,
    );
}
# Upsert each Twitter user hashref in @$people into twitter_people and
# return an arrayref of their row ids, in input order.
#
# Rows are matched on twitter_people_details(data). An identical key
# updates the existing row and reuses its id; a new key inserts a new row.
sub _store_twitter_people($self,$people) {
    my $dbh = $self->dbh;
    my @ids = map {
        my ($id) = $dbh->selectrow_array(<<'SQL', {}, $json_printer->encode($_));
INSERT INTO twitter_people(data) VALUES (?)
ON CONFLICT (twitter_people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id
SQL
        $id;
    } $people->@*;
    return \@ids;
}
# True when the two arrayrefs hold the same numbers, ignoring order but
# counting duplicates. Each list is put into a canonical form (numeric
# sort, NUL-joined), and the two forms are compared as strings.
sub _array_num_equal($a1,$a2) {
    my $canonical = sub ($list) {
        return join "\0", sort { $a <=> $b } $list->@*;
    };
    return $canonical->($a1) eq $canonical->($a2);
}
# Store a snapshot of a list of people.
#
# $store_method names a method that upserts the people and returns an
# arrayref of their ids. $ids_table is the snapshot table (interpolated
# into the SQL, so it must be a trusted identifier). A new row with the
# ids is inserted only if they differ, ignoring order, from the most
# recent snapshot in that table. Returns nothing when the snapshot is
# unchanged, and the DBI "do" result otherwise.
sub _store_people_and_ids($self,$store_method,$ids_table,$people) {
    my $ids = $self->$store_method($people);
    my $dbh = $self->dbh;

    my ($previous) = $dbh->selectrow_array(<<"SQL");
SELECT users
FROM ${ids_table}
ORDER BY taken_at DESC
LIMIT 1
SQL

    my $unchanged = $previous && _array_num_equal($ids, $previous);
    return if $unchanged;

    return $dbh->do(<<"SQL", {}, $ids);
INSERT INTO ${ids_table}(users) VALUES(?)
SQL
}
# Store a snapshot of the given Twitter friends list: upsert the people
# into twitter_people, then record their ids in twitter_friends.
sub store_twitter_friends($self,$friends) {
    my ($upsert, $table) = ('_store_twitter_people', 'twitter_friends');
    return $self->_store_people_and_ids($upsert, $table, $friends);
}
# Store a snapshot of the given Twitter followers list: upsert the people
# into twitter_people, then record their ids in twitter_followers.
sub store_twitter_followers($self,$followers) {
    my ($upsert, $table) = ('_store_twitter_people', 'twitter_followers');
    return $self->_store_people_and_ids($upsert, $table, $followers);
}
# Return the greatest note id stored so far, or undef when the notes table
# is empty.
# NOTE(review): notes.id is VARCHAR, so MAX compares ids as strings. That
# is only the "latest" note if ids sort lexically in creation order;
# confirm this holds for the ids being stored.
sub latest_note_id($self) {
    my ($max_id) = $self->dbh->selectrow_array(
        q{SELECT MAX(id) FROM notes},
    );
    return $max_id;
}
# Insert a note, or overwrite the stored row with the same id.
#
# $note is a hashref. It is stored whole, JSON-encoded, in the "data"
# column. Its "id" field is the primary key, and its "createdAt" field is
# parsed with $dt_parser_iso and stored as a timestamp.
#
# Dies, naming the note and the offending value, if "createdAt" cannot be
# parsed. Without this check the undef parse result would be passed to the
# timestamp formatter, which fails with an unhelpful message.
# Returns the result of the DBI "do" call.
sub store_note($self,$note) {
    my $note_str = $json_printer->encode($note);
    my $created_at = $dt_parser_iso->parse_datetime($note->{createdAt})
        // die sprintf(
            "store_note: cannot parse createdAt '%s' of note %s",
            $note->{createdAt} // '<undef>',
            $note->{id}        // '<undef>',
        );

    $self->dbh->do(<<'SQL', {},
INSERT INTO notes(id,created_at,data) VALUES(?,?,?)
ON CONFLICT (id) DO UPDATE SET
created_at = EXCLUDED.created_at,
data = EXCLUDED.data
SQL
        $note->{id},
        $dt_printer->format_datetime($created_at),
        $note_str,
    );
}
# Upsert each Misskey user hashref in @$people into misskey_people and
# return an arrayref of their row ids, in input order.
#
# Rows are matched on misskey_people_details(data). An identical key
# updates the existing row and reuses its id; a new key inserts a new row.
sub _store_misskey_people($self,$people) {
    my $dbh = $self->dbh;
    my @ids = map {
        my ($id) = $dbh->selectrow_array(<<'SQL', {}, $json_printer->encode($_));
INSERT INTO misskey_people(data) VALUES (?)
ON CONFLICT (misskey_people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id
SQL
        $id;
    } $people->@*;
    return \@ids;
}
# Store a snapshot of the given Misskey "following" list: upsert the
# people into misskey_people, then record their ids in misskey_following.
sub store_misskey_following($self,$following) {
    my ($upsert, $table) = ('_store_misskey_people', 'misskey_following');
    return $self->_store_people_and_ids($upsert, $table, $following);
}
# Store a snapshot of the given Misskey followers list: upsert the people
# into misskey_people, then record their ids in misskey_followers.
sub store_misskey_followers($self,$followers) {
    my ($upsert, $table) = ('_store_misskey_people', 'misskey_followers');
    return $self->_store_people_and_ids($upsert, $table, $followers);
}
# Apply schema migration number $next_version, if one exists.
#
# Looks up a method named _schema_deploy_<N>. If there is none, returns 0,
# telling the caller that the schema is already at the latest version.
# Otherwise it runs that method and records the version in "meta", both
# inside a single transaction, then returns 1.
#
# If the migration, the meta insert or the commit dies, the transaction is
# rolled back explicitly and the original error is re-thrown. Without the
# rollback, the handle would stay inside a failed transaction with
# AutoCommit turned off.
sub _schema_deploy($self,$next_version) {
    my $method = $self->can("_schema_deploy_${next_version}")
        or return 0;

    my $dbh = $self->dbh;
    $dbh->begin_work;
    my $ok = eval {
        $self->$method();
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        $dbh->commit;
        1;
    };
    if (!$ok) {
        my $error = $@ || "schema migration $next_version failed";
        # Best effort: a failing rollback must not mask the original error.
        eval { $dbh->rollback; 1 };
        die $error;
    }
    return 1;
}
# Bring the database schema up to date.
#
# First makes sure the "meta" bookkeeping table exists. PrintWarn is
# disabled only for that statement, so the server's notice about an
# existing table is not printed. It then reads the highest recorded
# version (0 if none) and applies migrations version+1, version+2, ...
# until _schema_deploy reports that no further migration exists.
sub _ensure_schema($self) {
    my $dbh = $self->dbh;

    do {
        local $dbh->{PrintWarn} = 0;
        $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
version INTEGER PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    };

    my $rows = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    );
    my $version = ($rows->[0][0] // 0) + 1;
    $version++ while $self->_schema_deploy($version);
}
# Schema version 1: the tweets table, one row per tweet, keyed by id, with
# its creation time and the JSON document in "data".
sub _schema_deploy_1($self) {
    my $dbh = $self->dbh;
    return $dbh->do(<<'SQL');
CREATE TABLE tweets (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB NOT NULL
)
SQL
}
# Schema version 2: snapshot tables for followers and friends. Each row is
# keyed by the time it was taken and holds the full user list as JSONB.
sub _schema_deploy_2($self) {
    my $dbh = $self->dbh;

    $dbh->do(<<'SQL');
CREATE TABLE followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL

    return $dbh->do(<<'SQL');
CREATE TABLE friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL
}
# Schema version 3: normalise the follower/friend snapshots.
#
# Adds a "people" table with one row per distinct user profile, made
# unique by an index on people_details(data). The JSONB snapshot tables
# from version 2 are replaced by tables whose "users" column is an
# integer[] of people.id values. The old data is converted by a temporary
# PL/pgSQL function, migrate_people().
#
# NOTE(review): migrate_people() copies only the most recent snapshot of
# each table (ORDER BY taken_at DESC LIMIT 1). Older snapshots are lost
# when the old tables are dropped at the end.
# NOTE(review): RAISE NOTICE uses '%s'. In PL/pgSQL '%' alone is the
# placeholder, so the notice ends with a stray literal "s". This is
# cosmetic, and the migration is left unchanged because it is recorded as
# applied.
sub _schema_deploy_3($self) {
my $dbh = $self->dbh;
# One row per distinct profile; "id" is what the snapshots reference.
$dbh->do(<<'SQL');
CREATE TABLE people (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
)
SQL
# Deduplication key: the account id plus the profile fields listed.
# Combined with the unique index below, a profile whose fields changed
# gets a new row, while an identical profile maps to the existing row.
$dbh->do(<<'SQL');
CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
SELECT array[
(data->>'id_str'),
(data->>'url'),
(data->>'name'),
(data->>'location'),
(data->>'description'),
(data->>'screen_name'),
(data->>'profile_image_url'),
(data->>'profile_background_image_url')
];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
# Expression index used as the ON CONFLICT target when upserting people.
$dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_people ON people (people_details(data))
SQL
# New snapshot tables. They are renamed to followers/friends once the old
# tables have been dropped.
$dbh->do(<<'SQL');
CREATE TABLE new_followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL
$dbh->do(<<'SQL');
CREATE TABLE new_friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL
# Temporary conversion routine. For the latest friends and followers
# snapshot, it upserts every user object into people and writes the
# collected ids into the new table, keeping the original taken_at.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION migrate_people() RETURNS VOID AS $$
DECLARE
f RECORD;
p RECORD;
ids INTEGER[];
BEGIN
FOR f IN SELECT * FROM friends ORDER BY taken_at DESC LIMIT 1 LOOP
RAISE NOTICE 'migrating friends at %s', f.taken_at;
ids := '{}';
FOR p IN INSERT INTO people(data)
SELECT v FROM jsonb_array_elements(f.users) AS d(v)
on CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id LOOP
ids := array_append(ids,p.id);
END LOOP;
INSERT INTO new_friends(taken_at,users) VALUES (f.taken_at,ids);
END LOOP;
FOR f IN SELECT * FROM followers ORDER BY taken_at DESC LIMIT 1 LOOP
RAISE NOTICE 'migrating followers at %s', f.taken_at;
ids := '{}';
FOR p IN INSERT INTO people(data)
SELECT v FROM jsonb_array_elements(f.users) AS d(v)
ON CONFLICT (people_details(data)) DO UPDATE
SET data=EXCLUDED.data
RETURNING id LOOP
ids := array_append(ids,p.id);
END LOOP;
INSERT INTO new_followers(taken_at,users) VALUES (f.taken_at,ids);
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
SQL
# Run the conversion once, then drop the helper function.
$dbh->selectall_arrayref(q{SELECT migrate_people()});
$dbh->do(q{DROP FUNCTION migrate_people()});
# Swap the new integer[] tables in place of the old JSONB ones.
$dbh->do(q{DROP TABLE friends});
$dbh->do(q{DROP TABLE followers});
$dbh->do(q{ALTER TABLE new_friends RENAME TO friends});
$dbh->do(q{ALTER TABLE new_followers RENAME TO followers});
}
# Schema version 4: numeric tweet ids and full-text search over tweets.
#
# Converts tweets.id to bigint. It then defines SQL helpers that extract a
# tweet's text (tweet_text, tweet_text_recursive) and choose a text search
# configuration from its "lang" field (tweet_language). Finally it adds an
# "fts" tsvector column, backfills it, indexes it with GIN, and installs a
# trigger that recomputes it on every INSERT or UPDATE.
sub _schema_deploy_4($self) {
my $dbh = $self->dbh;
# Existing VARCHAR ids are cast in place; every stored id must be numeric.
$dbh->do(<<'SQL');
ALTER TABLE tweets ALTER COLUMN id SET DATA TYPE bigint USING id::bigint
SQL
# Text of a single tweet, read from "extended_tweet" when that key exists
# and from the top-level "text" otherwise.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_text(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT
CASE t ? 'extended_tweet'
WHEN true THEN t->'extended_tweet'->>'text'
ELSE t->>'text'
END
$$;
SQL
# Tweet text followed by the text of any retweeted and quoted tweet.
# Recursion stops because RETURNS NULL ON NULL INPUT makes the call on a
# missing key yield NULL, which COALESCE turns into ''.
# NOTE(review): if tweet_text(t) itself is NULL, the '||' chain makes the
# whole result NULL.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_text_recursive(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT
tweet_text(t) || ' ' ||
COALESCE( tweet_text_recursive(t->'retweeted_status'), '') || ' ' ||
COALESCE( tweet_text_recursive(t->'quoted_status'), '')
$$;
SQL
# Map the tweet's "lang" code to a text search configuration. Any
# language not listed falls back to English.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_language(IN t JSONB) RETURNS regconfig
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT CASE t->>'lang'
WHEN 'it' THEN 'pg_catalog.italian'::regconfig
WHEN 'fr' THEN 'pg_catalog.french'::regconfig
WHEN 'de' THEN 'pg_catalog.german'::regconfig
WHEN 'es' THEN 'pg_catalog.spanish'::regconfig
WHEN 'nl' THEN 'pg_catalog.dutch'::regconfig
ELSE 'pg_catalog.english'::regconfig
END
$$;
SQL
# Search vector column: backfilled for existing rows here, and kept up to
# date by the trigger created below.
$dbh->do(<<'SQL');
ALTER TABLE tweets ADD COLUMN fts tsvector;
SQL
$dbh->do(<<'SQL');
UPDATE tweets SET fts = to_tsvector(tweet_language(data),tweet_text_recursive(data));
SQL
$dbh->do(<<'SQL');
CREATE INDEX tweets_fts ON tweets USING GIN (fts);
SQL
# Row trigger function: recompute fts from the row's data before it is
# written.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweets_fts_trigger()
RETURNS trigger
LANGUAGE plpgsql
AS $$
begin
new.fts := to_tsvector(tweet_language(new.data),tweet_text_recursive(new.data));
return new;
end
$$;
SQL
$dbh->do(<<'SQL');
CREATE TRIGGER tweets_fts_update
BEFORE INSERT OR UPDATE
ON tweets
FOR EACH ROW
EXECUTE PROCEDURE tweets_fts_trigger();
SQL
}
# Schema version 5: redefine tweet_text() so that, both inside
# "extended_tweet" and at the top level, it falls back to "full_text"
# when "text" is missing.
sub _schema_deploy_5($self) {
    return $self->dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION tweet_text(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT
CASE t ? 'extended_tweet'
WHEN true THEN COALESCE(t->'extended_tweet'->>'text',t->'extended_tweet'->>'full_text')
ELSE COALESCE(t->>'text',t->>'full_text')
END
$$;
SQL
}
# Schema version 6: add a twitter_ prefix to the Twitter objects and add
# the Misskey equivalents.
#
# Renames the people/friends/followers tables, their index and the details
# function. It then creates the notes table with full-text search (helper
# functions, GIN index, trigger), a deduplicated misskey_people table, and
# the misskey_followers / misskey_following snapshot tables.
#
# NOTE(review): the unique index is named "idx_miskkey_people" (typo). It
# is left as-is because this migration is recorded as applied; renaming
# it would need a new migration.
sub _schema_deploy_6($self) {
my $dbh = $self->dbh;
# Give the existing Twitter objects a twitter_ prefix.
$dbh->do('ALTER TABLE people RENAME TO twitter_people');
$dbh->do('ALTER TABLE friends RENAME TO twitter_friends');
$dbh->do('ALTER TABLE followers RENAME TO twitter_followers');
$dbh->do('ALTER INDEX idx_people RENAME TO idx_twitter_people');
$dbh->do('ALTER FUNCTION people_details RENAME TO twitter_people_details');
# Text of a single note: its "text" field.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION note_text(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$
SELECT t->>'text'
$$;
SQL
# Note text followed by the text of its "renote", recursively. Recursion
# stops when "renote" is missing, via RETURNS NULL ON NULL INPUT plus
# COALESCE.
# NOTE(review): a NULL note_text(t) makes the whole '||' chain NULL;
# _schema_deploy_7 redefines this function to COALESCE it.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION note_text_recursive(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT
note_text(t) || ' ' ||
COALESCE( note_text_recursive(t->'renote'), '')
$$;
SQL
# Every note is indexed with the English text search configuration.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION note_language(IN t JSONB) RETURNS regconfig
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$
SELECT 'pg_catalog.english'::regconfig
$$;
SQL
# Notes: one row per note, keyed by its (string) id. "fts" is filled in by
# the trigger below.
$dbh->do(<<'SQL');
CREATE TABLE notes (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB NOT NULL,
fts tsvector
);
SQL
$dbh->do(<<'SQL');
CREATE INDEX notes_fts ON notes USING GIN (fts);
SQL
# Row trigger function: recompute fts from the row's data before it is
# written.
$dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION notes_fts_trigger()
RETURNS trigger
LANGUAGE plpgsql
AS $$
begin
new.fts := to_tsvector(note_language(new.data),note_text_recursive(new.data));
return new;
end
$$;
SQL
$dbh->do(<<'SQL');
CREATE TRIGGER notes_fts_update
BEFORE INSERT OR UPDATE
ON notes
FOR EACH ROW
EXECUTE PROCEDURE notes_fts_trigger();
SQL
# Deduplicated Misskey user profiles, same scheme as twitter_people.
$dbh->do(<<'SQL');
CREATE TABLE misskey_people (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
)
SQL
# Deduplication key: id and username, plus the listed optional profile
# fields with missing values normalised to ''.
$dbh->do(<<'SQL');
CREATE FUNCTION misskey_people_details(data jsonb) RETURNS text[] AS $$
SELECT array[
(data->>'id'),
(data->>'username'),
COALESCE(data->>'host', ''),
COALESCE(data->>'location', ''),
COALESCE(data->>'description', ''),
COALESCE(data->>'name', ''),
COALESCE(data->>'avatarBlurhash', ''),
COALESCE(data->>'bannerBlurhash', '')
];
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
SQL
# Expression index used as the ON CONFLICT target when upserting people.
$dbh->do(<<'SQL');
CREATE UNIQUE INDEX idx_miskkey_people ON misskey_people (misskey_people_details(data))
SQL
# Snapshot tables: each row holds the misskey_people ids at one point in
# time.
$dbh->do(<<'SQL');
CREATE TABLE misskey_followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL
$dbh->do(<<'SQL');
CREATE TABLE misskey_following (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users integer[] NOT NULL
)
SQL
}
# Schema version 7: make note_text_recursive() tolerate a note whose own
# text is NULL, then recompute the search vector of every stored note.
sub _schema_deploy_7($self) {
    my $dbh = $self->dbh;

    # COALESCE the note's own text so a NULL no longer nullifies the whole
    # concatenation.
    $dbh->do(<<'SQL');
CREATE OR REPLACE FUNCTION note_text_recursive(IN t JSONB) RETURNS text
LANGUAGE sql
IMMUTABLE
RETURNS NULL ON NULL INPUT
PARALLEL SAFE
AS $$ SELECT
COALESCE( note_text(t), '') || ' ' ||
COALESCE( note_text_recursive(t->'renote'), '')
$$;
SQL

    # Backfill: rebuild fts for all existing notes with the new definition.
    return $dbh->do(<<'SQL');
UPDATE notes
SET fts = to_tsvector(note_language(data),note_text_recursive(data));
SQL
}
# A Perl module must end by evaluating to a true value so that use/require
# succeeds.
1;