summary refs log tree commit diff
diff options
context:
space:
mode:
author dakkar <dakkar@thenautilus.net> 2017-05-07 15:15:44 +0100
committer dakkar <dakkar@thenautilus.net> 2017-05-07 15:15:44 +0100
commit 6483ef8d798cb26779186ab143222db9236aed7f (patch)
tree 05f239b31b5e02d693d6aa8ca9c359f2704fa8ac
parent add DBD::Pg (diff)
download tweet-archive-6483ef8d798cb26779186ab143222db9236aed7f.tar.gz
tweet-archive-6483ef8d798cb26779186ab143222db9236aed7f.tar.bz2
tweet-archive-6483ef8d798cb26779186ab143222db9236aed7f.zip
normalise people
-rw-r--r-- lib/Dakkar/TweetArchive/Store.pm 106
1 files changed, 89 insertions, 17 deletions
diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm
index 8cd5506..7dc7e4d 100644
--- a/lib/Dakkar/TweetArchive/Store.pm
+++ b/lib/Dakkar/TweetArchive/Store.pm
@@ -21,7 +21,7 @@ has dbh => (
);
sub _build_dbh($self) {
- my $dbh = DBI->connect(
+ return DBI->connect(
$self->dsn, $self->user, $self->pass,
{
PrintError => 0,
@@ -29,8 +29,10 @@ sub _build_dbh($self) {
RaiseError => 1,
},
);
- $self->_ensure_schema($dbh);
- return $dbh;
+}
+
+sub BUILD($self,@) {
+ $self->_ensure_schema;
}
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
@@ -70,29 +72,48 @@ SQL
);
}
+sub _store_people($self,$people) {
+ my @ids;
+ for my $person ($people->@*) {
+ my $person_str = $json_printer->encode($person);
+ push @ids,
+ $self->dbh->selectall_arrayref(<<'SQL',{},$person_str)->[0][0];
+INSERT INTO people(data) VALUES (?)
+ON CONFLICT (people_details(data)) DO UPDATE
+ SET data=EXCLUDED.data
+RETURNING id
+SQL
+ }
+
+ return \@ids;
+}
+
+
sub store_friends($self,$friends) {
- my $friends_str = $json_printer->encode($friends);
+ my $ids = $self->_store_people($friends);
- $self->dbh->do(<<'SQL', {}, $friends_str);
+ $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids));
INSERT INTO friends(users) VALUES(?)
SQL
}
sub store_followers($self,$followers) {
- my $followers_str = $json_printer->encode($followers);
+ my $ids = $self->_store_people($followers);
- $self->dbh->do(<<'SQL', {}, $followers_str);
+ $self->dbh->do(<<"SQL", {}, $json_printer->encode($ids));
INSERT INTO followers(users) VALUES(?)
SQL
}
-sub _schema_deploy($self,$dbh,$next_version) {
+sub _schema_deploy($self,$next_version) {
my $method_name = "_schema_deploy_${next_version}";
if (my $method = $self->can($method_name)) {
- $self->$method($dbh);
- $dbh->do(<<'SQL', {}, $next_version);
+ $self->dbh->begin_work;
+ $self->$method();
+ $self->dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
+ $self->dbh->commit;
return 1;
}
else {
@@ -100,7 +121,8 @@ SQL
}
}
-sub _ensure_schema($self,$dbh) {
+sub _ensure_schema($self) {
+ my $dbh = $self->dbh;
# simple-minded schema version management
{
# silence the 'NOTICE: relation "meta" already exists, skipping'
@@ -118,13 +140,13 @@ SQL
$current_version //= 0;
my $next_version = $current_version + 1;
- while ($self->_schema_deploy($dbh,$next_version)) {
+ while ($self->_schema_deploy($next_version)) {
++$next_version;
}
}
-sub _schema_deploy_1($self,$dbh) {
- $dbh->do(<<'SQL');
+sub _schema_deploy_1($self) {
+ $self->dbh->do(<<'SQL');
CREATE TABLE tweets (
id VARCHAR(255) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -133,14 +155,14 @@ CREATE TABLE tweets (
SQL
}
-sub _schema_deploy_2($self,$dbh) {
- $dbh->do(<<'SQL');
+sub _schema_deploy_2($self) {
+ $self->dbh->do(<<'SQL');
CREATE TABLE followers (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
)
SQL
- $dbh->do(<<'SQL');
+ $self->dbh->do(<<'SQL');
CREATE TABLE friends (
taken_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP PRIMARY KEY,
users JSONB NOT NULL
@@ -148,4 +170,54 @@ CREATE TABLE friends (
SQL
}
+sub _schema_deploy_3($self) {
+ $self->dbh->do(<<'SQL');
+CREATE TABLE people (
+ id SERIAL PRIMARY KEY,
+ data JSONB NOT NULL
+)
+SQL
+ $self->dbh->do(<<'SQL');
+CREATE FUNCTION people_details(data jsonb) RETURNS text[] AS $$
+ SELECT array[
+ (data->>'id_str'),
+ (data->>'url'),
+ (data->>'name'),
+ (data->>'location'),
+ (data->>'description'),
+ (data->>'screen_name'),
+ (data->>'profile_image_url'),
+ (data->>'profile_background_image_url')
+ ];
+$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT
+SQL
+ $self->dbh->do(<<'SQL');
+CREATE UNIQUE INDEX idx_people ON people (people_details(data))
+SQL
+
+ my $json_parser = JSON::MaybeXS->new();
+
+ my $migrate = sub($table) {
+ $self->dbh->do(<<"SQL");
+DECLARE migrate_crs CURSOR FOR SELECT taken_at,users FROM $table
+SQL
+ my $fetch = $self->dbh->prepare(q{FETCH 1 FROM migrate_crs});
+ my $update = $self->dbh->prepare(<<"SQL");
+UPDATE $table
+ SET users = ?
+ WHERE taken_at = ?
+SQL
+ $fetch->execute;
+ while (my @row = $fetch->fetchrow_array) {
+ my $friends = $json_parser->decode($row[1]);
+ my $ids = $self->_store_people($friends);
+ $update->execute($json_printer->encode($ids),$row[0]);
+ $fetch->execute;
+ }
+ };
+
+ $migrate->('friends');
+ $migrate->('followers');
+}
+
1;