使用步骤
安装依赖模块
确保安装 mysql-connector-python:
pip install mysql-connector-python
确保 mysqldump 和 mysql 命令可用
此脚本依赖于系统中安装的 mysqldump 和 mysql 命令。请确保它们已安装并配置在系统路径中。
配置参数
替换脚本中的 host、user、password、original_db 和 new_db 变量,使用你自己的数据库连接信息和数据库名称。
运行脚本:
python clone_database.py
检查结果
脚本运行后,原始数据库的数据将被复制到一个新数据库中。
import mysql.connector
import subprocess
import os
def export_and_clone_databases(host, user, password, databases, suffix):
try:
# Step 1: 创建 MySQL 连接
connection = mysql.connector.connect(
host=host,
user=user,
password=password
)
cursor = connection.cursor()
for original_db in databases:
# Step 2: 检查原始数据库是否存在(使用反引号转义数据库名)
cursor.execute(f"SHOW DATABASES LIKE '{original_db}';")
if not cursor.fetchone():
print(f"Error: Database '{original_db}' does not exist.")
continue
# Step 3: 创建新数据库(用反引号转义数据库名)
new_db = f"{original_db}_{suffix}"
cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{new_db}`;")
print(f"New database '{new_db}' created successfully.")
# Step 4: 使用 mysqldump 导出原数据库
dump_file = f"{original_db}.sql"
# dump_command = f"mysqldump -h {host} -u {user} -p{password} `{original_db}` > {dump_file}"
dump_command = f'"C:\\Program Files\\MySQL\\MySQL Server 8.0\\bin\\mysqldump.exe" -h {host} -u {user} -p{password} {original_db} > {dump_file}'
print(f"Executing: {dump_command}")
subprocess.run(dump_command, shell=True, check=True)
print(f"Database '{original_db}' exported to '{dump_file}'.")
# Step 5: 导入 SQL 文件到新数据库
# import_command = f"mysql -h {host} -u {user} -p{password} `{new_db}` < {dump_file}"
import_command = f'"C:\\Program Files\\MySQL\\MySQL Server 8.0\\bin\\mysql.exe" -h {host} -u {user} -p{password} {new_db} < {dump_file}'
print(f"Executing: {import_command}")
subprocess.run(import_command, shell=True, check=True)
print(f"Data imported into new database '{new_db}' successfully.")
# Step 6: 删除临时文件(可选)
if os.path.exists(dump_file):
os.remove(dump_file)
print(f"Temporary file '{dump_file}' deleted.")
except mysql.connector.Error as err:
print(f"Error: {err}")
except subprocess.CalledProcessError as err:
print(f"Command failed: {err}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
print("MySQL connection closed.")
# 使用示例
if __name__ == "__main__":
host = "localhost" # 数据库地址
user = "root" # 数据库用户名
password = "xxx" # 数据库密码
# 定义需要复制的数据库列表
databases = [
"industry-xxx"
]
# 新数据库的后缀
suffix = "xxx" # 可更改为需要的后缀
export_and_clone_databases(host, user, password, databases, suffix)