Introduction
In the previous article, I took a detailed look at coalescing when deleting folders and files. There, we wrote a complete mechanism that collects a storm of events into one understandable message. Now it’s time to handle moves.
The logic is similar: if we move a folder, we don’t want to flood the log with dozens of events for each nested file. Instead, we want one neat message like this:
[Moved] Folder: old_path -> new_path
[Summary] Folder moved old_path -> new_path; along with it N objects were moved
Quick implementation
To implement this, I added several functions and structures very similar to what we had in on_deleted(), but with a caveat: we have both the old path and the new path.
New variables
# Folder moves awaiting a summary: source path -> (event timestamp, destination path).
pending_dir_moves: dict[str, tuple[float, str]] = {}
# Number of objects attributed to each pending folder move, keyed by source path.
pending_move_counts: dict[str, int] = {}
# Summary timer currently armed for each pending folder move, keyed by source path.
pending_move_timers: dict[str, threading.Timer] = {}
# Recently seen file moves as (source path, event timestamp) pairs.
recent_file_moves: deque[tuple[str, float]] = deque()
They work just like the structures for deletion, except that pending_dir_moves also stores dst_dir — the path the folder was moved to — next to the timestamp.
Helper functions
def _emit_move_summary(src_dir: str):
    """Print the summary line for a coalesced folder move and forget its state.

    Removes the counter, the (timestamp, destination) record and the timer
    stored under ``src_dir``; any of them may already be absent.
    """
    with lock:
        moved_total = pending_move_counts.pop(src_dir, 0)
        record = pending_dir_moves.pop(src_dir, None)
        timer = pending_move_timers.pop(src_dir, None)
        if timer:
            timer.cancel()

    dst_dir = record[1] if record else None
    if not dst_dir:
        # No destination on record: fall back to the source-only wording.
        print(f"{stamp()} [Summary] Folder moved {src_dir}; along with it {moved_total} objects were moved")
        return
    print(f"{stamp()} [Summary] Folder moved {src_dir} -> {dst_dir}; along with it {moved_total} objects were moved")
