summaryrefslogtreecommitdiff
path: root/lib/Thread/Task/Thread.pm
blob: 401992afb11520116c9f2dabdddf7815dbda094c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use 5.008003;
use MooseX::Declare;
 
class Thread::Task::Thread {
    use threads;
    use threads::shared;
 
    use MooseX::Types::Moose qw(Int);
    use Thread::Task::Types qw(Handle_T Worker_T Task_T Finished_ET
                               Queue_T);
 
    use Thread::Task::Exception::Finished;
 
    use TryCatch;
    use Thread::Queue;
 
    my $SEQUENCE : shared = 0;
    my %WID2TID : shared  = ();
 
    my $SINGLETON;
 
    method master(Class $class:) {
        $SINGLETON
            or $class->new->spawn;
    }
 
    method import(Class $class@rest) {
        if (@rest && defined $rest[0] && $rest[0] eq ':master') {
            $class->master;
        }
    }
 
    has wid => (
        isa => Int,
        is => 'ro',
        init_arg => undef,
        default => sub { ++$SEQUENCE },
        required => 1,
    );
 
    has queue => (
        isa => Queue_T,
        is => 'ro',
        init_arg => undef,
        default => sub { Thread::Queue->new },
        required => 1,
        handles => {
            _enqueue_msg => 'enqueue',
            _dequeue_msg => 'dequeue',
        },
    );
 
    has thread => (
        is => 'ro',
        lazy_build => 1,
        required => 1,
        clearer => '_clear_thread',
        handles => [qw(join is_running is_joinable is_detached)],
    );
 
    method _build_thread() {
        threads->object( $self->tid );
    }
 
    method spawn() {
        $self->_clear_thread;
        $WID2TID{$self->wid} =
            threads->create(
                sub {
                    $_[0]->run;
                },
                $self,
            )->tid;
        return $self;
    }
 
    method tid() {
        return $WID2TID{$self->wid};
    }
 
    method is_thread() {
        return $self->tid == threads->self->tid;
    }
 
    method send(Str $method,@args) {
        unless ($self->can($method)) {
            die "Attempted to send message to non-existant method '$method'";
        }
 
        $self->_enqueue_msg([$method,@args]);
 
        return;
    }
 
    method start(@args) {
        $self->send('start_child',@args);
    }
 
    method stop() {
        $self->thread->detach;
        $self->send('stop_child');
    }
 
    method run() {
        while (my $message = $self->_dequeue_msg) {
            next unless ref($messageeq 'ARRAY' && @$message;
            my $method = shift @$message;
            next unless $self->can($method);
 
            try {
                $self->$method(@$method);
            }
            catch (Finished_ET) {
                last;
            }
        }
        return;
    }
 
    method start_child(Thread_T $thread) {
        $thread->spawn;
        return;
    }
 
    method stop_child() {
        die Thread::Task::Exception::Finished->new;
    }
 
}
 
1;