From ee56898e1d817d93ca8e9b4fcdaa5b0c55323e8e Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 23 Oct 2016 15:04:48 +0100 Subject: first maybe-working version --- lib/IO/Async/PSGI.pm | 135 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 lib/IO/Async/PSGI.pm (limited to 'lib') diff --git a/lib/IO/Async/PSGI.pm b/lib/IO/Async/PSGI.pm new file mode 100644 index 0000000..99f7d23 --- /dev/null +++ b/lib/IO/Async/PSGI.pm @@ -0,0 +1,135 @@ +package IO::Async::PSGI; +use Moo; +use Types::Standard qw(CodeRef InstanceOf); +use IO::Async::Loop; +use Future; +use HTTP::Status (); +use Safe::Isa; +use Plack::Middleware::HTTPExceptions; + +{ + my $logging_requestid_cb; + sub _invoke_logging_cb { + return unless $logging_requestid_cb; + $logging_requestid_cb->(@_); + } + + sub logging_requestid_cb { + my ($class,$cb) = @_; + $logging_requestid_cb = $cb + if @_ > 1; + return $logging_requestid_cb; + } +} + +our $_current_request_id; +around 'Future::wrap_cb' => sub { + my $orig = shift; + my $cb = $orig->(@_); + my $reqid = $_current_request_id; + return sub { + local $_current_request_id = $reqid; + _invoke_logging_cb($reqid); + $cb->(@_); + }; +}; + +has app => ( + is => 'ro', + isa => CodeRef, + required => 1, +); + +has _loop => ( + is => 'rw', + isa => InstanceOf['IO::Async::Loop'], + default => sub { IO::Async::Loop->new }, +); + +sub _log { + my ($env,$level,@msg) = @_; + if (my $logger = $env->{'psgix.logger'}) { + $logger->$level(@msg); + } +} + +sub psgi_app { + my ($self) = @_; + return sub { + my ($env) = @_; + + my $reqid = $env->{'psgix.request_id'}; + + local $_current_request_id = $reqid; + _invoke_logging_cb($reqid); + + my $given_loop = $env->{'io.async.loop'}; + if ($given_loop && $given_loop != $self->_loop) { + # the server is running under a different loop, let's + # re-initialise everything! + _log($env,warn => "Event loop changed, re-initialising everything"); + $self->_loop($given_loop); + } + + $env->{'io.async.loop'} = $self->_loop; + + my $f = Future->call($self->app,$env); + + return sub { + my ($responder) = @_; + $f->on_done( + sub{ + $responder->(@_); + # I'm not sure why I may have to set it again + # _invoke_logging_cb($reqid); + }, + ); + $f->on_fail( + sub{ + my ($exc,@details) = @_; + my $response = $self->on_app_failure($env,$exc,@details); + $responder->($response); + # I'm not sure why I may have to set it again + # _invoke_logging_cb($reqid); + } + ); + + # the C is there capture the Future to prevent + # premature collection, and then release it as soon as we don't + # need it anymore + $f->on_ready( + sub{ + if (not $given_loop) { + $self->_loop->stop; + } + undef $f; + } + ); + + # $f may be already ->done if the application returned an + # immediate future + if (not $given_loop and $f and not $f->is_done) { + $self->_loop->run; + } + } + } +} + +sub on_app_failure { + my ($self,$env,$exc,@details) = @_; + + if ($exc->$_can('as_psgi') or $exc->$_can('code')) { + return Plack::Middleware::HTTPExceptions->new(rethrow=>0) + ->transform_error($exc,$env); + } else { + my $message = join ' ', grep {defined} $exc,@details; + _log($env,warn => "Application exception: $message"); + return [ + 500, + ['Content-type' => 'text/plain'], + [$message], + ]; + } +} + +1; -- cgit v1.2.3