1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc.process;
7 
8 import core.sys.posix.signal : SIGKILL;
9 import core.thread : Thread;
10 import core.time : dur, Duration;
11 import logger = std.experimental.logger;
12 import std.algorithm : filter, count, joiner, map;
13 import std.array : appender, empty, array;
14 import std.exception : collectException;
15 import std.stdio : File, fileno, writeln;
16 import std.typecons : Flag, Yes;
17 static import std.process;
18 static import std.stdio;
19 
20 import my.gc.refc;
21 import my.from_;
22 import my.path;
23 import my.named_type;
24 
25 public import proc.channel;
26 public import proc.pid;
27 public import proc.tty;
28 
29 version (unittest) {
30     import std.file : remove;
31 }
32 
/** Wrap a process in a reference counted guard.
 *
 * The wrapped process is disposed when the last copy of the returned guard is
 * destroyed, such as when the instance goes out of scope.
 *
 * Params:
 *  p = the process to manage
 *  signal = stored in the guard together with the process
 *
 * Returns: a `ScopeKill!T` owning `p`.
 */
auto rcKill(T)(T p, int signal = SIGKILL) {
    alias Guard = ScopeKill!T;
    return Guard(p, signal);
}
39 
// Backward compatibility: keeps code that refers to `scopeKill` compiling by
// forwarding the name to `rcKill`.
alias scopeKill = rcKill;
42 
/** Reference counted owner of a process of type `T`.
 *
 * All copies share one payload. When the payload is destroyed the process is
 * disposed via `T.dispose`.
 */
struct ScopeKill(T) {
    /// Shared state; its destructor is what cleans up the process.
    private static struct Payload {
        T process;
        int signal = SIGKILL;
        bool hasProcess;

        ~this() @safe {
            // a default initialized payload owns nothing.
            if (!hasProcess)
                return;
            process.dispose();
        }
    }

    private RefCounted!Payload payload_;

    /** Take ownership of `process`.
     *
     * Params:
     *  process = the process to manage
     *  signal = stored in the payload next to the process
     */
    this(T process, int signal) @safe {
        payload_ = refCounted(Payload(process, signal, true));
    }

    /// Returns: a reference to the managed process.
    ref T process() {
        return payload_.get.process;
    }

    // makes the guard usable as if it were the process itself.
    alias process this;
}
68 
/// Async process wrapper for a std.process SpawnProcess
struct SpawnProcess {
    import core.sys.posix.signal : SIGKILL;
    import std.algorithm : among;

    private {
        /// Lifecycle of the wrapped process as tracked by this wrapper.
        enum State {
            /// no signal has been sent via `kill` and no exit status is collected.
            running,
            /// `kill` has been called but the exit status is not yet collected.
            terminated,
            /// the exit status is collected and stored in `status_`.
            exitCode
        }

        std.process.Pid process;
        RawPid pid;
        int status_;
        State st;
    }

    /// Wrap an already spawned process.
    this(std.process.Pid process) @safe {
        this.process = process;
        this.pid = process.osHandle.RawPid;
    }

    ~this() @safe {
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    /// Kill and cleanup the process.
    void dispose() @safe {
        final switch (st) {
        case State.running:
            this.kill;
            this.wait;
            break;
        case State.terminated:
            this.wait;
            break;
        case State.exitCode:
            break;
        }

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Does nothing unless the state is `running`. Errors from sending the
     * signal are deliberately ignored (best effort) and the state changes to
     * `terminated` regardless of the outcome.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        final switch (st) {
        case State.running:
            break;
        case State.terminated:
            goto case;
        case State.exitCode:
            return;
        }

        try {
            std.process.kill(process, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        final switch (st) {
        case State.running, State.terminated:
            status_ = std.process.wait(process);
            break;
        case State.exitCode:
            // already collected, reuse the stored status.
            break;
        }

        st = State.exitCode;

        return status_;
    }

    /// Non-blocking wait for the process termination.
    /// Returns: `true` if the process has terminated and its exit status is collected.
    bool tryWait() @safe {
        final switch (st) {
        case State.running, State.terminated:
            // Poll in both states. A signal sent by `kill` is not guaranteed
            // to have ended the process yet (or at all, for signals other
            // than SIGKILL) so a blocking wait here could stall the caller.
            auto s = std.process.tryWait(process);
            if (s.terminated) {
                st = State.exitCode;
                status_ = s.status;
            }
            break;
        case State.exitCode:
            break;
        }

        return st == State.exitCode;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st != State.exitCode) {
            throw new Exception(
                    "Process has not terminated and wait/tryWait been called to collect the exit status");
        }
        return status_;
    }

    /// Returns: If the process has been signaled via `kill` or its exit status is collected.
    bool terminated() @safe {
        return st.among(State.terminated, State.exitCode) != 0;
    }
}
195 
/// Async process that do not block on read from stdout/stderr.
struct PipeProcess {
    import std.algorithm : among;
    import core.sys.posix.signal : SIGKILL;

    private {
        /// Lifecycle of the wrapped process as tracked by this wrapper.
        enum State {
            /// no signal has been sent via `kill` and no exit status is collected.
            running,
            /// `kill` has been called but the exit status is not yet collected.
            terminated,
            /// the exit status is collected and stored in `status_`.
            exitCode
        }

        std.process.ProcessPipes process;
        std.process.Pid pid;

        FileReadChannel stderr_;
        FileReadChannel stdout_;
        FileWriteChannel stdin_;
        int status_;
        State st;
    }

    /// Wrap an existing pid together with the files connected to it.
    this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
        this.pid = pid;

        this.stdin_ = FileWriteChannel(stdin);
        this.stdout_ = FileReadChannel(stdout);
        this.stderr_ = FileReadChannel(stderr);
    }

    /** Wrap the result of `std.process.pipeProcess`/`pipeShell`.
     *
     * Params:
     *  process = the pipes and pid of the spawned process
     *  r = the redirect flags used when spawning; only the streams that are
     *      flagged get a channel.
     */
    this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
        this.process = process;
        this.pid = process.pid;

        if (r & std.process.Redirect.stdin) {
            stdin_ = FileWriteChannel(this.process.stdin);
        }
        if (r & std.process.Redirect.stdout) {
            stdout_ = FileReadChannel(this.process.stdout);
        }
        if (r & std.process.Redirect.stderr) {
            this.stderr_ = FileReadChannel(this.process.stderr);
        }
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid.osHandle.RawPid;
    }

