1 /** 2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module app; 7 8 import core.memory : GC; 9 import core.thread : Thread; 10 import core.time : dur; 11 import logger = std.experimental.logger; 12 import std.algorithm : sort; 13 import std.array : array, appender, empty; 14 import std.datetime : Clock, Duration, dur, SysTime; 15 import std.datetime.stopwatch : StopWatch, AutoStart; 16 import std.exception : collectException; 17 import std.functional : toDelegate; 18 import std.stdio : writeln, writefln; 19 import std.typecons : tuple, Tuple; 20 21 import my.actor; 22 import my.stat; 23 import my.gc.refc; 24 25 immutable MByte = 1024.0 * 1024.0; 26 27 void main(string[] args) { 28 import std.file : thisExePath; 29 import std.format : format; 30 import std.path : baseName; 31 import std.traits; 32 static import std.getopt; 33 34 TestFn[string] metrics; 35 metrics["create"] = toDelegate(&testActorCreate); 36 metrics["send_msg"] = toDelegate(&testActorMsg); 37 metrics["delayed_msg1"] = () => testActorDelayedMsg(1.dur!"msecs", 5.dur!"msecs", 1000); 38 metrics["delayed_msg10"] = () => testActorDelayedMsg(12.dur!"msecs", 10.dur!"msecs", 100); 39 metrics["delayed_msg100"] = () => testActorDelayedMsg(100.dur!"msecs", 100.dur!"msecs", 10); 40 metrics["delayed_msg1000"] = () => testActorDelayedMsg(1000.dur!"msecs", 1000.dur!"msecs", 5); 41 42 string[] metricName; 43 uint repeatTimes = 1; 44 auto helpInfo = std.getopt.getopt(args, "m|metric", format("metric to run %s", 45 metrics.byKey), &metricName, "r|repeat", "repeat the metric test", &repeatTimes); 46 47 if (helpInfo.helpWanted) { 48 std.getopt.defaultGetoptPrinter(format!"usage: %s <options>\n"(thisExePath.baseName), 49 helpInfo.options); 50 return; 51 } 52 53 metricName = metricName.empty ? metrics.byKey.array.sort.array : metricName; 54 55 foreach (const iter; 0 .. repeatTimes) { 56 writeln("# Iteration ", iter); 57 foreach (m; metricName) { 58 writeln("##############"); 59 run(metrics[m]); 60 writeln; 61 } 62 } 63 } 64 65 alias TestFn = Metric delegate(); 66 67 void run(TestFn t) { 68 auto m = t(); 69 writeln("data points ", m.values.length); 70 auto data = m.values.makeData; 71 auto bstat = basicStat(data); 72 writeln(bstat); 73 writeln("95% is < ", (bstat.mean.value + bstat.sd.value * 2.0) / 1000000.0, " ms"); 74 writeln("bytes per actor ", m.mem); 75 } 76 77 struct Metric { 78 double[] values; 79 double mem; 80 } 81 82 struct Mem { 83 ulong start; 84 double peek() { 85 const used = GC.stats.usedSize; 86 if (used < start) 87 return start - used; 88 return used - start; 89 } 90 } 91 92 Mem mem() { 93 return Mem(GC.stats.usedSize); 94 } 95 96 Metric testActorCreate() { 97 writeln("# Test time to create an actor"); 98 writeln("unit: nanoseconds"); 99 100 Metric rval; 101 102 auto sys = makeSystem; 103 auto m = mem; 104 auto perf() { 105 auto sw = StopWatch(AutoStart.yes); 106 foreach (_; 0 .. 1000) 107 sys.spawn((Actor* a) => impl(a, (int a) {})); 108 rval.values ~= sw.peek.total!"nsecs" / 1000.0; 109 } 110 111 foreach (_; 0 .. 1000) 112 perf; 113 114 rval.mem = m.peek / 1000000.0; 115 116 return rval; 117 } 118 119 Metric testActorMsg() { 120 writeln("# How long does it take to send an actor message from actor a->b"); 121 writeln("unit: nanoseconds"); 122 123 Metric rval; 124 125 auto sys = makeSystem; 126 auto m = mem; 127 ulong nrActors; 128 auto perf() { 129 int count; 130 auto a1 = sys.spawn((Actor* a) => impl(a, (ref Capture!(int*, "count") c, int x) { 131 (*c.count)++; 132 }, capture(&count))); 133 nrActors++; 134 135 Actor* spawnA2(Actor* self) { 136 static void fn(ref Capture!(Actor*, "self", WeakAddress, "a1") c, int x) { 137 send(c.a1, x); 138 send(c.self.address, x + 1); 139 if (x > 100) 140 c.self.shutdown; 141 } 142 143 return impl(self, &fn, capture(self, a1)); 144 } 145 146 auto actors = appender!(WeakAddress[])(); 147 actors.put(a1); 148 foreach (_; 0 .. 100) { 149 actors.put(sys.spawn(&spawnA2)); 150 nrActors++; 151 } 152 153 auto sw = StopWatch(AutoStart.yes); 154 foreach (a; actors.data) 155 send(a, 1); 156 157 int reqs; 158 while (count < 10000) { 159 Thread.sleep(1.dur!"msecs"); 160 } 161 rval.values ~= sw.peek.total!"nsecs" / cast(double) count; 162 163 foreach (a; actors.data) 164 sendExit(a, ExitReason.userShutdown); 165 } 166 167 foreach (_; 0 .. 100) 168 perf; 169 170 rval.mem = m.peek / cast(double) nrActors; 171 172 return rval; 173 } 174 175 Metric testActorDelayedMsg(Duration delayFor, Duration rate, const ulong dataPoints) { 176 writeln("# Test delayed message trigger jitter"); 177 writefln("delay: %s rate: %s", delayFor, rate); 178 writeln("What is the jitter of a delayed message compared to the expected arrival time"); 179 writeln("unit: nanoseconds"); 180 181 Metric rval; 182 183 import std.parallelism; 184 185 auto sys = System(new TaskPool(4), true); 186 auto m = mem; 187 188 auto perf() { 189 static struct Get { 190 } 191 192 static struct Msg { 193 SysTime expectedArrival; 194 } 195 196 static struct StartMsg { 197 } 198 199 auto sender = sys.spawn((Actor* self) { 200 self.name = "sender"; 201 //self.exitHandler((ref Actor self, ExitMsg m) nothrow{ 202 // logger.info("sender exit").collectException; 203 // self.shutdown; 204 //}); 205 206 return impl(self, (ref Capture!(Actor*, "self", Duration, "delay", 207 Duration, "rate") ctx, WeakAddress recv) { 208 delayedSend(recv, delay(ctx.delay), Msg(Clock.currTime + ctx.delay)); 209 delayedSend(ctx.self, delay(ctx.rate), recv); 210 }, capture(self, delayFor, rate)); 211 }); 212 213 auto collector = sys.spawn((Actor* self) { 214 self.name = "collector"; 215 self.exitHandler((ref Actor self, ExitMsg m) nothrow{ 216 logger.info("collector exit").collectException; 217 self.shutdown; 218 }); 219 220 auto st = tuple!("diffs")(refCounted((double[]).init)); 221 alias CT = typeof(st); 222 return impl(self, (ref CT ctx, Duration d) { 223 ctx.diffs.get ~= cast(double) d.total!"nsecs"; 224 }, capture(st), (ref CT ctx, Get _) { 225 auto tmp = ctx.diffs.get.dup; 226 ctx.diffs.get = null; 227 return tmp; 228 }, capture(st)); 229 }); 230 231 auto recv = sys.spawn((Actor* self, WeakAddress collector) { 232 self.name = "recv"; 233 //self.exitHandler((ref Actor self, ExitMsg m) nothrow{ 234 // logger.info("recv exit").collectException; 235 // self.shutdown; 236 //}); 237 238 return impl(self, (ref Capture!(WeakAddress, "collector") ctx, Msg m) { 239 send(ctx.collector, Clock.currTime - m.expectedArrival); 240 }, capture(collector)); 241 }, collector); 242 243 // one dies, both goes down. 244 collector.linkTo(sender); 245 collector.linkTo(recv); 246 send(sender, recv); 247 248 auto self = scopedActor; 249 double[] values; 250 while (values.length < dataPoints) { 251 self.request(collector, infTimeout).send(Get.init).then((double[] d) { 252 values ~= d; 253 }); 254 Thread.sleep(50.dur!"msecs"); 255 } 256 rval.values ~= values; 257 sendExit(collector, ExitReason.userShutdown); 258 } 259 260 foreach (_; 0 .. 10) 261 perf; 262 263 return rval; 264 }