/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

This is based on webfreak's
[fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the
API as it was because I needed to be able to watch multiple directories,
filter what files are to be watched and to be robust against broken symlinks.

Let's say you want to watch a directory for changes and add all directories to
be watched too.

---
auto fw = fileWatch();
fw.watchRecurse("my_dir");
while (true) {
    auto ev = fw.getEvents(5.dur!"seconds");
    foreach (e; ev) {
        e.match!(
        (Event.Access x) => writeln(x),
        (Event.Attribute x) => writeln(x),
        (Event.CloseWrite x) => writeln(x),
        (Event.CloseNoWrite x) => writeln(x),
        (Event.Create x) { fw.watchRecurse(x.path); },
        (Event.Delete x) => writeln(x),
        (Event.DeleteSelf x) => writeln(x),
        (Event.Modify x) => writeln(x),
        (Event.MoveSelf x) => writeln(x),
        (Event.Rename x) => writeln(x),
        (Event.Open x) => writeln(x),
        (Event.Overflow x) => writeln(x),
        );
    }
}
---
*/
module my.fswatch;

import core.sys.linux.errno : errno;
import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC,
    IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE,
    IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO,
    IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK;
import core.sys.linux.unistd : close, read;
import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL;
import core.thread : Thread;
import core.time : dur, Duration;
import logger = std.experimental.logger;
import std.array : appender, empty, array;
import std.conv : to;
import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append,
    rename, remove, exists, SpanMode, mkdir, rmdir;
import std.path : buildPath;
import std.range : isInputRange;
import std.string : toStringz, fromStringz;
import std.exception : collectException;

import sumtype;

import my.named_type;
import my.optional;
import my.path : AbsolutePath, Path;
import my.set;

/// Namespace for the event payload types that make up `FileChangeEvent`.
struct Event {
    /// An overflow occurred. Unknown what events actually triggered.
    static struct Overflow {
    }

    /// File was accessed (e.g., read(2), execve(2)).
    static struct Access {
        AbsolutePath path;
    }

    /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps
     * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count
     * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)),
     * and user/group ID (e.g., chown(2)).
     */
    static struct Attribute {
        AbsolutePath path;
    }

    /// File opened for writing was closed.
    static struct CloseWrite {
        AbsolutePath path;
    }

    /// File or directory not opened for writing was closed.
    static struct CloseNoWrite {
        AbsolutePath path;
    }

    /** File/directory created in watched directory (e.g., open(2) O_CREAT,
     * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).
     */
    static struct Create {
        AbsolutePath path;
    }

    /// File/directory deleted from watched directory.
    static struct Delete {
        AbsolutePath path;
    }

    /** Watched file/directory was itself deleted. (This event also occurs if
     * an object is moved to another filesystem, since mv(1) in effect copies
     * the file to the other filesystem and then deletes it from the original
     * filesystem.) In addition, an IN_IGNORED event will subsequently be
     * generated for the watch descriptor.
     */
    static struct DeleteSelf {
        AbsolutePath path;
    }

    /// File was modified (e.g., write(2), truncate(2)).
    static struct Modify {
        AbsolutePath path;
    }

    /// Watched file/directory was itself moved.
    static struct MoveSelf {
        AbsolutePath path;
    }

    /// Occurs when a file or folder inside a folder is renamed.
    static struct Rename {
        AbsolutePath from;
        AbsolutePath to;
    }

    /// File or directory was opened.
    static struct Open {
        AbsolutePath path;
    }
}

/// A single filesystem change; exactly one of the `Event` payload types.
alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite,
        Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf,
        Event.Modify, Event.MoveSelf, Event.Rename, Event.Open, Event.Overflow);

/** Construct a FileWatch.
 *
 * The inotify instance is created with `IN_NONBLOCK | IN_CLOEXEC`.
 *
 * Params:
 *  follow = forwarded to the `FileWatch` that is constructed.
 *
 * Throws: Exception if `inotify_init1` fails; the message carries `errno`.
 */