    /// Access to stdin.
    ref FileWriteChannel stdin() return scope nothrow @safe {
        return stdin_;
    }

    /// Access to stdout.
    ref FileReadChannel stdout() return scope nothrow @safe {
        return stdout_;
    }

    /// Access stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /// Kill and cleanup the process.
    void dispose() @safe {
        final switch (st) {
        case State.running:
            this.kill;
            this.wait;
            .destroy(process);
            break;
        case State.terminated:
            this.wait;
            .destroy(process);
            break;
        case State.exitCode:
            break;
        }

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Does nothing unless the state is `running`. Errors from sending the
     * signal are deliberately ignored (best effort) and the state changes to
     * `terminated` regardless of the outcome.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        final switch (st) {
        case State.running:
            break;
        case State.terminated:
            return;
        case State.exitCode:
            return;
        }

        try {
            std.process.kill(pid, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        final switch (st) {
        case State.running, State.terminated:
            status_ = std.process.wait(pid);
            break;
        case State.exitCode:
            // already collected, reuse the stored status.
            break;
        }

        st = State.exitCode;

        return status_;
    }

    /// Non-blocking wait for the process termination.
    /// Returns: `true` if the process has terminated and its exit status is collected.
    bool tryWait() @safe {
        final switch (st) {
        case State.running, State.terminated:
            // Poll in both states. A signal sent by `kill` is not guaranteed
            // to have ended the process yet (or at all, for signals other
            // than SIGKILL) so a blocking wait here could stall the caller.
            auto s = std.process.tryWait(pid);
            if (s.terminated) {
                st = State.exitCode;
                status_ = s.status;
            }
            break;
        case State.exitCode:
            break;
        }

        return st == State.exitCode;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st != State.exitCode) {
            throw new Exception(
                    "Process has not terminated and wait/tryWait been called to collect the exit status");
        }
        return status_;
    }

    /// Returns: If the process has been signaled via `kill` or its exit status is collected.
    bool terminated() @safe {
        return st.among(State.terminated, State.exitCode) != 0;
    }
}
358 
/** Spawn a process with `std.process.spawnProcess` and wrap the result.
 *
 * All parameters are forwarded unchanged to `std.process.spawnProcess`.
 *
 * Returns: a `SpawnProcess` wrapping the spawned pid.
 */
SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const char[] workDir = null) {
    auto child = std.process.spawnProcess(args, stdin, stdout, stderr, env, config, workDir);
    return SpawnProcess(child);
}
366 
/** Spawn a process that uses the standard streams of `std.stdio`.
 *
 * `args`, `env`, `config` and `workDir` are forwarded unchanged to
 * `std.process.spawnProcess`.
 *
 * Returns: a `SpawnProcess` wrapping the spawned pid.
 */
SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    auto child = std.process.spawnProcess(args, std.stdio.stdin, std.stdio.stdout,
            std.stdio.stderr, env, config, workDir);
    return SpawnProcess(child);
}
372 
/** Spawn a single program without arguments.
 *
 * `program` is passed on as a one element argument list; the remaining
 * parameters are forwarded unchanged to `std.process.spawnProcess`.
 *
 * Returns: a `SpawnProcess` wrapping the spawned pid.
 */
