/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.actor;

import std.stdio : writeln, writefln;

import core.thread : Thread;
import logger = std.experimental.logger;
import std.algorithm : schwartzSort, max, min, among;
import std.array : empty;
import std.datetime : SysTime, Clock, dur;
import std.exception : collectException;
import std.functional : toDelegate;
import std.meta : staticMap;
import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
import std.typecons : Tuple, tuple;
import std.variant : Variant;

import my.actor.common : ExitReason, SystemError, makeSignature;
import my.actor.mailbox;
import my.actor.msg;
import my.actor.system : System;
import my.actor.typed : isTypedAddress, isTypedActorImpl;
import my.gc.refc;
import sumtype;

/// Shared state of a `Promise`: where the reply is sent and which request it answers.
private struct PromiseData {
    /// Address that receives the reply.
    WeakAddress replyTo;
    /// Id of the request the reply belongs to. Zero means "not yet bound to a request".
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
        replyTo = rhs.replyTo;
        replyId = rhs.replyId;
    }

    @disable this(this);
}

/** A reply of type `T` that is delivered at a later point in time.
 *
 * `deliver` can only be called one time; the shared data is released when the
 * reply has been delivered.
 */
struct Promise(T) {
    package {
        RefCounted!PromiseData data;
    }

    /// Deliver an rvalue `reply` by forwarding to the `ref` overload.
    void deliver(T reply) {
        // `reply` is an lvalue here, thus overload resolution selects the
        // `ref T` overload below; no extra copy is needed.
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     *
     * Params:
     *  reply = the value to send. It is wrapped in a `Tuple` unless it
     *          already is one.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // guard for builds where contracts are compiled out.
        if (data.empty)
            return;
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the promise data of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized.
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}

/// Returns: a promise with allocated but unbound (`replyId == 0`) data.
auto makePromise(T)() {
    return Promise!T(refCounted(PromiseData.init));
}

/** The result of a request handler.
 *
 * It is one of a value, an error or a promise to deliver the value later.
 */
struct RequestResult(T) {
    this(T v) {
        value = typeof(value)(v);
    }

    this(ErrorMsg v) {
        value = typeof(value)(v);
    }

    this(Promise!T v) {
        value = typeof(value)(v);
    }

    SumType!(T, ErrorMsg, Promise!T) value;
}

// Type erased handlers. `ctx` is the user supplied context (may be null) and
// `msg` is the packed arguments of the message.
private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Called for a message that has no matching behavior.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as fallback if request is used
 * without error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * calling a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or other dies. Per default, actors terminate
 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * other if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;

/// Silently drop the message.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}

/// Write the name of the actor and the message (via `Variant.toString`) to the console.
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    try {
        writeln("UNKNOWN message sent to actor ", self.name);
        writeln(msg.toString);
    } catch (Exception e) {
        // best effort logging; the handler must be nothrow.
    }
}

/// Record the reason of the error and start a clean shutdown of the actor.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.shutdown;
}

/// Record the reason of the exit and force a shutdown of the actor.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.forceShutdown;
}

/// Mark the actor as failed with a runtime error and force a shutdown.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    self.lastError = SystemError.runtimeError;
    // TODO: should log?
    self.forceShutdown;
}

// Write the name of the actor and the exception to stdout.
/// Marks the actor as failed with a runtime error, prints the exception
/// message and forces a shutdown.
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    try {
        writeln("EXCEPTION thrown by actor ", self.name);
        writeln(e.msg);
        writeln("TERMINATING");
    } catch (Exception ignored) {
        // best effort logging; the handler must be nothrow.
    }

    self.forceShutdown;
}

/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    /// Id of the awaited reply.
    ulong id;
    /// Point in time when the request is considered to have timed out.
    SysTime timeout;
}

package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish
    shutdown,
    /// discard also the awaited responses, just shutdown fast
    forceShutdown,
    /// in process of shutting down
    finishShutdown,
    /// stopped.
    stopped,
}

/// Handlers for one outstanding request.
private struct AwaitReponse {
    /// Called with the reply.
    Closure!(ReplyHandler, void*) behavior;
    /// Called if the request times out.
    ErrorHandler onError;
}