auto fileWatch(FileWatch.FollowSymlink follow = FileWatch.FollowSymlink.init) {
    const int inotifyFd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
    if (inotifyFd == -1) {
        throw new Exception(
                "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string);
    }
    return FileWatch(inotifyFd, follow);
}

/// Listens for create/modify/removal of files and directories.
enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY
    | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE;

/// Listen for events that change the metadata.
enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK;

/** An instance of a FileWatcher
 *
 * Wraps one inotify file descriptor together with the watch descriptors that
 * have been registered on it. The descriptor is closed by the destructor.
 */
struct FileWatch {
    import std.functional : toDelegate;

    alias FollowSymlink = NamedType!(bool, Tag!"FollowSymlink", bool.init, TagStringable);

    private {
        FdPoller poller;
        int fd;
        ubyte[1024 * 4] eventBuffer; // 4kb buffer for events
        struct FDInfo {
            int wd;
            bool watched;
            Path path;

            this(this) {
            }
        }

        FDInfo[int] directoryMap; // map every watch descriptor to a directory

        FollowSymlink follow;
    }

    private this(int fd, FollowSymlink follow) {
        this.fd = fd;
        this.follow = follow;
        poller.put(FdPoll(fd), [PollEvent.in_]);
    }

    /// Removes every watch that is still marked as watched and closes the descriptor.
    ~this() {
        // a default initialized instance has `fd == 0` and owns nothing.
        if (fd) {
            foreach (fdinfo; directoryMap.byValue) {
                if (fdinfo.watched)
                    inotify_rm_watch(fd, fdinfo.wd);
            }
            close(fd);
        }
    }

    /** Add a path to watch for events.
     *
     * Params:
     *  path = path to watch
     *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
     *
     * Returns: true if the path was successfully added.
     */
    bool watch(Path path, uint events = ContentEvents) {
        import my.file : followSymlink;
        import my.optional;

        if (follow)
            path = followSymlink(path).orElse(path);

        const wd = inotify_add_watch(fd, path.toStringz, events);
        if (wd == -1)
            return false;

        const fc = fcntl(fd, F_SETFD, FD_CLOEXEC);
        if (fc == -1) {
            // the watch is not recorded in `directoryMap`, so remove it again
            // instead of leaving a watch behind whose events can not be
            // mapped to a path.
            inotify_rm_watch(fd, wd);
            return false;
        }

        directoryMap[wd] = FDInfo(wd, true, path);
        return true;
    }

    /// ditto
    bool watch(string p, uint events = ContentEvents) {
        return watch(Path(p), events);
    }

    /// Default predicate for `watchRecurse`; accepts every path.
    private static bool allFiles(string p) {
        return true;
    }

    /** Recursively add the path and all its subdirectories and files to be watched.
     *
     * Params:
     *  pred = only those files and directories that `pred` returns true for are watched, by default every file/directory.
     *  root = directory to watch together with its content and subdirectories.
     *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
     *
     * Returns: paths that failed to be added.
     */
    AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents,
            bool delegate(string) pred = toDelegate(&allFiles)) {
        import std.algorithm : filter;
        import my.file : existsAnd;

        auto failed = appender!(AbsolutePath[])();

        if (!watch(root, events)) {
            failed.put(AbsolutePath(root));
        }

        if (!existsAnd!isDir(root)) {
            return failed.data;
        }

        // work list of directories that remain to be scanned; `visited`
        // guards against scanning the same directory twice.
        auto dirs = [AbsolutePath(root)];
        Set!AbsolutePath visited;
        while (!dirs.empty) {
            auto front = dirs[0];
            dirs = dirs[1 .. $];
            if (front in visited)
                continue;
            visited.add(front);

            try {
                foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) {
                    if (!watch(Path(p.name), events)) {
                        failed.put(AbsolutePath(p.name));
                    }
                    if (existsAnd!isDir(Path(p.name))) {
                        dirs ~= AbsolutePath(p.name);
                    }
                }
            } catch (Exception e) {
                () @trusted { logger.trace(e); }();
                logger.trace(e.msg);
                failed.put(AbsolutePath(front));
            }
        }

        return failed.data;
    }

    /// ditto
    AbsolutePath[] watchRecurse(string root, uint events = ContentEvents,
            bool delegate(string) pred = toDelegate(&allFiles)) {
        return watchRecurse(Path(root), events, pred);
    }

    /** The events that have occurred since last query.
     *
     * Params:
     *  timeout = max time to wait for events.
     *
     * Returns: the events that have occurred to the watched paths. Empty if
     * nothing happened before `timeout` expired or if nothing could be read.
     *
     * Throws: Exception if polling reports that the descriptor is not open.
     */
    FileChangeEvent[] getEvents(Duration timeout = Duration.zero) {
        FileChangeEvent[] events;
        if (!fd)
            return events;

        auto res = poller.wait(timeout);

        if (res.empty) {
            return events;
        }

        if (res[0].status[PollStatus.nval]) {
            throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string);
        }

        if (!res[0].status[PollStatus.in_]) {
            // no events to read
            return events;
        }

        const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length);
        if (receivedBytes <= 0) {
            // the read failed or returned nothing; the buffer does not hold
            // any valid event so there is nothing to parse.
            return events;
        }
        const available = cast(size_t) receivedBytes;

        size_t i = 0;
        // IN_MOVED_FROM events waiting for their matching IN_MOVED_TO.
        AbsolutePath[uint] cookie;
        while (i + inotify_event.sizeof <= available) {
            auto info = cast(inotify_event*)(eventBuffer.ptr + i);
            // advance before any `continue` or the same record would be
            // processed forever.
            i += inotify_event.sizeof + info.len;

            if (info.wd == -1) {
                events ~= FileChangeEvent(Event.Overflow.init);
                continue;
            }

            auto watched = info.wd in directoryMap;
            if (watched is null)
                continue;

            auto fname = () {
                // without a name (len == 0) the bytes after the header
                // belong to the next record or are stale, thus the event is
                // about the watched path itself.
                if (info.len == 0)
                    return AbsolutePath(watched.path);
                string fileName = info.name.ptr.fromStringz.idup;
                return AbsolutePath(buildPath(watched.path, fileName));
            }();

            if ((info.mask & IN_MOVED_TO) == 0) {
                if (auto v = info.cookie in cookie) {
                    events ~= FileChangeEvent(Event.Delete(*v));
                    cookie.remove(info.cookie);
                }
            }

            if ((info.mask & IN_ACCESS) != 0) {
                events ~= FileChangeEvent(Event.Access(fname));
            }

            if ((info.mask & IN_ATTRIB) != 0) {
                events ~= FileChangeEvent(Event.Attribute(fname));
            }

            if ((info.mask & IN_CLOSE_WRITE) != 0) {
                events ~= FileChangeEvent(Event.CloseWrite(fname));
            }

            if ((info.mask & IN_CLOSE_NOWRITE) != 0) {
                events ~= FileChangeEvent(Event.CloseNoWrite(fname));
            }

            if ((info.mask & IN_CREATE) != 0) {
                events ~= FileChangeEvent(Event.Create(fname));
            }

            if ((info.mask & IN_DELETE) != 0) {
                events ~= FileChangeEvent(Event.Delete(fname));
            }

            if ((info.mask & IN_DELETE_SELF) != 0) {
                // report the watched path as it is stored in the mapping.
                events ~= FileChangeEvent(Event.DeleteSelf(watched.path.AbsolutePath));
            }

            if ((info.mask & IN_MODIFY) != 0) {
                events ~= FileChangeEvent(Event.Modify(fname));
            }

            if ((info.mask & IN_MOVE_SELF) != 0) {
                // report the watched path as it is stored in the mapping.
                events ~= FileChangeEvent(Event.MoveSelf(watched.path.AbsolutePath));
            }

            if ((info.mask & IN_MOVED_FROM) != 0) {
                cookie[info.cookie] = fname;
            }

            if ((info.mask & IN_MOVED_TO) != 0) {
                if (auto v = info.cookie in cookie) {
                    events ~= FileChangeEvent(Event.Rename(*v, fname));
                    cookie.remove(info.cookie);
                } else {
                    // moved in from a location that is not watched.
                    events ~= FileChangeEvent(Event.Create(fname));
                }
            }

            if ((info.mask & IN_OPEN) != 0) {
                events ~= FileChangeEvent(Event.Open(fname));
            }

            if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) {
                inotify_rm_watch(fd, info.wd);
                watched.watched = false;
            }
        }

        // a move out of the watched area never gets a matching IN_MOVED_TO;
        // report it as a delete.
        foreach (c; cookie.byValue) {
            events ~= FileChangeEvent(Event.Delete(AbsolutePath(c)));
        }

        return events;
    }
}

