summaryrefslogtreecommitdiff
path: root/lib/Dakkar/TweetArchive/Store.pm
blob: e4a4f3b7bd81c6f26d5f998cbd59eaddcedfc7af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package Dakkar::TweetArchive::Store; 
use 5.024;
use Moo;
use experimental 'signatures';
use DBI;
use Types::Standard qw(Str InstanceOf);
use DateTime::Format::Strptime;
use DateTime::Format::Pg;
use JSON::MaybeXS;
use namespace::clean;
 
use Data::Printer;
 
has [qw(dsn user pass)] => (
    is => 'ro',
    required => 1,
    isa => Str,
);
 
has dbh => (
    is => 'lazy',
    isa => InstanceOf['DBI::db'],
);
 
sub _build_dbh($self) {
    my $dbh = DBI->connect(
        $self->dsn, $self->user, $self->pass,
        {
            PrintError => 0,
            AutoCommit => 1,
            RaiseError => 1,
        },
    );
    $self->_ensure_schema($dbh);
    return $dbh;
}
 
my $dt_parser = DateTime::Format::Strptime->new(pattern => '%a %b %d %T %z %Y');
my $dt_printer = DateTime::Format::Pg->new();
 
my $json_printer = JSON::MaybeXS->new(
    ascii => 1,
    pretty => 0,
    allow_nonref => 1,
    allow_unknown => 1,
    allow_blessed => 1,
    convert_blessed => 1,
);
 
sub latest_tweet_id($self) {
    return $self->dbh->selectall_arrayref(
        q{SELECT MAX(id) FROM tweets},
    )->[0][0];
}
 
sub store_tweet($self,$tweet) {
    # yes, the source most probably decoded this from a string, we 
    # have to serialise it again, so that PostgreSQL can parse it 
    # *again* 
    my $tweet_str = $json_printer->encode($tweet);
    my $created_at = $dt_parser->parse_datetime($tweet->{created_at});
 
    $self->dbh->do(<<'SQL', {},
INSERT INTO tweets(id,created_at,data) VALUES(?,?,?)
 ON CONFLICT (id) DO UPDATE SET
   created_at = EXCLUDED.created_at,
   data = EXCLUDED.data
SQL
                   $tweet->{id_str},
                   $dt_printer->format_datetime($created_at),
                   $tweet_str,
               );
}
 
sub _schema_deploy($self,$dbh,$next_version) {
    my $method_name = "_schema_deploy_${next_version}";
    if (my $method = $self->can($method_name)) {
        $self->$method($dbh);
        $dbh->do(<<'SQL', {}, $next_version);
INSERT INTO meta(version) VALUES (?)
SQL
        return 1;
    }
    else {
        return 0;
    }
}
 
sub _ensure_schema($self,$dbh) {
    # simple-minded schema version management 
    $dbh->do(<<'SQL');
CREATE TABLE IF NOT EXISTS meta (
  version INTEGER PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
SQL
    my $current_version = $dbh->selectall_arrayref(
        q{SELECT max(version) FROM meta}
    )->[0][0];
    $current_version //= 0;
 
    my $next_version = $current_version + 1;
    while ($self->_schema_deploy($dbh,$next_version)) {
        ++$next_version;
    }
}
 
sub _schema_deploy_1($self,$dbh) {
    $dbh->do(<<'SQL');
CREATE TABLE tweets (
  id VARCHAR(255) PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  data JSONB NOT NULL
)
SQL
}
 
1;