summaryrefslogtreecommitdiff
path: root/lib/Dakkar/TweetArchive/Store.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Dakkar/TweetArchive/Store.pm')
-rw-r--r--lib/Dakkar/TweetArchive/Store.pm118
1 files changed, 118 insertions, 0 deletions
diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm
new file mode 100644
index 0000000..e4a4f3b
--- /dev/null
+++ b/lib/Dakkar/TweetArchive/Store.pm
@@ -0,0 +1,118 @@
+package Dakkar::TweetArchive::Store;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use DBI;
+use Types::Standard qw(Str InstanceOf);
+use DateTime::Format::Strptime;
+use DateTime::Format::Pg;
+use JSON::MaybeXS;
+use namespace::clean;
+
+use Data::Printer;
+
+has [qw(dsn user pass)] => (
+ is => 'ro',
+ required => 1,
+ isa => Str,
+);
+
+has dbh => (
+ is => 'lazy',
+ isa => InstanceOf['DBI::db'],
+);
+
+sub _build_dbh($self) {
+ my $dbh = DBI->connect(
+ $self->dsn, $self->user, $self->pass,
+ {
+ PrintError => 0,
+ AutoCommit => 1,
+ RaiseError => 1,
+ },
+ );
+ $self->_ensure_schema($dbh);
+ return $dbh;
+}
+
+my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
+my $dt_printer = DateTime::Format::Pg->new();
+
+my $json_printer = JSON::MaybeXS->new(
+ ascii => 1,
+ pretty => 0,
+ allow_nonref => 1,
+ allow_unknown => 1,
+ allow_blessed => 1,
+ convert_blessed => 1,
+);
+
+sub latest_tweet_id($self) {
+ return $self->dbh->selectall_arrayref(
+ q{SELECT MAX(id) FROM tweets},
+ )->[0][0];
+}
+
+sub store_tweet($self,$tweet) {
+ # yes, the source most probably decoded this from a string, we
+ # have to serialise it again, so that PostgreSQL can parse it
+ # *again*
+ my $tweet_str = $json_printer->encode($tweet);
+ my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
+
+ $self->dbh->do(<<'SQL', {},
+INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
+ ON CONFLICT (id) DO UPDATE SET
+ created_at = EXCLUDED.created_at,
+ data = EXCLUDED.data
+SQL
+ $tweet->{id_str},
+ $dt_printer->format_datetime($created_at),
+ $tweet_str,
+ );
+}
+
+sub _schema_deploy($self,$dbh,$next_version) {
+ my $method_name = "_schema_deploy_${next_version}";
+ if (my $method = $self->can($method_name)) {
+ $self->$method($dbh);
+ $dbh->do(<<'SQL', {}, $next_version);
+INSERT INTO meta(version) VALUES (?)
+SQL
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+sub _ensure_schema($self,$dbh) {
+ # simple-minded schema version management
+ $dbh->do(<<'SQL');
+CREATE TABLE IF NOT EXISTS meta (
+ version INTEGER PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+)
+SQL
+ my $current_version = $dbh->selectall_arrayref(
+ q{SELECT max(version) FROM meta}
+ )->[0][0];
+ $current_version //= 0;
+
+ my $next_version = $current_version + 1;
+ while ($self->_schema_deploy($dbh,$next_version)) {
+ ++$next_version;
+ }
+}
+
+sub _schema_deploy_1($self,$dbh) {
+ $dbh->do(<<'SQL');
+CREATE TABLE tweets (
+ id VARCHAR(255) PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ data JSONB NOT NULL
+)
+SQL
+}
+
+1;