使用Python操作数据库

使用Python操作数据库

1、创建SQL测试数据

-- 创建test数据库
CREATE DATABASE IF NOT EXISTS test
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci;

-- 使用test数据库
USE test;

-- 创建用户表 users
-- DECIMAL(10, 2) 表示最多10位数字,其中2位小数
CREATE TABLE users(
`id` INT AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(20) NOT NULL,
`salary` DECIMAL(10, 2) NOT NULL
);

-- 插入数据
-- 字符数据使用单引号
INSERT INTO users(id, name, salary)
VALUES
    (11, 'chen', 1400),
    (12, 'wang', 1200);

image-20260203120334219

2、使用Python连接数据库

import pymysql

# 连接数据库
conn = pymysql.connect(
    host='localhost',
    user='root',
    passwd='123456',  # 密码必须是字符串,添加引号
    db='test',
    port=3306,        # 端口必须是int类型,否则会报错 ValueError: port should be of type int
    charset='utf8'    # 指定字符集为UTF-8,告诉数据库如何处理和存储文本数据。  
)

try:
    # 创建游标
    cur1 = conn.cursor()
    cur2 = conn.cursor()

    # 执行查询
    cur1.execute('SELECT * FROM users')
    cur2.execute('DESC users')

    # 获取结果
    result1 = cur1.fetchall()
    result2 = cur2.fetchall()

    # 打印结果
    print("users 表的数据:")
    for row in result1:
        print(row)
    print("\nusers 表的结构:")
    for row in result2:
        print(row)

# 关闭游标和连接
finally:
    cur1.close()
    cur2.close()
    conn.close()