SpawnProcess spawnProcess(scope const(char)[] program,
        File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
        File stderr = std.stdio.stderr, const string[string] env = null,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    // view the lone parameter as a slice of length one.
    auto child = std.process.spawnProcess((&program)[0 .. 1], stdin, stdout,
            stderr, env, config, workDir);
    return SpawnProcess(child);
}
380 
/** Run `command` in a shell via `std.process.spawnShell` and wrap the result.
 *
 * All parameters are forwarded unchanged to `std.process.spawnShell`.
 *
 * Returns: a `SpawnProcess` wrapping the spawned pid.
 */
SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        scope const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, stdin, stdout, stderr, env,
            config, workDir, shellPath);
    return SpawnProcess(child);
}
388 
/** Run `command` in a shell via the `std.process.spawnShell` overload that
 * takes no explicit streams, and wrap the result.
 *
 * All parameters are forwarded unchanged.
 *
 * Returns: a `SpawnProcess` wrapping the spawned pid.
 */
SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
        std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, env, config, workDir, shellPath);
    return SpawnProcess(child);
}
395 
/** Spawn a process with pipes via `std.process.pipeProcess` and wrap the result.
 *
 * All parameters are forwarded unchanged. `redirect` is also given to the
 * wrapper so it knows which channels to set up.
 *
 * Returns: a `PipeProcess` wrapping the pipes.
 */
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes, redirect);
}
402 
/** Run `command` in a shell with pipes via `std.process.pipeShell` and wrap
 * the result.
 *
 * All parameters are forwarded unchanged. `redirect` is also given to the
 * wrapper so it knows which channels to set up.
 *
 * Returns: a `PipeProcess` wrapping the pipes.
 */
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes, redirect);
}
410 
/** Moves the process to a separate process group and on exit kill it and all
 * its children.
 *
 * Everything except `kill` and `dispose` is delegated to the wrapped process.
 */
@safe struct Sandbox(ProcessT) {
    import core.sys.posix.signal : SIGKILL;

    private ProcessT p;
    private RawPid pid;

    /// Wrap `p` and call `setpgid` for its pid.
    this(ProcessT p) @safe {
        import core.sys.posix.unistd : setpgid;

        this.p = p;
        this.pid = p.osHandle;
        setpgid(pid, 0);
    }

    /// Returns: the pid captured at construction.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    // The stream accessors only exist when the wrapped process has them.

    static if (__traits(hasMember, ProcessT, "stdin")) {
        /// Access to the wrapped process stdin.
        ref FileWriteChannel stdin() nothrow @safe {
            return p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        /// Access to the wrapped process stdout.
        ref FileReadChannel stdout() nothrow @safe {
            return p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        /// Access to the wrapped process stderr.
        ref FileReadChannel stderr() nothrow @safe {
            return p.stderr;
        }
    }

    /// Kill the process and its children, then dispose the wrapped process.
    void dispose() @safe {
        // the kill also reaps the children, which cleans up zombies.
        kill();
        p.dispose();
    }

    /** Send `signal` to the process.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @safe {
        // the submap has to be captured before the process is killed because
        // afterwards its children may have changed.
        auto tree = makePidMap.getSubMap(pid);

        p.kill(signal);

        // the wrapped process is handled above; only the children are left to
        // kill and reap.
        tree.remove(pid);
        proc.pid.kill(tree, Yes.onlyCurrentUser, signal).reap();
    }

    /// Blocking wait, delegated to the wrapped process.
    int wait() @safe {
        return p.wait();
    }

    /// Non-blocking wait, delegated to the wrapped process.
    bool tryWait() @safe {
        return p.tryWait();
    }

    /// Exit status, delegated to the wrapped process.
    int status() @safe {
        return p.status();
    }

    /// Termination state, delegated to the wrapped process.
    bool terminated() @safe {
        return p.terminated();
    }
}
491 
/// Wrap `p` in a `Sandbox`.
/// Returns: a `Sandbox!T` owning `p`.
auto sandbox(T)(T p) @safe {
    alias Boxed = Sandbox!T;
    return Boxed(p);
}
495 
// Spawns a script that starts three `sleep` processes, kills the sandboxed
// script and checks that no child remains in the pid map afterwards.
@("shall terminate a group of processes")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto p = pipeProcess([scriptName]).sandbox.rcKill;
    // block until the script has started all three sleeps.
    waitUntilChildren(p.osHandle, 3);
    // number of processes below `p`, excluding `p` itself, before the kill.
    const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    p.kill;
    Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
    // same count after the kill.
    const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    // -9: terminated by signal 9 (SIGKILL), the default of `kill`.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(preChildren == 3);
    assert(postChildren == 0);
}
520 
/** Kill the process when the timeout expires.
 *
 * A background thread polls the pid until either the timeout has passed, the
 * pid can no longer be signaled or it is told to stop. When its loop ends it
 * sends the configured signal (default SIGKILL) to the process.
 *
 * The state is reference counted so copies share the same process and
 * background thread.
 */
@safe struct Timeout(ProcessT) {
    import core.sys.posix.signal : SIGKILL;
    import core.thread;
    import std.algorithm : among;
    import std.datetime : Clock, Duration;

    private {
        /// Messages sent to the background thread.
        enum Msg {
            none,
            /// end the polling loop.
            stop,
            /// ask the thread to publish `Reply.running`.
            status,
        }

        /// State published by the background thread.
        enum Reply {
            none,
            running,
            /// the loop ended for another reason than the timeout.
            normalDeath,
            /// the loop ended because the timeout expired.
            killedByTimeout,
        }

        static struct Payload {
            ProcessT p;
            RawPid pid;
            Background background;
            // last reply fetched from `background`.
            Reply backgroundReply;
        }

        RefCounted!Payload rc;
    }

    /** Take ownership of `p` and start the background thread.
     *
     * Params:
     *  p = the process to supervise
     *  timeout = how long the process is allowed to run
     */
    this(ProcessT p, Duration timeout) @trusted {
        import std.algorithm : move;

        auto pid = p.osHandle;
        rc = refCounted(Payload(move(p), pid));
        // the thread gets a pointer to the process stored inside the payload.
        rc.get.background = new Background(&rc.get.p, timeout);
        rc.get.background.isDaemon = true;
        rc.get.background.start;
    }

    ~this() @trusted {
        rc.release;
    }

    /// Thread that runs `checkProcess`. All mutable fields are accessed with
    /// `mtx` locked.
    private static class Background : Thread {
        import core.sync.condition : Condition;
        import core.sync.mutex : Mutex;

        Duration timeout;
        ProcessT* p;
        Mutex mtx;
        // pending messages from the owner.
        Msg[] msg;
        Reply reply_;
        RawPid pid;
        // signal used by `kill`.
        int signal = SIGKILL;

        this(ProcessT* p, Duration timeout) {
            this.p = p;
            this.timeout = timeout;
            this.mtx = new Mutex();
            this.pid = p.osHandle;

            super(&run);
        }

        void run() {
            checkProcess(this.pid, this.timeout, this);
        }

        /// Append a message for the thread.
        void put(Msg msg) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.msg ~= msg;
        }

        /// Returns: the most recently appended message, or `Msg.none` if
        /// there are none. Note that messages are taken from the end.
        Msg popMsg() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            if (msg.empty)
                return Msg.none;
            auto rval = msg[$ - 1];
            msg = msg[0 .. $ - 1];
            return rval;
        }

        void setReply(Reply reply_) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.reply_ = reply_;
        }

        Reply reply() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            return reply_;
        }

