




pip install Pillow

下面是完整的 Python 代码:

import os
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import threading

# 全局变量和锁用于跟踪处理的图像数量
processed_images_count = 0
processed_images_lock = threading.Lock()

def is_image_file(file):
        with Image.open(file) as img:
            return img.format in ['JPEG', 'PNG', 'BMP', 'GIF', 'TIFF']
    except IOError as e:
        print(f"无法打开图像文件 {file}: {e}")  # 打印错误信息和文件路径
        return False

def compress_image(input_file):
    global processed_images_count

    print(f"正在处理图像: {input_file}")  # 添加了调试信息

        # 打开图像文件
        with Image.open(input_file) as img:
            # 获取图像的格式
            file_format = img.format

            # 保存压缩后的图像
            img.save(input_file, format=file_format, optimize=True)
            print(f"已压缩图像: {input_file}")  # 添加了调试信息

    except Exception as e:  # 捕获所有异常
        print(f"压缩图像时发生错误 {input_file}: {e}")  # 打印错误信息和文件路径

    # 更新已处理图像的计数器
    with processed_images_lock:
        processed_images_count += 1
        print(f"已成功压缩 {processed_images_count} 张图像: {input_file}")

def compress_images_in_folders(thread_count):
    image_files = []

    # 遍历文件夹,找到所有图像文件
    for root, _, files in os.walk(os.getcwd()):
        print(f"正在访问文件夹: {root}")  # 打印正在访问的文件夹
        for file in files:
            input_file = os.path.join(root, file)

            # 检查文件扩展名
            _, ext = os.path.splitext(input_file)
            if ext.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff']:
                if is_image_file(input_file):

    # 使用线程池进行图像压缩
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        executor.map(compress_image, image_files)

if __name__ == "__main__":
    thread_count = 32  # 固定线程数量为32
    print(f"使用 {thread_count} 个线程进行图像压缩")


