import os
import pandas as pd
import mysql.connector
import chardet
def detect_encoding(file_path):
    """Guess the text encoding of a file from a leading sample of its bytes.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The value chardet reports under the ``'encoding'`` key of its result.
        NOTE(review): chardet can report ``None`` when it cannot make a guess
        (for example on an empty sample) -- callers should be ready for that.
    """
    sample_size = 10000  # only the first 10,000 bytes are examined
    with open(file_path, 'rb') as handle:
        sample = handle.read(sample_size)
    return chardet.detect(sample)['encoding']
def connect_to_db(host, user, password, database):
    """Open a MySQL connection with the given settings and return it.

    Args:
        host: Server host name or address.
        user: Account name used to log in.
        password: Password for ``user``.
        database: Default schema selected for the connection.

    Returns:
        The connection object produced by ``mysql.connector.connect``.
    """
    settings = {
        'host': host,
        'user': user,
        'password': password,
        'database': database,
    }
    return mysql.connector.connect(**settings)
def create_table(cursor, table_name, columns, charset, varchar_to_text=None):
    """Create ``table_name`` (if it does not exist) with one column per name.

    A column is declared ``TEXT`` when its name contains "REMARK"
    (case-insensitive) or when it is listed in ``varchar_to_text``; every
    other column is declared ``VARCHAR(255)``.

    Args:
        cursor: Database cursor exposing ``execute(sql)``.
        table_name: Name of the table to create.
        columns: Column names, in table order.
        charset: MySQL character set name; inserted into the statement as-is,
            so it must come from trusted code, not from file content.
        varchar_to_text: Optional collection of column names to force to
            ``TEXT``. ``None`` means no overrides.
    """
    def quote(identifier):
        # Identifiers come from file names and CSV headers; a backtick inside
        # a backtick-quoted identifier has to be doubled to stay literal.
        return "`" + str(identifier).replace("`", "``") + "`"

    forced_text = varchar_to_text or ()
    definitions = []
    for col in columns:
        wants_text = 'REMARK' in str(col).upper() or col in forced_text
        sql_type = "TEXT" if wants_text else "VARCHAR(255)"
        definitions.append(f"{quote(col)} {sql_type}")

    columns_sql = ', '.join(definitions)
    create_table_sql = (
        f"CREATE TABLE IF NOT EXISTS {quote(table_name)} "
        f"({columns_sql}) CHARACTER SET {charset}"
    )
    cursor.execute(create_table_sql)
def insert_data(cursor, table_name, columns, data):
    """Insert many rows into ``table_name`` with one parameterized statement.

    Args:
        cursor: Database cursor exposing ``executemany(sql, rows)``.
        table_name: Target table.
        columns: Column names, in the same order as the values in each row.
        data: Sequence of rows; each row holds one value per column. Values
            are passed as parameters, never interpolated into the SQL text.
    """
    def quote(identifier):
        # Identifiers cannot be bound as parameters, so they are quoted here;
        # an embedded backtick must be doubled to remain part of the name.
        return "`" + str(identifier).replace("`", "``") + "`"

    placeholders = ', '.join(['%s'] * len(columns))
    columns_sql = ', '.join(quote(col) for col in columns)
    insert_sql = (
        f"INSERT INTO {quote(table_name)} ({columns_sql}) "
        f"VALUES ({placeholders})"
    )
    cursor.executemany(insert_sql, data)
def _extract_too_long_column(error_message):
    """Return the column named in a MySQL "Data too long" message, or None.

    The name is taken from between the fixed phrases ``for column '`` and
    ``' at row`` so that a quote inside the column name does not cut it short.
    """
    _, opening, tail = error_message.partition("for column '")
    if not opening:
        return None
    name, closing, _ = tail.rpartition("' at row")
    return name if closing else None


def _attempt_load(cursor, db_connection, file_path, table_name, encoding,
                  charset, chunk_size, text_columns):
    """Run one create-table-and-insert pass over the CSV file.

    Returns:
        ``(columns, error)``. ``columns`` is the header list of the first
        chunk (``None`` if no chunk was read). ``error`` is the
        ``mysql.connector.Error`` that aborted the pass, or ``None`` when all
        chunks were inserted and committed.

    Raises:
        Any non-MySQL exception from reading or inserting; the transaction is
        rolled back before it propagates.
    """
    reader = pd.read_csv(file_path, chunksize=chunk_size, encoding=encoding)
    columns = None
    try:
        for df in reader:
            if columns is None:
                columns = df.columns.tolist()
                # DDL ends any open transaction in MySQL, so the table is
                # created first and the transaction guarding the inserts is
                # opened only afterwards.
                create_table(cursor, table_name, columns, charset,
                             varchar_to_text=list(text_columns))
                db_connection.start_transaction()
            # Empty cells are read as NaN, which is not a valid SQL value;
            # convert them to None so they are stored as NULL.
            rows = df.astype(object).where(df.notna(), None).values.tolist()
            if rows:
                insert_data(cursor, table_name, columns, rows)
        db_connection.commit()
    except mysql.connector.Error as e:
        db_connection.rollback()
        return columns, e
    except Exception:
        # Do not leave a half-filled transaction open for the next file.
        db_connection.rollback()
        raise
    finally:
        reader.close()
    return columns, None


