From 6fc388b833ec0e924f663c11089b5c2b373ccf15 Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 11 Jul 2010 20:34:02 +0100 Subject: start --- .gitignore | 6 + Makefile.PL | 26 ++++ lib/Thread/Task.pm | 115 +++++++++++++++++ lib/Thread/Task/Exception.pm | 22 ++++ lib/Thread/Task/Exception/Finished.pm | 9 ++ lib/Thread/Task/Handle.pm | 134 ++++++++++++++++++++ lib/Thread/Task/Manager.pm | 230 ++++++++++++++++++++++++++++++++++ lib/Thread/Task/Role/Task.pm | 54 ++++++++ lib/Thread/Task/Thread.pm | 131 +++++++++++++++++++ lib/Thread/Task/Types.pm | 25 ++++ lib/Thread/Task/Worker.pm | 36 ++++++ 11 files changed, 788 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile.PL create mode 100644 lib/Thread/Task.pm create mode 100644 lib/Thread/Task/Exception.pm create mode 100644 lib/Thread/Task/Exception/Finished.pm create mode 100644 lib/Thread/Task/Handle.pm create mode 100644 lib/Thread/Task/Manager.pm create mode 100644 lib/Thread/Task/Role/Task.pm create mode 100644 lib/Thread/Task/Thread.pm create mode 100644 lib/Thread/Task/Types.pm create mode 100644 lib/Thread/Task/Worker.pm diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6a3638d --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/Makefile +/Makefile.old +/blib/ +/pm_to_blib +/inc/ +*~ diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..2086bd5 --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,26 @@ +use inc:Module::Install; +use strict; +use warnings; + +name 'Thread-Task'; +license 'perl'; +author 'Gianni Ceccarelli '; +requires 'perl' => '5.008003'; + +requires 'Moose' => '1.00'; +requires 'MooseX::Types' => '0.22'; +requires 'MooseX::Types::Structured' => '0.23'; +requires 'threads' => '1.71'; +requires 'threads::shared' => '1.26'; +requires 'Thread::Queue::Event' => 0; +requires 'namespace::autoclean' => 0; +requires 'MooseX::Declare' => '0.33'; +requires 'MooseX::Singleton' => '0.22'; +requires 'Scalar::Util' => 0; +requires 'TryCatch' => '1.002'; + +test_requires 'Test::Most' => '0.21'; + +no_index 'directory' => qw{ t xt inc }; + +WriteAll; diff --git a/lib/Thread/Task.pm b/lib/Thread/Task.pm new file mode 100644 index 0000000..9966634 --- /dev/null +++ b/lib/Thread/Task.pm @@ -0,0 +1,115 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task { + use MooseX::Types::Moose qw(Object); + use Thread::Task::Types qw(Handle_T); + + require Storable; + + require Thread::Task::Role::Task; + require Thread::Task::Manager; + require Thread::Task::Exception::Finished; + + our $VERSION='0.001'; + + has _owner_taskrev => ( + isa => Int, + is => 'ro', + weak_ref => 1, + required => 0, + predicate => '_has_owner_taskrev', + ); + + has callback => ( + isa => Str, + is => 'ro', + required => 0, + default => 'task_response', + ); + + has handle => ( + isa => Handle_T, + is => 'rw', + required => 0, + predicate => 'running', + clearer => 'clear_handle', + ); + + around BUILDARGS (Class $class,@rest) { + my $params = $class->$orig(@rest); + + if (exists $params->{owner}) { + $params->{_owner_taskrev} = + delete($params->{owner})->task_revision; + } + } + + method BUILD() { + if ($self->_has_owner_taskrev) { + my $owner = $self->owner; + my $callback = $self->callback; + unless ($owner->can($callback)) { + die "Owner can't $callback"; + } + } + } + + method owner() { + Thread::Task::Role::Task->task_owner($self->_owner_taskrev); + } + + method schedule(@args) { + Thread::Task::Manager->instance->schedule($self,@args); + } + + method prepare() { return } + + method run() { + if ($self->_has_owner_taskrev) { + $self->owner or die Thread::Task::Exception::Finished->new(); + } + + return; + } + + method finish() { + if ($self->_has_owner_taskrev) { + my $owner = $self->owner + or die Thread::Task::Exception::Finished->new(); + + my $callback = $self->callback; + + $owner->$callback($self); + } + + return; + } + + method as_string() { + Storable::nfreeze($self); + } + + method from_string(Class $class: Str $serialization) { + my $self=Storable::nthaw($serialization); + my $self_class=$self->meta->name; + unless ($self_class eq $class) { + die "Deserialized as $self_class instead of $class"; + } + return $self; + } + + method _update(Task_T $new_task) { + if ($self->meta ne $new_task->meta) { + die "Can't update between different task classes"; + } + for my $attr ($self->meta->get_all_attributes) { + $attr->set_value($self, + $attr->get_value($new_task) + ); + } + return; + } +} + +1; diff --git a/lib/Thread/Task/Exception.pm b/lib/Thread/Task/Exception.pm new file mode 100644 index 0000000..b2f02f2 --- /dev/null +++ b/lib/Thread/Task/Exception.pm @@ -0,0 +1,22 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Exception { + use MooseX::Types::Moose qw(Str HashRef); + + has 'msg' => ( + isa => Str, + is => 'ro', + required => 0, + predicate => 'has_msg', + ); + + has 'params' => ( + isa => HashRef, + is => 'ro', + required => 0, + predicate => 'has_params', + ); +} + +1; diff --git a/lib/Thread/Task/Exception/Finished.pm b/lib/Thread/Task/Exception/Finished.pm new file mode 100644 index 0000000..9edcc40 --- /dev/null +++ b/lib/Thread/Task/Exception/Finished.pm @@ -0,0 +1,9 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Exception::Finished + extends Thread::Task::Exception { +} + +1; + diff --git a/lib/Thread/Task/Handle.pm b/lib/Thread/Task/Handle.pm new file mode 100644 index 0000000..b7012f4 --- /dev/null +++ b/lib/Thread/Task/Handle.pm @@ -0,0 +1,134 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Handle { + use TryCatch; + + use MooseX::Types::Moose qw(Object Bool Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET); + use Thread::Task::Exception::Finished; + + require Scalar::Util; + + our $SEQUENCE = 0; + + has hid => ( + isa => Int, + is => 'ro', + default => sub { ++$SEQUENCE }, + init_arg => undef, + ); + + has task => ( + isa => Task_T, + is => 'ro', + required => 1, + ); + + has execption => ( + is => 'ro', + writer => '_set_exception', + predicate => 'has_exception', + init_arg => undef, + ); + + method prepare() { + try { + $self->task->prepare; + } + catch (Finished_ET) { + die $@; + } + catch { + die Thread::Task::Exception::Finished->new(msg=>$@); + } + } + + method finish() { + try { + $self->task->finish; + } + catch (Finished_ET) { + die $@; + } + catch { + die Thread::Task::Exception::Finished->new(msg=>$@); + } + } + + method run() { + try { + $self->task->handle($self); + $self->task->run; + $self->task->clear_handle; + } + catch (Finished_ET) { + } + catch ($e) { + $self->task->clear_handle; + $self->_set_exception($e); + } + + return; + } + + method as_array() { + return [ + $self->hid, + $self->task->meta->name, + $task->as_string, + ]; + } + + method from_array(Class $class: ArrayRef $array) { + my ($hid,$task_class,$task_serialized)=@$array; + Class::MOP::load_class($task_class); + + return $class->new( + hid=>$hid, + task=>Thread::Task->from_string($task_serialized), + ); + } + + method message(@message) { + Thread::Task::Manager->instance->conduit->signal( + Storable::nfreeze([$self->hid,@message]) + ); + } + + method on_message($method,@args) { + unless ($self->task->can($method)) { + return; + } + + try { + $self->task->$method(@args); + } + catch { + return; + } + return; + } + + method started() { + $self->message('__STARTED__'); + } + + method stopped() { + $self->message('__STOPPED__'); + } + + method on_stopped(Task_T $new_task) { + $self->task->_update($new_task); + + try { + $self->task->finish(@args); + } + catch { + return; + } + return; + } +} + +1; diff --git a/lib/Thread/Task/Manager.pm b/lib/Thread/Task/Manager.pm new file mode 100644 index 0000000..e969edc --- /dev/null +++ b/lib/Thread/Task/Manager.pm @@ -0,0 +1,230 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Manager { + use TryCatch; + + use MooseX::Types::Moose qw(Object Bool Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET); + use MooseX::Singleton; + + use Thread::Task::Handle (); + use Thread::Task::Thread (); + use Thread::Task::Worker (); + + require Storable; + + our $VERSION='0.001'; + + has active => ( + traits => [ 'Bool' ], + isa => Bool, + is => 'ro', + default => 0, + init_arg => undef, + handles => { + '_set_active' => 'set', + '_reset_active' => 'unset', + }, + ); + + has threads => ( + isa => Bool, + is => 'ro', + default => 1, + ); + + has minimum => ( + isa => Int, + is => 'ro', + default => 2, + ); + + has workers => ( + traits => [ 'Array' ], + isa => ArrayRef[Worker_T], + default => sub { [] }, + init_arg => undef, + handles => { + _workers_count => 'count', + _set_worker => 'set', + _get_worker => 'get', + _delete_worker => 'delete', + _workers => 'elements', + }, + ); + + has handles => ( + traits => [ 'Hash' ], + isa => HashRef[Handle_T], + default => sub { {} }, + init_arg => undef, + handles => { + _handles_count => 'count', + _set_handle => 'set', + _get_handle => 'get', + _delete_handle => 'delete', + _handles => 'elements', + }, + ); + + has running => ( + traits => [ 'Hash' ], + isa => HashRef[Handle_T], + default => sub { {} }, + init_arg => undef, + handles => { + _running_count => 'count', + _set_running => 'set', + _get_running => 'get', + _delete_running => 'delete', + _running => 'elements', + }, + ); + + has queue => ( + traits => [ 'Array' ], + isa => ArrayRef[Task_T], + default => sub { [] }, + init_arg => undef, + handles => { + _enqueue_task => 'push', + _dequeue_task => 'shift', + _queue_size => 'count', + }, + ); + + has conduit => ( + is => 'ro', + does => Conduit_T, + ); + + method BUILD() { + $self->conduit->conduit_init($self); + } + + method start() { + if ($self->threads) { + for (0 .. $self->minimum -1 ) { + $self->start_thread($_); + } + } + + $self->_set_active; + $self->step; + } + + method start_thread(Int $worker_id) { + my $master = Thread::Task::Thread->master; # init the master thread + my $worker = Thread::Task::Worker->new->spawn; + $self->_set_worker($worker_id,$worker); + } + + method stop() { + $self->_reset_active; + if ($self->threads) { + for (0 .. $self->_workers_count -1) { + $self->stop_thread($_); + } + Thread::Task::Thread->master->stop; + } + } + + method stop_thread(Int $worker_id) { + $self->_delete_worker($worker_id)->stop; + } + + method next_thread() { + for my $worker ($self->_workers) { + next if $worker->has_hid; + return $worker; + } + return; + } + + method schedule(Task_T $task) { + $self->_enqueue_task($task); + $self->step; + } + + method step() { + return unless $self->active; + return unless $self->_queue_size; + + if ($self->threads) { + unless ($self->minimum > $self->_handles_count) { + return; + } + } + + my $task = $self->_dequeue_task; + my $handle = Thread::Task::Handle->new(task => $task); + my $hid = $handle->hid; + + try { + $handle->prepare(); + } + catch (Finished_ET $e) { + undef $handle; + return $self->step; + } + + $self->_set_handle($hid,$handle); + + my $worker = $self->next_thread or return; + $worker->hid($hid); + $handle->wid($worker->wid); + + $worker->send(task => $handle->as_array); + + return $self->step; + } + + method on_signal(Str $frozen) { + my $message; + try { + $message = Storable::nthaw($frozen); + } + catch { + return; + } + unless (ref $message eq 'ARRAY') { + return; + } + + my $hid = shift @$message; + my $handle = $self->_get_handle($hid) or return; + + my $method = shift @$message; + + if ($method eq '__STARTED__') { + $self->_set_running($hid,$handle); + return; + } + + unless ($self->_get_running($hid)) { + return; + } + + if ($method eq '__STOPPED__') { + $self->_delete_running($hid); + + for my $worker ($self->_workers) { + next unless $worker->has_hid; + next unless $worker->hid == $hid; + $worker->clear_hid; + last; + } + + $handle->on_stopped(@$message); + + $self->_delete_handle($hid); + + return $self->step; + } + + return $handle->on_message($method,@$message); + } +} + +1; diff --git a/lib/Thread/Task/Role/Task.pm b/lib/Thread/Task/Role/Task.pm new file mode 100644 index 0000000..61f9cbe --- /dev/null +++ b/lib/Thread/Task/Role/Task.pm @@ -0,0 +1,54 @@ +use 5.008003; +use MooseX::Declare; + +role Thread::Task::Role::Task { + use MooseX::Types::Moose qw(Int); + require Scalar::Util; + + my $SEQUENCE = 0; + my %INDEX = 0; + + has task_revision => ( + isa => Int, + is => 'ro', + lazy_build => 1, + predicate => '_has_taskrev', + writer => '_set_taskrev', + ); + + method _build_task_revision() { + my $rev = ++$SEQUENCE; + $INDEX{$rev} = $self; + Scalar::Util::weaken($INDEX{$rev}); + + return $rev; + } + + method task_reset() { + if ($self->_has_taskrev) { + delete $INDEX{$self->task_revision}; + } + $self->_set_taskrev(++$SEQUENCE); + return; + } + + method task_owner($invocant: Int $task_rev) { + return $INDEX{$task_rev}; + } + + method task_request(:$task, %params) { + Class::MOP::load_class($task); + $task->isa('Thread::Task') + or die "Bad class $task, not a Thread::Task"; + $class->new(owner=>$self,%params)->schedule; + } + + method task_response(Task_T $task) { + my $class = $self->meta->name; + my $task_class = $task->meta->name; + + die "Unhandled task_response for $class (recieved $task_class)"; + } +} + +1; diff --git a/lib/Thread/Task/Thread.pm b/lib/Thread/Task/Thread.pm new file mode 100644 index 0000000..401992a --- /dev/null +++ b/lib/Thread/Task/Thread.pm @@ -0,0 +1,131 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Thread { + use threads; + use threads::shared; + + use MooseX::Types::Moose qw(Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET + Queue_T); + + use Thread::Task::Exception::Finished; + + use TryCatch; + use Thread::Queue; + + my $SEQUENCE : shared = 0; + my %WID2TID : shared = (); + + my $SINGLETON; + + method master(Class $class:) { + $SINGLETON + or $class->new->spawn; + } + + method import(Class $class: @rest) { + if (@rest && defined $rest[0] && $rest[0] eq ':master') { + $class->master; + } + } + + has wid => ( + isa => Int, + is => 'ro', + init_arg => undef, + default => sub { ++$SEQUENCE }, + required => 1, + ); + + has queue => ( + isa => Queue_T, + is => 'ro', + init_arg => undef, + default => sub { Thread::Queue->new }, + required => 1, + handles => { + _enqueue_msg => 'enqueue', + _dequeue_msg => 'dequeue', + }, + ); + + has thread => ( + is => 'ro', + lazy_build => 1, + required => 1, + clearer => '_clear_thread', + handles => [qw(join is_running is_joinable is_detached)], + ); + + method _build_thread() { + threads->object( $self->tid ); + } + + method spawn() { + $self->_clear_thread; + $WID2TID{$self->wid} = + threads->create( + sub { + $_[0]->run; + }, + $self, + )->tid; + return $self; + } + + method tid() { + return $WID2TID{$self->wid}; + } + + method is_thread() { + return $self->tid == threads->self->tid; + } + + method send(Str $method,@args) { + unless ($self->can($method)) { + die "Attempted to send message to non-existant method '$method'"; + } + + $self->_enqueue_msg([$method,@args]); + + return; + } + + method start(@args) { + $self->send('start_child',@args); + } + + method stop() { + $self->thread->detach; + $self->send('stop_child'); + } + + method run() { + while (my $message = $self->_dequeue_msg) { + next unless ref($message) eq 'ARRAY' && @$message; + my $method = shift @$message; + next unless $self->can($method); + + try { + $self->$method(@$method); + } + catch (Finished_ET) { + last; + } + } + return; + } + + method start_child(Thread_T $thread) { + $thread->spawn; + return; + } + + method stop_child() { + die Thread::Task::Exception::Finished->new; + } + +} + +1; diff --git a/lib/Thread/Task/Types.pm b/lib/Thread/Task/Types.pm new file mode 100644 index 0000000..8a4d044 --- /dev/null +++ b/lib/Thread/Task/Types.pm @@ -0,0 +1,25 @@ +package Thread::Task::Types; +use MooseX::Types + -declare => + [qw( + Handle_T + Worker_T + Task_T + Conduit_T + Finished_ET + Queue_T + Queue_Ev_T + )]; + +class_type Handle_T, { class => 'Thread::Task::Handle' }; +class_type Worker_T, { class => 'Thread::Task::Worker' }; +class_type Task_T, { class => 'Thread::Task' }; + +role_type Conduit_T, { role => 'Thread::Task::Role::Conduit' }; + +class_type Finished_ET, { class => 'Thread::Task::Exception::Finished' }; + +class_type Queue_T, { class => 'Thread::Queue' }; +class_type Queue_Ev_T, { class => 'Thread::Queue::Event' }; + +1; diff --git a/lib/Thread/Task/Worker.pm b/lib/Thread/Task/Worker.pm new file mode 100644 index 0000000..f0ec3e6 --- /dev/null +++ b/lib/Thread/Task/Worker.pm @@ -0,0 +1,36 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Worker extends Thread::Task::Thread { + use TryCatch; + + use MooseX::Types::Moose qw(Object Bool Int ArrayRef); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET); + + has hid => ( + isa => Int, + is => 'rw', + predicate => 'has_hid', + clearer => 'clear_hid', + required => 0, + ); + + method task(ArrayRef $task_array) { + require Thread::Task::Handle; + + my $handle = Thread::Task::Handle->from_array($task_array); + + try { + $handle->started; + $handle->run; + $handle->stopped; + } + catch { + # trace + } + return; + } + +} + +1; -- cgit v1.2.3