"""Generic Archiver: a small Tkinter tool that moves media files into a dated archive.

Selected files from a source folder are copied to
``<destination>/Archive/<YYYY>/<YYYY-MM-DD>/`` (date taken from the file's
modification time), verified by MD5, and the source is deleted only after the
copy has been verified.
"""

import os
import sys
import hashlib
import shutil
import threading
import queue
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import configparser

# ---- Windows drive helpers need ctypes ----
try:
    import ctypes

    # ctypes itself imports on every platform; the kernel32 calls used below
    # are only reachable through ctypes.windll, so that is what we test for.
    HAS_CTYPES = hasattr(ctypes, "windll")
except Exception:
    HAS_CTYPES = False

# ----------------------------
# Configuration / Constants
# ----------------------------
DEFAULT_SOURCE = r"C:\Recordings"
ARCHIVE_ROOT_NAME = "Archive"  # created under destination root
LOG_FILE = "archiver.log"
INI_FILE = "archiver.ini"
READ_CHUNK_SIZE = 8 * 1024 * 1024  # 8MB
UI_POLL_MS = 100

# Default extensions if INI is absent (comma-separated list in INI)
DEFAULT_EXTENSIONS = [
    ".mkv", ".mp4", ".mov", ".avi", ".mxf",
    ".mp3", ".wav", ".flac", ".m4a",
]

# ----------------------------
# Logging setup
# ----------------------------
logger = logging.getLogger("archiver")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(LOG_FILE, maxBytes=2_000_000, backupCount=3, encoding="utf-8")
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


# ----------------------------
# Windows drive helpers
# ----------------------------
def _list_drive_letters_ctypes():
    """Return the root paths ("C:\\", "D:\\", ...) reported by GetLogicalDrives.

    Returns an empty list when the Windows API is not available.
    """
    if not HAS_CTYPES:
        return []
    bitmask = ctypes.windll.kernel32.GetLogicalDrives()
    # Bit i of the mask corresponds to drive letter chr(ord("A") + i).
    return [f"{chr(65 + i)}:\\" for i in range(26) if bitmask & (1 << i)]


def _get_drive_type(root):
    """Return the GetDriveTypeW code for *root*, or 0 when the API is unavailable."""
    # 2=removable, 3=fixed, 4=network, 5=CD-ROM, 6=RAM
    if not HAS_CTYPES:
        return 0
    return ctypes.windll.kernel32.GetDriveTypeW(str(root))


def _human_drive_type(drive_type):
    """Map a drive-type code to a display label; unrecognised codes give "Unknown"."""
    return {
        2: "Removable",
        3: "Fixed",
        4: "Network",
        5: "CD/DVD",
        6: "RAM",
        0: "Unknown",
    }.get(drive_type, "Unknown")


def detect_archive_targets():
    """
    Show likely archive targets: Removable, Fixed (non-system), and Network.
    Skip system drive and CD/DVD. Verify readiness by listing root.

    Returns a list of ``(root, type_label)`` tuples sorted by root.
    """
    targets = []
    system_drive = (os.environ.get("SystemDrive") or "C:") + "\\"
    if HAS_CTYPES:
        letters = _list_drive_letters_ctypes()
    else:
        letters = [f"{chr(65 + i)}:\\" for i in range(2, 26)]
    for root in letters:
        if root.upper() == system_drive.upper():
            continue
        try:
            dtype = _get_drive_type(root)
            if dtype not in (2, 3, 4):  # Removable, Fixed, Network
                continue
            os.listdir(root)  # ensure accessible
        except Exception:
            # Best effort: a drive that cannot be queried or listed is simply not offered.
            continue
        targets.append((root, _human_drive_type(dtype)))
    targets.sort(key=lambda t: t[0])
    return targets


