Python3 两个数据库所有表索引的对比验证
接着上一篇文章继续说明:Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证
实现结果如下
对比脚本实现(脚本1)
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors
"""
表索引对比
"""
# Module-level workbook shared by the Excel helper functions in this script;
# `sheet` is its active worksheet, which receives the header and result rows.
wb_result = Workbook()
sheet = wb_result.active
def result_excel_init():
    """Write and format the header row of the report sheet.

    Appends the three column titles to the module-level ``sheet`` (the two
    "missing index" titles embed the module-level ``s_db`` / ``t_db`` names),
    sets the header row height and the column widths, and applies a bold,
    centred, filled named style to every cell of the first row.
    """
    header_titles = ['表名', '{} 缺索引'.format(s_db), '{} 缺索引'.format(t_db)]
    sheet.append(header_titles)

    sheet.row_dimensions[1].height = 25
    for column_letter, column_width in (('A', 50), ('B', 150), ('C', 150)):
        sheet.column_dimensions[column_letter].width = column_width

    header_style = NamedStyle(name="highlight")
    header_style.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    header_style.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    header_style.fill = PatternFill("solid", fgColor="ACB9C9")

    # Only the first row (the header that was just appended) is styled.
    for header_cell in next(iter(sheet.rows)):
        header_cell.style = header_style
def result_excel_end():
    """Apply the body style to the data cells of the report sheet.

    Walks every row of the module-level ``sheet`` except the first (header)
    row. Cells whose ``column`` value is in the skip list (``[1]``) are left
    untouched; every other cell gets the named style "style" with
    vertically centred, wrapped text.
    """
    body_style = NamedStyle(name="style")
    body_alignment = Alignment(vertical='center', wrap_text=True)
    skipped_columns = [1]

    data_rows = list(sheet.rows)[1:]  # drop the header row
    for data_row in data_rows:
        for cell in data_row:
            if cell.column not in skipped_columns:
                body_style.alignment = body_alignment
                cell.style = body_style
def export_excel_file(data_list, file_name):
    """Write the comparison results to an Excel file.

    Args:
        data_list: iterable of dicts; each may carry the keys '表名',
            '<s_db> 缺索引' and '<t_db> 缺索引' (a missing "缺索引" key is
            written as an empty string).
        file_name: path the module-level workbook is saved to.
    """
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()

    source_key = '{} 缺索引'.format(s_db)
    target_key = '{} 缺索引'.format(t_db)
    for entry in data_list:
        print(entry)
        source_text = entry.get(source_key, '').replace(' ', ' ')
        target_text = entry.get(target_key, '').replace(' ', ' ')
        sheet.append([entry.get('表名'), source_text, target_text])

    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))
def exec_sql(sql, db_name, tb=''):
    """Run one SQL statement against ``db_name`` and return all of its rows.

    A fresh connection is opened for every call. The cursor and the
    connection are always closed again, including when the query raises,
    so repeated calls (two per compared table) do not leak connections.

    Args:
        sql: SQL text to execute.
        db_name: name of the database to connect to.
        tb: unused; kept so callers that pass the table name keep working.

    Returns:
        The rows returned by ``cursor.fetchall()``, or ``None`` when the
        statement produced no rows.
    """
    db = pymysql.connect(host='Ip', port=3306, user='用户', password='密码', database=db_name)
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.close()
    if results:
        return results
    return None