/** An actor; a set of behaviors that react on the messages in its mailbox.
 *
 * The actor is driven by repeatedly calling `process`.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        Closure!(MsgHandler, void*)[ulong] incoming;
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses key:ed on their id.
        AwaitReponse[ulong] awaitedResponses;
        // kept sorted on the timeout, earliest first (see `register`).
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback when a `DownMsg` is received. May be null.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Construct an actor in the state `waiting` that owns the address `a`.
     *
     * The address is opened and the default handlers are installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system the actor belongs to. `setHomeSystem` must have been called.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor
     *
     * Stopping incoming messages from triggering new behavior and finish all
     * awaited responses.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Stopping incoming messages from triggering new behavior. Awaited
     * responses are discarded instead of being waited for.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set the name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /// How long until a delayed message or a timeout fires.
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that has been processed.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the context of all message and request behaviors and drop them.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the context of all awaited responses and drop them.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Reset and drop all delayed messages.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// True in all states except `stopped`.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new id to use for a request.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Run the actor one step.
     *
     * What is done depends on the state of the actor; either messages are
     * processed or the shutdown sequence is advanced one step. The message
     * counter is reset at the start of each call.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // philosophy of the order is that a timeout should only trigger if it
            // is really required thus it is checked last. This order then mean
            // that a request may have triggered a timeout but because
            // `processReply` is called before `checkReplyTimeout` it is *ignored*.
            // Thus "better to accept even if it is timeout rather than fail".
            //
            // NOTE: the assumption that a message that has timed out should be
            // processed turned out to be... wrong. It is annoying that
            // sometimes a timeout message triggers even though it shouldn't,
            // because it is now too old to be useful!
            // Thus the order is changed to first check for timeout, then process.
            try {
                processSystemMsg();
                checkReplyTimeout(now);
                processDelayed(now);
                processIncoming();
                processReply();
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to all monitors that are still reachable, then drop the monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to all links that are still reachable, then drop the links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Trigger the error handler of all awaited responses that have timed out.
     *
     * The timeouts are checked in order and the check stops at the first
     * one that has not yet expired.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /// Process at most one message from the incoming queue.
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /// Process at most one reply. A reply that nobody awaits goes to the default handler.
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one newly arrived delayed message to the ordered set and
     * put all delayed messages that have triggered in the mailbox.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        foreach (const i; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /// Remove the first timeout registered for the reply `id`, if any.
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                // `remove` only shifts the elements and returns the shortened
                // range; the result must be assigned otherwise the array
                // keeps its length and a stale entry lingers at the end.
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /// Register the behavior for one-shot messages of `signature`, replacing any previous.
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /// Register the behavior for requests of `signature`, replacing any previous.
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Register a handler for the reply with id `replyId`.
     *
     * Params:
     *  replyId = id of the awaited reply
     *  timeout = when the request times out
     *  reply = called with the reply
     *  onError = called on timeout. The error handler of the actor is used if it is null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // keep the earliest timeout first; `checkReplyTimeout` and
        // `nextTimeout` rely on the order.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}

/** A callable `fn` together with a context `ctx` that is passed as the first
 * argument and a function `cleanup` that releases the context.
 */
struct Closure(Fn, CtxT) {
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    this(Fn fn) {
        this.fn = fn;
    }

    // `ctx` has the same type as the field it initializes; callers that
    // pass `null` or a pointer that converts to `CtxT` are unaffected.
    this(Fn fn, CtxT ctx, FreeFn cleanup) {
        this.fn = fn;
        this.ctx = ctx;
        this.cleanup = cleanup;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Release the context, if any, and reset it.
    void free() {
        // will crash, on purpose, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx)
            cleanup(ctx);
        ctx = CtxT.init;
    }
}

@("shall register a behavior to be called when msg received matching signature")
unittest {
    auto addr = makeAddress2;
    auto actor = Actor(addr);

    bool processedIncoming;
    void fn(void* ctx, ref Variant msg) {
        processedIncoming = true;
    }

    actor.register(1, Closure!(MsgHandler, void*)(&fn));
    addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));

    actor.process(Clock.currTime);

    assert(processedIncoming);
}