# ----------------------------
# File operations
# ----------------------------
def compute_md5(file_path, stop_flag=None):
    """Return the hex MD5 digest of *file_path*, read in READ_CHUNK_SIZE chunks.

    *stop_flag* is an optional ``threading.Event``; if it becomes set while
    hashing, ``None`` is returned instead of a digest.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        while True:
            if stop_flag and stop_flag.is_set():
                return None
            chunk = f.read(READ_CHUNK_SIZE)
            if not chunk:
                break
            hasher.update(chunk)
    return hasher.hexdigest()


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def copy_with_progress(src, dst, progress_callback=None, stop_flag=None):
    """Copy *src* to *dst* in chunks, reporting progress and honouring cancellation.

    ``progress_callback(bytes_copied, total_bytes)`` is called after every
    chunk (only when the source is non-empty). Returns ``True`` when the whole
    file was copied, ``False`` when *stop_flag* was set before completion.

    If the copy is cancelled or fails part-way, the partially written *dst* is
    removed so that a truncated file is never left in the archive. Exceptions
    from reading or writing propagate to the caller.
    """
    total_size = os.path.getsize(src)
    copied = 0
    completed = False
    with open(src, "rb") as fsrc:
        fdst = open(dst, "wb")  # from here on dst exists and has been truncated
        try:
            with fdst:
                while True:
                    if stop_flag and stop_flag.is_set():
                        break
                    buf = fsrc.read(READ_CHUNK_SIZE)
                    if not buf:
                        completed = True
                        break
                    fdst.write(buf)
                    copied += len(buf)
                    if progress_callback and total_size > 0:
                        progress_callback(copied, total_size)
        finally:
            # Runs after fdst is closed, so the removal also works on Windows.
            if not completed:
                _remove_quietly(dst)
    if not completed:
        return False
    try:
        shutil.copystat(src, dst)
    except Exception as e:
        # Metadata is best effort; the data itself is verified separately by MD5.
        logger.warning("Could not copy file metadata to %s: %s", dst, e)
    return True


def ensure_dir(path):
    """Create *path* (and any missing parents); do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)


def date_folders_for(path):
    """Return ``(year, "YYYY-MM-DD")`` strings from the local-time mtime of *path*."""
    dt = datetime.fromtimestamp(os.path.getmtime(path))
    return f"{dt.year:04d}", dt.strftime("%Y-%m-%d")


