From 6fc388b833ec0e924f663c11089b5c2b373ccf15 Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 11 Jul 2010 20:34:02 +0100 Subject: start --- lib/Thread/Task/Handle.pm | 134 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 lib/Thread/Task/Handle.pm (limited to 'lib/Thread/Task/Handle.pm') diff --git a/lib/Thread/Task/Handle.pm b/lib/Thread/Task/Handle.pm new file mode 100644 index 0000000..b7012f4 --- /dev/null +++ b/lib/Thread/Task/Handle.pm @@ -0,0 +1,134 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Handle { + use TryCatch; + + use MooseX::Types::Moose qw(Object Bool Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET); + use Thread::Task::Exception::Finished; + + require Scalar::Util; + + our $SEQUENCE = 0; + + has hid => ( + isa => Int, + is => 'ro', + default => sub { ++$SEQUENCE }, + init_arg => undef, + ); + + has task => ( + isa => Task_T, + is => 'ro', + required => 1, + ); + + has execption => ( + is => 'ro', + writer => '_set_exception', + predicate => 'has_exception', + init_arg => undef, + ); + + method prepare() { + try { + $self->task->prepare; + } + catch (Finished_ET) { + die $@; + } + catch { + die Thread::Task::Exception::Finished->new(msg=>$@); + } + } + + method finish() { + try { + $self->task->finish; + } + catch (Finished_ET) { + die $@; + } + catch { + die Thread::Task::Exception::Finished->new(msg=>$@); + } + } + + method run() { + try { + $self->task->handle($self); + $self->task->run; + $self->task->clear_handle; + } + catch (Finished_ET) { + } + catch ($e) { + $self->task->clear_handle; + $self->_set_exception($e); + } + + return; + } + + method as_array() { + return [ + $self->hid, + $self->task->meta->name, + $task->as_string, + ]; + } + + method from_array(Class $class: ArrayRef $array) { + my ($hid,$task_class,$task_serialized)=@$array; + Class::MOP::load_class($task_class); + + return $class->new( + hid=>$hid, + task=>Thread::Task->from_string($task_serialized), + ); + } + + method message(@message) { + Thread::Task::Manager->instance->conduit->signal( + Storable::nfreeze([$self->hid,@message]) + ); + } + + method on_message($method,@args) { + unless ($self->task->can($method)) { + return; + } + + try { + $self->task->$method(@args); + } + catch { + return; + } + return; + } + + method started() { + $self->message('__STARTED__'); + } + + method stopped() { + $self->message('__STOPPED__'); + } + + method on_stopped(Task_T $new_task) { + $self->task->_update($new_task); + + try { + $self->task->finish(@args); + } + catch { + return; + } + return; + } +} + +1; -- cgit v1.2.3