        void setSignal(int signal) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.signal = signal;
        }

        /// Send the currently configured signal to the process.
        void kill() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            p.kill(signal);
        }
    }

    /** Body of the background thread.
     *
     * Params:
     *  p = pid that is probed with signal 0
     *  timeout = maximum time to keep polling
     *  bg = the thread object, used for messages, replies and the final kill
     */
    private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
        import std.algorithm : max, min;
        import std.variant : Variant;
        static import core.sys.posix.signal;

        const stopAt = Clock.currTime + timeout;
        // the purpose is to poll the process often "enough" that if it
        // terminates early `Process` detects it fast enough. 1000 is chosen
        // because it "feels good". The interval is clamped to 1..50 ms.
        auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs";

        bool forceStop;
        bool running = true;
        while (running && Clock.currTime < stopAt) {
            const msg = bg.popMsg;

            final switch (msg) {
            case Msg.none:
                () @trusted { Thread.sleep(sleepInterval); }();
                break;
            case Msg.stop:
                forceStop = true;
                running = false;
                break;
            case Msg.status:
                bg.setReply(Reply.running);
                break;
            }

            // signal 0 sends nothing; a result of -1 is treated as the
            // process no longer running.
            () @trusted {
                if (core.sys.posix.signal.kill(p, 0) == -1) {
                    running = false;
                }
            }();
        }

        // may be children alive thus must ensure that the whole process tree
        // is killed if this is a sandbox with a timeout.
        bg.kill();

        if (!forceStop && Clock.currTime >= stopAt) {
            bg.setReply(Reply.killedByTimeout);
        } else {
            bg.setReply(Reply.normalDeath);
        }
    }

    /// Returns: the pid captured at construction.
    RawPid osHandle() nothrow @trusted {
        return rc.get.pid;
    }

    // The stream accessors only exist when the wrapped process has them.

    static if (__traits(hasMember, ProcessT, "stdin")) {
        ref FileWriteChannel stdin() nothrow @safe {
            return rc.get.p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        ref FileReadChannel stdout() nothrow @safe {
            return rc.get.p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        ref FileReadChannel stderr() nothrow @trusted {
            return rc.get.p.stderr;
        }
    }

    /// Stop the background thread, if no final reply has been fetched yet,
    /// and dispose the wrapped process.
    void dispose() @trusted {
        if (rc.get.backgroundReply.among(Reply.none, Reply.running)) {
            rc.get.background.put(Msg.stop);
            rc.get.background.join;
            // after the join the reply is the final one set by the thread.
            rc.get.backgroundReply = rc.get.background.reply;
        }
        rc.get.p.dispose;
    }

    /** Send `signal` to the process.
     *
     * The signal is also stored and thus used by the kill that the
     * background thread does when its loop ends.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        rc.get.background.setSignal(signal);
        rc.get.background.kill();
    }

    /// Wait for the process by polling `tryWait` every 20 ms.
    /// Returns: the exit status.
    int wait() @trusted {
        while (!this.tryWait) {
            Thread.sleep(20.dur!"msecs");
        }
        return rc.get.p.wait;
    }

    /// Non-blocking wait, delegated to the wrapped process.
    bool tryWait() @trusted {
        return rc.get.p.tryWait;
    }

    /// Exit status, delegated to the wrapped process.
    int status() @trusted {
        return rc.get.p.status;
    }

    /// Termination state, delegated to the wrapped process.
    bool terminated() @trusted {
        return rc.get.p.terminated;
    }

    /// Returns: `true` if the background thread reported that the timeout expired.
    bool timeoutTriggered() @trusted {
        if (rc.get.backgroundReply.among(Reply.none, Reply.running)) {
            rc.get.background.put(Msg.status);
            // NOTE(review): the reply is read right after the request is
            // queued, so it may not yet reflect this request.
            rc.get.backgroundReply = rc.get.background.reply;
        }
        return rc.get.backgroundReply == Reply.killedByTimeout;
    }
}
754 
/** Wrap `p` in a `Timeout`.
 *
 * Params:
 *  p = the process to supervise
 *  timeout_ = how long the process is allowed to run
 *
 * Returns: a `Timeout!T` owning `p`.
 */
auto timeout(T)(T p, Duration timeout_) @trusted {
    alias Timed = Timeout!T;
    return Timed(p, timeout_);
}
758 
/** Block until the process has pending data on stdout or stderr.
 *
 * The channels are polled every 20 ms. The function does not return if the
 * process never produces any output.
 *
 * Params:
 *  p = a process with `stdout` and `stderr` channels that have `hasPendingData`
 */
void waitForPendingData(ProcessT)(ProcessT p) {
    // keep waiting only while neither channel has something to read.
    while (!p.stdout.hasPendingData && !p.stderr.hasPendingData) {
        Thread.sleep(20.dur!"msecs");
    }
}
765 
// Runs `sleep 1m` with a 100 ms timeout and checks that it is killed by
// SIGKILL within the expected time window.
@("shall kill the process after the timeout")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
    auto sw = StopWatch(AutoStart.yes);
    p.wait;
    sw.stop;

    // not killed before the timeout and not much later than it.
    assert(sw.peek >= 100.dur!"msecs");
    assert(sw.peek <= 500.dur!"msecs");
    // -9: terminated by signal 9 (SIGKILL).
    assert(p.wait == -9);
    assert(p.terminated);
    assert(p.status == -9);
    assert(p.timeoutTriggered);
}
782 
/// A chunk of data read from a process together with the stream it came from.
struct DrainElement {
    /// The stream the data was read from.
    enum Type {
        stdout,
        stderr,
    }

    Type type;
    const(ubyte)[] data;

    /// Returns: iterates the data as an input range.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        // reinterpret the raw bytes as UTF-8 code units.
        const(char)[] text = cast(const(char)[]) data;
        return std.utf.byUTF!(const(char))(text);
    }

    /// Returns: `true` if the element carries no data.
    bool empty() @safe pure nothrow const @nogc {
        return !data.length;
    }
}
803 
/** A range that drains a process stdout/stderr until it terminates.
 *
 * There may be `DrainElement` that are empty.
 *
 * The first element is always an empty placeholder; real data appears after
 * the first `popFront`.
 */