def _arm_move_summary_timer(src_dir: str):
    """(Re)start the countdown after which the move summary for ``src_dir`` is printed.

    A timer already registered for ``src_dir`` is cancelled and replaced, so
    every call pushes the summary back by ``COALESCE_MS`` milliseconds.
    """
    delay_s = COALESCE_MS / 1000.0
    with lock:
        previous = pending_move_timers.get(src_dir)
        if previous:
            previous.cancel()
        timer = threading.Timer(delay_s, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = timer
        timer.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    """Re-key a single path in ``live_paths`` and print its ``[Moved]`` line."""
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    label = "Folder" if is_dir else "File"
    print(f"{stamp()} [Moved] {label}: {old_path} -> {new_path}")
These functions are almost a copy of _emit_summary() and _arm_summary_timer(), but they print a move and use the pair src_dir -> dst_dir.
Modifying the on_moved() function
The key part is the move handler:
- If a folder is moved:
- Save src and dst.
- Move all descendants in the live_paths index according to the new path.
- Count the number of objects (descendants + files from the buffer).
- Print the log and start the summary timer.
- If a file is moved:
- Add it to the buffer.
- Check whether the parent folder is currently being moved. If so, don’t print a log; just increment the folder’s counter.
Thus, the behavior becomes identical to deletion, except now we don’t throw objects away — we “replant” them to a new place.
Differences from on_deleted()
In on_deleted() we worked with only one path — the old one. Here we need to keep both the old (src) and the new (dst). This is the main difference: we move the index (live_paths) instead of deleting.
Everything else — the logic of timers, counters, and the buffer — fully matches what was implemented earlier.
Conclusion
This function was easier than the previous one with deletion. That’s because I had already worked out the coalescing concept, and all that remained was to add handling of the old and new paths.
Final code:
import time
import datetime
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ============================================================================
# VARIABLES
# ============================================================================
# Coalescing window in milliseconds; used both as the summary-timer delay
# and as the maximum age of buffered file events.
COALESCE_MS = 300
# --- Deletion coalescing ---
# Folder deletions awaiting a summary: folder path -> event timestamp.
pending_dir_deletes: dict[str, float] = {}
# Number of objects attributed to each pending folder deletion.
pending_dir_counts: dict[str, int] = {}
# Summary timer currently armed for each pending folder deletion.
pending_dir_timers: dict[str, threading.Timer] = {}
# Recently seen file deletions as (path, event timestamp) pairs.
recent_file_deletes: deque[tuple[str, float]] = deque()
# --- Move coalescing ---
# Folder moves awaiting a summary: source path -> (event timestamp, destination path).
pending_dir_moves: dict[str, tuple[float, str]] = {}
# Number of objects attributed to each pending folder move, keyed by source path.
pending_move_counts: dict[str, int] = {}
# Summary timer currently armed for each pending folder move, keyed by source path.
pending_move_timers: dict[str, threading.Timer] = {}
# Recently seen file moves as (source path, event timestamp) pairs.
recent_file_moves: deque[tuple[str, float]] = deque()
# Index of normalized paths currently believed to exist under the watched root.
live_paths: set[str] = set()
# Taken around reads and writes of the shared structures above.
lock = threading.Lock()
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def stamp():
    """Return the current time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{dt.now():%Y-%m-%d %H:%M:%S}"
def norm(path: str) -> str:
    """Return ``path`` as an absolute, resolved path string."""
    resolved = Path(path).resolve()
    return str(resolved)
def is_under(child: str, parent: str) -> bool:
    """Return True when ``child`` is ``parent`` itself or lies somewhere below it.

    Both paths are resolved before comparison. A path that cannot be resolved
    or that is not located under ``parent`` yields False. Other errors, such as
    a non-path argument, are allowed to propagate instead of being reported as
    "not under".
    """
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
    except (ValueError, OSError, RuntimeError):
        # ValueError: not a sub-path; OSError/RuntimeError: resolution failed.
        return False
    return True
def seed_index(root: str):
    """Fill ``live_paths`` with ``root`` and every folder and file found beneath it."""
    root_abs = norm(root)
    with lock:
        live_paths.add(root_abs)
        for dirpath, dirnames, filenames in os.walk(root_abs):
            base = norm(dirpath)
            live_paths.add(base)
            # Sub-folders and files of this directory are indexed the same way.
            live_paths.update(
                norm(os.path.join(base, entry)) for entry in (*dirnames, *filenames)
            )
# ------------------------- DELETIONS -------------------------------
def _emit_summary(dir_key: str):
    """Print the summary line for a coalesced folder deletion and forget its state.

    All bookkeeping for ``dir_key`` (counter, pending record, timer) is removed
    while the lock is held; the message is printed only after the lock has been
    released, the same way ``_emit_move_summary`` does it, so console I/O never
    happens under the lock.
    """
    with lock:
        count = pending_dir_counts.pop(dir_key, 0)
        pending_dir_deletes.pop(dir_key, None)
        t = pending_dir_timers.pop(dir_key, None)
        if t:
            t.cancel()
    print(f"{stamp()} [Summary] Folder deleted {dir_key}; along with it {count} objects were deleted")
def _arm_summary_timer(dir_key: str):
    """(Re)start the countdown after which the deletion summary for ``dir_key`` is printed.

    A timer already registered for ``dir_key`` is cancelled and replaced, so
    every call pushes the summary back by ``COALESCE_MS`` milliseconds.
    """
    delay_s = COALESCE_MS / 1000.0
    with lock:
        previous = pending_dir_timers.get(dir_key)
        if previous:
            previous.cancel()
        timer = threading.Timer(delay_s, _emit_summary, args=(dir_key,))
        pending_dir_timers[dir_key] = timer
        timer.start()
# ------------------------- MOVES ----------------------------
def _emit_move_summary(src_dir: str):
    """Print the summary line for a coalesced folder move and forget its state.

    Removes the counter, the (timestamp, destination) record and the timer
    stored under ``src_dir``; any of them may already be absent.
    """
    with lock:
        moved_total = pending_move_counts.pop(src_dir, 0)
        record = pending_dir_moves.pop(src_dir, None)
        timer = pending_move_timers.pop(src_dir, None)
        if timer:
            timer.cancel()

    dst_dir = record[1] if record else None
    if not dst_dir:
        # No destination on record: fall back to the source-only wording.
        print(f"{stamp()} [Summary] Folder moved {src_dir}; along with it {moved_total} objects were moved")
        return
    print(f"{stamp()} [Summary] Folder moved {src_dir} -> {dst_dir}; along with it {moved_total} objects were moved")
def _arm_move_summary_timer(src_dir: str):
    """(Re)start the countdown after which the move summary for ``src_dir`` is printed.

    A timer already registered for ``src_dir`` is cancelled and replaced, so
    every call pushes the summary back by ``COALESCE_MS`` milliseconds.
    """
    delay_s = COALESCE_MS / 1000.0
    with lock:
        previous = pending_move_timers.get(src_dir)
        if previous:
            previous.cancel()
        timer = threading.Timer(delay_s, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = timer
        timer.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    """Re-key a single path in ``live_paths`` and print its ``[Moved]`` line."""
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    label = "Folder" if is_dir else "File"
    print(f"{stamp()} [Moved] {label}: {old_path} -> {new_path}")
# ============================================================================
# EVENT HANDLER
# ============================================================================
class MyHandler(FileSystemEventHandler):
    """Log file-system events, coalescing folder deletions and folder moves.

    A folder deletion or move prints one line immediately and arms a summary
    timer. File events that belong to a folder operation still pending are not
    printed individually; they are counted towards that folder's summary.

    The ``recent_file_*`` buffers only ever hold entries younger than
    ``COALESCE_MS``: expired entries are dropped whenever a buffer is touched,
    so the buffers cannot grow without bound on a long-running watcher.
    """

    @staticmethod
    def _prune_expired(buffer: deque, cutoff: float) -> None:
        """Drop entries older than ``cutoff`` from the left end of ``buffer``.

        Entries are appended with increasing timestamps, so the oldest ones
        are always at the left.
        """
        while buffer and buffer[0][1] < cutoff:
            buffer.popleft()

    @staticmethod
    def _absorb_buffered(buffer: deque, parent: str, cutoff: float) -> int:
        """Count and remove fresh buffered paths located under ``parent``.

        Expired entries are discarded, fresh entries under ``parent`` are
        counted and removed, and the remaining fresh entries stay in
        ``buffer``. The deque is mutated in place. Returns the number of
        absorbed entries.
        """
        absorbed = 0
        survivors = []
        for path, ts in buffer:
            if ts < cutoff:
                continue  # too old to belong to any folder operation
            if is_under(path, parent):
                absorbed += 1
            else:
                survivors.append((path, ts))
        buffer.clear()
        buffer.extend(survivors)
        return absorbed

    def on_modified(self, event):
        """Record the modified path in the index and log it."""
        p = norm(event.src_path)
        kind = 'Folder' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Modified] {kind}: {p}")

    def on_moved(self, event):
        """Handle a move: re-key the index and coalesce folder moves."""
        src = norm(event.src_path)
        dst = norm(event.dest_path)
        now = time.time()
        cutoff = now - COALESCE_MS / 1000.0

        if event.is_directory:
            kind = 'Folder'
            with lock:
                pending_dir_moves[src] = (now, dst)
                # File moves that were reported just before the folder event.
                absorbed_from_buffer = self._absorb_buffered(recent_file_moves, src, cutoff)
                # Re-plant every indexed descendant under the new location.
                descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
                for lp in descendants:
                    rel = os.path.relpath(lp, src)
                    live_paths.discard(lp)
                    live_paths.add(norm(os.path.join(dst, rel)))
                live_paths.discard(src)
                live_paths.add(dst)
                pending_move_counts[src] = len(descendants) + absorbed_from_buffer
            print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
            _arm_move_summary_timer(src)
            return

        kind = 'File'
        with lock:
            self._prune_expired(recent_file_moves, cutoff)
            recent_file_moves.append((src, now))
            was_present = src in live_paths
            live_paths.discard(src)
            live_paths.add(dst)
            # The deepest pending folder move that contains this file, if any.
            parents = [d for d in pending_dir_moves if is_under(src, d)]
            parent = max(parents, key=len) if parents else None
            if parent and was_present:
                pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1
        if parent:
            # Part of a folder move: stay silent and postpone the summary.
            _arm_move_summary_timer(parent)
            return
        print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")

    def on_created(self, event):
        """Record the created path in the index and log it."""
        p = norm(event.src_path)
        kind = 'Folder' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Created] {kind}: {p}")

    def on_deleted(self, event):
        """Handle a deletion: drop paths from the index and coalesce folder deletions."""
        p = norm(event.src_path)
        now = time.time()
        cutoff = now - COALESCE_MS / 1000.0

        if event.is_directory:
            kind = 'Folder'
            with lock:
                pending_dir_deletes[p] = now
                # File deletions that were reported just before the folder event.
                absorbed_from_buffer = self._absorb_buffered(recent_file_deletes, p, cutoff)
                descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
                live_paths.difference_update(descendants)
                live_paths.discard(p)
                pending_dir_counts[p] = len(descendants) + absorbed_from_buffer
            print(f"{stamp()} [Deleted] {kind}: {p}")
            _arm_summary_timer(p)
            return

        kind = 'File'
        with lock:
            self._prune_expired(recent_file_deletes, cutoff)
            recent_file_deletes.append((p, now))
            was_present = p in live_paths
            live_paths.discard(p)
            # The deepest pending folder deletion that contains this file, if any.
            parents = [d for d in pending_dir_deletes if is_under(p, d)]
            parent = max(parents, key=len) if parents else None
            if parent and was_present:
                pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1
        if parent:
            # Part of a folder deletion: stay silent and postpone the summary.
            _arm_summary_timer(parent)
            return
        print(f"{stamp()} [Deleted] {kind}: {p}")
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == "__main__":
    # Index the current tree first, then watch the working directory recursively.
    path = "."
    seed_index(path)

    observer = Observer()
    observer.schedule(MyHandler(), path, recursive=True)
    observer.start()
    print(f"Watching {Path(path).resolve()} started. Press Ctrl+C to exit.")

    # Idle until Ctrl+C, then stop the observer and wait for its thread.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()