///
unittest {
    import core.thread;

    if (exists("test"))
        rmdirRecurse("test");
    scope (exit) {
        if (exists("test"))
            rmdirRecurse("test");
    }

    auto watcher = fileWatch();

    mkdir("test");
    assert(watcher.watch("test"));

    write("test/a.txt", "abc");
    auto ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test/a.txt"));
            return true;
        }));

    append("test/a.txt", "def");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test/a.txt"));
            return true;
        }));

    rename("test/a.txt", "test/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Rename x) {
            assert(x.from == AbsolutePath("test/a.txt"));
            assert(x.to == AbsolutePath("test/b.txt"));
            return true;
        }));

    remove("test/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test/b.txt"));
            return true;
        }));

    rmdirRecurse("test");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.DeleteSelf x) {
            assert(x.path == AbsolutePath("test"));
            return true;
        }));
}

///
unittest {
    import std.algorithm : canFind;

    if (exists("test2"))
        rmdirRecurse("test2");
    if (exists("test3"))
        rmdirRecurse("test3");
    scope (exit) {
        if (exists("test2"))
            rmdirRecurse("test2");
        if (exists("test3"))
            rmdirRecurse("test3");
    }

    auto watcher = fileWatch();
    mkdir("test2");
    assert(watcher.watchRecurse("test2").length == 0);

    write("test2/a.txt", "abc");
    auto ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));
    assert(ev[1].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));
    assert(ev[2].tryMatch!((Event.CloseWrite x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));

    rename("test2/a.txt", "./testfile-a.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));

    rename("./testfile-a.txt", "test2/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/b.txt"));
            return true;
        }));

    remove("test2/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/b.txt"));
            return true;
        }));

    mkdir("test2/mydir");
    rmdir("test2/mydir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 2);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/mydir"));
            return true;
        }));
    assert(ev[1].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/mydir"));
            return true;
        }));

    // test for creation, modification, removal of subdirectory
    mkdir("test2/subdir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/subdir"));
            // add the created directory to be watched
            watcher.watchRecurse(x.path);
            return true;
        }));

    write("test2/subdir/c.txt", "abc");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/subdir/c.txt"));
            return true;
        }));

    write("test2/subdir/c.txt", "\nabc");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 2);
    assert(ev[0].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test2/subdir/c.txt"));
            return true;
        }));

    rmdirRecurse("test2/subdir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    foreach (e; ev) {
        // check each received event, not only the first one.
        assert(e.tryMatch!((Event.Delete x) {
                assert(canFind([
                        AbsolutePath("test2/subdir/c.txt"),
                        AbsolutePath("test2/subdir")
                    ], x.path));
                return true;
            }, (Event.DeleteSelf x) {
                assert(x.path == AbsolutePath("test2/subdir"));
                return true;
            }));
    }

    // removal of watched folder
    rmdirRecurse("test2");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.DeleteSelf x) {
            assert(x.path == AbsolutePath("test2"));
            return true;
        }));
}