/** Destroy the mutable members of the context that `ctx` points to.
 *
 * Members that are const or immutable are left untouched. Nothing is done
 * when `CtxT` is `void`.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits;
    import my.actor.typed;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto userCtx = () @trusted { return cast(CtxT*) ctx; }();

        // release the context such as if it holds a rc object.
        static foreach (const i; 0 .. CtxT.Types.length) {
            {
                alias T = CtxT.Types[i];
                alias UT = Unqual!T;
                static if (!is(T == UT)) {
                    static assert(!is(UT : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(UT : TypedAddress!M, M...),
                            "TypedAddress must NEVER be const or immutable: " ~ T.stringof);
                }
                // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?

                static if (is(UT == T)) {
                    .destroy((*userCtx)[i]);
                }
            }
        }
    }
}

@("shall default initialize when possible, skipping const/immutable")
unittest {
    {
        auto x = tuple(cast(const) 42, 43);
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == 42); // can't assign to const
        assert(x[1] == 0);
    }

    {
        import my.path : Path;

        auto x = tuple(Path.init, cast(const) Path("foo"));
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == Path.init);
        assert(x[1] == Path("foo"));
    }
}

package struct Action {
    Closure!(MsgHandler, void*) action;
    ulong signature;
}

/// An behavior for an actor when it receive a message of `signature`.
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter of the handler is the context.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    // the context is null here; the caller assigns it afterwards if one is used.
    return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

/// A handler for the reply of a request.
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter of the handler is the context.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    return typeof(return)(&fn, null, &cleanupCtx!CtxT);
}

package struct Request {
    Closure!(RequestHandler, void*) request;
    ulong signature;
}

/// Returns: `Loc` formatted as "file:line:column"-like string of its three elements.
private string locToString(Loc...)() {
    import std.conv : to;

    return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string;
}

/// Check that the context parameter is `ref` otherwise issue a warning.
///
/// The check is a compile time check; a handler whose first parameter is not
/// `ref` is rejected by a `static assert` after its type has been printed.
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0];

    static if (CtxParam != ParameterStorageClass.ref_) {
        pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
        static assert(CtxParam == ParameterStorageClass.ref_,
                "The context must be `ref` to avoid unnecessary copying");
    }
}

/// Check at compile time that `CtxParam` either is `CtxT` or can be
/// constructed from the expanded members of `CtxT`.
package void checkMatchingCtx(CtxParam, CtxT)() {
    static if (!is(CtxT == CtxParam)) {
        static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }),
                "mismatch between the context type " ~ CtxT.stringof
                ~ " and the first parameter " ~ CtxParam.stringof);
    }
}

/** A behavior for an actor when it receives a request.
 *
 * The value returned by `handler` is sent as a reply to the requester. The
 * handler may also return a `RequestResult` or a `Promise`:
 *  * an `ErrorMsg` is sent to the requester as a system message.
 *  * a `Promise` is bound to the request such that it can be delivered later.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter of the handler is the context.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            // same requirement as for a promise wrapped in a RequestResult.
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // TODO: is this syntax for U one variable or variable. I want it to be variable.
            enum wrapInTuple = !is(RType : Tuple!U, U);
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    // the context is null here; the caller assigns it afterwards if one is used.
    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

@("shall link two actors lifetime")
unittest {
    int count;
    void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
        count++;
        self.shutdown;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;

    a1.linkTo(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a1.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(!a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 2);
}

@("shall let one actor monitor the lifetime of the other one")
unittest {
    int count;
    void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
        count++;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).finalize;

    a1.monitor(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a2.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 1);
}

/** Builder that configures the handlers and behaviors of an actor.
 *
 * The methods return the builder such that the calls can be chained.
 */
private struct BuildActor {
    Actor* actor;

    /// Returns: the configured actor. The builder gives up its reference to it.
    Actor* finalize() @safe {
        auto rval = actor;
        actor = null;
        return rval;
    }

    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /// Register `behavior`, which returns a value, as a request handler.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest(behavior);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register `behavior`, which returns a value, as a request handler with the context `c`.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.request.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register `behavior`, which returns void, as a handler for one-shot messages.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction(behavior);
        actor.register(act.signature, act.action);
        return this;
    }

    /// Register `behavior`, which returns void, as a handler for one-shot messages with the context `c`.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.action.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.action);
        return this;
    }
}

package BuildActor build(Actor* a) @safe {
    return BuildActor(a);
}

