summaryrefslogtreecommitdiff
path: root/lib/Thread/Task/Manager.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Thread/Task/Manager.pm')
-rw-r--r--lib/Thread/Task/Manager.pm230
1 files changed, 230 insertions, 0 deletions
diff --git a/lib/Thread/Task/Manager.pm b/lib/Thread/Task/Manager.pm
new file mode 100644
index 0000000..e969edc
--- /dev/null
+++ b/lib/Thread/Task/Manager.pm
@@ -0,0 +1,230 @@
+use 5.008003;
+use MooseX::Declare;
+
+class Thread::Task::Manager {
+ use TryCatch;
+
+ use MooseX::Types::Moose qw(Object Bool Int);
+ use Thread::Task::Types qw(Handle_T Worker_T Task_T Conduit_T Finished_ET);
+ use MooseX::Singleton;
+
+ use Thread::Task::Handle ();
+ use Thread::Task::Thread ();
+ use Thread::Task::Worker ();
+
+ require Storable;
+
+ our $VERSION='0.001';
+
+ has active => (
+ traits => [ 'Bool' ],
+ isa => Bool,
+ is => 'ro',
+ default => 0,
+ init_arg => undef,
+ handles => {
+ '_set_active' => 'set',
+ '_reset_active' => 'unset',
+ },
+ );
+
+ has threads => (
+ isa => Bool,
+ is => 'ro',
+ default => 1,
+ );
+
+ has minimum => (
+ isa => Int,
+ is => 'ro',
+ default => 2,
+ );
+
+ has workers => (
+ traits => [ 'Array' ],
+ isa => ArrayRef[Worker_T],
+ default => sub { [] },
+ init_arg => undef,
+ handles => {
+ _workers_count => 'count',
+ _set_worker => 'set',
+ _get_worker => 'get',
+ _delete_worker => 'delete',
+ _workers => 'elements',
+ },
+ );
+
+ has handles => (
+ traits => [ 'Hash' ],
+ isa => HashRef[Handle_T],
+ default => sub { {} },
+ init_arg => undef,
+ handles => {
+ _handles_count => 'count',
+ _set_handle => 'set',
+ _get_handle => 'get',
+ _delete_handle => 'delete',
+ _handles => 'elements',
+ },
+ );
+
+ has running => (
+ traits => [ 'Hash' ],
+ isa => HashRef[Handle_T],
+ default => sub { {} },
+ init_arg => undef,
+ handles => {
+ _running_count => 'count',
+ _set_running => 'set',
+ _get_running => 'get',
+ _delete_running => 'delete',
+ _running => 'elements',
+ },
+ );
+
+ has queue => (
+ traits => [ 'Array' ],
+ isa => ArrayRef[Task_T],
+ default => sub { [] },
+ init_arg => undef,
+ handles => {
+ _enqueue_task => 'push',
+ _dequeue_task => 'shift',
+ _queue_size => 'count',
+ },
+ );
+
+ has conduit => (
+ is => 'ro',
+ does => Conduit_T,
+ );
+
+ method BUILD() {
+ $self->conduit->conduit_init($self);
+ }
+
+ method start() {
+ if ($self->threads) {
+ for (0 .. $self->minimum -1 ) {
+ $self->start_thread($_);
+ }
+ }
+
+ $self->_set_active;
+ $self->step;
+ }
+
+ method start_thread(Int $worker_id) {
+ my $master = Thread::Task::Thread->master; # init the master thread
+ my $worker = Thread::Task::Worker->new->spawn;
+ $self->_set_worker($worker_id,$worker);
+ }
+
+ method stop() {
+ $self->_reset_active;
+ if ($self->threads) {
+ for (0 .. $self->_workers_count -1) {
+ $self->stop_thread($_);
+ }
+ Thread::Task::Thread->master->stop;
+ }
+ }
+
+ method stop_thread(Int $worker_id) {
+ $self->_delete_worker($worker_id)->stop;
+ }
+
+ method next_thread() {
+ for my $worker ($self->_workers) {
+ next if $worker->has_hid;
+ return $worker;
+ }
+ return;
+ }
+
+ method schedule(Task_T $task) {
+ $self->_enqueue_task($task);
+ $self->step;
+ }
+
+ method step() {
+ return unless $self->active;
+ return unless $self->_queue_size;
+
+ if ($self->threads) {
+ unless ($self->minimum > $self->_handles_count) {
+ return;
+ }
+ }
+
+ my $task = $self->_dequeue_task;
+ my $handle = Thread::Task::Handle->new(task => $task);
+ my $hid = $handle->hid;
+
+ try {
+ $handle->prepare();
+ }
+ catch (Finished_ET $e) {
+ undef $handle;
+ return $self->step;
+ }
+
+ $self->_set_handle($hid,$handle);
+
+ my $worker = $self->next_thread or return;
+ $worker->hid($hid);
+ $handle->wid($worker->wid);
+
+ $worker->send(task => $handle->as_array);
+
+ return $self->step;
+ }
+
+ method on_signal(Str $frozen) {
+ my $message;
+ try {
+ $message = Storable::nthaw($frozen);
+ }
+ catch {
+ return;
+ }
+ unless (ref $message eq 'ARRAY') {
+ return;
+ }
+
+ my $hid = shift @$message;
+ my $handle = $self->_get_handle($hid) or return;
+
+ my $method = shift @$message;
+
+ if ($method eq '__STARTED__') {
+ $self->_set_running($hid,$handle);
+ return;
+ }
+
+ unless ($self->_get_running($hid)) {
+ return;
+ }
+
+ if ($method eq '__STOPPED__') {
+ $self->_delete_running($hid);
+
+ for my $worker ($self->_workers) {
+ next unless $worker->has_hid;
+ next unless $worker->hid == $hid;
+ $worker->clear_hid;
+ last;
+ }
+
+ $handle->on_stopped(@$message);
+
+ $self->_delete_handle($hid);
+
+ return $self->step;
+ }
+
+ return $handle->on_message($method,@$message);
+ }
+}
+
+1;