Python3 清理 .m2/repository 目录中废弃的 maven 依赖包

一、背景

作为 java 工程师,少不了配置 maven 依赖,有依赖必然会在本地配置仓库,默认的目录地址为.m2/repository,  这本身没有什么,但随着项目的逐步增多,版本的逐步升级迭代(每周稳步迭代将会产生52个版本的 jar 包),仓库的目录占空空间将无限膨胀,占用空间甚至能达到 10G 以上,而往往很多的依赖版本已经废弃很久,未来也可能用不上,设想一下你的项目都统一用 spring-core 5.x 版本了,你的仓库里还存了不少 spring-core 4.x/3.x/2.x 版本的jar包...。另一角度讲对于一台 250G 的 mac 电脑来说也算是磁盘危机了。

二、优化方案

1、实现方式:用两种方式来实现

  • 对于外面依赖,用目录的创建时间来判断,比如删除掉一年以前创建的目录;

  • 对于公司自己项目,已知版本号,用目录上的版本号做限制,小于这个版本的数据直接删除目录;

2、语言选择:使用 python 脚本,代码简洁易懂

  • 迭代指定的目录路径,遇到版本号的目录停止迭代,判断时间/判断版本号

  • 符合条件,执行删除

a.jpg

三、代码实现

Snipaste_2022-08-14_15-33-37.jpg

基于版本号删除

  • 递归文件路径

  • 符合版本号的条件,加入 list 集合

  • 迭代 list 集合,放入多线程删除

  • 同时删除 maven-metadata-local.xml 文件,此文件本地打包时会自动创建

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import time
import functools
import threading
import logging
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


__author__ = 'TingFeng'
__date__ = '2022/08/13'


_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)

# 所有要删除的目录路径、文件路径
paths = []
file_paths = []

def compare_version(v1=None, v2=None, split_flag="."):
    if (v1 is None) or (v1 == "") or (v2 is None) or (v2 == ""):
        if ((v1 is None) or (v1 == "")) and (v2 is not None) and (v2 != ""):
            return 2
        if ((v2 is None) or (v2 == "")) and (v1 is not None) and (v1 != ""):
            return 1
    if v1 == v2:
        return 0
    try:
        current_section_v1 = v1[:v1.index(split_flag)]
    except Exception as e:
        current_section_v1 = v1
    try:
        current_section_v2 = v2[:v2.index(split_flag)]
    except Exception as e:
        current_section_v2 = v2
    if int(current_section_v1) > int(current_section_v2):
        return 1
    elif int(current_section_v1) < int(current_section_v2):
        return 2
    try:
        other_section_v1 = v1[v1.index(split_flag) + 1:]
    except Exception as e:
        other_section_v1 = ""
    try:
        other_section_v2 = v2[v2.index(split_flag) + 1:]
    except Exception as e:
        other_section_v2 = ""
    return compare_version(other_section_v1, other_section_v2)


def get_dir_size(path):
    total_size = 0
    doc_list = os.listdir(path)
    for doc in doc_list:
        if os.path.isfile(os.path.join(path, doc)):
            total_size = total_size + os.path.getsize(os.path.join(path, doc))
        else:
            total_size = total_size + getsize(os.path.join(path, doc))
    return total_size


def get_dir(path):
    file_list = os.listdir(path)
    try:
        for tmp in file_list:
            path_tmp = os.path.join(path, tmp)
            if os.path.isdir(path_tmp):
                if not path_tmp.endswith(dir_suffix):
                    get_dir(path_tmp)
                else:
                    # paths.append(path_tmp)
                    # 比较版本
                    version = path_tmp.split('/')[-1].replace(dir_suffix, '')
                    flag = compare_version(version, minVersion)
                    # print('{} version={} minVersion={} flag={}'.format(path_tmp, version, minVersion, flag))
                    if flag == 2:
                        paths.append(path_tmp)
            elif path_tmp[path_tmp.rfind('/') + 1:] == del_file:
                file_paths.append(path_tmp)
    except PermissionError:
        pass


def rm_target(_path: str):
    try:
        command = 'rm -rf {}'.format(_path)
        f = os.popen(command)
        print(f.readline())
        logging.info('{} [{}] 完毕'.format(threading.currentThread().getName(), command))
    except Exception as e:
        logging.error('执行异常,{}'.format(_path), e)


def time_me(info='耗时'):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')

        return _wrapper

    return _time_me