/// One event reported by `Monitor`: what happened and to which path.
struct MonitorResult {
    enum Kind {
        Access,
        Attribute,
        CloseWrite,
        CloseNoWrite,
        Create,
        Delete,
        DeleteSelf,
        Modify,
        MoveSelf,
        Rename,
        Open,
        Overflow,
    }

    Kind kind;
    AbsolutePath path;
}

/** Monitor root's for filesystem changes which create/remove/modify
 * files/directories.
 */
struct Monitor {
    import std.array : appender;
    import std.file : isDir;
    import std.typecons : Tuple, tuple;
    import std.utf : UTFException;
    import my.filter : GlobFilter;
    import my.fswatch;

    private {
        Set!AbsolutePath roots;
        FileWatch fw;
        // global additives to subFilters.
        GlobFilter fileFilter;
        GlobFilter[AbsolutePath] subFilters;
        uint events;

        // roots that has been removed that may be re-added later on. the user
        // expects them to trigger events.
        Set!AbsolutePath monitorRoots;
    }

    /**
     * Params:
     *  roots = directories to recursively monitor
     *  fileFilter = filter that a path must match to be watched and reported
     *  follow = forwarded to the underlying `FileWatch`
     *  events = inotify event mask to watch for
     */
    this(AbsolutePath[] roots, GlobFilter fileFilter,
            FileWatch.FollowSymlink follow = FileWatch.FollowSymlink.init,
            uint events = ContentEvents) {
        this(roots, fileFilter, null, follow, events);
    }

    /**
     * Params:
     *  roots = directories to recursively monitor
     *  fileFilter = filter that is merged into whichever sub filter is used
     *  subFilters = filters keyed by path; the closest one is used for a path
     *  follow = forwarded to the underlying `FileWatch`
     *  events = inotify event mask to watch for
     */
    this(AbsolutePath[] roots, GlobFilter fileFilter, GlobFilter[AbsolutePath] subFilters,
            FileWatch.FollowSymlink follow, uint events = ContentEvents) {
        this.roots = toSet(roots);
        this.fileFilter = fileFilter;
        this.subFilters = subFilters;
        this.events = events;

        auto app = appender!(AbsolutePath[])();
        fw = fileWatch(follow);
        foreach (r; roots) {
            app.put(fw.watchRecurse(r, events, (a) {
                    return isInteresting(fileFilter, subFilters, a);
                }));
        }

        logger.trace(!app.data.empty, "unable to watch ", app.data);
    }

