Вступление
В --> прошлой статье <-- я подробно разбирал коалесинг при удалении папок и файлов. Там мы писали полноценный механизм, который собирает лавину событий в одно понятное сообщение. Теперь настала очередь перемещений.
Логика похожа: если мы двигаем папку, нужно не завалить лог десятками событий о каждом вложенном файле, а красиво вывести одно сообщение вида:
[Перемещено] Папка: old_path -> new_path
[Сводка] Перемещена папка old_path -> new_path; вместе с ней перемещено N объектов
Быстрая реализация
Чтобы реализовать это, я добавил несколько функций и структур, очень похожих на то, что было в on_deleted(), но с поправкой: у нас есть старый путь и новый путь.
Новые переменные
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()
Они работают так же, как структуры для удаления, но дополнительно храним dst_dir — куда именно поехала папка.
Вспомогательные функции
def _emit_move_summary(src_dir: str):
with lock:
count = pending_move_counts.pop(src_dir, 0)
ts_dst = pending_dir_moves.pop(src_dir, None)
dst_dir = ts_dst[1] if ts_dst else None
t = pending_move_timers.pop(src_dir, None)
if t:
t.cancel()
if dst_dir:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir} -> {dst_dir}; вместе с ней перемещено {count} объектов")
else:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir}; вместе с ней перемещено {count} объектов")
def _arm_move_summary_timer(src_dir: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_move_timers.get(src_dir)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
pending_move_timers[src_dir] = t
t.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
kind = 'Папка' if is_dir else "Файл"
with lock:
live_paths.discard(old_path)
live_paths.add(new_path)
print(f"{stamp()} [Перемещено] {kind}: {old_path} -> {new_path}")
Эти функции почти копия _emit_summary() и _arm_summary_timer(), только печатают перемещение и используют пару src_dir -> dst_dir.
Изменение функции on_moved()
Ключевая часть — обработчик перемещений:
- Если двигается папка:
- Сохраняем src и dst.
- Переносим всех потомков в индексе live_paths с учётом нового пути.
- Считаем количество объектов (потомки + файлы из буфера).
- Выводим лог и запускаем таймер сводки.
- Если двигается файл:
- Добавляем в буфер.
- Проверяем, не двигается ли сейчас родительская папка. Если да — не печатаем лог, а просто увеличиваем её счётчик.
Таким образом, поведение стало идентичным удалению, только теперь мы не выкидываем объекты, а «пересаживаем» их на новое место.
Отличия от on_deleted()
В on_deleted() мы работали только с одним путём — старым. Здесь же нужно помнить и старый (src), и новый (dst). Это основное отличие: мы переносим индекс (live_paths) вместо удаления.
Всё остальное — логика таймеров, счётчиков и буфера — полностью совпадает с тем, что было реализовано ранее.
Заключение
Эта функция далась проще, чем предыдущая с удалением. Всё потому, что концепцию коалесинга я уже отработал, и оставалось только добавить работу со старым и новым путём.
Конечный код:
import time
import datetime
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ============================================================================
# ПЕРЕМЕННЫЕ
# ============================================================================
COALESCE_MS = 300
# --- Коалесинг удалений ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts: dict[str, int] = {}
pending_dir_timers: dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()
# --- Коалесинг перемещений ---
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()
live_paths: set[str] = set()
lock = threading.Lock()
# ============================================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ============================================================================
def stamp():
return dt.now().strftime("%Y-%m-%d %H:%M:%S")
def norm(path: str) -> str:
return str(Path(path).resolve())
def is_under(child: str, parent: str) -> bool:
try:
Path(child).resolve().relative_to(Path(parent).resolve())
return True
except Exception:
return False
def seed_index(root: str):
root_abs = norm(root)
with lock:
live_paths.add(root_abs)
for dirpath, dirnames, filenames in os.walk(root_abs):
dp = norm(dirpath)
live_paths.add(dp)
for name in dirnames:
live_paths.add(norm(os.path.join(dp, name)))
for name in filenames:
live_paths.add(norm(os.path.join(dp, name)))
# ------------------------- УДАЛЕНИЯ -------------------------------
def _emit_summary(dir_key: str):
with lock:
count = pending_dir_counts.pop(dir_key, 0)
print(f"{stamp()} [Сводка] Удалена папка {dir_key}; вместе с ней удалено {count} объектов")
pending_dir_deletes.pop(dir_key, None)
t = pending_dir_timers.pop(dir_key, None)
if t:
t.cancel()
def _arm_summary_timer(dir_key: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_dir_timers.get(dir_key)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
pending_dir_timers[dir_key] = t
t.start()
# ------------------------- ПЕРЕМЕЩЕНИЯ ----------------------------
def _emit_move_summary(src_dir: str):
with lock:
count = pending_move_counts.pop(src_dir, 0)
ts_dst = pending_dir_moves.pop(src_dir, None)
dst_dir = ts_dst[1] if ts_dst else None
t = pending_move_timers.pop(src_dir, None)
if t:
t.cancel()
if dst_dir:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir} -> {dst_dir}; вместе с ней перемещено {count} объектов")
else:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir}; вместе с ней перемещено {count} объектов")
def _arm_move_summary_timer(src_dir: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_move_timers.get(src_dir)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
pending_move_timers[src_dir] = t
t.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
kind = 'Папка' if is_dir else "Файл"
with lock:
live_paths.discard(old_path)
live_paths.add(new_path)
print(f"{stamp()} [Перемещено] {kind}: {old_path} -> {new_path}")
# ============================================================================
# ОБРАБОТЧИК СОБЫТИЙ
# ============================================================================
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
p = norm(event.src_path)
kind = 'Папка' if event.is_directory else "Файл"
with lock:
live_paths.add(p)
print(f"{stamp()} [Изменено] {kind}: {p}")
def on_moved(self, event):
src = norm(event.src_path)
dst = norm(event.dest_path)
now = time.time()
if event.is_directory:
kind = 'Папка'
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_moves[src] = (now, dst)
pending_move_counts[src] = 0
global recent_file_moves
newbuf = deque()
for fp, ts in recent_file_moves:
if ts >= cutoff and is_under(fp, src):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_moves = newbuf
descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
count_index = len(descendants)
for lp in descendants:
rel = os.path.relpath(lp, src)
new_lp = norm(os.path.join(dst, rel))
live_paths.discard(lp)
live_paths.add(new_lp)
live_paths.discard(src)
live_paths.add(dst)
pending_move_counts[src] = count_index + absorbed_from_buffer
print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")
_arm_move_summary_timer(src)
return
kind = 'Файл'
with lock:
recent_file_moves.append((src, now))
was_present = src in live_paths
live_paths.discard(src)
live_paths.add(dst)
parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1
if parent:
_arm_move_summary_timer(parent)
return
print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")
def on_created(self, event):
p = norm(event.src_path)
kind = 'Папка' if event.is_directory else "Файл"
with lock:
live_paths.add(p)
print(f"{stamp()} [Создано] {kind}: {p}")
def on_deleted(self, event):
p = norm(event.src_path)
if event.is_directory:
kind = 'Папка'
now = time.time()
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_deletes[p] = now
pending_dir_counts[p] = 0
global recent_file_deletes
newbuf = deque()
for fp, ts in recent_file_deletes:
if ts >= cutoff and is_under(fp, p):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_deletes = newbuf
descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
count_index = len(descendants)
for lp in descendants:
live_paths.discard(lp)
live_paths.discard(p)
pending_dir_counts[p] = count_index + absorbed_from_buffer
print(f"{stamp()} [Удалено] {kind}: {p}")
_arm_summary_timer(p)
else:
kind = 'Файл'
abs_path = p
now = time.time()
with lock:
recent_file_deletes.append((abs_path, now))
was_present = abs_path in live_paths
live_paths.discard(abs_path)
parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1
if parent:
_arm_summary_timer(parent)
return
print(f"{stamp()} [Удалено] {kind}: {abs_path}")
# ============================================================================
# ТОЧКА ВХОДА
# ============================================================================
if __name__ == "__main__":
path = "."
seed_index(path)
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
print(f"Слежение за {Path(path).resolve()} запущено. Нажмите Ctrl+C для выхода.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()