diff options
author | dakkar <dakkar@thenautilus.net> | 2020-03-16 15:03:43 +0000 |
---|---|---|
committer | dakkar <dakkar@thenautilus.net> | 2020-03-16 15:34:19 +0000 |
commit | ee82c4419d01243ecea3ce3a736b43edf2df5783 (patch) | |
tree | 067a9adf3702c65a9d6c133a4dde8873a5293a3d | |
parent | always decont the arguments to pair (diff) | |
download | MaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.tar.gz MaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.tar.bz2 MaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.zip |
more async!
indices now work from channels, so we have a bunch of workers doing
the parsing, and one worker per index, no lock needed (the implicit
lock in the react/whenever is enough)
next: indices return confidence level, store returns best response
next next: spamc as an index
-rw-r--r-- | lib/MaildirIndexer/Index.rakumod | 29 | ||||
-rw-r--r-- | lib/MaildirIndexer/Store.rakumod | 28 | ||||
-rw-r--r-- | t/store.t | 3 |
3 files changed, 48 insertions, 12 deletions
diff --git a/lib/MaildirIndexer/Index.rakumod b/lib/MaildirIndexer/Index.rakumod index 2cb308a..8d7ad30 100644 --- a/lib/MaildirIndexer/Index.rakumod +++ b/lib/MaildirIndexer/Index.rakumod @@ -2,6 +2,35 @@ use v6.d; unit role MaildirIndexer::Index; use MaildirIndexer::Email; +my class AddMail is export { + has MaildirIndexer::Email:D $.email is required; + has Str:D $.mailbox is required; +} +my class DelPath is export { + has IO:D $.path is required; + has Str:D $.mailbox is required; +} +my class MailboxForEmail is export { + has MaildirIndexer::Email:D $.email is required; + has Channel:D $.reply-to is required; +} + +method receive(Channel:D $channel --> Nil) { + react { + whenever $channel -> $event { + when $event ~~ AddMail { + self.add-mail($event.email, $event.mailbox); + } + when $event ~~ DelPath { + self.del-path($event.path, $event.mailbox); + } + when $event ~~ MailboxForEmail { + $event.reply-to.send(self.mailbox-for-email($event.email)); + } + } + } +} + method add-mail(MaildirIndexer::Email:D $email, Str:D $mailbox --> Nil) { ... } method del-path(IO:D $path, Str:D $mailbox --> Nil) { ... } method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { ... } diff --git a/lib/MaildirIndexer/Store.rakumod b/lib/MaildirIndexer/Store.rakumod index 791f9a4..856baca 100644 --- a/lib/MaildirIndexer/Store.rakumod +++ b/lib/MaildirIndexer/Store.rakumod @@ -9,6 +9,7 @@ has Lock $!lock .= new; has MaildirIndexer::Index @.indices is required; has Channel $.file-channel is required; has Int $.workers = Kernel.cpu-cores - 1; +has Channel @!index-channels = Channel.new xx +@!indices; method dump(--> Nil) { $!lock.protect: { @@ -17,6 +18,10 @@ method dump(--> Nil) { } method start(--> Nil) { + for @.indices Z @!index-channels -> ($i, $c) { + start $i.receive($c); + } + for ^$.workers { start react { whenever $.file-channel -> $file { @@ -42,28 +47,29 @@ method start(--> Nil) { method add-file(IO:D $file --> Nil) { my Str $mailbox = mailbox-from-path($file.path) or return; my MaildirIndexer::Email $email = parse-email($file,:headers-only) or return; - $!lock.protect: { - .add-mail($email,$mailbox) for @!indices; - } + my $event = AddMail.new(:$email,:$mailbox); + .send($event) for @!index-channels; + return; } method del-file(IO:D $file --> Nil) { my $mailbox = mailbox-from-path($file.path) or return; - $!lock.protect: { - .del-path($file,$mailbox) for @!indices; - } + + my $event = DelPath.new(:path($file),:$mailbox); + .send($event) for @!index-channels; + return; } method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { my Str $result; + my Channel $replies .= new; + my $event = MailboxForEmail.new(:$email,:reply-to($replies)); MaildirIndexer::LogTimelineSchema::Store::Find.log: { - $!lock.protect: { - for @!indices -> $index { - with $index.mailbox-for-email($email) { $result = $_; last }; - } - } + .send($event) for @!index-channels; + my @results = $replies.receive() xx @!index-channels; + $result = @results.grep(*.defined).join("\n"); } return $result; } @@ -30,13 +30,14 @@ subtest 'indexing' => { subtest 'finding' => { my Channel $file-channel .= new; my TestIndex $index1 .= new(:responses('1',Str,'1',Str),:name('index1')); - my TestIndex $index2 .= new(:responses('2',Str),:name('index2')); + my TestIndex $index2 .= new(:responses('2',Str,Str,Str),:name('index2')); my MaildirIndexer::Store $store .= new( :$file-channel, :indices($index1,$index2), :1workers, ); + $store.start; my @responses = $store.mailbox-for-email(MaildirIndexer::Email.new) xx 4; |