1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.actor;
7 
8 import std.stdio : writeln, writefln;
9 
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : schwartzSort, max, min, among;
13 import std.array : empty;
14 import std.datetime : SysTime, Clock, dur;
15 import std.exception : collectException;
16 import std.functional : toDelegate;
17 import std.meta : staticMap;
18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
19 import std.typecons : Tuple, tuple;
20 import std.variant : Variant;
21 
22 import my.actor.common : ExitReason, SystemError, makeSignature;
23 import my.actor.mailbox;
24 import my.actor.msg;
25 import my.actor.system : System;
26 import my.actor.typed : isTypedAddress, isTypedActorImpl;
27 import my.gc.refc;
28 import sumtype;
29 
/// Shared state of a `Promise`: where the reply is sent and the id that identifies it.
private struct PromiseData {
    /// Address that the reply is delivered to.
    WeakAddress replyTo;
    /// Id of the reply. `Promise.empty` treats zero as "not initialized".
    ulong replyId;

    /// Copy constructor, copies both fields from `rhs`.
    this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
        replyTo = rhs.replyTo;
        replyId = rhs.replyId;
    }

    // NOTE(review): a copy constructor and a disabled postblit are both
    // declared; confirm which of them the compiler in use honours.
    @disable this(this);
}
42 
/** A handle used to deliver a reply of type `T` at a later point in time.
 *
 * `deliver` can only be called one time. The shared promise data is released
 * when the delivery is done which leaves the promise empty.
 */
struct Promise(T) {
    package {
        /// Reference counted state shared between all copies of the promise.
        RefCounted!PromiseData data;
    }

    /** Deliver `reply` when it is passed by value (e.g. an rvalue).
     *
     * Forwards to the `ref` overload.
     */
    void deliver(T reply) {
        // `reply` is an lvalue inside this function, thus the call resolves to
        // the `ref` overload and not back to this one.
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     *
     * Params:
     *  reply = the value to send. It is wrapped in a `Tuple` unless it already is one.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // the contract is compiled away in release builds, keep the guard.
        if (data.empty)
            return;
        // release the shared data whether or not the receiver is still reachable.
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the promise data of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized.
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}
89 
/** Create a `Promise!T` backed by a default initialized `PromiseData`.
 *
 * The reply id is zero after creation, thus `Promise.empty` returns true until
 * the data is filled in.
 */
auto makePromise(T)() {
    return Promise!T(refCounted(PromiseData.init));
}
93 
/** The result of a request handler.
 *
 * It holds exactly one of: a value of type `T`, an `ErrorMsg` or a
 * `Promise!T`.
 */
struct RequestResult(T) {
    /// The stored alternative.
    SumType!(T, ErrorMsg, Promise!T) value;

    /// Store a value.
    this(T v) {
        this.value = typeof(this.value)(v);
    }

    /// Store an error.
    this(ErrorMsg v) {
        this.value = typeof(this.value)(v);
    }

