ftp/ftp_client.py
2026-01-19 13:26:42 +08:00

295 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import sched
import os
import ftplib
import logging
import shutil
import datetime
from urllib.parse import urlparse
from logging.handlers import TimedRotatingFileHandler
import socket
# ================= 配置与常量 =================
CONFIG_FILE = 'conf.json'
MIN_FREE_SPACE_MB = 1024 # 设定磁盘最小剩余空间 (例如 1GB),低于此值触发清理
SCAN_INTERVAL = 30 * 60 # 30 分钟(单位:秒)
# 初始化调度器
scheduler = sched.scheduler(time.time, time.sleep)
# 用于判断任务是否正在执行
is_task_running = False
def load_config():
"""读取配置文件"""
if not os.path.exists(CONFIG_FILE):
print(f"Error: {CONFIG_FILE} not found.")
return None
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
def setup_logging(log_dir):
"""配置日志系统,每天生成一个日志文件"""
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, "ftp_dl.log")
logger = logging.getLogger("FTP_Manager")
logger.setLevel(logging.INFO)
# 避免重复添加handler
if not logger.handlers:
# 按天回滚日志保留最近30天
handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=30, encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - [%(levelname)s] - %(message)s')
handler.setFormatter(formatter)
# 同时输出到控制台(方便调试,后台运行时可忽略)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(handler)
logger.addHandler(console_handler)
return logger
# ================= 核心功能逻辑 =================
def check_and_clean_disk_space(download_path, logger):
"""
检查磁盘空间。如果空间不足,删除最早一天的下载文件。
策略:根据文件的修改时间(mtime)将文件按日期分组,然后删除最老的一组。
"""
try:
total, used, free = shutil.disk_usage(download_path)
free_mb = free // (1024 * 1024)
if free_mb < MIN_FREE_SPACE_MB:
logger.warning(f"磁盘空间不足 (剩余 {free_mb}MB),开始执行清理策略...")
# 1. 遍历所有文件并按日期归类
files_by_date = {}
for root, dirs, files in os.walk(download_path):
for name in files:
filepath = os.path.join(root, name)
# 获取文件修改时间的时间戳
mtime = os.path.getmtime(filepath)
date_str = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d')
if date_str not in files_by_date:
files_by_date[date_str] = []
files_by_date[date_str].append(filepath)
if not files_by_date:
logger.info("下载目录为空,无需清理。")
return
# 2. 找到最早的日期
sorted_dates = sorted(files_by_date.keys())
oldest_date = sorted_dates[0]
# 3. 删除该日期的所有文件
logger.info(f"正在删除日期为 {oldest_date} 的旧文件以释放空间...")
for fpath in files_by_date[oldest_date]:
try:
os.remove(fpath)
logger.info(f"已删除: {fpath}")
except Exception as e:
logger.error(f"删除文件失败 {fpath}: {e}")
logger.info("清理完成。")
else:
logger.info(f"磁盘空间充足 (剩余 {free_mb}MB)。")
except Exception as e:
logger.error(f"磁盘检查/清理过程中发生错误: {e}")
def run_ftp_job(config):
"""
执行具体的FTP下载任务 (增强版:防断网、防中断)
"""
global is_task_running
if is_task_running:
print("上一个任务仍在执行,跳过本次任务。")
return
is_task_running = True
logger = setup_logging(config['log_file_path'])
# logger.info("================ 开始执行定时任务 ================")
# 解析配置
parse_result = urlparse(config['parse_ftp_url'])
ftp_host = parse_result.hostname
ftp_port = parse_result.port or 21
ftp_user = parse_result.username
ftp_pass = parse_result.password
local_path = config['download_path']
# FTP 连接超时时间 (秒)
FTP_TIMEOUT = 60
if not os.path.exists(local_path):
os.makedirs(local_path)
# 1. 检查磁盘空间
check_and_clean_disk_space(local_path, logger)
ftp = ftplib.FTP()
try:
logger.info(f"正在连接 FTP 服务器: {ftp_host}:{ftp_port}")
# 【优化点1】设置连接超时防止网络卡死
ftp.connect(ftp_host, ftp_port, timeout=FTP_TIMEOUT)
ftp.login(ftp_user, ftp_pass)
ftp.set_pasv(True)
ftp.encoding = 'utf-8' # 防止中文乱码
# 获取文件列表
try:
files_list = ftp.nlst()
except ftplib.error_perm:
files_list = []
logger.info(f"获取到文件列表: {len(files_list)} 个文件")
# 筛选 .sure 文件
sure_files = [f for f in files_list if f.endswith('.sure')]
for sure_file in sure_files:
target_file = sure_file[:-5] # 移除 .sure 后缀
if target_file in files_list:
local_file_path = os.path.join(local_path, target_file)
# 获取远程文件大小用于校验
try:
remote_size = ftp.size(target_file)
except:
remote_size = -1
# 检查本地是否已存在完整文件
if os.path.exists(local_file_path):
local_size = os.path.getsize(local_file_path)
if remote_size != -1 and local_size == remote_size:
logger.info(f"文件已存在且完整,跳过: {target_file}")
# (可选) 这里也可以选择删除服务器上的文件,视业务需求而定
continue
else:
logger.warning(f"本地文件 {target_file} 大小不一致 (本地:{local_size}, 远程:{remote_size}),准备重新下载")
os.remove(local_file_path) # 删除旧的不完整文件
logger.info(f"开始下载: {target_file}")
# 【优化点2】使用 .tmp 临时文件路径
temp_file_path = local_file_path + ".tmp"
download_success = False
try:
with open(temp_file_path, 'wb') as f:
# 使用回调写入,这里是网络中断最容易发生的地方
ftp.retrbinary(f'RETR {target_file}', f.write)
# 下载完成后的二次校验
if remote_size != -1:
downloaded_size = os.path.getsize(temp_file_path)
if downloaded_size != remote_size:
raise Exception(f"下载文件大小不匹配: 需 {remote_size}, 实得 {downloaded_size}")
# 【优化点3】原子操作下载成功且校验通过后才重命名为正式文件
os.rename(temp_file_path, local_file_path)
download_success = True
logger.info(f"下载并校验成功: {target_file}")
except Exception as dl_err:
logger.error(f"下载中断或失败 {target_file}: {dl_err}")
# 【优化点4】清理残留的垃圾文件
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.info(f"已清理未完成的临时文件: {temp_file_path}")
except:
pass
# 只有下载成功才删除服务器文件
if download_success:
try:
ftp.delete(target_file)
ftp.delete(sure_file)
logger.info(f"已清理服务器文件: {target_file} & {sure_file}")
except Exception as del_err:
logger.error(f"服务器文件删除失败: {del_err}")
else:
logger.warning(f"发现 {sure_file} 但未找到源文件 {target_file}")
# 视情况决定是否删除孤立的 .sure 文件
# ftp.delete(sure_file)
except (socket.timeout, socket.error) as net_err:
logger.error(f"网络连接错误 (可能是断网或超时): {net_err}")
except ftplib.all_errors as ftp_err:
logger.error(f"FTP 协议错误: {ftp_err}")
except Exception as e:
logger.error(f"未知错误: {e}")
finally:
try:
ftp.quit()
except:
try:
ftp.close() # 强制关闭
except:
pass
logger.info("本次任务结束")
is_task_running = False
def schedule_runner():
"""
计算下一次运行时间并加入调度器
"""
config = load_config()
if not config:
return
logger = setup_logging(config['log_file_path'])
# 获取当前时间
now = datetime.datetime.now()
# 计算下次运行的延迟时间
delay_seconds = SCAN_INTERVAL
logger.info(f"程序运行中... 下次扫描将在 {now + datetime.timedelta(seconds=delay_seconds)} 启动 (等待 {delay_seconds:.2f} 秒)")
# 加入调度器:
# 1. 执行任务
# 2. 任务完成后,重新调用 schedule_runner 安排下一次(实现无限循环)
scheduler.enter(delay_seconds, 1, run_wrapper, (config,))
def run_wrapper(config):
"""
包装器:运行任务后,立即安排下一次调度
"""
run_ftp_job(config)
schedule_runner()
# ================= 主程序入口 =================
if __name__ == "__main__":
# 首次启动,先执行一次任务
cfg = load_config()
if cfg:
log_dir = cfg.get('log_file_path', 'logger')
if not os.path.exists(log_dir): os.makedirs(log_dir)
print(f"Service started. Logs will be saved to {log_dir}")
# 执行初次扫描任务
run_ftp_job(cfg)
# 启动调度器
schedule_runner()
try:
# 阻塞主线程,运行调度器
scheduler.run()
except KeyboardInterrupt:
print("程序已手动停止")