summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
author dakkar <dakkar@thenautilus.net> 2017-04-22 13:01:17 +0100
committer dakkar <dakkar@thenautilus.net> 2017-04-22 13:49:17 +0100
commit 05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b (patch)
tree 5e2c342323686f7050b8a37a5638f2378521e452 /lib
parent bump API version (diff)
download tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.tar.gz
tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.tar.bz2
tweet-archive-05a2c0b5df45d380b2bfc3ab4640d9c6a3bf594b.zip
store tweets in postgres
Diffstat (limited to 'lib')
-rw-r--r-- lib/Dakkar/TweetArchive.pm 52
-rw-r--r-- lib/Dakkar/TweetArchive/Store.pm 118
2 files changed, 170 insertions, 0 deletions
diff --git a/lib/Dakkar/TweetArchive.pm b/lib/Dakkar/TweetArchive.pm
new file mode 100644
index 0000000..9d914fd
--- /dev/null
+++ b/lib/Dakkar/TweetArchive.pm
@@ -0,0 +1,52 @@
+package Dakkar::TweetArchive;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use Net::Twitter;
+use Types::Standard qw(Str InstanceOf);
+use namespace::clean;
+
+# OAuth credentials. All four are read-only, required strings that must
+# be supplied to the constructor.
+has [qw(consumer_key consumer_secret access_token access_token_secret)] => (
+ is => 'ro',
+ required => 1,
+ isa => Str,
+);
+
+# The Net::Twitter client. Lazy: it is built by _build_client the first
+# time the attribute is read.
+has client => (
+ is => 'lazy',
+ isa => InstanceOf['Net::Twitter'],
+);
+
+# Builder for the lazy `client` attribute.
+#
+# Constructs a Net::Twitter object with the RESTv1_1, OAuth, RetryOnError
+# and AutoCursor traits, using this object's consumer key/secret, then
+# sets the access token pair on it and returns it.
+sub _build_client($self) {
+    # options for the AutoCursor trait, which is applied to the
+    # `friends` and `followers` methods
+    my %auto_cursor = (
+        max_calls      => 16,
+        force_cursor   => 1,
+        array_accessor => 'users',
+        methods        => [qw/friends followers/],
+    );
+
+    my $client = Net::Twitter->new(
+        traits => [
+            'API::RESTv1_1', 'OAuth', 'RetryOnError',
+            AutoCursor => \%auto_cursor,
+        ],
+        consumer_key    => $self->consumer_key,
+        consumer_secret => $self->consumer_secret,
+    );
+
+    # the access token pair is set after construction, via the accessors
+    $client->access_token($self->access_token);
+    $client->access_token_secret($self->access_token_secret);
+
+    return $client;
+}
+
+# Fetch the authenticated user's home timeline, asking for 200 tweets
+# with entities, full user objects and replies included.
+#
+# $since_id is optional. When it is a true value it is sent to the API as
+# `since_id`; when it is omitted, undef, or otherwise false (e.g. the
+# archive is still empty) the parameter is left out of the request.
+#
+# Returns whatever the Net::Twitter client's home_timeline call returns.
+sub home_timeline($self,$since_id=undef) {
+    my %params = (
+        include_entities => 1,
+        trim_user        => 0,
+        exclude_replies  => 0,
+        count            => 200,
+    );
+
+    # only constrain the request when we actually have a starting point
+    $params{since_id} = $since_id if $since_id;
+
+    return $self->client->home_timeline(\%params);
+}
+
+1;
+
diff --git a/lib/Dakkar/TweetArchive/Store.pm b/lib/Dakkar/TweetArchive/Store.pm
new file mode 100644
index 0000000..e4a4f3b
--- /dev/null
+++ b/lib/Dakkar/TweetArchive/Store.pm
@@ -0,0 +1,118 @@
+package Dakkar::TweetArchive::Store;
+use 5.024;
+use Moo;
+use experimental 'signatures';
+use DBI;
+use Types::Standard qw(Str InstanceOf);
+use DateTime::Format::Strptime;
+use DateTime::Format::Pg;
+use JSON::MaybeXS;
+use namespace::clean;
+
+use Data::Printer;
+
+# Database connection parameters. All three are read-only, required
+# strings; they are passed to DBI->connect by _build_dbh.
+has [qw(dsn user pass)] => (
+ is => 'ro',
+ required => 1,
+ isa => Str,
+);
+
+# The DBI database handle. Lazy: it is built by _build_dbh the first
+# time the attribute is read.
+has dbh => (
+ is => 'lazy',
+ isa => InstanceOf['DBI::db'],
+);
+
+# Builder for the lazy `dbh` attribute.
+#
+# Connects using this object's dsn/user/pass, runs _ensure_schema on the
+# new handle, and returns the handle.
+sub _build_dbh($self) {
+    # DBI errors are raised as exceptions rather than printed, and
+    # AutoCommit is on
+    my %connect_attrs = (
+        PrintError => 0,
+        AutoCommit => 1,
+        RaiseError => 1,
+    );
+
+    my $handle = DBI->connect(
+        $self->dsn, $self->user, $self->pass,
+        \%connect_attrs,
+    );
+
+    # the schema is checked on the raw handle, before the attribute is set
+    $self->_ensure_schema($handle);
+
+    return $handle;
+}
+
+# Parser for the timestamp format of a tweet's `created_at` field
+# (see store_tweet), and a formatter that renders DateTime objects for
+# PostgreSQL.
+my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
+my $dt_printer = DateTime::Format::Pg->new();
+
+# JSON encoder used to serialise tweets before storing them: compact,
+# ASCII-only output, tolerant of non-reference, unknown and blessed values.
+my $json_printer = JSON::MaybeXS->new(
+ ascii => 1,
+ pretty => 0,
+ allow_nonref => 1,
+ allow_unknown => 1,
+ allow_blessed => 1,
+ convert_blessed => 1,
+);
+
+# Return the id of the most recent stored tweet, or undef when the
+# `tweets` table is empty.
+#
+# Ids are stored as VARCHAR, so a plain MAX(id) compares them as text and
+# would rank a shorter id above a longer (numerically larger) one
+# whenever its leading digits are greater. Ordering by length first and
+# then by value gives the numeric maximum for decimal ids without leading
+# zeros, and never fails on an unexpected non-numeric value.
+sub latest_tweet_id($self) {
+    my ($latest_id) = $self->dbh->selectrow_array(
+        q{SELECT id FROM tweets ORDER BY length(id) DESC, id DESC LIMIT 1},
+    );
+
+    # selectrow_array returns an empty list when there are no rows, which
+    # leaves $latest_id undefined
+    return $latest_id;
+}
+
+# Insert a tweet, or overwrite the stored copy when its id already exists.
+#
+# $tweet is a hash reference; this method reads its `id_str` and
+# `created_at` keys and stores the whole structure as JSON.
+#
+# Dies with a descriptive message when `created_at` is missing or does
+# not match the expected timestamp format. Database errors are reported
+# by DBI according to the handle's settings.
+#
+# Returns the result of the DBI `do` call.
+sub store_tweet($self,$tweet) {
+    # yes, the source most probably decoded this from a string, we
+    # have to serialise it again, so that PostgreSQL can parse it
+    # *again*
+    my $tweet_str = $json_printer->encode($tweet);
+
+    # the parser returns undef on input it can't handle; catch that here
+    # instead of failing later with "can't call method on undef"
+    my $raw_created_at = $tweet->{created_at};
+    my $created_at = defined $raw_created_at
+        ? $dt_parser->parse_datetime($raw_created_at)
+        : undef;
+    if (!defined $created_at) {
+        die sprintf(
+            "tweet %s has a missing or unparseable created_at (%s)\n",
+            $tweet->{id_str} // '<no id_str>',
+            $raw_created_at // '<undef>',
+        );
+    }
+
+    return $self->dbh->do(<<'SQL', {},
+INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
+ ON CONFLICT (id) DO UPDATE SET
+ created_at = EXCLUDED.created_at,
+ data = EXCLUDED.data
+SQL
+        $tweet->{id_str},
+        $dt_printer->format_datetime($created_at),
+        $tweet_str,
+    );
+}
+
+# Deploy one schema version, if a method for it exists.
+#
+# Looks for a method called `_schema_deploy_<$next_version>`. When there
+# is none, returns 0 and touches nothing. Otherwise runs it against $dbh,
+# records $next_version in the `meta` table, and returns 1.
+#
+# The migration and the `meta` row are applied in a single transaction,
+# so a failure part-way through does not leave schema changes in place
+# without their version record (which would make the next run try, and
+# fail, to apply the same migration again). Any error is re-thrown after
+# the rollback.
+sub _schema_deploy($self,$dbh,$next_version) {
+    my $method = $self->can("_schema_deploy_${next_version}")
+        or return 0;
+
+    # only manage the transaction when the caller isn't already in one
+    my $own_transaction = $dbh->{AutoCommit};
+    $dbh->begin_work if $own_transaction;
+
+    my $deployed = eval {
+        $self->$method($dbh);
+        $dbh->do(<<'SQL', {}, $next_version);
+INSERT INTO meta(version) VALUES (?)
+SQL
+        $dbh->commit if $own_transaction;
+        1;
+    };
+
+    if (!$deployed) {
+        # save the error before the rollback's own eval can clobber $@
+        my $error = $@ || "deploying schema version $next_version failed\n";
+        if ($own_transaction) {
+            eval { $dbh->rollback; 1 }
+                or warn "rollback of schema version $next_version failed: $@";
+        }
+        die $error;
+    }
+
+    return 1;
+}
+
+# Bring the database schema up to date (simple-minded version management).
+#
+# Creates the `meta` table when it does not exist yet, reads the highest
+# version recorded there (treating "none" as 0), then calls
+# _schema_deploy for each following version number until it reports that
+# there is nothing to deploy.
+sub _ensure_schema($self,$dbh) {
+    my $create_meta = <<'SQL';
+CREATE TABLE IF NOT EXISTS meta (
+ version INTEGER PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
+)
+SQL
+    $dbh->do($create_meta);
+
+    my $rows = $dbh->selectall_arrayref(
+        q{SELECT max(version) FROM meta}
+    );
+
+    # max() is NULL on an empty table, hence the fallback to 0
+    my $version = ($rows->[0][0] // 0) + 1;
+
+    ++$version while $self->_schema_deploy($dbh,$version);
+}
+
+# Schema version 1: create the `tweets` table, with the tweet id as
+# primary key, a creation timestamp, and the tweet itself as JSONB.
+#
+# Returns the result of the DBI `do` call.
+sub _schema_deploy_1($self,$dbh) {
+    my $create_tweets = <<'SQL';
+CREATE TABLE tweets (
+ id VARCHAR(255) PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ data JSONB NOT NULL
+)
+SQL
+    return $dbh->do($create_tweets);
+}
+
+1;