1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module proc.process; 7 8 import core.sys.posix.signal : SIGKILL; 9 import core.thread : Thread; 10 import core.time : dur, Duration; 11 import logger = std.experimental.logger; 12 import std.algorithm : filter, count, joiner, map; 13 import std.array : appender, empty, array; 14 import std.exception : collectException; 15 import std.stdio : File, fileno, writeln; 16 import std.typecons : Flag, Yes; 17 static import std.process; 18 static import std.stdio; 19 20 import my.gc.refc; 21 import my.from_; 22 import my.path; 23 import my.named_type; 24 25 public import proc.channel; 26 public import proc.pid; 27 public import proc.tty; 28 29 version (unittest) { 30 import std.file : remove; 31 } 32 33 /** Manage a process by reference counting so that it is terminated when the it 34 * stops being used such as the instance going out of scope. 35 */ 36 auto rcKill(T)(T p, int signal = SIGKILL) { 37 return ScopeKill!T(p, signal); 38 } 39 40 // backward compatibility. 41 alias scopeKill = rcKill; 42 43 struct ScopeKill(T) { 44 private { 45 static struct Payload { 46 T process; 47 int signal = SIGKILL; 48 bool hasProcess; 49 50 ~this() @safe { 51 if (hasProcess) 52 process.dispose(); 53 } 54 } 55 56 RefCounted!Payload payload_; 57 } 58 59 alias process this; 60 ref T process() { 61 return payload_.get.process; 62 } 63 64 this(T process, int signal) @safe { 65 payload_ = refCounted(Payload(process, signal, true)); 66 } 67 } 68 69 /// Async process wrapper for a std.process SpawnProcess 70 struct SpawnProcess { 71 import core.sys.posix.signal : SIGKILL; 72 import std.algorithm : among; 73 74 private { 75 enum State { 76 running, 77 terminated, 78 exitCode 79 } 80 81 std.process.Pid process; 82 RawPid pid; 83 int status_; 84 State st; 85 } 86 87 this(std.process.Pid process) @safe { 88 this.process = process; 89 this.pid = process.osHandle.RawPid; 90 } 91 92 ~this() @safe { 93 } 94 95 /// Returns: The raw OS handle for the process ID. 96 RawPid osHandle() nothrow @safe { 97 return pid; 98 } 99 100 /// Kill and cleanup the process. 101 void dispose() @safe { 102 final switch (st) { 103 case State.running: 104 this.kill; 105 this.wait; 106 break; 107 case State.terminated: 108 this.wait; 109 break; 110 case State.exitCode: 111 break; 112 } 113 114 st = State.exitCode; 115 } 116 117 /** Send `signal` to the process. 118 * 119 * Param: 120 * signal = a signal from `core.sys.posix.signal` 121 */ 122 void kill(int signal = SIGKILL) nothrow @trusted { 123 final switch (st) { 124 case State.running: 125 break; 126 case State.terminated: 127 goto case; 128 case State.exitCode: 129 return; 130 } 131 132 try { 133 std.process.kill(process, signal); 134 } catch (Exception e) { 135 } 136 137 st = State.terminated; 138 } 139 140 /// Blocking wait for the process to terminated. 141 /// Returns: the exit status. 142 int wait() @safe { 143 final switch (st) { 144 case State.running: 145 status_ = std.process.wait(process); 146 break; 147 case State.terminated: 148 status_ = std.process.wait(process); 149 break; 150 case State.exitCode: 151 break; 152 } 153 154 st = State.exitCode; 155 156 return status_; 157 } 158 159 /// Non-blocking wait for the process termination. 160 /// Returns: `true` if the process has terminated. 161 bool tryWait() @safe { 162 final switch (st) { 163 case State.running: 164 auto s = std.process.tryWait(process); 165 if (s.terminated) { 166 st = State.exitCode; 167 status_ = s.status; 168 } 169 break; 170 case State.terminated: 171 status_ = std.process.wait(process); 172 st = State.exitCode; 173 break; 174 case State.exitCode: 175 break; 176 } 177 178 return st.among(State.terminated, State.exitCode) != 0; 179 } 180 181 /// Returns: The exit status of the process. 182 int status() @safe { 183 if (st != State.exitCode) { 184 throw new Exception( 185 "Process has not terminated and wait/tryWait been called to collect the exit status"); 186 } 187 return status_; 188 } 189 190 /// Returns: If the process has terminated. 191 bool terminated() @safe { 192 return st.among(State.terminated, State.exitCode) != 0; 193 } 194 } 195 196 /// Async process that do not block on read from stdin/stderr. 197 struct PipeProcess { 198 import std.algorithm : among; 199 import core.sys.posix.signal : SIGKILL; 200 201 private { 202 enum State { 203 running, 204 terminated, 205 exitCode 206 } 207 208 std.process.ProcessPipes process; 209 std.process.Pid pid; 210 211 FileReadChannel stderr_; 212 FileReadChannel stdout_; 213 FileWriteChannel stdin_; 214 int status_; 215 State st; 216 } 217 218 this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe { 219 this.pid = pid; 220 221 this.stdin_ = FileWriteChannel(stdin); 222 this.stdout_ = FileReadChannel(stdout); 223 this.stderr_ = FileReadChannel(stderr); 224 } 225 226 this(std.process.ProcessPipes process, std.process.Redirect r) @safe { 227 this.process = process; 228 this.pid = process.pid; 229 230 if (r & std.process.Redirect.stdin) { 231 stdin_ = FileWriteChannel(this.process.stdin); 232 } 233 if (r & std.process.Redirect.stdout) { 234 stdout_ = FileReadChannel(this.process.stdout); 235 } 236 if (r & std.process.Redirect.stderr) { 237 this.stderr_ = FileReadChannel(this.process.stderr); 238 } 239 } 240 241 /// Returns: The raw OS handle for the process ID. 242 RawPid osHandle() nothrow @safe { 243 return pid.osHandle.RawPid; 244 } 245 246 /// Access to stdout. 247 ref FileWriteChannel stdin() return scope nothrow @safe { 248 return stdin_; 249 } 250 251 /// Access to stdout. 252 ref FileReadChannel stdout() return scope nothrow @safe { 253 return stdout_; 254 } 255 256 /// Access stderr. 257 ref FileReadChannel stderr() return scope nothrow @safe { 258 return stderr_; 259 } 260 261 /// Kill and cleanup the process. 262 void dispose() @safe { 263 final switch (st) { 264 case State.running: 265 this.kill; 266 this.wait; 267 .destroy(process); 268 break; 269 case State.terminated: 270 this.wait; 271 .destroy(process); 272 break; 273 case State.exitCode: 274 break; 275 } 276 277 st = State.exitCode; 278 } 279 280 /** Send `signal` to the process. 281 * 282 * Param: 283 * signal = a signal from `core.sys.posix.signal` 284 */ 285 void kill(int signal = SIGKILL) nothrow @trusted { 286 final switch (st) { 287 case State.running: 288 break; 289 case State.terminated: 290 return; 291 case State.exitCode: 292 return; 293 } 294 295 try { 296 std.process.kill(pid, signal); 297 } catch (Exception e) { 298 } 299 300 st = State.terminated; 301 } 302 303 /// Blocking wait for the process to terminated. 304 /// Returns: the exit status. 305 int wait() @safe { 306 final switch (st) { 307 case State.running: 308 status_ = std.process.wait(pid); 309 break; 310 case State.terminated: 311 status_ = std.process.wait(pid); 312 break; 313 case State.exitCode: 314 break; 315 } 316 317 st = State.exitCode; 318 319 return status_; 320 } 321 322 /// Non-blocking wait for the process termination. 323 /// Returns: `true` if the process has terminated. 324 bool tryWait() @safe { 325 final switch (st) { 326 case State.running: 327 auto s = std.process.tryWait(pid); 328 if (s.terminated) { 329 st = State.exitCode; 330 status_ = s.status; 331 } 332 break; 333 case State.terminated: 334 status_ = std.process.wait(pid); 335 st = State.exitCode; 336 break; 337 case State.exitCode: 338 break; 339 } 340 341 return st.among(State.terminated, State.exitCode) != 0; 342 } 343 344 /// Returns: The exit status of the process. 345 int status() @safe { 346 if (st != State.exitCode) { 347 throw new Exception( 348 "Process has not terminated and wait/tryWait been called to collect the exit status"); 349 } 350 return status_; 351 } 352 353 /// Returns: If the process has terminated. 354 bool terminated() @safe { 355 return st.among(State.terminated, State.exitCode) != 0; 356 } 357 } 358 359 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin, 360 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 361 const string[string] env = null, std.process.Config config = std.process.Config.none, 362 scope const char[] workDir = null) { 363 return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr, 364 env, config, workDir)); 365 } 366 367 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env, 368 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 369 return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin, 370 std.stdio.stdout, std.stdio.stderr, env, config, workDir)); 371 } 372 373 SpawnProcess spawnProcess(scope const(char)[] program, 374 File stdin = std.stdio.stdin, File stdout = std.stdio.stdout, 375 File stderr = std.stdio.stderr, const string[string] env = null, 376 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 377 return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin, 378 stdout, stderr, env, config, workDir)); 379 } 380 381 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin, 382 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 383 scope const string[string] env = null, std.process.Config config = std.process.Config.none, 384 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 385 return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr, 386 env, config, workDir, shellPath)); 387 } 388 389 /// ditto 390 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env, 391 std.process.Config config = std.process.Config.none, 392 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 393 return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath)); 394 } 395 396 PipeProcess pipeProcess(scope const(char[])[] args, 397 std.process.Redirect redirect = std.process.Redirect.all, 398 const string[string] env = null, std.process.Config config = std.process.Config.none, 399 scope const(char)[] workDir = null) @safe { 400 return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect); 401 } 402 403 PipeProcess pipeShell(scope const(char)[] command, 404 std.process.Redirect redirect = std.process.Redirect.all, 405 const string[string] env = null, std.process.Config config = std.process.Config.none, 406 scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe { 407 return PipeProcess(std.process.pipeShell(command, redirect, env, config, 408 workDir, shellPath), redirect); 409 } 410 411 /** Moves the process to a separate process group and on exit kill it and all 412 * its children. 413 */ 414 @safe struct Sandbox(ProcessT) { 415 import core.sys.posix.signal : SIGKILL; 416 417 private { 418 ProcessT p; 419 RawPid pid; 420 } 421 422 this(ProcessT p) @safe { 423 import core.sys.posix.unistd : setpgid; 424 425 this.p = p; 426 this.pid = p.osHandle; 427 setpgid(pid, 0); 428 } 429 430 RawPid osHandle() nothrow @safe { 431 return pid; 432 } 433 434 static if (__traits(hasMember, ProcessT, "stdin")) { 435 ref FileWriteChannel stdin() nothrow @safe { 436 return p.stdin; 437 } 438 } 439 440 static if (__traits(hasMember, ProcessT, "stdout")) { 441 ref FileReadChannel stdout() nothrow @safe { 442 return p.stdout; 443 } 444 } 445 446 static if (__traits(hasMember, ProcessT, "stderr")) { 447 ref FileReadChannel stderr() nothrow @safe { 448 return p.stderr; 449 } 450 } 451 452 void dispose() @safe { 453 // this also reaps the children thus cleaning up zombies 454 this.kill; 455 p.dispose; 456 } 457 458 /** Send `signal` to the process. 459 * 460 * Param: 461 * signal = a signal from `core.sys.posix.signal` 462 */ 463 void kill(int signal = SIGKILL) nothrow @safe { 464 // must first retrieve the submap because after the process is killed 465 // its children may have changed. 466 auto pmap = makePidMap.getSubMap(pid); 467 468 p.kill(signal); 469 470 // only kill and reap the children 471 pmap.remove(pid); 472 proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap; 473 } 474 475 int wait() @safe { 476 return p.wait; 477 } 478 479 bool tryWait() @safe { 480 return p.tryWait; 481 } 482 483 int status() @safe { 484 return p.status; 485 } 486 487 bool terminated() @safe { 488 return p.terminated; 489 } 490 } 491 492 auto sandbox(T)(T p) @safe { 493 return Sandbox!T(p); 494 } 495 496 @("shall terminate a group of processes") 497 unittest { 498 import std.datetime.stopwatch : StopWatch, AutoStart; 499 500 immutable scriptName = makeScript(`#!/bin/bash 501 sleep 10m & 502 sleep 10m & 503 sleep 10m 504 `); 505 scope (exit) 506 remove(scriptName); 507 508 auto p = pipeProcess([scriptName]).sandbox.rcKill; 509 waitUntilChildren(p.osHandle, 3); 510 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 511 p.kill; 512 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 513 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 514 515 assert(p.wait == -9); 516 assert(p.terminated); 517 assert(preChildren == 3); 518 assert(postChildren == 0); 519 } 520 521 /** dispose the process after the timeout. 522 */ 523 @safe struct Timeout(ProcessT) { 524 import core.sys.posix.signal : SIGKILL; 525 import core.thread; 526 import std.algorithm : among; 527 import std.datetime : Clock, Duration; 528 529 private { 530 enum Msg { 531 none, 532 stop, 533 status, 534 } 535 536 enum Reply { 537 none, 538 running, 539 normalDeath, 540 killedByTimeout, 541 } 542 543 static struct Payload { 544 ProcessT p; 545 RawPid pid; 546 Background background; 547 Reply backgroundReply; 548 } 549 550 RefCounted!Payload rc; 551 } 552 553 this(ProcessT p, Duration timeout) @trusted { 554 import std.algorithm : move; 555 556 auto pid = p.osHandle; 557 rc = refCounted(Payload(move(p), pid)); 558 rc.get.background = new Background(&rc.get.p, timeout); 559 rc.get.background.isDaemon = true; 560 rc.get.background.start; 561 } 562 563 ~this() @trusted { 564 rc.release; 565 } 566 567 private static class Background : Thread { 568 import core.sync.condition : Condition; 569 import core.sync.mutex : Mutex; 570 571 Duration timeout; 572 ProcessT* p; 573 Mutex mtx; 574 Msg[] msg; 575 Reply reply_; 576 RawPid pid; 577 int signal = SIGKILL; 578 579 this(ProcessT* p, Duration timeout) { 580 this.p = p; 581 this.timeout = timeout; 582 this.mtx = new Mutex(); 583 this.pid = p.osHandle; 584 585 super(&run); 586 } 587 588 void run() { 589 checkProcess(this.pid, this.timeout, this); 590 } 591 592 void put(Msg msg) @trusted nothrow { 593 this.mtx.lock_nothrow(); 594 scope (exit) 595 this.mtx.unlock_nothrow(); 596 this.msg ~= msg; 597 } 598 599 Msg popMsg() @trusted nothrow { 600 this.mtx.lock_nothrow(); 601 scope (exit) 602 this.mtx.unlock_nothrow(); 603 if (msg.empty) 604 return Msg.none; 605 auto rval = msg[$ - 1]; 606 msg = msg[0 .. $ - 1]; 607 return rval; 608 } 609 610 void setReply(Reply reply_) @trusted nothrow { 611 this.mtx.lock_nothrow(); 612 scope (exit) 613 this.mtx.unlock_nothrow(); 614 this.reply_ = reply_; 615 } 616 617 Reply reply() @trusted nothrow { 618 this.mtx.lock_nothrow(); 619 scope (exit) 620 this.mtx.unlock_nothrow(); 621 return reply_; 622 } 623 624 void setSignal(int signal) @trusted nothrow { 625 this.mtx.lock_nothrow(); 626 scope (exit) 627 this.mtx.unlock_nothrow(); 628 this.signal = signal; 629 } 630 631 void kill() @trusted nothrow { 632 this.mtx.lock_nothrow(); 633 scope (exit) 634 this.mtx.unlock_nothrow(); 635 p.kill(signal); 636 } 637 } 638 639 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 640 import std.algorithm : max, min; 641 import std.variant : Variant; 642 static import core.sys.posix.signal; 643 644 const stopAt = Clock.currTime + timeout; 645 // the purpose is to poll the process often "enough" that if it 646 // terminates early `Process` detects it fast enough. 1000 is chosen 647 // because it "feels good". the purpose 648 auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs"; 649 650 bool forceStop; 651 bool running = true; 652 while (running && Clock.currTime < stopAt) { 653 const msg = bg.popMsg; 654 655 final switch (msg) { 656 case Msg.none: 657 () @trusted { Thread.sleep(sleepInterval); }(); 658 break; 659 case Msg.stop: 660 forceStop = true; 661 running = false; 662 break; 663 case Msg.status: 664 bg.setReply(Reply.running); 665 break; 666 } 667 668 () @trusted { 669 if (core.sys.posix.signal.kill(p, 0) == -1) { 670 running = false; 671 } 672 }(); 673 } 674 675 // may be children alive thus must ensure that the whole process tree 676 // is killed if this is a sandbox with a timeout. 677 bg.kill(); 678 679 if (!forceStop && Clock.currTime >= stopAt) { 680 bg.setReply(Reply.killedByTimeout); 681 } else { 682 bg.setReply(Reply.normalDeath); 683 } 684 } 685 686 RawPid osHandle() nothrow @trusted { 687 return rc.get.pid; 688 } 689 690 static if (__traits(hasMember, ProcessT, "stdin")) { 691 ref FileWriteChannel stdin() nothrow @safe { 692 return rc.get.p.stdin; 693 } 694 } 695 696 static if (__traits(hasMember, ProcessT, "stdout")) { 697 ref FileReadChannel stdout() nothrow @safe { 698 return rc.get.p.stdout; 699 } 700 } 701 702 static if (__traits(hasMember, ProcessT, "stderr")) { 703 ref FileReadChannel stderr() nothrow @trusted { 704 return rc.get.p.stderr; 705 } 706 } 707 708 void dispose() @trusted { 709 if (rc.get.backgroundReply.among(Reply.none, Reply.running)) { 710 rc.get.background.put(Msg.stop); 711 rc.get.background.join; 712 rc.get.backgroundReply = rc.get.background.reply; 713 } 714 rc.get.p.dispose; 715 } 716 717 /** Send `signal` to the process. 718 * 719 * Param: 720 * signal = a signal from `core.sys.posix.signal` 721 */ 722 void kill(int signal = SIGKILL) nothrow @trusted { 723 rc.get.background.setSignal(signal); 724 rc.get.background.kill(); 725 } 726 727 int wait() @trusted { 728 while (!this.tryWait) { 729 Thread.sleep(20.dur!"msecs"); 730 } 731 return rc.get.p.wait; 732 } 733 734 bool tryWait() @trusted { 735 return rc.get.p.tryWait; 736 } 737 738 int status() @trusted { 739 return rc.get.p.status; 740 } 741 742 bool terminated() @trusted { 743 return rc.get.p.terminated; 744 } 745 746 bool timeoutTriggered() @trusted { 747 if (rc.get.backgroundReply.among(Reply.none, Reply.running)) { 748 rc.get.background.put(Msg.status); 749 rc.get.backgroundReply = rc.get.background.reply; 750 } 751 return rc.get.backgroundReply == Reply.killedByTimeout; 752 } 753 } 754 755 auto timeout(T)(T p, Duration timeout_) @trusted { 756 return Timeout!T(p, timeout_); 757 } 758 759 /// Returns when the process has pending data. 760 void waitForPendingData(ProcessT)(Process p) { 761 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 762 Thread.sleep(20.dur!"msecs"); 763 } 764 } 765 766 @("shall kill the process after the timeout") 767 unittest { 768 import std.datetime.stopwatch : StopWatch, AutoStart; 769 770 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill; 771 auto sw = StopWatch(AutoStart.yes); 772 p.wait; 773 sw.stop; 774 775 assert(sw.peek >= 100.dur!"msecs"); 776 assert(sw.peek <= 500.dur!"msecs"); 777 assert(p.wait == -9); 778 assert(p.terminated); 779 assert(p.status == -9); 780 assert(p.timeoutTriggered); 781 } 782 783 struct DrainElement { 784 enum Type { 785 stdout, 786 stderr, 787 } 788 789 Type type; 790 const(ubyte)[] data; 791 792 /// Returns: iterates the data as an input range. 793 auto byUTF8() @safe pure nothrow const @nogc { 794 static import std.utf; 795 796 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 797 } 798 799 bool empty() @safe pure nothrow const @nogc { 800 return data.length == 0; 801 } 802 } 803 804 /** A range that drains a process stdout/stderr until it terminates. 805 * 806 * There may be `DrainElement` that are empty. 807 */ 808 struct DrainRange(ProcessT) { 809 private { 810 enum State { 811 start, 812 draining, 813 lastStdout, 814 lastStderr, 815 lastElement, 816 empty, 817 } 818 819 ProcessT p; 820 DrainElement front_; 821 State st; 822 ubyte[] buf; 823 } 824 825 this(ProcessT p) { 826 this.p = p; 827 this.buf = new ubyte[4096]; 828 } 829 830 DrainElement front() @safe pure nothrow const @nogc { 831 assert(!empty, "Can't get front of an empty range"); 832 return front_; 833 } 834 835 void popFront() @safe { 836 assert(!empty, "Can't pop front of an empty range"); 837 838 static bool isAnyPipeOpen(ref ProcessT p) { 839 return p.stdout.isOpen || p.stderr.isOpen; 840 } 841 842 DrainElement readData(ref ProcessT p) @safe { 843 if (p.stderr.hasPendingData) { 844 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 845 } else if (p.stdout.hasPendingData) { 846 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf)); 847 } 848 return DrainElement.init; 849 } 850 851 DrainElement waitUntilData() @safe { 852 import std.datetime : Clock; 853 854 // may livelock if the process never terminates and never writes to 855 // the terminal. timeout ensure that it sooner or later is break 856 // the loop. This is important if the drain is part of a timeout 857 // wrapping. 858 859 const timeout = 100.dur!"msecs"; 860 const stopAt = Clock.currTime + timeout; 861 const sleepFor = timeout / 20; 862 const useSleep = Clock.currTime + sleepFor; 863 bool running = true; 864 while (running) { 865 const now = Clock.currTime; 866 867 running = (now < stopAt) && isAnyPipeOpen(p); 868 869 auto bufRead = readData(p); 870 871 if (!bufRead.empty) { 872 return DrainElement(bufRead.type, bufRead.data.dup); 873 } else if (running && now > useSleep && bufRead.empty) { 874 import core.thread : Thread; 875 876 () @trusted { Thread.sleep(sleepFor); }(); 877 } 878 } 879 880 return DrainElement.init; 881 } 882 883 front_ = DrainElement.init; 884 885 final switch (st) { 886 case State.start: 887 st = State.draining; 888 front_ = waitUntilData; 889 break; 890 case State.draining: 891 if (p.terminated) { 892 st = State.lastStdout; 893 } else if (isAnyPipeOpen(p)) { 894 front_ = waitUntilData(); 895 } else { 896 st = State.lastStdout; 897 } 898 break; 899 case State.lastStdout: 900 if (p.stdout.hasPendingData) { 901 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup); 902 } else { 903 st = State.lastStderr; 904 } 905 break; 906 case State.lastStderr: 907 if (p.stderr.hasPendingData) { 908 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 909 } else { 910 st = State.lastElement; 911 } 912 break; 913 case State.lastElement: 914 st = State.empty; 915 break; 916 case State.empty: 917 break; 918 } 919 } 920 921 bool empty() @safe pure nothrow const @nogc { 922 return st == State.empty; 923 } 924 } 925 926 /// Drain a process pipe until empty. 927 auto drain(T)(T p) { 928 return DrainRange!T(p); 929 } 930 931 /// Read the data from a ReadChannel by line. 932 struct DrainByLineCopyRange(ProcessT) { 933 private { 934 enum State { 935 start, 936 draining, 937 lastLine, 938 lastBuf, 939 empty, 940 } 941 942 ProcessT process; 943 DrainRange!ProcessT range; 944 State st; 945 const(ubyte)[] buf; 946 const(char)[] line; 947 } 948 949 this(ProcessT p) { 950 process = p; 951 range = p.drain; 952 } 953 954 string front() @trusted pure nothrow const @nogc { 955 import std.exception : assumeUnique; 956 957 assert(!empty, "Can't get front of an empty range"); 958 return line.assumeUnique; 959 } 960 961 void popFront() @safe { 962 assert(!empty, "Can't pop front of an empty range"); 963 import std.algorithm : countUntil; 964 import std.array : array; 965 static import std.utf; 966 967 const(ubyte)[] updateBuf(size_t idx) { 968 const(ubyte)[] tmp; 969 if (buf.empty) { 970 // do nothing 971 } else if (idx == -1) { 972 tmp = buf; 973 buf = null; 974 } else { 975 idx = () { 976 if (idx < buf.length) { 977 return idx + 1; 978 } 979 return idx; 980 }(); 981 tmp = buf[0 .. idx]; 982 buf = buf[idx .. $]; 983 } 984 985 if (!tmp.empty && tmp[$ - 1] == '\n') { 986 tmp = tmp[0 .. $ - 1]; 987 } 988 return tmp; 989 } 990 991 void drainLine() { 992 void fillBuf() { 993 if (!range.empty) { 994 range.popFront; 995 } 996 if (!range.empty) { 997 buf ~= range.front.data; 998 } 999 } 1000 1001 size_t idx; 1002 () { 1003 int cnt; 1004 do { 1005 fillBuf(); 1006 idx = buf.countUntil('\n'); 1007 // 2 is a magic number which mean that it at most wait 2x timeout for data 1008 } 1009 while (!range.empty && idx == -1 && cnt++ < 2); 1010 }(); 1011 1012 if (idx != -1) { 1013 auto tmp = updateBuf(idx); 1014 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1015 } 1016 } 1017 1018 bool lastLine() { 1019 size_t idx = buf.countUntil('\n'); 1020 if (idx == -1) 1021 return true; 1022 1023 auto tmp = updateBuf(idx); 1024 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1025 return false; 1026 } 1027 1028 line = null; 1029 final switch (st) { 1030 case State.start: 1031 drainLine; 1032 st = State.draining; 1033 break; 1034 case State.draining: 1035 drainLine; 1036 if (range.empty) 1037 st = State.lastLine; 1038 break; 1039 case State.lastLine: 1040 if (lastLine) 1041 st = State.lastBuf; 1042 break; 1043 case State.lastBuf: 1044 line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array; 1045 st = State.empty; 1046 break; 1047 case State.empty: 1048 break; 1049 } 1050 } 1051 1052 bool empty() @safe pure nothrow const @nogc { 1053 return st == State.empty; 1054 } 1055 } 1056 1057 @("shall drain the process output by line") 1058 unittest { 1059 import std.algorithm : filter, joiner, map; 1060 import std.array : array; 1061 1062 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill; 1063 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 1064 1065 assert(res.length == 3); 1066 assert(res.joiner.count >= 30); 1067 assert(p.wait == 0); 1068 assert(p.terminated); 1069 } 1070 1071 auto drainByLineCopy(T)(T p) { 1072 return DrainByLineCopyRange!T(p); 1073 } 1074 1075 /// Drain the process output until it is done executing. 1076 auto drainToNull(T)(T p) { 1077 foreach (l; p.drain()) { 1078 } 1079 return p; 1080 } 1081 1082 /// Drain the output from the process into an output range. 1083 auto drain(ProcessT, T)(ProcessT p, ref T range) { 1084 foreach (l; p.drain()) { 1085 range.put(l); 1086 } 1087 return p; 1088 } 1089 1090 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 1091 unittest { 1092 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill; 1093 auto res = p.drain.array; 1094 1095 // this is just a sanity check. It has to be kind a high because there is 1096 // some wiggleroom allowed 1097 assert(res.count > 1 && res.count <= 50); 1098 1099 assert(res.filter!(a => a.type == DrainElement.Type.stdout) 1100 .map!(a => a.data) 1101 .joiner 1102 .count == 30); 1103 assert(p.wait == 0); 1104 assert(p.terminated); 1105 } 1106 1107 @("shall kill the process tree when the timeout is reached") 1108 unittest { 1109 immutable script = makeScript(`#!/bin/bash 1110 sleep 10m 1111 `); 1112 scope (exit) 1113 remove(script); 1114 1115 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill; 1116 waitUntilChildren(p.osHandle, 1); 1117 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1118 const res = p.process.drain.array; 1119 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1120 1121 assert(p.wait == -9); 1122 assert(p.terminated); 1123 assert(preChildren == 1); 1124 assert(postChildren == 0); 1125 } 1126 1127 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 1128 import core.sys.posix.sys.stat; 1129 import std.file : getAttributes, setAttributes, thisExePath; 1130 import std.stdio : File; 1131 import std.path : baseName; 1132 import std.conv : to; 1133 1134 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 1135 1136 File(fname, "w").writeln(script); 1137 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 1138 return fname; 1139 } 1140 1141 /// Wait for p to have num children or fail after 10s. 1142 void waitUntilChildren(RawPid p, int num) { 1143 import std.datetime : Clock; 1144 1145 const failAt = Clock.currTime + 10.dur!"seconds"; 1146 do { 1147 Thread.sleep(50.dur!"msecs"); 1148 if (Clock.currTime > failAt) 1149 break; 1150 } 1151 while (makePidMap.getSubMap(p).remove(p).length < num); 1152 } 1153 1154 alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable); 1155 struct AddressMap { 1156 Address begin; 1157 Address end; 1158 NamedType!(string, Tag!"Permission", string.init, TagStringable) perm; 1159 } 1160 1161 /** An assoc array mapping the path of each shared library loaded by 1162 * the process to the address it is loaded at in the process address space. 1163 */ 1164 AddressMap[][AbsolutePath] libs(RawPid pid) nothrow { 1165 import std.stdio : File; 1166 import std.format : formattedRead, format; 1167 import std.algorithm : countUntil; 1168 import std.typecons : tuple; 1169 import std..string : strip; 1170 1171 typeof(return) rval; 1172 1173 try { 1174 foreach (l; File(format!"/proc/%s/maps"(pid)).byLine 1175 .map!(a => tuple(a, a.countUntil('/'))) 1176 .filter!(a => a[1] != -1)) { 1177 try { 1178 auto p = AbsolutePath(l[0][l[1] .. $].idup); 1179 auto m = l[0][0 .. l[1]].strip; 1180 ulong begin; 1181 ulong end; 1182 char[] perm; 1183 if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) { 1184 continue; 1185 } 1186 1187 auto amap = AddressMap(Address(begin), Address(end), 1188 typeof(AddressMap.init.perm)(perm.idup)); 1189 if (auto v = p in rval) { 1190 *v ~= amap; 1191 } else { 1192 rval[p] = [amap]; 1193 } 1194 } catch (Exception e) { 1195 } 1196 } 1197 } catch (Exception e) { 1198 logger.trace(e.msg).collectException; 1199 } 1200 1201 return rval; 1202 } 1203 1204 @("shall read the libraries the process is using") 1205 unittest { 1206 immutable scriptName = makeScript(`#!/bin/bash 1207 sleep 10m & 1208 sleep 10m & 1209 sleep 10m 1210 `); 1211 scope (exit) 1212 remove(scriptName); 1213 1214 auto p = pipeProcess([scriptName]).sandbox.rcKill; 1215 waitUntilChildren(p.osHandle, 3); 1216 auto res = libs(p.osHandle); 1217 p.kill; 1218 1219 assert(res.length != 0); 1220 assert(AbsolutePath("/bin/bash") in res); 1221 } 1222 1223 /** Parses the output from a run of 'ldd' on a binary. 1224 * 1225 * Params: 1226 * input = an input range of lines which is the output from ldd 1227 * 1228 * Example 1229 * --- 1230 * writeln(parseLddOutput(` 1231 * linux-vdso.so.1 => (0x00007fffbf5fe000) 1232 * libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000) 1233 * libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000) 1234 * libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000) 1235 * /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000) 1236 * `)) 1237 * --- 1238 * 1239 * Returns: a dictionary of {path: address} for each library required by the 1240 * specified binary. 1241 */ 1242 Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) { 1243 import std.regex : regex, matchFirst; 1244 import std.format : formattedRead; 1245 1246 typeof(return) rval; 1247 const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`); 1248 1249 foreach (l; input) { 1250 auto m = matchFirst(l, reLinux); 1251 if (m.empty || m.length < 3) { 1252 continue; 1253 } 1254 1255 try { 1256 ulong addr; 1257 formattedRead!"0x%x"(m["addr"], addr); 1258 rval[AbsolutePath(m["lib"])] = Address(addr); 1259 } catch (Exception e) { 1260 } 1261 } 1262 1263 return rval; 1264 } 1265 1266 @("shall parse the output of ldd") 1267 unittest { 1268 auto res = parseLddOutput([ 1269 "linux-vdso.so.1 => (0x00007fffbf5fe000)", 1270 "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)", 1271 "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)", 1272 "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)", 1273 "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)" 1274 ]); 1275 assert(res.length == 3); 1276 assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080); 1277 }