summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2010-07-11 20:34:02 +0100
committerdakkar <dakkar@thenautilus.net>2010-07-11 20:34:02 +0100
commit6fc388b833ec0e924f663c11089b5c2b373ccf15 (patch)
treec9e2956ad06c7ef0d8b9e888e36fd88222da056d
downloadThread-Task-6fc388b833ec0e924f663c11089b5c2b373ccf15.tar.gz
Thread-Task-6fc388b833ec0e924f663c11089b5c2b373ccf15.tar.bz2
Thread-Task-6fc388b833ec0e924f663c11089b5c2b373ccf15.zip
start
-rw-r--r--.gitignore6
-rw-r--r--Makefile.PL26
-rw-r--r--lib/Thread/Task.pm115
-rw-r--r--lib/Thread/Task/Exception.pm22
-rw-r--r--lib/Thread/Task/Exception/Finished.pm9
-rw-r--r--lib/Thread/Task/Handle.pm134
-rw-r--r--lib/Thread/Task/Manager.pm230
-rw-r--r--lib/Thread/Task/Role/Task.pm54
-rw-r--r--lib/Thread/Task/Thread.pm131
-rw-r--r--lib/Thread/Task/Types.pm25
-rw-r--r--lib/Thread/Task/Worker.pm36
11 files changed, 788 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6a3638d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+/Makefile
+/Makefile.old
+/blib/
+/pm_to_blib
+/inc/
+*~
diff --git a/Makefile.PL b/Makefile.PL
new file mode 100644
index 0000000..2086bd5
--- /dev/null
+++ b/Makefile.PL
@@ -0,0 +1,26 @@
+use inc:Module::Install;
+use strict;
+use warnings;
+
+name 'Thread-Task';
+license 'perl';
+author 'Gianni Ceccarelli <dakkar@thenautilus.net>';
+requires 'perl' => '5.008003';
+
+requires 'Moose' => '1.00';
+requires 'MooseX::Types' => '0.22';
+requires 'MooseX::Types::Structured' => '0.23';
+requires 'threads' => '1.71';
+requires 'threads::shared' => '1.26';
+requires 'Thread::Queue::Event' => 0;
+requires 'namespace::autoclean' => 0;
+requires 'MooseX::Declare' => '0.33';
+requires 'MooseX::Singleton' => '0.22';
+requires 'Scalar::Util' => 0;
+requires 'TryCatch' => '1.002';
+
+test_requires 'Test::Most' => '0.21';
+
+no_index 'directory' => qw{ t xt inc };
+
+WriteAll;
diff --git a/lib/Thread/Task.pm b/lib/Thread/Task.pm
new file mode 100644
index 0000000..9966634
--- /dev/null
+++ b/lib/Thread/Task.pm
@@ -0,0 +1,115 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task {
+ use MooseX::Types::Moose qw(Object);
+ use Thread::Task::Types qw(Handle_T);
+
+ require Storable;
+
+ require Thread::Task::Role::Task;
+ require Thread::Task::Manager;
+ require Thread::Task::Exception::Finished;
+
+ our $VERSION='0.001';
+
+ has _owner_taskrev => (
+ isa => Int,
+ is => 'ro',
+ weak_ref => 1,
+ required => 0,
+ predicate => '_has_owner_taskrev',
+ );
+
+ has callback => (
+ isa => Str,
+ is => 'ro',
+ required => 0,
+ default => 'task_response',
+ );
+
+ has handle => (
+ isa => Handle_T,
+ is => 'rw',
+ required => 0,
+ predicate => 'running',
+ clearer => 'clear_handle',
+ );
+
+ around BUILDARGS (Class $class,@rest) {
+ my $params = $class->$orig(@rest);
+
+ if (exists $params->{owner}) {
+ $params->{_owner_taskrev} =
+ delete($params->{owner})->task_revision;
+ }
+ }
+
+ method BUILD() {
+ if ($self->_has_owner_taskrev) {
+ my $owner = $self->owner;
+ my $callback = $self->callback;
+ unless ($owner->can($callback)) {
+ die "Owner can't $callback";
+ }
+ }
+ }
+
+ method owner() {
+ Thread::Task::Role::Task->task_owner($self->_owner_taskrev);
+ }
+
+ method schedule(@args) {
+ Thread::Task::Manager->instance->schedule($self,@args);
+ }
+
+ method prepare() { return }
+
+ method run() {
+ if ($self->_has_owner_taskrev) {
+ $self->owner or die Thread::Task::Exception::Finished->new();
+ }
+
+ return;
+ }
+
+ method finish() {
+ if ($self->_has_owner_taskrev) {
+ my $owner = $self->owner
+ or die Thread::Task::Exception::Finished->new();
+
+ my $callback = $self->callback;
+
+ $owner->$callback($self);
+ }
+
+ return;
+ }
+
+ method as_string() {
+ Storable::nfreeze($self);
+ }
+
+ method from_string(Class $class: Str $serialization) {
+ my $self=Storable::nthaw($serialization);
+ my $self_class=$self->meta->name;
+ unless ($self_class eq $class) {
+ die "Deserialized as $self_class instead of $class";
+ }
+ return $self;
+ }
+
+ method _update(Task_T $new_task) {
+ if ($self->meta ne $new_task->meta) {
+ die "Can't update between different task classes";
+ }
+ for my $attr ($self->meta->get_all_attributes) {
+ $attr->set_value($self,
+ $attr->get_value($new_task)
+ );
+ }
+ return;
+ }
+}
+
+1;
diff --git a/lib/Thread/Task/Exception.pm b/lib/Thread/Task/Exception.pm
new file mode 100644
index 0000000..b2f02f2
--- /dev/null
+++ b/lib/Thread/Task/Exception.pm
@@ -0,0 +1,22 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Exception {
+ use MooseX::Types::Moose qw(Str HashRef);
+
+ has 'msg' => (
+ isa => Str,
+ is => 'ro',
+ required => 0,
+ predicate => 'has_msg',
+ );
+
+ has 'params' => (
+ isa => HashRef,
+ is => 'ro',
+ required => 0,
+ predicate => 'has_params',
+ );
+}
+
+1;
diff --git a/lib/Thread/Task/Exception/Finished.pm b/lib/Thread/Task/Exception/Finished.pm
new file mode 100644
index 0000000..9edcc40
--- /dev/null
+++ b/lib/Thread/Task/Exception/Finished.pm
@@ -0,0 +1,9 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Exception::Finished
+ extends Thread::Task::Exception {
+}
+
+1;
+
diff --git a/lib/Thread/Task/Handle.pm b/lib/Thread/Task/Handle.pm
new file mode 100644
index 0000000..b7012f4
--- /dev/null
+++ b/lib/Thread/Task/Handle.pm
@@ -0,0 +1,134 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Handle {
+ use TryCatch;
+
+ use MooseX::Types::Moose qw(Object Bool Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET);
+ use Thread::Task::Exception::Finished;
+
+ require Scalar::Util;
+
+ our $SEQUENCE = 0;
+
+ has hid => (
+ isa => Int,
+ is => 'ro',
+ default => sub { ++$SEQUENCE },
+ init_arg => undef,
+ );
+
+ has task => (
+ isa => Task_T,
+ is => 'ro',
+ required => 1,
+ );
+
+ has execption => (
+ is => 'ro',
+ writer => '_set_exception',
+ predicate => 'has_exception',
+ init_arg => undef,
+ );
+
+ method prepare() {
+ try {
+ $self->task->prepare;
+ }
+ catch (Finished_ET) {
+ die $@;
+ }
+ catch {
+ die Thread::Task::Exception::Finished->new(msg=>$@);
+ }
+ }
+
+ method finish() {
+ try {
+ $self->task->finish;
+ }
+ catch (Finished_ET) {
+ die $@;
+ }
+ catch {
+ die Thread::Task::Exception::Finished->new(msg=>$@);
+ }
+ }
+
+ method run() {
+ try {
+ $self->task->handle($self);
+ $self->task->run;
+ $self->task->clear_handle;
+ }
+ catch (Finished_ET) {
+ }
+ catch ($e) {
+ $self->task->clear_handle;
+ $self->_set_exception($e);
+ }
+
+ return;
+ }
+
+ method as_array() {
+ return [
+ $self->hid,
+ $self->task->meta->name,
+ $task->as_string,
+ ];
+ }
+
+ method from_array(Class $class: ArrayRef $array) {
+ my ($hid,$task_class,$task_serialized)=@$array;
+ Class::MOP::load_class($task_class);
+
+ return $class->new(
+ hid=>$hid,
+ task=>Thread::Task->from_string($task_serialized),
+ );
+ }
+
+ method message(@message) {
+ Thread::Task::Manager->instance->conduit->signal(
+ Storable::nfreeze([$self->hid,@message])
+ );
+ }
+
+ method on_message($method,@args) {
+ unless ($self->task->can($method)) {
+ return;
+ }
+
+ try {
+ $self->task->$method(@args);
+ }
+ catch {
+ return;
+ }
+ return;
+ }
+
+ method started() {
+ $self->message('__STARTED__');
+ }
+
+ method stopped() {
+ $self->message('__STOPPED__');
+ }
+
+ method on_stopped(Task_T $new_task) {
+ $self->task->_update($new_task);
+
+ try {
+ $self->task->finish(@args);
+ }
+ catch {
+ return;
+ }
+ return;
+ }
+}
+
+1;
diff --git a/lib/Thread/Task/Manager.pm b/lib/Thread/Task/Manager.pm
new file mode 100644
index 0000000..e969edc
--- /dev/null
+++ b/lib/Thread/Task/Manager.pm
@@ -0,0 +1,230 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Manager {
+ use TryCatch;
+
+ use MooseX::Types::Moose qw(Object Bool Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET);
+ use MooseX::Singleton;
+
+ use Thread::Task::Handle ();
+ use Thread::Task::Thread ();
+ use Thread::Task::Worker ();
+
+ require Storable;
+
+ our $VERSION='0.001';
+
+ has active => (
+ traits => [ 'Bool' ],
+ isa => Bool,
+ is => 'ro',
+ default => 0,
+ init_arg => undef,
+ handles => {
+ '_set_active' => 'set',
+ '_reset_active' => 'unset',
+ },
+ );
+
+ has threads => (
+ isa => Bool,
+ is => 'ro',
+ default => 1,
+ );
+
+ has minimum => (
+ isa => Int,
+ is => 'ro',
+ default => 2,
+ );
+
+ has workers => (
+ traits => [ 'Array' ],
+ isa => ArrayRef[Worker_T],
+ default => sub { [] },
+ init_arg => undef,
+ handles => {
+ _workers_count => 'count',
+ _set_worker => 'set',
+ _get_worker => 'get',
+ _delete_worker => 'delete',
+ _workers => 'elements',
+ },
+ );
+
+ has handles => (
+ traits => [ 'Hash' ],
+ isa => HashRef[Handle_T],
+ default => sub { {} },
+ init_arg => undef,
+ handles => {
+ _handles_count => 'count',
+ _set_handle => 'set',
+ _get_handle => 'get',
+ _delete_handle => 'delete',
+ _handles => 'elements',
+ },
+ );
+
+ has running => (
+ traits => [ 'Hash' ],
+ isa => HashRef[Handle_T],
+ default => sub { {} },
+ init_arg => undef,
+ handles => {
+ _running_count => 'count',
+ _set_running => 'set',
+ _get_running => 'get',
+ _delete_running => 'delete',
+ _running => 'elements',
+ },
+ );
+
+ has queue => (
+ traits => [ 'Array' ],
+ isa => ArrayRef[Task_T],
+ default => sub { [] },
+ init_arg => undef,
+ handles => {
+ _enqueue_task => 'push',
+ _dequeue_task => 'shift',
+ _queue_size => 'count',
+ },
+ );
+
+ has conduit => (
+ is => 'ro',
+ does => Conduit_T,
+ );
+
+ method BUILD() {
+ $self->conduit->conduit_init($self);
+ }
+
+ method start() {
+ if ($self->threads) {
+ for (0 .. $self->minimum -1 ) {
+ $self->start_thread($_);
+ }
+ }
+
+ $self->_set_active;
+ $self->step;
+ }
+
+ method start_thread(Int $worker_id) {
+ my $master = Thread::Task::Thread->master; # init the master thread
+ my $worker = Thread::Task::Worker->new->spawn;
+ $self->_set_worker($worker_id,$worker);
+ }
+
+ method stop() {
+ $self->_reset_active;
+ if ($self->threads) {
+ for (0 .. $self->_workers_count -1) {
+ $self->stop_thread($_);
+ }
+ Thread::Task::Thread->master->stop;
+ }
+ }
+
+ method stop_thread(Int $worker_id) {
+ $self->_delete_worker($worker_id)->stop;
+ }
+
+ method next_thread() {
+ for my $worker ($self->_workers) {
+ next if $worker->has_hid;
+ return $worker;
+ }
+ return;
+ }
+
+ method schedule(Task_T $task) {
+ $self->_enqueue_task($task);
+ $self->step;
+ }
+
+ method step() {
+ return unless $self->active;
+ return unless $self->_queue_size;
+
+ if ($self->threads) {
+ unless ($self->minimum > $self->_handles_count) {
+ return;
+ }
+ }
+
+ my $task = $self->_dequeue_task;
+ my $handle = Thread::Task::Handle->new(task => $task);
+ my $hid = $handle->hid;
+
+ try {
+ $handle->prepare();
+ }
+ catch (Finished_ET $e) {
+ undef $handle;
+ return $self->step;
+ }
+
+ $self->_set_handle($hid,$handle);
+
+ my $worker = $self->next_thread or return;
+ $worker->hid($hid);
+ $handle->wid($worker->wid);
+
+ $worker->send(task => $handle->as_array);
+
+ return $self->step;
+ }
+
+ method on_signal(Str $frozen) {
+ my $message;
+ try {
+ $message = Storable::nthaw($frozen);
+ }
+ catch {
+ return;
+ }
+ unless (ref $message eq 'ARRAY') {
+ return;
+ }
+
+ my $hid = shift @$message;
+ my $handle = $self->_get_handle($hid) or return;
+
+ my $method = shift @$message;
+
+ if ($method eq '__STARTED__') {
+ $self->_set_running($hid,$handle);
+ return;
+ }
+
+ unless ($self->_get_running($hid)) {
+ return;
+ }
+
+ if ($method eq '__STOPPED__') {
+ $self->_delete_running($hid);
+
+ for my $worker ($self->_workers) {
+ next unless $worker->has_hid;
+ next unless $worker->hid == $hid;
+ $worker->clear_hid;
+ last;
+ }
+
+ $handle->on_stopped(@$message);
+
+ $self->_delete_handle($hid);
+
+ return $self->step;
+ }
+
+ return $handle->on_message($method,@$message);
+ }
+}
+
+1;
diff --git a/lib/Thread/Task/Role/Task.pm b/lib/Thread/Task/Role/Task.pm
new file mode 100644
index 0000000..61f9cbe
--- /dev/null
+++ b/lib/Thread/Task/Role/Task.pm
@@ -0,0 +1,54 @@
+use 5.008003;
+use MooseX::Declare;
+
+role Thread::Task::Role::Task {
+ use MooseX::Types::Moose qw(Int);
+ require Scalar::Util;
+
+ my $SEQUENCE = 0;
+ my %INDEX = 0;
+
+ has task_revision => (
+ isa => Int,
+ is => 'ro',
+ lazy_build => 1,
+ predicate => '_has_taskrev',
+ writer => '_set_taskrev',
+ );
+
+ method _build_task_revision() {
+ my $rev = ++$SEQUENCE;
+ $INDEX{$rev} = $self;
+ Scalar::Util::weaken($INDEX{$rev});
+
+ return $rev;
+ }
+
+ method task_reset() {
+ if ($self->_has_taskrev) {
+ delete $INDEX{$self->task_revision};
+ }
+ $self->_set_taskrev(++$SEQUENCE);
+ return;
+ }
+
+ method task_owner($invocant: Int $task_rev) {
+ return $INDEX{$task_rev};
+ }
+
+ method task_request(:$task, %params) {
+ Class::MOP::load_class($task);
+ $task->isa('Thread::Task')
+ or die "Bad class $task, not a Thread::Task";
+ $class->new(owner=>$self,%params)->schedule;
+ }
+
+ method task_response(Task_T $task) {
+ my $class = $self->meta->name;
+ my $task_class = $task->meta->name;
+
+ die "Unhandled task_response for $class (recieved $task_class)";
+ }
+}
+
+1;
diff --git a/lib/Thread/Task/Thread.pm b/lib/Thread/Task/Thread.pm
new file mode 100644
index 0000000..401992a
--- /dev/null
+++ b/lib/Thread/Task/Thread.pm
@@ -0,0 +1,131 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Thread {
+ use threads;
+ use threads::shared;
+
+ use MooseX::Types::Moose qw(Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET
+ Queue_T);
+
+ use Thread::Task::Exception::Finished;
+
+ use TryCatch;
+ use Thread::Queue;
+
+ my $SEQUENCE : shared = 0;
+ my %WID2TID : shared = ();
+
+ my $SINGLETON;
+
+ method master(Class $class:) {
+ $SINGLETON
+ or $class->new->spawn;
+ }
+
+ method import(Class $class: @rest) {
+ if (@rest && defined $rest[0] && $rest[0] eq ':master') {
+ $class->master;
+ }
+ }
+
+ has wid => (
+ isa => Int,
+ is => 'ro',
+ init_arg => undef,
+ default => sub { ++$SEQUENCE },
+ required => 1,
+ );
+
+ has queue => (
+ isa => Queue_T,
+ is => 'ro',
+ init_arg => undef,
+ default => sub { Thread::Queue->new },
+ required => 1,
+ handles => {
+ _enqueue_msg => 'enqueue',
+ _dequeue_msg => 'dequeue',
+ },
+ );
+
+ has thread => (
+ is => 'ro',
+ lazy_build => 1,
+ required => 1,
+ clearer => '_clear_thread',
+ handles => [qw(join is_running is_joinable is_detached)],
+ );
+
+ method _build_thread() {
+ threads->object( $self->tid );
+ }
+
+ method spawn() {
+ $self->_clear_thread;
+ $WID2TID{$self->wid} =
+ threads->create(
+ sub {
+ $_[0]->run;
+ },
+ $self,
+ )->tid;
+ return $self;
+ }
+
+ method tid() {
+ return $WID2TID{$self->wid};
+ }
+
+ method is_thread() {
+ return $self->tid == threads->self->tid;
+ }
+
+ method send(Str $method,@args) {
+ unless ($self->can($method)) {
+ die "Attempted to send message to non-existant method '$method'";
+ }
+
+ $self->_enqueue_msg([$method,@args]);
+
+ return;
+ }
+
+ method start(@args) {
+ $self->send('start_child',@args);
+ }
+
+ method stop() {
+ $self->thread->detach;
+ $self->send('stop_child');
+ }
+
+ method run() {
+ while (my $message = $self->_dequeue_msg) {
+ next unless ref($message) eq 'ARRAY' && @$message;
+ my $method = shift @$message;
+ next unless $self->can($method);
+
+ try {
+ $self->$method(@$method);
+ }
+ catch (Finished_ET) {
+ last;
+ }
+ }
+ return;
+ }
+
+ method start_child(Thread_T $thread) {
+ $thread->spawn;
+ return;
+ }
+
+ method stop_child() {
+ die Thread::Task::Exception::Finished->new;
+ }
+
+}
+
+1;
diff --git a/lib/Thread/Task/Types.pm b/lib/Thread/Task/Types.pm
new file mode 100644
index 0000000..8a4d044
--- /dev/null
+++ b/lib/Thread/Task/Types.pm
@@ -0,0 +1,25 @@
+package Thread::Task::Types;
+use MooseX::Types
+ -declare =>
+ [qw(
+ Handle_T
+ Worker_T
+ Task_T
+ Conduit_T
+ Finished_ET
+ Queue_T
+ Queue_Ev_T
+ )];
+
+class_type Handle_T, { class => 'Thread::Task::Handle' };
+class_type Worker_T, { class => 'Thread::Task::Worker' };
+class_type Task_T, { class => 'Thread::Task' };
+
+role_type Conduit_T, { role => 'Thread::Task::Role::Conduit' };
+
+class_type Finished_ET, { class => 'Thread::Task::Exception::Finished' };
+
+class_type Queue_T, { class => 'Thread::Queue' };
+class_type Queue_Ev_T, { class => 'Thread::Queue::Event' };
+
+1;
diff --git a/lib/Thread/Task/Worker.pm b/lib/Thread/Task/Worker.pm
new file mode 100644
index 0000000..f0ec3e6
--- /dev/null
+++ b/lib/Thread/Task/Worker.pm
@@ -0,0 +1,36 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Worker extends Thread::Task::Thread {
+ use TryCatch;
+
+ use MooseX::Types::Moose qw(Object Bool Int ArrayRef);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET);
+
+ has hid => (
+ isa => Int,
+ is => 'rw',
+ predicate => 'has_hid',
+ clearer => 'clear_hid',
+ required => 0,
+ );
+
+ method task(ArrayRef $task_array) {
+ require Thread::Task::Handle;
+
+ my $handle = Thread::Task::Handle->from_array($task_array);
+
+ try {
+ $handle->started;
+ $handle->run;
+ $handle->stopped;
+ }
+ catch {
+ # trace
+ }
+ return;
+ }
+
+}
+
+1;