summaryrefslogtreecommitdiff
path: root/lib/Thread/Task/Handle.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Thread/Task/Handle.pm')
-rw-r--r--lib/Thread/Task/Handle.pm134
1 files changed, 134 insertions, 0 deletions
diff --git a/lib/Thread/Task/Handle.pm b/lib/Thread/Task/Handle.pm
new file mode 100644
index 0000000..b7012f4
--- /dev/null
+++ b/lib/Thread/Task/Handle.pm
@@ -0,0 +1,134 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Handle {
+ use TryCatch;
+
+ use MooseX::Types::Moose qw(Object Bool Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET);
+ use Thread::Task::Exception::Finished;
+
+ require Scalar::Util;
+
+ our $SEQUENCE = 0;
+
+ has hid => (
+ isa => Int,
+ is => 'ro',
+ default => sub { ++$SEQUENCE },
+ init_arg => undef,
+ );
+
+ has task => (
+ isa => Task_T,
+ is => 'ro',
+ required => 1,
+ );
+
+ has execption => (
+ is => 'ro',
+ writer => '_set_exception',
+ predicate => 'has_exception',
+ init_arg => undef,
+ );
+
+ method prepare() {
+ try {
+ $self->task->prepare;
+ }
+ catch (Finished_ET) {
+ die $@;
+ }
+ catch {
+ die Thread::Task::Exception::Finished->new(msg=>$@);
+ }
+ }
+
+ method finish() {
+ try {
+ $self->task->finish;
+ }
+ catch (Finished_ET) {
+ die $@;
+ }
+ catch {
+ die Thread::Task::Exception::Finished->new(msg=>$@);
+ }
+ }
+
+ method run() {
+ try {
+ $self->task->handle($self);
+ $self->task->run;
+ $self->task->clear_handle;
+ }
+ catch (Finished_ET) {
+ }
+ catch ($e) {
+ $self->task->clear_handle;
+ $self->_set_exception($e);
+ }
+
+ return;
+ }
+
+ method as_array() {
+ return [
+ $self->hid,
+ $self->task->meta->name,
+ $task->as_string,
+ ];
+ }
+
+ method from_array(Class $class: ArrayRef $array) {
+ my ($hid,$task_class,$task_serialized)=@$array;
+ Class::MOP::load_class($task_class);
+
+ return $class->new(
+ hid=>$hid,
+ task=>Thread::Task->from_string($task_serialized),
+ );
+ }
+
+ method message(@message) {
+ Thread::Task::Manager->instance->conduit->signal(
+ Storable::nfreeze([$self->hid,@message])
+ );
+ }
+
+ method on_message($method,@args) {
+ unless ($self->task->can($method)) {
+ return;
+ }
+
+ try {
+ $self->task->$method(@args);
+ }
+ catch {
+ return;
+ }
+ return;
+ }
+
+ method started() {
+ $self->message('__STARTED__');
+ }
+
+ method stopped() {
+ $self->message('__STOPPED__');
+ }
+
+ method on_stopped(Task_T $new_task) {
+ $self->task->_update($new_task);
+
+ try {
+ $self->task->finish(@args);
+ }
+ catch {
+ return;
+ }
+ return;
+ }
+}
+
+1;