use 5.008003;
use MooseX::Declare;
class Thread::Task::Thread {
    # A worker object that owns a message queue and runs a message loop
    # (see run()) inside its own interpreter thread.  Other threads talk to
    # the worker by enqueueing [ $method_name, @args ] messages with send().

    use threads;
    use threads::shared;
    use MooseX::Types::Moose qw(ClassName Int);
    use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET
        Queue_T);
    use Thread::Task::Exception::Finished;
    use TryCatch;
    use Thread::Queue;

    # Counter used to hand out worker ids (see the 'wid' attribute).
    my $SEQUENCE : shared = 0;

    # Maps worker id => thread id; shared so every thread sees the mapping
    # written by spawn().
    my %WID2TID : shared = ();

    # Cached master worker returned by master().  Note this is a plain
    # lexical, not declared ':shared'.
    my $SINGLETON;

    # Class method: return the master worker, creating and spawning it on
    # first use.  The object is cached in $SINGLETON so that repeated calls
    # (including repeated "use ... ':master'") do not spawn a new thread
    # each time.
    method master(ClassName $class:) {
        $SINGLETON ||= $class->new->spawn;
        return $SINGLETON;
    }

    # Import hook: "use Thread::Task::Thread ':master'" makes sure the
    # master worker exists.  Any other import list is ignored.
    method import(ClassName $class: @rest) {
        if (@rest && defined $rest[0] && $rest[0] eq ':master') {
            $class->master;
        }
    }

    # Worker id taken from $SEQUENCE; cannot be passed to the constructor.
    has wid => (
        isa      => Int,
        is       => 'ro',
        init_arg => undef,
        default  => sub { ++$SEQUENCE },
        required => 1,
    );

    # Inbound message queue; cannot be passed to the constructor.
    has queue => (
        isa      => Queue_T,
        is       => 'ro',
        init_arg => undef,
        default  => sub { Thread::Queue->new },
        required => 1,
        handles  => {
            _enqueue_msg => 'enqueue',
            _dequeue_msg => 'dequeue',
        },
    );

    # The threads object for this worker, looked up lazily from its tid.
    # spawn() clears the cached value so it is rebuilt for the new thread.
    has thread => (
        is         => 'ro',
        lazy_build => 1,
        required   => 1,
        clearer    => '_clear_thread',
        handles    => [qw(join is_running is_joinable is_detached)],
    );

    # Builder for 'thread': resolve the threads object from the recorded tid.
    method _build_thread() {
        threads->object( $self->tid );
    }

    # Create a new thread that executes run() on this worker and record its
    # tid under this worker's wid.  Returns $self so calls can be chained.
    method spawn() {
        # Drop any cached threads object; it would refer to an older thread.
        $self->_clear_thread;
        $WID2TID{ $self->wid } = threads->create(
            sub {
                $_[0]->run;
            },
            $self,
        )->tid;
        return $self;
    }

    # Thread id recorded by spawn() for this worker; undef if the worker has
    # no entry in %WID2TID.
    method tid() {
        return $WID2TID{ $self->wid };
    }

    # True when the calling thread is this worker's own thread.
    method is_thread() {
        my $tid = $self->tid;

        # Without a recorded tid the numeric comparison would treat undef
        # as 0, i.e. as the main thread's tid, and wrongly report true there.
        return defined $tid && $tid == threads->self->tid;
    }

    # Queue a call to $method with @args for the worker's run() loop.
    # Dies if this object has no method of that name.  Returns nothing.
    method send(Str $method,@args) {
        unless ($self->can($method)) {
            die "Attempted to send message to non-existant method '$method'";
        }
        $self->_enqueue_msg([$method,@args]);
        return;
    }

    # Ask this worker to run start_child(@args) in its own loop.
    method start(@args) {
        $self->send('start_child',@args);
    }

    # Detach the worker's thread, then ask its loop to terminate.
    method stop() {
        $self->thread->detach;
        $self->send('stop_child');
    }

    # Message loop.  Each message is an array reference whose first element
    # is a method name and whose remaining elements are that method's
    # arguments.  Malformed messages and unknown method names are skipped.
    # The loop ends when a handler throws a Finished_ET exception or when a
    # false value is dequeued.
    method run() {
        while (my $message = $self->_dequeue_msg) {
            next unless ref($message) eq 'ARRAY' && @$message;
            my $method = shift @$message;
            next unless $self->can($method);
            try {
                # After the shift above, @$message holds only the arguments.
                $self->$method(@$message);
            }
            catch (Finished_ET $e) {
                last;
            }
        }
        return;
    }

    # Handler for 'start_child' messages: spawn the given worker object.
    method start_child($thread) {
        $thread->spawn;
        return;
    }

    # Handler for 'stop_child' messages: throw the exception that run()
    # treats as the signal to leave its loop.
    method stop_child() {
        die Thread::Task::Exception::Finished->new;
    }
}
1;