struct DrainRange(ProcessT) {
    private {
        /// Progress of the drain.
        enum State {
            /// nothing has been read yet.
            start,
            /// reading while the process is running and a pipe is open.
            draining,
            /// reading what is left in stdout.
            lastStdout,
            /// reading what is left in stderr.
            lastStderr,
            /// one final, empty element before the range becomes empty.
            lastElement,
            empty,
        }

        ProcessT p;
        DrainElement front_;
        State st;
        // scratch buffer reused by every read.
        ubyte[] buf;
    }

    /// Drain `p` using a 4096 byte read buffer.
    this(ProcessT p) {
        this.p = p;
        this.buf = new ubyte[4096];
    }

    /// Returns: the current element, which may be empty.
    DrainElement front() @safe pure nothrow const @nogc {
        assert(!empty, "Can't get front of an empty range");
        return front_;
    }

    /// Advance the state machine and read the next element, if any.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");

        static bool isAnyPipeOpen(ref ProcessT p) {
            return p.stdout.isOpen || p.stderr.isOpen;
        }

        // Reads from stderr if it has pending data, otherwise from stdout.
        // The returned data is a slice of `buf`.
        DrainElement readData(ref ProcessT p) @safe {
            if (p.stderr.hasPendingData) {
                return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf));
            } else if (p.stdout.hasPendingData) {
                return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf));
            }
            return DrainElement.init;
        }

        // Polls for data for at most ~100 ms. Returns an empty element if
        // nothing was read before the deadline or all pipes are closed.
        DrainElement waitUntilData() @safe {
            import std.datetime : Clock;

            // may livelock if the process never terminates and never writes to
            // the terminal. The timeout ensures that the loop sooner or later
            // is left. This is important if the drain is part of a timeout
            // wrapping.

            const timeout = 100.dur!"msecs";
            const stopAt = Clock.currTime + timeout;
            const sleepFor = timeout / 20;
            // spin without sleeping until this point in time has passed.
            const useSleep = Clock.currTime + sleepFor;
            bool running = true;
            while (running) {
                const now = Clock.currTime;

                running = (now < stopAt) && isAnyPipeOpen(p);

                // one read attempt is always made, even when `running` just
                // became false.
                auto bufRead = readData(p);

                if (!bufRead.empty) {
                    // copy because the data is a slice of the reused `buf`.
                    return DrainElement(bufRead.type, bufRead.data.dup);
                } else if (running && now > useSleep && bufRead.empty) {
                    import core.thread : Thread;

                    () @trusted { Thread.sleep(sleepFor); }();
                }
            }

            return DrainElement.init;
        }

        // every step starts from an empty element; the branches below fill it
        // in when data was read.
        front_ = DrainElement.init;

        final switch (st) {
        case State.start:
            st = State.draining;
            front_ = waitUntilData;
            break;
        case State.draining:
            if (p.terminated) {
                st = State.lastStdout;
            } else if (isAnyPipeOpen(p)) {
                front_ = waitUntilData();
            } else {
                st = State.lastStdout;
            }
            break;
        case State.lastStdout:
            // stays in this state until stdout has no more pending data.
            if (p.stdout.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup);
            } else {
                st = State.lastStderr;
            }
            break;
        case State.lastStderr:
            // stays in this state until stderr has no more pending data.
            if (p.stderr.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
            } else {
                st = State.lastElement;
            }
            break;
        case State.lastElement:
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the state machine has reached its end.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
925 
/// Drain a process pipe until empty.
/// Returns: a `DrainRange!T` over the output of `p`.
auto drain(T)(T p) {
    alias Range = DrainRange!T;
    return Range(p);
}
930 
/** Read the output of a process line by line.
 *
 * The data comes from a `DrainRange` and is accumulated in an internal buffer
 * that is split on '\n'. Each line is a fresh copy without the newline.
 * `front` may be empty (null) when no complete line was available.
 */