    /// Store a promise.
    this(Promise!T v) {
        this.value = typeof(this.value)(v);
    }
}
109 
/// Type erased handler for a one-shot message. `ctx` is the context stored in the `Closure`.
private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
/// Type erased handler for a request. `replyId` and `replyTo` come from the request message.
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
/// Type erased handler for the reply of an awaited request.
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Called for a message or reply that has no registered behavior.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as fallback if request is used
 * without error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * sending a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or the other actor dies. Per default, actors
 * terminate after receiving an `ExitMsg`.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * the other actor if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;
144 
/// Default `DefaultHandler`: silently drop the message.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}
147 
/** Write the name of the actor and the message to stdout, then drop the message.
 *
 * Any exception thrown while printing is ignored.
 */
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    try {
        const actorName = self.name;
        writeln("UNKNOWN message sent to actor ", actorName);

        auto text = msg.toString;
        writeln(text);
    } catch (Exception) {
        // best effort logging, nothing to do if the output fails.
    }
}
158 
/// Default `ErrorHandler`: remember the error reason and request a clean shutdown.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.shutdown;
}
163 
/// Default `ExitHandler`: remember the exit reason and force a shutdown.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.forceShutdown;
}
168 
/// Default `ExceptionHandler`: record a runtime error and force a shutdown. The exception is not logged.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    self.lastError = SystemError.runtimeError;
    // TODO: should log?
    self.forceShutdown;
}
174 
/** Write the name of the actor and the exception message to stdout, then
 * force a shutdown of the actor.
 *
 * The last error of the actor is set to `SystemError.runtimeError`. Any
 * exception thrown while printing is ignored.
 */
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    try {
        const actorName = self.name;
        writeln("EXCEPTION thrown by actor ", actorName);

        const reason = e.msg;
        writeln(reason);

        writeln("TERMINATING");
    } catch (Exception) {
        // best effort logging, the actor is terminated regardless.
    }

    self.forceShutdown;
}
190 
/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    /// Id of the awaited reply.
    ulong id;
    /// Point in time after which the request is considered to have timed out.
    SysTime timeout;
}
196 
/// The life cycle states of an actor.
package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish.
    shutdown,
    /// also discard the awaited responses, just shut down fast.
    forceShutdown,
    /// in the process of shutting down.
    finishShutdown,
    /// stopped.
    stopped,
}
211 
/// Callbacks for a request that the actor is waiting on a reply for.
private struct AwaitReponse {
    /// Called with the reply when it arrives.
    Closure!(ReplyHandler, void*) behavior;
    /// Called when the request times out.
    ErrorHandler onError;
}
216 
/** An actor: an address (mailbox) together with the behaviors that are
 * triggered by the messages that arrive at it.
 *
 * The actor is driven by calling `process` which advances the state machine
 * described by `ActorState`.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        /// Behaviors for one-shot messages keyed on the message signature.
        Closure!(MsgHandler, void*)[ulong] incoming;
        /// Behaviors for requests keyed on the message signature.
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses keyed on their id.
        AwaitReponse[ulong] awaitedResponses;
        /// Timeouts for the awaited responses, kept sorted on the timeout by `register`.
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback when a monitored actor goes down.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    // The handlers, except the optional down handler, must be set when the
    // actor has an address and is in any state but waiting or shutdown.
    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Create an actor in the waiting state that receives messages via `a`.
     *
     * The address is marked as open and the default handlers are installed.
     * No down handler is installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        // the leading dot selects the module level function, not the setter below.
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    /// Returns: a reference to the strong address owned by the actor.
    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system the actor belongs to. It must have been set with `setHomeSystem`.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor.
     *
     * Moves a waiting or active actor to the shutdown state in which it
     * finishes all awaited responses before it stops.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Moves a waiting, active or shutting down actor to the force shutdown
     * state. The awaited responses are discarded instead of waited for.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    /// Returns: the id of the address of the actor.
    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set the name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    /// Replace the handler for error messages.
    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    /// Replace the handler for down messages. `null` means that they are dropped.
    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    /// Replace the handler for exit messages.
    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    /// Replace the handler for exceptions thrown while processing messages.
    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    /// Replace the handler for messages without a registered behavior.
    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    /// Returns: true if the actor has an address and the address reports a message.
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /** How long until a delayed message or a timeout fires.
     *
     * Params:
     *  now = the current time.
     *  default_ = used for a source (delayed messages, reply timeouts) that is empty.
     *
     * Returns: the smallest of the two durations.
     */
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    /// Returns: true if there are awaited responses.
    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that were processed by the latest call to `process`.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    /// Set the system the actor belongs to.
    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the context of all message and request behaviors and remove them.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the context of all awaited responses and remove them.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Reset and remove all delayed messages, then destroy the container.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// Returns: true for all states except stopped.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages, true in the states waiting, active and shutdown.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new reply id. The ids start at one.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Advance the actor one step.
     *
     * What is done depends on the state of the actor, see the switch below.
     * The counter returned by `messages` is reset at the start of each call.
     *
     * Params:
     *  now = the current time, used for timeouts and delayed messages.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // The timeouts are checked *before* the incoming messages and
            // replies are processed.
            //
            // The order used to be the opposite with the philosophy "better
            // to accept a reply even if it has timed out rather than fail".
            // That assumption turned out to be wrong. It is annoying when a
            // reply is processed even though it is too old to be useful.
            // Thus the order is: first check for timeout, then process.
            try {
                processSystemMsg();
                checkReplyTimeout(now);
                processDelayed(now);
                processIncoming();
                processReply();
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            // stay in this state until all awaited responses are finished.
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to all monitors that are still reachable, then forget the monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to all links that are still reachable, then forget the links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Trigger the error handler of all awaited responses that have timed out.
     *
     * Relies on `replyTimeouts` being sorted, the scan stops at the first
     * timeout that hasn't passed.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        // number of leading elements in replyTimeouts that have timed out.
        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /** Process at most one incoming message.
     *
     * The behavior is looked up via the signature of the message. The
     * default handler is called if there is no matching behavior.
     */
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /** Process at most one reply.
     *
     * The awaited response matching the id of the reply is called and then
     * removed together with its timeout. The default handler is called if
     * no response is awaited for the id.
     */
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one delayed message from the mailbox to the local queue
     * and put all delayed messages that have triggered in the mailbox.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        // delayed is ordered on the trigger time, stop at the first that hasn't triggered.
        foreach (const _; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /// Remove the first timeout registered for the reply `id`, if any.
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                // `remove` returns the shortened range, it does not change
                // the length of the array that is passed in. The result must
                // be assigned back, otherwise a stale entry is left behind.
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /** Register `handler` as the behavior for one-shot messages with `signature`.
     *
     * An existing behavior for the signature is freed and replaced. Nothing
     * is done if the actor isn't accepting messages.
     */
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /** Register `handler` as the behavior for requests with `signature`.
     *
     * An existing behavior for the signature is freed and replaced. Nothing
     * is done if the actor isn't accepting messages.
     */
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Register a callback for the reply with id `replyId`.
     *
     * Params:
     *  replyId = id of the awaited reply.
     *  timeout = point in time when the request has timed out.
     *  reply = called with the reply.
     *  onError = called on timeout. The error handler of the actor is used if it is null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // checkReplyTimeout and nextTimeout rely on the timeouts being sorted.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}
777 
/** A callable `fn` bundled with a context and an optional function that
 * releases the context.
 *
 * The context is passed as the first argument to `fn` when the closure is
 * called.
 */
struct Closure(Fn, CtxT) {
    /// Signature of the function used by `free` to release the context.
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    /// A closure without a context and without a cleanup function.
    this(Fn fn) {
        this.fn = fn;
    }

    // NOTE(review): `ctx` is declared as `CtxT*` while the field is `CtxT`;
    // the callers visible here only pass `null`. Confirm that this is intended.
    /// A closure with the context `ctx` that is released by `cleanup`.
    this(Fn fn, CtxT* ctx, FreeFn cleanup) {
        this.cleanup = cleanup;
        this.ctx = ctx;
        this.fn = fn;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Release the context via `cleanup`, if there is a context, and reset it.
    void free() {
        // the reset is skipped if the cleanup throws.
        scope (success)
            ctx = CtxT.init;

        // will crash, on purpose, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx)
            cleanup(ctx);
    }
}
808 
@("shall register a behavior to be called when msg received matching signature")
unittest {
    auto addr = makeAddress2;
    auto actor = Actor(addr);

    bool processedIncoming;
    void fn(void* ctx, ref Variant msg) {
        processedIncoming = true;
    }

    // the behavior and the message use the same signature, 1.
    actor.register(1, Closure!(MsgHandler, void*)(&fn));
    addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));

    // the first call moves the actor from waiting to active and processes the message.
    actor.process(Clock.currTime);

    assert(processedIncoming);
}
826 
/** Release what is held by the context that `ctx` points to.
 *
 * Every field of the tuple `CtxT` with an unqualified type is destroyed.
 * Fields with a qualified type (e.g. const, immutable) are left as is.
 * Nothing is done when `CtxT` is `void`.
 *
 * Params:
 *  ctx = pointer to a `CtxT`. The caller is trusted to pass the correct type.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits;
    import my.actor.typed;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto typedCtx = () @trusted { return cast(CtxT*) ctx; }();

        static foreach (const idx; 0 .. CtxT.Types.length) {
            {
                alias Field = CtxT.Types[idx];
                alias Bare = Unqual!Field;

                // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?
                static if (is(Field == Bare)) {
                    // release the field such as if it holds a rc object.
                    .destroy((*typedCtx)[idx]);
                } else {
                    // a qualified field can't be destroyed, reject the
                    // address types at compile time.
                    static assert(!is(Bare : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(Bare : TypedAddress!M, M...),
                            "WeakAddress must NEVER be const or immutable: " ~ Field.stringof);
                }
            }
        }
    }
}
857 
@("shall default initialize when possible, skipping const/immutable")
unittest {
    // a const field is left untouched while a mutable field is reset.
    {
        auto x = tuple(cast(const) 42, 43);
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == 42); // can't assign to const
        assert(x[1] == 0);
    }

    // the same for a struct type.
    {
        import my.path : Path;

        auto x = tuple(Path.init, cast(const) Path("foo"));
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == Path.init);
        assert(x[1] == Path("foo"));
    }
}
878 
/// A behavior for one-shot messages together with the signature that triggers it.
package struct Action {
    /// The type erased handler.
    Closure!(MsgHandler, void*) action;
    /// Signature of the messages that the handler accepts.
    ulong signature;
}
883 
/** A behavior for an actor when it receives a message of `signature`.
 *
 * `handler` is wrapped in a type erased closure. The signature is derived
 * from the unqualified parameter types of the handler, excluding the context
 * parameter when `CtxT` isn't `void`.
 *
 * Params:
 *  handler = function called with the unpacked message. Its first parameter
 *            is the context when `CtxT` isn't `void`.
 *
 * Returns: an `Action`. The context of its closure is null.
 */
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpack the message and forward it, with the context if any, to the handler.
    void dispatch(void* ctx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    auto closure = typeof(Action.action)(&dispatch, null, &cleanupCtx!CtxT);
    return Action(closure, makeSignature!HArgs);
}
909 
/** Wrap `handler` in a type erased closure that is called with a reply.
 *
 * Params:
 *  handler = function called with the unpacked reply. Its first parameter
 *            is the context when `CtxT` isn't `void`.
 *
 * Returns: the closure. Its context is null.
 */
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpack the reply and forward it, with the context if any, to the handler.
    void dispatch(void* ctx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    auto closure = typeof(return)(&dispatch, null, &cleanupCtx!CtxT);
    return closure;
}
933 
/// A behavior for requests together with the signature that triggers it.
package struct Request {
    /// The type erased handler.
    Closure!(RequestHandler, void*) request;
    /// Signature of the requests that the handler accepts.
    ulong signature;
}
938 
/// Join the three compile time arguments in `Loc` to a string with `:` as separator.
private string locToString(Loc...)() {
    import std.conv : to;

    auto second = to!string(Loc[1]);
    auto third = to!string(Loc[2]);

    return Loc[0] ~ ":" ~ second ~ ":" ~ third;
}
944 
/** Check at compile time that the first parameter of `handler`, the context,
 * is passed by `ref`.
 *
 * The type of the handler is printed and the compilation fails if the
 * context isn't `ref`. A `ref` that is combined with other storage classes,
 * such as `scope ref` or `return ref`, is accepted.
 */
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    // The storage classes are bit flags that can be OR-ed together. Test the
    // `ref_` bit instead of comparing for equality, otherwise a context such
    // as `scope ref` is rejected even though it is passed by reference.
    enum ctxStorage = ParameterStorageClassTuple!(typeof(handler))[0];
    enum ctxIsRef = (ctxStorage & ParameterStorageClass.ref_) != 0;

    static if (!ctxIsRef) {
        pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
        static assert(ctxIsRef, "The context must be `ref` to avoid unnecessary copying");
    }
}
957 
/** Check at compile time that a captured context can be passed to a handler.
 *
 * The check passes if the types are the same or if `CtxParam` can be
 * constructed from the expanded fields of `CtxT`.
 *
 * Params:
 *  CtxParam = type of the first parameter of the handler
 *  CtxT = type of the captured context
 */
