使用Python操作数据库
使用Python操作数据库
1、创建SQL测试数据
-- 创建test数据库
CREATE DATABASE IF NOT EXISTS test
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci;
-- 使用test数据库
USE test;
-- 创建用户表 users
-- DECIMAL(10, 2) 表示最多10位数字,其中2位小数
CREATE TABLE users(
`id` INT AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(20) NOT NULL,
`salary` DECIMAL(10, 2) NOT NULL
);
-- 插入数据
-- 字符数据使用单引号
INSERT INTO users(id, name, salary)
VALUES
(11, 'chen', 1400),
(12, 'wang', 1200);

2、使用Python连接数据库
import pymysql
# 连接数据库
conn = pymysql.connect(
host='localhost',
user='root',
passwd='123456', # 密码必须是字符串,添加引号
db='test',
port=3306, # 端口必须是int类型,否则会报错 ValueError: port should be of type int
charset='utf8' # 指定字符集为UTF-8,告诉数据库如何处理和存储文本数据。
)
try:
# 创建游标
cur1 = conn.cursor()
cur2 = conn.cursor()
# 执行查询
cur1.execute('SELECT * FROM users')
cur2.execute('DESC users')
# 获取结果
result1 = cur1.fetchall()
result2 = cur2.fetchall()
# 打印结果
print("users 表的数据:")
for row in result1:
print(row)
print("\nusers 表的结构:")
for row in result2:
print(row)
# 关闭游标和连接
finally:
cur1.close()
cur2.close()
conn.close()
3、使用AI创建数据库操作脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
交互式数据库操作脚本
功能: 通过菜单选择操作,自定义所有参数
用法: python db_assistant.py
"""
import pymysql
import sys
import os
from typing import List, Dict, Any, Optional
class DatabaseAssistant:
"""数据库交互助手"""
def __init__(self):
"""初始化,获取数据库连接配置"""
self.connection_params = {}
self.current_db = None
self.db_manager = None
self.setup_database_connection()
def setup_database_connection(self):
"""设置数据库连接"""
print("\n" + "="*60)
print("🔧 数据库连接配置")
print("="*60)
# 获取连接参数
host = input("主机地址 (默认: localhost): ").strip() or "localhost"
user = input("用户名 (默认: root): ").strip() or "root"
password = input("密码 (默认: 123456): ").strip() or "123456"
port = input("端口 (默认: 3306): ").strip() or "3306"
charset = input("字符集 (默认: utf8mb4): ").strip() or "utf8mb4"
self.connection_params = {
'host': host,
'user': user,
'password': password,
'port': int(port),
'charset': charset
}
# 测试连接
if self.test_connection():
print("✅ 数据库连接成功!")
else:
print("❌ 连接失败,请检查参数")
sys.exit(1)
def test_connection(self):
"""测试数据库连接"""
try:
conn = pymysql.connect(
host=self.connection_params['host'],
user=self.connection_params['user'],
password=self.connection_params['password'],
port=self.connection_params['port'],
charset=self.connection_params['charset']
)
conn.close()
return True
except pymysql.Error as e:
print(f"连接错误: {e}")
return False
def get_connection(self, database=None):
"""获取数据库连接"""
try:
conn = pymysql.connect(
host=self.connection_params['host'],
user=self.connection_params['user'],
password=self.connection_params['password'],
database=database,
port=self.connection_params['port'],
charset=self.connection_params['charset'],
cursorclass=pymysql.cursors.DictCursor
)
return conn
except pymysql.Error as e:
print(f"❌ 连接数据库失败: {e}")
return None
def show_main_menu(self):
"""显示主菜单"""
while True:
print("\n" + "="*60)
print("📊 数据库操作助手 - 主菜单")
print("="*60)
if self.current_db:
print(f"当前数据库: [{self.current_db}]")
else:
print("当前数据库: [未选择]")
print("\n请选择操作:")
print(" 1. 📂 数据库操作")
print(" 2. 📋 数据表操作")
print(" 3. 📝 数据操作")
print(" 4. 🔄 切换数据库")
print(" 5. 🔧 重新配置连接")
print(" 0. ❌ 退出程序")
choice = input("\n请输入选项 (0-5): ").strip()
if choice == '0':
print("👋 再见!")
sys.exit(0)
elif choice == '1':
self.database_operations_menu()
elif choice == '2':
if not self.current_db:
print("⚠️ 请先选择数据库!")
self.select_database()
continue
self.table_operations_menu()
elif choice == '3':
if not self.current_db:
print("⚠️ 请先选择数据库!")
self.select_database()
continue
self.data_operations_menu()
elif choice == '4':
self.select_database()
elif choice == '5':
self.setup_database_connection()
else:
print("❌ 无效选项,请重新选择")
def select_database(self):
"""选择当前操作的数据库"""
print("\n" + "="*60)
print("选择数据库")
print("="*60)
databases = self.list_databases(show=False)
if not databases:
print("没有可用的数据库")
return
print("\n可用的数据库:")
for i, db in enumerate(databases, 1):
print(f" {i}. {db}")
print(f" {len(databases)+1}. [创建新数据库]")
print(f" {len(databases)+2}. [返回主菜单]")
try:
choice = int(input(f"\n请选择数据库 (1-{len(databases)+2}): ").strip())
if choice == len(databases) + 1:
self.create_database_interactive()
elif choice == len(databases) + 2:
return
elif 1 <= choice <= len(databases):
self.current_db = databases[choice-1]
print(f"✅ 已选择数据库: {self.current_db}")
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def list_databases(self, show=True):
"""列出所有数据库"""
try:
conn = self.get_connection()
if not conn:
return []
with conn.cursor() as cursor:
cursor.execute("SHOW DATABASES")
databases = [db['Database'] for db in cursor.fetchall()]
if show:
print("\n📂 所有数据库:")
print("-" * 40)
for i, db in enumerate(databases, 1):
print(f"{i:2d}. {db}")
print("-" * 40)
print(f"总计: {len(databases)} 个数据库")
return databases
except pymysql.Error as e:
print(f"❌ 查询数据库列表失败: {e}")
return []
finally:
if conn:
conn.close()
# ==================== 数据库操作菜单 ====================
def database_operations_menu(self):
"""数据库操作菜单"""
while True:
print("\n" + "="*60)
print("📂 数据库操作")
print("="*60)
print("请选择操作:")
print(" 1. 📋 查看所有数据库")
print(" 2. ➕ 创建数据库")
print(" 3. ❌ 删除数据库")
print(" 4. 📊 查看数据库信息")
print(" 0. ↩️ 返回主菜单")
choice = input("\n请输入选项 (0-4): ").strip()
if choice == '0':
return
elif choice == '1':
self.list_databases()
elif choice == '2':
self.create_database_interactive()
elif choice == '3':
self.delete_database_interactive()
elif choice == '4':
self.show_database_info()
else:
print("❌ 无效选项")
def create_database_interactive(self):
"""交互式创建数据库"""
print("\n" + "="*60)
print("创建数据库")
print("="*60)
db_name = input("请输入数据库名: ").strip()
if not db_name:
print("❌ 数据库名不能为空")
return
charset = input(f"字符集 (默认: {self.connection_params['charset']}): ").strip() or self.connection_params['charset']
try:
conn = self.get_connection()
if not conn:
return
with conn.cursor() as cursor:
sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` " \
f"DEFAULT CHARACTER SET {charset} " \
f"COLLATE {charset}_unicode_ci"
cursor.execute(sql)
conn.commit()
print(f"✅ 数据库 '{db_name}' 创建成功!")
# 询问是否选择该数据库
choice = input("是否选择该数据库作为当前数据库? (y/N): ").strip().lower()
if choice == 'y':
self.current_db = db_name
print(f"✅ 已选择数据库: {self.current_db}")
except pymysql.Error as e:
print(f"❌ 创建数据库失败: {e}")
finally:
if conn:
conn.close()
def delete_database_interactive(self):
"""交互式删除数据库"""
print("\n" + "="*60)
print("删除数据库")
print("="*60)
databases = self.list_databases(show=False)
if not databases:
print("没有可用的数据库")
return
print("\n可删除的数据库:")
for i, db in enumerate(databases, 1):
print(f" {i}. {db}")
try:
choice = int(input(f"\n请选择要删除的数据库 (1-{len(databases)}): ").strip())
if 1 <= choice <= len(databases):
db_name = databases[choice-1]
# 确认
confirm = input(f"⚠️ 确定要删除数据库 '{db_name}' 吗? 此操作不可恢复! (输入 'DELETE' 确认): ").strip()
if confirm == 'DELETE':
try:
conn = self.get_connection()
if not conn:
return
with conn.cursor() as cursor:
cursor.execute(f"DROP DATABASE IF EXISTS `{db_name}`")
conn.commit()
print(f"✅ 数据库 '{db_name}' 已删除!")
# 如果删除的是当前数据库,清除选择
if self.current_db == db_name:
self.current_db = None
print("⚠️ 已清除当前数据库选择")
except pymysql.Error as e:
print(f"❌ 删除数据库失败: {e}")
finally:
if conn:
conn.close()
else:
print("❌ 操作已取消")
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def show_database_info(self):
"""显示数据库信息"""
databases = self.list_databases(show=False)
if not databases:
print("没有可用的数据库")
return
print("\n选择要查看的数据库:")
for i, db in enumerate(databases, 1):
print(f" {i}. {db}")
try:
choice = int(input(f"\n请选择数据库 (1-{len(databases)}): ").strip())
if 1 <= choice <= len(databases):
db_name = databases[choice-1]
try:
conn = self.get_connection(db_name)
if not conn:
return
with conn.cursor() as cursor:
# 获取数据库字符集
cursor.execute(f"SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME "
f"FROM information_schema.SCHEMATA "
f"WHERE SCHEMA_NAME = '{db_name}'")
db_info = cursor.fetchone()
# 获取所有表
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
# 获取表数量和数据量
total_rows = 0
for table in tables:
table_name = table[f'Tables_in_{db_name}']
cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
count = cursor.fetchone()['count']
total_rows += count
print(f"\n📊 数据库信息: {db_name}")
print("-" * 50)
print(f"字符集: {db_info['DEFAULT_CHARACTER_SET_NAME']}")
print(f"排序规则: {db_info['DEFAULT_COLLATION_NAME']}")
print(f"表数量: {len(tables)}")
print(f"总数据行数: {total_rows:,}")
print("-" * 50)
if tables:
print("\n📋 表列表:")
for table in tables:
table_name = table[f'Tables_in_{db_name}']
cursor.execute(f"SHOW TABLE STATUS LIKE '{table_name}'")
status = cursor.fetchone()
print(f" - {table_name} (行数: {status['Rows']:,})")
except pymysql.Error as e:
print(f"❌ 查询数据库信息失败: {e}")
finally:
if conn:
conn.close()
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
# ==================== 数据表操作菜单 ====================
def table_operations_menu(self):
"""数据表操作菜单"""
while True:
print("\n" + "="*60)
print(f"📋 数据表操作 - 数据库: {self.current_db}")
print("="*60)
print("请选择操作:")
print(" 1. 📋 查看所有表")
print(" 2. ➕ 创建表")
print(" 3. ❌ 删除表")
print(" 4. 📊 查看表结构")
print(" 5. 🔧 添加表字段")
print(" 6. 📄 查看表详细信息")
print(" 0. ↩️ 返回主菜单")
choice = input("\n请输入选项 (0-6): ").strip()
if choice == '0':
return
elif choice == '1':
self.list_tables_interactive()
elif choice == '2':
self.create_table_interactive()
elif choice == '3':
self.delete_table_interactive()
elif choice == '4':
self.describe_table_interactive()
elif choice == '5':
self.add_column_interactive()
elif choice == '6':
self.show_table_info_interactive()
else:
print("❌ 无效选项")
def list_tables_interactive(self, show=True):
"""交互式列出表"""
try:
conn = self.get_connection(self.current_db)
if not conn:
return []
with conn.cursor() as cursor:
cursor.execute("SHOW TABLES")
tables = [list(table.values())[0] for table in cursor.fetchall()]
if show:
if tables:
print(f"\n📊 数据库 '{self.current_db}' 中的表:")
print("-" * 50)
for i, table in enumerate(tables, 1):
cursor.execute(f"SHOW TABLE STATUS LIKE '{table}'")
table_info = cursor.fetchone()
rows = table_info['Rows'] if table_info else '?'
print(f"{i:2d}. {table:20s} (行数: {rows:,})")
print("-" * 50)
else:
print(f"数据库 '{self.current_db}' 中没有表")
return tables
except pymysql.Error as e:
print(f"❌ 查询表列表失败: {e}")
return []
finally:
if conn:
conn.close()
def create_table_interactive(self):
"""交互式创建表"""
print("\n" + "="*60)
print("创建数据表")
print("="*60)
table_name = input("请输入表名: ").strip()
if not table_name:
print("❌ 表名不能为空")
return
print("\n现在开始定义表字段,输入 'done' 结束")
columns = []
field_count = 1
while True:
print(f"\n字段 #{field_count}:")
field_name = input(" 字段名: ").strip()
if field_name.lower() == 'done':
if len(columns) == 0:
print("⚠️ 至少需要定义一个字段")
continue
break
if not field_name:
print("❌ 字段名不能为空")
continue
# 字段类型
print("\n 常用字段类型:")
print(" 1. INT - 整数")
print(" 2. VARCHAR(长度) - 可变长度字符串")
print(" 3. TEXT - 长文本")
print(" 4. DECIMAL(总位数,小数位) - 小数")
print(" 5. DATE - 日期")
print(" 6. DATETIME - 日期时间")
print(" 7. TIMESTAMP - 时间戳")
print(" 8. 自定义输入")
type_choice = input("\n 请选择字段类型 (1-8): ").strip()
if type_choice == '1':
field_type = "INT"
elif type_choice == '2':
length = input(" 请输入最大长度 (默认: 255): ").strip() or "255"
field_type = f"VARCHAR({length})"
elif type_choice == '3':
field_type = "TEXT"
elif type_choice == '4':
total = input(" 总位数 (默认: 10): ").strip() or "10"
decimal = input(" 小数位数 (默认: 2): ").strip() or "2"
field_type = f"DECIMAL({total},{decimal})"
elif type_choice == '5':
field_type = "DATE"
elif type_choice == '6':
field_type = "DATETIME"
elif type_choice == '7':
field_type = "TIMESTAMP"
elif type_choice == '8':
field_type = input(" 请输入自定义字段类型: ").strip()
else:
print("❌ 无效选项,使用默认类型 VARCHAR(255)")
field_type = "VARCHAR(255)"
# 字段额外属性
print("\n 字段额外属性 (可多选,用逗号分隔):")
print(" 1. NOT NULL - 不允许为空")
print(" 2. UNIQUE - 唯一约束")
print(" 3. PRIMARY KEY - 主键")
print(" 4. AUTO_INCREMENT - 自增")
print(" 5. DEFAULT '值' - 默认值")
print(" 6. 无")
extra_choice = input("\n 请选择额外属性 (1-6): ").strip()
extra = ""
if extra_choice == '1':
extra = "NOT NULL"
elif extra_choice == '2':
extra = "UNIQUE"
elif extra_choice == '3':
extra = "PRIMARY KEY"
elif extra_choice == '4':
extra = "AUTO_INCREMENT"
elif extra_choice == '5':
default_value = input(" 请输入默认值: ").strip()
extra = f"DEFAULT '{default_value}'"
elif extra_choice == '6':
extra = ""
# 添加字段
column = {"name": field_name, "type": field_type}
if extra:
column["extra"] = extra
columns.append(column)
field_count += 1
print(f"✅ 字段 '{field_name}' 已添加")
print(f" 已定义字段: {', '.join([col['name'] for col in columns])}")
# 创建表
try:
conn = self.get_connection(self.current_db)
if not conn:
return
with conn.cursor() as cursor:
# 构建字段定义SQL
column_defs = []
for col in columns:
col_def = f"`{col['name']}` {col['type']}"
if 'extra' in col:
col_def += f" {col['extra']}"
column_defs.append(col_def)
columns_sql = ",\n ".join(column_defs)
# 完整的建表SQL
sql = f"""
CREATE TABLE IF NOT EXISTS `{table_name}` (
{columns_sql}
) ENGINE=InnoDB DEFAULT CHARSET={self.connection_params['charset']}
"""
cursor.execute(sql)
conn.commit()
print(f"\n✅ 表 '{table_name}' 创建成功!")
# 询问是否查看表结构
choice = input("是否查看表结构? (y/N): ").strip().lower()
if choice == 'y':
self.describe_table(self.current_db, table_name)
except pymysql.Error as e:
print(f"❌ 创建表失败: {e}")
finally:
if conn:
conn.close()
def delete_table_interactive(self):
"""交互式删除表"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要删除的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择要删除的表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 确认
confirm = input(f"⚠️ 确定要删除表 '{table_name}' 吗? (y/N): ").strip().lower()
if confirm == 'y':
try:
conn = self.get_connection(self.current_db)
if not conn:
return
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
conn.commit()
print(f"✅ 表 '{table_name}' 已删除!")
except pymysql.Error as e:
print(f"❌ 删除表失败: {e}")
finally:
if conn:
conn.close()
else:
print("❌ 操作已取消")
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def describe_table_interactive(self):
"""交互式查看表结构"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要查看的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
self.describe_table(self.current_db, table_name)
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def describe_table(self, db_name, table_name):
"""显示表结构"""
try:
conn = self.get_connection(db_name)
if not conn:
return False
with conn.cursor() as cursor:
# 获取表结构
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
if not columns:
print(f"表 '{table_name}' 不存在或没有字段")
return False
print(f"\n📋 表结构: {table_name}")
print("-" * 80)
print(f"{'字段名':15s} {'类型':20s} {'允许空':8s} {'键':5s} {'默认值':15s} {'额外':10s}")
print("-" * 80)
for col in columns:
print(f"{col['Field']:15s} {col['Type']:20s} {col['Null']:8s} "
f"{col['Key']:5s} {str(col['Default']):15s} {col['Extra']:10s}")
print("-" * 80)
return True
except pymysql.Error as e:
print(f"❌ 查询表结构失败: {e}")
return False
finally:
if conn:
conn.close()
def add_column_interactive(self):
"""交互式添加字段"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要添加字段的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
print(f"\n为表 '{table_name}' 添加新字段:")
field_name = input("字段名: ").strip()
if not field_name:
print("❌ 字段名不能为空")
return
print("\n常用字段类型:")
print("1. INT - 整数")
print("2. VARCHAR(长度) - 可变长度字符串")
print("3. TEXT - 长文本")
print("4. DECIMAL(总位数,小数位) - 小数")
print("5. DATE - 日期")
print("6. 自定义输入")
type_choice = input("\n请选择字段类型 (1-6): ").strip()
if type_choice == '1':
field_type = "INT"
elif type_choice == '2':
length = input("请输入最大长度 (默认: 255): ").strip() or "255"
field_type = f"VARCHAR({length})"
elif type_choice == '3':
field_type = "TEXT"
elif type_choice == '4':
total = input("总位数 (默认: 10): ").strip() or "10"
decimal = input("小数位数 (默认: 2): ").strip() or "2"
field_type = f"DECIMAL({total},{decimal})"
elif type_choice == '5':
field_type = "DATE"
elif type_choice == '6':
field_type = input("请输入自定义字段类型: ").strip()
else:
print("❌ 无效选项,使用默认类型 VARCHAR(255)")
field_type = "VARCHAR(255)"
# 字段额外属性
print("\n字段额外属性:")
print("1. NOT NULL - 不允许为空")
print("2. NULL - 允许为空 (默认)")
print("3. DEFAULT '值' - 默认值")
extra_choice = input("\n请选择额外属性 (1-3): ").strip()
if extra_choice == '1':
extra = "NOT NULL"
elif extra_choice == '3':
default_value = input("请输入默认值: ").strip()
extra = f"DEFAULT '{default_value}'"
else:
extra = "NULL"
# 添加到哪个字段后面
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
if columns:
print("\n现有字段:")
for i, col in enumerate(columns, 1):
print(f" {i}. {col['Field']}")
print(f" {len(columns)+1}. 添加到第一个")
print(f" {len(columns)+2}. 不指定位置")
pos_choice = input(f"\n新字段添加到哪个字段后面? (1-{len(columns)+2}): ").strip()
try:
pos_choice = int(pos_choice)
if 1 <= pos_choice <= len(columns):
after_field = columns[pos_choice-1]['Field']
else:
after_field = None
except:
after_field = None
else:
after_field = None
# 构建字段定义
column_def = {"name": field_name, "type": field_type, "extra": extra}
if after_field:
column_def["after"] = after_field
# 添加字段
try:
if not conn:
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
col_def = f"`{column_def['name']}` {column_def['type']} {column_def['extra']}"
if 'after' in column_def:
col_def += f" AFTER `{column_def['after']}`"
sql = f"ALTER TABLE `{table_name}` ADD COLUMN {col_def}"
cursor.execute(sql)
conn.commit()
print(f"✅ 字段 '{field_name}' 添加成功!")
# 询问是否查看表结构
choice = input("是否查看更新后的表结构? (y/N): ").strip().lower()
if choice == 'y':
self.describe_table(self.current_db, table_name)
except pymysql.Error as e:
print(f"❌ 添加字段失败: {e}")
finally:
if conn:
conn.close()
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def show_table_info_interactive(self):
"""交互式查看表详细信息"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要查看的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
try:
conn = self.get_connection(self.current_db)
if not conn:
return {}
with conn.cursor() as cursor:
# 获取表状态
cursor.execute(f"SHOW TABLE STATUS LIKE '{table_name}'")
table_status = cursor.fetchone()
if not table_status:
print(f"表 '{table_name}' 不存在")
return {}
# 获取表结构
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
# 获取索引
cursor.execute(f"SHOW INDEX FROM `{table_name}`")
indexes = cursor.fetchall()
# 获取数据行数
cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
row_count = cursor.fetchone()['count']
print(f"\n📊 表详细信息: {table_name}")
print("=" * 60)
print(f"数据库: {self.current_db}")
print(f"引擎: {table_status['Engine']}")
print(f"行数: {row_count:,}")
print(f"创建时间: {table_status['Create_time']}")
print(f"字符集: {table_status['Collation']}")
print(f"注释: {table_status['Comment']}")
print(f"自增当前值: {table_status['Auto_increment'] or 'N/A'}")
print("=" * 60)
print(f"字段数: {len(columns)}")
print(f"索引数: {len(indexes)}")
if indexes:
print("\n🔑 索引信息:")
primary_keys = []
unique_indexes = {}
for idx in indexes:
if idx['Key_name'] == 'PRIMARY':
primary_keys.append(idx['Column_name'])
elif idx['Non_unique'] == 0:
if idx['Key_name'] not in unique_indexes:
unique_indexes[idx['Key_name']] = []
unique_indexes[idx['Key_name']].append(idx['Column_name'])
else:
print(f" 普通索引 '{idx['Key_name']}': {idx['Column_name']}")
if primary_keys:
print(f" 主键: {', '.join(primary_keys)}")
for idx_name, cols in unique_indexes.items():
print(f" 唯一索引 '{idx_name}': {', '.join(cols)}")
return {
'status': table_status,
'columns': columns,
'indexes': indexes
}
except pymysql.Error as e:
print(f"❌ 获取表信息失败: {e}")
return {}
finally:
if conn:
conn.close()
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
# ==================== 数据操作菜单 ====================
def data_operations_menu(self):
"""数据操作菜单"""
while True:
print("\n" + "="*60)
print(f"📝 数据操作 - 数据库: {self.current_db}")
print("="*60)
print("请选择操作:")
print(" 1. 🔍 查询数据")
print(" 2. ➕ 插入数据")
print(" 3. 🔄 更新数据")
print(" 4. ❌ 删除数据")
print(" 5. 📥 批量插入")
print(" 0. ↩️ 返回主菜单")
choice = input("\n请输入选项 (0-5): ").strip()
if choice == '0':
return
elif choice == '1':
self.query_data_interactive()
elif choice == '2':
self.insert_data_interactive()
elif choice == '3':
self.update_data_interactive()
elif choice == '4':
self.delete_data_interactive()
elif choice == '5':
self.batch_insert_interactive()
else:
print("❌ 无效选项")
def query_data_interactive(self):
"""交互式查询数据"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要查询的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 获取表结构以显示字段
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
field_names = [col['Field'] for col in columns]
print(f"\n表 '{table_name}' 的字段: {', '.join(field_names)}")
# 查询选项
print("\n查询选项:")
print("1. 查询所有数据")
print("2. 条件查询")
print("3. 排序查询")
print("4. 限制查询条数")
print("5. 自定义查询")
query_choice = input("\n请选择查询方式 (1-5): ").strip()
if query_choice == '1':
# 查询所有数据
self.query_and_display_data(table_name)
elif query_choice == '2':
# 条件查询
print("\n可用的字段: " + ", ".join(field_names))
condition = input("请输入WHERE条件 (例如: age > 18 AND name LIKE '%张%'): ").strip()
if condition:
self.query_and_display_data(table_name, where=condition)
else:
print("❌ 条件不能为空")
elif query_choice == '3':
# 排序查询
print("\n可排序的字段: " + ", ".join(field_names))
order_field = input("请输入排序字段: ").strip()
order_direction = input("排序方向 (ASC/DESC, 默认: ASC): ").strip().upper() or "ASC"
if order_field:
self.query_and_display_data(table_name, order_by=f"{order_field} {order_direction}")
else:
print("❌ 排序字段不能为空")
elif query_choice == '4':
# 限制查询条数
limit_count = input("请输入要查询的条数: ").strip()
if limit_count.isdigit():
self.query_and_display_data(table_name, limit=limit_count)
else:
print("❌ 请输入有效的数字")
elif query_choice == '5':
# 自定义查询
print("\n自定义查询")
select_fields = input(f"选择字段 (默认: *, 可用字段: {', '.join(field_names)}): ").strip() or "*"
where_condition = input("WHERE条件 (可选): ").strip()
order_by = input("ORDER BY (可选): ").strip()
limit = input("LIMIT (可选): ").strip()
self.query_and_display_data(
table_name,
columns=select_fields,
where=where_condition if where_condition else None,
order_by=order_by if order_by else None,
limit=limit if limit else None
)
else:
print("❌ 无效选项")
conn.close()
else:
print("❌ 无效选项")
except ValueError:
print("❌ 请输入数字")
def query_and_display_data(self, table_name, columns='*', where=None, order_by=None, limit=None):
"""查询并显示数据"""
try:
conn = self.get_connection(self.current_db)
if not conn:
return []
with conn.cursor() as cursor:
# 构建查询SQL
sql = f"SELECT {columns} FROM `{table_name}`"
if where:
sql += f" WHERE {where}"
if order_by:
sql += f" ORDER BY {order_by}"
if limit:
sql += f" LIMIT {limit}"
print(f"\n📝 执行的SQL: {sql}")
cursor.execute(sql)
results = cursor.fetchall()
# 显示结果
if results:
print(f"\n🔍 查询结果 ({len(results)} 条记录):")
print("=" * 80)
# 显示字段名
fields = list(results[0].keys())
header = " | ".join([f"{field:20s}" for field in fields])
print(header)
print("-" * 80)
# 显示数据
for row in results:
line = " | ".join([f"{str(value)[:20]:20s}" for value in row.values()])
print(line)
print("=" * 80)
else:
print("没有找到匹配的记录")
return results
except pymysql.Error as e:
print(f"❌ 查询数据失败: {e}")
return []
finally:
if conn:
conn.close()
def insert_data_interactive(self):
"""交互式插入数据"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要插入数据的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 获取表结构
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
print(f"\n📋 表 '{table_name}' 结构:")
for col in columns:
print(f" - {col['Field']} ({col['Type']}) "
f"{'NOT NULL' if col['Null'] == 'NO' else 'NULL'} "
f"{col['Extra'] if col['Extra'] else ''}")
print(f"\n💡 提示: 自增字段会自动生成,可以为空字段可以跳过")
# 收集数据
data = {}
for col in columns:
field = col['Field']
field_type = col['Type']
is_nullable = col['Null'] == 'YES'
is_auto_increment = 'auto_increment' in col['Extra'].lower()
# 跳过自增字段
if is_auto_increment:
continue
# 获取字段值
value = input(f"请输入 '{field}' ({field_type}): ").strip()
# 如果值为空且字段允许为空,设置为NULL
if value == '':
if is_nullable:
data[field] = None
else:
print(f"⚠️ 字段 '{field}' 不允许为空,使用空字符串代替")
data[field] = ''
else:
data[field] = value
# 确认插入
print(f"\n📝 将要插入的数据:")
for key, value in data.items():
print(f" {key}: {value}")
confirm = input("\n确认插入数据? (y/N): ").strip().lower()
if confirm == 'y':
# 执行插入
columns_str = ', '.join([f"`{k}`" for k in data.keys()])
placeholders = ', '.join(['%s'] * len(data))
values = tuple(data.values())
sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
cursor.execute(sql, values)
conn.commit()
print(f"✅ 数据插入成功! (ID: {cursor.lastrowid})")
else:
print("❌ 操作已取消")
conn.close()
else:
print("❌ 无效选项")
except pymysql.Error as e:
print(f"❌ 插入数据失败: {e}")
except ValueError:
print("❌ 请输入数字")
def update_data_interactive(self):
"""交互式更新数据"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要更新数据的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 先显示一些数据
print(f"\n先查看一些数据:")
self.query_and_display_data(table_name, limit=5)
# 获取表结构
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
field_names = [col['Field'] for col in columns]
print(f"\n可更新的字段: {', '.join(field_names)}")
# 获取更新条件
print(f"\n设置更新条件 (WHERE子句):")
print(f"例如: id = 1 或 name LIKE '%张%'")
where_condition = input("请输入WHERE条件: ").strip()
if not where_condition:
print("❌ 更新条件不能为空")
return
# 先查询受影响的行数
cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}` WHERE {where_condition}")
affected_count = cursor.fetchone()['count']
if affected_count == 0:
print(f"⚠️ 没有找到匹配条件的记录")
return
print(f"⚠️ 将影响 {affected_count} 条记录")
# 收集更新数据
print(f"\n请输入要更新的字段值 (留空表示不更新该字段):")
update_data = {}
for col in columns:
field = col['Field']
field_type = col['Type']
is_auto_increment = 'auto_increment' in col['Extra'].lower()
# 跳过自增字段
if is_auto_increment:
continue
value = input(f" {field} ({field_type}): ").strip()
if value != '':
update_data[field] = value
if not update_data:
print("❌ 没有要更新的字段")
return
# 确认更新
print(f"\n📝 将要更新的数据:")
for key, value in update_data.items():
print(f" {key}: {value}")
print(f" 条件: {where_condition}")
print(f" 影响: {affected_count} 条记录")
confirm = input("\n⚠️ 确认更新? (y/N): ").strip().lower()
if confirm == 'y':
# 执行更新
set_clause = ', '.join([f"`{k}` = %s" for k in update_data.keys()])
values = tuple(update_data.values())
sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_condition}"
cursor.execute(sql, values)
conn.commit()
print(f"✅ 更新成功,影响 {cursor.rowcount} 条记录")
# 询问是否查看更新后的数据
view = input("是否查看更新后的数据? (y/N): ").strip().lower()
if view == 'y':
self.query_and_display_data(table_name, where=where_condition)
else:
print("❌ 操作已取消")
conn.close()
else:
print("❌ 无效选项")
except pymysql.Error as e:
print(f"❌ 更新数据失败: {e}")
except ValueError:
print("❌ 请输入数字")
def delete_data_interactive(self):
"""交互式删除数据"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要删除数据的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 先显示一些数据
print(f"\n先查看一些数据:")
self.query_and_display_data(table_name, limit=5)
# 获取删除条件
print(f"\n设置删除条件 (WHERE子句):")
print(f"⚠️ 警告: 删除操作不可恢复!")
print(f"例如: id = 1 或 age < 18")
where_condition = input("请输入WHERE条件: ").strip()
if not where_condition:
print("❌ 删除条件不能为空")
return
# 先查询受影响的行数
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}` WHERE {where_condition}")
affected_count = cursor.fetchone()['count']
if affected_count == 0:
print(f"⚠️ 没有找到匹配条件的记录")
return
print(f"⚠️ 将删除 {affected_count} 条记录")
# 确认删除
confirm = input(f"\n❌ 确定要删除 {affected_count} 条记录吗? 此操作不可恢复! (输入 'DELETE' 确认): ").strip()
if confirm == 'DELETE':
# 执行删除
sql = f"DELETE FROM `{table_name}` WHERE {where_condition}"
cursor.execute(sql)
conn.commit()
print(f"✅ 删除成功,共删除 {cursor.rowcount} 条记录")
else:
print("❌ 操作已取消")
conn.close()
else:
print("❌ 无效选项")
except pymysql.Error as e:
print(f"❌ 删除数据失败: {e}")
except ValueError:
print("❌ 请输入数字")
def batch_insert_interactive(self):
"""交互式批量插入数据"""
tables = self.list_tables_interactive(show=False)
if not tables:
print("没有可用的表")
return
print("\n选择要批量插入数据的表:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
try:
choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
if 1 <= choice <= len(tables):
table_name = tables[choice-1]
# 获取表结构
conn = self.get_connection(self.current_db)
with conn.cursor() as cursor:
cursor.execute(f"DESC `{table_name}`")
columns = cursor.fetchall()
print(f"\n📋 表 '{table_name}' 结构:")
for col in columns:
print(f" - {col['Field']} ({col['Type']}) "
f"{'NOT NULL' if col['Null'] == 'NO' else 'NULL'} "
f"{col['Extra'] if col['Extra'] else ''}")
# 确定要插入的字段
print(f"\n💡 提示: 自增字段会自动生成,可以为空字段可以跳过")
print(f"请选择要插入的字段 (输入字段名,用逗号分隔):")
all_fields = [col['Field'] for col in columns]
auto_increment_fields = [col['Field'] for col in columns if 'auto_increment' in col['Extra'].lower()]
print(f"可用字段: {', '.join(all_fields)}")
if auto_increment_fields:
print(f"自增字段 (可跳过): {', '.join(auto_increment_fields)}")
selected_fields_input = input("请输入字段 (留空使用所有非自增字段): ").strip()
if selected_fields_input:
selected_fields = [f.strip() for f in selected_fields_input.split(',')]
# 验证字段是否存在
invalid_fields = [f for f in selected_fields if f not in all_fields]
if invalid_fields:
print(f"❌ 无效字段: {', '.join(invalid_fields)}")
return
else:
# 使用所有非自增字段
selected_fields = [f for f in all_fields if f not in auto_increment_fields]
print(f"将插入的字段: {', '.join(selected_fields)}")
# 收集批量数据
print(f"\n开始输入批量数据,输入 'done' 结束")
data_list = []
record_count = 1
while True:
print(f"\n记录 #{record_count}:")
data = {}
skip = False
for field in selected_fields:
# 获取字段信息
col_info = next((col for col in columns if col['Field'] == field), None)
if not col_info:
continue
value = input(f" {field} ({col_info['Type']}): ").strip()
# 检查是否输入 done
if value.lower() == 'done':
skip = True
break
# 处理空值
if value == '':
if col_info['Null'] == 'YES':
data[field] = None
else:
data[field] = ''
else:
data[field] = value
if skip:
break
if data:
data_list.append(data)
print(f"✅ 记录 #{record_count} 已添加")
record_count += 1
if not data_list:
print("❌ 没有数据可插入")
return
# 确认批量插入
print(f"\n📝 将要批量插入 {len(data_list)} 条记录:")
for i, data in enumerate(data_list[:3], 1): # 只显示前3条
print(f" 记录 {i}: {data}")
if len(data_list) > 3:
print(f" ... 还有 {len(data_list)-3} 条记录")
confirm = input(f"\n确认批量插入 {len(data_list)} 条记录? (y/N): ").strip().lower()
if confirm == 'y':
# 执行批量插入
columns_str = ', '.join([f"`{k}`" for k in selected_fields])
placeholders = ', '.join(['%s'] * len(selected_fields))
# 构建所有值
values = []
for data in data_list:
values.append(tuple(data.values()))
sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
cursor.executemany(sql, values)
conn.commit()
print(f"✅ 批量插入成功,共 {cursor.rowcount} 条记录")
else:
print("❌ 操作已取消")
conn.close()
else:
print("❌ 无效选项")
except pymysql.Error as e:
print(f"❌ 批量插入失败: {e}")
except ValueError:
print("❌ 请输入数字")
def main():
"""主函数"""
print("\n" + "="*60)
print("🚀 交互式数据库操作助手")
print("="*60)
print("功能: 创建/删除数据库、创建/删除表、插入/查询/更新/删除数据")
print("="*60)
# 创建助手实例
assistant = DatabaseAssistant()
# 显示主菜单
assistant.show_main_menu()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n👋 程序已退出")
except Exception as e:
print(f"\n❌ 程序出错: {e}")
4、将Python脚本打包成 .exe文件
Python代码如何打包成exe_哔哩哔哩_bilibili
1、下载打包程序:pip install pyinstaller
2、进入存放Python的文件夹:cd C:\Users\Admin\Desktop\Py_Mysql
3、执行打包程序: pyinstaller -F Db_Assistant.py
-F 的意思是将所有文件打包到一个exe文件中

脚本使用下来感觉一般,后续慢慢优化吧

浙公网安备 33010602011771号