3、使用AI创建数据库操作脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
交互式数据库操作脚本
功能: 通过菜单选择操作,自定义所有参数
用法: python db_assistant.py
"""

import pymysql
import sys
import os
from typing import List, Dict, Any, Optional

class DatabaseAssistant:
    """数据库交互助手"""
    
    def __init__(self):
        """初始化,获取数据库连接配置"""
        self.connection_params = {}
        self.current_db = None
        self.db_manager = None
        
        self.setup_database_connection()
    
    def setup_database_connection(self):
        """设置数据库连接"""
        print("\n" + "="*60)
        print("🔧 数据库连接配置")
        print("="*60)
        
        # 获取连接参数
        host = input("主机地址 (默认: localhost): ").strip() or "localhost"
        user = input("用户名 (默认: root): ").strip() or "root"
        password = input("密码 (默认: 123456): ").strip() or "123456"
        port = input("端口 (默认: 3306): ").strip() or "3306"
        charset = input("字符集 (默认: utf8mb4): ").strip() or "utf8mb4"
        
        self.connection_params = {
            'host': host,
            'user': user,
            'password': password,
            'port': int(port),
            'charset': charset
        }
        
        # 测试连接
        if self.test_connection():
            print("✅ 数据库连接成功!")
        else:
            print("❌ 连接失败,请检查参数")
            sys.exit(1)
    
    def test_connection(self):
        """测试数据库连接"""
        try:
            conn = pymysql.connect(
                host=self.connection_params['host'],
                user=self.connection_params['user'],
                password=self.connection_params['password'],
                port=self.connection_params['port'],
                charset=self.connection_params['charset']
            )
            conn.close()
            return True
        except pymysql.Error as e:
            print(f"连接错误: {e}")
            return False
    
    def get_connection(self, database=None):
        """获取数据库连接"""
        try:
            conn = pymysql.connect(
                host=self.connection_params['host'],
                user=self.connection_params['user'],
                password=self.connection_params['password'],
                database=database,
                port=self.connection_params['port'],
                charset=self.connection_params['charset'],
                cursorclass=pymysql.cursors.DictCursor
            )
            return conn
        except pymysql.Error as e:
            print(f"❌ 连接数据库失败: {e}")
            return None
    
    def show_main_menu(self):
        """显示主菜单"""
        while True:
            print("\n" + "="*60)
            print("📊 数据库操作助手 - 主菜单")
            print("="*60)
            
            if self.current_db:
                print(f"当前数据库: [{self.current_db}]")
            else:
                print("当前数据库: [未选择]")
            
            print("\n请选择操作:")
            print(" 1. 📂 数据库操作")
            print(" 2. 📋 数据表操作")
            print(" 3. 📝 数据操作")
            print(" 4. 🔄 切换数据库")
            print(" 5. 🔧 重新配置连接")
            print(" 0. ❌ 退出程序")
            
            choice = input("\n请输入选项 (0-5): ").strip()
            
            if choice == '0':
                print("👋 再见!")
                sys.exit(0)
            elif choice == '1':
                self.database_operations_menu()
            elif choice == '2':
                if not self.current_db:
                    print("⚠️  请先选择数据库!")
                    self.select_database()
                    continue
                self.table_operations_menu()
            elif choice == '3':
                if not self.current_db:
                    print("⚠️  请先选择数据库!")
                    self.select_database()
                    continue
                self.data_operations_menu()
            elif choice == '4':
                self.select_database()
            elif choice == '5':
                self.setup_database_connection()
            else:
                print("❌ 无效选项,请重新选择")
    
    def select_database(self):
        """选择当前操作的数据库"""
        print("\n" + "="*60)
        print("选择数据库")
        print("="*60)
        
        databases = self.list_databases(show=False)
        if not databases:
            print("没有可用的数据库")
            return
        
        print("\n可用的数据库:")
        for i, db in enumerate(databases, 1):
            print(f" {i}. {db}")
        
        print(f" {len(databases)+1}. [创建新数据库]")
        print(f" {len(databases)+2}. [返回主菜单]")
        
        try:
            choice = int(input(f"\n请选择数据库 (1-{len(databases)+2}): ").strip())
            
            if choice == len(databases) + 1:
                self.create_database_interactive()
            elif choice == len(databases) + 2:
                return
            elif 1 <= choice <= len(databases):
                self.current_db = databases[choice-1]
                print(f"✅ 已选择数据库: {self.current_db}")
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def list_databases(self, show=True):
        """列出所有数据库"""
        try:
            conn = self.get_connection()
            if not conn:
                return []
                
            with conn.cursor() as cursor:
                cursor.execute("SHOW DATABASES")
                databases = [db['Database'] for db in cursor.fetchall()]
                
                if show:
                    print("\n📂 所有数据库:")
                    print("-" * 40)
                    for i, db in enumerate(databases, 1):
                        print(f"{i:2d}. {db}")
                    print("-" * 40)
                    print(f"总计: {len(databases)} 个数据库")
                
                return databases
                
        except pymysql.Error as e:
            print(f"❌ 查询数据库列表失败: {e}")
            return []
        finally:
            if conn:
                conn.close()
    
    # ==================== 数据库操作菜单 ====================
    
    def database_operations_menu(self):
        """数据库操作菜单"""
        while True:
            print("\n" + "="*60)
            print("📂 数据库操作")
            print("="*60)
            
            print("请选择操作:")
            print(" 1. 📋 查看所有数据库")
            print(" 2. ➕ 创建数据库")
            print(" 3. ❌ 删除数据库")
            print(" 4. 📊 查看数据库信息")
            print(" 0. ↩️  返回主菜单")
            
            choice = input("\n请输入选项 (0-4): ").strip()
            
            if choice == '0':
                return
            elif choice == '1':
                self.list_databases()
            elif choice == '2':
                self.create_database_interactive()
            elif choice == '3':
                self.delete_database_interactive()
            elif choice == '4':
                self.show_database_info()
            else:
                print("❌ 无效选项")
    
    def create_database_interactive(self):
        """交互式创建数据库"""
        print("\n" + "="*60)
        print("创建数据库")
        print("="*60)
        
        db_name = input("请输入数据库名: ").strip()
        if not db_name:
            print("❌ 数据库名不能为空")
            return
        
        charset = input(f"字符集 (默认: {self.connection_params['charset']}): ").strip() or self.connection_params['charset']
        
        try:
            conn = self.get_connection()
            if not conn:
                return
                
            with conn.cursor() as cursor:
                sql = f"CREATE DATABASE IF NOT EXISTS `{db_name}` " \
                      f"DEFAULT CHARACTER SET {charset} " \
                      f"COLLATE {charset}_unicode_ci"
                cursor.execute(sql)
                conn.commit()
                
                print(f"✅ 数据库 '{db_name}' 创建成功!")
                
                # 询问是否选择该数据库
                choice = input("是否选择该数据库作为当前数据库? (y/N): ").strip().lower()
                if choice == 'y':
                    self.current_db = db_name
                    print(f"✅ 已选择数据库: {self.current_db}")
                
        except pymysql.Error as e:
            print(f"❌ 创建数据库失败: {e}")
        finally:
            if conn:
                conn.close()
    
    def delete_database_interactive(self):
        """交互式删除数据库"""
        print("\n" + "="*60)
        print("删除数据库")
        print("="*60)
        
        databases = self.list_databases(show=False)
        if not databases:
            print("没有可用的数据库")
            return
        
        print("\n可删除的数据库:")
        for i, db in enumerate(databases, 1):
            print(f" {i}. {db}")
        
        try:
            choice = int(input(f"\n请选择要删除的数据库 (1-{len(databases)}): ").strip())
            
            if 1 <= choice <= len(databases):
                db_name = databases[choice-1]
                
                # 确认
                confirm = input(f"⚠️  确定要删除数据库 '{db_name}' 吗? 此操作不可恢复! (输入 'DELETE' 确认): ").strip()
                
                if confirm == 'DELETE':
                    try:
                        conn = self.get_connection()
                        if not conn:
                            return
                            
                        with conn.cursor() as cursor:
                            cursor.execute(f"DROP DATABASE IF EXISTS `{db_name}`")
                            conn.commit()
                            
                            print(f"✅ 数据库 '{db_name}' 已删除!")
                            
                            # 如果删除的是当前数据库,清除选择
                            if self.current_db == db_name:
                                self.current_db = None
                                print("⚠️  已清除当前数据库选择")
                            
                    except pymysql.Error as e:
                        print(f"❌ 删除数据库失败: {e}")
                    finally:
                        if conn:
                            conn.close()
                else:
                    print("❌ 操作已取消")
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def show_database_info(self):
        """显示数据库信息"""
        databases = self.list_databases(show=False)
        if not databases:
            print("没有可用的数据库")
            return
        
        print("\n选择要查看的数据库:")
        for i, db in enumerate(databases, 1):
            print(f" {i}. {db}")
        
        try:
            choice = int(input(f"\n请选择数据库 (1-{len(databases)}): ").strip())
            
            if 1 <= choice <= len(databases):
                db_name = databases[choice-1]
                
                try:
                    conn = self.get_connection(db_name)
                    if not conn:
                        return
                    
                    with conn.cursor() as cursor:
                        # 获取数据库字符集
                        cursor.execute(f"SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME "
                                      f"FROM information_schema.SCHEMATA "
                                      f"WHERE SCHEMA_NAME = '{db_name}'")
                        db_info = cursor.fetchone()
                        
                        # 获取所有表
                        cursor.execute("SHOW TABLES")
                        tables = cursor.fetchall()
                        
                        # 获取表数量和数据量
                        total_rows = 0
                        for table in tables:
                            table_name = table[f'Tables_in_{db_name}']
                            cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
                            count = cursor.fetchone()['count']
                            total_rows += count
                        
                        print(f"\n📊 数据库信息: {db_name}")
                        print("-" * 50)
                        print(f"字符集: {db_info['DEFAULT_CHARACTER_SET_NAME']}")
                        print(f"排序规则: {db_info['DEFAULT_COLLATION_NAME']}")
                        print(f"表数量: {len(tables)}")
                        print(f"总数据行数: {total_rows:,}")
                        print("-" * 50)
                        
                        if tables:
                            print("\n📋 表列表:")
                            for table in tables:
                                table_name = table[f'Tables_in_{db_name}']
                                cursor.execute(f"SHOW TABLE STATUS LIKE '{table_name}'")
                                status = cursor.fetchone()
                                print(f"  - {table_name} (行数: {status['Rows']:,})")
                    
                except pymysql.Error as e:
                    print(f"❌ 查询数据库信息失败: {e}")
                finally:
                    if conn:
                        conn.close()
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    # ==================== 数据表操作菜单 ====================
    
    def table_operations_menu(self):
        """数据表操作菜单"""
        while True:
            print("\n" + "="*60)
            print(f"📋 数据表操作 - 数据库: {self.current_db}")
            print("="*60)
            
            print("请选择操作:")
            print(" 1. 📋 查看所有表")
            print(" 2. ➕ 创建表")
            print(" 3. ❌ 删除表")
            print(" 4. 📊 查看表结构")
            print(" 5. 🔧 添加表字段")
            print(" 6. 📄 查看表详细信息")
            print(" 0. ↩️  返回主菜单")
            
            choice = input("\n请输入选项 (0-6): ").strip()
            
            if choice == '0':
                return
            elif choice == '1':
                self.list_tables_interactive()
            elif choice == '2':
                self.create_table_interactive()
            elif choice == '3':
                self.delete_table_interactive()
            elif choice == '4':
                self.describe_table_interactive()
            elif choice == '5':
                self.add_column_interactive()
            elif choice == '6':
                self.show_table_info_interactive()
            else:
                print("❌ 无效选项")
    
    def list_tables_interactive(self, show=True):
        """交互式列出表"""
        try:
            conn = self.get_connection(self.current_db)
            if not conn:
                return []
                
            with conn.cursor() as cursor:
                cursor.execute("SHOW TABLES")
                tables = [list(table.values())[0] for table in cursor.fetchall()]
                
                if show:
                    if tables:
                        print(f"\n📊 数据库 '{self.current_db}' 中的表:")
                        print("-" * 50)
                        for i, table in enumerate(tables, 1):
                            cursor.execute(f"SHOW TABLE STATUS LIKE '{table}'")
                            table_info = cursor.fetchone()
                            rows = table_info['Rows'] if table_info else '?'
                            print(f"{i:2d}. {table:20s} (行数: {rows:,})")
                        print("-" * 50)
                    else:
                        print(f"数据库 '{self.current_db}' 中没有表")
                
                return tables
                
        except pymysql.Error as e:
            print(f"❌ 查询表列表失败: {e}")
            return []
        finally:
            if conn:
                conn.close()
    
    def create_table_interactive(self):
        """交互式创建表"""
        print("\n" + "="*60)
        print("创建数据表")
        print("="*60)
        
        table_name = input("请输入表名: ").strip()
        if not table_name:
            print("❌ 表名不能为空")
            return
        
        print("\n现在开始定义表字段,输入 'done' 结束")
        columns = []
        field_count = 1
        
        while True:
            print(f"\n字段 #{field_count}:")
            field_name = input("  字段名: ").strip()
            
            if field_name.lower() == 'done':
                if len(columns) == 0:
                    print("⚠️  至少需要定义一个字段")
                    continue
                break
            
            if not field_name:
                print("❌ 字段名不能为空")
                continue
            
            # 字段类型
            print("\n  常用字段类型:")
            print("  1. INT - 整数")
            print("  2. VARCHAR(长度) - 可变长度字符串")
            print("  3. TEXT - 长文本")
            print("  4. DECIMAL(总位数,小数位) - 小数")
            print("  5. DATE - 日期")
            print("  6. DATETIME - 日期时间")
            print("  7. TIMESTAMP - 时间戳")
            print("  8. 自定义输入")
            
            type_choice = input("\n  请选择字段类型 (1-8): ").strip()
            
            if type_choice == '1':
                field_type = "INT"
            elif type_choice == '2':
                length = input("  请输入最大长度 (默认: 255): ").strip() or "255"
                field_type = f"VARCHAR({length})"
            elif type_choice == '3':
                field_type = "TEXT"
            elif type_choice == '4':
                total = input("  总位数 (默认: 10): ").strip() or "10"
                decimal = input("  小数位数 (默认: 2): ").strip() or "2"
                field_type = f"DECIMAL({total},{decimal})"
            elif type_choice == '5':
                field_type = "DATE"
            elif type_choice == '6':
                field_type = "DATETIME"
            elif type_choice == '7':
                field_type = "TIMESTAMP"
            elif type_choice == '8':
                field_type = input("  请输入自定义字段类型: ").strip()
            else:
                print("❌ 无效选项,使用默认类型 VARCHAR(255)")
                field_type = "VARCHAR(255)"
            
            # 字段额外属性
            print("\n  字段额外属性 (可多选,用逗号分隔):")
            print("  1. NOT NULL - 不允许为空")
            print("  2. UNIQUE - 唯一约束")
            print("  3. PRIMARY KEY - 主键")
            print("  4. AUTO_INCREMENT - 自增")
            print("  5. DEFAULT '值' - 默认值")
            print("  6. 无")
            
            extra_choice = input("\n  请选择额外属性 (1-6): ").strip()
            
            extra = ""
            if extra_choice == '1':
                extra = "NOT NULL"
            elif extra_choice == '2':
                extra = "UNIQUE"
            elif extra_choice == '3':
                extra = "PRIMARY KEY"
            elif extra_choice == '4':
                extra = "AUTO_INCREMENT"
            elif extra_choice == '5':
                default_value = input("  请输入默认值: ").strip()
                extra = f"DEFAULT '{default_value}'"
            elif extra_choice == '6':
                extra = ""
            
            # 添加字段
            column = {"name": field_name, "type": field_type}
            if extra:
                column["extra"] = extra
            
            columns.append(column)
            field_count += 1
            
            print(f"✅ 字段 '{field_name}' 已添加")
            print(f"   已定义字段: {', '.join([col['name'] for col in columns])}")
        
        # 创建表
        try:
            conn = self.get_connection(self.current_db)
            if not conn:
                return
                
            with conn.cursor() as cursor:
                # 构建字段定义SQL
                column_defs = []
                for col in columns:
                    col_def = f"`{col['name']}` {col['type']}"
                    if 'extra' in col:
                        col_def += f" {col['extra']}"
                    column_defs.append(col_def)
                
                columns_sql = ",\n    ".join(column_defs)
                
                # 完整的建表SQL
                sql = f"""
                CREATE TABLE IF NOT EXISTS `{table_name}` (
                    {columns_sql}
                ) ENGINE=InnoDB DEFAULT CHARSET={self.connection_params['charset']}
                """
                
                cursor.execute(sql)
                conn.commit()
                print(f"\n✅ 表 '{table_name}' 创建成功!")
                
                # 询问是否查看表结构
                choice = input("是否查看表结构? (y/N): ").strip().lower()
                if choice == 'y':
                    self.describe_table(self.current_db, table_name)
                
        except pymysql.Error as e:
            print(f"❌ 创建表失败: {e}")
        finally:
            if conn:
                conn.close()
    
    def delete_table_interactive(self):
        """交互式删除表"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要删除的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择要删除的表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 确认
                confirm = input(f"⚠️  确定要删除表 '{table_name}' 吗? (y/N): ").strip().lower()
                
                if confirm == 'y':
                    try:
                        conn = self.get_connection(self.current_db)
                        if not conn:
                            return
                            
                        with conn.cursor() as cursor:
                            cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
                            conn.commit()
                            print(f"✅ 表 '{table_name}' 已删除!")
                            
                    except pymysql.Error as e:
                        print(f"❌ 删除表失败: {e}")
                    finally:
                        if conn:
                            conn.close()
                else:
                    print("❌ 操作已取消")
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def describe_table_interactive(self):
        """交互式查看表结构"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要查看的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                self.describe_table(self.current_db, table_name)
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def describe_table(self, db_name, table_name):
        """显示表结构"""
        try:
            conn = self.get_connection(db_name)
            if not conn:
                return False
                
            with conn.cursor() as cursor:
                # 获取表结构
                cursor.execute(f"DESC `{table_name}`")
                columns = cursor.fetchall()
                
                if not columns:
                    print(f"表 '{table_name}' 不存在或没有字段")
                    return False
                
                print(f"\n📋 表结构: {table_name}")
                print("-" * 80)
                print(f"{'字段名':15s} {'类型':20s} {'允许空':8s} {'键':5s} {'默认值':15s} {'额外':10s}")
                print("-" * 80)
                
                for col in columns:
                    print(f"{col['Field']:15s} {col['Type']:20s} {col['Null']:8s} "
                          f"{col['Key']:5s} {str(col['Default']):15s} {col['Extra']:10s}")
                print("-" * 80)
                
                return True
                
        except pymysql.Error as e:
            print(f"❌ 查询表结构失败: {e}")
            return False
        finally:
            if conn:
                conn.close()
    
    def add_column_interactive(self):
        """交互式添加字段"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要添加字段的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                print(f"\n为表 '{table_name}' 添加新字段:")
                
                field_name = input("字段名: ").strip()
                if not field_name:
                    print("❌ 字段名不能为空")
                    return
                
                print("\n常用字段类型:")
                print("1. INT - 整数")
                print("2. VARCHAR(长度) - 可变长度字符串")
                print("3. TEXT - 长文本")
                print("4. DECIMAL(总位数,小数位) - 小数")
                print("5. DATE - 日期")
                print("6. 自定义输入")
                
                type_choice = input("\n请选择字段类型 (1-6): ").strip()
                
                if type_choice == '1':
                    field_type = "INT"
                elif type_choice == '2':
                    length = input("请输入最大长度 (默认: 255): ").strip() or "255"
                    field_type = f"VARCHAR({length})"
                elif type_choice == '3':
                    field_type = "TEXT"
                elif type_choice == '4':
                    total = input("总位数 (默认: 10): ").strip() or "10"
                    decimal = input("小数位数 (默认: 2): ").strip() or "2"
                    field_type = f"DECIMAL({total},{decimal})"
                elif type_choice == '5':
                    field_type = "DATE"
                elif type_choice == '6':
                    field_type = input("请输入自定义字段类型: ").strip()
                else:
                    print("❌ 无效选项,使用默认类型 VARCHAR(255)")
                    field_type = "VARCHAR(255)"
                
                # 字段额外属性
                print("\n字段额外属性:")
                print("1. NOT NULL - 不允许为空")
                print("2. NULL - 允许为空 (默认)")
                print("3. DEFAULT '值' - 默认值")
                
                extra_choice = input("\n请选择额外属性 (1-3): ").strip()
                
                if extra_choice == '1':
                    extra = "NOT NULL"
                elif extra_choice == '3':
                    default_value = input("请输入默认值: ").strip()
                    extra = f"DEFAULT '{default_value}'"
                else:
                    extra = "NULL"
                
                # 添加到哪个字段后面
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"DESC `{table_name}`")
                    columns = cursor.fetchall()
                    
                    if columns:
                        print("\n现有字段:")
                        for i, col in enumerate(columns, 1):
                            print(f" {i}. {col['Field']}")
                        
                        print(f" {len(columns)+1}. 添加到第一个")
                        print(f" {len(columns)+2}. 不指定位置")
                        
                        pos_choice = input(f"\n新字段添加到哪个字段后面? (1-{len(columns)+2}): ").strip()
                        
                        try:
                            pos_choice = int(pos_choice)
                            if 1 <= pos_choice <= len(columns):
                                after_field = columns[pos_choice-1]['Field']
                            else:
                                after_field = None
                        except:
                            after_field = None
                    else:
                        after_field = None
                
                # 构建字段定义
                column_def = {"name": field_name, "type": field_type, "extra": extra}
                if after_field:
                    column_def["after"] = after_field
                
                # 添加字段
                try:
                    if not conn:
                        conn = self.get_connection(self.current_db)
                    
                    with conn.cursor() as cursor:
                        col_def = f"`{column_def['name']}` {column_def['type']} {column_def['extra']}"
                        if 'after' in column_def:
                            col_def += f" AFTER `{column_def['after']}`"
                        
                        sql = f"ALTER TABLE `{table_name}` ADD COLUMN {col_def}"
                        cursor.execute(sql)
                        conn.commit()
                        
                        print(f"✅ 字段 '{field_name}' 添加成功!")
                        
                        # 询问是否查看表结构
                        choice = input("是否查看更新后的表结构? (y/N): ").strip().lower()
                        if choice == 'y':
                            self.describe_table(self.current_db, table_name)
                        
                except pymysql.Error as e:
                    print(f"❌ 添加字段失败: {e}")
                finally:
                    if conn:
                        conn.close()
                
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def show_table_info_interactive(self):
        """交互式查看表详细信息"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要查看的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                try:
                    conn = self.get_connection(self.current_db)
                    if not conn:
                        return {}
                    
                    with conn.cursor() as cursor:
                        # 获取表状态
                        cursor.execute(f"SHOW TABLE STATUS LIKE '{table_name}'")
                        table_status = cursor.fetchone()
                        
                        if not table_status:
                            print(f"表 '{table_name}' 不存在")
                            return {}
                        
                        # 获取表结构
                        cursor.execute(f"DESC `{table_name}`")
                        columns = cursor.fetchall()
                        
                        # 获取索引
                        cursor.execute(f"SHOW INDEX FROM `{table_name}`")
                        indexes = cursor.fetchall()
                        
                        # 获取数据行数
                        cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
                        row_count = cursor.fetchone()['count']
                        
                        print(f"\n📊 表详细信息: {table_name}")
                        print("=" * 60)
                        print(f"数据库: {self.current_db}")
                        print(f"引擎: {table_status['Engine']}")
                        print(f"行数: {row_count:,}")
                        print(f"创建时间: {table_status['Create_time']}")
                        print(f"字符集: {table_status['Collation']}")
                        print(f"注释: {table_status['Comment']}")
                        print(f"自增当前值: {table_status['Auto_increment'] or 'N/A'}")
                        print("=" * 60)
                        print(f"字段数: {len(columns)}")
                        print(f"索引数: {len(indexes)}")
                        
                        if indexes:
                            print("\n🔑 索引信息:")
                            primary_keys = []
                            unique_indexes = {}
                            for idx in indexes:
                                if idx['Key_name'] == 'PRIMARY':
                                    primary_keys.append(idx['Column_name'])
                                elif idx['Non_unique'] == 0:
                                    if idx['Key_name'] not in unique_indexes:
                                        unique_indexes[idx['Key_name']] = []
                                    unique_indexes[idx['Key_name']].append(idx['Column_name'])
                                else:
                                    print(f"  普通索引 '{idx['Key_name']}': {idx['Column_name']}")
                            
                            if primary_keys:
                                print(f"  主键: {', '.join(primary_keys)}")
                            
                            for idx_name, cols in unique_indexes.items():
                                print(f"  唯一索引 '{idx_name}': {', '.join(cols)}")
                        
                        return {
                            'status': table_status,
                            'columns': columns,
                            'indexes': indexes
                        }
                        
                except pymysql.Error as e:
                    print(f"❌ 获取表信息失败: {e}")
                    return {}
                finally:
                    if conn:
                        conn.close()
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    # ==================== 数据操作菜单 ====================
    
    def data_operations_menu(self):
        """数据操作菜单"""
        while True:
            print("\n" + "="*60)
            print(f"📝 数据操作 - 数据库: {self.current_db}")
            print("="*60)
            
            print("请选择操作:")
            print(" 1. 🔍 查询数据")
            print(" 2. ➕ 插入数据")
            print(" 3. 🔄 更新数据")
            print(" 4. ❌ 删除数据")
            print(" 5. 📥 批量插入")
            print(" 0. ↩️  返回主菜单")
            
            choice = input("\n请输入选项 (0-5): ").strip()
            
            if choice == '0':
                return
            elif choice == '1':
                self.query_data_interactive()
            elif choice == '2':
                self.insert_data_interactive()
            elif choice == '3':
                self.update_data_interactive()
            elif choice == '4':
                self.delete_data_interactive()
            elif choice == '5':
                self.batch_insert_interactive()
            else:
                print("❌ 无效选项")
    
    def query_data_interactive(self):
        """交互式查询数据"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要查询的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 获取表结构以显示字段
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"DESC `{table_name}`")
                    columns = cursor.fetchall()
                    field_names = [col['Field'] for col in columns]
                    
                    print(f"\n表 '{table_name}' 的字段: {', '.join(field_names)}")
                    
                    # 查询选项
                    print("\n查询选项:")
                    print("1. 查询所有数据")
                    print("2. 条件查询")
                    print("3. 排序查询")
                    print("4. 限制查询条数")
                    print("5. 自定义查询")
                    
                    query_choice = input("\n请选择查询方式 (1-5): ").strip()
                    
                    if query_choice == '1':
                        # 查询所有数据
                        self.query_and_display_data(table_name)
                    
                    elif query_choice == '2':
                        # 条件查询
                        print("\n可用的字段: " + ", ".join(field_names))
                        condition = input("请输入WHERE条件 (例如: age > 18 AND name LIKE '%张%'): ").strip()
                        if condition:
                            self.query_and_display_data(table_name, where=condition)
                        else:
                            print("❌ 条件不能为空")
                    
                    elif query_choice == '3':
                        # 排序查询
                        print("\n可排序的字段: " + ", ".join(field_names))
                        order_field = input("请输入排序字段: ").strip()
                        order_direction = input("排序方向 (ASC/DESC, 默认: ASC): ").strip().upper() or "ASC"
                        
                        if order_field:
                            self.query_and_display_data(table_name, order_by=f"{order_field} {order_direction}")
                        else:
                            print("❌ 排序字段不能为空")
                    
                    elif query_choice == '4':
                        # 限制查询条数
                        limit_count = input("请输入要查询的条数: ").strip()
                        if limit_count.isdigit():
                            self.query_and_display_data(table_name, limit=limit_count)
                        else:
                            print("❌ 请输入有效的数字")
                    
                    elif query_choice == '5':
                        # 自定义查询
                        print("\n自定义查询")
                        select_fields = input(f"选择字段 (默认: *, 可用字段: {', '.join(field_names)}): ").strip() or "*"
                        where_condition = input("WHERE条件 (可选): ").strip()
                        order_by = input("ORDER BY (可选): ").strip()
                        limit = input("LIMIT (可选): ").strip()
                        
                        self.query_and_display_data(
                            table_name, 
                            columns=select_fields,
                            where=where_condition if where_condition else None,
                            order_by=order_by if order_by else None,
                            limit=limit if limit else None
                        )
                    
                    else:
                        print("❌ 无效选项")
                
                conn.close()
                
            else:
                print("❌ 无效选项")
        except ValueError:
            print("❌ 请输入数字")
    
    def query_and_display_data(self, table_name, columns='*', where=None, order_by=None, limit=None):
        """查询并显示数据"""
        try:
            conn = self.get_connection(self.current_db)
            if not conn:
                return []
                
            with conn.cursor() as cursor:
                # 构建查询SQL
                sql = f"SELECT {columns} FROM `{table_name}`"
                
                if where:
                    sql += f" WHERE {where}"
                
                if order_by:
                    sql += f" ORDER BY {order_by}"
                
                if limit:
                    sql += f" LIMIT {limit}"
                
                print(f"\n📝 执行的SQL: {sql}")
                
                cursor.execute(sql)
                results = cursor.fetchall()
                
                # 显示结果
                if results:
                    print(f"\n🔍 查询结果 ({len(results)} 条记录):")
                    print("=" * 80)
                    
                    # 显示字段名
                    fields = list(results[0].keys())
                    header = " | ".join([f"{field:20s}" for field in fields])
                    print(header)
                    print("-" * 80)
                    
                    # 显示数据
                    for row in results:
                        line = " | ".join([f"{str(value)[:20]:20s}" for value in row.values()])
                        print(line)
                    print("=" * 80)
                else:
                    print("没有找到匹配的记录")
                
                return results
                
        except pymysql.Error as e:
            print(f"❌ 查询数据失败: {e}")
            return []
        finally:
            if conn:
                conn.close()
    
    def insert_data_interactive(self):
        """交互式插入数据"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要插入数据的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 获取表结构
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"DESC `{table_name}`")
                    columns = cursor.fetchall()
                    
                    print(f"\n📋 表 '{table_name}' 结构:")
                    for col in columns:
                        print(f"  - {col['Field']} ({col['Type']}) "
                              f"{'NOT NULL' if col['Null'] == 'NO' else 'NULL'} "
                              f"{col['Extra'] if col['Extra'] else ''}")
                    
                    print(f"\n💡 提示: 自增字段会自动生成,可以为空字段可以跳过")
                    
                    # 收集数据
                    data = {}
                    for col in columns:
                        field = col['Field']
                        field_type = col['Type']
                        is_nullable = col['Null'] == 'YES'
                        is_auto_increment = 'auto_increment' in col['Extra'].lower()
                        
                        # 跳过自增字段
                        if is_auto_increment:
                            continue
                        
                        # 获取字段值
                        value = input(f"请输入 '{field}' ({field_type}): ").strip()
                        
                        # 如果值为空且字段允许为空,设置为NULL
                        if value == '':
                            if is_nullable:
                                data[field] = None
                            else:
                                print(f"⚠️  字段 '{field}' 不允许为空,使用空字符串代替")
                                data[field] = ''
                        else:
                            data[field] = value
                    
                    # 确认插入
                    print(f"\n📝 将要插入的数据:")
                    for key, value in data.items():
                        print(f"  {key}: {value}")
                    
                    confirm = input("\n确认插入数据? (y/N): ").strip().lower()
                    
                    if confirm == 'y':
                        # 执行插入
                        columns_str = ', '.join([f"`{k}`" for k in data.keys()])
                        placeholders = ', '.join(['%s'] * len(data))
                        values = tuple(data.values())
                        
                        sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
                        cursor.execute(sql, values)
                        conn.commit()
                        
                        print(f"✅ 数据插入成功! (ID: {cursor.lastrowid})")
                    else:
                        print("❌ 操作已取消")
                
                conn.close()
                
            else:
                print("❌ 无效选项")
        except pymysql.Error as e:
            print(f"❌ 插入数据失败: {e}")
        except ValueError:
            print("❌ 请输入数字")
    
    def update_data_interactive(self):
        """交互式更新数据"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要更新数据的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 先显示一些数据
                print(f"\n先查看一些数据:")
                self.query_and_display_data(table_name, limit=5)
                
                # 获取表结构
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"DESC `{table_name}`")
                    columns = cursor.fetchall()
                    field_names = [col['Field'] for col in columns]
                    
                    print(f"\n可更新的字段: {', '.join(field_names)}")
                    
                    # 获取更新条件
                    print(f"\n设置更新条件 (WHERE子句):")
                    print(f"例如: id = 1 或 name LIKE '%张%'")
                    where_condition = input("请输入WHERE条件: ").strip()
                    
                    if not where_condition:
                        print("❌ 更新条件不能为空")
                        return
                    
                    # 先查询受影响的行数
                    cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}` WHERE {where_condition}")
                    affected_count = cursor.fetchone()['count']
                    
                    if affected_count == 0:
                        print(f"⚠️  没有找到匹配条件的记录")
                        return
                    
                    print(f"⚠️  将影响 {affected_count} 条记录")
                    
                    # 收集更新数据
                    print(f"\n请输入要更新的字段值 (留空表示不更新该字段):")
                    update_data = {}
                    
                    for col in columns:
                        field = col['Field']
                        field_type = col['Type']
                        is_auto_increment = 'auto_increment' in col['Extra'].lower()
                        
                        # 跳过自增字段
                        if is_auto_increment:
                            continue
                        
                        value = input(f"  {field} ({field_type}): ").strip()
                        if value != '':
                            update_data[field] = value
                    
                    if not update_data:
                        print("❌ 没有要更新的字段")
                        return
                    
                    # 确认更新
                    print(f"\n📝 将要更新的数据:")
                    for key, value in update_data.items():
                        print(f"  {key}: {value}")
                    print(f"  条件: {where_condition}")
                    print(f"  影响: {affected_count} 条记录")
                    
                    confirm = input("\n⚠️  确认更新? (y/N): ").strip().lower()
                    
                    if confirm == 'y':
                        # 执行更新
                        set_clause = ', '.join([f"`{k}` = %s" for k in update_data.keys()])
                        values = tuple(update_data.values())
                        
                        sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_condition}"
                        cursor.execute(sql, values)
                        conn.commit()
                        
                        print(f"✅ 更新成功,影响 {cursor.rowcount} 条记录")
                        
                        # 询问是否查看更新后的数据
                        view = input("是否查看更新后的数据? (y/N): ").strip().lower()
                        if view == 'y':
                            self.query_and_display_data(table_name, where=where_condition)
                    else:
                        print("❌ 操作已取消")
                
                conn.close()
                
            else:
                print("❌ 无效选项")
        except pymysql.Error as e:
            print(f"❌ 更新数据失败: {e}")
        except ValueError:
            print("❌ 请输入数字")
    
    def delete_data_interactive(self):
        """交互式删除数据"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要删除数据的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 先显示一些数据
                print(f"\n先查看一些数据:")
                self.query_and_display_data(table_name, limit=5)
                
                # 获取删除条件
                print(f"\n设置删除条件 (WHERE子句):")
                print(f"⚠️  警告: 删除操作不可恢复!")
                print(f"例如: id = 1 或 age < 18")
                where_condition = input("请输入WHERE条件: ").strip()
                
                if not where_condition:
                    print("❌ 删除条件不能为空")
                    return
                
                # 先查询受影响的行数
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}` WHERE {where_condition}")
                    affected_count = cursor.fetchone()['count']
                    
                    if affected_count == 0:
                        print(f"⚠️  没有找到匹配条件的记录")
                        return
                    
                    print(f"⚠️  将删除 {affected_count} 条记录")
                    
                    # 确认删除
                    confirm = input(f"\n❌ 确定要删除 {affected_count} 条记录吗? 此操作不可恢复! (输入 'DELETE' 确认): ").strip()
                    
                    if confirm == 'DELETE':
                        # 执行删除
                        sql = f"DELETE FROM `{table_name}` WHERE {where_condition}"
                        cursor.execute(sql)
                        conn.commit()
                        
                        print(f"✅ 删除成功,共删除 {cursor.rowcount} 条记录")
                    else:
                        print("❌ 操作已取消")
                
                conn.close()
                
            else:
                print("❌ 无效选项")
        except pymysql.Error as e:
            print(f"❌ 删除数据失败: {e}")
        except ValueError:
            print("❌ 请输入数字")
    
    def batch_insert_interactive(self):
        """交互式批量插入数据"""
        tables = self.list_tables_interactive(show=False)
        if not tables:
            print("没有可用的表")
            return
        
        print("\n选择要批量插入数据的表:")
        for i, table in enumerate(tables, 1):
            print(f" {i}. {table}")
        
        try:
            choice = int(input(f"\n请选择表 (1-{len(tables)}): ").strip())
            
            if 1 <= choice <= len(tables):
                table_name = tables[choice-1]
                
                # 获取表结构
                conn = self.get_connection(self.current_db)
                with conn.cursor() as cursor:
                    cursor.execute(f"DESC `{table_name}`")
                    columns = cursor.fetchall()
                    
                    print(f"\n📋 表 '{table_name}' 结构:")
                    for col in columns:
                        print(f"  - {col['Field']} ({col['Type']}) "
                              f"{'NOT NULL' if col['Null'] == 'NO' else 'NULL'} "
                              f"{col['Extra'] if col['Extra'] else ''}")
                    
                    # 确定要插入的字段
                    print(f"\n💡 提示: 自增字段会自动生成,可以为空字段可以跳过")
                    print(f"请选择要插入的字段 (输入字段名,用逗号分隔):")
                    
                    all_fields = [col['Field'] for col in columns]
                    auto_increment_fields = [col['Field'] for col in columns if 'auto_increment' in col['Extra'].lower()]
                    
                    print(f"可用字段: {', '.join(all_fields)}")
                    if auto_increment_fields:
                        print(f"自增字段 (可跳过): {', '.join(auto_increment_fields)}")
                    
                    selected_fields_input = input("请输入字段 (留空使用所有非自增字段): ").strip()
                    
                    if selected_fields_input:
                        selected_fields = [f.strip() for f in selected_fields_input.split(',')]
                        # 验证字段是否存在
                        invalid_fields = [f for f in selected_fields if f not in all_fields]
                        if invalid_fields:
                            print(f"❌ 无效字段: {', '.join(invalid_fields)}")
                            return
                    else:
                        # 使用所有非自增字段
                        selected_fields = [f for f in all_fields if f not in auto_increment_fields]
                    
                    print(f"将插入的字段: {', '.join(selected_fields)}")
                    
                    # 收集批量数据
                    print(f"\n开始输入批量数据,输入 'done' 结束")
                    data_list = []
                    record_count = 1
                    
                    while True:
                        print(f"\n记录 #{record_count}:")
                        
                        data = {}
                        skip = False
                        
                        for field in selected_fields:
                            # 获取字段信息
                            col_info = next((col for col in columns if col['Field'] == field), None)
                            if not col_info:
                                continue
                            
                            value = input(f"  {field} ({col_info['Type']}): ").strip()
                            
                            # 检查是否输入 done
                            if value.lower() == 'done':
                                skip = True
                                break
                            
                            # 处理空值
                            if value == '':
                                if col_info['Null'] == 'YES':
                                    data[field] = None
                                else:
                                    data[field] = ''
                            else:
                                data[field] = value
                        
                        if skip:
                            break
                        
                        if data:
                            data_list.append(data)
                            print(f"✅ 记录 #{record_count} 已添加")
                            record_count += 1
                    
                    if not data_list:
                        print("❌ 没有数据可插入")
                        return
                    
                    # 确认批量插入
                    print(f"\n📝 将要批量插入 {len(data_list)} 条记录:")
                    for i, data in enumerate(data_list[:3], 1):  # 只显示前3条
                        print(f"  记录 {i}: {data}")
                    
                    if len(data_list) > 3:
                        print(f"  ... 还有 {len(data_list)-3} 条记录")
                    
                    confirm = input(f"\n确认批量插入 {len(data_list)} 条记录? (y/N): ").strip().lower()
                    
                    if confirm == 'y':
                        # 执行批量插入
                        columns_str = ', '.join([f"`{k}`" for k in selected_fields])
                        placeholders = ', '.join(['%s'] * len(selected_fields))
                        
                        # 构建所有值
                        values = []
                        for data in data_list:
                            values.append(tuple(data.values()))
                        
                        sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
                        cursor.executemany(sql, values)
                        conn.commit()
                        
                        print(f"✅ 批量插入成功,共 {cursor.rowcount} 条记录")
                    else:
                        print("❌ 操作已取消")
                
                conn.close()
                
            else:
                print("❌ 无效选项")
        except pymysql.Error as e:
            print(f"❌ 批量插入失败: {e}")
        except ValueError:
            print("❌ 请输入数字")

def main():
    """主函数"""
    print("\n" + "="*60)
    print("🚀 交互式数据库操作助手")
    print("="*60)
    print("功能: 创建/删除数据库、创建/删除表、插入/查询/更新/删除数据")
    print("="*60)
    
    # 创建助手实例
    assistant = DatabaseAssistant()
    
    # 显示主菜单
    assistant.show_main_menu()

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 程序已退出")
    except Exception as e:
        print(f"\n❌ 程序出错: {e}")

4、将Python脚本打包成 .exe文件

Python代码如何打包成exe_哔哩哔哩_bilibili

1、下载打包程序:pip install pyinstaller

2、进入存放Python的文件夹:cd C:\Users\Admin\Desktop\Py_Mysql

3、执行打包程序: pyinstaller -F Db_Assistant.py

-F 的意思是将所有文件打包到一个exe文件中

image-20260203131323173

脚本使用下来感觉一般,后续慢慢优化吧

posted @ 2026-02-03 13:39  Q&25  阅读(5)  评论(1)    收藏  举报