package void checkMatchingCtx(CtxParam, CtxT)() {
    static if (is(CtxT == CtxParam)) {
        // identical types, nothing to check.
    } else {
        enum constructible = __traits(compiles, { auto x = CtxParam(CtxT.init.expand); });
        static assert(constructible,
                "mismatch between the context type " ~ CtxT.stringof
                ~ " and the first parameter " ~ CtxParam.stringof);
    }
}
965 
/** Wrap `handler` in a type erased closure that processes a request and
 * passes on the result to the requester.
 *
 * The handler must return a value. What is done with it depends on its type:
 *  - `RequestResult!T`: an `ErrorMsg` is sent as a system message to the
 *    requester, a `Promise!T` is bound to the requester (reply ID and address)
 *    and any other value is put as a `Reply` in the requesters mailbox.
 *  - `Promise!T`: bound to the requester.
 *  - anything else: put as a `Reply` in the requesters mailbox.
 *
 * A value that is put as a `Reply` is wrapped in a `Tuple` unless it already
 * is one. The reply is dropped if the address of the requester can't be locked.
 *
 * Params:
 *  T = type of the handler
 *  CtxT = type of the captured context or `void` if the handler has no context
 *  handler = called with the unpacked content of the message
 *
 * Returns: a `Request` with a closure whose context pointer is null and the
 * signature of the message parameters.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter of the handler is the context, the rest is the message.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                // `U...` matches a tuple with any number of fields.
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U...);
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // `U...` matches a tuple with any number of fields.
            enum wrapInTuple = !is(RType : Tuple!U, U...);
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}
1023 
1024 @("shall link two actors lifetime")
1025 unittest {
1026     int count;
1027     void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
1028         count++;
1029         self.shutdown;
1030     }
1031 
1032     auto aa1 = Actor(makeAddress2);
1033     auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
1034     auto aa2 = Actor(makeAddress2);
1035     auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;
1036 
1037     a1.linkTo(a2.address);
1038     a1.process(Clock.currTime);
1039     a2.process(Clock.currTime);
1040 
1041     assert(a1.isAlive);
1042     assert(a2.isAlive);
1043 
1044     sendExit(a1.address, ExitReason.userShutdown);
1045     foreach (_; 0 .. 5) {
1046         a1.process(Clock.currTime);
1047         a2.process(Clock.currTime);
1048     }
1049 
1050     assert(!a1.isAlive);
1051     assert(!a2.isAlive);
1052     assert(count == 2);
1053 }
1054 
1055 @("shall let one actor monitor the lifetime of the other one")
1056 unittest {
1057     int count;
1058     void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
1059         count++;
1060     }
1061 
1062     auto aa1 = Actor(makeAddress2);
1063     auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
1064     auto aa2 = Actor(makeAddress2);
1065     auto a2 = build(&aa2).set((int x) {}).finalize;
1066 
1067     a1.monitor(a2.address);
1068     a1.process(Clock.currTime);
1069     a2.process(Clock.currTime);
1070 
1071     assert(a1.isAlive);
1072     assert(a2.isAlive);
1073 
1074     sendExit(a2.address, ExitReason.userShutdown);
1075     foreach (_; 0 .. 5) {
1076         a1.process(Clock.currTime);
1077         a2.process(Clock.currTime);
1078     }
1079 
1080     assert(a1.isAlive);
1081     assert(!a2.isAlive);
1082     assert(count == 1);
1083 }
1084 
/** Builder that configures the handlers and behaviors of an actor.
 *
 * All setters return the builder to allow the calls to be chained. The chain
 * is ended with `finalize`.
 */
