一、背景
作为 Java 工程师,少不了配置 Maven 依赖,有依赖必然会在本地配置仓库,默认的目录地址为 .m2/repository。这本身没有什么,但随着项目的逐步增多、版本的逐步升级迭代(每周稳步迭代,一年将会产生 52 个版本的 jar 包),仓库目录的占用空间将不断膨胀,甚至能达到 10G 以上,而往往很多的依赖版本已经废弃很久,未来也可能用不上。设想一下,你的项目都统一用 spring-core 5.x 版本了,你的仓库里还存了不少 spring-core 4.x/3.x/2.x 版本的 jar 包……从另一角度讲,对于一台 250G 的 Mac 电脑来说,这也算是磁盘危机了。
二、优化方案
1、实现方式:用两种方式来实现
对于外部依赖,用目录的创建时间来判断,比如删除掉一年以前创建的目录;
对于公司自己项目,已知版本号,用目录上的版本号做限制,小于这个版本的数据直接删除目录;
2、语言选择:使用 python 脚本,代码简洁易懂
迭代指定的目录路径,遇到版本号的目录停止迭代,判断时间/判断版本号
符合条件,执行删除
三、代码实现
基于版本号删除
递归文件路径
符合版本号的条件,加入 list 集合
迭代 list 集合,放入多线程删除
同时删除 maven-metadata-local.xml 文件,此文件本地打包时会自动创建
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import time
import functools
import threading
import logging
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
__author__ = 'TingFeng'
__date__ = '2022/08/13'
# The log file lives at ./logs/<script name>.log, relative to the current
# working directory.
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
# basicConfig(filename=...) opens the file immediately and raises
# FileNotFoundError when the parent directory is missing, so create it first.
os.makedirs(os.path.dirname(_log_file), exist_ok=True)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
# Mirror every log record to the console as well as to the file.
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)
# Directory paths and file paths collected for deletion.
paths = []
file_paths = []
def compare_version(v1=None, v2=None, split_flag="."):
    """Compare two version strings section by section, numerically.

    Each version is split on ``split_flag`` and the sections are compared
    as integers from left to right. When one version runs out of sections
    first, the longer one is considered greater (``"4.89.1" > "4.89"``).

    Args:
        v1: First version string; ``None`` is treated like ``""``.
        v2: Second version string; ``None`` is treated like ``""``.
        split_flag: Separator between sections.

    Returns:
        ``0`` if the versions are equal, ``1`` if ``v1`` is greater,
        ``2`` if ``v2`` is greater.

    Raises:
        ValueError: If a section that has to be compared is not an integer.
    """
    rest1 = v1 or ""
    rest2 = v2 or ""
    while True:
        # Identical remainders (including both exhausted) mean equality;
        # this check runs before any int() parsing on purpose.
        if rest1 == rest2:
            return 0
        if not rest1:
            return 2
        if not rest2:
            return 1
        head1, _, rest1 = rest1.partition(split_flag)
        head2, _, rest2 = rest2.partition(split_flag)
        num1, num2 = int(head1), int(head2)
        if num1 != num2:
            return 1 if num1 > num2 else 2
def get_dir_size(path):
    """Return the total size in bytes of all files under ``path``.

    The tree is walked recursively, so files in nested subdirectories are
    counted. Previously a subdirectory contributed only the size of its own
    directory entry, not of its contents.

    Args:
        path: Directory to measure.

    Returns:
        Sum of the sizes of all regular files found. A path that does not
        exist or is not a directory yields ``0``.
    """
    total_size = 0
    for dir_path, _dir_names, file_names in os.walk(path):
        for file_name in file_names:
            try:
                total_size += os.path.getsize(os.path.join(dir_path, file_name))
            except OSError:
                # e.g. a dangling symlink or a file removed during the walk
                continue
    return total_size
def get_dir(path):
    """Recursively collect outdated version directories and metadata files.

    Directories whose name ends with the module-level ``dir_suffix`` are
    treated as version directories: the suffix is stripped and the rest is
    compared with ``minVersion``. Those older than ``minVersion`` are
    appended to ``paths``. Other directories are descended into. Files
    named ``del_file`` are appended to ``file_paths``.

    Args:
        path: Directory to scan.
    """
    try:
        names = os.listdir(path)
    except PermissionError:
        # Skip only this unreadable directory; the caller's loop carries on
        # with its remaining siblings.
        return
    for name in names:
        entry_path = os.path.join(path, name)
        if os.path.isdir(entry_path):
            if not name.endswith(dir_suffix):
                get_dir(entry_path)
                continue
            version = name.replace(dir_suffix, '')
            try:
                is_outdated = compare_version(version, minVersion) == 2
            except ValueError:
                # A non-numeric version cannot be ranked; keep the directory
                # rather than risk deleting something still needed.
                continue
            if is_outdated:
                paths.append(entry_path)
        elif name == del_file:
            file_paths.append(entry_path)
