Python3 两个数据库所有表索引的对比验证

Python3 两个数据库所有表索引的对比验证

接着上一篇文章继续说明:Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证

实现结果如下

Snipaste_2022-06-26_23-37-49.jpg

对比脚步实现

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

"""
表索引对比
"""

wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 缺索引'.format(s_db), '{} 缺索引'.format(t_db)])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 50
    sheet.column_dimensions['B'].width = 150
    sheet.column_dimensions['C'].width = 150
    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_list = [1]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                pass
            else:
                style.alignment = v_center
            cell.style = style


def export_excel_file(data_list, file_name):
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()
    for data in data_list:
        print(data)
        sheet.append([data.get('表名'), data.get('{} 缺索引'.format(s_db), '').replace(' ', ' '), data.get('{} 缺索引'.format(t_db), '').replace(' ', ' ')])
    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))


def exec_sql(sql, db_name, tb=''):
    db = pymysql.connect(host='Ip', port=3306, user='用户', password='密码', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    if results:
        return results
    # else:
    #     print('\t{}.{} 查询无索引'.format(db_name, tb if tb else ''))


if __name__ == '__main__':
    s_db = 'From'
    t_db = 'To'
    schema_sql = """
                SELECT CONCAT( 'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
                    IF
                        (NON_UNIQUE = 1,
                            CASE
                                UPPER( INDEX_TYPE ) 
                                WHEN 'FULLTEXT' THEN 'FULLTEXT INDEX' 
                                WHEN 'SPATIAL' THEN 'SPATIAL INDEX' ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) 
                            END,
                        IF
                            ( UPPER( INDEX_NAME ) = 'PRIMARY', CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ), CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) ) 
                        ),
                        '(', GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ), ');' 
                ) AS 'Show_Add_Indexes' 
                FROM information_schema.STATISTICS 
                WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}' 
                GROUP BY TABLE_NAME, INDEX_NAME 
                ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
                """

    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
    source_tables = set([tb for (tb,) in source_tables_result])
    target_tables = set([tb for (tb,) in target_tables_result])

    intersection_tables = set(source_tables) & set(target_tables)
    s_dif_tables = set(source_tables) - set(target_tables)
    t_dif_tables = set(target_tables) - set(source_tables)
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    comp_data_list = list()
    for i, table in enumerate(sorted(intersection_tables)):
        if table:
            s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
            t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
            if s_result == t_result:
                continue

            s_rows = set([data for (data,) in s_result]) if s_result else set()
            t_rows = set([data for (data,) in t_result]) if t_result else set()
            t_dif_rows = s_rows - t_rows
            s_dif_rows = t_rows - s_rows
            if t_dif_rows:
                print(t_db, '缺索引:', t_dif_rows)
                comp_data_list.append({
                    '表名': table,
                    '{} 缺索引'.format(t_db): '\n'.join(t_dif_rows),
                })
            if s_dif_rows:
                print(s_db, '缺索引', table, s_dif_rows)
                comp_data_list.append({
                    '表名': table,
                    '{} 缺索引'.format(s_db): '\n'.join(s_dif_rows),
                })

    print("\n------对比完成------\n")

    # 导出 Excel 文件
    export_excel_file(comp_data_list, '{}->{}-索引-对比.xlsx'.format(s_db, t_db))

脚本2

细心的你如果仔细思考一下索引执行就回发现,比如 From 和 To 索引的大小写名称不同,但结果相同;索引名称不同,但索引的字段完全相同;

这类场景,其实并不需要两边都要做修改,比较索引的耗时过程和数据量是成正比的,仅仅名字不同没必要徒增索引耗空间,所以,脚本还有改进的空间,将名字不同,但内容完全相同的索引,给排除掉,而非人工验证,岂不很好?

你也可以基于以上脚本1进行自己扩展实现,如下给出示例之一

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

"""
表索引对比
"""

wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 缺索引'.format(s_db), '{} 缺索引'.format(t_db), '索引名相同内容不同'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 50
    sheet.column_dimensions['B'].width = 150
    sheet.column_dimensions['C'].width = 150
    sheet.column_dimensions['D'].width = 150

    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_list = [1]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                pass
            else:
                style.alignment = v_center
            cell.style = style


def export_excel_file(data_list, file_name):
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()
    for data in data_list:
        print(data)
        sheet.append([data.get('表名'),
                      data.get('{} 缺索引'.format(s_db), '').replace(' ', ' '),
                      data.get('{} 缺索引'.format(t_db), '').replace(' ', ' '),
                      data.get('索引名相同内容不同')])
    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))