struct DrainByLineCopyRange(ProcessT) {
    private {
        /// Progress of the line extraction.
        enum State {
            /// nothing has been read yet.
            start,
            /// reading from `range` and emitting lines.
            draining,
            /// `range` is empty; emitting the complete lines left in `buf`.
            lastLine,
            /// emitting whatever is left in `buf` as the final line.
            lastBuf,
            empty,
        }

        ProcessT process;
        DrainRange!ProcessT range;
        State st;
        // data read from `range` that has not yet been emitted as a line.
        const(ubyte)[] buf;
        // the current line returned by `front`.
        const(char)[] line;
    }

    this(ProcessT p) {
        process = p;
        range = p.drain;
    }

    /// Returns: the current line, which may be empty.
    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        return line.assumeUnique;
    }

    /// Advance the state machine and extract the next line, if any.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        // Removes the data up to and including index `idx` from `buf` and
        // returns it without a trailing '\n'. An `idx` equal to -1 means that
        // all of `buf` is taken.
        const(ubyte)[] updateBuf(size_t idx) {
            const(ubyte)[] tmp;
            if (buf.empty) {
                // do nothing
            } else if (idx == -1) {
                tmp = buf;
                buf = null;
            } else {
                // step past the newline at `idx` so it is removed from `buf`.
                idx = () {
                    if (idx < buf.length) {
                        return idx + 1;
                    }
                    return idx;
                }();
                tmp = buf[0 .. idx];
                buf = buf[idx .. $];
            }

            if (!tmp.empty && tmp[$ - 1] == '\n') {
                tmp = tmp[0 .. $ - 1];
            }
            return tmp;
        }

        // Reads more data into `buf` until it contains a newline, then sets
        // `line` to the first line. `line` is left untouched if no newline
        // was found.
        void drainLine() {
            // advance `range` and append its element, if any, to `buf`.
            void fillBuf() {
                if (!range.empty) {
                    range.popFront;
                }
                if (!range.empty) {
                    buf ~= range.front.data;
                }
            }

            size_t idx;
            () {
                int cnt;
                do {
                    fillBuf();
                    idx = buf.countUntil('\n');
                    // 2 is a magic number which bounds the number of retries;
                    // the loop body runs at most three times.
                }
                while (!range.empty && idx == -1 && cnt++ < 2);
            }();

            if (idx != -1) {
                auto tmp = updateBuf(idx);
                line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            }
        }

        // Emits the next complete line left in `buf`.
        // Returns: true when `buf` contains no more newline.
        bool lastLine() {
            size_t idx = buf.countUntil('\n');
            if (idx == -1)
                return true;

            auto tmp = updateBuf(idx);
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            return false;
        }

        // every step starts without a line; the branches below set it when
        // one was extracted.
        line = null;
        final switch (st) {
        case State.start:
            drainLine;
            st = State.draining;
            break;
        case State.draining:
            drainLine;
            if (range.empty)
                st = State.lastLine;
            break;
        case State.lastLine:
            if (lastLine)
                st = State.lastBuf;
            break;
        case State.lastBuf:
            // the remainder has no newline, emit it as is.
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the state machine has reached its end.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
1056 
1057 @("shall drain the process output by line")
1058 unittest {
1059     import std.algorithm : filter, joiner, map;
1060     import std.array : array;
1061 
1062     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
1063     auto res = p.process.drainByLineCopy.filter!"!a.empty".array;
1064 
1065     assert(res.length == 3);
1066     assert(res.joiner.count >= 30);
1067     assert(p.wait == 0);
1068     assert(p.terminated);
1069 }
1070 
/** Wrap a process in a range that yields its output line by line.
 *
 * Params:
 *  p = the process to drain
 *
 * Returns: a `DrainByLineCopyRange` constructed from `p`.
 */
auto drainByLineCopy(T)(T p) {
    alias LineRange = DrainByLineCopyRange!T;
    return LineRange(p);
}
1074 
/** Consume and discard everything `p.drain()` yields.
 *
 * Params:
 *  p = the process to drain
 *
 * Returns: `p`, so the call can be chained.
 */
auto drainToNull(T)(T p) {
    // every element is fetched and thrown away.
    foreach (_; p.drain()) {
    }

    return p;
}
1081 
/** Forward everything `p.drain()` yields to an output range.
 *
 * Params:
 *  p = the process to drain
 *  range = sink; its `put` is called once for every drained element
 *
 * Returns: `p`, so the call can be chained.
 */
auto drain(ProcessT, T)(ProcessT p, ref T range) {
    foreach (element; p.drain())
        range.put(element);

    return p;
}
1089 
1090 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
1091 unittest {
1092     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
1093     auto res = p.drain.array;
1094 
1095     // this is just a sanity check. It has to be kind a high because there is
1096     // some wiggleroom allowed
1097     assert(res.count > 1 && res.count <= 50);
1098 
1099     assert(res.filter!(a => a.type == DrainElement.Type.stdout)
1100             .map!(a => a.data)
1101             .joiner
1102             .count == 30);
1103     assert(p.wait == 0);
1104     assert(p.terminated);
1105 }
1106 
1107 @("shall kill the process tree when the timeout is reached")
1108 unittest {
1109     immutable script = makeScript(`#!/bin/bash
1110 sleep 10m
1111 `);
1112     scope (exit)
1113         remove(script);
1114 
1115     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
1116     waitUntilChildren(p.osHandle, 1);
1117     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1118     const res = p.process.drain.array;
1119     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1120 
1121     assert(p.wait == -9);
1122     assert(p.terminated);
1123     assert(preChildren == 1);
1124     assert(postChildren == 0);
1125 }
1126 
/** Write `script` to a file next to the running executable and make it
 * executable.
 *
 * The file name is built from the path of the running executable, the base
 * name of `file` and `line`, which by default identify the call site.
 *
 * Params:
 *  script = content of the script; a newline is appended when it is written
 *  file = source file used in the generated name
 *  line = source line used in the generated name
 *
 * Returns: the path of the written script. The caller is responsible for
 * removing it.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.conv : to;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.path : baseName;
    import std.stdio : File;

    const suffix = file.baseName ~ line.to!string ~ ".sh";
    string scriptPath = thisExePath() ~ "_" ~ suffix;

    // the file is closed before its permissions are changed.
    auto fout = File(scriptPath, "w");
    fout.writeln(script);
    fout.close();

    // add the execute bit for user, group and others.
    const executable = S_IXUSR | S_IXGRP | S_IXOTH;
    setAttributes(scriptPath, getAttributes(scriptPath) | executable);

    return scriptPath;
}
1140 
/** Poll until the sub map of `p` holds at least `num` processes besides `p`
 * itself, or give up after roughly 10 seconds.
 *
 * The function returns silently when it gives up; callers that need the
 * children to exist have to verify it themselves. The first check is made
 * after an initial 50 ms sleep.
 *
 * Params:
 *  p = the parent process
 *  num = the number of processes, `p` excluded, to wait for
 */
void waitUntilChildren(RawPid p, int num) {
    // a monotonic clock is used so that adjustments of the wall clock can
    // neither shorten nor prolong the timeout.
    import core.time : MonoTime;

    const failAt = MonoTime.currTime + 10.dur!"seconds";
    do {
        Thread.sleep(50.dur!"msecs");
        if (MonoTime.currTime > failAt)
            break;
    }
    while (makePidMap.getSubMap(p).remove(p).length < num);
}
1153 
/// An address stored as an `ulong` inside a `NamedType` tagged "Address".
alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable);
/// One mapped address range, as filled in by `libs` from a line of
/// `/proc/<pid>/maps`.
struct AddressMap {
    /// First field of the range (`begin-end`) in the maps line.
    Address begin;
    /// Second field of the range (`begin-end`) in the maps line.
    Address end;
    /// The permission column of the maps line, stored as text.
    NamedType!(string, Tag!"Permission", string.init, TagStringable) perm;
}
1160 
/** An assoc array mapping the path of each file mapped into the process,
 * such as its shared libraries, to the address ranges it is mapped at.
 *
 * The data is read from `/proc/<pid>/maps`. Every line that contains a `/` is
 * used: the text from the first `/` to the end of the line is the key and the
 * leading `begin-end perm` columns become an `AddressMap`. A path occurs once
 * per mapped range, thus each key holds an array.
 *
 * This is best effort. Lines that can't be parsed are skipped and a failure to
 * read the file is only logged, in which case what has been collected so far
 * (possibly nothing) is returned.
 *
 * Params:
 *  pid = the process to inspect
 *
 * Returns: the address ranges grouped by path.
 */
