summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2016-10-23 15:04:48 +0100
committerdakkar <dakkar@thenautilus.net>2016-10-23 15:04:48 +0100
commitee56898e1d817d93ca8e9b4fcdaa5b0c55323e8e (patch)
tree5e412aff686156987a169a22b1df45acc8d81532 /lib
downloadIO-Async-PSGI-ee56898e1d817d93ca8e9b4fcdaa5b0c55323e8e.tar.gz
IO-Async-PSGI-ee56898e1d817d93ca8e9b4fcdaa5b0c55323e8e.tar.bz2
IO-Async-PSGI-ee56898e1d817d93ca8e9b4fcdaa5b0c55323e8e.zip
first maybe-working version
Diffstat (limited to 'lib')
-rw-r--r--lib/IO/Async/PSGI.pm135
1 files changed, 135 insertions, 0 deletions
diff --git a/lib/IO/Async/PSGI.pm b/lib/IO/Async/PSGI.pm
new file mode 100644
index 0000000..99f7d23
--- /dev/null
+++ b/lib/IO/Async/PSGI.pm
@@ -0,0 +1,135 @@
+package IO::Async::PSGI;
+use Moo;
+use Types::Standard qw(CodeRef InstanceOf);
+use IO::Async::Loop;
+use Future;
+use HTTP::Status ();
+use Safe::Isa;
+use Plack::Middleware::HTTPExceptions;
+
+{
+ my $logging_requestid_cb;
+ sub _invoke_logging_cb {
+ return unless $logging_requestid_cb;
+ $logging_requestid_cb->(@_);
+ }
+
+ sub logging_requestid_cb {
+ my ($class,$cb) = @_;
+ $logging_requestid_cb = $cb
+ if @_ > 1;
+ return $logging_requestid_cb;
+ }
+}
+
+our $_current_request_id;
+around 'Future::wrap_cb' => sub {
+ my $orig = shift;
+ my $cb = $orig->(@_);
+ my $reqid = $_current_request_id;
+ return sub {
+ local $_current_request_id = $reqid;
+ _invoke_logging_cb($reqid);
+ $cb->(@_);
+ };
+};
+
+has app => (
+ is => 'ro',
+ isa => CodeRef,
+ required => 1,
+);
+
+has _loop => (
+ is => 'rw',
+ isa => InstanceOf['IO::Async::Loop'],
+ default => sub { IO::Async::Loop->new },
+);
+
+sub _log {
+ my ($env,$level,@msg) = @_;
+ if (my $logger = $env->{'psgix.logger'}) {
+ $logger->$level(@msg);
+ }
+}
+
+sub psgi_app {
+ my ($self) = @_;
+ return sub {
+ my ($env) = @_;
+
+ my $reqid = $env->{'psgix.request_id'};
+
+ local $_current_request_id = $reqid;
+ _invoke_logging_cb($reqid);
+
+ my $given_loop = $env->{'io.async.loop'};
+ if ($given_loop && $given_loop != $self->_loop) {
+ # the server is running under a different loop, let's
+ # re-initialise everything!
+ _log($env,warn => "Event loop changed, re-initialising everything");
+ $self->_loop($given_loop);
+ }
+
+ $env->{'io.async.loop'} = $self->_loop;
+
+ my $f = Future->call($self->app,$env);
+
+ return sub {
+ my ($responder) = @_;
+ $f->on_done(
+ sub{
+ $responder->(@_);
+ # I'm not sure why I may have to set it again
+ # _invoke_logging_cb($reqid);
+ },
+ );
+ $f->on_fail(
+ sub{
+ my ($exc,@details) = @_;
+ my $response = $self->on_app_failure($env,$exc,@details);
+ $responder->($response);
+ # I'm not sure why I may have to set it again
+ # _invoke_logging_cb($reqid);
+ }
+ );
+
+ # the C<undef $f> is there capture the Future to prevent
+ # premature collection, and then release it as soon as we don't
+ # need it anymore
+ $f->on_ready(
+ sub{
+ if (not $given_loop) {
+ $self->_loop->stop;
+ }
+ undef $f;
+ }
+ );
+
+ # $f may be already ->done if the application returned an
+ # immediate future
+ if (not $given_loop and $f and not $f->is_done) {
+ $self->_loop->run;
+ }
+ }
+ }
+}
+
+sub on_app_failure {
+ my ($self,$env,$exc,@details) = @_;
+
+ if ($exc->$_can('as_psgi') or $exc->$_can('code')) {
+ return Plack::Middleware::HTTPExceptions->new(rethrow=>0)
+ ->transform_error($exc,$env);
+ } else {
+ my $message = join ' ', grep {defined} $exc,@details;
+ _log($env,warn => "Application exception: $message");
+ return [
+ 500,
+ ['Content-type' => 'text/plain'],
+ [$message],
+ ];
+ }
+}
+
+1;