Key Functions
seed_index(root)
At startup, fills live_paths with the absolute paths of all files and directories under the root. This is the baseline from which we count removed objects.

norm(path)
Converts a path to an absolute one via .resolve() (removes ./ and .., expands symlinks). Needed for consistent keys.

is_under(child, parent)
Checks whether child lies inside parent. Used to filter descendants.

on_deleted(event)
The main function.
• If a directory was deleted: we mark a “tombstone”, remove its descendants from live_paths, count them, open the coalescing window, and print the final summary via a timer.
• If a file was deleted: we add a record to the buffer of “early” deletions and, if the file is inside a folder with a tombstone, we increment its counter.

_arm_summary_timer(dir_key) / _emit_summary(dir_key)
Restart / execute the summary timer. When the coalescing window expires, they print the total and clear the state.
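For a quick feel for the two path helpers, here is a minimal sketch that mirrors norm and is_under from the full listing below; the example paths are hypothetical.

```python
from pathlib import Path

def norm(path: str) -> str:
    # Absolute, symlink-resolved path string (same idea as in the full listing below)
    return str(Path(path).resolve())

def is_under(child: str, parent: str) -> bool:
    # True if child resolves to a location inside parent
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
        return True
    except Exception:
        return False

# Hypothetical example paths:
print(norm("./docs/../docs/readme.md"))           # e.g. /Users/me/project/docs/readme.md
print(is_under("/tmp/A/sub/file.txt", "/tmp/A"))  # True
print(is_under("/tmp/B/file.txt", "/tmp/A"))      # False
```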
Theory: macOS FSEvents, queues, and coalescing
How FSEvents delivers events
On macOS, the FSEvents service keeps a per-directory change journal. An application (watchdog) subscribes to directories and receives batches of events where paths are already normalized by the kernel. Important details:
- Batching and OS-level coalescing: during fast cascade deletions (many files followed by their folder), the system may not send a separate event for every child. Sometimes you will see only “directory deleted”.
- Delivery order is not strictly guaranteed: individual child events can arrive before or after the directory deletion event, with differences of tens to hundreds of milliseconds.
- Threads: watchdog invokes handlers from the observer’s worker thread; our timers live in their own threads. Synchronization is required.
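To see the threading point in isolation, a minimal probe (a sketch, not part of the script below) can print which thread each callback runs on:

```python
import threading
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ThreadProbe(FileSystemEventHandler):
    def on_any_event(self, event):
        # Callbacks run on the observer's worker thread, not the main thread
        print("callback thread:", threading.current_thread().name,
              "-", event.event_type, event.src_path)

if __name__ == "__main__":
    print("main thread:", threading.current_thread().name)
    obs = Observer()
    obs.schedule(ThreadProbe(), ".", recursive=True)
    obs.start()
    try:
        time.sleep(10)  # create/delete files in the watched folder meanwhile
    finally:
        obs.stop()
        obs.join()
```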
Why, without coalescing, losses/miscounts are possible
If you print logs immediately without a “waiting window”, then:
- you can miss some child deletions (they arrive after “directory deleted”);
- or get double counting: you counted descendants by index first, then “late” file deletions arrived and increased the number again.
What our coalescing does
We added a time window COALESCE_MS. If a directory DELETE arrives, we:
- Count its descendants from our live_paths index (built by seed_index and updated in on_created/on_modified).
- Remove them from live_paths and mark a tombstone pending_dir_deletes[p].
- Start the summary timer for COALESCE_MS.
- All “late” file deletions within this folder that arrive within the window are absorbed into pending_dir_counts[p].
- When the window closes, we print one summary and clear the state.
This protects against “holes” in statistics and against double counting.
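Stripped of watchdog and index details, the window itself boils down to a tombstone counter plus a restartable timer. The sketch below shows only that idea; the real handler in the full listing also maintains live_paths and the early-deletion buffer.

```python
import threading

COALESCE_MS = 300
pending_counts: dict[str, int] = {}
pending_timers: dict[str, threading.Timer] = {}
lock = threading.Lock()

def emit(dir_key: str):
    # Window closed: print one summary and forget the directory
    with lock:
        count = pending_counts.pop(dir_key, 0)
        pending_timers.pop(dir_key, None)
    print(f"[Summary] {dir_key}: {count} objects removed")

def arm(dir_key: str):
    # (Re)start the window: every related event pushes the summary back
    with lock:
        old = pending_timers.get(dir_key)
        if old:
            old.cancel()
        t = threading.Timer(COALESCE_MS / 1000.0, emit, args=(dir_key,))
        pending_timers[dir_key] = t
        t.start()

def on_dir_deleted(dir_key: str, initial_count: int):
    with lock:
        pending_counts[dir_key] = initial_count
    arm(dir_key)

def on_late_file_delete(dir_key: str):
    with lock:
        pending_counts[dir_key] = pending_counts.get(dir_key, 0) + 1
    arm(dir_key)
```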
How the code executes on directory deletion
Below are the steps that happen in code when on_deleted(is_directory=True) fires:
- Normalize the path: p = norm(event.src_path) → the path is absolute and consistent.
- Prepare the coalescing window: now = time.time() and cutoff = now - COALESCE_MS/1000.
- Create a tombstone for the directory (under lock): pending_dir_deletes[p] = now and pending_dir_counts[p] = 0. This is the root of the summary.
- Absorb early deletions (under lock): iterate recent_file_deletes and take the entries not earlier than cutoff that lie under p. They increment absorbed_from_buffer.
- Count via the index (under lock): find all descendants of p in live_paths and count them. This covers the case where the OS sent only “directory deleted” without children. Then remove them from the index (and p itself).
- Final counter (under lock): pending_dir_counts[p] = count_index + absorbed_from_buffer.
- Immediate fact log: print that directory p was deleted. (The summary comes later.)
- Start/extend the timer: _arm_summary_timer(p) starts/restarts the timer for COALESCE_MS.
- What happens “during the window”: if individual DELETE events for files arrive inside p within the window, they find the parent in pending_dir_deletes and increment pending_dir_counts[p]. In this code the timer is restarted by calling _arm_summary_timer for the parent again.
- When the timer fires: _emit_summary(p) prints “Deleted folder X; N objects removed” and clears pending_dir_*.
Diagrams for visual understanding of the code
1) Architecture (high level)
┌─────────────┐ ┌─────────────────┐ ┌───────────────────┐ ┌───────────────────────┐
│ OS (macOS) │──►│ FSEvents (OS) │──►│ watchdog.Observer │──►│ MyHandler (callbacks) │
└─────────────┘ └─────────────────┘ └───────────────────┘ └───────────┬───────────┘
│
▼
┌──────────────┐
│ on_deleted() │
└──────┬───────┘
│
▼
┌────────────────────────────┐
│ coalescing/summary (300ms)│
└─────────────┬──────────────┘
▼
┌─────────────┐
│ output │
└─────────────┘

2) Execution sequence for directory deletion
Actor: Filesystem(FSEvents) Observer(watchdog) MyHandler.on_deleted State/Stores
───────────────────── ────────────────── ───────────────────── ─────────────
1) DELETE dir=/A ───────────► event: is_directory ─► p=norm(/A) live_paths (before)
│
2) lock ────┼──► pending_dir_deletes[p]=now
│ pending_dir_counts[p]=0
│ absorb recent_file_deletes within cutoff
│ descendants = {x ∈ live_paths | x under p}
│ remove descendants & p from live_paths
│ pending_dir_counts[p]+=len(descendants)+absorbed
▼
3) print "[Deleted] Dir: /A"
4) _arm_summary_timer(p) ─────► start/restart Timer(COALESCE_MS)
5) (during window) ◄───── DELETE file=/A/… may arrive
on_deleted(file) → +1 to pending_dir_counts[p], extend timer
6) timer fires ──────────► _emit_summary(p) → print "[Summary] … N objects removed"
clear pending_dir_* for p
3) Finite state machine for on_deleted
┌───────────────────────────────────────────────────────────┐
│ [IDLE] │
└───────────┬───────────────────────────────────────────────┘
│ DELETE(dir)
▼
┌───────────────────────────────────────────────────────────┐
│ [DIR_TOMBSTONE_SET] (tombstone created, index counted) │
└───────────┬───────────────────────────────────────────────┘
│ DELETE(file under dir) during window
▼
┌───────────────────────────────────────────────────────────┐
│ [COALESCE_WINDOW] (accumulate counter, timer runs) │
└───────────┬───────────────────────────────────────────────┘
│ timer expired
▼
┌───────────────────────────────────────────────────────────┐
│ [SUMMARY_EMIT] → print summary, clear pending_* │
└───────────┬───────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────┐
│ [IDLE] │
└───────────────────────────────────────────────────────────┘
4) Coalescing timeline
time → t0 t0+60ms t0+140ms t0+280ms t0+300ms
│ │ │ │ │
FSEvents │ DELETE /A ───►│ │ │ │
files │ DELETE /A/f1 ──►│ │ │
│ │ DELETE /A/f2 ──►│ │
│ │ │ │ (timer)
summary │ emit summary: count = index(A)+absorbed
5) Data structure relationships
┌───────────────────┐ write/remove ┌─────────────────────┐
│ live_paths │◄──────────────────────│ seed_index, events │
│ set[str] │ │ on_created/modified│
└─────────┬─────────┘ └─────────┬───────────┘
│ read (for dir delete) │
▼ │
┌───────────────────┐ append ┌─────────────────────────┐ refer/absorb ┌──────────────────────┐
│ recent_file_del. │◄────────────│ on_deleted(file) │────────────────────►│ pending_dir_counts │
│ deque[(str,float)]│ └─────────────────────────┘ │ dict[str,int] │
└───────────────────┘ └───────────┬──────────┘
│
▼
┌──────────────────────┐
│ │ set/start timer ┌─────────────────────────┐ │ pending_dir_deletes │
│ watchdog timer thread│─────────────────────►│ _arm_summary_timer(dir) │─────────►│ dict[str,float] │
│ │ └───────────┬─────────────┘ └──────────────────────┘
│ │
▼ │
┌─────────────────────────┐ │
│ _emit_summary(dir) │◄────────────────────┘
└─────────────────────────┘ (clear pending_*)
6) Threads and locks
┌───────────────────────────────────────────────┐
│ Observer Thread (watchdog) │
│ calls MyHandler.on_* │
│ - modifies live_paths │
│ - updates pending_dir_* / deque │
└───────────────▲───────────────────────────────┘
│
acquire lock() ┌─────┴─────┐ acquire lock()
│ lock │
release lock() └─────┬─────┘ release lock()
│
┌───────────────▼───────────────────────────────┐
│ Timer Thread(s) │
│ - _arm_summary_timer/_emit_summary │
│ - reads & clears pending_dir_* │
└───────────────────────────────────────────────┘
7) “Early” deletion queue and absorption
recent_file_deletes (deque):
head tail
┌───────────────┬───────────────┬───────────────┬───────────────┐
│ (fp1, ts=100) │ (fp2, ts=180) │ (fp3, ts=240) │ (fp4, ts=410) │
└───────────────┴───────────────┴───────────────┴───────────────┘
DELETE dir=/A arrives at now=300 ms; with COALESCE_MS=300, cutoff = now - COALESCE_MS = 300 - 300 = 0 ms (example)
Absorb elements satisfying:
ts ≥ cutoff AND is_under(fp, /A)
Suppose fp1, fp2, fp3 are under /A and satisfy ts ≥ cutoff ⇒ absorbed=3
Non-matching elements are collected into a new buffer, which replaces the deque.
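The absorption pass can be tried in isolation. The sketch below uses hypothetical paths, timestamps in seconds, and a simplified is_under based on string prefixes; the real version runs under lock inside on_deleted.

```python
from collections import deque

COALESCE_MS = 300

def absorb(recent: deque, parent: str, now: float, is_under) -> tuple[int, deque]:
    """Count recent deletions under `parent` and return the remaining buffer."""
    cutoff = now - COALESCE_MS / 1000.0
    absorbed = 0
    kept = deque()
    for fp, ts in recent:
        if ts >= cutoff and is_under(fp, parent):
            absorbed += 1          # this deletion belongs to the deleted directory
        else:
            kept.append((fp, ts))  # unrelated or too old: keep it
    return absorbed, kept

# Hypothetical data (timestamps in seconds):
buf = deque([("/A/f1", 100.00), ("/A/f2", 100.18), ("/A/f3", 100.24), ("/B/f4", 100.41)])
absorbed, buf = absorb(buf, "/A", now=100.30,
                       is_under=lambda c, p: c.startswith(p + "/"))
print(absorbed)   # 3
print(list(buf))  # [('/B/f4', 100.41)]
```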
Tables
1) Core structures
| Structure | Type | Written by | Read by | Purpose |
|---|---|---|---|---|
| live_paths | set[str] | seed_index, on_created/on_modified | on_deleted | Index of “live” paths, so we know what disappeared when a folder is deleted |
| recent_file_deletes | deque[(str, float)] | on_deleted(file) | on_deleted(dir) | Buffer of early file DELETEs “absorbed” by the parent directory |
| pending_dir_deletes | dict[str, float] | on_deleted(dir) | _emit_summary, on_deleted(file) | Tombstone: the fact of directory deletion and the start of the window |
| pending_dir_counts | dict[str, int] | on_deleted(dir/file) | _emit_summary | Counter of “how many objects disappeared” |
| pending_dir_timers | dict[str, threading.Timer] | _arm_summary_timer | _emit_summary | Deferred summary output |
2) Choosing the coalescing window COALESCE_MS
| Criterion / Window | 100 ms | 300 ms (recommended) | 700 ms |
|---|---|---|---|
| Risk of “missed” files (late DELETEs) | medium | low | even lower |
| Risk of double counting | low | low | low |
| User-visible summary delay | low | moderate | noticeable |
| Timer/lock overhead | low | moderate | slightly higher |
| UX/correctness balance | undercount possible | best balance | delay is large |
| Why | fast, but fragile for batch ops | late FSEvents usually arrive within this window | makes the response feel sluggish |
Why 300 ms: on macOS, FSEvents batches cascade operations, and late deletions of child objects often arrive within ~200–250 ms; 300 ms reliably “swallows” them without a noticeable delay for the user.
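If you want to experiment with these trade-offs, the window can be made configurable. This is a hypothetical tweak, not something the original script does; WATCH_COALESCE_MS is an invented variable name.

```python
import os

# Hypothetical override: read the coalescing window from the environment,
# falling back to the recommended 300 ms.
COALESCE_MS = int(os.environ.get("WATCH_COALESCE_MS", "300"))
```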
Limitations and threading notes
- All structures are modified under lock because watchdog callbacks and timers run in different threads.
- For very large trees, seed_index may take time. This is the cost of a correct “directory deletion summary”.
- If the user deleted a tree via a tool that doesn’t emit child events, we are still correct: we get the count from live_paths.
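To gauge the startup cost on a large tree, you could time the index build. A minimal sketch, assuming seed_index and live_paths from the full listing below:

```python
import time

def timed_seed_index(root: str) -> float:
    """Rough measurement of the initial index build (sketch; seed_index and
    live_paths come from the full listing below)."""
    start = time.perf_counter()
    seed_index(root)
    elapsed = time.perf_counter() - start
    # Reading len() without the lock is fine here: the observer has not started yet
    print(f"seed_index({root!r}) indexed {len(live_paths)} paths in {elapsed:.2f}s")
    return elapsed
```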
Full code with comments
Above, we explained only the key functions. Below is the full listing where the comments already contain detailed explanations of all parts.
import time
import datetime
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ============================================================================
# VARIABLES
# ============================================================================
COALESCE_MS = 300
'''
Coalescing window (ms): within this time we "merge" a burst of deletions.
'''
pending_dir_deletes: dict[str, float] = {} # directory -> deletion time
pending_dir_counts: dict[str, int] = {} # directory -> number of removed objects
pending_dir_timers: dict[str, threading.Timer] = {} # directory -> summary timer
'''
"Tombstones" for deleted directories and related artifacts.
pending_dir_deletes - Dictionary variable name
: dict[...] - Type hint for humans/IDEs/linters; not enforced at runtime
str - Directory path string (the key)
float / int - Deletion timestamp or removed-object count, depending on the dictionary
threading.Timer - Summary timer object
= {} - Each dictionary starts empty
'''
recent_file_deletes: deque[tuple[str, float]] = deque()
'''
Buffer of "early" file deletions (in case DELETEs for files arrive first,
and the directory DELETE arrives later).
recent_file_deletes - Name of the double-ended queue variable
: deque[...] - Type hint for humans/IDEs/linters; not enforced at runtime
tuple[str, float] - Immutable, ordered pair
str - Path string
float - Floating-point timestamp
= deque() - Queue is created empty
'''
live_paths: set[str] = set()
'''
Lightweight index of live paths; contains absolute normalized paths of all files
and directories we consider "existing".
Needed on macOS to count how many items inside a folder disappeared, even if the OS
didn't send separate events for children.
live_paths - Name of the set variable
: set[str] - Type hint for humans/IDEs/linters; not enforced at runtime
str - Path string
= set() - Set is created empty
'''
lock = threading.Lock()
'''
Protects critical sections where shared resources are accessed and modified.
By acquiring the lock before entering a critical section, we ensure only one thread
executes that code at a time, preventing data races.
Lock() - Create an instance. By default the lock is not acquired until a thread gets it
acquire() - Acquire the lock
release() - Release the lock
with lock: - Equivalent to acquire() at block start and release() on exit
'''
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def stamp():
return dt.now().strftime("%Y-%m-%d %H:%M:%S")
'''
Convenient timestamp formatting
'''
def norm(path: str) -> str:
return str(Path(path).resolve())
'''
Create an absolute normalized path for consistent keys.
norm - Function name
(path: str) - Parameter path, the function expects a string
-> str - Function returns a string
str(Path(path).resolve()) - Convert to full path and to string
'''
def is_under(child: str, parent: str) -> bool:
try:
Path(child).resolve().relative_to(Path(parent).resolve())
return True
except Exception:
return False
'''
Check: does child lie inside parent?
is_under - Function name
(child: str, parent: str) - Two string parameters
-> bool - Function returns a bool
Path(child).resolve().relative_to(Path(parent).resolve()) - Compare resolved paths to see
if one is under the other.
If it runs without errors → True.
On error → False.
'''
def seed_index(root: str):
root_abs = norm(root)
with lock:
live_paths.add(root_abs)
for dirpath, dirnames, filenames in os.walk(root_abs):
dp = norm(dirpath)
live_paths.add(dp)
for name in dirnames:
live_paths.add(norm(os.path.join(dp, name)))
for name in filenames:
live_paths.add(norm(os.path.join(dp, name)))
'''
Initialize live_paths index with the current disk state.
Important for correct counts if the folder already contains something at start.
seed_index - Function name
(root: str) - Parameter root, expected to be str
root_abs = norm(root) - Normalize root path string
with lock: - Lock the shared live_paths
live_paths.add(root_abs) - Add root path to live_paths
for dirpath, dirnames, filenames in os.walk(root_abs) - Walk entire tree
dp = norm(dirpath) - Normalize current dir path
live_paths.add(dp) - Add directory path to live_paths
for name in dirnames - Iterate subdirectories
live_paths.add(norm(os.path.join(dp, name))) - Add each subdirectory
for name in filenames - Iterate files
live_paths.add(norm(os.path.join(dp, name))) - Add each file
'''
def _emit_summary(dir_key: str):
with lock:
count = pending_dir_counts.pop(dir_key, 0)
print(f"{stamp()} [Summary] Directory {dir_key} deleted; {count} objects removed with it")
pending_dir_deletes.pop(dir_key, None)
t = pending_dir_timers.pop(dir_key, None)
if t:
t.cancel()
'''
Fires on timer: prints the summary and clears per-directory state.
_emit_summary - Function name
(dir_key: str) - Parameter dir_key, expected to be str
with lock: - Lock the shared state
count = pending_dir_counts.pop(dir_key, 0) - Pop count (0 if missing)
pending_dir_deletes.pop(dir_key, None) - Clean tombstone
t = pending_dir_timers.pop(dir_key, None) - Clean timer
if t: t.cancel() - Cancel timer if present
'''
def _arm_summary_timer(dir_key: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_dir_timers.get(dir_key)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
pending_dir_timers[dir_key] = t
t.start()
'''
(Re)start the summary timer for the directory.
_arm_summary_timer - Function name
(dir_key: str) - Parameter dir_key, expected to be str
ttl = COALESCE_MS / 1000.0 - Convert ms to seconds
with lock: - Lock shared state
old = pending_dir_timers.get(dir_key) - Get existing timer
if old: old.cancel() - Cancel if exists
t = threading.Timer(...) - Create a new timer that calls _emit_summary
pending_dir_timers[dir_key] = t - Store the timer
t.start() - Start timer
'''
# ============================================================================
# EVENT HANDLER
# ============================================================================
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
p = norm(event.src_path)
kind = 'Directory' if event.is_directory else "File"
with lock:
live_paths.add(p)
print(f"{stamp()} [Modified] {kind}: {p}")
def on_moved(self, event):
p = norm(event.src_path)
p_dest = norm(event.dest_path)
kind = 'Directory' if event.is_directory else "File"
print(f"{stamp()} [Moved] {kind}: {p} -> {p_dest}")
def on_created(self, event):
p = norm(event.src_path)
kind = 'Directory' if event.is_directory else "File"
with lock:
live_paths.add(p)
print(f"{stamp()} [Created] {kind}: {p}")
def on_deleted(self, event): # Function called on deletion
p = norm(event.src_path) # Normalize event path
if event.is_directory: # Is it a directory path?
kind = 'Directory' # Used in logs
now = time.time() # Current time as float
cutoff = now - COALESCE_MS / 1000.0 # Older than this won't be absorbed
absorbed_from_buffer = 0 # Counter of deletions before dir delete
with lock: # Single-thread access
pending_dir_deletes[p] = now # Add tombstone for dir p
pending_dir_counts[p] = 0 # Reset accumulated count
global recent_file_deletes # We'll reassign the global deque
newbuf = deque() # Keep only unrelated/too-old records
for fp, ts in recent_file_deletes: # Iterate prior file deletions
if ts >= cutoff and is_under(fp, p): # If recent and under the deleted dir
absorbed_from_buffer += 1 # Increment absorbed counter
else: # Otherwise
newbuf.append((fp, ts)) # Keep in the new buffer
recent_file_deletes = newbuf # Replace buffer
descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)] # All live descendants of p
count_index = len(descendants) # Count them
for lp in descendants: # Remove descendants
live_paths.discard(lp)
live_paths.discard(p) # Remove the directory itself
pending_dir_counts[p] = count_index + absorbed_from_buffer # Final count
print(f"{stamp()} [Deleted] {kind}: {p}") # Log directory deletion
_arm_summary_timer(p) # Schedule summary for the dir
else: # A file was deleted
kind = 'File' # Log will say file
abs_path = p # Absolute path
now = time.time() # Current time as float
with lock: # Single-thread access
recent_file_deletes.append((abs_path, now)) # Put (path, time) into early-deletes buffer
was_present = abs_path in live_paths # Mark if it existed
live_paths.discard(abs_path) # Remove from live set
parents = [d for d in pending_dir_deletes if is_under(abs_path, d)] # Is it under a tombstoned dir?
parent = max(parents, key=len) if parents else None # Choose the deepest parent
if parent and was_present: # Only if the file really existed
pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1 # Increment parent's count
if parent: # If tied to a parent dir
_arm_summary_timer(parent) # Suppress standalone log; extend parent's timer
return
print(f"{stamp()} [Deleted] {kind}: {abs_path}") # Log standalone file deletion
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == "__main__":
path = "." # Watch path
seed_index(path)
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
print(f"Watching {Path(path).resolve()} started. Press Ctrl+C to exit.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()