summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: dakkar <dakkar@thenautilus.net> 2017-04-22 13:01:17 +0100
committer: dakkar <dakkar@thenautilus.net> 2017-04-22 13:49:17 +0100
commit: 05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b (patch)
tree: 5e2c342323686f7050b8a37a5638f2378521e452
parent: bump API version (diff)
download: tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.tar.gz
tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.tar.bz2
tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.zip
store tweets in postgres
-rw-r--r-- .gitignore 1
-rw-r--r-- lib/Dakkar/TweetArchive.pm 52
-rw-r--r-- lib/Dakkar/TweetArchive/Store.pm 118
-rw-r--r-- tweet-archive.pl 54
4 files changed, 185 insertions, 40 deletions
diff --git a/.gitignore b/.gitignore
index 61a96e4..b70bcfa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1 @@
/tweet-archive.conf
-/elasticsearch-*/
diff --git a/lib/Dakkar/TweetArchive.pm b/lib/Dakkar/TweetArchive.pm
new file mode 100644
index 0000000..9d914fd
--- /dev/null
+++ b/lib/Dakkar/TweetArchive.pm
@@ -0,0 +1,52 @@
+package Dakkar::TweetArchive;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use Net::Twitter;
+use Types::Standard qw(Str InstanceOf);
+use namespace::clean;
+
+has [qw(consumer_key consumer_secret access_token access_token_secret)] => (
+ is => 'ro',
+ required => 1,
+ isa => Str,
+);
+
+has client => (
+ is => 'lazy',
+ isa => InstanceOf['Net::Twitter'],
+);
+
+sub _build_client($self) {
+ my $nt = Net::Twitter->new(
+ traits => [
+ qw/API::RESTv1_1 OAuth RetryOnError/,
+ AutoCursor => {
+ max_calls => 16,
+ force_cursor => 1,
+ array_accessor => 'users',
+ methods => [qw/friends followers/],
+ },
+ ],
+ consumer_key => $self->consumer_key,
+ consumer_secret => $self->consumer_secret,
+ );
+
+ $nt->access_token($self->access_token);
+ $nt->access_token_secret($self->access_token_secret);
+
+ return $nt;
+}
+
+sub home_timeline($self,$since_id) {
+ return $self->client->home_timeline({
+ include_entities => 1,
+ trim_user => 0,
+ exclude_replies => 0,
+ ( $since_id ? ( since_id => $since_id ) : () ),
+ count => 200,
+ });
+}
+
+1;
+
diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm
new file mode 100644
index 0000000..e4a4f3b
--- /dev/null
+++ b/lib/Dakkar/TweetArchive/Store.pm
@@ -0,0 +1,118 @@
+package Dakkar::TweetArchive::Store;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use DBI;
+use Types::Standard qw(Str InstanceOf);
+use DateTime::Format::Strptime;
+use DateTime::Format::Pg;
+use JSON::MaybeXS;
+use namespace::clean;
+
+use Data::Printer;
+
+has [qw(dsn user pass)] => (
+ is => 'ro',
+ required => 1,
+ isa => Str,
+);
+
+has dbh => (
+ is => 'lazy',
+ isa => InstanceOf['DBI::db'],
+);
+
+sub _build_dbh($self) {
+ my $dbh = DBI->connect(
+ $self->dsn, $self->user, $self->pass,
+ {
+ PrintError => 0,
+ AutoCommit => 1,
+ RaiseError => 1,
+ },
+ );
+ $self->_ensure_schema($dbh);
+ return $dbh;
+}
+
+my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
+my $dt_printer = DateTime::Format::Pg->new();
+
+my $json_printer = JSON::MaybeXS->new(
+ ascii => 1,
+ pretty => 0,
+ allow_nonref => 1,
+ allow_unknown => 1,
+ allow_blessed => 1,
+ convert_blessed => 1,
+);
+
+sub latest_tweet_id($self) {
+ return $self->dbh->selectall_arrayref(
+ q{SELECT MAX(id) FROM tweets},
+ )->[0][0];
+}
+
+sub store_tweet($self,$tweet) {
+ # yes, the source most probably decoded this from a string, we
+ # have to serialise it again, so that PostgreSQL can parse it
+ # *again*
+ my $tweet_str = $json_printer->encode($tweet);
+ my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
+
+ $self->dbh->do(<<'SQL', {},
+INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
+ ON CONFLICT (id) DO UPDATE SET
+ created_at = EXCLUDED.created_at,
+ data = EXCLUDED.data
+SQL
+ $tweet->{id_str},
+ $dt_printer->format_datetime($created_at),
+ $tweet_str,
+ );
+}
+
+sub _schema_deploy($self,$dbh,$next_version) {
+ my $method_name = "_schema_deploy_${next_version}";
+ if (my $method = $self->can($method_name)) {
+ $self->$method($dbh);
+ $dbh->do(<<'SQL', {}, $next_version);
+INSERT INTO meta(version) VALUES (?)
+SQL
+ return 1;
+ }
+ else {
+ return 0;
+ }
+}
+
+sub _ensure_schema($self,$dbh) {
+ # simple-minded schema version management
+ $dbh->do(<<'SQL');
+CREATE TABLE IF NOT EXISTS meta (
+ version INTEGER PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+)
+SQL
+ my $current_version = $dbh->selectall_arrayref(
+ q{SELECT max(version) FROM meta}
+ )->[0][0];
+ $current_version //= 0;
+
+ my $next_version = $current_version + 1;
+ while ($self->_schema_deploy($dbh,$next_version)) {
+ ++$next_version;
+ }
+}
+
+sub _schema_deploy_1($self,$dbh) {
+ $dbh->do(<<'SQL');
+CREATE TABLE tweets (
+ id VARCHAR(255) PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ data JSONB NOT NULL
+)
+SQL
+}
+
+1;
diff --git a/tweet-archive.pl b/tweet-archive.pl
index 3c6c0b0..170c6af 100644
--- a/tweet-archive.pl
+++ b/tweet-archive.pl
@@ -1,52 +1,28 @@
#!/usr/bin/env perl
-use 5.016;
+use 5.024;
use strict;
use warnings;
use Path::Class;
-use JSON;
-use Net::Twitter;
-use ElasticSearch;
-use DateTime::Format::Strptime;
-use Data::Visitor::Callback;
-use Data::Printer;
+use JSON::MaybeXS;
+use Dakkar::TweetArchive;
+use Dakkar::TweetArchive::Store;
-my $conf = JSON->new->utf8->decode(
+my $json_parser = JSON::MaybeXS->new(
+ utf8 => 1,
+ relaxed => 1,
+);
+
+my $conf = $json_parser->decode(
file(__FILE__)->parent->file('tweet-archive.conf')
->slurp(iomode=>'<:raw')
// '{}'
);
-my $cb = Data::Visitor::Callback->new(
- 'JSON::Boolean' => sub { say "converting a boolean"; return 0+$_ },
-);
-my $dt = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
-
-my $nt = Net::Twitter->new(
- traits => [qw/API::RESTv1_1 OAuth/],
- consumer_key => $conf->{consumer_key},
- consumer_secret => $conf->{consumer_secret},
-);
-
-$nt->access_token($conf->{access_token});
-$nt->access_token_secret($conf->{access_token_secret});
+my $client = Dakkar::TweetArchive->new($conf);
+my $store = Dakkar::TweetArchive::Store->new($conf);
-my $es = ElasticSearch->new();
+my $latest_id = $store->latest_tweet_id;
-my $tl = $nt->home_timeline({
- include_entities => 1,
- trim_user => 1,
- exclude_replies => 0,
-});
-
-for my $tweet (@$tl) {
- $cb->visit($tweet);
- say p $tweet;
- my $result = $es->index(
- index => 'tweet-archive',
- type => 'tweet',
- id => $tweet->{id_str},
- timestamp => $dt->parse_datetime($tweet->{created_at})->iso8601,
- data => $tweet,
- );
- say p $result;
+for my $tweet ($client->home_timeline($latest_id)->@*) {
+ $store->store_tweet($tweet);
}