1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 This is based on webfreak's
7 [fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the
8 API as it where because I needed to be able to watch multiple directories,
9 filter what files are to be watched and to be robust against broken symlinks.
10 
11 Lets say you want to watch a directory for changes and add all directories to
12 be watched too.
13 
14 ---
15 auto fw = fileWatch();
16 fw.watchRecurse("my_dir");
17 while (true) {
18     auto ev = fw.wait;
19     foreach (e; ev) {
20         e.match!(
21         (Event.Access x) => writeln(x),
22         (Event.Attribute x) => writeln(x),
23         (Event.CloseWrite x) => writeln(x),
24         (Event.CloseNoWrite x) => writeln(x),
25         (Event.Create x) { fw.watchRecurse(x.path); },
26         (Event.Delete x) => writeln(x),
27         (Event.DeleteSelf x) => writeln(x),
28         (Event.Modify x) => writeln(x),
29         (Event.MoveSelf x) => writeln(x),
30         (Event.Rename x) => writeln(x),
31         (Event.Open x) => writeln(x),
32         );
33     }
34 }
35 ---
36 */
37 module my.fswatch;
38 
39 import core.sys.linux.errno : errno;
40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC,
42     IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE,
43     IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO,
44     IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK;
45 import core.sys.linux.unistd : close, read;
46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL;
47 import core.thread : Thread;
48 import core.time : dur, Duration;
49 import logger = std.experimental.logger;
50 import std.array : appender, empty, array;
51 import std.conv : to;
52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append,
53     rename, remove, exists, SpanMode, mkdir, rmdir;
54 import std.path : buildPath;
55 import std.range : isInputRange;
56 import std..string : toStringz, fromStringz;
57 import std.exception : collectException;
58 
59 import sumtype;
60 
61 import my.named_type;
62 import my.optional;
63 import my.path : AbsolutePath, Path;
64 import my.set;
65 
66 struct Event {
67     /// An overflow occured. Unknown what events actually triggered.
68     static struct Overflow {
69     }
70 
71     /// File was accessed (e.g., read(2), execve(2)).
72     static struct Access {
73         AbsolutePath path;
74     }
75 
76     /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps
77      * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count
78      * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)),
79      * and user/group ID (e.g., chown(2)).
80      */
81     static struct Attribute {
82         AbsolutePath path;
83     }
84 
85     /// File opened for writing was closed.
86     static struct CloseWrite {
87         AbsolutePath path;
88     }
89 
90     /// File or directory not opened for writing was closed.
91     static struct CloseNoWrite {
92         AbsolutePath path;
93     }
94 
95     /** File/directory created in watched directory (e.g., open(2) O_CREAT,
96      * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).
97      */
98     static struct Create {
99         AbsolutePath path;
100     }
101 
102     /// File/directory deleted from watched directory.
103     static struct Delete {
104         AbsolutePath path;
105     }
106 
107     /** Watched file/directory was itself deleted. (This event also occurs if
108      * an object is moved to another filesystem, since mv(1) in effect copies
109      * the file to the other filesystem and then deletes it from the original
110      * filesys‐ tem.)  In addition, an IN_IGNORED event will subsequently be
111      * generated for the watch descriptor.
112      */
113     static struct DeleteSelf {
114         AbsolutePath path;
115     }
116 
117     /// File was modified (e.g., write(2), truncate(2)).
118     static struct Modify {
119         AbsolutePath path;
120     }
121 
122     /// Watched file/directory was itself moved.
123     static struct MoveSelf {
124         AbsolutePath path;
125     }
126 
127     /// Occurs when a file or folder inside a folder is renamed.
128     static struct Rename {
129         AbsolutePath from;
130         AbsolutePath to;
131     }
132 
133     /// File or directory was opened.
134     static struct Open {
135         AbsolutePath path;
136     }
137 }
138 
139 alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite,
140         Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf,
141         Event.Modify, Event.MoveSelf, Event.Rename, Event.Open, Event.Overflow);
142 
143 /// Construct a FileWatch.
144 auto fileWatch(FileWatch.FollowSymlink follow = FileWatch.FollowSymlink.init) {
145     int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
146     if (fd == -1) {
147         throw new Exception(
148                 "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string);
149     }
150     return FileWatch(fd, follow);
151 }
152 
153 /// Listens for create/modify/removal of files and directories.
154 enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY
155     | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE;
156 
157 /// Listen for events that change the metadata.
158 enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK;
159 
160 /** An instance of a FileWatcher
161  */
162 struct FileWatch {
163     import std.functional : toDelegate;
164 
165     alias FollowSymlink = NamedType!(bool, Tag!"FollowSymlink", bool.init, TagStringable);
166 
167     private {
168         FdPoller poller;
169         int fd;
170         ubyte[1024 * 4] eventBuffer; // 4kb buffer for events
171         struct FDInfo {
172             int wd;
173             bool watched;
174             Path path;
175 
176             this(this) {
177             }
178         }
179 
180         FDInfo[int] directoryMap; // map every watch descriptor to a directory
181 
182         FollowSymlink follow;
183     }
184 
185     private this(int fd, FollowSymlink follow) {
186         this.fd = fd;
187         this.follow = follow;
188         poller.put(FdPoll(fd), [PollEvent.in_]);
189     }
190 
191     ~this() {
192         if (fd) {
193             foreach (fdinfo; directoryMap.byValue) {
194                 if (fdinfo.watched)
195                     inotify_rm_watch(fd, fdinfo.wd);
196             }
197             close(fd);
198         }
199     }
200 
201     /** Add a path to watch for events.
202      *
203      * Params:
204      *  path = path to watch
205      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
206      *
207      * Returns: true if the path was successfully added.
208      */
209     bool watch(Path path, uint events = ContentEvents) {
210         import my.file : followSymlink;
211         import my.optional;
212 
213         if (follow)
214             path = followSymlink(path).orElse(path);
215 
216         const wd = inotify_add_watch(fd, path.toStringz, events);
217         if (wd != -1) {
218             const fc = fcntl(fd, F_SETFD, FD_CLOEXEC);
219             if (fc != -1) {
220                 directoryMap[wd] = FDInfo(wd, true, path);
221                 return true;
222             }
223         }
224 
225         return false;
226     }
227 
228     ///
229     bool watch(string p, uint events = ContentEvents) {
230         return watch(Path(p));
231     }
232 
233     private static bool allFiles(string p) {
234         return true;
235     }
236 
237     /** Recursively add the path and all its subdirectories and files to be watched.
238      *
239      * Params:
240      *  pred = only those files and directories that `pred` returns true for are watched, by default every file/directory.
241      *  root = directory to watch together with its content and subdirectories.
242      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
243      *
244      * Returns: paths that failed to be added.
245      */
246     AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents,
247             bool delegate(string) pred = toDelegate(&allFiles)) {
248         import std.algorithm : filter;
249         import my.file : existsAnd;
250 
251         auto failed = appender!(AbsolutePath[])();
252 
253         if (!watch(root, events)) {
254             failed.put(AbsolutePath(root));
255         }
256 
257         if (!existsAnd!isDir(root)) {
258             return failed.data;
259         }
260 
261         auto dirs = [AbsolutePath(root)];
262         Set!AbsolutePath visited;
263         while (!dirs.empty) {
264             auto front = dirs[0];
265             dirs = dirs[1 .. $];
266             if (front in visited)
267                 continue;
268             visited.add(front);
269 
270             try {
271                 foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) {
272                     if (!watch(Path(p.name), events)) {
273                         failed.put(AbsolutePath(p.name));
274                     }
275                     if (existsAnd!isDir(Path(p.name))) {
276                         dirs ~= AbsolutePath(p.name);
277                     }
278                 }
279             } catch (Exception e) {
280                 () @trusted { logger.trace(e); }();
281                 logger.trace(e.msg);
282                 failed.put(AbsolutePath(front));
283             }
284         }
285 
286         return failed.data;
287     }
288 
289     ///
290     AbsolutePath[] watchRecurse(string root, uint events = ContentEvents,
291             bool delegate(string) pred = toDelegate(&allFiles)) {
292         return watchRecurse(Path(root), events, pred);
293     }
294 
295     /** The events that have occured since last query.
296      *
297      * Params:
298      *  timeout = max time to wait for events.
299      *
300      * Returns: the events that has occured to the watched paths.
301      */
302     FileChangeEvent[] getEvents(Duration timeout = Duration.zero) {
303         import std.algorithm : min;
304 
305         FileChangeEvent[] events;
306         if (!fd)
307             return events;
308 
309         auto res = poller.wait(timeout);
310 
311         if (res.empty) {
312             return events;
313         }
314 
315         if (res[0].status[PollStatus.nval]) {
316             throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string);
317         }
318 
319         if (!res[0].status[PollStatus.in_]) {
320             // no events to read
321             return events;
322         }
323 
324         const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length);
325         int i = 0;
326         AbsolutePath[uint] cookie;
327         while (true) {
328             auto info = cast(inotify_event*)(eventBuffer.ptr + i);
329 
330             if (info.wd == -1) {
331                 events ~= FileChangeEvent(Event.Overflow.init);
332             }
333             if (info.wd !in directoryMap)
334                 continue;
335 
336             auto fname = () {
337                 string fileName = info.name.ptr.fromStringz.idup;
338                 return AbsolutePath(buildPath(directoryMap[info.wd].path, fileName));
339             }();
340 
341             if ((info.mask & IN_MOVED_TO) == 0) {
342                 if (auto v = info.cookie in cookie) {
343                     events ~= FileChangeEvent(Event.Delete(*v));
344                     cookie.remove(info.cookie);
345                 }
346             }
347 
348             if ((info.mask & IN_ACCESS) != 0) {
349                 events ~= FileChangeEvent(Event.Access(fname));
350             }
351 
352             if ((info.mask & IN_ATTRIB) != 0) {
353                 events ~= FileChangeEvent(Event.Attribute(fname));
354             }
355 
356             if ((info.mask & IN_CLOSE_WRITE) != 0) {
357                 events ~= FileChangeEvent(Event.CloseWrite(fname));
358             }
359 
360             if ((info.mask & IN_CLOSE_NOWRITE) != 0) {
361                 events ~= FileChangeEvent(Event.CloseNoWrite(fname));
362             }
363 
364             if ((info.mask & IN_CREATE) != 0) {
365                 events ~= FileChangeEvent(Event.Create(fname));
366             }
367 
368             if ((info.mask & IN_DELETE) != 0) {
369                 events ~= FileChangeEvent(Event.Delete(fname));
370             }
371 
372             if ((info.mask & IN_DELETE_SELF) != 0) {
373                 // must go via the mapping or there may be trailing junk in fname.
374                 events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath));
375             }
376 
377             if ((info.mask & IN_MODIFY) != 0) {
378                 events ~= FileChangeEvent(Event.Modify(fname));
379             }
380 
381             if ((info.mask & IN_MOVE_SELF) != 0) {
382                 // must go via the mapping or there may be trailing junk in fname.
383                 events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath));
384             }
385 
386             if ((info.mask & IN_MOVED_FROM) != 0) {
387                 cookie[info.cookie] = fname;
388             }
389 
390             if ((info.mask & IN_MOVED_TO) != 0) {
391                 if (auto v = info.cookie in cookie) {
392                     events ~= FileChangeEvent(Event.Rename(*v, fname));
393                     cookie.remove(info.cookie);
394                 } else {
395                     events ~= FileChangeEvent(Event.Create(fname));
396                 }
397             }
398 
399             if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) {
400                 inotify_rm_watch(fd, info.wd);
401                 directoryMap[info.wd].watched = false;
402             }
403 
404             i += inotify_event.sizeof + info.len;
405 
406             if (i >= receivedBytes)
407                 break;
408         }
409 
410         foreach (c; cookie.byValue) {
411             events ~= FileChangeEvent(Event.Delete(AbsolutePath(c)));
412         }
413 
414         return events;
415     }
416 }
417 
418 ///
419 unittest {
420     import core.thread;
421 
422     if (exists("test"))
423         rmdirRecurse("test");
424     scope (exit) {
425         if (exists("test"))
426             rmdirRecurse("test");
427     }
428 
429     auto watcher = fileWatch();
430 
431     mkdir("test");
432     assert(watcher.watch("test"));
433 
434     write("test/a.txt", "abc");
435     auto ev = watcher.getEvents(5.dur!"seconds");
436     assert(ev.length > 0);
437     assert(ev[0].tryMatch!((Event.Create x) {
438             assert(x.path == AbsolutePath("test/a.txt"));
439             return true;
440         }));
441 
442     append("test/a.txt", "def");
443     ev = watcher.getEvents(5.dur!"seconds");
444     assert(ev.length > 0);
445     assert(ev[0].tryMatch!((Event.Modify x) {
446             assert(x.path == AbsolutePath("test/a.txt"));
447             return true;
448         }));
449 
450     rename("test/a.txt", "test/b.txt");
451     ev = watcher.getEvents(5.dur!"seconds");
452     assert(ev.length > 0);
453     assert(ev[0].tryMatch!((Event.Rename x) {
454             assert(x.from == AbsolutePath("test/a.txt"));
455             assert(x.to == AbsolutePath("test/b.txt"));
456             return true;
457         }));
458 
459     remove("test/b.txt");
460     ev = watcher.getEvents(5.dur!"seconds");
461     assert(ev.length > 0);
462     assert(ev[0].tryMatch!((Event.Delete x) {
463             assert(x.path == AbsolutePath("test/b.txt"));
464             return true;
465         }));
466 
467     rmdirRecurse("test");
468     ev = watcher.getEvents(5.dur!"seconds");
469     assert(ev.length > 0);
470     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
471             assert(x.path == AbsolutePath("test"));
472             return true;
473         }));
474 }
475 
476 ///
477 unittest {
478     import std.algorithm : canFind;
479 
480     if (exists("test2"))
481         rmdirRecurse("test2");
482     if (exists("test3"))
483         rmdirRecurse("test3");
484     scope (exit) {
485         if (exists("test2"))
486             rmdirRecurse("test2");
487         if (exists("test3"))
488             rmdirRecurse("test3");
489     }
490 
491     auto watcher = fileWatch();
492     mkdir("test2");
493     assert(watcher.watchRecurse("test2").length == 0);
494 
495     write("test2/a.txt", "abc");
496     auto ev = watcher.getEvents(5.dur!"seconds");
497     assert(ev.length == 3);
498     assert(ev[0].tryMatch!((Event.Create x) {
499             assert(x.path == AbsolutePath("test2/a.txt"));
500             return true;
501         }));
502     assert(ev[1].tryMatch!((Event.Modify x) {
503             assert(x.path == AbsolutePath("test2/a.txt"));
504             return true;
505         }));
506     assert(ev[2].tryMatch!((Event.CloseWrite x) {
507             assert(x.path == AbsolutePath("test2/a.txt"));
508             return true;
509         }));
510 
511     rename("test2/a.txt", "./testfile-a.txt");
512     ev = watcher.getEvents(5.dur!"seconds");
513     assert(ev.length == 1);
514     assert(ev[0].tryMatch!((Event.Delete x) {
515             assert(x.path == AbsolutePath("test2/a.txt"));
516             return true;
517         }));
518 
519     rename("./testfile-a.txt", "test2/b.txt");
520     ev = watcher.getEvents(5.dur!"seconds");
521     assert(ev.length == 1);
522     assert(ev[0].tryMatch!((Event.Create x) {
523             assert(x.path == AbsolutePath("test2/b.txt"));
524             return true;
525         }));
526 
527     remove("test2/b.txt");
528     ev = watcher.getEvents(5.dur!"seconds");
529     assert(ev.length == 1);
530     assert(ev[0].tryMatch!((Event.Delete x) {
531             assert(x.path == AbsolutePath("test2/b.txt"));
532             return true;
533         }));
534 
535     mkdir("test2/mydir");
536     rmdir("test2/mydir");
537     ev = watcher.getEvents(5.dur!"seconds");
538     assert(ev.length == 2);
539     assert(ev[0].tryMatch!((Event.Create x) {
540             assert(x.path == AbsolutePath("test2/mydir"));
541             return true;
542         }));
543     assert(ev[1].tryMatch!((Event.Delete x) {
544             assert(x.path == AbsolutePath("test2/mydir"));
545             return true;
546         }));
547 
548     // test for creation, modification, removal of subdirectory
549     mkdir("test2/subdir");
550     ev = watcher.getEvents(5.dur!"seconds");
551     assert(ev.length == 1);
552     assert(ev[0].tryMatch!((Event.Create x) {
553             assert(x.path == AbsolutePath("test2/subdir"));
554             // add the created directory to be watched
555             watcher.watchRecurse(x.path);
556             return true;
557         }));
558 
559     write("test2/subdir/c.txt", "abc");
560     ev = watcher.getEvents(5.dur!"seconds");
561     assert(ev.length == 3);
562     assert(ev[0].tryMatch!((Event.Create x) {
563             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
564             return true;
565         }));
566 
567     write("test2/subdir/c.txt", "\nabc");
568     ev = watcher.getEvents(5.dur!"seconds");
569     assert(ev.length == 2);
570     assert(ev[0].tryMatch!((Event.Modify x) {
571             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
572             return true;
573         }));
574 
575     rmdirRecurse("test2/subdir");
576     ev = watcher.getEvents(5.dur!"seconds");
577     assert(ev.length == 3);
578     foreach (e; ev) {
579         assert(ev[0].tryMatch!((Event.Delete x) {
580                 assert(canFind([
581                     AbsolutePath("test2/subdir/c.txt"),
582                     AbsolutePath("test2/subdir")
583                 ], x.path));
584                 return true;
585             }, (Event.DeleteSelf x) {
586                 assert(x.path == AbsolutePath("test2/subdir"));
587                 return true;
588             }));
589     }
590 
591     // removal of watched folder
592     rmdirRecurse("test2");
593     ev = watcher.getEvents(5.dur!"seconds");
594     assert(ev.length == 1);
595     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
596             assert(x.path == AbsolutePath("test2"));
597             return true;
598         }));
599 }
600 
601 struct MonitorResult {
602     enum Kind {
603         Access,
604         Attribute,
605         CloseWrite,
606         CloseNoWrite,
607         Create,
608         Delete,
609         DeleteSelf,
610         Modify,
611         MoveSelf,
612         Rename,
613         Open,
614         Overflow,
615     }
616 
617     Kind kind;
618     AbsolutePath path;
619 }
620 
621 /** Monitor root's for filesystem changes which create/remove/modify
622  * files/directories.
623  */
624 struct Monitor {
625     import std.array : appender;
626     import std.file : isDir;
627     import std.typecons : Tuple, tuple;
628     import std.utf : UTFException;
629     import my.filter : GlobFilter;
630     import my.fswatch;
631 
632     private {
633         Set!AbsolutePath roots;
634         FileWatch fw;
635         // global additives to subFilters.
636         GlobFilter fileFilter;
637         GlobFilter[AbsolutePath] subFilters;
638         uint events;
639 
640         // roots that has been removed that may be re-added later on. the user
641         // expects them to trigger events.
642         Set!AbsolutePath monitorRoots;
643     }
644 
645     /**
646      * Params:
647      *  roots = directories to recursively monitor
648      */
649     this(AbsolutePath[] roots, GlobFilter fileFilter,
650             FileWatch.FollowSymlink follow = FileWatch.FollowSymlink.init,
651             uint events = ContentEvents) {
652         this(roots, fileFilter, null, follow, events);
653     }
654 
655     this(AbsolutePath[] roots, GlobFilter fileFilter, GlobFilter[AbsolutePath] subFilters,
656             FileWatch.FollowSymlink follow, uint events = ContentEvents) {
657         this.roots = toSet(roots);
658         this.fileFilter = fileFilter;
659         this.subFilters = subFilters;
660         this.events = events;
661 
662         auto app = appender!(AbsolutePath[])();
663         fw = fileWatch(follow);
664         foreach (r; roots) {
665             app.put(fw.watchRecurse(r, events, (a) {
666                     return isInteresting(fileFilter, subFilters, a);
667                 }));
668         }
669 
670         logger.trace(!app.data.empty, "unable to watch ", app.data);
671     }
672 
673     static bool isInteresting(GlobFilter rootFilter, ref GlobFilter[AbsolutePath] subFilters,
674             string p) nothrow {
675         import my.file : existsAnd;
676         import my.filter : closest, GlobFilterClosestMatch, merge;
677 
678         try {
679             const ap = AbsolutePath(p);
680 
681             if (existsAnd!isDir(ap))
682                 return true;
683             auto f = closest(subFilters, ap).orElse(GlobFilterClosestMatch(rootFilter,
684                     AbsolutePath(".")));
685             f.filter = merge(f.filter, rootFilter);
686             return f.match(ap.toString);
687         } catch (Exception e) {
688             collectException(logger.trace(e.msg));
689         }
690 
691         return false;
692     }
693 
694     /** Wait up to `timeout` for an event to occur for the monitored `roots`.
695      *
696      * Params:
697      *  timeout = how long to wait for the event
698      */
699     MonitorResult[] wait(Duration timeout) {
700         import std.array : array;
701         import std.algorithm : canFind, startsWith, filter;
702 
703         auto rval = appender!(MonitorResult[])();
704 
705         {
706             auto rm = appender!(AbsolutePath[])();
707             foreach (a; monitorRoots.toRange.filter!(a => exists(a))) {
708                 fw.watchRecurse(a, events, a => isInteresting(fileFilter, subFilters, a));
709                 rm.put(a);
710                 rval.put(MonitorResult(MonitorResult.Kind.Create, a));
711             }
712             foreach (a; rm.data) {
713                 monitorRoots.remove(a);
714             }
715         }
716 
717         if (!rval.data.empty) {
718             // collect whatever events that happend to have queued up together
719             // with the artifically created.
720             timeout = Duration.zero;
721         }
722 
723         try {
724             foreach (e; fw.getEvents(timeout)) {
725                 e.match!((Event.Overflow x) {
726                     rval.put(MonitorResult(MonitorResult.Kind.Overflow));
727                 }, (Event.Access x) {
728                     rval.put(MonitorResult(MonitorResult.Kind.Access, x.path));
729                 }, (Event.Attribute x) {
730                     rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path));
731                 }, (Event.CloseWrite x) {
732                     rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path));
733                 }, (Event.CloseNoWrite x) {
734                     rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path));
735                 }, (Event.Create x) {
736                     rval.put(MonitorResult(MonitorResult.Kind.Create, x.path));
737                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, subFilters, a));
738                 }, (Event.Modify x) {
739                     rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path));
740                 }, (Event.MoveSelf x) {
741                     rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path));
742                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, subFilters, a));
743 
744                     if (x.path in roots) {
745                         monitorRoots.add(x.path);
746                     }
747                 }, (Event.Delete x) {
748                     rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path));
749                 }, (Event.DeleteSelf x) {
750                     rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path));
751 
752                     if (x.path in roots) {
753                         monitorRoots.add(x.path);
754                     }
755                 }, (Event.Rename x) {
756                     rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to));
757                 }, (Event.Open x) {
758                     rval.put(MonitorResult(MonitorResult.Kind.Open, x.path));
759                 },);
760             }
761         } catch (Exception e) {
762             logger.trace(e.msg);
763         }
764 
765         return rval.data.filter!(a => isInteresting(fileFilter, subFilters, a.path)).array;
766     }
767 
768     /** Collects events from the monitored `roots` over a period.
769      *
770      * Params:
771      *  collectTime = for how long to clear the queue
772      */
773     MonitorResult[] collect(Duration collectTime) {
774         import std.algorithm : max, min;
775         import std.datetime : Clock;
776 
777         auto rval = appender!(MonitorResult[])();
778         const stopAt = Clock.currTime + collectTime;
779 
780         do {
781             collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs");
782             if (!monitorRoots.empty) {
783                 // must use a hybrid approach of poll + inotify because if a
784                 // root is added it will only be detected by polling.
785                 collectTime = min(10.dur!"msecs", collectTime);
786             }
787 
788             rval.put(wait(collectTime));
789         }
790         while (Clock.currTime < stopAt);
791 
792         return rval.data;
793     }
794 }
795 
796 @("shall re-apply monitoring for a file that is removed")
797 unittest {
798     import my.filter : GlobFilter;
799     import my.test;
800 
801     auto ta = makeTestArea("re-apply monitoring");
802     const testTxt = ta.inSandbox("test.txt").AbsolutePath;
803 
804     write(testTxt, "abc");
805     auto fw = Monitor([testTxt], GlobFilter(["*"], null));
806     write(testTxt, "abcc");
807     assert(!fw.wait(Duration.zero).empty);
808 
809     remove(testTxt);
810     assert(!fw.wait(Duration.zero).empty);
811 
812     write(testTxt, "abcc");
813     assert(!fw.wait(Duration.zero).empty);
814 }
815 
816 /** A file descriptor to poll.
817  */
818 struct FdPoll {
819     int value;
820 }
821 
822 /// Uses the linux poll syscall to wait for activity on the file descriptors.
823 struct FdPoller {
824     import std.algorithm : min, filter;
825 
826     private {
827         pollfd[] fds;
828         PollResult[] results;
829     }
830 
831     void put(FdPoll fd, PollEvent[] evs) {
832         import core.sys.posix.poll;
833 
834         pollfd pfd;
835         pfd.fd = fd.value;
836         foreach (e; evs) {
837             final switch (e) with (PollEvent) {
838             case in_:
839                 pfd.events |= POLLIN;
840                 break;
841             case out_:
842                 pfd.events |= POLLOUT;
843                 break;
844             }
845         }
846         fds ~= pfd;
847 
848         // they must be the same length or else `wait` will fail.
849         results.length = fds.length;
850     }
851 
852     void remove(FdPoll fd) {
853         fds = fds.filter!(a => a.fd != fd.value).array;
854 
855         results.length = fds.length;
856     }
857 
858     PollResult[] wait(Duration timeout = Duration.zero) {
859         import core.sys.posix.poll;
860         import std.bitmanip : BitArray;
861 
862         const code = poll(&fds[0], fds.length, cast(int) min(int.max, timeout.total!"msecs"));
863 
864         if (code < 0) {
865             import core.stdc.errno : errno, EINTR;
866 
867             if (errno == EINTR) {
868                 // poll just interrupted. try again.
869                 return (PollResult[]).init;
870             }
871 
872             throw new Exception("Failed to poll events. Error code " ~ errno.to!string);
873         } else if (code == 0) {
874             // timeout triggered
875             return (PollResult[]).init;
876         }
877 
878         size_t idx;
879         foreach (a; fds.filter!(a => a.revents != 0)) {
880             PollResult res;
881             res.status = BitArray([
882                     (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0,
883                     (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0,
884                     (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0,
885                     ]);
886             res.fd = FdPoll(a.fd);
887             results[idx] = res;
888             idx++;
889         }
890 
891         return results[0 .. idx];
892     }
893 }
894 
895 /// Type of event to poll for.
896 enum PollEvent {
897     in_,
898     out_,
899 }
900 
901 /// What each bit in `PollResult.status` represent.
902 enum PollStatus {
903     // There is data to read.
904     in_,
905     // Writing  is  now  possible,  though a write larger that the available
906     // space in a socket or pipe will still block (unless O_NONBLOCK is set).
907     out_,
908     // There is some exceptional condition on the file descriptor.  Possibilities include:
909     // *  There is out-of-band data on a TCP socket (see tcp(7)).
910     // *  A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)).
911     // *  A cgroup.events file has been modified (see cgroups(7)).
912     pri,
913     // Error condition (only returned in revents; ignored in events). This bit
914     // is also set for a file descriptor referring to the write end of a pipe
915     // when the read end has been closed.
916     error,
917     // Hang up (only returned in revents; ignored in events). Note that when
918     // reading from a channel such as a pipe or a stream socket, this event
919     // merely indicates that the peer closed its end of the channel.
920     // Subsequent reads from the channel will re‐ turn 0 (end of file) only
921     // after all outstanding data in the channel has been consumed.
922     hup,
923     /// Invalid request: fd not open (only returned in revents; ignored in events).
924     nval,
925 }
926 
927 /// File descriptors that triggered.
928 struct PollResult {
929     import std.bitmanip : BitArray;
930 
931     BitArray status;
932     FdPoll fd;
933 }