summaryrefslogtreecommitdiff
path: root/lib/IO/Async/PSGI.pm
blob: 99f7d2336ba530dc4bed8fabf1793dbbbebca29d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package IO::Async::PSGI; 
use Moo;
use Types::Standard qw(CodeRef InstanceOf);
use IO::Async::Loop;
use Future;
use HTTP::Status ();
use Safe::Isa;
use Plack::Middleware::HTTPExceptions;
 
{
    my $logging_requestid_cb;
    sub _invoke_logging_cb {
        return unless $logging_requestid_cb;
        $logging_requestid_cb->(@_);
    }
 
    sub logging_requestid_cb {
        my ($class,$cb) = @_;
        $logging_requestid_cb = $cb
            if @_ > 1;
        return $logging_requestid_cb;
    }
}
 
our $_current_request_id;
around 'Future::wrap_cb' => sub {
    my $orig = shift;
    my $cb = $orig->(@_);
    my $reqid = $_current_request_id;
    return sub {
        local $_current_request_id = $reqid;
        _invoke_logging_cb($reqid);
        $cb->(@_);
    };
};
 
has app => (
    is => 'ro',
    isa => CodeRef,
    required => 1,
);
 
has _loop => (
    is => 'rw',
    isa => InstanceOf['IO::Async::Loop'],
    default => sub { IO::Async::Loop->new },
);
 
sub _log {
    my ($env,$level,@msg) = @_;
    if (my $logger = $env->{'psgix.logger'}) {
        $logger->$level(@msg);
    }
}
 
sub psgi_app {
    my ($self) = @_;
    return sub {
        my ($env) = @_;
 
        my $reqid = $env->{'psgix.request_id'};
 
        local $_current_request_id = $reqid;
        _invoke_logging_cb($reqid);
 
        my $given_loop = $env->{'io.async.loop'};
        if ($given_loop && $given_loop != $self->_loop) {
            # the server is running under a different loop, let's 
            # re-initialise everything! 
            _log($env,warn => "Event loop changed, re-initialising everything");
            $self->_loop($given_loop);
        }
 
        $env->{'io.async.loop'} = $self->_loop;
 
        my $f = Future->call($self->app,$env);
 
        return sub {
            my ($responder) = @_;
            $f->on_done(
                sub{
                    $responder->(@_);
                    # I'm not sure why I may have to set it again 
                    # _invoke_logging_cb($reqid); 
                },
            );
            $f->on_fail(
                sub{
                    my ($exc,@details) = @_;
                    my $response = $self->on_app_failure($env,$exc,@details);
                    $responder->($response);
                    # I'm not sure why I may have to set it again 
                    # _invoke_logging_cb($reqid); 
                }
            );
 
            # the C<undef $f> is there capture the Future to prevent 
            # premature collection, and then release it as soon as we don't 
            # need it anymore 
            $f->on_ready(
                sub{
                    if (not $given_loop) {
                        $self->_loop->stop;
                    }
                    undef $f;
                }
            );
 
            # $f may be already ->done if the application returned an 
            # immediate future 
            if (not $given_loop and $f and not $f->is_done) {
                $self->_loop->run;
            }
        }
    }
}
 
sub on_app_failure {
    my ($self,$env,$exc,@details) = @_;
 
    if ($exc->$_can('as_psgi'or $exc->$_can('code')) {
        return Plack::Middleware::HTTPExceptions->new(rethrow=>0)
            ->transform_error($exc,$env);
    else {
        my $message = join ' 'grep {defined$exc,@details;
        _log($env,warn => "Application exception: $message");
        return [
            500,
            ['Content-type' => 'text/plain'],
            [$message],
        ];
    }
}
 
1;