commit ced24a71e2cfa6f8a53bb750308a759da101c8ef Author: fangjiajunzzz <3323312903@qq.com> Date: Mon Jan 19 13:26:42 2026 +0800 提交 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8daf0a5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +ftp_downloads/* +!ftp_downloads/.gitkeep + +logger/* +!logger/.gitkeep + +.claude + \ No newline at end of file diff --git a/conf.json b/conf.json new file mode 100644 index 0000000..4634420 --- /dev/null +++ b/conf.json @@ -0,0 +1,5 @@ +{ + "parse_ftp_url": "ftp://remote:remote@rhx0.top:2121/" , + "download_path" : "ftp_downloads", + "log_file_path" : "logger" +} diff --git a/ftp_client.py b/ftp_client.py new file mode 100644 index 0000000..c7a3583 --- /dev/null +++ b/ftp_client.py @@ -0,0 +1,295 @@ +import json +import time +import sched +import os +import ftplib +import logging +import shutil +import datetime +from urllib.parse import urlparse +from logging.handlers import TimedRotatingFileHandler +import socket +# ================= 配置与常量 ================= +CONFIG_FILE = 'conf.json' +MIN_FREE_SPACE_MB = 1024 # 设定磁盘最小剩余空间 (例如 1GB),低于此值触发清理 +SCAN_INTERVAL = 30 * 60 # 30 分钟(单位:秒) + +# 初始化调度器 +scheduler = sched.scheduler(time.time, time.sleep) + +# 用于判断任务是否正在执行 +is_task_running = False + +def load_config(): + """读取配置文件""" + if not os.path.exists(CONFIG_FILE): + print(f"Error: {CONFIG_FILE} not found.") + return None + with open(CONFIG_FILE, 'r', encoding='utf-8') as f: + return json.load(f) + +def setup_logging(log_dir): + """配置日志系统,每天生成一个日志文件""" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file = os.path.join(log_dir, "ftp_dl.log") + + logger = logging.getLogger("FTP_Manager") + logger.setLevel(logging.INFO) + + # 避免重复添加handler + if not logger.handlers: + # 按天回滚日志,保留最近30天 + handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=30, encoding='utf-8') + formatter = logging.Formatter('%(asctime)s - [%(levelname)s] - %(message)s') + handler.setFormatter(formatter) + + # 同时输出到控制台(方便调试,后台运行时可忽略) + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + logger.addHandler(handler) + logger.addHandler(console_handler) + + return logger + +# ================= 核心功能逻辑 ================= + +def check_and_clean_disk_space(download_path, logger): + """ + 检查磁盘空间。如果空间不足,删除最早一天的下载文件。 + 策略:根据文件的修改时间(mtime)将文件按日期分组,然后删除最老的一组。 + """ + try: + total, used, free = shutil.disk_usage(download_path) + free_mb = free // (1024 * 1024) + + if free_mb < MIN_FREE_SPACE_MB: + logger.warning(f"磁盘空间不足 (剩余 {free_mb}MB),开始执行清理策略...") + + # 1. 遍历所有文件并按日期归类 + files_by_date = {} + for root, dirs, files in os.walk(download_path): + for name in files: + filepath = os.path.join(root, name) + # 获取文件修改时间的时间戳 + mtime = os.path.getmtime(filepath) + date_str = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d') + + if date_str not in files_by_date: + files_by_date[date_str] = [] + files_by_date[date_str].append(filepath) + + if not files_by_date: + logger.info("下载目录为空,无需清理。") + return + + # 2. 找到最早的日期 + sorted_dates = sorted(files_by_date.keys()) + oldest_date = sorted_dates[0] + + # 3. 删除该日期的所有文件 + logger.info(f"正在删除日期为 {oldest_date} 的旧文件以释放空间...") + for fpath in files_by_date[oldest_date]: + try: + os.remove(fpath) + logger.info(f"已删除: {fpath}") + except Exception as e: + logger.error(f"删除文件失败 {fpath}: {e}") + + logger.info("清理完成。") + else: + logger.info(f"磁盘空间充足 (剩余 {free_mb}MB)。") + + except Exception as e: + logger.error(f"磁盘检查/清理过程中发生错误: {e}") + +def run_ftp_job(config): + """ + 执行具体的FTP下载任务 (增强版:防断网、防中断) + """ + global is_task_running + + if is_task_running: + print("上一个任务仍在执行,跳过本次任务。") + return + + is_task_running = True + logger = setup_logging(config['log_file_path']) + # logger.info("================ 开始执行定时任务 ================") + + # 解析配置 + parse_result = urlparse(config['parse_ftp_url']) + ftp_host = parse_result.hostname + ftp_port = parse_result.port or 21 + ftp_user = parse_result.username + ftp_pass = parse_result.password + local_path = config['download_path'] + + # FTP 连接超时时间 (秒) + FTP_TIMEOUT = 60 + + if not os.path.exists(local_path): + os.makedirs(local_path) + + # 1. 检查磁盘空间 + check_and_clean_disk_space(local_path, logger) + + ftp = ftplib.FTP() + try: + logger.info(f"正在连接 FTP 服务器: {ftp_host}:{ftp_port}") + + # 【优化点1】设置连接超时,防止网络卡死 + ftp.connect(ftp_host, ftp_port, timeout=FTP_TIMEOUT) + ftp.login(ftp_user, ftp_pass) + ftp.set_pasv(True) + ftp.encoding = 'utf-8' # 防止中文乱码 + + # 获取文件列表 + try: + files_list = ftp.nlst() + except ftplib.error_perm: + files_list = [] + + logger.info(f"获取到文件列表: {len(files_list)} 个文件") + + # 筛选 .sure 文件 + sure_files = [f for f in files_list if f.endswith('.sure')] + + for sure_file in sure_files: + target_file = sure_file[:-5] # 移除 .sure 后缀 + + if target_file in files_list: + local_file_path = os.path.join(local_path, target_file) + + # 获取远程文件大小用于校验 + try: + remote_size = ftp.size(target_file) + except: + remote_size = -1 + + # 检查本地是否已存在完整文件 + if os.path.exists(local_file_path): + local_size = os.path.getsize(local_file_path) + if remote_size != -1 and local_size == remote_size: + logger.info(f"文件已存在且完整,跳过: {target_file}") + # (可选) 这里也可以选择删除服务器上的文件,视业务需求而定 + continue + else: + logger.warning(f"本地文件 {target_file} 大小不一致 (本地:{local_size}, 远程:{remote_size}),准备重新下载") + os.remove(local_file_path) # 删除旧的不完整文件 + + logger.info(f"开始下载: {target_file}") + + # 【优化点2】使用 .tmp 临时文件路径 + temp_file_path = local_file_path + ".tmp" + + download_success = False + try: + with open(temp_file_path, 'wb') as f: + # 使用回调写入,这里是网络中断最容易发生的地方 + ftp.retrbinary(f'RETR {target_file}', f.write) + + # 下载完成后的二次校验 + if remote_size != -1: + downloaded_size = os.path.getsize(temp_file_path) + if downloaded_size != remote_size: + raise Exception(f"下载文件大小不匹配: 需 {remote_size}, 实得 {downloaded_size}") + + # 【优化点3】原子操作:下载成功且校验通过后,才重命名为正式文件 + os.rename(temp_file_path, local_file_path) + download_success = True + logger.info(f"下载并校验成功: {target_file}") + + except Exception as dl_err: + logger.error(f"下载中断或失败 {target_file}: {dl_err}") + # 【优化点4】清理残留的垃圾文件 + if os.path.exists(temp_file_path): + try: + os.remove(temp_file_path) + logger.info(f"已清理未完成的临时文件: {temp_file_path}") + except: + pass + + # 只有下载成功才删除服务器文件 + if download_success: + try: + ftp.delete(target_file) + ftp.delete(sure_file) + logger.info(f"已清理服务器文件: {target_file} & {sure_file}") + except Exception as del_err: + logger.error(f"服务器文件删除失败: {del_err}") + + else: + logger.warning(f"发现 {sure_file} 但未找到源文件 {target_file}") + # 视情况决定是否删除孤立的 .sure 文件 + # ftp.delete(sure_file) + + except (socket.timeout, socket.error) as net_err: + logger.error(f"网络连接错误 (可能是断网或超时): {net_err}") + except ftplib.all_errors as ftp_err: + logger.error(f"FTP 协议错误: {ftp_err}") + except Exception as e: + logger.error(f"未知错误: {e}") + finally: + try: + ftp.quit() + except: + try: + ftp.close() # 强制关闭 + except: + pass + logger.info("本次任务结束") + is_task_running = False + +def schedule_runner(): + """ + 计算下一次运行时间并加入调度器 + """ + config = load_config() + if not config: + return + + logger = setup_logging(config['log_file_path']) + + # 获取当前时间 + now = datetime.datetime.now() + + # 计算下次运行的延迟时间 + delay_seconds = SCAN_INTERVAL + logger.info(f"程序运行中... 下次扫描将在 {now + datetime.timedelta(seconds=delay_seconds)} 启动 (等待 {delay_seconds:.2f} 秒)") + + # 加入调度器: + # 1. 执行任务 + # 2. 任务完成后,重新调用 schedule_runner 安排下一次(实现无限循环) + scheduler.enter(delay_seconds, 1, run_wrapper, (config,)) + +def run_wrapper(config): + """ + 包装器:运行任务后,立即安排下一次调度 + """ + run_ftp_job(config) + schedule_runner() + +# ================= 主程序入口 ================= + +if __name__ == "__main__": + # 首次启动,先执行一次任务 + cfg = load_config() + if cfg: + log_dir = cfg.get('log_file_path', 'logger') + if not os.path.exists(log_dir): os.makedirs(log_dir) + print(f"Service started. Logs will be saved to {log_dir}") + + # 执行初次扫描任务 + run_ftp_job(cfg) + + # 启动调度器 + schedule_runner() + try: + # 阻塞主线程,运行调度器 + scheduler.run() + except KeyboardInterrupt: + print("程序已手动停止") + \ No newline at end of file