    /** Decide if the path `p` should be watched/reported.
     *
     * An existing directory is always interesting. Any other path is matched
     * against the closest sub filter merged with `rootFilter`.
     *
     * Returns: false if the path is filtered out or if an exception occurred.
     */
    static bool isInteresting(GlobFilter rootFilter, ref GlobFilter[AbsolutePath] subFilters,
            string p) nothrow {
        import my.file : existsAnd;
        import my.filter : closest, GlobFilterClosestMatch, merge;

        try {
            const ap = AbsolutePath(p);

            if (existsAnd!isDir(ap))
                return true;
            auto f = closest(subFilters, ap).orElse(GlobFilterClosestMatch(rootFilter,
                    AbsolutePath(".")));
            f.filter = merge(f.filter, rootFilter);
            return f.match(ap.toString);
        } catch (Exception e) {
            collectException(logger.trace(e.msg));
        }

        return false;
    }

    /** Wait up to `timeout` for an event to occur for the monitored `roots`.
     *
     * Params:
     *  timeout = how long to wait for the event
     *
     * Returns: the events that pass `isInteresting`.
     */
    MonitorResult[] wait(Duration timeout) {
        import std.array : array;
        import std.algorithm : canFind, startsWith, filter;

        auto rval = appender!(MonitorResult[])();

        {
            // roots that were removed earlier and now exist again are watched
            // anew and reported as created.
            auto rm = appender!(AbsolutePath[])();
            foreach (a; monitorRoots.toRange.filter!(a => exists(a))) {
                fw.watchRecurse(a, events, a => isInteresting(fileFilter, subFilters, a));
                rm.put(a);
                rval.put(MonitorResult(MonitorResult.Kind.Create, a));
            }
            foreach (a; rm.data) {
                monitorRoots.remove(a);
            }
        }

        if (!rval.data.empty) {
            // collect whatever events that happened to have queued up together
            // with the artificially created.
            timeout = Duration.zero;
        }

        try {
            foreach (e; fw.getEvents(timeout)) {
                e.match!((Event.Overflow x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Overflow));
                }, (Event.Access x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Access, x.path));
                }, (Event.Attribute x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path));
                }, (Event.CloseWrite x) {
                    rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path));
                }, (Event.CloseNoWrite x) {
                    rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path));
                }, (Event.Create x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Create, x.path));
                    fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, subFilters, a));
                }, (Event.Modify x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path));
                }, (Event.MoveSelf x) {
                    rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path));
                    fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, subFilters, a));

                    if (x.path in roots) {
                        monitorRoots.add(x.path);
                    }
                }, (Event.Delete x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path));
                }, (Event.DeleteSelf x) {
                    rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path));

                    if (x.path in roots) {
                        monitorRoots.add(x.path);
                    }
                }, (Event.Rename x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to));
                }, (Event.Open x) {
                    rval.put(MonitorResult(MonitorResult.Kind.Open, x.path));
                },);
            }
        } catch (Exception e) {
            logger.trace(e.msg);
        }

        // NOTE(review): an Overflow result has a default initialized path and
        // is filtered like any other result - confirm that it survives.
        return rval.data.filter!(a => isInteresting(fileFilter, subFilters, a.path)).array;
    }

    /** Collects events from the monitored `roots` over a period.
     *
     * Params:
     *  collectTime = for how long to clear the queue
     */
    MonitorResult[] collect(Duration collectTime) {
        import std.algorithm : max, min;
        import std.datetime : Clock;

        auto rval = appender!(MonitorResult[])();
        const stopAt = Clock.currTime + collectTime;

        do {
            collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs");
            if (!monitorRoots.empty) {
                // must use a hybrid approach of poll + inotify because if a
                // root is added it will only be detected by polling.
                collectTime = min(10.dur!"msecs", collectTime);
            }

            rval.put(wait(collectTime));
        }
        while (Clock.currTime < stopAt);

        return rval.data;
    }
}

