use 5.008003;
use MooseX::Declare;
class Thread::Task::Manager {
use TryCatch;
use MooseX::Types::Moose qw(Object Bool Int ArrayRef HashRef);
use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET);
use MooseX::Singleton;
use Thread::Task::Handle ();
use Thread::Task::Thread ();
use Thread::Task::Worker ();
require Storable;
our $VERSION='0.001';
has active => (
traits => [ 'Bool' ],
isa => Bool,
is => 'ro',
default => 0,
init_arg => undef,
handles => {
'_set_active' => 'set',
'_reset_active' => 'unset',
},
);
has threads => (
isa => Bool,
is => 'ro',
default => 1,
);
has minimum => (
isa => Int,
is => 'ro',
default => 2,
);
has workers => (
traits => [ 'Array' ],
isa => ArrayRef[Worker_T],
default => sub { [] },
init_arg => undef,
handles => {
_workers_count => 'count',
_set_worker => 'set',
_get_worker => 'get',
_delete_worker => 'delete',
_workers => 'elements',
},
);
has handles => (
traits => [ 'Hash' ],
isa => HashRef[Handle_T],
default => sub { {} },
init_arg => undef,
handles => {
_handles_count => 'count',
_set_handle => 'set',
_get_handle => 'get',
_delete_handle => 'delete',
_handles => 'elements',
},
);
has running => (
traits => [ 'Hash' ],
isa => HashRef[Handle_T],
default => sub { {} },
init_arg => undef,
handles => {
_running_count => 'count',
_set_running => 'set',
_get_running => 'get',
_delete_running => 'delete',
_running => 'elements',
},
);
has queue => (
traits => [ 'Array' ],
isa => ArrayRef[Task_T],
default => sub { [] },
init_arg => undef,
handles => {
_enqueue_task => 'push',
_dequeue_task => 'shift',
_queue_size => 'count',
},
);
has conduit => (
is => 'ro',
does => Conduit_T,
);
method BUILD() {
$self->conduit->conduit_init($self);
}
method start() {
if ($self->threads) {
for (0 .. $self->minimum -1 ) {
$self->start_thread($_);
}
}
$self->_set_active;
$self->step;
}
method start_thread(Int $worker_id) {
my $master = Thread::Task::Thread->master;
my $worker = Thread::Task::Worker->new->spawn;
$self->_set_worker($worker_id,$worker);
}
method stop() {
$self->_reset_active;
if ($self->threads) {
for (0 .. $self->_workers_count -1) {
$self->stop_thread($_);
}
Thread::Task::Thread->master->stop;
}
}
method stop_thread(Int $worker_id) {
$self->_delete_worker($worker_id)->stop;
}
method next_thread() {
for my $worker ($self->_workers) {
next if $worker->has_hid;
return $worker;
}
return;
}
method schedule(Task_T $task) {
$self->_enqueue_task($task);
$self->step;
}
method step() {
return unless $self->active;
return unless $self->_queue_size;
if ($self->threads) {
unless ($self->minimum > $self->_handles_count) {
return;
}
}
my $task = $self->_dequeue_task;
my $handle = Thread::Task::Handle->new(task => $task);
my $hid = $handle->hid;
try {
$handle->prepare();
}
catch (Finished_ET $e) {
undef $handle;
return $self->step;
}
$self->_set_handle($hid,$handle);
my $worker = $self->next_thread or return;
$worker->hid($hid);
$handle->wid($worker->wid);
$worker->send(task => $handle->as_array);
return $self->step;
}
method on_signal(Str $frozen) {
my $message;
try {
$message = Storable::thaw($frozen);
}
catch {
return;
}
unless (ref $message eq 'ARRAY') {
return;
}
my $hid = shift @$message;
my $handle = $self->_get_handle($hid) or return;
my $method = shift @$message;
if ($method eq '__STARTED__') {
$self->_set_running($hid,$handle);
return;
}
unless ($self->_get_running($hid)) {
return;
}
if ($method eq '__STOPPED__') {
$self->_delete_running($hid);
for my $worker ($self->_workers) {
next unless $worker->has_hid;
next unless $worker->hid == $hid;
$worker->clear_hid;
last;
}
$handle->on_stopped(@$message);
$self->_delete_handle($hid);
return $self->step;
}
return $handle->on_message($method,@$message);
}
}
1;