def rm_target(_path: str):
    """Delete a file or a directory tree permanently (no trash).

    Deletion uses the standard library instead of an ``rm -rf`` shell
    string, so paths containing spaces or shell metacharacters are handled
    safely. A path that does not exist is ignored, as ``rm -rf`` would.
    Failures are logged with a traceback and never raised, so one bad path
    does not break the worker pool.

    Args:
        _path: File or directory to remove.
    """
    # Imported locally because shutil is not among the file's top-level imports.
    import shutil

    try:
        if os.path.isdir(_path) and not os.path.islink(_path):
            shutil.rmtree(_path)
        elif os.path.lexists(_path):
            # Regular file or symlink (a symlink itself is removed, not its target).
            os.remove(_path)
        command = 'rm -rf {}'.format(_path)
        logging.info('{} [{}] 完毕'.format(threading.current_thread().name, command))
    except Exception:
        logging.exception('执行异常,%s', _path)
def time_me(info='耗时'):
    """Build a decorator that prints how long the wrapped call took.

    Args:
        info: Label printed between the function name and the elapsed
            time in whole seconds.

    Returns:
        A decorator. The decorated function prints its timing after a
        successful call and returns the wrapped function's result, which
        was previously discarded.
    """
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
            return result
        return _wrapper
    return _time_me
@time_me()
def main():
    """Scan ``m2_path``, delete every collected target and log a summary.

    Version directories gathered in ``paths`` are measured and then removed
    on a thread pool, followed by the metadata files in ``file_paths``. The
    summary reports the reclaimed size in MB, or in GB once it reaches
    1000 MB. The value is converted along with the unit.
    """
    # Collect targets
    get_dir(m2_path)
    all_task = []
    sum_size = 0
    # The context manager shuts the pool down once all work is done.
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec') as pool:
        # Remove directories; measure each one before it is deleted.
        for p in paths:
            size = get_dir_size(p)
            sum_size += size
            logging.info('{} {}'.format(p, size))
            all_task.append(pool.submit(rm_target, p))
        # Remove files
        for p in file_paths:
            logging.info(p)
            all_task.append(pool.submit(rm_target, p))
        # Wait for every deletion to finish
        wait(all_task, return_when=ALL_COMPLETED)
    logging.info("\n------所有项目处理完毕--------")
    memory = sum_size / 1000 / 1000
    unit = "MB"
    if memory >= 1000:
        memory, unit = memory / 1000, "GB"
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), round(memory, 2), unit))
if __name__ == '__main__':
    # Repository subtree to clean. Resolved against the current user's home
    # directory instead of one hard-coded user name.
    m2_path = os.path.expanduser('~/.m2/repository/com/tingfeng')
    # Maximum number of worker threads
    max_workers = 20
    # Minimum version to keep; version directories older than this are deleted
    minVersion = '4.89'
    # Suffix that marks a version directory
    dir_suffix = '-SNAPSHOT'
    # Metadata file removed alongside; the article notes it is recreated on local builds
    del_file = 'maven-metadata-local.xml'
    main()

基于时间删除
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import time
import functools
import threading
import logging
import re
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
__author__ = 'TingFeng'
__date__ = '2022/08/13'
# The log file lives at ./logs/<script name>.log, relative to the current
# working directory.
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
# basicConfig(filename=...) opens the file immediately and raises
# FileNotFoundError when the parent directory is missing, so create it first.
os.makedirs(os.path.dirname(_log_file), exist_ok=True)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
# Mirror every log record to the console as well as to the file.
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)
# Directory paths collected for deletion.
paths = []
def get_dir_size(path):
    """Return the total size in bytes of all files under ``path``.

    The tree is walked recursively, so files in nested subdirectories are
    counted. Previously a subdirectory contributed only the size of its own
    directory entry, not of its contents.

    Args:
        path: Directory to measure.

    Returns:
        Sum of the sizes of all regular files found. A path that does not
        exist or is not a directory yields ``0``.
    """
    total_size = 0
    for dir_path, _dir_names, file_names in os.walk(path):
        for file_name in file_names:
            try:
                total_size += os.path.getsize(os.path.join(dir_path, file_name))
            except OSError:
                # e.g. a dangling symlink or a file removed during the walk
                continue
    return total_size
def get_dir(path):
    """Recursively collect version directories older than the cutoff time.

    A subdirectory whose name yields a version via ``get_version`` is
    treated as a version directory and is not descended into. It is
    appended to ``paths`` when its ``get_ctime`` timestamp sorts before the
    module-level ``min_time_clean``. Both are ``YYYY-MM-DD...`` strings, so
    string comparison orders them chronologically. Any other subdirectory
    is scanned recursively.

    Args:
        path: Directory to scan.
    """
    try:
        names = os.listdir(path)
    except PermissionError:
        # Skip only this unreadable directory; the caller's loop carries on
        # with its remaining siblings.
        return
    for name in names:
        entry_path = os.path.join(path, name)
        if not os.path.isdir(entry_path):
            continue
        if get_version(name):
            if min_time_clean > get_ctime(entry_path):
                paths.append(entry_path)
        else:
            get_dir(entry_path)