# ----------------------------
# Worker thread & UI messaging
# ----------------------------
class ArchiverWorker(threading.Thread):
    """Background thread that archives a list of files and reports via a queue.

    Messages put on *ui_queue* are tuples whose first element is the kind:
    ``("log", text)``, ``("status", text)``, ``("error", text)``,
    ``("progress_file", name, done, total)``, ``("progress_overall", done, total)``
    and finally ``("done", None)``. Set ``stop_flag`` to request cancellation.
    """

    def __init__(self, src_dir, dest_root, files, ui_queue, archive_root_name=ARCHIVE_ROOT_NAME):
        super().__init__(daemon=True)
        self.src_dir = src_dir
        self.dest_root = dest_root
        self.files = files  # list of filenames (relative to src_dir)
        self.ui_queue = ui_queue
        self.stop_flag = threading.Event()
        self.archive_root_name = archive_root_name

    def log(self, level, msg):
        """Write *msg* to the log file at *level* and forward it to the UI."""
        logger.log(level, msg)
        self.ui_queue.put(("log", msg))

    def progress(self, filename, bytes_done, bytes_total):
        """Report per-file byte progress to the UI."""
        self.ui_queue.put(("progress_file", filename, bytes_done, bytes_total))

    def overall_progress(self, done, total):
        """Report how many files have been handled out of the total."""
        self.ui_queue.put(("progress_overall", done, total))

    def run(self):
        """Process every file in ``self.files``, then post ``("done", None)``.

        A failure on one file is logged and reported, and processing continues
        with the next file. Cancellation stops the loop.
        """
        total_files = len(self.files)
        files_done = 0
        self.overall_progress(files_done, total_files)

        for relname in self.files:
            if self.stop_flag.is_set():
                self.log(logging.WARNING, "Archiving cancelled by user.")
                break

            src = os.path.join(self.src_dir, relname)
            if not os.path.isfile(src):
                self.log(logging.WARNING, f"Skipping missing: {src}")
            else:
                try:
                    if not self._archive_file(relname, src):
                        break  # cancelled while working on this file
                except Exception as e:
                    self.log(logging.ERROR, f"Error on file {src}: {e}")
                    self.ui_queue.put(("error", f"Error on {os.path.basename(src)}: {e}"))

            files_done += 1
            self.overall_progress(files_done, total_files)

        self.ui_queue.put(("done", None))

    def _archive_file(self, relname, src):
        """Hash, copy, verify and remove one source file.

        Returns ``True`` when the file was handled (successfully or with a
        reported verification/deletion problem) and ``False`` when the user
        cancelled part-way. Other failures raise and are handled by ``run``.
        """
        self.log(logging.INFO, f"Starting: {src}")
        self.ui_queue.put(("status", f"Hashing source: {relname}"))
        md5_src = compute_md5(src, self.stop_flag)
        if md5_src is None:
            self.log(logging.WARNING, f"Cancelled while hashing: {src}")
            return False
        self.log(logging.INFO, f"Source MD5: {md5_src}")

        year, ymd = date_folders_for(src)
        dest_dir = os.path.join(self.dest_root, self.archive_root_name, year, ymd)
        ensure_dir(dest_dir)
        dest = os.path.join(dest_dir, os.path.basename(src))

        if os.path.exists(dest):
            self.ui_queue.put(
                ("status", f"Found existing: {os.path.relpath(dest, self.dest_root)}; verifying")
            )
            md5_existing = compute_md5(dest, self.stop_flag)
            if md5_existing is None:
                # Without a digest we cannot tell whether the existing file is
                # identical, so stop rather than create a needless duplicate.
                self.log(logging.WARNING, f"Cancelled while verifying existing: {dest}")
                return False
            if md5_existing == md5_src:
                self.log(
                    logging.INFO,
                    f"Destination already has identical file. Removing source: {src}",
                )
                os.remove(src)
                return True
            # Same name, different content: pick the first free "name (N).ext".
            base, ext = os.path.splitext(dest)
            idx = 1
            new_dest = f"{base} ({idx}){ext}"
            while os.path.exists(new_dest):
                idx += 1
                new_dest = f"{base} ({idx}){ext}"
            dest = new_dest
            self.log(logging.INFO, f"Destination exists; will use: {dest}")

        self.ui_queue.put(("status", f"Copying: {relname}"))
        ok = copy_with_progress(
            src,
            dest,
            progress_callback=lambda d, t: self.progress(relname, d, t),
            stop_flag=self.stop_flag,
        )
        if not ok:
            self.log(logging.WARNING, f"Cancelled while copying: {src}")
            return False

        self.ui_queue.put(("status", f"Verifying: {relname}"))
        md5_dst = compute_md5(dest, self.stop_flag)
        if md5_dst is None:
            # The copy was never verified, so discard it and keep the source.
            _remove_quietly(dest)
            self.log(logging.WARNING, f"Cancelled while verifying: {dest}")
            return False
        self.log(logging.INFO, f"Dest MD5: {md5_dst}")

        if md5_dst != md5_src:
            self.log(
                logging.ERROR,
                f"MD5 mismatch! Keeping source. Src={md5_src}, Dst={md5_dst}",
            )
            self.ui_queue.put(("error", f"MD5 mismatch for: {relname}. Source kept."))
            _remove_quietly(dest)
            return True

        try:
            os.remove(src)
            self.log(logging.INFO, f"Verified & removed source: {src}")
        except Exception as e:
            self.log(logging.ERROR, f"Verified but failed to remove source: {src} ({e})")
            self.ui_queue.put(("error", f"Verified but failed to delete source: {relname}"))
        return True


