summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordakkar <dakkar@thenautilus.net>2020-03-16 15:03:43 +0000
committerdakkar <dakkar@thenautilus.net>2020-03-16 15:34:19 +0000
commitee82c4419d01243ecea3ce3a736b43edf2df5783 (patch)
tree067a9adf3702c65a9d6c133a4dde8873a5293a3d
parentalways decont the arguments to pair (diff)
downloadMaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.tar.gz
MaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.tar.bz2
MaildirIndexer-ee82c4419d01243ecea3ce3a736b43edf2df5783.zip
more async!
indices now work from channels, so we have a bunch of workers doing the parsing, and one worker per index, no lock needed (the implicit lock in the react/whenever is enough) next: indices return confidence level, store returns best response next next: spamc as an index
-rw-r--r--lib/MaildirIndexer/Index.rakumod29
-rw-r--r--lib/MaildirIndexer/Store.rakumod28
-rw-r--r--t/store.t3
3 files changed, 48 insertions, 12 deletions
diff --git a/lib/MaildirIndexer/Index.rakumod b/lib/MaildirIndexer/Index.rakumod
index 2cb308a..8d7ad30 100644
--- a/lib/MaildirIndexer/Index.rakumod
+++ b/lib/MaildirIndexer/Index.rakumod
@@ -2,6 +2,35 @@ use v6.d;
unit role MaildirIndexer::Index;
use MaildirIndexer::Email;
+my class AddMail is export {
+ has MaildirIndexer::Email:D $.email is required;
+ has Str:D $.mailbox is required;
+}
+my class DelPath is export {
+ has IO:D $.path is required;
+ has Str:D $.mailbox is required;
+}
+my class MailboxForEmail is export {
+ has MaildirIndexer::Email:D $.email is required;
+ has Channel:D $.reply-to is required;
+}
+
+method receive(Channel:D $channel --> Nil) {
+ react {
+ whenever $channel -> $event {
+ when $event ~~ AddMail {
+ self.add-mail($event.email, $event.mailbox);
+ }
+ when $event ~~ DelPath {
+ self.del-path($event.path, $event.mailbox);
+ }
+ when $event ~~ MailboxForEmail {
+ $event.reply-to.send(self.mailbox-for-email($event.email));
+ }
+ }
+ }
+}
+
method add-mail(MaildirIndexer::Email:D $email, Str:D $mailbox --> Nil) { ... }
method del-path(IO:D $path, Str:D $mailbox --> Nil) { ... }
method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) { ... }
diff --git a/lib/MaildirIndexer/Store.rakumod b/lib/MaildirIndexer/Store.rakumod
index 791f9a4..856baca 100644
--- a/lib/MaildirIndexer/Store.rakumod
+++ b/lib/MaildirIndexer/Store.rakumod
@@ -9,6 +9,7 @@ has Lock $!lock .= new;
has MaildirIndexer::Index @.indices is required;
has Channel $.file-channel is required;
has Int $.workers = Kernel.cpu-cores - 1;
+has Channel @!index-channels = Channel.new xx +@!indices;
method dump(--> Nil) {
$!lock.protect: {
@@ -17,6 +18,10 @@ method dump(--> Nil) {
}
method start(--> Nil) {
+ for @.indices Z @!index-channels -> ($i, $c) {
+ start $i.receive($c);
+ }
+
for ^$.workers {
start react {
whenever $.file-channel -> $file {
@@ -42,28 +47,29 @@ method start(--> Nil) {
method add-file(IO:D $file --> Nil) {
my Str $mailbox = mailbox-from-path($file.path) or return;
my MaildirIndexer::Email $email = parse-email($file,:headers-only) or return;
- $!lock.protect: {
- .add-mail($email,$mailbox) for @!indices;
- }
+ my $event = AddMail.new(:$email,:$mailbox);
+ .send($event) for @!index-channels;
+
return;
}
method del-file(IO:D $file --> Nil) {
my $mailbox = mailbox-from-path($file.path) or return;
- $!lock.protect: {
- .del-path($file,$mailbox) for @!indices;
- }
+
+ my $event = DelPath.new(:path($file),:$mailbox);
+ .send($event) for @!index-channels;
+
return;
}
method mailbox-for-email(MaildirIndexer::Email:D $email --> Str) {
my Str $result;
+ my Channel $replies .= new;
+ my $event = MailboxForEmail.new(:$email,:reply-to($replies));
MaildirIndexer::LogTimelineSchema::Store::Find.log: {
- $!lock.protect: {
- for @!indices -> $index {
- with $index.mailbox-for-email($email) { $result = $_; last };
- }
- }
+ .send($event) for @!index-channels;
+ my @results = $replies.receive() xx @!index-channels;
+ $result = @results.grep(*.defined).join("\n");
}
return $result;
}
diff --git a/t/store.t b/t/store.t
index d5b2502..bb519b1 100644
--- a/t/store.t
+++ b/t/store.t
@@ -30,13 +30,14 @@ subtest 'indexing' => {
subtest 'finding' => {
my Channel $file-channel .= new;
my TestIndex $index1 .= new(:responses('1',Str,'1',Str),:name('index1'));
- my TestIndex $index2 .= new(:responses('2',Str),:name('index2'));
+ my TestIndex $index2 .= new(:responses('2',Str,Str,Str),:name('index2'));
my MaildirIndexer::Store $store .= new(
:$file-channel,
:indices($index1,$index2),
:1workers,
);
+ $store.start;
my @responses = $store.mailbox-for-email(MaildirIndexer::Email.new) xx 4;