From 6fc388b833ec0e924f663c11089b5c2b373ccf15 Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 11 Jul 2010 20:34:02 +0100 Subject: start --- lib/Thread/Task/Manager.pm | 230 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 lib/Thread/Task/Manager.pm (limited to 'lib/Thread/Task/Manager.pm') diff --git a/lib/Thread/Task/Manager.pm b/lib/Thread/Task/Manager.pm new file mode 100644 index 0000000..e969edc --- /dev/null +++ b/lib/Thread/Task/Manager.pm @@ -0,0 +1,230 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Manager { + use TryCatch; + + use MooseX::Types::Moose qw(Object Bool Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET); + use MooseX::Singleton; + + use Thread::Task::Handle (); + use Thread::Task::Thread (); + use Thread::Task::Worker (); + + require Storable; + + our $VERSION='0.001'; + + has active => ( + traits => [ 'Bool' ], + isa => Bool, + is => 'ro', + default => 0, + init_arg => undef, + handles => { + '_set_active' => 'set', + '_reset_active' => 'unset', + }, + ); + + has threads => ( + isa => Bool, + is => 'ro', + default => 1, + ); + + has minimum => ( + isa => Int, + is => 'ro', + default => 2, + ); + + has workers => ( + traits => [ 'Array' ], + isa => ArrayRef[Worker_T], + default => sub { [] }, + init_arg => undef, + handles => { + _workers_count => 'count', + _set_worker => 'set', + _get_worker => 'get', + _delete_worker => 'delete', + _workers => 'elements', + }, + ); + + has handles => ( + traits => [ 'Hash' ], + isa => HashRef[Handle_T], + default => sub { {} }, + init_arg => undef, + handles => { + _handles_count => 'count', + _set_handle => 'set', + _get_handle => 'get', + _delete_handle => 'delete', + _handles => 'elements', + }, + ); + + has running => ( + traits => [ 'Hash' ], + isa => HashRef[Handle_T], + default => sub { {} }, + init_arg => undef, + handles => { + _running_count => 'count', + _set_running => 'set', + _get_running => 'get', + _delete_running => 'delete', + _running => 'elements', + }, + ); + + has queue => ( + traits => [ 'Array' ], + isa => ArrayRef[Task_T], + default => sub { [] }, + init_arg => undef, + handles => { + _enqueue_task => 'push', + _dequeue_task => 'shift', + _queue_size => 'count', + }, + ); + + has conduit => ( + is => 'ro', + does => Conduit_T, + ); + + method BUILD() { + $self->conduit->conduit_init($self); + } + + method start() { + if ($self->threads) { + for (0 .. $self->minimum -1 ) { + $self->start_thread($_); + } + } + + $self->_set_active; + $self->step; + } + + method start_thread(Int $worker_id) { + my $master = Thread::Task::Thread->master; # init the master thread + my $worker = Thread::Task::Worker->new->spawn; + $self->_set_worker($worker_id,$worker); + } + + method stop() { + $self->_reset_active; + if ($self->threads) { + for (0 .. $self->_workers_count -1) { + $self->stop_thread($_); + } + Thread::Task::Thread->master->stop; + } + } + + method stop_thread(Int $worker_id) { + $self->_delete_worker($worker_id)->stop; + } + + method next_thread() { + for my $worker ($self->_workers) { + next if $worker->has_hid; + return $worker; + } + return; + } + + method schedule(Task_T $task) { + $self->_enqueue_task($task); + $self->step; + } + + method step() { + return unless $self->active; + return unless $self->_queue_size; + + if ($self->threads) { + unless ($self->minimum > $self->_handles_count) { + return; + } + } + + my $task = $self->_dequeue_task; + my $handle = Thread::Task::Handle->new(task => $task); + my $hid = $handle->hid; + + try { + $handle->prepare(); + } + catch (Finished_ET $e) { + undef $handle; + return $self->step; + } + + $self->_set_handle($hid,$handle); + + my $worker = $self->next_thread or return; + $worker->hid($hid); + $handle->wid($worker->wid); + + $worker->send(task => $handle->as_array); + + return $self->step; + } + + method on_signal(Str $frozen) { + my $message; + try { + $message = Storable::nthaw($frozen); + } + catch { + return; + } + unless (ref $message eq 'ARRAY') { + return; + } + + my $hid = shift @$message; + my $handle = $self->_get_handle($hid) or return; + + my $method = shift @$message; + + if ($method eq '__STARTED__') { + $self->_set_running($hid,$handle); + return; + } + + unless ($self->_get_running($hid)) { + return; + } + + if ($method eq '__STOPPED__') { + $self->_delete_running($hid); + + for my $worker ($self->_workers) { + next unless $worker->has_hid; + next unless $worker->hid == $hid; + $worker->clear_hid; + last; + } + + $handle->on_stopped(@$message); + + $self->_delete_handle($hid); + + return $self->step; + } + + return $handle->on_message($method,@$message); + } +} + +1; -- cgit v1.2.3