private struct BuildActor {
    /// The actor that is configured.
    Actor* actor;

    /// Returns: the configured actor. The builder forgets about the actor.
    Actor* finalize() @safe {
        Actor* built = actor;
        actor = null;
        return built;
    }

    /// Set the handler for errors.
    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    /// Set the handler for down messages.
    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    /// Set the handler for exit messages.
    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    /// Set the handler for exceptions.
    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    /// Set the default handler.
    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /** Register a behavior that has no context.
     *
     * A behavior that returns `void` is registered as an action, all other
     * as a request.
     */
    auto set(BehaviorT)(BehaviorT behavior)
            if (isFunction!BehaviorT || isFunctionPointer!BehaviorT) {
        static if (is(ReturnType!BehaviorT == void)) {
            auto wrapped = makeAction(behavior);
            actor.register(wrapped.signature, wrapped.action);
        } else {
            auto wrapped = makeRequest(behavior);
            actor.register(wrapped.signature, wrapped.request);
        }
        return this;
    }

    /** Register a behavior together with the context `c`.
     *
     * A copy of the context is allocated with `new` and stored in the closure.
     * A behavior that returns `void` is registered as an action, all other
     * as a request.
     */
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if (isFunction!BehaviorT || isFunctionPointer!BehaviorT) {
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        static if (is(ReturnType!BehaviorT == void)) {
            auto wrapped = makeAction!(BehaviorT, CT)(behavior);
            wrapped.action.ctx = cast(void*) new CT(c);
            actor.register(wrapped.signature, wrapped.action);
        } else {
            auto wrapped = makeRequest!(BehaviorT, CT)(behavior);
            wrapped.request.ctx = cast(void*) new CT(c);
            actor.register(wrapped.signature, wrapped.request);
        }
        return this;
    }
}
1157 
/// Returns: a builder that configures the actor `a`.
package BuildActor build(Actor* a) @safe {
    return typeof(return)(a);
}
1161 
/** Implement an actor by registering `behaviors` on `self`.
 *
 * Each behavior must be a function or function pointer. A behavior may be
 * directly followed by a capture which then is registered as the context of
 * that behavior.
 *
 * Params:
 *  self = the actor to configure
 *  behaviors = functions, each optionally followed by its capture
 *
 * Returns: `self` with the behaviors registered.
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            static if (isCapture!b) {
                // A capture is consumed together with the behavior before it.
                // One that doesn't follow a behavior would be silently
                // dropped, report it instead.
                static if (i == 0)
                    static assert(0, "a capture must follow a behavior: " ~ b.stringof);
                else static if (isCapture!(Behavior[i - 1]))
                    static assert(0, "a capture must follow a behavior: " ~ b.stringof);
            } else {
                static assert(isFunction!(b) || isFunctionPointer!(b),
                        "behavior may only be functions, not delegates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}
1185 
1186 @("build dynamic actor from functions")
1187 unittest {
1188     static void fn3(int s) @safe {
1189     }
1190 
1191     static string fn4(int s) @safe {
1192         return "foo";
1193     }
1194 
1195     static Tuple!(int, string) fn5(const string s) @safe {
1196         return typeof(return)(42, "hej");
1197     }
1198 
1199     auto aa1 = Actor(makeAddress2);
1200     auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
1201 }
1202 
// Delayed messages: a message whose trigger time has passed is expected to
// reach its handler, one with a trigger time far in the future is not.
unittest {
    // set by fn1 when it receives a message.
    bool delayOk;
    static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    // set by fn2 when it receives a message.
    bool delayShouldNeverHappen;
    static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
            capture(&delayShouldNeverHappen)).finalize;
    // the string is sent with a time one second in the past, the int with a
    // time one hour into the future.
    delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
    delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);

    // both messages start in the mailbox for delayed messages.
    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);

    // after one round there is still something in the mailbox for delayed messages.
    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);
    actor.process(Clock.currTime);

    // after two more rounds all mailboxes are expected to be empty.
    assert(actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    // only the handler of the message with the passed time has been called.
    assert(delayOk);
    assert(!delayShouldNeverHappen);
}
1240 
1241 @("shall process a request->then chain xyz")
1242 @system unittest {
1243     // checking capture is correctly setup/teardown by using captured rc.
1244 
1245     auto rcReq = refCounted(42);
1246     bool calledOk;
1247     static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
1248             const string b) {
1249         assert(2 == ctx[1].refCount);
1250         if (s == "apa")
1251             *ctx.calledOk = true;
1252         return "foo";
1253     }
1254 
1255     auto rcReply = refCounted(42);
1256     bool calledReply;
1257     static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
1258         *ctx[0] = s == "foo";
1259         assert(2 == ctx[1].refCount);
1260     }
1261 
1262     auto aa1 = Actor(makeAddress2);
1263     auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;
1264 
1265     assert(2 == rcReq.refCount);
1266     assert(1 == rcReply.refCount);
1267 
1268     actor.request(actor.address, infTimeout).send("apa", "foo")
1269         .capture(&calledReply, rcReply).then(&reply);
1270     assert(2 == rcReply.refCount);
1271 
1272     assert(!actor.addr.get.empty!Msg);
1273     assert(actor.addr.get.empty!Reply);
1274 
1275     actor.process(Clock.currTime);
1276     assert(actor.addr.get.empty!Msg);
1277     assert(actor.addr.get.empty!Reply);
1278 
1279     assert(2 == rcReq.refCount);
1280     assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");
1281 
1282     assert(calledOk);
1283     assert(calledReply);
1284 
1285     actor.shutdown;
1286     while (actor.isAlive)
1287         actor.process(Clock.currTime);
1288 }
1289 
1290 @("shall process a request->then chain using promises")
1291 unittest {
1292     static struct A {
1293         string v;
1294     }
1295 
1296     static struct B {
1297         string v;
1298     }
1299 
1300     int calledOk;
1301     auto fn1p = makePromise!string;
1302     static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
1303         if (a.v == "apa")
1304             (*c.calledOk)++;
1305         return typeof(return)(c.p);
1306     }
1307 
1308     auto fn2p = makePromise!string;
1309     static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) {
1310         (*c.calledOk)++;
1311         return c.p;
1312     }
1313 
1314     int calledReply;
1315     static void reply(ref Tuple!(int*) ctx, const string s) {
1316         if (s == "foo")
1317             *ctx[0] += 1;
1318     }
1319 
1320     auto aa1 = Actor(makeAddress2);
1321     auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2,
1322             capture(&calledOk, fn2p)).finalize;
1323 
1324     actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply);
1325     actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply);
1326 
1327     actor.process(Clock.currTime);
1328     assert(calledOk == 1); // first request
1329     assert(calledReply == 0);
1330 
1331     fn1p.deliver("foo");
1332 
1333     assert(calledReply == 0);
1334 
1335     actor.process(Clock.currTime);
1336     assert(calledOk == 2); // second request triggered
1337     assert(calledReply == 1);
1338 
1339     fn2p.deliver("foo");
1340     actor.process(Clock.currTime);
1341 
1342     assert(calledReply == 2);
1343 
1344     actor.shutdown;
1345     while (actor.isAlive) {
1346         actor.process(Clock.currTime);
1347     }
1348 }
1349 
/** Thrown by a `ScopedActor` when a request couldn't be completed.
 *
 * The reason is found in `error`. The message of the exception is a short
 * description of the reason.
 */