if __name__ == '__main__':
    # Names of the two schemas being compared (source and target).
    s_db = 'From'
    t_db = 'To'
    # For one table, yields one row per index. Each row is a single string
    # holding an "ALTER TABLE <table> ADD ..." statement rebuilt from
    # information_schema.STATISTICS. The two placeholders are the schema name
    # and the table name.
    schema_sql = """
SELECT CONCAT( 'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
IF
(NON_UNIQUE = 1,
CASE
UPPER( INDEX_TYPE )
WHEN 'FULLTEXT' THEN 'FULLTEXT INDEX'
WHEN 'SPATIAL' THEN 'SPATIAL INDEX' ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE )
END,
IF
( UPPER( INDEX_NAME ) = 'PRIMARY', CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ), CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) )
),
'(', GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ), ');'
) AS 'Show_Add_Indexes'
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'
GROUP BY TABLE_NAME, INDEX_NAME
ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
"""
    # exec_sql returns None for an empty result, so fall back to an empty
    # tuple; otherwise a schema without tables would crash the unpacking.
    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db) or ()
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db) or ()
    source_tables = {tb for (tb,) in source_tables_result}
    target_tables = {tb for (tb,) in target_tables_result}
    intersection_tables = source_tables & target_tables
    s_dif_tables = source_tables - target_tables
    t_dif_tables = target_tables - source_tables
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    # One dict per finding; consumed later by the Excel export.
    comp_data_list = []
    for table in sorted(intersection_tables):
        if not table:
            continue
        s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
        t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
        if s_result == t_result:
            continue  # identical index definitions, nothing to report
        s_rows = {data for (data,) in s_result or ()}
        t_rows = {data for (data,) in t_result or ()}
        t_dif_rows = s_rows - t_rows  # statements only the source has
        s_dif_rows = t_rows - s_rows  # statements only the target has
        if t_dif_rows:
            print(t_db, '缺索引:', t_dif_rows)
            comp_data_list.append({
                '表名': table,
                # Sorted so the exported cell text is stable between runs.
                '{} 缺索引'.format(t_db): '\n'.join(sorted(t_dif_rows)),
            })
        if s_dif_rows:
            print(s_db, '缺索引', table, s_dif_rows)
            comp_data_list.append({
                '表名': table,
                '{} 缺索引'.format(s_db): '\n'.join(sorted(s_dif_rows)),
            })
    print("\n------对比完成------\n")
# 导出 Excel 文件
export_excel_file(comp_data_list, '{}->{}-索引-对比.xlsx'.format(s_db, t_db))

脚本2
细心的你如果仔细思考一下索引对比的结果就会发现:比如 From 和 To 的索引名称只是大小写不同,但结果相同;或者索引名称不同,但索引的字段完全相同;
这类场景,其实并不需要两边都做修改:毕竟创建索引的耗时和数据量是成正比的,仅仅因为名字不同就重复建索引,只会徒增空间占用。所以,脚本还有改进的空间——把名字不同、但内容完全相同的索引自动排除掉,而不必人工验证,岂不更好?
你也可以基于以上脚本1自行扩展实现,下面给出其中一个示例
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors
"""
表索引对比
"""
# Module-level workbook shared by the Excel helper functions in this script;
# `sheet` is its active worksheet, which receives the header and result rows.
wb_result = Workbook()
sheet = wb_result.active
def result_excel_init():
    """Write and format the header row of the report sheet.

    Appends the four column titles to the module-level ``sheet`` (the two
    "missing index" titles embed the module-level ``s_db`` / ``t_db`` names),
    sets the header row height and the column widths, and applies a bold,
    centred, filled named style to every cell of the first row.
    """
    header_titles = [
        '表名',
        '{} 缺索引'.format(s_db),
        '{} 缺索引'.format(t_db),
        '索引名相同内容不同',
    ]
    sheet.append(header_titles)

    sheet.row_dimensions[1].height = 25
    column_widths = (('A', 50), ('B', 150), ('C', 150), ('D', 150))
    for column_letter, column_width in column_widths:
        sheet.column_dimensions[column_letter].width = column_width

    header_style = NamedStyle(name="highlight")
    header_style.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    header_style.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    header_style.fill = PatternFill("solid", fgColor="ACB9C9")

    # Only the first row (the header that was just appended) is styled.
    for header_cell in next(iter(sheet.rows)):
        header_cell.style = header_style