def process_csv_file(file_path, db_connection, chunk_size=1000):
    """Load one CSV file into a MySQL table named after the file.

    The table name is the file's base name without extension. The table is
    created from the CSV header (if it does not already exist) and the rows
    are inserted chunk by chunk inside one transaction.

    When MySQL rejects a value as too long for a column, the table is dropped
    and recreated with that column as ``TEXT`` and the load is repeated; this
    is done for each further column that turns out to be too long. Note that
    the drop also discards rows that were in the table before this call.

    Progress and MySQL failures are reported with ``print``; MySQL errors
    raised while loading are not propagated.

    Args:
        file_path: Path of the CSV file.
        db_connection: Open MySQL connection.
        chunk_size: Number of CSV rows read and inserted per batch.

    Raises:
        Non-MySQL exceptions (for example decoding or parsing errors) raised
        during the first pass, after the transaction has been rolled back.
        On a repeated pass such exceptions are printed instead.
    """
    # Imported explicitly so the submodule is available without relying on
    # what ``import mysql.connector`` happens to load.
    from mysql.connector import errorcode

    # Encoding detection can come back empty (e.g. for an empty file).
    encoding = detect_encoding(file_path) or 'utf-8'
    table_name = os.path.splitext(os.path.basename(file_path))[0]

    # Determine MySQL charset based on detected encoding
    normalized = encoding.lower()
    if 'utf-8' in normalized:
        charset = 'utf8mb4'
    elif 'gb' in normalized:
        charset = 'gbk'
    else:
        charset = 'latin1'

    # Created before the try so the finally never sees an unbound name.
    cursor = db_connection.cursor()
    try:
        text_columns = []
        while True:
            is_retry = bool(text_columns)
            retry_note = " on retry" if is_retry else ""
            try:
                columns, error = _attempt_load(
                    cursor, db_connection, file_path, table_name, encoding,
                    charset, chunk_size, text_columns,
                )
            except Exception as retry_e:
                # First-pass failures propagate; failures while repeating the
                # load are reported and swallowed.
                if not is_retry:
                    raise
                print(f"Failed to insert data into table `{table_name}`{retry_note}. Error: {retry_e}")
                return

            if error is None:
                outcome = "recreated" if is_retry else "created"
                print(f"Table `{table_name}` {outcome} and data inserted successfully.")
                return

            col_name = None
            if error.errno == errorcode.ER_DATA_TOO_LONG:
                print(f"Data too long error: {error}")
                col_name = _extract_too_long_column(str(error))

            # Only widen a real column that has not been widened yet;
            # otherwise repeating the load could never succeed.
            widenable = (
                col_name is not None
                and columns is not None
                and col_name in columns
                and col_name not in text_columns
            )
            if not widenable:
                print(f"Failed to insert data into table `{table_name}`{retry_note}. Error: {error}")
                return

            print(f"Column `{col_name}` data is too long. Recreating table with `{col_name}` as TEXT.")
            text_columns.append(col_name)
            # Drop the table so the next pass recreates it with the wider column.
            quoted_table = "`" + table_name.replace("`", "``") + "`"
            cursor.execute(f"DROP TABLE IF EXISTS {quoted_table}")
    finally:
        cursor.close()
def process_csv_files(directory, db_connection, chunk_size=1000):
    """Load every ``.csv`` entry found directly in ``directory``.

    Args:
        directory: Folder whose entries are listed (not recursive).
        db_connection: Open database connection handed to each file load.
        chunk_size: Rows per batch, forwarded to ``process_csv_file``.
    """
    for entry in os.listdir(directory):
        if not entry.endswith('.csv'):
            continue
        process_csv_file(os.path.join(directory, entry), db_connection, chunk_size)
if __name__ == "__main__":
    # Connection settings; the values below are placeholders to be replaced.
    db_config = dict(
        host='localhost',
        user='your_username',
        password='your_password',
        database='your_database',
    )

    # Folder that holds the CSV files to load.
    csv_directory = 'path_to_your_csv_directory'

    db_connection = connect_to_db(
        db_config['host'],
        db_config['user'],
        db_config['password'],
        db_config['database'],
    )
    try:
        process_csv_files(csv_directory, db_connection)
    finally:
        # Always release the connection, even if a file fails to load.
        db_connection.close()
通过sqoop把hive数据到mysql,脚本提示成功,mysql对应的表中没有数
2024-06-14 08:28:01 21 阅读