class ScopedActorException : Exception {
    /**
     * Params:
     *  err = why the request failed
     *  file = file where the exception is created
     *  line = line where the exception is created
     */
    this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow {
        // a message makes the exception readable when it is logged or uncaught.
        super(describe(err), file, line);
        error = err;
    }

    /// Why the request failed.
    ScopedActorError error;

    /// Returns: a description of `err` that is used as the message of the exception.
    private static string describe(ScopedActorError err) @safe pure nothrow @nogc {
        final switch (err) {
        case ScopedActorError.none:
            return "scoped actor: no error";
        case ScopedActorError.down:
            return "scoped actor: the actor address is down";
        case ScopedActorError.timeout:
            return "scoped actor: the request timed out";
        case ScopedActorError.unknownMsg:
            return "scoped actor: the address was unable to process the received message";
        case ScopedActorError.fatal:
            return "scoped actor: a fatal error occurred";
        }
    }
}

/// The reason why a request from a `ScopedActor` failed.
enum ScopedActorError : ubyte {
    /// no error
    none,
    /// actor address is down
    down,
    /// request timeout
    timeout,
    /// the address was unable to process the received message
    unknownMsg,
    /// some type of fatal error occurred.
    fatal,
}
1371 
/** Intended to be used in a local scope by a user.
 *
 * `ScopedActor` is not thread safe.
 *
 * A request is sent with `request(...).send(...).then(...)`. The call to
 * `then` blocks, polling the actor, until the actor no longer waits for a
 * reply or an error occurs, in which case a `ScopedActorException` is thrown.
 */