def result_excel_end():
    """Apply the body style to the data cells of the report sheet.

    Walks every row of the module-level ``sheet`` except the first (header)
    row. Cells whose ``column`` value is in the skip list (``[1]``) are left
    untouched; every other cell gets the named style "style" with
    vertically centred, wrapped text.
    """
    body_style = NamedStyle(name="style")
    body_alignment = Alignment(vertical='center', wrap_text=True)
    skipped_columns = [1]

    data_rows = list(sheet.rows)[1:]  # drop the header row
    for data_row in data_rows:
        for cell in data_row:
            if cell.column not in skipped_columns:
                body_style.alignment = body_alignment
                cell.style = body_style
def export_excel_file(data_list, file_name):
    """Write the comparison results to an Excel file.

    Args:
        data_list: iterable of dicts; each may carry the keys '表名',
            '<s_db> 缺索引', '<t_db> 缺索引' and '索引名相同内容不同'
            (a missing "缺索引" key is written as an empty string, a
            missing '索引名相同内容不同' key as ``None``).
        file_name: path the module-level workbook is saved to.
    """
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()

    source_key = '{} 缺索引'.format(s_db)
    target_key = '{} 缺索引'.format(t_db)
    for entry in data_list:
        print(entry)
        row_values = [
            entry.get('表名'),
            entry.get(source_key, '').replace(' ', ' '),
            entry.get(target_key, '').replace(' ', ' '),
            entry.get('索引名相同内容不同'),
        ]
        sheet.append(row_values)

    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))
def exec_sql(sql, db_name, tb=''):
    """Run one SQL statement against ``db_name`` and return all of its rows.

    A fresh connection is opened for every call. The cursor and the
    connection are closed in ``finally`` blocks, so they are released even
    when ``execute`` or ``fetchall`` raises.

    Args:
        sql: SQL text to execute.
        db_name: name of the database to connect to.
        tb: unused; kept so callers that pass the table name keep working.

    Returns:
        The rows returned by ``cursor.fetchall()``, or ``None`` when the
        statement produced no rows.
    """
    db = pymysql.connect(host='IP', port=3306, user='用户', password='密码', database=db_name)
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.close()
    if results:
        return results
    return None
def get_table_idx(rows):
    """Index a collection of "ALTER TABLE ... ADD ..." statements.

    Args:
        rows: iterable of statement strings shaped like
            ``ALTER TABLE t ADD INDEX name USING BTREE(a, b);``; may be
            empty or ``None``.

    Returns:
        A dict mapping ``"<index name>#<statement>"`` to the index's column
        list with all spaces removed (e.g. ``"a,b"``). The name is
        ``pri_key`` for a primary key. FULLTEXT / SPATIAL statements carry
        no index name, so for them the name falls back to the first word
        after ``ADD`` (e.g. ``FULLTEXT``).
    """
    tb_idx = {}
    for stmt in rows or ():
        # Everything before the first '(' is the statement head, everything
        # after it is the column list followed by ');'.
        head, _, tail = stmt.partition('(')
        fields = tail.replace(');', '').replace(' ', '')
        if ' PRIMARY KEY ' in stmt:
            name = 'pri_key'
        elif ' INDEX ' in head:
            name = head.split(' INDEX ')[1].split(' ')[0]
        else:
            # "... ADD FULLTEXT INDEX(" has no name and no space after INDEX,
            # so the lookup above would fail; use the index kind instead.
            name = head.rpartition(' ADD ')[2].split(' ')[0]
        tb_idx[name + '#' + stmt] = fields
    return tb_idx