@time_me()
def main():
    all_task = []
    pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec')

    # 获取目录
    get_dir(m2_path)

    # 清理目录
    sum_size = 0
    for p in paths:
        size = get_dir_size(p)
        sum_size += size
        logging.info('{} {}'.format(p, size))
        all_task.append(pool.submit(rm_target, p))

    # 清理文件
    for p in file_paths:
        logging.info(p)
        all_task.append(pool.submit(rm_target, p))

    # 等待所有完成
    wait(all_task, return_when=ALL_COMPLETED)
    logging.info("\n------所有项目处理完毕--------")
    memory = sum_size / 1000 / 1000
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), memory, "GB" if len(str(memory).split('.')[0]) > 4 else "MB"))


if __name__ == '__main__':
    m2_path = '/Users/liurenkui/.m2/repository/com/tingfeng'

    # 最大线程数量
    max_workers = 20
    # 最低版本限制,删除指定文件
    minVersion = '4.89'
    dir_suffix = '-SNAPSHOT'
    del_file = 'maven-metadata-local.xml'

    main()


基于时间删除

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import time
import functools
import threading
import logging
import re
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


__author__ = 'TingFeng'
__date__ = '2022/08/13'


_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)

# 所有要删除的目录路径
paths = []


def get_dir_size(path):
    total_size = 0
    doc_list = os.listdir(path)
    for doc in doc_list:
        if os.path.isfile(os.path.join(path, doc)):
            total_size = total_size + os.path.getsize(os.path.join(path, doc))
        else:
            total_size = total_size + getsize(os.path.join(path, doc))
    return total_size


def get_dir(path):
    file_list = os.listdir(path)
    try:
        for tmp in file_list:
            path_tmp = os.path.join(path, tmp)
            if os.path.isdir(path_tmp):
                if get_version(path_tmp[path_tmp.rfind('/') + 1:]):
                    ctime = get_ctime(path_tmp)
                    if min_time_clean > ctime:
                        # print(path_tmp, '|', ctime)
                        paths.append(path_tmp)
                else:
                    get_dir(path_tmp)
    except PermissionError:
        pass


def rm_target(_path: str):
    try:
        if move_to_trash:
            pass
        else:
            command = 'rm -rf {}'.format(_path)
            f = os.popen(command)
            print(f.readline())
        logging.info('{} [{}] 完毕'.format(threading.currentThread().getName(), command))
    except Exception as e:
        logging.error('执行异常,{}'.format(_path), e)


def time_me(info='耗时'):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')

        return _wrapper

    return _time_me


def get_version(str):
    """获取版本"""
    r = re.search(r'([\d.]+)', str)
    if r and r.group(1) != '.':
        return r.group(1)


def get_ctime(str):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(os.stat(str).st_ctime))


@time_me()
def main():
    all_task = []
    pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec')

    # 获取目录
    get_dir(m2_path)

    # 清理目录
    sum_size = 0
    for p in paths:
        size = get_dir_size(p)
        sum_size += size
        logging.info('{}  | {}'.format(p, size))
        all_task.append(pool.submit(rm_target, p))

    # # 等待所有完成
    wait(all_task, return_when=ALL_COMPLETED)
    logging.info("\n------所有项目处理完毕--------")
    memory = sum_size / 1000 / 1000
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), memory, "GB" if len(str(memory).split('.')[0]) > 4 else "MB"))


if __name__ == '__main__':
    m2_path = '/Users/liurenkui/.m2/repository'

    # 最大线程数量
    max_workers = 20
    # 最低版本限制,删除指定文件
    min_time_clean = '2022-01-31'
    # 是否经过回收站,不经过回收站会直接删除,暂未实现 send2trash 模块有异常
    move_to_trash = False

    main()

四、风险点说明

以上实现方案中,会直接进行 rm -f 删除,不经过回收站,所以删除时请确认!!!

python 中有提供了 send2trash 模块,放入回收站功能,但本人在macos 系统中 python 3.7 和 python 3.8 测试,均有 No module named 'Foundation' 异常,有朋友解决还请告知方案,谢谢

附异常issues

  • https://github.com/arsenetar/send2trash/issues/24

五、优化体验

Snipaste_2022-08-14_15-42-16.jpg

由原本的 10G+ 成功搜身到 3G+,心情舒畅

未经允许请勿转载:程序喵 » Python3 清理 .m2/repository 目录中废弃的 maven 依赖包

点  赞 (1) 打  赏
分享到: