解决新浪微博图床403批量下载图片等资源(以MMChat数据集为例)

文章目录


1. 代码

该 Python 脚本可多线程地批量下载新浪图床图片,每次下载会检查哪些图片已下载并过滤已下载的图片。

import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import time

from tqdm import tqdm


def setup_logger():
    # 设置日志记录:同时输出到文件和控制台
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = logging.FileHandler('download.log')
    file_handler.setLevel(logging.INFO)

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.ERROR)

    # 日志格式
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # 添加处理器到logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)


def download_image_by_change_referer(output_dir, url):
    """
    下载图片
    Args:
        output_dir:     保存图片的文件夹
        url:            图片链接

    Returns:    下载成功返回True,否则返回False

    """
    headers = {
        'Referer': 'https://weibo.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False
    # time.sleep(0.1)  # 限制请求频率,防止过快请求


def download_image_by_baidu_cache(output_dir, url):
    """
    使用第三方缓存服务来解决防盗链问题
    参考 https://code.newban.cn/466.html
    Args:
        output_dir:     保存图片的文件夹
        url:            图片链接

    Returns:            下载成功返回True,否则返回False

    """
    headers = {
        'Referer': 'https://image.baidu.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36'
    }
    try:
        response = requests.get(f'https://image.baidu.com/search/down?url={url}', headers=headers, timeout=10)
        if response.status_code == 200:
            file_name = os.path.join(output_dir, url.split('/')[-1])
            with open(file_name, 'wb') as f:
                f.write(response.content)
            logging.info(f'Successfully downloaded {file_name}')
            return True
        else:
            logging.error(f'Failed to download {url}, status code: {response.status_code}')
            return False
    except requests.RequestException as e:
        logging.error(f'Error downloading {url}: {e}')
        return False


def batch_download(output_dir, urls, method='baidu_cache', max_workers=10):
    """
    批量下载图片
    Args:
        output_dir:     保存图片的文件夹
        urls:           图片链接列表
        max_workers:    最大线程数
        method:         下载方法,可选值为'change_referer'或'baidu_cache'

    Returns:            成功下载的图片数量和失败的图片数量

    """
    if method == 'change_referer':
        download_image_function = download_image_by_change_referer
    elif method == 'baidu_cache':
        download_image_function = download_image_by_baidu_cache
    else:
        raise ValueError(f'Invalid method: {method}')
    success_count, failed_count = 0, 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(download_image_function, output_dir, url): url for url in urls}
        for future in tqdm(as_completed(future_to_url), total=len(urls)):
            url = future_to_url[future]
            try:
                result = future.result()
                if result:
                    success_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logging.error(f'Error processing {url}: {e}')
                failed_count += 1
    return success_count, failed_count


def get_all_image_urls(dataset_root_dir):
    """
    读取所有图片链接
    Args:
        dataset_root_dir:   数据集根目录,包含img_url_dev.json, img_url_test.json, img_url_train.json
    Returns:    图片链接列表

    """
    files = [r'img_url_dev.json', r'img_url_test.json', r'img_url_train.json']
    image_urls = set()
    for idx, file in enumerate(files):
        path = os.path.join(dataset_root_dir, file)
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        for line in lines:
            image_urls.update(line.strip().split(';'))
    return image_urls


def get_downloaded_images(output_dir):
    """
    读取已下载的图片(避免重复下载)
    Returns:    已下载的图片集合

    """
    downloaded_images = os.listdir(output_dir)
    return downloaded_images


def main():
    setup_logger()
    config = dict(
        dataset_root_dir=r'D:\Library\Datasets\20_MMChat',
        output_dir='downloaded_images',
        download_image_method='baidu_cache',
    )

    # 图片链接列表
    image_urls = get_all_image_urls(config['dataset_root_dir'])
    # 目标文件夹
    output_dir = config['output_dir']
    os.makedirs(output_dir, exist_ok=True)
    # 过滤已下载的图片
    downloaded_images = set(get_downloaded_images(output_dir))
    image_urls_to_download = [url for url in image_urls if url.split('/')[-1] not in downloaded_images]
    logging.info(f'Total images: {len(image_urls)}, images to download: {len(image_urls_to_download)}')
    confirm = input(
        f'共有 {len(image_urls)} 张图片,已过滤 {len(image_urls) - len(image_urls_to_download)} 张已下载的图片,待下载 {len(image_urls_to_download)} 张图片,确认下载?(y/n): ')
    if confirm.lower() != 'y':
        return

    # 开始批量下载
    success_count, failed_count = batch_download(output_dir, image_urls_to_download,
                                                 method=config['download_image_method'], max_workers=40)

    logging.info(f'下载完成,Success: {success_count}, Failed: {failed_count}')
    print(f'下载完成,Success: {success_count}, Failed: {failed_count}')


if __name__ == '__main__':
    main()

2. 举一反三

该代码适用 MMChat 数据集,原理上支持所有新浪图床批量下载已失效的图片。

你需要修改 get_all_image_urls 方法来获取你想下载的所有图片的 URL 列表,该方法返回值:

[
    "https://wx2.sinaimg.cn/mw2048/bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
    "https://wx3.sinaimg.cn/mw2048/8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
    "https://wx4.sinaimg.cn/mw2048/954d55d0ly1fmwvia87e1j20ku11210q.jpg",
    ...
]

get_downloaded_images 方法返回已下载的图片,如果上面的三张图片已下载,那么该方法会返回

[
    "bc5ca296ly1fpt9oq74vsj20hs0npdqp.jpg",
    "8bec28c2ly1fg1i9o0liqj20zk0qo7bj.jpg",
    "954d55d0ly1fmwvia87e1j20ku11210q.jpg",
]

最后,你可以修改图片的下载方式,目前支持 download_image_by_change_refererdownload_image_by_baidu_cache

最近更新

  1. TCP协议是安全的吗?

    2024-05-16 12:44:06       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-16 12:44:06       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-16 12:44:06       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-16 12:44:06       18 阅读

热门阅读

  1. 【Python】学生管理系统

    2024-05-16 12:44:06       13 阅读
  2. 2024.5.15晚训题解

    2024-05-16 12:44:06       11 阅读
  3. 【转】VS(Visual Studio)更改文件编码

    2024-05-16 12:44:06       12 阅读
  4. Sping @Autowired @Value @Resourece依赖注入原理

    2024-05-16 12:44:06       13 阅读
  5. spark分布式预测和保存过程中遇到的问题记录

    2024-05-16 12:44:06       12 阅读
  6. OpenCV 实时目标检测

    2024-05-16 12:44:06       12 阅读