# General-purpose script for loading CSV files into MySQL (保存csv到mysql的通用脚本)

import os
import pandas as pd
import mysql.connector
import chardet

def detect_encoding(file_path):
    """Guess the text encoding of *file_path* from its first 10 KB."""
    with open(file_path, 'rb') as handle:
        sample = handle.read(10000)  # a short prefix is enough for chardet
    return chardet.detect(sample)['encoding']

def connect_to_db(host, user, password, database):
    """Open and return a new MySQL connection with the given credentials."""
    return mysql.connector.connect(
        host=host, user=user, password=password, database=database
    )

def create_table(cursor, table_name, columns, charset, varchar_to_text=None):
    """Create *table_name* if it does not exist.

    Every column defaults to VARCHAR(255); a column becomes TEXT when its
    name contains 'REMARK' (case-insensitive) or when it is listed in
    *varchar_to_text*.
    """
    widen = varchar_to_text if varchar_to_text is not None else []
    definitions = []
    for col in columns:
        col_type = 'TEXT' if 'REMARK' in col.upper() or col in widen else 'VARCHAR(255)'
        definitions.append(f"`{col}` {col_type}")
    cursor.execute(
        f"CREATE TABLE IF NOT EXISTS `{table_name}` "
        f"({', '.join(definitions)}) CHARACTER SET {charset}"
    )

def insert_data(cursor, table_name, columns, data):
    """Bulk-insert *data* rows into *table_name* using parameterized SQL."""
    column_list = ', '.join(f"`{col}`" for col in columns)
    value_slots = ', '.join(['%s'] * len(columns))
    cursor.executemany(
        f"INSERT INTO `{table_name}` ({column_list}) VALUES ({value_slots})",
        data,
    )

def process_csv_file(file_path, db_connection, chunk_size=1000):
    """Load one CSV file into a MySQL table named after the file.

    The table is created on first use (VARCHAR(255) columns, REMARK-like
    columns as TEXT) and the whole load runs inside a transaction.  If
    MySQL rejects a value with ER_DATA_TOO_LONG, the table is dropped and
    recreated with the offending column widened to TEXT, and the load is
    retried exactly once.

    Args:
        file_path: Path to the CSV file.
        db_connection: An open mysql.connector connection.
        chunk_size: Number of rows read and inserted per batch.
    """
    # chardet can return None when detection fails; fall back to UTF-8
    # rather than crashing on None.lower() below.
    encoding = detect_encoding(file_path) or 'utf-8'
    table_name = os.path.splitext(os.path.basename(file_path))[0]

    cursor = None  # initialized before try so the finally block is safe
    columns = []
    try:
        cursor = db_connection.cursor()

        # Run the whole file as one transaction so a partial load rolls back.
        db_connection.start_transaction()

        # Map the detected file encoding onto a MySQL charset.
        enc = encoding.lower()
        charset = 'utf8mb4' if 'utf-8' in enc else 'gbk' if 'gb' in enc else 'latin1'

        # Stream the file in chunks; the first chunk also defines the schema.
        for i, df in enumerate(pd.read_csv(file_path, chunksize=chunk_size, encoding=encoding)):
            if i == 0:
                columns = df.columns.tolist()
                create_table(cursor, table_name, columns, charset)
            insert_data(cursor, table_name, columns, df.values.tolist())

        db_connection.commit()
        print(f"Table `{table_name}` created and data inserted successfully.")
    except mysql.connector.Error as e:
        if e.errno == mysql.connector.errorcode.ER_DATA_TOO_LONG:
            print(f"Data too long error: {e}")
            db_connection.rollback()

            # The server message looks like "Data too long for column 'X'
            # at row N" -- extract the quoted column name.
            error_msg = str(e)
            start_index = error_msg.find("'")
            end_index = error_msg.find("'", start_index + 1)
            col_name = error_msg[start_index + 1:end_index]

            print(f"Column `{col_name}` data is too long. Recreating table with `{col_name}` as TEXT.")

            # DDL autocommits in MySQL, so these run outside any transaction.
            cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
            create_table(cursor, table_name, columns, charset, varchar_to_text=[col_name])

            # Retry the full load once with the widened column.
            try:
                db_connection.start_transaction()
                for df in pd.read_csv(file_path, chunksize=chunk_size, encoding=encoding):
                    insert_data(cursor, table_name, columns, df.values.tolist())
                db_connection.commit()
                print(f"Table `{table_name}` recreated and data inserted successfully.")
            except Exception as retry_e:
                db_connection.rollback()
                print(f"Failed to insert data into table `{table_name}` on retry. Error: {retry_e}")
        else:
            db_connection.rollback()
            print(f"Failed to insert data into table `{table_name}`. Error: {e}")
    finally:
        # cursor stays None if db_connection.cursor() itself raised.
        if cursor is not None:
            cursor.close()

def process_csv_files(directory, db_connection, chunk_size=1000):
    """Import every ``*.csv`` file found in *directory* into the database."""
    csv_paths = (
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith('.csv')
    )
    for path in csv_paths:
        process_csv_file(path, db_connection, chunk_size)

if __name__ == "__main__":
    # Connection parameters -- replace with real credentials before running.
    db_config = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
    }

    # Folder that holds the CSV files to import.
    csv_directory = 'path_to_your_csv_directory'

    db_connection = connect_to_db(**db_config)
    try:
        process_csv_files(csv_directory, db_connection)
    finally:
        # Always release the connection, even if processing failed.
        db_connection.close()

最近更新

  1. TCP协议是安全的吗?

    2024-06-14 08:28:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-14 08:28:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-14 08:28:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-14 08:28:01       20 阅读

热门阅读

  1. Shell 输入/输出重定向

    2024-06-14 08:28:01       9 阅读
  2. 人生结果等于思维方式乘以热情乘以能力

    2024-06-14 08:28:01       9 阅读
  3. Spring事务相关

    2024-06-14 08:28:01       6 阅读
  4. 深入理解MyBatis XML配置文件

    2024-06-14 08:28:01       8 阅读
  5. 深入解析Web通信 HTTP、HTTPS 和 WebSocket

    2024-06-14 08:28:01       11 阅读
  6. 阿里云aliyun cli的作用以及安装步骤

    2024-06-14 08:28:01       9 阅读
  7. ffmpeg把视频文件转码为MP4格式

    2024-06-14 08:28:01       6 阅读
  8. 「C系列」C 函数指针/回调函数

    2024-06-14 08:28:01       8 阅读
  9. Linux 如何查看磁盘空间占用

    2024-06-14 08:28:01       5 阅读
  10. 探索微软Edge:新时代的浏览器先锋

    2024-06-14 08:28:01       7 阅读