From 6483ef8d798cb26779186ab143222db9236aed7f Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 7 May 2017 15:15:44 +0100 Subject: normalise people --- lib/Dakkar/TweetArchive/Store.pm | 106 ++++++++++++++++++++++++++++++++------- 1 file changed, 89 insertions(+), 17 deletions(-) diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm index 8cd5506..7dc7e4d 100644 --- a/lib/Dakkar/TweetArchive/Store.pm +++ b/lib/Dakkar/TweetArchive/Store.pm @@ -21,7 +21,7 @@ has dbh => ( ); sub _build_dbh($self) { - my $dbh = DBI->connect( + return DBI->connect( $self->dsn, $self->user, $self->pass, { PrintError => 0, @@ -29,8 +29,10 @@ sub _build_dbh($self) { RaiseError => 1, }, ); - $self->_ensure_schema($dbh); - return $dbh; +} + +sub BUILD($self,@) { + $self->_ensure_schema; } my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y'); @@ -70,29 +72,48 @@ SQL ); } +sub _store_people($self,$people) { + my @ids; + for my $person ($people->@*) { + my $person_str = $json_printer->encode($person); + push @ids, + $self->dbh->selectall_arrayref(<<'SQL',{},$person_str)->[0][0]; +INSERT INTO people(data) VALUES (?) +ON CONFLICT (people_details(data)) DO UPDATE + SET data=EXCLUDED.data +RETURNING id +SQL + } + + return \@ids; +} + + sub store_friends($self,$friends) { - my $friends_str = $json_printer->encode($friends); + my $ids = $self->_store_people($friends); - $self->dbh->do(<<'SQL', {}, $friends_str); + $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids)); INSERT INTO friends(users) VALUES(?) SQL } sub store_followers($self,$followers) { - my $followers_str = $json_printer->encode($followers); + my $ids = $self->_store_people($followers); - $self->dbh->do(<<'SQL', {}, $followers_str); + $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids)); INSERT INTO followers(users) VALUES(?) SQL } -sub _schema_deploy($self,$dbh,$next_version) { +sub _schema_deploy($self,$next_version) { my $method_name = "_schema_deploy_${next_version}"; if (my $method = $self->can($method_name)) { - $self->$method($dbh); - $dbh->do(<<'SQL', {}, $next_version); + $self->dbh->begin_work; + $self->$method(); + $self->dbh->do(<<'SQL', {}, $next_version); INSERT INTO meta(version) VALUES (?) SQL + $self->dbh->commit; return 1; } else { @@ -100,7 +121,8 @@ SQL } } -sub _ensure_schema($self,$dbh) { +sub _ensure_schema($self) { + my $dbh = $self->dbh; # simple-minded schema version management { # silence the 'NOTICE: relation "meta" already exists, skipping' @@ -118,13 +140,13 @@ SQL $current_version //= 0; my $next_version = $current_version + 1; - while ($self->_schema_deploy($dbh,$next_version)) { + while ($self->_schema_deploy($next_version)) { ++$next_version; } } -sub _schema_deploy_1($self,$dbh) { - $dbh->do(<<'SQL'); +sub _schema_deploy_1($self) { + $self->dbh->do(<<'SQL'); CREATE TABLE tweets ( id VARCHAR(255) PRIMARY KEY, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -133,14 +155,14 @@ CREATE TABLE tweets ( SQL } -sub _schema_deploy_2($self,$dbh) { - $dbh->do(<<'SQL'); +sub _schema_deploy_2($self) { + $self->dbh->do(<<'SQL'); CREATE TABLE followers ( taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY, users JSONB NOT NULL ) SQL - $dbh->do(<<'SQL'); + $self->dbh->do(<<'SQL'); CREATE TABLE friends ( taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY, users JSONB NOT NULL @@ -148,4 +170,54 @@ CREATE TABLE friends ( SQL } +sub _schema_deploy_3($self) { + $self->dbh->do(<<'SQL'); +CREATE TABLE people ( + id SERIAL PRIMARY KEY, + data JSONB NOT NULL +) +SQL + $self->dbh->do(<<'SQL'); +CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$ + SELECT array[ + (data->>'id_str'), + (data->>'url'), + (data->>'name'), + (data->>'location'), + (data->>'description'), + (data->>'screen_name'), + (data->>'profile_image_url'), + (data->>'profile_background_image_url') + ]; +$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT +SQL + $self->dbh->do(<<'SQL'); +CREATE UNIQUE INDEX idx_people ON people (people_details(data)) +SQL + + my $json_parser = JSON::MaybeXS->new(); + + my $migrate = sub($table) { + $self->dbh->do(<<"SQL"); +DECLARE migrate_crs CURSOR FOR SELECT taken_at,users FROM $table +SQL + my $fetch = $self->dbh->prepare(q{FETCH 1 FROM migrate_crs}); + my $update = $self->dbh->prepare(<<"SQL"); +UPDATE $table + SET users = ? + WHERE taken_at = ? +SQL + $fetch->execute; + while (my @row = $fetch->fetchrow_array) { + my $friends = $json_parser->decode($row[1]); + my $ids = $self->_store_people($friends); + $update->execute($json_printer->encode($ids),$row[0]); + $fetch->execute; + } + }; + + $migrate->('friends'); + $migrate->('followers'); +} + 1; -- cgit v1.2.3