if __name__ == '__main__':
    # Names of the two schemas being compared (source and target).
    s_db = 'From'
    t_db = 'To'
    # For one table, yields one row per index. Each row is a single string
    # holding an "ALTER TABLE <table> ADD ..." statement rebuilt from
    # information_schema.STATISTICS. The two placeholders are the schema name
    # and the table name.
    schema_sql = """
SELECT CONCAT( 'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
IF
(NON_UNIQUE = 1,
CASE
UPPER( INDEX_TYPE )
WHEN 'FULLTEXT' THEN 'FULLTEXT INDEX'
WHEN 'SPATIAL' THEN 'SPATIAL INDEX' ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE )
END,
IF
( UPPER( INDEX_NAME ) = 'PRIMARY', CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ), CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) )
),
'(', GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ), ');'
) AS 'Show_Add_Indexes'
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'
GROUP BY TABLE_NAME, INDEX_NAME
ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
"""
    # exec_sql returns None for an empty result, so fall back to an empty
    # tuple; otherwise a schema without tables would crash the unpacking.
    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db) or ()
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db) or ()
    source_tables = {tb for (tb,) in source_tables_result}
    target_tables = {tb for (tb,) in target_tables_result}
    intersection_tables = source_tables & target_tables
    s_dif_tables = source_tables - target_tables
    t_dif_tables = target_tables - source_tables
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    # When True, an index that exists on one side only is not reported if the
    # other side already has an index over exactly the same columns.
    ignore_difName_sameFields = True

    # One dict per finding; consumed later by the Excel export.
    comp_data_list = []
    for table in sorted(intersection_tables):
        if not table:
            continue
        s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
        t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
        if s_result == t_result:
            continue  # identical index definitions, nothing to report
        s_tb_idx = get_table_idx({data for (data,) in s_result or ()})
        t_tb_idx = get_table_idx({data for (data,) in t_result or ()})
        print('\n------ table: {} ------'.format(table))

        # Keys have the form "<name>#<statement>", so a key present on both
        # sides means an identical statement. Only keys that exist on exactly
        # one side can represent a difference.
        reported_conflicts = set()
        for k in sorted(s_tb_idx.keys() ^ t_tb_idx.keys()):
            # partition keeps the whole statement even if it contains '#'.
            k_name, _, alter_sql = k.partition('#')
            only_in_target = k in t_tb_idx
            if only_in_target:
                fields, other_idx, lacking_db = t_tb_idx[k], s_tb_idx, s_db
                ignore_msg = '{} 索引名不同,{} <-> {} 字段内容相同 {} 忽略'
            else:
                fields, other_idx, lacking_db = s_tb_idx[k], t_tb_idx, t_db
                ignore_msg = '{} 索引名不同 {} <-> {} 字段内容相同 {} 忽略'

            # 1) The other side already covers the same columns: ignore.
            if ignore_difName_sameFields:
                twin = next((o for o, f in other_idx.items() if f == fields), None)
                if twin is not None:
                    print(ignore_msg.format(table, k_name, twin.partition('#')[0], fields))
                    continue

            # 2) Same index name on the other side but different columns.
            #    MySQL index names are case-insensitive, hence lower().
            #    Reported once per name, not once per side.
            name_key = k_name.lower()
            namesake = next(
                (o for o in other_idx
                 if o.partition('#')[0].lower() == name_key and other_idx[o] != fields),
                None,
            )
            if namesake is not None:
                if name_key not in reported_conflicts:
                    reported_conflicts.add(name_key)
                    if only_in_target:
                        sv, tv = other_idx[namesake], fields
                    else:
                        sv, tv = fields, other_idx[namesake]
                    print('{} 索引名相同 {} 字段不同 {} <-> {} '.format(table, k_name, sv, tv))
                    comp_data_list.append({'表名': table, '索引名相同内容不同': '{}={}\n{}={}'.format(s_db, sv, t_db, tv)})
                continue

            # 3) Otherwise the index is genuinely missing on the other side.
            print('`{}`.`{}` 缺索引 {}'.format(lacking_db, table, k_name))
            comp_data_list.append({'表名': table, '{} 缺索引'.format(lacking_db): alter_sql})
    print("\n------对比完成------\n")
# 导出 Excel 文件
export_excel_file(comp_data_list, '{}->{}-索引-对比2.xlsx'.format(s_db, t_db))

未经允许请勿转载:程序喵 » Python3 两个数据库所有表索引的对比验证
程序喵