package Thread::Queue::Event;

# A Thread::Queue paired with a pipe, so that queue activity can be observed
# through a file handle (see event_fh / event_fileno).
#
# Wake-up protocol, as implemented below:
#   * every enqueue() call writes ONE byte to the pipe (per call, not per item);
#   * every dequeue() call first reads ONE byte from the pipe (blocking);
#   * dequeue_nb() removes up to as many bytes as it removed items, without
#     ever blocking;
#   * signal() writes one extra byte while the queue is non-empty.

use Moose;
use Moose::Util ();
use Glib;    # not referenced in this file; presumably needed by roles -- confirm
use Thread::Queue;
use Carp;

# The wrapped queue. It cannot be supplied by the caller (init_arg => undef);
# the listed Thread::Queue methods are delegated to it.
has queue => (
    is       => 'ro',
    isa      => 'Thread::Queue',
    init_arg => undef,
    builder  => '_build_queue',
    handles  => [qw( enqueue dequeue dequeue_nb pending peek insert extract )],
);

# Two-element array ref: [ read end, write end ] of the notification pipe.
has pipes => (
    isa     => 'ArrayRef',
    is      => 'ro',
    builder => '_build_pipes',
);

# Create the notification pipe.
# Returns [ $reader, $writer ]; croaks if the pipe cannot be created.
sub _build_pipes {
    my ( $reader, $writer );
    pipe( $reader, $writer )
        or croak "Cannot create event pipe: $!";
    return [ $reader, $writer ];
}

# Create the underlying queue.
sub _build_queue {
    return Thread::Queue->new();
}

# Write one wake-up byte to the pipe. Returns whatever syswrite returns.
sub _pipe_write {
    my ($self) = @_;
    return syswrite $self->pipes->[1], '0';
}

# Read one wake-up byte from the pipe. This is a plain sysread on the read
# end, so it waits until a byte is available. Returns the data read.
sub _pipe_read {
    my ($self) = @_;
    my $byte;
    sysread $self->pipes->[0], $byte, 1;
    return $byte;
}

# Remove up to $count wake-up bytes from the pipe without blocking: each read
# is preceded by a zero-timeout select(), and draining stops as soon as the
# pipe has nothing to offer. Returns the number of bytes removed.
# Note: the select/sysread pair is not atomic with respect to other readers
# of the same pipe.
sub _pipe_drain {
    my ( $self, $count ) = @_;
    my $fh = $self->pipes->[0];

    my $watch = '';
    vec( $watch, fileno($fh), 1 ) = 1;

    my $drained = 0;
    while ( $drained < $count ) {
        my $ready  = $watch;
        my $nfound = select( $ready, undef, undef, 0 );
        last unless defined $nfound && $nfound > 0;

        my $got = sysread $fh, my $chunk, $count - $drained;
        last unless $got;    # undef (error) or 0 (EOF)
        $drained += $got;
    }
    return $drained;
}

# File handle of the pipe's read end.
sub event_fh {
    my ($self) = @_;
    return $self->pipes->[0];
}

# File descriptor number of the pipe's read end.
sub event_fileno {
    my ($self) = @_;
    return fileno( $self->event_fh );
}

# One wake-up byte per enqueue() call, one consumed per dequeue() call.
after  enqueue => \&_pipe_write;
before dequeue => \&_pipe_read;

# dequeue_nb: call the original in the caller's context (so a scalar-context
# call returns the item, not an element count), then drain as many wake-up
# bytes as items actually left the queue. Draining never blocks.
# TODO: add automated tests for this wrapper.
around dequeue_nb => sub {
    my ( $orig, $self, @params ) = @_;
    my $want_list = wantarray;

    # pending() may return undef on some Thread::Queue versions; treat as 0.
    my $pending_before = $self->pending || 0;

    my @result;
    if ($want_list) {
        @result = $self->$orig(@params);
    }
    else {
        $result[0] = $self->$orig(@params);
    }

    my $removed = $pending_before - ( $self->pending || 0 );
    $self->_pipe_drain($removed) if $removed > 0;

    return $want_list ? @result : $result[0];
};

# Write an extra wake-up byte. Croaks when the queue is empty, because a
# reader woken by it would have nothing to dequeue.
sub signal {
    my ($self) = @_;
    croak "Not signaling on an empty queue, would block"
        unless $self->pending;
    return $self->_pipe_write();
}

# Apply a role to this instance and return the instance.
# A role name without '::' is looked up under Thread::Queue::Event::.
# Moose::Util::apply_all_roles loads the role before applying it.
sub with_role {
    my ( $self, $role ) = @_;
    croak "with_role requires a role name"
        unless defined $role && length $role;

    $role = "Thread::Queue::Event::$role" if $role !~ /::/;
    Moose::Util::apply_all_roles( $self, $role );
    return $self;
}

no Moose;
1;