AddressMap[][AbsolutePath] libs(RawPid pid) nothrow {
    import std.stdio : File;
    import std.format : formattedRead, format;
    import std.algorithm : countUntil;
    import std.typecons : tuple;
    import std.string : strip;

    typeof(return) rval;

    try {
        // pair each line with the index of its first '/' and drop the lines
        // that have none.
        foreach (l; File(format!"/proc/%s/maps"(pid)).byLine
                .map!(a => tuple(a, a.countUntil('/')))
                .filter!(a => a[1] != -1)) {
            try {
                // byLine reuses its buffer, thus everything that is kept is
                // copied with idup.
                auto p = AbsolutePath(l[0][l[1] .. $].idup);
                // the columns in front of the path.
                auto m = l[0][0 .. l[1]].strip;
                ulong begin;
                ulong end;
                char[] perm;
                if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) {
                    continue;
                }

                auto amap = AddressMap(Address(begin), Address(end),
                        typeof(AddressMap.init.perm)(perm.idup));
                if (auto v = p in rval) {
                    *v ~= amap;
                } else {
                    rval[p] = [amap];
                }
            } catch (Exception e) {
                // deliberately ignored: a line that can't be parsed is skipped.
            }
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return rval;
}
1203 
1204 @("shall read the libraries the process is using")
1205 unittest {
1206     immutable scriptName = makeScript(`#!/bin/bash
1207 sleep 10m &
1208 sleep 10m &
1209 sleep 10m
1210 `);
1211     scope (exit)
1212         remove(scriptName);
1213 
1214     auto p = pipeProcess([scriptName]).sandbox.rcKill;
1215     waitUntilChildren(p.osHandle, 3);
1216     auto res = libs(p.osHandle);
1217     p.kill;
1218 
1219     assert(res.length != 0);
1220     assert(AbsolutePath("/bin/bash") in res);
1221 }
1222 
/** Extract the libraries and their load addresses from the output of `ldd`.
 *
 * A line is used when it contains a path (preceded by whitespace) followed by
 * a hexadecimal address in parentheses. Lines that do not match, or whose
 * address can't be read, are skipped.
 *
 * Params:
 *  input = an input range of lines which is the output from ldd
 *
 * Example:
 * ---
 * import std.string : splitLines;
 *
 * writeln(parseLddOutput(`
 *     linux-vdso.so.1 =>  (0x00007fffbf5fe000)
 *     libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)
 *     libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)
 *     libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)
 *     /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)
 * `.splitLines));
 * ---
 *
 * Returns: an associative array {path: address} with one entry for each
 * library that was found in the output.
 */
Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) {
    import std.format : formattedRead;
    import std.regex : matchFirst, regex;

    const lddEntry = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`);
    typeof(return) libraries;

    foreach (entry; input) {
        auto captures = matchFirst(entry, lddEntry);
        const usable = !captures.empty && captures.length >= 3;

        if (usable) {
            try {
                ulong loadAddr;
                formattedRead!"0x%x"(captures["addr"], loadAddr);
                libraries[AbsolutePath(captures["lib"])] = Address(loadAddr);
            } catch (Exception e) {
            }
        }
    }

    return libraries;
}
1265 
1266 @("shall parse the output of ldd")
1267 unittest {
1268     auto res = parseLddOutput([
1269             "linux-vdso.so.1 =>  (0x00007fffbf5fe000)",
1270             "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)",
1271             "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)",
1272             "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)",
1273             "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)"
1274             ]);
1275     assert(res.length == 3);
1276     assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080);
1277 }