From ee82c4419d01243ecea3ce3a736b43edf2df5783 Mon Sep 17 00:00:00 2001 From: dakkar Date: Mon, 16 Mar 2020 15:03:43 +0000 Subject: more async! indices now work from channels, so we have a bunch of workers doing the parsing, and one worker per index, no lock needed (the implicit lock in the react/whenever is enough) next: indices return confidence level, store returns best response next next: spamc as an index --- lib/MaildirIndexer/Store.rakumod | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'lib/MaildirIndexer/Store.rakumod') diff --git a/lib/MaildirIndexer/Store.rakumod b/lib/MaildirIndexer/Store.rakumod index 791f9a4..856baca 100644 --- a/lib/MaildirIndexer/Store.rakumod +++ b/lib/MaildirIndexer/Store.rakumod @@ -9,6 +9,7 @@ has Lock $!lock .= new; has MaildirIndexer::Index @.indices is required; has Channel $.file-channel is required; has Int $.workers = Kernel.cpu-cores - 1; +has Channel @!index-channels = Channel.new xx +@!indices; method dump(--> Nil) { $!lock.protect: { @@ -17,6 +18,10 @@ method dump(--> Nil) { } method start(--> Nil) { + for @.indices Z @!index-channels -> ($i, $c) { + start $i.receive($c); + } + for ^$.workers { start react { whenever $.file-channel -> $file { @@ -42,28 +47,29 @@ method start(--> Nil) { method add-file(IO:D $file --> Nil) { my Str $mailbox = mailbox-from-path($file.path) or return; my MaildirIndexer::Email $email = parse-email($file,:headers-only) or return; - $!lock.protect: { - .add-mail($email,$mailbox) for @!indices; - } + my $event = AddMail.new(:$email,:$mailbox); + .send($event) for @!index-channels; + return; } method del-file(IO:D $file --> Nil) { my $mailbox = mailbox-from-path($file.path) or return; - $!lock.protect: { - .del-path($file,$mailbox) for @!indices; - } + + my $event = DelPath.new(:path($file),:$mailbox); + .send($event) for @!index-channels; + return; } method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { my Str $result; + my Channel $replies .= new; + my $event = MailboxForEmail.new(:$email,:reply-to($replies)); MaildirIndexer::LogTimelineSchema::Store::Find.log: { - $!lock.protect: { - for @!indices -> $index { - with $index.mailbox-for-email($email) { $result = $_; last }; - } - } + .send($event) for @!index-channels; + my @results = $replies.receive() xx @!index-channels; + $result = @results.grep(*.defined).join("\n"); } return $result; } -- cgit v1.2.3