From d790821e375623b6c20f04d943401400d263d8e9 Mon Sep 17 00:00:00 2001 From: dakkar Date: Mon, 8 May 2017 11:51:44 +0100 Subject: maybe faster people migration --- lib/Dakkar/TweetArchive/Store.pm | 84 +++++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm index 7dc7e4d..c3840ea 100644 --- a/lib/Dakkar/TweetArchive/Store.pm +++ b/lib/Dakkar/TweetArchive/Store.pm @@ -92,7 +92,7 @@ SQL sub store_friends($self,$friends) { my $ids = $self->_store_people($friends); - $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids)); + $self->dbh->do(<<"SQL", {}, $ids); INSERT INTO friends(users) VALUES(?) SQL } @@ -100,7 +100,7 @@ SQL sub store_followers($self,$followers) { my $ids = $self->_store_people($followers); - $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids)); + $self->dbh->do(<<"SQL", {}, $ids); INSERT INTO followers(users) VALUES(?) SQL } @@ -171,13 +171,14 @@ SQL } sub _schema_deploy_3($self) { - $self->dbh->do(<<'SQL'); + my $dbh = $self->dbh; + $dbh->do(<<'SQL'); CREATE TABLE people ( id SERIAL PRIMARY KEY, data JSONB NOT NULL ) SQL - $self->dbh->do(<<'SQL'); + $dbh->do(<<'SQL'); CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$ SELECT array[ (data->>'id_str'), @@ -191,33 +192,64 @@ CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$ ]; $$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT SQL - $self->dbh->do(<<'SQL'); + $dbh->do(<<'SQL'); CREATE UNIQUE INDEX idx_people ON people (people_details(data)) SQL - my $json_parser = JSON::MaybeXS->new(); - - my $migrate = sub($table) { - $self->dbh->do(<<"SQL"); -DECLARE migrate_crs CURSOR FOR SELECT taken_at,users FROM $table + $dbh->do(<<'SQL'); +CREATE TABLE new_followers ( + taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY, + users integer[] NOT NULL +) SQL - my $fetch = $self->dbh->prepare(q{FETCH 1 FROM migrate_crs}); - my $update = $self->dbh->prepare(<<"SQL"); -UPDATE $table - SET users = ? - WHERE taken_at = ? + $dbh->do(<<'SQL'); +CREATE TABLE new_friends ( + taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY, + users integer[] NOT NULL +) +SQL + + $dbh->do(<<'SQL'); +CREATE OR REPLACE FUNCTION migrate_people() RETURNS VOID AS $$ +DECLARE + f RECORD; + p RECORD; + ids INTEGER[]; +BEGIN + FOR f IN SELECT * FROM friends ORDER BY taken_at ASC LOOP + RAISE NOTICE 'migrating friends at %s', f.taken_at; + ids := '{}'; + FOR p IN INSERT INTO people(data) + SELECT v FROM jsonb_array_elements(f.users) AS d(v) + on CONFLICT (people_details(data)) DO UPDATE + SET data=EXCLUDED.data + RETURNING id LOOP + ids := array_append(ids,p.id); + END LOOP; + INSERT INTO new_friends(taken_at,users) VALUES (f.taken_at,ids); + END LOOP; + + FOR f IN SELECT * FROM followers ORDER BY taken_at ASC LOOP + RAISE NOTICE 'migrating followers at %s', f.taken_at; + ids := '{}'; + FOR p IN INSERT INTO people(data) + SELECT v FROM jsonb_array_elements(f.users) AS d(v) + ON CONFLICT (people_details(data)) DO UPDATE + SET data=EXCLUDED.data + RETURNING id LOOP + ids := array_append(ids,p.id); + END LOOP; + INSERT INTO new_followers(taken_at,users) VALUES (f.taken_at,ids); + END LOOP; +END; +$$ LANGUAGE PLPGSQL; SQL - $fetch->execute; - while (my @row = $fetch->fetchrow_array) { - my $friends = $json_parser->decode($row[1]); - my $ids = $self->_store_people($friends); - $update->execute($json_printer->encode($ids),$row[0]); - $fetch->execute; - } - }; - - $migrate->('friends'); - $migrate->('followers'); + $dbh->selectall_arrayref(q{SELECT migrate_people()}); + $dbh->do(q{DROP FUNCTION migrate_people()}); + $dbh->do(q{DROP TABLE friends}); + $dbh->do(q{DROP TABLE followers}); + $dbh->do(q{ALTER TABLE new_friends RENAME TO friends}); + $dbh->do(q{ALTER TABLE new_followers RENAME TO followers}); } 1; -- cgit v1.2.3