def exec_sql(sql, db_name, tb=''):
    db = pymysql.connect(host='IP', port=3306, user='用户', password='密码', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    cursor.close()
    db.close()
    if results:
        return results


def get_table_idx(rows):
    """
        Key 索引名、Value 索引字段
            索引名规则:名称#alter语句
    """
    tb_idx = {}
    if rows:
        for r in rows:
            fields = r.split('(')[1].replace(');', '').replace(' ', '')
            if ' PRIMARY KEY ' in r:
                tb_idx['pri_key' + '#' + r] = fields
            else:
                name = r.split(' INDEX ')[1].split(' ')[0]
                tb_idx[name + '#' + r] = fields
    return tb_idx


if __name__ == '__main__':
    s_db = 'From'
    t_db = 'To'

    schema_sql = """
                SELECT CONCAT( 'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
                    IF
                        (NON_UNIQUE = 1,
                            CASE
                                UPPER( INDEX_TYPE ) 
                                WHEN 'FULLTEXT' THEN 'FULLTEXT INDEX' 
                                WHEN 'SPATIAL' THEN 'SPATIAL INDEX' ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) 
                            END,
                        IF
                            ( UPPER( INDEX_NAME ) = 'PRIMARY', CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ), CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) ) 
                        ),
                        '(', GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ), ');' 
                ) AS 'Show_Add_Indexes' 
                FROM information_schema.STATISTICS 
                WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}' 
                GROUP BY TABLE_NAME, INDEX_NAME 
                ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
                """

    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
    source_tables = set([tb for (tb,) in source_tables_result])
    target_tables = set([tb for (tb,) in target_tables_result])

    intersection_tables = set(source_tables) & set(target_tables)
    s_dif_tables = set(source_tables) - set(target_tables)
    t_dif_tables = set(target_tables) - set(source_tables)
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    ignore_difName_sameFields = True  # 过滤名字不相同但内容相同的索引

    comp_data_list = list()
    for i, table in enumerate(sorted(intersection_tables)):
        if table:
            s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
            t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
            if s_result == t_result:
                continue

            s_rows = set([data for (data,) in s_result]) if s_result else set()
            t_rows = set([data for (data,) in t_result]) if t_result else set()
            s_tb_idx = get_table_idx(s_rows)
            t_tb_idx = get_table_idx(t_rows)

            print('\n------ table: {} ------'.format(table))
            # 两边的索引名比较
            sum_key = s_tb_idx.keys() | t_tb_idx.keys()
            for k in sum_key:
                sv = s_tb_idx.get(k)
                tv = t_tb_idx.get(k)
                k_name = k.split('#')[0]
                alter_sql = k.split('#')[1]
                if not sv:
                    if ignore_difName_sameFields:
                        no_hit = True
                        for s_k in s_tb_idx.keys():
                            if s_tb_idx.get(s_k) == tv:
                                print('{} 索引名不同,{} <-> {} 字段内容相同 {} 忽略'.format(table, k_name, s_k.split('#')[0], tv))
                                no_hit = False
                                break
                        if no_hit:
                            print('`{}`.`{}` 缺索引 {}'.format(s_db, table, k_name))
                            comp_data_list.append({'表名': table, '{} 缺索引'.format(s_db): alter_sql})
                    else:
                        print('`{}`.`{}` 缺索引 {}'.format(s_db, table, k_name))
                        comp_data_list.append({'表名': table, '{} 缺索引'.format(s_db): alter_sql})
                elif not tv:
                    if ignore_difName_sameFields:
                        no_hit = True
                        for t_k in t_tb_idx.keys():
                            if t_tb_idx.get(t_k) == sv:
                                print('{} 索引名不同 {} <-> {} 字段内容相同 {} 忽略'.format(table, k_name, t_k.split('#')[0], sv))
                                no_hit = False
                                break
                        if no_hit:
                            print('`{}`.`{}` 缺索引 {}'.format(t_db, table, k_name))
                            comp_data_list.append({'表名': table, '{} 缺索引'.format(t_db): alter_sql})
                    else:
                        print('`{}`.`{}` 缺索引 {}'.format(t_db, table, k_name))
                        comp_data_list.append({'表名': table, '{} 缺索引'.format(t_db): alter_sql})
                elif sv != tv:
                    print('{} 索引名相同 {} 字段不同 {} <-> {} '.format(table, k_name, sv, tv))
                    comp_data_list.append({'表名': table, '索引名相同内容不同': '{}={}\n{}={}'.format(s_db, sv, t_db, tv)})

    print("\n------对比完成------\n")

    # 导出 Excel 文件
    export_excel_file(comp_data_list, '{}->{}-索引-对比2.xlsx'.format(s_db, t_db))


未经允许请勿转载:程序喵 » Python3 两个数据库所有表索引的对比验证

点  赞 (2) 打  赏
分享到: