堆积如山的Jellyfin“学习资料”库有救了!这个脚本一键搞定海量文件整理

终极 改写 发布于 2025-10-26 212 次阅读


老哥们,我摊牌了。

多年的仓鼠症,让我的 Jellyfin 媒体库…呃,特别是那个存放“珍贵学习资料”的文件夹,已经变成了一片混沌废土。成千上万个文件躺在一个文件夹里,视频、封面、海报、nfo 元数据、字幕文件…像一锅煮烂了的粥,黏糊糊地搅在一起。

每次 Jellyfin 刮削器跑起来,那叫一个心惊胆战。它经常认错刮错,封面张冠李戴,看得我血压飙升。手动整理?面对几万个文件,我只想说:

“臣妾做不到啊!”

痛定思痛,作为一个懒癌晚期的程序员,解决问题的最好方法就是——写个脚本让电脑自己去干。于是,这个专门为网络存储(NAS)优化、能批量整理媒体文件的 Python 脚本诞生了。

整理前 VS 整理后

光说不练假把式,先看看效果,整理完后的海报预览图就不给了,太刺激了。

整理前,你的文件夹可能是这样的:

/珍贵学习资料/
  ├─ ABC-001.mkv
  ├─ ABC-001-poster.jpg
  ├─ ABC-001.nfo
  ├─ ABC-001-fanart.jpg
  ├─ XYZ-123.mp4
  ├─ XYZ-123.srt
  ├─ XYZ-123-poster.jpg
  ... (还有几千上万个文件)

用脚本“一键升天”后:

/珍贵学习资料/
  ├─ ABC-001/
  │   ├─ ABC-001.mkv
  │   ├─ ABC-001-poster.jpg
  │   ├─ ABC-001.nfo
  │   └─ ABC-001-fanart.jpg
  ├─ XYZ-123/
  │   ├─ XYZ-123.mp4
  │   ├─ XYZ-123.srt
  │   └─ XYZ-123-poster.jpg
  ... (文件夹们整整齐齐)

怎么样,是不是瞬间神清气爽?Jellyfin 就喜欢这种结构,每个媒体一个专属文件夹,刮削起来那叫一个稳、准、狠!(事实上我原本是为了整理里番的,近2万的文件,手动整理太痛苦了……当然其他类似的学习资料也管用)

脚本的核心思路是啥?

其实逻辑贼简单,就俩字:“认亲”

脚本会自动扫描当前目录下的所有文件,然后通过文件名来给它们找“家人”。比如,它看到 ABC-001.mkv,就会自动把 ABC-001-poster.jpgABC-001.nfo 这些文件名主体相同,只是后缀不一样的文件归为一类。

这个“认亲”过程的核心,就是一段聪明的正则表达式逻辑,它会剥离掉文件名里像 -poster-fanart_thumb 这种常见的艺术作品后缀,精准找到文件的“基础名称”。

找到一家人后,脚本会:

  1. 用这个“基础名称”创建一个新文件夹。
  2. 把这一家子(视频、图片、字幕等)整整齐齐地搬进这个新家。

为啥说它为网络文件系统 (NAS) 做了优化?

如果你直接在 NAS(比如通过 SMB 或 NFS 协议挂载的网络驱动器)上进行大量小文件操作,你会发现速度慢得感人。这是因为每一次创建文件夹、移动文件的操作,都要和服务器进行一次通信,延迟累加起来非常恐怖。

这个脚本的聪明之处在于:

  1. 第一阶段:批量创建文件夹。 它会先扫描完所有文件,计算出总共需要创建多少个文件夹,然后一口气把所有文件夹全部建好。
  2. 第二阶段:批量移动文件。 文件夹都就位后,它才开始把文件一个个挪进对应的目录里。

这种“两步走”策略,把成千上万次“创建-移动-创建-移动”的零散请求,合并成了两次大的批量操作,大大减少了网络通信的往返次数,在 NAS 上的执行效率直接起飞。

三大安全保障,放心折腾

直接操作海量文件,谁心里不打鼓?万一搞砸了,那可是“删库到跑路”级别的灾难。别怕,我给它上了三道保险:

  1. 预览模式: 正式执行前,脚本会先分析一遍,然后清晰地告诉你它打算创建哪些文件夹、移动哪些文件,让你心里有个底。
  2. 安全模式: 如果你还是不放心,可以选择“安全模式”,它会只处理前 500 个识别出的文件组。你可以先用一小部分文件试试水,看看效果对不对。
  3. 后悔药: 这是最骚的功能!每次执行整理,脚本都会生成一个详细的操作日志(一个 JSON 文件),记录了每一个文件的“搬家”轨迹。如果你整理后不满意,或者发现哪里不对劲,直接运行“回滚”功能,脚本会读取最新的日志,把所有文件原路送回,文件夹也给你删了,主打一个完璧归赵,让你有重来的勇气。

如何使用?

使用方法不能再简单了:

  1. 保存代码: 把下面的完整代码保存成一个 Python 文件(注意编码要UTF-8),比如 organizer.py
  2. 放置脚本: 把这个 organizer.py 文件丢进你那个乱成一锅粥的文件夹里。
  3. 运行它: 在这个文件夹里打开终端(在文件夹地址栏输入 cmd 或 powershell 然后回车),运行命令:python organizer.py
  4. 按提示操作: 接下来,脚本会像个贴心管家一样,一步步问你该怎么做。是直接开干,还是先看看预览?是全速前进,还是安全模式?或者,你是来吃后悔药的?跟着提示选就行了。

完整代码

Talk is cheap, show me the code. 完整代码在此,无任何第三方依赖,开箱即用。

import os
import shutil
import re
import json
import datetime
from pathlib import Path
from collections import defaultdict
import logging

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class FastNetworkFileOrganizer:
    def __init__(self, base_path):
        self.base_path = Path(base_path)
        self.processed_count = 0
        self.created_folders = 0
        self.script_name = Path(__file__).name
        self.operation_log = []
        
        # 常见后缀模式
        self.suffix_patterns = [
            r'-fanart', r'-poster', r'-thumb', r'-cover', r'-scraper',
            r'-trailer', r'-sample', r'-preview', r'-backdrop', r'-banner',
            r'-logo', r'-discart', r'-clearart', r'-landscape', r'-theme',
            r'_fanart', r'_poster', r'_thumb', r'_cover', r'_scraper',
            r'_trailer', r'_sample', r'_preview', r'_backdrop', r'_banner',
            r'_logo', r'_discart', r'_clearart', r'_landscape', r'_theme'
        ]
        
        # 扩展文件类型分类
        self.file_types = {
            '视频': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'],
            '字幕': ['.srt', '.ass', '.ssa', '.vtt', '.sub', '.idx'],
            '图片': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'],
            '元数据': ['.nfo', '.info', '.description', '.xml'],
            '其他': []
        }
        
        # 要排除的文件扩展名
        self.excluded_extensions = ['.py', '.pyc', '.pyo', '.exe', '.bat', '.cmd', '.sh']

    def extract_base_name(self, filename):
        """提取文件的基础名称(去除后缀和扩展名)"""
        name = Path(filename).stem
        
        # 去除常见后缀
        for pattern in self.suffix_patterns:
            name = re.sub(pattern, '', name, flags=re.IGNORECASE)
        
        return name.strip()

    def get_files_in_current_directory(self):
        """只获取当前目录下的文件,不包括子文件夹和脚本文件"""
        files = []
        try:
            for item in self.base_path.iterdir():
                if item.is_file() and item.name != self.script_name and item.suffix.lower() not in self.excluded_extensions:
                    files.append(item)
        except Exception as e:
            logger.error(f"读取目录时出错: {e}")
        
        return files

    def analyze_files(self):
        """分析文件并返回分组结果"""
        logger.info("开始分析当前目录下的文件...")
        
        files = self.get_files_in_current_directory()
        logger.info(f"找到 {len(files)} 个文件")
        
        # 按基础名称分组
        base_groups = defaultdict(list)
        for file_path in files:
            base_name = self.extract_base_name(file_path.name)
            if base_name:
                base_groups[base_name].append(file_path)
        
        # 过滤:只保留有多个文件的组
        multi_file_groups = {k: v for k, v in base_groups.items() if len(v) > 1}
        
        logger.info(f"分析完成: 找到 {len(multi_file_groups)} 个需要整理的文件夹")
        return multi_file_groups

    def preview_organization(self, file_groups, max_display=30):
        """预览整理结果"""
        print(f"\n{'='*80}")
        print(f"预览整理结果 (显示前 {min(len(file_groups), max_display)} 个)")
        print(f"{'='*80}")
        print(f"总共发现 {len(file_groups)} 个需要整理的文件夹")
        print(f"将创建 {len(file_groups)} 个新文件夹")
        print(f"将移动 {sum(len(files) for files in file_groups.values())} 个文件")
        print(f"{'='*80}")
        
        sorted_groups = sorted(file_groups.items(), key=lambda x: x[0])
        
        displayed = 0
        for base_name, files in sorted_groups:
            if displayed >= max_display:
                remaining = len(file_groups) - max_display
                print(f"\n... 还有 {remaining} 个文件夹未显示 ...")
                break
                
            folder_name = self.sanitize_folder_name(base_name)
            print(f"\n[{displayed+1}] 新建文件夹: {folder_name}/")
            print(f"  包含 {len(files)} 个文件:")
            
            for file_path in sorted(files):
                file_type = self.get_file_type(file_path)
                print(f"    - {file_path.name} [{file_type}]")
            
            displayed += 1
        
        return len(file_groups)

    def get_file_type(self, file_path):
        """获取文件类型"""
        ext = file_path.suffix.lower()
        for file_type, extensions in self.file_types.items():
            if ext in extensions:
                return file_type
        return '其他'

    def organize_files_fast(self, file_groups):
        """快速整理文件 - 针对网络文件系统优化"""
        total_groups = len(file_groups)
        total_files = sum(len(files) for files in file_groups.values())
        
        print(f"\n开始批量整理: {total_groups} 个文件夹, {total_files} 个文件")
        print("优化策略: 先创建所有文件夹,然后快速移动文件")
        
        # 创建操作日志
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.operation_log_path = self.base_path / f"file_organizer_log_{timestamp}.json"
        
        start_time = datetime.datetime.now()
        
        # 第一阶段: 快速创建所有文件夹
        print("\n第一阶段: 创建文件夹...")
        folder_creation_map = {}  # 映射基础名称到实际文件夹路径
        
        for base_name in file_groups.keys():
            try:
                folder_name = self.sanitize_folder_name(base_name)
                target_folder = self.base_path / folder_name
                
                if not target_folder.exists():
                    target_folder.mkdir(exist_ok=False)
                    self.created_folders += 1
                    folder_creation_map[base_name] = target_folder
                    
                    # 记录文件夹创建
                    self.operation_log.append({
                        "type": "create_folder",
                        "folder_path": str(target_folder),
                        "timestamp": datetime.datetime.now().isoformat()
                    })
                else:
                    folder_creation_map[base_name] = target_folder
                    
            except Exception as e:
                logger.error(f"创建文件夹 {base_name} 时出错: {e}")
        
        print(f"文件夹创建完成: {self.created_folders} 个新文件夹")
        
        # 第二阶段: 快速移动文件
        print("\n第二阶段: 移动文件...")
        processed_files = 0
        
        for base_name, files in file_groups.items():
            target_folder = folder_creation_map.get(base_name)
            if not target_folder:
                continue
                
            for file_path in files:
                try:
                    # 直接移动,不检查冲突(速度优化)
                    target_path = target_folder / file_path.name
                    
                    # 记录操作
                    self.operation_log.append({
                        "type": "move_file",
                        "original_path": str(file_path),
                        "new_path": str(target_path),
                        "timestamp": datetime.datetime.now().isoformat()
                    })
                    
                    # 直接移动(假设没有冲突)
                    shutil.move(str(file_path), str(target_path))
                    self.processed_count += 1
                    processed_files += 1
                    
                    # 每100个文件显示一次进度
                    if processed_files % 100 == 0:
                        elapsed = (datetime.datetime.now() - start_time).total_seconds()
                        speed = processed_files / elapsed if elapsed > 0 else 0
                        print(f"进度: {processed_files}/{total_files} 文件 ({processed_files/total_files*100:.1f}%) - 速度: {speed:.1f} 文件/秒")
                        
                except Exception as e:
                    logger.error(f"移动文件 {file_path} 时出错: {e}")
        
        # 保存操作日志
        self.save_operation_log()
        
        total_time = (datetime.datetime.now() - start_time).total_seconds()
        print(f"\n整理完成!")
        print(f"创建了 {self.created_folders} 个文件夹")
        print(f"移动了 {self.processed_count} 个文件")
        print(f"总耗时: {total_time:.2f} 秒")
        print(f"平均速度: {self.processed_count/total_time:.1f} 文件/秒")
        print(f"操作日志: {self.operation_log_path}")

    def save_operation_log(self):
        """保存操作日志到文件"""
        try:
            with open(self.operation_log_path, 'w', encoding='utf-8') as f:
                json.dump(self.operation_log, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"保存操作日志时出错: {e}")

    def rollback_operation(self):
        """回滚操作"""
        log_files = list(self.base_path.glob("file_organizer_log_*.json"))
        if not log_files:
            print("未找到操作日志文件")
            return
        
        log_files.sort(key=os.path.getmtime, reverse=True)
        log_file_path = log_files[0]
        
        print(f"准备从日志文件回滚: {log_file_path}")
        confirm = input("确认执行回滚操作?(y/n): ").lower().strip()
        if confirm != 'y':
            print("回滚操作已取消")
            return
        
        try:
            with open(log_file_path, 'r', encoding='utf-8') as f:
                operation_log = json.load(f)
        except Exception as e:
            print(f"读取操作日志时出错: {e}")
            return
        
        # 按时间倒序回滚
        rollback_operations = sorted(operation_log, key=lambda x: x['timestamp'], reverse=True)
        
        rollback_count = 0
        for op in rollback_operations:
            try:
                if op['type'] == 'move_file' and os.path.exists(op['new_path']):
                    shutil.move(op['new_path'], op['original_path'])
                    rollback_count += 1
                elif op['type'] == 'create_folder':
                    folder_path = Path(op['folder_path'])
                    if folder_path.exists() and not any(folder_path.iterdir()):
                        folder_path.rmdir()
            except Exception as e:
                print(f"回滚操作失败: {e}")
        
        print(f"回滚完成: 成功回滚 {rollback_count} 个操作")
        
        delete_log = input("是否删除操作日志文件?(y/n): ").lower().strip()
        if delete_log == 'y':
            try:
                os.remove(log_file_path)
                print(f"已删除日志文件")
            except Exception as e:
                print(f"删除日志文件时出错: {e}")

    def sanitize_folder_name(self, name):
        """清理文件夹名称"""
        illegal_chars = r'[<>:"/\\|?*]'
        sanitized = re.sub(illegal_chars, '_', name)
        return sanitized[:200] if len(sanitized) > 200 else sanitized

