use 5.008003;
use MooseX::Declare;

# Thread::Task::Thread
#
# An object wrapping one ithread together with a Thread::Queue used as its
# inbox. Other code posts messages of the form [ $method_name, @args ] via
# send(); the thread body (run) dequeues them and dispatches each one as a
# method call on this object until a "finished" exception ends the loop.
class Thread::Task::Thread {
    use threads;
    use threads::shared;
    use MooseX::Types::Moose qw(Int);
    use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET Queue_T);
    use Thread::Task::Exception::Finished;
    use TryCatch;
    use Thread::Queue;

    # Shared counter from which each new object draws its worker id (wid).
    # NOTE(review): the increment in the 'wid' default is not wrapped in
    # lock(); confirm objects are only ever constructed from one thread.
    my $SEQUENCE : shared = 0;

    # Shared map: worker id (wid) => tid of the thread spawned for it.
    my %WID2TID : shared = ();

    # Cached master object, populated on the first call to master().
    my $SINGLETON;

    # Class method. Returns the master thread object, creating and spawning
    # it on first use. The result is cached in $SINGLETON so that repeated
    # calls return the same object instead of spawning a new thread each
    # time (the original tested $SINGLETON but never assigned it).
    method master(Class $class:) {
        $SINGLETON ||= $class->new->spawn;
    }

    # Import hook: "use Thread::Task::Thread ':master'" starts the master
    # thread at load time. Any other import list is ignored.
    method import(Class $class: @rest) {
        if (@rest && defined $rest[0] && $rest[0] eq ':master') {
            $class->master;
        }
    }

    # Worker id, taken from the shared $SEQUENCE counter at construction.
    # Cannot be supplied by the caller (init_arg => undef).
    has wid => (
        isa      => Int,
        is       => 'ro',
        init_arg => undef,
        default  => sub { ++$SEQUENCE },
        required => 1,
    );

    # Inbox for this thread: a fresh Thread::Queue per object. Its
    # enqueue/dequeue methods are exposed privately as _enqueue_msg and
    # _dequeue_msg.
    has queue => (
        isa      => Queue_T,
        is       => 'ro',
        init_arg => undef,
        default  => sub { Thread::Queue->new },
        required => 1,
        handles  => {
            _enqueue_msg => 'enqueue',
            _dequeue_msg => 'dequeue',
        },
    );

    # The threads object for this worker, looked up lazily from its tid
    # (see _build_thread). join/is_running/is_joinable/is_detached are
    # delegated to it.
    has thread => (
        is         => 'ro',
        lazy_build => 1,
        required   => 1,
        clearer    => '_clear_thread',
        handles    => [qw(join is_running is_joinable is_detached)],
    );

    # Builder for 'thread': resolves the recorded tid to a threads object.
    method _build_thread() {
        threads->object( $self->tid );
    }

    # Starts a new thread whose body is $self->run, and records its tid
    # under this object's wid in %WID2TID. Any previously cached 'thread'
    # value is cleared first so it is rebuilt from the new tid.
    # Returns $self, so calls can be chained (as master() does).
    method spawn() {
        $self->_clear_thread;

        $WID2TID{ $self->wid } = threads->create(
            sub { $_[0]->run; },
            $self,
        )->tid;

        return $self;
    }

    # The tid recorded for this object by spawn(); undef if spawn() has not
    # stored one for this wid.
    method tid() {
        return $WID2TID{ $self->wid };
    }

    # True when the calling code is running inside this object's thread.
    method is_thread() {
        return $self->tid == threads->self->tid;
    }

    # Posts a message asking the thread to call $method with @args.
    # Dies if this object cannot resolve $method. Returns nothing.
    method send(Str $method,@args) {
        unless ($self->can($method)) {
            die "Attempted to send message to non-existant method '$method'";
        }

        $self->_enqueue_msg([$method,@args]);

        return;
    }

    # Asks the thread to run start_child(@args).
    method start(@args) {
        $self->send('start_child',@args);
    }

    # Detaches the underlying thread, then asks it to leave its run loop by
    # posting a 'stop_child' message.
    method stop() {
        $self->thread->detach;
        $self->send('stop_child');
    }

    # Thread body: the message loop.
    #
    # Dequeues messages until one is false or a handler throws an exception
    # matching Finished_ET. Messages that are not non-empty array refs, or
    # that name a method this object cannot resolve, are skipped.
    method run() {
        while (my $message = $self->_dequeue_msg) {
            next unless ref($message) eq 'ARRAY' && @$message;

            # First element is the method name; what remains in @$message
            # are that method's arguments.
            my $method = shift @$message;
            next unless $self->can($method);

            try {
                # Pass the remaining message elements as arguments. The
                # original dereferenced the method-name string ($method)
                # here, which either dies under strict refs or passes the
                # wrong arguments.
                $self->$method(@$message);
            }
            catch (Finished_ET) {
                last;
            }
        }

        return;
    }

    # Message handler: spawns the thread object that was sent to us.
    # NOTE(review): Thread_T is not in the Thread::Task::Types import list
    # above; confirm how this type name resolves.
    method start_child(Thread_T $thread) {
        $thread->spawn;
        return;
    }

    # Message handler: throws a Thread::Task::Exception::Finished object.
    # Presumably this is what the Finished_ET catch in run() matches in
    # order to end the loop; verify against Thread::Task::Types.
    method stop_child() {
        die Thread::Task::Exception::Finished->new;
    }
}

1;