def rm_target(_path: str):
    """Delete a file or a directory tree permanently.

    When the module-level ``move_to_trash`` flag is set, nothing is deleted
    and a warning is logged, because moving to the trash is not implemented.
    Otherwise deletion uses the standard library instead of an ``rm -rf``
    shell string, so paths with spaces or shell metacharacters are handled
    safely. A path that does not exist is ignored. Failures are logged with
    a traceback and never raised.

    Args:
        _path: File or directory to remove.
    """
    # Imported locally because shutil is not among the file's top-level imports.
    import shutil

    try:
        if move_to_trash:
            logging.warning('回收站功能暂未实现,已跳过 {}'.format(_path))
            return
        if os.path.isdir(_path) and not os.path.islink(_path):
            shutil.rmtree(_path)
        elif os.path.lexists(_path):
            # Regular file or symlink (a symlink itself is removed, not its target).
            os.remove(_path)
        command = 'rm -rf {}'.format(_path)
        logging.info('{} [{}] 完毕'.format(threading.current_thread().name, command))
    except Exception:
        logging.exception('执行异常,%s', _path)
def time_me(info='耗时'):
    """Build a decorator that prints how long the wrapped call took.

    Args:
        info: Label printed between the function name and the elapsed
            time in whole seconds.

    Returns:
        A decorator. The decorated function prints its timing after a
        successful call and returns the wrapped function's result, which
        was previously discarded.
    """
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
            return result
        return _wrapper
    return _time_me
def get_version(str):
    """Return the leading numeric version of a directory name, if any.

    Only names that START with a digit are treated as versions. The earlier
    unanchored search matched a digit anywhere in the name, so artifact
    directories such as ``log4j`` or ``commons-lang3`` were mistaken for
    version directories and could be deleted as a whole.

    Args:
        str: Directory name (the parameter name shadows the builtin and is
            kept only for interface compatibility).

    Returns:
        The leading run of digits and dots, e.g. ``"1.2.3"`` for
        ``"1.2.3-SNAPSHOT"``, or ``None`` when the name does not start with
        a digit.
    """
    r = re.match(r'(\d[\d.]*)', str)
    if r:
        return r.group(1)
    return None
def get_ctime(str):
    """Format the ``st_ctime`` of a path as ``YYYY-MM-DD HH:MM:SS`` local time.

    Args:
        str: Path to stat (the parameter name shadows the builtin).

    Returns:
        The timestamp string. NOTE(review): Python documents ``st_ctime`` on
        Unix as the last metadata change, not the creation time. Confirm
        this is the intended staleness signal.
    """
    changed_at = os.stat(str).st_ctime
    local_parts = time.localtime(changed_at)
    return time.strftime('%Y-%m-%d %H:%M:%S', local_parts)
@time_me()
def main():
    """Scan ``m2_path``, delete every stale version directory and log a summary.

    Directories gathered in ``paths`` are measured and then removed on a
    thread pool. The summary reports the reclaimed size in MB, or in GB
    once it reaches 1000 MB. The value is converted along with the unit.
    """
    # Collect targets
    get_dir(m2_path)
    all_task = []
    sum_size = 0
    # The context manager shuts the pool down once all work is done.
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec') as pool:
        # Remove directories; measure each one before it is deleted.
        for p in paths:
            size = get_dir_size(p)
            sum_size += size
            logging.info('{} | {}'.format(p, size))
            all_task.append(pool.submit(rm_target, p))
        # Wait for every deletion to finish
        wait(all_task, return_when=ALL_COMPLETED)
    logging.info("\n------所有项目处理完毕--------")
    memory = sum_size / 1000 / 1000
    unit = "MB"
    if memory >= 1000:
        memory, unit = memory / 1000, "GB"
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), round(memory, 2), unit))
if __name__ == '__main__':
    # Repository root to clean. Resolved against the current user's home
    # directory instead of one hard-coded user name.
    m2_path = os.path.expanduser('~/.m2/repository')
    # Maximum number of worker threads
    max_workers = 20
    # Cutoff: version directories whose timestamp sorts before this are deleted
    min_time_clean = '2022-01-31'
    # Whether to go through the trash. Not implemented yet (the article reports
    # an error from the send2trash module), so False means direct deletion.
    move_to_trash = False
    main()

四、风险点说明
以上实现方案中,会直接执行 rm -rf 删除,不经过回收站,删除后无法恢复,所以删除前请务必确认!!!
python 中有提供了 send2trash 模块,放入回收站功能,但本人在macos 系统中 python 3.7 和 python 3.8 测试,均有 No module named 'Foundation' 异常,有朋友解决还请告知方案,谢谢
附异常issues
https://github.com/arsenetar/send2trash/issues/24
五、优化体验
由原本的 10G+ 成功瘦身到 3G+,心情舒畅
未经允许请勿转载:程序喵 » Python3 清理 .m2/repository 目录中废弃的 maven 依赖包
程序喵