/** Implement an actor.
 *
 * Params:
 *  self = the actor to add the behaviors to
 *  behaviors = functions, each optionally followed by a capture which
 *      becomes the context of the preceding function.
 *
 * Returns: `self`
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture, Capture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            // a capture is consumed together with the behavior preceding it.
            static if (!isCapture!b) {
                static if (!(isFunction!(b) || isFunctionPointer!(b)))
                    static assert(0, "behavior may only be functions, not delegates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}

@("build dynamic actor from functions")
unittest {
    static void fn3(int s) @safe {
    }

    static string fn4(int s) @safe {
        return "foo";
    }

    static Tuple!(int, string) fn5(const string s) @safe {
        return typeof(return)(42, "hej");
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
}

unittest {
    bool delayOk;
    static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    bool delayShouldNeverHappen;
    static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
            capture(&delayShouldNeverHappen)).finalize;
    delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
    delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);
    actor.process(Clock.currTime);

    assert(actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    assert(delayOk);
    assert(!delayShouldNeverHappen);
}

@("shall process a request->then chain xyz")
@system unittest {
    // checking capture is correctly setup/teardown by using captured rc.

    auto rcReq = refCounted(42);
    bool calledOk;
    static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
            const string b) {
        assert(2 == ctx[1].refCount);
        if (s == "apa")
            *ctx.calledOk = true;
        return "foo";
    }

    auto rcReply = refCounted(42);
    bool calledReply;
    static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
        *ctx[0] = s == "foo";
        assert(2 == ctx[1].refCount);
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount);

    actor.request(actor.address, infTimeout).send("apa", "foo")
        .capture(&calledReply, rcReply).then(&reply);
    assert(2 == rcReply.refCount);

    assert(!actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    actor.process(Clock.currTime);
    assert(actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");

    assert(calledOk);
    assert(calledReply);

    actor.shutdown;
    while (actor.isAlive)
        actor.process(Clock.currTime);
}

@("shall process a request->then chain using promises")
unittest {
    static struct A {
        string v;
    }

    static struct B {
        string v;
    }

    int calledOk;
    auto fn1p = makePromise!string;
    static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
if (a.v == "apa") 1304 (*c.calledOk)++; 1305 return typeof(return)(c.p); 1306 } 1307 1308 auto fn2p = makePromise!string; 1309 static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) { 1310 (*c.calledOk)++; 1311 return c.p; 1312 } 1313 1314 int calledReply; 1315 static void reply(ref Tuple!(int*) ctx, const string s) { 1316 if (s == "foo") 1317 *ctx[0] += 1; 1318 } 1319 1320 auto aa1 = Actor(makeAddress2); 1321 auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2, 1322 capture(&calledOk, fn2p)).finalize; 1323 1324 actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply); 1325 actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply); 1326 1327 actor.process(Clock.currTime); 1328 assert(calledOk == 1); // first request 1329 assert(calledReply == 0); 1330 1331 fn1p.deliver("foo"); 1332 1333 assert(calledReply == 0); 1334 1335 actor.process(Clock.currTime); 1336 assert(calledOk == 2); // second request triggered 1337 assert(calledReply == 1); 1338 1339 fn2p.deliver("foo"); 1340 actor.process(Clock.currTime); 1341 1342 assert(calledReply == 2); 1343 1344 actor.shutdown; 1345 while (actor.isAlive) { 1346 actor.process(Clock.currTime); 1347 } 1348 } 1349 1350 /// The timeout triggered. 1351 class ScopedActorException : Exception { 1352 this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow { 1353 super(null, file, line); 1354 error = err; 1355 } 1356 1357 ScopedActorError error; 1358 } 1359 1360 enum ScopedActorError : ubyte { 1361 none, 1362 // actor address is down 1363 down, 1364 // request timeout 1365 timeout, 1366 // the address where unable to process the received message 1367 unknownMsg, 1368 // some type of fatal error occured. 1369 fatal, 1370 } 1371 1372 /** Intended to be used in a local scope by a user. 1373 * 1374 * `ScopedActor` is not thread safe. 
1375 */ 1376 struct ScopedActor { 1377 import my.actor.typed : underlyingAddress, underlyingWeakAddress; 1378 1379 private { 1380 static struct Data { 1381 Actor self; 1382 ScopedActorError errSt; 1383 1384 ~this() @safe { 1385 if (self.addr.empty) 1386 return; 1387 1388 () @trusted { 1389 self.downHandler = null; 1390 self.defaultHandler = toDelegate(&.defaultHandler); 1391 self.errorHandler = toDelegate(&defaultErrorHandler); 1392 }(); 1393 1394 self.shutdown; 1395 while (self.isAlive) { 1396 self.process(Clock.currTime); 1397 } 1398 } 1399 } 1400 1401 RefCounted!Data data; 1402 } 1403 1404 this(StrongAddress addr, string name) @safe { 1405 data = refCounted(Data(Actor(addr))); 1406 data.get.self.name = name; 1407 } 1408 1409 private void reset() @safe nothrow { 1410 data.get.errSt = ScopedActorError.none; 1411 } 1412 1413 SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout) 1414 if (isAddress!TAddress) { 1415 reset; 1416 auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout); 1417 return SRequestSend(rs, this); 1418 } 1419 1420 private static struct SRequestSend { 1421 RequestSend rs; 1422 ScopedActor self; 1423 1424 /// Copy constructor 1425 this(ref return typeof(this) rhs) @safe pure nothrow @nogc { 1426 rs = rhs.rs; 1427 self = rhs.self; 1428 } 1429 1430 @disable this(this); 1431 1432 SRequestSendThen send(Args...)(auto ref Args args) { 1433 return SRequestSendThen(.send(rs, args), self); 1434 } 1435 } 1436 1437 private static struct SRequestSendThen { 1438 RequestSendThen rs; 1439 ScopedActor self; 1440 uint backoff; 1441 1442 /// Copy constructor 1443 this(ref return typeof(this) rhs) { 1444 rs = rhs.rs; 1445 self = rhs.self; 1446 backoff = rhs.backoff; 1447 } 1448 1449 @disable this(this); 1450 1451 void dynIntervalSleep() @trusted { 1452 // +100 usecs "feels good", magic number. current OS and 1453 // implementation of message passing isn't that much faster than 1454 // 100us. 
A bit slow behavior, ehum, for a scoped actor is OK. They 1455 // aren't expected to be used for "time critical" sections. 1456 Thread.sleep(backoff.dur!"usecs"); 1457 backoff = min(backoff + 100, 20000); 1458 } 1459 1460 private static struct ValueCapture { 1461 RefCounted!Data data; 1462 1463 void downHandler(ref Actor, DownMsg) @safe nothrow { 1464 data.get.errSt = ScopedActorError.down; 1465 } 1466 1467 void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow { 1468 if (msg.reason == SystemError.requestTimeout) 1469 data.get.errSt = ScopedActorError.timeout; 1470 else 1471 data.get.errSt = ScopedActorError.fatal; 1472 } 1473 1474 void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow { 1475 logAndDropHandler(a, msg); 1476 data.get.errSt = ScopedActorError.unknownMsg; 1477 } 1478 } 1479 1480 void then(T)(T handler, ErrorHandler onError = null) { 1481 scope (exit) 1482 demonitor(rs.rs.self, rs.rs.requestTo); 1483 monitor(rs.rs.self, rs.rs.requestTo); 1484 1485 auto callback = new ValueCapture(self.data); 1486 self.data.get.self.downHandler = &callback.downHandler; 1487 self.data.get.self.defaultHandler = &callback.unknownMsgHandler; 1488 self.data.get.self.errorHandler = &callback.errorHandler; 1489 1490 () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }(); 1491 1492 scope (exit) 1493 () @trusted { 1494 self.data.get.self.downHandler = null; 1495 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler); 1496 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler); 1497 }(); 1498 1499 auto requestTo = rs.rs.requestTo.lock; 1500 if (!requestTo) 1501 throw new ScopedActorException(ScopedActorError.down); 1502 1503 // TODO: this loop is stupid... should use a conditional variable 1504 // instead but that requires changing the mailbox. later 1505 do { 1506 rs.rs.self.process(Clock.currTime); 1507 // force the actor to be alive even though there are no behaviors. 
1508 rs.rs.self.state_ = ActorState.waiting; 1509 1510 if (self.data.get.errSt == ScopedActorError.none) { 1511 dynIntervalSleep; 1512 } else { 1513 throw new ScopedActorException(self.data.get.errSt); 1514 } 1515 1516 } 1517 while (self.data.get.self.waitingForReply); 1518 } 1519 } 1520 } 1521 1522 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe { 1523 import std.format : format; 1524 1525 return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line)); 1526 } 1527 1528 @( 1529 "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed") 1530 unittest { 1531 import my.actor.system; 1532 1533 auto sys = makeSystem; 1534 1535 auto a0 = sys.spawn((Actor* self) { 1536 return impl(self, (ref CSelf!() ctx, int x) { 1537 Thread.sleep(50.dur!"msecs"); 1538 return 42; 1539 }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self), 1540 (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self)); 1541 }); 1542 1543 { 1544 auto self = scopedActor; 1545 bool excThrown; 1546 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1547 while (!excThrown && Clock.currTime < stopAt) { 1548 try { 1549 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {}); 1550 } catch (ScopedActorException e) { 1551 excThrown = e.error == ScopedActorError.timeout; 1552 } catch (Exception e) { 1553 logger.info(e.msg); 1554 } 1555 } 1556 assert(excThrown, "timeout did not trigger as expected"); 1557 } 1558 1559 { 1560 auto self = scopedActor; 1561 bool excThrown; 1562 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1563 while (!excThrown && Clock.currTime < stopAt) { 1564 try { 1565 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) { 1566 }); 1567 } catch (ScopedActorException e) { 1568 excThrown = e.error == ScopedActorError.down; 1569 } catch (Exception e) { 1570 logger.info(e.msg); 1571 } 1572 } 1573 assert(excThrown, "detecting terminated actor did not trigger as 
expected"); 1574 } 1575 }