# ----------------------------
# GUI
# ----------------------------
class App(tk.Tk):
    """Main window: source/destination pickers, file list, progress bars and log."""

    def __init__(self):
        super().__init__()
        self.title("Generic Archiver")
        self.geometry("900x680")
        self.minsize(900, 680)

        self.ui_queue = queue.Queue()
        self.worker = None

        # Settings / INI
        self.config_path = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), INI_FILE)
        self.settings = self._load_settings()

        self.src_var = tk.StringVar(value=self.settings.get("source_dir") or DEFAULT_SOURCE)
        self.dest_var = tk.StringVar(value=self.settings.get("dest_dir") or "")
        self.extensions = self._parse_extensions(self.settings.get("extensions"))

        self.status_var = tk.StringVar(value="Ready.")
        self.dest_display_var = tk.StringVar(value=self.dest_var.get() or "No destination chosen")

        # Flag to suppress progress reset on programmatic selection changes
        self.suppress_selection_reset = False

        self._build_ui()
        self._populate_drives()

        # Initial scan is programmatic; do NOT reset progress on selection changes
        self._scan_source(user_initiated=False)

        # Reset progress bars only when the USER changes the selection
        self.tree.bind("<<TreeviewSelect>>", self._on_selection_changed)
        self.drive_combo.bind("<<ComboboxSelected>>", self._on_drive_selected)

        self.after(UI_POLL_MS, self._poll_queue)

    # ---------- Settings / INI ----------
    def _load_settings(self):
        """Read source_dir, dest_dir and extensions from the INI file.

        Falls back to empty directories and DEFAULT_EXTENSIONS when the file
        is missing or cannot be read (including a missing [settings] section).
        """
        defaults = {
            "source_dir": "",
            "dest_dir": "",
            "extensions": ",".join(DEFAULT_EXTENSIONS),
        }
        if not os.path.exists(self.config_path):
            return defaults
        cfg = configparser.ConfigParser()
        try:
            cfg.read(self.config_path, encoding="utf-8")
            s = cfg["settings"]
            return {
                "source_dir": s.get("source_dir", "").strip(),
                "dest_dir": s.get("dest_dir", "").strip(),
                "extensions": s.get("extensions", "").strip(),
            }
        except Exception:
            return defaults

    def _save_settings_if_ready(self):
        """
        Save when the user has chosen at least a source and destination.
        We also persist the extensions list (editable by directly editing the INI).
        """
        source = self.src_var.get().strip()
        dest = self._resolve_destination_root().strip()
        if not source or not dest:
            return
        cfg = configparser.ConfigParser()
        cfg["settings"] = {
            "source_dir": source,
            "dest_dir": dest,
            "extensions": ",".join(self.extensions) if self.extensions else "",
        }
        try:
            with open(self.config_path, "w", encoding="utf-8") as f:
                cfg.write(f)
            self._log_ui(f"Settings saved to {self.config_path}")
        except Exception as e:
            self._log_ui(f"Failed to save settings: {e}")

    def _parse_extensions(self, ext_str):
        """Turn "mkv, .MP4" into [".mkv", ".mp4"]; empty input gives []."""
        if not ext_str:
            return []
        parts = [p.strip().lower() for p in ext_str.split(",") if p.strip()]
        # Ensure they all start with '.'
        return [p if p.startswith(".") else "." + p for p in parts]

    # ---------- UI ----------
    def _build_ui(self):
        """Create and lay out all widgets."""
        pad = {"padx": 8, "pady": 6}

        # Source selection
        frm_src = ttk.Frame(self)
        frm_src.pack(fill="x", **pad)
        ttk.Label(frm_src, text="Source folder:").pack(side="left")
        ttk.Entry(frm_src, textvariable=self.src_var, width=70).pack(side="left", padx=6)
        ttk.Button(frm_src, text="Browse…", command=self._browse_src).pack(side="left")
        ttk.Button(
            frm_src, text="Rescan", command=lambda: self._scan_source(user_initiated=True)
        ).pack(side="left", padx=(6, 0))

        # Destination selection (drive or folder)
        frm_dest = ttk.Frame(self)
        frm_dest.pack(fill="x", **pad)
        ttk.Label(frm_dest, text="Destination drive:").pack(side="left")
        self.drive_combo = ttk.Combobox(frm_dest, width=24, state="readonly")
        self.drive_combo.pack(side="left", padx=6)
        ttk.Button(frm_dest, text="Refresh", command=self._populate_drives).pack(side="left")
        ttk.Label(frm_dest, text="or pick a folder:").pack(side="left", padx=(16, 6))
        ttk.Button(frm_dest, text="Pick Folder…", command=self._browse_dest).pack(side="left")

        # Destination indicator
        frm_dest_note = ttk.Frame(self)
        frm_dest_note.pack(fill="x", **pad)
        ttk.Label(frm_dest_note, text="Destination:").pack(side="left")
        self.dest_display_lbl = ttk.Label(
            frm_dest_note, textvariable=self.dest_display_var, foreground="#444"
        )
        self.dest_display_lbl.pack(side="left", padx=6)

        # Extensions info (read-only hint)
        frm_ext = ttk.Frame(self)
        frm_ext.pack(fill="x", **pad)
        exts_text = ", ".join(self.extensions) if self.extensions else "(all files)"
        ttk.Label(
            frm_ext, text=f"Extensions: {exts_text} (edit in {INI_FILE})", foreground="#555"
        ).pack(side="left")

        # Files list (UPDATED COLUMNS: name, size, modified)
        frm_list = ttk.Frame(self)
        frm_list.pack(fill="both", expand=True, **pad)

        topbar = ttk.Frame(frm_list)
        topbar.pack(fill="x")
        ttk.Label(topbar, text="Files found:").pack(side="left")
        ttk.Button(topbar, text="Select All", command=lambda: self._select_all(True)).pack(
            side="left", padx=6
        )
        ttk.Button(topbar, text="Select None", command=lambda: self._select_all(False)).pack(
            side="left", padx=6
        )

        self.tree = ttk.Treeview(
            frm_list, columns=("name", "size", "modified"), show="headings", selectmode="extended"
        )
        self.tree.heading("name", text="Filename")
        self.tree.heading("size", text="Size")
        self.tree.heading("modified", text="Modified")
        self.tree.column("name", width=440, anchor="w")
        self.tree.column("size", width=120, anchor="e")
        self.tree.column("modified", width=180, anchor="center")

        self.tree_scroll = ttk.Scrollbar(frm_list, orient="vertical", command=self.tree.yview)
        self.tree.configure(yscrollcommand=self.tree_scroll.set)
        # Pack the scrollbar first so it claims the right edge; packing it after
        # the expanding tree would leave it only the leftover space below.
        self.tree_scroll.pack(side="right", fill="y")
        self.tree.pack(side="left", fill="both", expand=True)

        # Progress
        frm_prog = ttk.Frame(self)
        frm_prog.pack(fill="x", **pad)
        ttk.Label(frm_prog, text="File progress:").pack(anchor="w")
        self.pb_file = ttk.Progressbar(frm_prog, orient="horizontal", mode="determinate")
        self.pb_file.pack(fill="x", pady=(0, 4))
        ttk.Label(frm_prog, text="Overall progress:").pack(anchor="w")
        self.pb_overall = ttk.Progressbar(frm_prog, orient="horizontal", mode="determinate")
        self.pb_overall.pack(fill="x")

        # Action buttons
        frm_actions = ttk.Frame(self)
        frm_actions.pack(fill="x", **pad)
        self.btn_start = ttk.Button(
            frm_actions, text="Archive Selected", command=self._start_archive
        )
        self.btn_start.pack(side="left")
        self.btn_cancel = ttk.Button(
            frm_actions, text="Cancel", command=self._cancel, state="disabled"
        )
        self.btn_cancel.pack(side="left", padx=6)
        ttk.Label(frm_actions, textvariable=self.status_var, foreground="#555").pack(side="right")

        # Log output
        frm_log = ttk.LabelFrame(self, text="Log")
        frm_log.pack(fill="both", expand=False, **pad)
        self.txt_log = tk.Text(frm_log, height=10, wrap="none")
        log_scroll = ttk.Scrollbar(frm_log, orient="vertical", command=self.txt_log.yview)
        self.txt_log.configure(yscrollcommand=log_scroll.set)
        # Same packing order as the file list: scrollbar first, then the text.
        log_scroll.pack(side="right", fill="y")
        self.txt_log.pack(side="left", fill="both", expand=True)

        # If INI had a dest_dir, reflect it in the destination label
        if self.dest_var.get():
            self.dest_display_var.set(self.dest_var.get())

    # ---------- Browsers / selectors ----------
    def _browse_src(self):
        """Let the user pick a source folder, then rescan and save settings."""
        chosen = filedialog.askdirectory(title="Choose Source Folder")
        if chosen:
            self.src_var.set(chosen)
            self._scan_source(user_initiated=True)
            self._save_settings_if_ready()
            # Do not reset progress here; will reset when user changes selection.

    def _browse_dest(self):
        """Let the user pick a destination folder, overriding any drive selection."""
        chosen = filedialog.askdirectory(title="Choose Destination Folder")
        if chosen:
            self.dest_var.set(chosen)
            self.drive_combo.set("")  # clear drive selection if manually chosen
            self.dest_display_var.set(chosen)
            self._log_ui(f"Manual destination set: {chosen}")
            self._save_settings_if_ready()

    def _on_drive_selected(self, _evt=None):
        """Handle a drive being chosen in the combobox."""
        # User picked a drive; reflect and save (if source exists)
        val = self.drive_combo.get()
        if val:
            dest_root = val.split()[0]  # "E:\ (Fixed)" -> "E:\"
            self.dest_var.set("")  # indicates we're using drive, not a manual folder
            self.dest_display_var.set(dest_root)
            self._save_settings_if_ready()

    # ---------- Drives ----------
    def _populate_drives(self):
        """Fill the drive combobox from detect_archive_targets()."""
        targets = detect_archive_targets()
        items = [f"{root} ({dtype})" for root, dtype in targets]
        self.drive_combo["values"] = items
        # If INI specified a folder, leave combo alone; otherwise preselect first drive
        if self.dest_var.get():
            return
        if items:
            self.drive_combo.current(0)
            self.dest_display_var.set(items[0].split()[0])
        else:
            self.dest_display_var.set("No destination chosen")

    # ---------- File list ----------
    def _scan_source(self, user_initiated: bool):
        """
        Refresh file list. If called programmatically (user_initiated=False),
        we suppress selection-reset behavior so progress bars don't clear.

        Only regular files directly inside the source folder are listed,
        filtered by ``self.extensions`` (no filter when that list is empty)
        and sorted by modification time, oldest first.
        """
        # NOTE(review): the flag is cleared again before this method returns; if Tk
        # delivers <<TreeviewSelect>> after that point the suppression may not
        # cover it — confirm against the Tk version in use.
        try:
            self.suppress_selection_reset = not user_initiated
            self.tree.delete(*self.tree.get_children())
            src = self.src_var.get()
            if not os.path.isdir(src):
                self._log_ui(f"Source folder not found: {src}")
                self.status_var.set("Source folder not found.")
                return

            try:
                names = os.listdir(src)
            except OSError as e:
                self._log_ui(f"Cannot read source folder: {src} ({e})")
                self.status_var.set("Source folder not readable.")
                return

            want_exts = {e.lower() for e in (self.extensions or [])}
            files = []
            for name in names:
                full = os.path.join(src, name)
                if not os.path.isfile(full):
                    continue
                if want_exts and os.path.splitext(name)[1].lower() not in want_exts:
                    continue  # skip non-matching
                try:
                    size = os.path.getsize(full)
                    mtime = os.path.getmtime(full)
                except OSError:
                    continue  # file vanished or became unreadable during the scan
                files.append((name, size, mtime))

            files.sort(key=lambda x: x[2])  # by mtime ascending

            for name, size, mtime in files:
                human_size = self._human_bytes(size)
                mod = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")
                # Use filename as iid; Treeview requires unique IDs, so add a
                # suffix if the name is somehow already present.
                iid = name
                if self.tree.exists(iid):
                    iid = f"{name}__{int(mtime)}"
                self.tree.insert("", "end", iid=iid, values=(name, human_size, mod))

            self.status_var.set(f"Found {len(files)} file(s).")
        finally:
            # Re-enable user selection reset if it was suppressed for programmatic updates
            self.suppress_selection_reset = False

    # ---------- Selection helpers ----------
    def _select_all(self, select=True):
        """Select every row (select=True) or clear the selection (select=False)."""
        children = self.tree.get_children()
        if select:
            self.tree.selection_set(children)
        else:
            self.tree.selection_remove(children)

    def _get_selected_files(self):
        """Return the filenames (first column) of the selected rows."""
        names = []
        for iid in self.tree.selection():
            vals = self.tree.item(iid, "values")
            if vals:
                names.append(vals[0])
        return names

    def _on_selection_changed(self, _evt=None):
        """Reset the progress bars when the selection changes, unless suppressed."""
        # Only reset progress bars when the USER changed selection (not programmatic updates)
        if self.suppress_selection_reset:
            return
        self._reset_progress_bars()

    def _reset_progress_bars(self):
        """Zero both bars and size the overall bar to the current selection."""
        self.pb_file.config(value=0, maximum=100)
        sel_count = len(self._get_selected_files())
        self.pb_overall.config(value=0, maximum=(sel_count if sel_count else 1))

    # ---------- Destination resolution ----------
    def _resolve_destination_root(self):
        """Return the manual folder if set, else the selected drive root, else ""."""
        # If user picked a folder (dest_var), use it; otherwise use selected drive root
        if self.dest_var.get():
            return self.dest_var.get()
        val = self.drive_combo.get()
        if val:
            return val.split()[0]
        return ""

    # ---------- Archive workflow ----------
    def _start_archive(self):
        """Validate inputs and start an ArchiverWorker for the selected files."""
        src = self.src_var.get()
        dst_root = self._resolve_destination_root()
        if not os.path.isdir(src):
            messagebox.showerror("Error", f"Source folder does not exist:\n{src}")
            return
        if not dst_root or not os.path.isdir(dst_root):
            messagebox.showerror("Error", "Please select a destination drive or folder.")
            return

        files = self._get_selected_files()
        if not files:
            messagebox.showinfo(
                "Nothing selected", "Please select one or more files to archive."
            )
            return

        self._save_settings_if_ready()

        self.btn_start.config(state="disabled")
        self.btn_cancel.config(state="normal")
        self.pb_file.config(value=0, maximum=100)
        self.pb_overall.config(value=0, maximum=len(files))
        self.status_var.set("Starting…")
        self._log_ui(f"Archiving to: {os.path.join(dst_root, ARCHIVE_ROOT_NAME)}")

        self.worker = ArchiverWorker(
            src, dst_root, files, self.ui_queue, archive_root_name=ARCHIVE_ROOT_NAME
        )
        self.worker.start()

    def _cancel(self):
        """Ask a running worker to stop at its next cancellation check."""
        if self.worker and self.worker.is_alive():
            self.worker.stop_flag.set()
            self._log_ui("Cancellation requested…")

    # ---------- Worker queue ----------
    def _poll_queue(self):
        """Drain pending worker messages, then reschedule itself."""
        try:
            while True:
                msg = self.ui_queue.get_nowait()
                self._handle_worker_message(msg)
        except queue.Empty:
            pass
        self.after(UI_POLL_MS, self._poll_queue)

    def _handle_worker_message(self, msg):
        """Apply one worker message (see ArchiverWorker) to the widgets."""
        kind = msg[0]
        if kind == "log":
            # The worker already wrote this line to the log file at its own
            # level, so only mirror it in the window.
            self._log_ui(msg[1], to_file=False)
        elif kind == "status":
            self.status_var.set(msg[1])
        elif kind == "progress_file":
            _, filename, done, total = msg
            pct = int((done / total) * 100) if total else 0
            self.pb_file.config(value=pct, maximum=100)
        elif kind == "progress_overall":
            _, done, total = msg
            self.pb_overall.config(value=done, maximum=total if total else 1)
        elif kind == "error":
            self._log_ui(f"ERROR: {msg[1]}")
        elif kind == "done":
            self._log_ui("All done.")
            self.status_var.set("Done.")
            self.btn_start.config(state="normal")
            self.btn_cancel.config(state="disabled")
            # Programmatic refresh of the list — do not reset progress bars
            self._scan_source(user_initiated=False)

    # ---------- Utils ----------
    def _log_ui(self, text, to_file=True):
        """Append a timestamped line to the log pane.

        When *to_file* is true (the default) the text is also written to the
        log file at INFO level.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.txt_log.insert("end", f"{timestamp} {text}\n")
        self.txt_log.see("end")
        if to_file:
            logger.info(text)

    @staticmethod
    def _human_bytes(n):
        """Format a byte count using 1024-based units, e.g. 1536 -> "1.5 KB"."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if n < 1024.0:
                return f"{n:3.1f} {unit}"
            n /= 1024.0
        return f"{n:.1f} PB"


def main():
    """Log the launch and run the Tk main loop."""
    logger.info("=== Generic Archiver launched ===")
    app = App()
    app.mainloop()


if __name__ == "__main__":
    main()