def main():
    print("快速文件整理工具 (最终版)")
    print("专为网络文件系统优化,支持回滚")
    print("=" * 50)
    
    current_dir = os.getcwd()
    print(f"当前工作目录: {current_dir}")
    
    confirm = input("是否在当前目录执行整理?(y/n): ").lower().strip()
    folder_path = current_dir if confirm == 'y' else input("请输入文件夹路径: ").strip()
    
    if not os.path.exists(folder_path):
        print("文件夹不存在!")
        return
    
    organizer = FastNetworkFileOrganizer(folder_path)
    
    print("\n请选择操作:")
    print("1. 整理文件")
    print("2. 回滚操作")
    
    choice = input("您的选择 (1/2): ").strip()
    
    if choice == '1':
        file_groups = organizer.analyze_files()
        
        if not file_groups:
            print("没有找到需要整理的文件组")
            return
        
        organizer.preview_organization(file_groups)
        
        confirm = input("\n确认执行整理操作?(y/n): ").lower().strip()
        if confirm != 'y':
            print("操作已取消")
            return
        
        print("\n选择处理模式:")
        print("1. 快速模式 (推荐)")
        print("2. 安全模式 (处理前500个)")
        
        mode = input("请选择模式 (1/2): ").strip()
        
        if mode == '2':
            limited_groups = dict(list(file_groups.items())[:500])
            print(f"安全模式: 只处理前 {len(limited_groups)} 个文件夹")
            organizer.organize_files_fast(limited_groups)
        else:
            organizer.organize_files_fast(file_groups)
            
    elif choice == '2':
        organizer.rollback_operation()
    else:
        print("无效选择")

if __name__ == "__main__":
    main()

写在最后

这个小工具完美解决了我个人的痛点,现在我的 Jellyfin 媒体库前所未有的清爽。如果你也面临同样的问题,不妨试试它。当然了,在操作你的“珍贵”数据之前,请务必做好备份! 虽然有回滚功能,但多一手准备总是好的。

希望这个脚本能解救同样处于水深火热中的你。