@("shall re-apply monitoring for a file that is removed")
unittest {
    import my.filter : GlobFilter;
    import my.test;

    auto ta = makeTestArea("re-apply monitoring");
    const testTxt = ta.inSandbox("test.txt").AbsolutePath;

    write(testTxt, "abc");
    auto fw = Monitor([testTxt], GlobFilter(["*"], null));
    write(testTxt, "abcc");
    assert(!fw.wait(Duration.zero).empty);

    remove(testTxt);
    assert(!fw.wait(Duration.zero).empty);

    write(testTxt, "abcc");
    assert(!fw.wait(Duration.zero).empty);
}

/** A file descriptor to poll.
 */
struct FdPoll {
    int value;
}

/// Uses the linux poll syscall to wait for activity on the file descriptors.
struct FdPoller {
    import std.algorithm : min, filter;

    private {
        pollfd[] fds;
        // storage reused by `wait`; always the same length as `fds`.
        PollResult[] results;
    }

    /** Register a file descriptor.
     *
     * Params:
     *  fd = descriptor to poll
     *  evs = the kind of activity to poll `fd` for
     */
    void put(FdPoll fd, PollEvent[] evs) {
        import core.sys.posix.poll;

        pollfd pfd;
        pfd.fd = fd.value;
        foreach (e; evs) {
            final switch (e) with (PollEvent) {
            case in_:
                pfd.events |= POLLIN;
                break;
            case out_:
                pfd.events |= POLLOUT;
                break;
            }
        }
        fds ~= pfd;

        // they must be the same length or else `wait` will fail.
        results.length = fds.length;
    }

    /// Stop polling every registered entry that refers to `fd`.
    void remove(FdPoll fd) {
        fds = fds.filter!(a => a.fd != fd.value).array;

        results.length = fds.length;
    }

    /** Wait for activity on the registered file descriptors.
     *
     * Params:
     *  timeout = max time to wait, truncated to `int.max` milliseconds.
     *
     * Returns: one entry per descriptor that has activity. Empty on timeout
     * or if the call was interrupted. The slice refers to storage owned by
     * the poller which is overwritten by the next call.
     *
     * Throws: Exception if poll fails for another reason than `EINTR`.
     */
    PollResult[] wait(Duration timeout = Duration.zero) {
        import core.sys.posix.poll;
        import std.bitmanip : BitArray;

        // `fds.ptr` instead of `&fds[0]` because indexing an empty array is
        // a range violation; poll accepts zero descriptors.
        const code = poll(fds.ptr, fds.length, cast(int) min(int.max, timeout.total!"msecs"));

        if (code < 0) {
            import core.stdc.errno : errno, EINTR;

            if (errno == EINTR) {
                // poll was interrupted; report no activity and let the
                // caller decide if it should wait again.
                return (PollResult[]).init;
            }

            throw new Exception("Failed to poll events. Error code " ~ errno.to!string);
        } else if (code == 0) {
            // timeout triggered
            return (PollResult[]).init;
        }

        size_t idx;
        foreach (a; fds.filter!(a => a.revents != 0)) {
            PollResult res;
            // the order of the bits must match the members of `PollStatus`.
            res.status = BitArray([
                (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0,
                (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0,
                (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0,
            ]);
            res.fd = FdPoll(a.fd);
            results[idx] = res;
            idx++;
        }

        return results[0 .. idx];
    }
}

/// Type of event to poll for.
enum PollEvent {
    in_,
    out_,
}

/// What each bit in `PollResult.status` represent.
enum PollStatus {
    /// There is data to read.
    in_,
    /// Writing is now possible, though a write larger than the available
    /// space in a socket or pipe will still block (unless O_NONBLOCK is set).
    out_,
    /// There is some exceptional condition on the file descriptor. Possibilities include:
    /// * There is out-of-band data on a TCP socket (see tcp(7)).
    /// * A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)).
    /// * A cgroup.events file has been modified (see cgroups(7)).
    pri,
    /// Error condition (only returned in revents; ignored in events). This bit
    /// is also set for a file descriptor referring to the write end of a pipe
    /// when the read end has been closed.
    error,
    /// Hang up (only returned in revents; ignored in events). Note that when
    /// reading from a channel such as a pipe or a stream socket, this event
    /// merely indicates that the peer closed its end of the channel.
    /// Subsequent reads from the channel will return 0 (end of file) only
    /// after all outstanding data in the channel has been consumed.
    hup,
    /// Invalid request: fd not open (only returned in revents; ignored in events).
    nval,
}

/// File descriptors that triggered.
struct PollResult {
    import std.bitmanip : BitArray;

    BitArray status;
    FdPoll fd;
}