From 6fc388b833ec0e924f663c11089b5c2b373ccf15 Mon Sep 17 00:00:00 2001 From: dakkar Date: Sun, 11 Jul 2010 20:34:02 +0100 Subject: start --- lib/Thread/Task/Thread.pm | 131 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 lib/Thread/Task/Thread.pm (limited to 'lib/Thread/Task/Thread.pm') diff --git a/lib/Thread/Task/Thread.pm b/lib/Thread/Task/Thread.pm new file mode 100644 index 0000000..401992a --- /dev/null +++ b/lib/Thread/Task/Thread.pm @@ -0,0 +1,131 @@ +use 5.008003; +use MooseX::Declare; + +class Thread::Task::Thread { + use threads; + use threads::shared; + + use MooseX::Types::Moose qw(Int); + use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET + Queue_T); + + use Thread::Task::Exception::Finished; + + use TryCatch; + use Thread::Queue; + + my $SEQUENCE : shared = 0; + my %WID2TID : shared = (); + + my $SINGLETON; + + method master(Class $class:) { + $SINGLETON + or $class->new->spawn; + } + + method import(Class $class: @rest) { + if (@rest && defined $rest[0] && $rest[0] eq ':master') { + $class->master; + } + } + + has wid => ( + isa => Int, + is => 'ro', + init_arg => undef, + default => sub { ++$SEQUENCE }, + required => 1, + ); + + has queue => ( + isa => Queue_T, + is => 'ro', + init_arg => undef, + default => sub { Thread::Queue->new }, + required => 1, + handles => { + _enqueue_msg => 'enqueue', + _dequeue_msg => 'dequeue', + }, + ); + + has thread => ( + is => 'ro', + lazy_build => 1, + required => 1, + clearer => '_clear_thread', + handles => [qw(join is_running is_joinable is_detached)], + ); + + method _build_thread() { + threads->object( $self->tid ); + } + + method spawn() { + $self->_clear_thread; + $WID2TID{$self->wid} = + threads->create( + sub { + $_[0]->run; + }, + $self, + )->tid; + return $self; + } + + method tid() { + return $WID2TID{$self->wid}; + } + + method is_thread() { + return $self->tid == threads->self->tid; + } + + method send(Str $method,@args) { + unless ($self->can($method)) { + die "Attempted to send message to non-existant method '$method'"; + } + + $self->_enqueue_msg([$method,@args]); + + return; + } + + method start(@args) { + $self->send('start_child',@args); + } + + method stop() { + $self->thread->detach; + $self->send('stop_child'); + } + + method run() { + while (my $message = $self->_dequeue_msg) { + next unless ref($message) eq 'ARRAY' && @$message; + my $method = shift @$message; + next unless $self->can($method); + + try { + $self->$method(@$method); + } + catch (Finished_ET) { + last; + } + } + return; + } + + method start_child(Thread_T $thread) { + $thread->spawn; + return; + } + + method stop_child() { + die Thread::Task::Exception::Finished->new; + } + +} + +1; -- cgit v1.2.3