From ee82c4419d01243ecea3ce3a736b43edf2df5783 Mon Sep 17 00:00:00 2001 From: dakkar Date: Mon, 16 Mar 2020 15:03:43 +0000 Subject: more async! indices now work from channels, so we have a bunch of workers doing the parsing, and one worker per index, no lock needed (the implicit lock in the react/whenever is enough) next: indices return confidence level, store returns best response next next: spamc as an index --- lib/MaildirIndexer/Index.rakumod | 29 +++++++++++++++++++++++++++++ lib/MaildirIndexer/Store.rakumod | 28 +++++++++++++++++----------- t/store.t | 3 ++- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/lib/MaildirIndexer/Index.rakumod b/lib/MaildirIndexer/Index.rakumod index 2cb308a..8d7ad30 100644 --- a/lib/MaildirIndexer/Index.rakumod +++ b/lib/MaildirIndexer/Index.rakumod @@ -2,6 +2,35 @@ use v6.d; unit role MaildirIndexer::Index; use MaildirIndexer::Email; +my class AddMail is export { + has MaildirIndexer::Email:D $.email is required; + has Str:D $.mailbox is required; +} +my class DelPath is export { + has IO:D $.path is required; + has Str:D $.mailbox is required; +} +my class MailboxForEmail is export { + has MaildirIndexer::Email:D $.email is required; + has Channel:D $.reply-to is required; +} + +method receive(Channel:D $channel --> Nil) { + react { + whenever $channel -> $event { + when $event ~~ AddMail { + self.add-mail($event.email, $event.mailbox); + } + when $event ~~ DelPath { + self.del-path($event.path, $event.mailbox); + } + when $event ~~ MailboxForEmail { + $event.reply-to.send(self.mailbox-for-email($event.email)); + } + } + } +} + method add-mail(MaildirIndexer::Email:D $email, Str:D $mailbox --> Nil) { ... } method del-path(IO:D $path, Str:D $mailbox --> Nil) { ... } method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { ... } diff --git a/lib/MaildirIndexer/Store.rakumod b/lib/MaildirIndexer/Store.rakumod index 791f9a4..856baca 100644 --- a/lib/MaildirIndexer/Store.rakumod +++ b/lib/MaildirIndexer/Store.rakumod @@ -9,6 +9,7 @@ has Lock $!lock .= new; has MaildirIndexer::Index @.indices is required; has Channel $.file-channel is required; has Int $.workers = Kernel.cpu-cores - 1; +has Channel @!index-channels = Channel.new xx +@!indices; method dump(--> Nil) { $!lock.protect: { @@ -17,6 +18,10 @@ method dump(--> Nil) { } method start(--> Nil) { + for @.indices Z @!index-channels -> ($i, $c) { + start $i.receive($c); + } + for ^$.workers { start react { whenever $.file-channel -> $file { @@ -42,28 +47,29 @@ method start(--> Nil) { method add-file(IO:D $file --> Nil) { my Str $mailbox = mailbox-from-path($file.path) or return; my MaildirIndexer::Email $email = parse-email($file,:headers-only) or return; - $!lock.protect: { - .add-mail($email,$mailbox) for @!indices; - } + my $event = AddMail.new(:$email,:$mailbox); + .send($event) for @!index-channels; + return; } method del-file(IO:D $file --> Nil) { my $mailbox = mailbox-from-path($file.path) or return; - $!lock.protect: { - .del-path($file,$mailbox) for @!indices; - } + + my $event = DelPath.new(:path($file),:$mailbox); + .send($event) for @!index-channels; + return; } method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { my Str $result; + my Channel $replies .= new; + my $event = MailboxForEmail.new(:$email,:reply-to($replies)); MaildirIndexer::LogTimelineSchema::Store::Find.log: { - $!lock.protect: { - for @!indices -> $index { - with $index.mailbox-for-email($email) { $result = $_; last }; - } - } + .send($event) for @!index-channels; + my @results = $replies.receive() xx @!index-channels; + $result = @results.grep(*.defined).join("\n"); } return $result; } diff --git a/t/store.t b/t/store.t index d5b2502..bb519b1 100644 --- a/t/store.t +++ b/t/store.t @@ -30,13 +30,14 @@ subtest 'indexing' => { subtest 'finding' => { my Channel $file-channel .= new; my TestIndex $index1 .= new(:responses('1',Str,'1',Str),:name('index1')); - my TestIndex $index2 .= new(:responses('2',Str),:name('index2')); + my TestIndex $index2 .= new(:responses('2',Str,Str,Str),:name('index2')); my MaildirIndexer::Store $store .= new( :$file-channel, :indices($index1,$index2), :1workers, ); + $store.start; my @responses = $store.mailbox-for-email(MaildirIndexer::Email.new) xx 4; -- cgit v1.2.3