struct ScopedActor {
    import my.actor.typed : underlyingAddress, underlyingWeakAddress;

    private {
        // State shared by all copies of a ScopedActor.
        static struct Data {
            Actor self;
            // the error recorded by the handlers installed by `then`.
            ScopedActorError errSt;

            // Restore the default handlers, shut down the actor and process
            // until it no longer is alive.
            ~this() @safe {
                if (self.addr.empty)
                    return;

                () @trusted {
                    self.downHandler = null;
                    self.defaultHandler = toDelegate(&.defaultHandler);
                    self.errorHandler = toDelegate(&defaultErrorHandler);
                }();

                self.shutdown;
                while (self.isAlive) {
                    self.process(Clock.currTime);
                }
            }
        }

        RefCounted!Data data;
    }

    /** Create a scoped actor.
     *
     * Params:
     *  addr = address of the actor
     *  name = name of the actor
     */
    this(StrongAddress addr, string name) @safe {
        data = refCounted(Data(Actor(addr)));
        data.get.self.name = name;
    }

    // Clear the error from a previous request.
    private void reset() @safe nothrow {
        data.get.errSt = ScopedActorError.none;
    }

    /** Start a request to `requestTo`.
     *
     * The error state from a previous request is cleared.
     *
     * Params:
     *  requestTo = the address to send the request to
     *  timeout = passed on to the underlying `request`
     */
    SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout)
            if (isAddress!TAddress) {
        reset;
        auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout);
        return SRequestSend(rs, this);
    }

    // First step of a request, the message isn't sent yet.
    private static struct SRequestSend {
        RequestSend rs;
        ScopedActor self;

        /// Copy constructor
        this(ref return typeof(this) rhs) @safe pure nothrow @nogc {
            rs = rhs.rs;
            self = rhs.self;
        }

        @disable this(this);

        /// Send `args` as the message of the request.
        SRequestSendThen send(Args...)(auto ref Args args) {
            return SRequestSendThen(.send(rs, args), self);
        }
    }

    // Second step of a request, the message is sent and a handler for the
    // reply is to be registered.
    private static struct SRequestSendThen {
        RequestSendThen rs;
        ScopedActor self;
        // time in microseconds to sleep between two polls of the actor.
        uint backoff;

        /// Copy constructor
        this(ref return typeof(this) rhs) {
            rs = rhs.rs;
            self = rhs.self;
            backoff = rhs.backoff;
        }

        @disable this(this);

        // Sleep `backoff` microseconds and then increase it by 100 up to a
        // maximum of 20000.
        void dynIntervalSleep() @trusted {
            // +100 usecs "feels good", magic number. current OS and
            // implementation of message passing isn't that much faster than
            // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They
            // aren't expected to be used for "time critical" sections.
            Thread.sleep(backoff.dur!"usecs");
            backoff = min(backoff + 100, 20000);
        }

        // Handlers that record what went wrong in the shared error state.
        private static struct ValueCapture {
            RefCounted!Data data;

            void downHandler(ref Actor, DownMsg) @safe nothrow {
                data.get.errSt = ScopedActorError.down;
            }

            // a request timeout is reported as `timeout`, all other errors as `fatal`.
            void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow {
                if (msg.reason == SystemError.requestTimeout)
                    data.get.errSt = ScopedActorError.timeout;
                else
                    data.get.errSt = ScopedActorError.fatal;
            }

            void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow {
                logAndDropHandler(a, msg);
                data.get.errSt = ScopedActorError.unknownMsg;
            }
        }

        /** Register `handler` for the reply and block until the actor no
         * longer waits for a reply.
         *
         * Params:
         *  handler = passed on to `thenUnsafe` together with a null context
         *  onError = passed on to `thenUnsafe`
         *
         * Throws: `ScopedActorException` if the address that the request is
         * sent to can't be locked or if one of the installed handlers has
         * recorded an error.
         */
        void then(T)(T handler, ErrorHandler onError = null) {
            // the address that the request is sent to is monitored for the duration of the call.
            scope (exit)
                demonitor(rs.rs.self, rs.rs.requestTo);
            monitor(rs.rs.self, rs.rs.requestTo);

            // install handlers that record errors in the shared state.
            auto callback = new ValueCapture(self.data);
            self.data.get.self.downHandler = &callback.downHandler;
            self.data.get.self.defaultHandler = &callback.unknownMsgHandler;
            self.data.get.self.errorHandler = &callback.errorHandler;

            () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }();

            // restore the default handlers when leaving.
            // NOTE(review): this guard is registered after the call to
            // `thenUnsafe`. If that call throws, the handlers installed above
            // presumably stay installed -- confirm.
            scope (exit)
                () @trusted {
                self.data.get.self.downHandler = null;
                self.data.get.self.defaultHandler = toDelegate(&.defaultHandler);
                self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler);
            }();

            auto requestTo = rs.rs.requestTo.lock;
            if (!requestTo)
                throw new ScopedActorException(ScopedActorError.down);

            // TODO: this loop is stupid... should use a condition variable
            // instead but that requires changing the mailbox. later
            do {
                rs.rs.self.process(Clock.currTime);
                // force the actor to be alive even though there are no behaviors.
                rs.rs.self.state_ = ActorState.waiting;

                // NOTE(review): the sleep is done before the loop condition
                // is checked, thus also after the last round.
                if (self.data.get.errSt == ScopedActorError.none) {
                    dynIntervalSleep;
                } else {
                    throw new ScopedActorException(self.data.get.errSt);
                }

            }
            while (self.data.get.self.waitingForReply);
        }
    }
}
1521 
/** Create a `ScopedActor` with a new address.
 *
 * The actor is named `ScopedActor.<file>:<line>` after the location of the caller.
 */
ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe {
    import std.format : format;

    const actorName = format!"ScopedActor.%s:%s"(file, line);
    return ScopedActor(makeAddress2, actorName);
}
1527 
1528 @(
1529         "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed")
1530 unittest {
1531     import my.actor.system;
1532 
1533     auto sys = makeSystem;
1534 
1535     auto a0 = sys.spawn((Actor* self) {
1536         return impl(self, (ref CSelf!() ctx, int x) {
1537             Thread.sleep(50.dur!"msecs");
1538             return 42;
1539         }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self),
1540             (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self));
1541     });
1542 
1543     {
1544         auto self = scopedActor;
1545         bool excThrown;
1546         auto stopAt = Clock.currTime + 3.dur!"seconds";
1547         while (!excThrown && Clock.currTime < stopAt) {
1548             try {
1549                 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {});
1550             } catch (ScopedActorException e) {
1551                 excThrown = e.error == ScopedActorError.timeout;
1552             } catch (Exception e) {
1553                 logger.info(e.msg);
1554             }
1555         }
1556         assert(excThrown, "timeout did not trigger as expected");
1557     }
1558 
1559     {
1560         auto self = scopedActor;
1561         bool excThrown;
1562         auto stopAt = Clock.currTime + 3.dur!"seconds";
1563         while (!excThrown && Clock.currTime < stopAt) {
1564             try {
1565                 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) {
1566                 });
1567             } catch (ScopedActorException e) {
1568                 excThrown = e.error == ScopedActorError.down;
1569             } catch (Exception e) {
1570                 logger.info(e.msg);
1571             }
1572         }
1573         assert(excThrown, "detecting terminated actor did not trigger as expected");
1574     }
1575 }