summaryrefslogtreecommitdiff
path: root/lib/Thread/Task/Thread.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Thread/Task/Thread.pm')
-rw-r--r--lib/Thread/Task/Thread.pm131
1 files changed, 131 insertions, 0 deletions
diff --git a/lib/Thread/Task/Thread.pm b/lib/Thread/Task/Thread.pm
new file mode 100644
index 0000000..401992a
--- /dev/null
+++ b/lib/Thread/Task/Thread.pm
@@ -0,0 +1,131 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Thread {
+ use threads;
+ use threads::shared;
+
+ use MooseX::Types::Moose qw(Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET
+ Queue_T);
+
+ use Thread::Task::Exception::Finished;
+
+ use TryCatch;
+ use Thread::Queue;
+
+ my $SEQUENCE : shared = 0;
+ my %WID2TID : shared = ();
+
+ my $SINGLETON;
+
+ method master(Class $class:) {
+ $SINGLETON
+ or $class->new->spawn;
+ }
+
+ method import(Class $class: @rest) {
+ if (@rest && defined $rest[0] && $rest[0] eq ':master') {
+ $class->master;
+ }
+ }
+
+ has wid => (
+ isa => Int,
+ is => 'ro',
+ init_arg => undef,
+ default => sub { ++$SEQUENCE },
+ required => 1,
+ );
+
+ has queue => (
+ isa => Queue_T,
+ is => 'ro',
+ init_arg => undef,
+ default => sub { Thread::Queue->new },
+ required => 1,
+ handles => {
+ _enqueue_msg => 'enqueue',
+ _dequeue_msg => 'dequeue',
+ },
+ );
+
+ has thread => (
+ is => 'ro',
+ lazy_build => 1,
+ required => 1,
+ clearer => '_clear_thread',
+ handles => [qw(join is_running is_joinable is_detached)],
+ );
+
+ method _build_thread() {
+ threads->object( $self->tid );
+ }
+
+ method spawn() {
+ $self->_clear_thread;
+ $WID2TID{$self->wid} =
+ threads->create(
+ sub {
+ $_[0]->run;
+ },
+ $self,
+ )->tid;
+ return $self;
+ }
+
+ method tid() {
+ return $WID2TID{$self->wid};
+ }
+
+ method is_thread() {
+ return $self->tid == threads->self->tid;
+ }
+
+ method send(Str $method,@args) {
+ unless ($self->can($method)) {
+ die "Attempted to send message to non-existant method '$method'";
+ }
+
+ $self->_enqueue_msg([$method,@args]);
+
+ return;
+ }
+
+ method start(@args) {
+ $self->send('start_child',@args);
+ }
+
+ method stop() {
+ $self->thread->detach;
+ $self->send('stop_child');
+ }
+
+ method run() {
+ while (my $message = $self->_dequeue_msg) {
+ next unless ref($message) eq 'ARRAY' && @$message;
+ my $method = shift @$message;
+ next unless $self->can($method);
+
+ try {
+ $self->$method(@$method);
+ }
+ catch (Finished_ET) {
+ last;
+ }
+ }
+ return;
+ }
+
+ method start_child(Thread_T $thread) {
+ $thread->spawn;
+ return;
+ }
+
+ method stop_child() {
+ die Thread::Task::Exception::Finished->new;
+ }
+
+}
+
+1;