use 5.008003;
use MooseX::Declare;

# Thread::Task::Manager
#
# Singleton that keeps a FIFO queue of tasks, wraps each task in a
# Thread::Task::Handle, and hands the handle to an idle Thread::Task::Worker.
# Messages coming back from workers arrive through on_signal() and are routed
# to the matching handle.
class Thread::Task::Manager {
    use TryCatch;
    use MooseX::Types::Moose qw(Object Bool Int ArrayRef HashRef);
    use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET);
    use MooseX::Singleton;
    use Thread::Task::Handle ();
    use Thread::Task::Thread ();
    use Thread::Task::Worker ();
    require Storable;

    our $VERSION = '0.001';

    # True between start() and stop(); step() does nothing while it is false.
    has active => (
        traits   => [ 'Bool' ],
        isa      => Bool,
        is       => 'ro',
        default  => 0,
        init_arg => undef,
        handles  => {
            '_set_active'   => 'set',
            '_reset_active' => 'unset',
        },
    );

    # When true, start() spawns workers and step() caps the number of
    # registered handles at `minimum`.
    has threads => (
        isa     => Bool,
        is      => 'ro',
        default => 1,
    );

    # Number of workers spawned by start(); also the handle cap used by step().
    has minimum => (
        isa     => Int,
        is      => 'ro',
        default => 2,
    );

    # Spawned workers, indexed by worker id.
    has workers => (
        traits   => [ 'Array' ],
        isa      => ArrayRef[Worker_T],
        default  => sub { [] },
        init_arg => undef,
        handles  => {
            _workers_count => 'count',
            _set_worker    => 'set',
            _get_worker    => 'get',
            _workers       => 'elements',
        },
    );

    # Every handle that has been prepared and not yet stopped, keyed by hid.
    has handles => (
        traits   => [ 'Hash' ],
        isa      => HashRef[Handle_T],
        default  => sub { {} },
        init_arg => undef,
        handles  => {
            _handles_count => 'count',
            _set_handle    => 'set',
            _get_handle    => 'get',
            _delete_handle => 'delete',
        },
    );

    # Handles for which a '__STARTED__' message has been received, keyed by hid.
    has running => (
        traits   => [ 'Hash' ],
        isa      => HashRef[Handle_T],
        default  => sub { {} },
        init_arg => undef,
        handles  => {
            _set_running    => 'set',
            _get_running    => 'get',
            _delete_running => 'delete',
        },
    );

    # Tasks waiting to be dispatched, oldest first.
    has queue => (
        traits   => [ 'Array' ],
        isa      => ArrayRef[Task_T],
        default  => sub { [] },
        init_arg => undef,
        handles  => {
            _enqueue_task => 'push',
            _dequeue_task => 'shift',
            _queue_size   => 'count',
        },
    );

    # Object whose conduit_init() is called with the manager at build time.
    # Required: BUILD dereferences it unconditionally, so a missing conduit
    # should be reported by the constructor rather than as a method call on
    # an undefined value.
    has conduit => (
        is       => 'ro',
        does     => Conduit_T,
        required => 1,
    );

    # Hand the freshly built manager to its conduit.
    method BUILD(HashRef $params) {
        $self->conduit->conduit_init($self);
    }

    # Spawn `minimum` workers (threaded mode only), mark the manager active
    # and try to dispatch anything already queued.
    method start() {
        if ($self->threads) {
            for my $worker_id (0 .. $self->minimum - 1) {
                $self->start_thread($worker_id);
            }
        }
        $self->_set_active;
        $self->step;
    }

    # Spawn one worker and store it in slot $worker_id.
    method start_thread(Int $worker_id) {
        # Called for its side effect only: init the master thread.
        Thread::Task::Thread->master;

        my $worker = Thread::Task::Worker->new->spawn;
        $self->_set_worker($worker_id, $worker);
    }

    # Mark the manager inactive and, in threaded mode, stop every worker and
    # then the master thread.
    method stop() {
        $self->_reset_active;
        if ($self->threads) {
            for my $worker_id (0 .. $self->_workers_count - 1) {
                $self->stop_thread($worker_id);
            }
            Thread::Task::Thread->master->stop;
        }
    }

    # Stop the worker stored in slot $worker_id; an empty slot is ignored.
    method stop_thread(Int $worker_id) {
        my $worker = $self->_get_worker($worker_id) or return;
        $worker->stop;
    }

    # Return the first worker that has no handle id assigned, or nothing when
    # every worker is busy (or there are no workers).
    method next_thread() {
        for my $worker ($self->_workers) {
            next if $worker->has_hid;
            return $worker;
        }
        return;
    }

    # Append $task to the queue and try to dispatch it.
    method schedule(Task_T $task) {
        $self->_enqueue_task($task);
        $self->step;
    }

    # Dispatch queued tasks until the queue is empty, the handle cap is
    # reached, or no worker is idle. Does nothing while the manager is
    # inactive.
    method step() {
        return unless $self->active;
        return unless $self->_queue_size;

        if ($self->threads) {
            # Cap the number of registered handles at `minimum`.
            return unless $self->minimum > $self->_handles_count;

            # Leave the task in the queue while no worker is idle. Dequeuing
            # first would register a handle that is never sent anywhere and
            # that would count against the cap above from then on.
            return unless $self->next_thread;
        }

        my $task   = $self->_dequeue_task;
        my $handle = Thread::Task::Handle->new(task => $task);
        my $hid    = $handle->hid;

        try {
            $handle->prepare();
        }
        catch (Finished_ET $e) {
            # prepare() signalled that nothing is left to dispatch for this
            # task; drop the handle and move on to the next queued task.
            undef $handle;
            return $self->step;
        }

        $self->_set_handle($hid, $handle);

        my $worker = $self->next_thread or return;
        $worker->hid($hid);
        #$handle->wid($worker->wid);
        $worker->send(task => $handle->as_array);

        return $self->step;
    }

    # Handle one frozen message. The thawed payload must be an array
    # reference of the form [ $hid, $method, @args ]; anything else, and any
    # message for an unknown handle, is silently dropped.
    #
    #   '__STARTED__'  marks the handle as running.
    #   '__STOPPED__'  releases the worker, calls on_stopped(@args) on the
    #                  handle, forgets the handle and dispatches more work.
    #   anything else  is forwarded to the handle's on_message($method, @args)
    #                  and is only accepted once the handle is running.
    #
    # NOTE(review): Storable::thaw is applied to $frozen as received; this
    # presumes the payload only ever originates from our own workers.
    method on_signal(Str $frozen) {
        my $message;
        try {
            $message = Storable::thaw($frozen);
        }
        catch {
            return;
        }

        unless (ref $message eq 'ARRAY') {
            return;
        }

        # A message without a handle id or a method name is malformed; drop
        # it like any other undecodable payload.
        my $hid = shift @$message;
        return unless defined $hid;
        my $handle = $self->_get_handle($hid) or return;

        my $method = shift @$message;
        return unless defined $method;

        if ($method eq '__STARTED__') {
            $self->_set_running($hid, $handle);
            return;
        }

        # Everything below is only valid for a handle that has started.
        unless ($self->_get_running($hid)) {
            return;
        }

        if ($method eq '__STOPPED__') {
            $self->_delete_running($hid);

            # Free the worker that was carrying this handle.
            for my $worker ($self->_workers) {
                next unless $worker->has_hid;
                next unless $worker->hid == $hid;
                $worker->clear_hid;
                last;
            }

            $handle->on_stopped(@$message);
            $self->_delete_handle($hid);
            return $self->step;
        }

        return $handle->on_message($method, @$message);
    }
}

1;