1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.actor;
7
8 import std.stdio : writeln, writefln;
9
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : schwartzSort, max, min, among;
13 import std.array : empty;
14 import std.datetime : SysTime, Clock, dur;
15 import std.exception : collectException;
16 import std.functional : toDelegate;
17 import std.meta : staticMap;
18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
19 import std.typecons : Tuple, tuple;
20 import std.variant : Variant;
21
22 import my.actor.common : ExitReason, SystemError, makeSignature;
23 import my.actor.mailbox;
24 import my.actor.msg;
25 import my.actor.system : System;
26 import my.actor.typed : isTypedAddress, isTypedActorImpl;
27 import my.gc.refc;
28 import sumtype;
29
30 private struct PromiseData {
31 WeakAddress replyTo;
32 ulong replyId;
33
34 /// Copy constructor
35 this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
36 replyTo = rhs.replyTo;
37 replyId = rhs.replyId;
38 }
39
40 @disable this(this);
41 }
42
43 // deliver can only be called one time.
44 struct Promise(T) {
45 package {
46 RefCounted!PromiseData data;
47 }
48
49 void deliver(T reply) {
50 auto tmp = reply;
51 deliver(reply);
52 }
53
54 /** Deliver the message `reply`.
55 *
56 * A promise can only be delivered once.
57 */
58 void deliver(ref T reply) @trusted
59 in (!data.empty, "promise must be initialized") {
60 if (data.empty)
61 return;
62 scope (exit)
63 data.release;
64
65 // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
66 if (auto replyTo = data.get.replyTo.lock.get) {
67 enum wrapInTuple = !is(T : Tuple!U, U);
68 static if (wrapInTuple)
69 replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
70 else
71 replyTo.put(Reply(data.get.replyId, Variant(reply)));
72 }
73 }
74
75 void opAssign(Promise!T rhs) {
76 data = rhs.data;
77 }
78
79 /// True if the promise is not initialized.
80 bool empty() {
81 return data.empty || data.get.replyId == 0;
82 }
83
84 /// Clear the promise.
85 void clear() {
86 data.release;
87 }
88 }
89
90 auto makePromise(T)() {
91 return Promise!T(refCounted(PromiseData.init));
92 }
93
94 struct RequestResult(T) {
95 this(T v) {
96 value = typeof(value)(v);
97 }
98
99 this(ErrorMsg v) {
100 value = typeof(value)(v);
101 }
102
103 this(Promise!T v) {
104 value = typeof(value)(v);
105 }
106
107 SumType!(T, ErrorMsg, Promise!T) value;
108 }
109
110 private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
111 private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
112 ulong replyId, WeakAddress replyTo) @safe;
113 private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;
114
115 alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;
116
117 /** Actors send error messages to others by returning an error (see Errors)
118 * from a message handler. Similar to exit messages, error messages usually
119 * cause the receiving actor to terminate, unless a custom handler was
120 * installed. The default handler is used as fallback if request is used
121 * without error handler.
122 */
123 alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;
124
125 /** Bidirectional monitoring with a strong lifetime coupling is established by
126 * calling a `LinkRequest` to an address. This will cause the runtime to send
127 * an `ExitMsg` if either this or other dies. Per default, actors terminate
128 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
129 * This mechanism propagates failure states in an actor system. Linked actors
130 * form a sub system in which an error causes all actors to fail collectively.
131 */
132 alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;
133
134 /// An exception has been thrown while processing a message.
135 alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;
136
137 /** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
138 * to an address. This will cause the runtime system to send a `DownMsg` for
139 * other if it dies.
140 *
141 * Actors drop down messages unless they provide a custom handler.
142 */
143 alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;
144
145 void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
146 }
147
148 /// Write the name of the actor and the message type to the console.
149 void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
150 import std.stdio : writeln;
151
152 try {
153 writeln("UNKNOWN message sent to actor ", self.name);
154 writeln(msg.toString);
155 } catch (Exception e) {
156 }
157 }
158
159 void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
160 self.lastError = msg.reason;
161 self.shutdown;
162 }
163
164 void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
165 self.lastError = msg.reason;
166 self.forceShutdown;
167 }
168
169 void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
170 self.lastError = SystemError.runtimeError;
171 // TODO: should log?
172 self.forceShutdown;
173 }
174
175 // Write the name of the actor and the exception to stdout.
176 void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
177 import std.stdio : writeln;
178
179 self.lastError = SystemError.runtimeError;
180
181 try {
182 writeln("EXCEPTION thrown by actor ", self.name);
183 writeln(e.msg);
184 writeln("TERMINATING");
185 } catch (Exception e) {
186 }
187
188 self.forceShutdown;
189 }
190
191 /// Timeout for an outstanding request.
192 struct ReplyHandlerTimeout {
193 ulong id;
194 SysTime timeout;
195 }
196
197 package enum ActorState {
198 /// waiting to be started.
199 waiting,
200 /// active and processing messages.
201 active,
202 /// wait for all awaited responses to finish
203 shutdown,
204 /// discard also the awaite responses, just shutdown fast
205 forceShutdown,
206 /// in process of shutting down
207 finishShutdown,
208 /// stopped.
209 stopped,
210 }
211
212 private struct AwaitReponse {
213 Closure!(ReplyHandler, void*) behavior;
214 ErrorHandler onError;
215 }
216
217 struct Actor {
218 import std.container.rbtree : RedBlackTree, redBlackTree;
219
220 package StrongAddress addr;
221 // visible in the package for logging purpose.
222 package ActorState state_ = ActorState.stopped;
223
224 private {
225 // TODO: rename to behavior.
226 Closure!(MsgHandler, void*)[ulong] incoming;
227 Closure!(RequestHandler, void*)[ulong] reqBehavior;
228
229 // callbacks for awaited responses key:ed on their id.
230 AwaitReponse[ulong] awaitedResponses;
231 ReplyHandlerTimeout[] replyTimeouts;
232
233 // important that it start at 1 because then zero is known to not be initialized.
234 ulong nextReplyId = 1;
235
236 /// Delayed messages ordered by their trigger time.
237 RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;
238
239 /// Used during shutdown to signal monitors and links why this actor is terminating.
240 SystemError lastError;
241
242 /// monitoring the actor lifetime.
243 WeakAddress[size_t] monitors;
244
245 /// strong, bidirectional link of the actors lifetime.
246 WeakAddress[size_t] links;
247
248 // Number of messages that has been processed.
249 ulong messages_;
250
251 /// System the actor belongs to.
252 System* homeSystem_;
253
254 string name_;
255
256 ErrorHandler errorHandler_;
257
258 /// callback when a link goes down.
259 DownHandler downHandler_;
260
261 ExitHandler exitHandler_;
262
263 ExceptionHandler exceptionHandler_;
264
265 DefaultHandler defaultHandler_;
266 }
267
268 invariant () {
269 if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
270 assert(errorHandler_);
271 assert(exitHandler_);
272 assert(exceptionHandler_);
273 assert(defaultHandler_);
274 }
275 }
276
277 this(StrongAddress a) @trusted
278 in (!a.empty, "address is empty") {
279 state_ = ActorState.waiting;
280
281 addr = a;
282 addr.get.setOpen;
283 delayed = new typeof(delayed);
284
285 errorHandler_ = toDelegate(&defaultErrorHandler);
286 downHandler_ = null;
287 exitHandler_ = toDelegate(&defaultExitHandler);
288 exceptionHandler_ = toDelegate(&defaultExceptionHandler);
289 defaultHandler_ = toDelegate(&.defaultHandler);
290 }
291
292 WeakAddress address() @safe {
293 return addr.weakRef;
294 }
295
296 package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
297 return addr;
298 }
299
300 ref System homeSystem() @safe pure nothrow @nogc {
301 return *homeSystem_;
302 }
303
304 /** Clean shutdown of the actor
305 *
306 * Stopping incoming messages from triggering new behavior and finish all
307 * awaited respones.
308 */
309 void shutdown() @safe nothrow {
310 if (state_.among(ActorState.waiting, ActorState.active))
311 state_ = ActorState.shutdown;
312 }
313
314 /** Force an immediate shutdown.
315 *
316 * Stopping incoming messages from triggering new behavior and finish all
317 * awaited respones.
318 */
319 void forceShutdown() @safe nothrow {
320 if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
321 state_ = ActorState.forceShutdown;
322 }
323
324 ulong id() @safe pure nothrow const @nogc {
325 return addr.id;
326 }
327
328 /// Returns: the name of the actor.
329 string name() @safe pure nothrow const @nogc {
330 return name_;
331 }
332
333 // dfmt off
334
335 /// Set name name of the actor.
336 void name(string n) @safe pure nothrow @nogc {
337 this.name_ = n;
338 }
339
340 void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
341 errorHandler_ = v;
342 }
343
344 void downHandler(DownHandler v) @safe pure nothrow @nogc {
345 downHandler_ = v;
346 }
347
348 void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
349 exitHandler_ = v;
350 }
351
352 void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
353 exceptionHandler_ = v;
354 }
355
356 void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
357 defaultHandler_ = v;
358 }
359
360 // dfmt on
361
362 package:
363 bool hasMessage() @safe pure nothrow @nogc {
364 return addr && addr.get.hasMessage;
365 }
366
367 /// How long until a delayed message or a timeout fires.
368 Duration nextTimeout(const SysTime now, const Duration default_) @safe {
369 return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
370 replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
371 }
372
373 bool waitingForReply() @safe pure nothrow const @nogc {
374 return !awaitedResponses.empty;
375 }
376
377 /// Number of messages that has been processed.
378 ulong messages() @safe pure nothrow const @nogc {
379 return messages_;
380 }
381
382 void setHomeSystem(System* sys) @safe pure nothrow @nogc {
383 homeSystem_ = sys;
384 }
385
386 void cleanupBehavior() @trusted nothrow {
387 foreach (ref a; incoming.byValue) {
388 try {
389 a.free;
390 } catch (Exception e) {
391 // TODO: call exceptionHandler?
392 }
393 }
394 incoming = null;
395 foreach (ref a; reqBehavior.byValue) {
396 try {
397 a.free;
398 } catch (Exception e) {
399 }
400 }
401 reqBehavior = null;
402 }
403
404 void cleanupAwait() @trusted nothrow {
405 foreach (ref a; awaitedResponses.byValue) {
406 try {
407 a.behavior.free;
408 } catch (Exception e) {
409 }
410 }
411 awaitedResponses = null;
412 }
413
414 void cleanupDelayed() @trusted nothrow {
415 foreach (const _; 0 .. delayed.length) {
416 try {
417 delayed.front.msg = Msg.init;
418 delayed.removeFront;
419 } catch (Exception e) {
420 }
421 }
422 .destroy(delayed);
423 }
424
425 bool isAlive() @safe pure nothrow const @nogc {
426 final switch (state_) {
427 case ActorState.waiting:
428 goto case;
429 case ActorState.active:
430 goto case;
431 case ActorState.shutdown:
432 goto case;
433 case ActorState.forceShutdown:
434 goto case;
435 case ActorState.finishShutdown:
436 return true;
437 case ActorState.stopped:
438 return false;
439 }
440 }
441
442 /// Accepting messages.
443 bool isAccepting() @safe pure nothrow const @nogc {
444 final switch (state_) {
445 case ActorState.waiting:
446 goto case;
447 case ActorState.active:
448 goto case;
449 case ActorState.shutdown:
450 return true;
451 case ActorState.forceShutdown:
452 goto case;
453 case ActorState.finishShutdown:
454 goto case;
455 case ActorState.stopped:
456 return false;
457 }
458 }
459
460 ulong replyId() @safe {
461 return nextReplyId++;
462 }
463
464 void process(const SysTime now) @safe nothrow {
465 import core.memory : GC;
466
467 assert(!GC.inFinalizer);
468
469 messages_ = 0;
470
471 void tick() {
472 // philosophy of the order is that a timeout should only trigger if it
473 // is really required thus it is checked last. This order then mean
474 // that a request may have triggered a timeout but because
475 // `processReply` is called before `checkReplyTimeout` it is *ignored*.
476 // Thus "better to accept even if it is timeout rather than fail".
477 //
478 // NOTE: the assumption that a message that has timed out should be
479 // processed turned out to be... wrong. It is annoying that
480 // sometimes a timeout message triggers even though it shouldn't,
481 // because it is now too old to be useful!
482 // Thus the order is changed to first check for timeout, then process.
483 try {
484 processSystemMsg();
485 checkReplyTimeout(now);
486 processDelayed(now);
487 processIncoming();
488 processReply();
489 } catch (Exception e) {
490 exceptionHandler_(this, e);
491 }
492 }
493
494 assert(state_ == ActorState.stopped || addr, "no address");
495
496 final switch (state_) {
497 case ActorState.waiting:
498 state_ = ActorState.active;
499 tick;
500 // the state can be changed before the actor have executed.
501 break;
502 case ActorState.active:
503 tick;
504 // self terminate if the actor has no behavior.
505 if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
506 state_ = ActorState.forceShutdown;
507 break;
508 case ActorState.shutdown:
509 tick;
510 if (awaitedResponses.empty)
511 state_ = ActorState.finishShutdown;
512 cleanupBehavior;
513 break;
514 case ActorState.forceShutdown:
515 state_ = ActorState.finishShutdown;
516 cleanupBehavior;
517 addr.get.setClosed;
518 break;
519 case ActorState.finishShutdown:
520 state_ = ActorState.stopped;
521
522 sendToMonitors(DownMsg(addr.weakRef, lastError));
523
524 sendToLinks(ExitMsg(addr.weakRef, lastError));
525
526 replyTimeouts = null;
527 cleanupDelayed;
528 cleanupAwait;
529
530 // must be last because sendToLinks and sendToMonitors uses addr.
531 addr.get.shutdown();
532 addr.release;
533 break;
534 case ActorState.stopped:
535 break;
536 }
537 }
538
539 void sendToMonitors(DownMsg msg) @safe nothrow {
540 foreach (ref a; monitors.byValue) {
541 try {
542 if (auto rc = a.lock.get)
543 rc.put(SystemMsg(msg));
544 a.release;
545 } catch (Exception e) {
546 }
547 }
548
549 monitors = null;
550 }
551
552 void sendToLinks(ExitMsg msg) @safe nothrow {
553 foreach (ref a; links.byValue) {
554 try {
555 if (auto rc = a.lock.get)
556 rc.put(SystemMsg(msg));
557 a.release;
558 } catch (Exception e) {
559 }
560 }
561
562 links = null;
563 }
564
565 void checkReplyTimeout(const SysTime now) @safe {
566 if (replyTimeouts.empty)
567 return;
568
569 size_t removeTo;
570 foreach (const i; 0 .. replyTimeouts.length) {
571 if (now > replyTimeouts[i].timeout) {
572 const id = replyTimeouts[i].id;
573 if (auto v = id in awaitedResponses) {
574 messages_++;
575 v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
576 try {
577 () @trusted { v.behavior.free; }();
578 } catch (Exception e) {
579 }
580 awaitedResponses.remove(id);
581 }
582 removeTo = i + 1;
583 } else {
584 break;
585 }
586 }
587
588 if (removeTo >= replyTimeouts.length) {
589 replyTimeouts = null;
590 } else if (removeTo != 0) {
591 replyTimeouts = replyTimeouts[removeTo .. $];
592 }
593 }
594
595 void processIncoming() @safe {
596 if (addr.get.empty!Msg)
597 return;
598 messages_++;
599
600 auto front = addr.get.pop!Msg;
601 scope (exit)
602 .destroy(front);
603
604 void doSend(ref MsgOneShot msg) {
605 if (auto v = front.get.signature in incoming) {
606 (*v)(msg.data);
607 } else {
608 defaultHandler_(this, msg.data);
609 }
610 }
611
612 void doRequest(ref MsgRequest msg) @trusted {
613 if (auto v = front.get.signature in reqBehavior) {
614 (*v)(msg.data, msg.replyId, msg.replyTo);
615 } else {
616 defaultHandler_(this, msg.data);
617 }
618 }
619
620 front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
621 doRequest(a);
622 });
623 }
624
625 /** All system messages are handled.
626 *
627 * Assuming:
628 * * they are not heavy to process
629 * * very important that if there are any they should be handled as soon as possible
630 * * ignoring the case when there is a "storm" of system messages which
631 * "could" overload the actor system and lead to a crash. I classify this,
632 * for now, as intentional, malicious coding by the developer themself.
633 * External inputs that could trigger such a behavior should be controlled
634 * and limited. Other types of input such as a developer trying to break
635 * the actor system is out of scope.
636 */
637 void processSystemMsg() @safe {
638 //() @trusted {
639 //logger.infof("run %X", cast(void*) &this);
640 //}();
641 while (!addr.get.empty!SystemMsg) {
642 messages_++;
643 //logger.infof("%X %s %s", addr.toHash, state_, messages_);
644 auto front = addr.get.pop!SystemMsg;
645 scope (exit)
646 .destroy(front);
647
648 front.get.match!((ref DownMsg a) {
649 if (downHandler_)
650 downHandler_(this, a);
651 }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
652 if (auto v = a.addr.toHash in monitors)
653 v.release;
654 monitors.remove(a.addr.toHash);
655 }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
656 if (auto v = a.addr.toHash in links)
657 v.release;
658 links.remove(a.addr.toHash);
659 }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
660 exitHandler_(this, a);
661 }, (ref SystemExitMsg a) {
662 final switch (a.reason) {
663 case ExitReason.normal:
664 break;
665 case ExitReason.unhandledException:
666 exitHandler_(this, ExitMsg.init);
667 break;
668 case ExitReason.unknown:
669 exitHandler_(this, ExitMsg.init);
670 break;
671 case ExitReason.userShutdown:
672 exitHandler_(this, ExitMsg.init);
673 break;
674 case ExitReason.kill:
675 exitHandler_(this, ExitMsg.init);
676 // the user do NOT have an option here
677 forceShutdown;
678 break;
679 }
680 });
681 }
682 }
683
684 void processReply() @safe {
685 if (addr.get.empty!Reply)
686 return;
687 messages_++;
688
689 auto front = addr.get.pop!Reply;
690 scope (exit)
691 .destroy(front);
692
693 if (auto v = front.get.id in awaitedResponses) {
694 // TODO: reduce the lookups on front.id
695 v.behavior(front.get.data);
696 try {
697 () @trusted { v.behavior.free; }();
698 } catch (Exception e) {
699 }
700 awaitedResponses.remove(front.get.id);
701 removeReplyTimeout(front.get.id);
702 } else {
703 // TODO: should probably be SystemError.unexpectedResponse?
704 defaultHandler_(this, front.get.data);
705 }
706 }
707
708 void processDelayed(const SysTime now) @trusted {
709 if (!addr.get.empty!DelayedMsg) {
710 // count as a message because handling them are "expensive".
711 // Ignoring the case that the message right away is moved to the
712 // incoming queue. This lead to "double accounting" but ohh well.
713 // Don't use delayedSend when you should have used send.
714 messages_++;
715 delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
716 } else if (delayed.empty) {
717 return;
718 }
719
720 foreach (const i; 0 .. delayed.length) {
721 if (now > delayed.front.triggerAt) {
722 addr.get.put(delayed.front.msg);
723 delayed.removeFront;
724 } else {
725 break;
726 }
727 }
728 }
729
730 private void removeReplyTimeout(ulong id) @safe nothrow {
731 import std.algorithm : remove;
732
733 foreach (const i; 0 .. replyTimeouts.length) {
734 if (replyTimeouts[i].id == id) {
735 remove(replyTimeouts, i);
736 break;
737 }
738 }
739 }
740
741 void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
742 if (!isAccepting)
743 return;
744
745 if (auto v = signature in incoming) {
746 try {
747 v.free;
748 } catch (Exception e) {
749 }
750 }
751 incoming[signature] = handler;
752 }
753
754 void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
755 if (!isAccepting)
756 return;
757
758 if (auto v = signature in reqBehavior) {
759 try {
760 v.free;
761 } catch (Exception e) {
762 }
763 }
764 reqBehavior[signature] = handler;
765 }
766
767 void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
768 void*) reply, ErrorHandler onError) @safe {
769 if (!isAccepting)
770 return;
771
772 awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
773 replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
774 schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
775 }
776 }
777
778 struct Closure(Fn, CtxT) {
779 alias FreeFn = void function(CtxT);
780
781 Fn fn;
782 CtxT ctx;
783 FreeFn cleanup;
784
785 this(Fn fn) {
786 this.fn = fn;
787 }
788
789 this(Fn fn, CtxT* ctx, FreeFn cleanup) {
790 this.fn = fn;
791 this.ctx = ctx;
792 this.cleanup = cleanup;
793 }
794
795 void opCall(Args...)(auto ref Args args) {
796 assert(fn !is null);
797 fn(ctx, args);
798 }
799
800 void free() {
801 // will crash, on purpuse, if there is a ctx and no cleanup registered.
802 // maybe a bad idea? dunno... lets see
803 if (ctx)
804 cleanup(ctx);
805 ctx = CtxT.init;
806 }
807 }
808
809 @("shall register a behavior to be called when msg received matching signature")
810 unittest {
811 auto addr = makeAddress2;
812 auto actor = Actor(addr);
813
814 bool processedIncoming;
815 void fn(void* ctx, ref Variant msg) {
816 processedIncoming = true;
817 }
818
819 actor.register(1, Closure!(MsgHandler, void*)(&fn));
820 addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));
821
822 actor.process(Clock.currTime);
823
824 assert(processedIncoming);
825 }
826
827 private void cleanupCtx(CtxT)(void* ctx)
828 if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
829 import std.traits;
830 import my.actor.typed;
831
832 static if (!is(CtxT == void)) {
833 // trust that any use of this also pass on the correct context type.
834 auto userCtx = () @trusted { return cast(CtxT*) ctx; }();
835 // release the context such as if it holds a rc object.
836 alias Types = CtxT.Types;
837
838 static foreach (const i; 0 .. CtxT.Types.length) {
839 {
840 alias T = CtxT.Types[i];
841 alias UT = Unqual!T;
842 static if (!is(T == UT)) {
843 static assert(!is(UT : WeakAddress),
844 "WeakAddress must NEVER be const or immutable");
845 static assert(!is(UT : TypedAddress!M, M...),
846 "WeakAddress must NEVER be const or immutable: " ~ T.stringof);
847 }
848 // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?
849
850 static if (is(UT == T)) {
851 .destroy((*userCtx)[i]);
852 }
853 }
854 }
855 }
856 }
857
858 @("shall default initialize when possible, skipping const/immutable")
859 unittest {
860 {
861 auto x = tuple(cast(const) 42, 43);
862 alias T = typeof(x);
863 cleanupCtx!T(cast(void*)&x);
864 assert(x[0] == 42); // can't assign to const
865 assert(x[1] == 0);
866 }
867
868 {
869 import my.path : Path;
870
871 auto x = tuple(Path.init, cast(const) Path("foo"));
872 alias T = typeof(x);
873 cleanupCtx!T(cast(void*)&x);
874 assert(x[0] == Path.init);
875 assert(x[1] == Path("foo"));
876 }
877 }
878
879 package struct Action {
880 Closure!(MsgHandler, void*) action;
881 ulong signature;
882 }
883
884 /// An behavior for an actor when it receive a message of `signature`.
885 package auto makeAction(T, CtxT = void)(T handler) @safe
886 if (isFunction!T || isFunctionPointer!T) {
887 static if (is(CtxT == void))
888 alias Params = Parameters!T;
889 else {
890 alias CtxParam = Parameters!T[0];
891 alias Params = Parameters!T[1 .. $];
892 checkMatchingCtx!(CtxParam, CtxT);
893 checkRefForContext!handler;
894 }
895
896 alias HArgs = staticMap!(Unqual, Params);
897
898 void fn(void* ctx, ref Variant msg) @trusted {
899 static if (is(CtxT == void)) {
900 handler(msg.get!(Tuple!HArgs).expand);
901 } else {
902 auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
903 handler(*userCtx, msg.get!(Tuple!HArgs).expand);
904 }
905 }
906
907 return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
908 }
909
910 package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
911 static if (is(CtxT == void))
912 alias Params = Parameters!T;
913 else {
914 alias CtxParam = Parameters!T[0];
915 alias Params = Parameters!T[1 .. $];
916 checkMatchingCtx!(CtxParam, CtxT);
917 checkRefForContext!handler;
918 }
919
920 alias HArgs = staticMap!(Unqual, Params);
921
922 void fn(void* ctx, ref Variant msg) @trusted {
923 static if (is(CtxT == void)) {
924 handler(msg.get!(Tuple!HArgs).expand);
925 } else {
926 auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
927 handler(*userCtx, msg.get!(Tuple!HArgs).expand);
928 }
929 }
930
931 return typeof(return)(&fn, null, &cleanupCtx!CtxT);
932 }
933
934 package struct Request {
935 Closure!(RequestHandler, void*) request;
936 ulong signature;
937 }
938
939 private string locToString(Loc...)() {
940 import std.conv : to;
941
942 return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string;
943 }
944
945 /// Check that the context parameter is `ref` otherwise issue a warning.
946 package void checkRefForContext(alias handler)() {
947 import std.traits : ParameterStorageClass, ParameterStorageClassTuple;
948
949 alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0];
950
951 static if (CtxParam != ParameterStorageClass.ref_) {
952 pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
953 static assert(CtxParam == ParameterStorageClass.ref_,
954 "The context must be `ref` to avoid unnecessary copying");
955 }
956 }
957
958 package void checkMatchingCtx(CtxParam, CtxT)() {
959 static if (!is(CtxT == CtxParam)) {
960 static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }),
961 "mismatch between the context type " ~ CtxT.stringof
962 ~ " and the first parameter " ~ CtxParam.stringof);
963 }
964 }
965
966 package auto makeRequest(T, CtxT = void)(T handler) @safe {
967 static assert(!is(ReturnType!T == void), "handler returns void, not allowed");
968
969 alias RType = ReturnType!T;
970 enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
971 enum isPromise = is(RType : Promise!PromT, PromT);
972
973 static if (is(CtxT == void))
974 alias Params = Parameters!T;
975 else {
976 alias CtxParam = Parameters!T[0];
977 alias Params = Parameters!T[1 .. $];
978 checkMatchingCtx!(CtxParam, CtxT);
979 checkRefForContext!handler;
980 }
981
982 alias HArgs = staticMap!(Unqual, Params);
983
984 void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
985 static if (is(CtxT == void)) {
986 auto r = handler(msg.get!(Tuple!HArgs).expand);
987 } else {
988 auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
989 auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
990 }
991
992 static if (isReqResult) {
993 r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
994 assert(!a.data.empty, "the promise MUST be constructed before it is returned");
995 a.data.get.replyId = replyId;
996 a.data.get.replyTo = replyTo;
997 }, (data) {
998 enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
999 if (auto rc = replyTo.lock.get) {
1000 static if (wrapInTuple)
1001 rc.put(Reply(replyId, Variant(tuple(data))));
1002 else
1003 rc.put(Reply(replyId, Variant(data)));
1004 }
1005 });
1006 } else static if (isPromise) {
1007 r.data.get.replyId = replyId;
1008 r.data.get.replyTo = replyTo;
1009 } else {
1010 // TODO: is this syntax for U one variable or variable. I want it to be variable.
1011 enum wrapInTuple = !is(RType : Tuple!U, U);
1012 if (auto rc = replyTo.lock.get) {
1013 static if (wrapInTuple)
1014 rc.put(Reply(replyId, Variant(tuple(r))));
1015 else
1016 rc.put(Reply(replyId, Variant(r)));
1017 }
1018 }
1019 }
1020
1021 return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
1022 }
1023
1024 @("shall link two actors lifetime")
1025 unittest {
1026 int count;
1027 void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
1028 count++;
1029 self.shutdown;
1030 }
1031
1032 auto aa1 = Actor(makeAddress2);
1033 auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
1034 auto aa2 = Actor(makeAddress2);
1035 auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;
1036
1037 a1.linkTo(a2.address);
1038 a1.process(Clock.currTime);
1039 a2.process(Clock.currTime);
1040
1041 assert(a1.isAlive);
1042 assert(a2.isAlive);
1043
1044 sendExit(a1.address, ExitReason.userShutdown);
1045 foreach (_; 0 .. 5) {
1046 a1.process(Clock.currTime);
1047 a2.process(Clock.currTime);
1048 }
1049
1050 assert(!a1.isAlive);
1051 assert(!a2.isAlive);
1052 assert(count == 2);
1053 }
1054
1055 @("shall let one actor monitor the lifetime of the other one")
1056 unittest {
1057 int count;
1058 void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
1059 count++;
1060 }
1061
1062 auto aa1 = Actor(makeAddress2);
1063 auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
1064 auto aa2 = Actor(makeAddress2);
1065 auto a2 = build(&aa2).set((int x) {}).finalize;
1066
1067 a1.monitor(a2.address);
1068 a1.process(Clock.currTime);
1069 a2.process(Clock.currTime);
1070
1071 assert(a1.isAlive);
1072 assert(a2.isAlive);
1073
1074 sendExit(a2.address, ExitReason.userShutdown);
1075 foreach (_; 0 .. 5) {
1076 a1.process(Clock.currTime);
1077 a2.process(Clock.currTime);
1078 }
1079
1080 assert(a1.isAlive);
1081 assert(!a2.isAlive);
1082 assert(count == 1);
1083 }
1084
1085 private struct BuildActor {
1086 Actor* actor;
1087
1088 Actor* finalize() @safe {
1089 auto rval = actor;
1090 actor = null;
1091 return rval;
1092 }
1093
1094 auto errorHandler(ErrorHandler a) {
1095 actor.errorHandler = a;
1096 return this;
1097 }
1098
1099 auto downHandler_(DownHandler a) {
1100 actor.downHandler_ = a;
1101 return this;
1102 }
1103
1104 auto exitHandler_(ExitHandler a) {
1105 actor.exitHandler_ = a;
1106 return this;
1107 }
1108
1109 auto exceptionHandler_(ExceptionHandler a) {
1110 actor.exceptionHandler_ = a;
1111 return this;
1112 }
1113
1114 auto defaultHandler_(DefaultHandler a) {
1115 actor.defaultHandler_ = a;
1116 return this;
1117 }
1118
1119 auto set(BehaviorT)(BehaviorT behavior)
1120 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1121 && !is(ReturnType!BehaviorT == void)) {
1122 auto act = makeRequest(behavior);
1123 actor.register(act.signature, act.request);
1124 return this;
1125 }
1126
1127 auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
1128 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1129 && !is(ReturnType!BehaviorT == void)) {
1130 auto act = makeRequest!(BehaviorT, CT)(behavior);
1131 // for now just use the GC to allocate the context on.
1132 // TODO: use an allocator.
1133 act.request.ctx = cast(void*) new CT(c);
1134 actor.register(act.signature, act.request);
1135 return this;
1136 }
1137
1138 auto set(BehaviorT)(BehaviorT behavior)
1139 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1140 && is(ReturnType!BehaviorT == void)) {
1141 auto act = makeAction(behavior);
1142 actor.register(act.signature, act.action);
1143 return this;
1144 }
1145
1146 auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
1147 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1148 && is(ReturnType!BehaviorT == void)) {
1149 auto act = makeAction!(BehaviorT, CT)(behavior);
1150 // for now just use the GC to allocate the context on.
1151 // TODO: use an allocator.
1152 act.action.ctx = cast(void*) new CT(c);
1153 actor.register(act.signature, act.action);
1154 return this;
1155 }
1156 }
1157
1158 package BuildActor build(Actor* a) @safe {
1159 return BuildActor(a);
1160 }
1161
1162 /// Implement an actor.
1163 Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
1164 import my.actor.msg : isCapture, Capture;
1165
1166 auto bactor = build(self);
1167 static foreach (const i; 0 .. Behavior.length) {
1168 {
1169 alias b = Behavior[i];
1170
1171 static if (!isCapture!b) {
1172 static if (!(isFunction!(b) || isFunctionPointer!(b)))
1173 static assert(0, "behavior may only be functions, not delgates: " ~ b.stringof);
1174
1175 static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
1176 bactor.set(behaviors[i], behaviors[i + 1]);
1177 } else
1178 bactor.set(behaviors[i]);
1179 }
1180 }
1181 }
1182
1183 return bactor.finalize;
1184 }
1185
1186 @("build dynamic actor from functions")
1187 unittest {
1188 static void fn3(int s) @safe {
1189 }
1190
1191 static string fn4(int s) @safe {
1192 return "foo";
1193 }
1194
1195 static Tuple!(int, string) fn5(const string s) @safe {
1196 return typeof(return)(42, "hej");
1197 }
1198
1199 auto aa1 = Actor(makeAddress2);
1200 auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
1201 }
1202
1203 unittest {
1204 bool delayOk;
1205 static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
1206 *c.delayOk = true;
1207 }
1208
1209 bool delayShouldNeverHappen;
1210 static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
1211 *c.delayShouldNeverHappen = true;
1212 }
1213
1214 auto aa1 = Actor(makeAddress2);
1215 auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
1216 capture(&delayShouldNeverHappen)).finalize;
1217 delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
1218 delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);
1219
1220 assert(!actor.addressRef.get.empty!DelayedMsg);
1221 assert(actor.addressRef.get.empty!Msg);
1222 assert(actor.addressRef.get.empty!Reply);
1223
1224 actor.process(Clock.currTime);
1225
1226 assert(!actor.addressRef.get.empty!DelayedMsg);
1227 assert(actor.addressRef.get.empty!Msg);
1228 assert(actor.addressRef.get.empty!Reply);
1229
1230 actor.process(Clock.currTime);
1231 actor.process(Clock.currTime);
1232
1233 assert(actor.addressRef.get.empty!DelayedMsg);
1234 assert(actor.addressRef.get.empty!Msg);
1235 assert(actor.addressRef.get.empty!Reply);
1236
1237 assert(delayOk);
1238 assert(!delayShouldNeverHappen);
1239 }
1240
1241 @("shall process a request->then chain xyz")
1242 @system unittest {
1243 // checking capture is correctly setup/teardown by using captured rc.
1244
1245 auto rcReq = refCounted(42);
1246 bool calledOk;
1247 static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
1248 const string b) {
1249 assert(2 == ctx[1].refCount);
1250 if (s == "apa")
1251 *ctx.calledOk = true;
1252 return "foo";
1253 }
1254
1255 auto rcReply = refCounted(42);
1256 bool calledReply;
1257 static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
1258 *ctx[0] = s == "foo";
1259 assert(2 == ctx[1].refCount);
1260 }
1261
1262 auto aa1 = Actor(makeAddress2);
1263 auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;
1264
1265 assert(2 == rcReq.refCount);
1266 assert(1 == rcReply.refCount);
1267
1268 actor.request(actor.address, infTimeout).send("apa", "foo")
1269 .capture(&calledReply, rcReply).then(&reply);
1270 assert(2 == rcReply.refCount);
1271
1272 assert(!actor.addr.get.empty!Msg);
1273 assert(actor.addr.get.empty!Reply);
1274
1275 actor.process(Clock.currTime);
1276 assert(actor.addr.get.empty!Msg);
1277 assert(actor.addr.get.empty!Reply);
1278
1279 assert(2 == rcReq.refCount);
1280 assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");
1281
1282 assert(calledOk);
1283 assert(calledReply);
1284
1285 actor.shutdown;
1286 while (actor.isAlive)
1287 actor.process(Clock.currTime);
1288 }
1289
1290 @("shall process a request->then chain using promises")
1291 unittest {
1292 static struct A {
1293 string v;
1294 }
1295
1296 static struct B {
1297 string v;
1298 }
1299
1300 int calledOk;
1301 auto fn1p = makePromise!string;
1302 static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
1303 if (a.v == "apa")
1304 (*c.calledOk)++;
1305 return typeof(return)(c.p);
1306 }
1307
1308 auto fn2p = makePromise!string;
1309 static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) {
1310 (*c.calledOk)++;
1311 return c.p;
1312 }
1313
1314 int calledReply;
1315 static void reply(ref Tuple!(int*) ctx, const string s) {
1316 if (s == "foo")
1317 *ctx[0] += 1;
1318 }
1319
1320 auto aa1 = Actor(makeAddress2);
1321 auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2,
1322 capture(&calledOk, fn2p)).finalize;
1323
1324 actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply);
1325 actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply);
1326
1327 actor.process(Clock.currTime);
1328 assert(calledOk == 1); // first request
1329 assert(calledReply == 0);
1330
1331 fn1p.deliver("foo");
1332
1333 assert(calledReply == 0);
1334
1335 actor.process(Clock.currTime);
1336 assert(calledOk == 2); // second request triggered
1337 assert(calledReply == 1);
1338
1339 fn2p.deliver("foo");
1340 actor.process(Clock.currTime);
1341
1342 assert(calledReply == 2);
1343
1344 actor.shutdown;
1345 while (actor.isAlive) {
1346 actor.process(Clock.currTime);
1347 }
1348 }
1349
1350 /// The timeout triggered.
1351 class ScopedActorException : Exception {
1352 this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow {
1353 super(null, file, line);
1354 error = err;
1355 }
1356
1357 ScopedActorError error;
1358 }
1359
1360 enum ScopedActorError : ubyte {
1361 none,
1362 // actor address is down
1363 down,
1364 // request timeout
1365 timeout,
1366 // the address where unable to process the received message
1367 unknownMsg,
1368 // some type of fatal error occured.
1369 fatal,
1370 }
1371
1372 /** Intended to be used in a local scope by a user.
1373 *
1374 * `ScopedActor` is not thread safe.
1375 */
1376 struct ScopedActor {
1377 import my.actor.typed : underlyingAddress, underlyingWeakAddress;
1378
1379 private {
1380 static struct Data {
1381 Actor self;
1382 ScopedActorError errSt;
1383
1384 ~this() @safe {
1385 if (self.addr.empty)
1386 return;
1387
1388 () @trusted {
1389 self.downHandler = null;
1390 self.defaultHandler = toDelegate(&.defaultHandler);
1391 self.errorHandler = toDelegate(&defaultErrorHandler);
1392 }();
1393
1394 self.shutdown;
1395 while (self.isAlive) {
1396 self.process(Clock.currTime);
1397 }
1398 }
1399 }
1400
1401 RefCounted!Data data;
1402 }
1403
1404 this(StrongAddress addr, string name) @safe {
1405 data = refCounted(Data(Actor(addr)));
1406 data.get.self.name = name;
1407 }
1408
1409 private void reset() @safe nothrow {
1410 data.get.errSt = ScopedActorError.none;
1411 }
1412
1413 SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout)
1414 if (isAddress!TAddress) {
1415 reset;
1416 auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout);
1417 return SRequestSend(rs, this);
1418 }
1419
1420 private static struct SRequestSend {
1421 RequestSend rs;
1422 ScopedActor self;
1423
1424 /// Copy constructor
1425 this(ref return typeof(this) rhs) @safe pure nothrow @nogc {
1426 rs = rhs.rs;
1427 self = rhs.self;
1428 }
1429
1430 @disable this(this);
1431
1432 SRequestSendThen send(Args...)(auto ref Args args) {
1433 return SRequestSendThen(.send(rs, args), self);
1434 }
1435 }
1436
1437 private static struct SRequestSendThen {
1438 RequestSendThen rs;
1439 ScopedActor self;
1440 uint backoff;
1441
1442 /// Copy constructor
1443 this(ref return typeof(this) rhs) {
1444 rs = rhs.rs;
1445 self = rhs.self;
1446 backoff = rhs.backoff;
1447 }
1448
1449 @disable this(this);
1450
1451 void dynIntervalSleep() @trusted {
1452 // +100 usecs "feels good", magic number. current OS and
1453 // implementation of message passing isn't that much faster than
1454 // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They
1455 // aren't expected to be used for "time critical" sections.
1456 Thread.sleep(backoff.dur!"usecs");
1457 backoff = min(backoff + 100, 20000);
1458 }
1459
1460 private static struct ValueCapture {
1461 RefCounted!Data data;
1462
1463 void downHandler(ref Actor, DownMsg) @safe nothrow {
1464 data.get.errSt = ScopedActorError.down;
1465 }
1466
1467 void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow {
1468 if (msg.reason == SystemError.requestTimeout)
1469 data.get.errSt = ScopedActorError.timeout;
1470 else
1471 data.get.errSt = ScopedActorError.fatal;
1472 }
1473
1474 void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow {
1475 logAndDropHandler(a, msg);
1476 data.get.errSt = ScopedActorError.unknownMsg;
1477 }
1478 }
1479
1480 void then(T)(T handler, ErrorHandler onError = null) {
1481 scope (exit)
1482 demonitor(rs.rs.self, rs.rs.requestTo);
1483 monitor(rs.rs.self, rs.rs.requestTo);
1484
1485 auto callback = new ValueCapture(self.data);
1486 self.data.get.self.downHandler = &callback.downHandler;
1487 self.data.get.self.defaultHandler = &callback.unknownMsgHandler;
1488 self.data.get.self.errorHandler = &callback.errorHandler;
1489
1490 () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }();
1491
1492 scope (exit)
1493 () @trusted {
1494 self.data.get.self.downHandler = null;
1495 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler);
1496 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler);
1497 }();
1498
1499 auto requestTo = rs.rs.requestTo.lock;
1500 if (!requestTo)
1501 throw new ScopedActorException(ScopedActorError.down);
1502
1503 // TODO: this loop is stupid... should use a conditional variable
1504 // instead but that requires changing the mailbox. later
1505 do {
1506 rs.rs.self.process(Clock.currTime);
1507 // force the actor to be alive even though there are no behaviors.
1508 rs.rs.self.state_ = ActorState.waiting;
1509
1510 if (self.data.get.errSt == ScopedActorError.none) {
1511 dynIntervalSleep;
1512 } else {
1513 throw new ScopedActorException(self.data.get.errSt);
1514 }
1515
1516 }
1517 while (self.data.get.self.waitingForReply);
1518 }
1519 }
1520 }
1521
1522 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe {
1523 import std.format : format;
1524
1525 return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line));
1526 }
1527
1528 @(
1529 "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed")
1530 unittest {
1531 import my.actor.system;
1532
1533 auto sys = makeSystem;
1534
1535 auto a0 = sys.spawn((Actor* self) {
1536 return impl(self, (ref CSelf!() ctx, int x) {
1537 Thread.sleep(50.dur!"msecs");
1538 return 42;
1539 }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self),
1540 (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self));
1541 });
1542
1543 {
1544 auto self = scopedActor;
1545 bool excThrown;
1546 auto stopAt = Clock.currTime + 3.dur!"seconds";
1547 while (!excThrown && Clock.currTime < stopAt) {
1548 try {
1549 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {});
1550 } catch (ScopedActorException e) {
1551 excThrown = e.error == ScopedActorError.timeout;
1552 } catch (Exception e) {
1553 logger.info(e.msg);
1554 }
1555 }
1556 assert(excThrown, "timeout did not trigger as expected");
1557 }
1558
1559 {
1560 auto self = scopedActor;
1561 bool excThrown;
1562 auto stopAt = Clock.currTime + 3.dur!"seconds";
1563 while (!excThrown && Clock.currTime < stopAt) {
1564 try {
1565 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) {
1566 });
1567 } catch (ScopedActorException e) {
1568 excThrown = e.error == ScopedActorError.down;
1569 } catch (Exception e) {
1570 logger.info(e.msg);
1571 }
1572 }
1573 assert(excThrown, "detecting terminated actor did not trigger as expected");
1574 }
1575 }