ftp/ftp_client.py
2026-01-19 14:21:45 +08:00

432 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import sched
import os
import ftplib
import logging
import shutil
import datetime
import sys
import platform
import subprocess
from urllib.parse import urlparse
from logging.handlers import TimedRotatingFileHandler
import socket
# ================= 全局配置与路径修正 =================
# 获取当前脚本所在的绝对目录 (关键:解决开机自启时工作目录不一致的问题)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(BASE_DIR, 'conf.json')
# 默认参数
MIN_FREE_SPACE_MB = 1024 # 磁盘最小剩余空间 (MB)
SCAN_INTERVAL = 30 * 60 # 扫描间隔 (秒)
# 初始化调度器
scheduler = sched.scheduler(time.time, time.sleep)
is_task_running = False
# ================= 模块一:自启动与卸载管理 =================
def setup_autostart():
"""
配置开机自启
Windows: 写入注册表 HKCU Run (使用 pythonw.exe 避免黑框)
Linux: 创建并启用 Systemd 服务 (支持崩溃重启)
"""
system_type = platform.system()
script_path = os.path.abspath(__file__)
# 简单的标志文件,避免每次运行都重复操作注册表/系统文件
lock_file = os.path.join(BASE_DIR, '.autostart_configured')
if os.path.exists(lock_file):
return
print(f"[*] 正在为 {system_type} 配置开机自启动...")
if system_type == "Windows":
try:
import winreg
key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
app_name = "FTP_Auto_Client"
# 替换为 pythonw.exe 以实现无窗口运行
python_exe = sys.executable.replace("python.exe", "pythonw.exe")
# 构造命令 (处理路径含空格的情况)
cmd = f'"{python_exe}" "{script_path}"'
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE)
winreg.SetValueEx(key, app_name, 0, winreg.REG_SZ, cmd)
winreg.CloseKey(key)
# 创建标记文件
with open(lock_file, 'w') as f: f.write("1")
print("[+] Windows 自启动已配置 (注册表 HKCU Run)。")
except Exception as e:
print(f"[-] Windows 配置失败: {e}")
elif system_type == "Linux":
if os.geteuid() != 0:
print("[!] 提示: 未使用 sudo 运行,跳过 Systemd 服务配置。")
print(f" 若需开机自启,请运行: sudo {sys.executable} {script_path}")
return
service_name = "ftp_client_service"
service_file = f"/etc/systemd/system/{service_name}.service"
python_exe = sys.executable
content = f"""[Unit]
Description=FTP Auto Download Service
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory={BASE_DIR}
ExecStart={python_exe} {script_path}
Restart=always
RestartSec=60
StandardOutput=null
StandardError=null
[Install]
WantedBy=multi-user.target
"""
try:
with open(service_file, 'w') as f:
f.write(content)
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", service_name], check=True)
with open(lock_file, 'w') as f: f.write("1")
print(f"[+] Linux Systemd 服务已安装并启用: {service_name}")
except Exception as e:
print(f"[-] Linux 配置失败: {e}")
def remove_autostart():
"""
卸载开机自启配置
"""
system_type = platform.system()
lock_file = os.path.join(BASE_DIR, '.autostart_configured')
print(f"[*] 正在卸载 {system_type} 自启动配置...")
if system_type == "Windows":
try:
import winreg
key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
app_name = "FTP_Auto_Client"
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE)
try:
winreg.DeleteValue(key, app_name)
print("[+] Windows 注册表启动项已删除。")
except FileNotFoundError:
print("[-] 未找到启动项,可能已删除。")
winreg.CloseKey(key)
except Exception as e:
print(f"[-] 卸载出错: {e}")
elif system_type == "Linux":
if os.geteuid() != 0:
print("[!] 错误: 卸载服务需要 Root 权限 (sudo)。")
return
service_name = "ftp_client_service"
service_file = f"/etc/systemd/system/{service_name}.service"
try:
subprocess.run(["systemctl", "stop", service_name], stderr=subprocess.DEVNULL)
subprocess.run(["systemctl", "disable", service_name], stderr=subprocess.DEVNULL)
if os.path.exists(service_file):
os.remove(service_file)
print(f"[+] 服务文件已删除: {service_file}")
subprocess.run(["systemctl", "daemon-reload"], check=True)
print("[+] Linux Systemd 服务已彻底卸载。")
except Exception as e:
print(f"[-] 卸载出错: {e}")
# 清除标记文件
if os.path.exists(lock_file):
os.remove(lock_file)
# ================= 模块二:基础工具 =================
def load_config():
"""读取并解析配置文件"""
if not os.path.exists(CONFIG_FILE):
print(f"Error: 配置文件未找到 -> {CONFIG_FILE}")
return None
try:
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error: 配置文件读取失败: {e}")
return None
def setup_logging(log_dir_config):
"""初始化日志,确保路径绝对化"""
# 路径修正:如果是相对路径,转为绝对路径
if not os.path.isabs(log_dir_config):
log_dir = os.path.join(BASE_DIR, log_dir_config)
else:
log_dir = log_dir_config
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, "ftp_dl.log")
logger = logging.getLogger("FTP_Manager")
logger.setLevel(logging.INFO)
if not logger.handlers:
# 按天回滚保留30天
handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=30, encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - [%(levelname)s] - %(message)s')
handler.setFormatter(formatter)
# 控制台输出 (方便手动调试时查看,后台运行时不影响)
console = logging.StreamHandler()
console.setFormatter(formatter)
logger.addHandler(handler)
logger.addHandler(console)
return logger
# ================= 模块三:核心业务逻辑 =================
def check_and_clean_disk_space(download_path_config, logger):
"""磁盘清理策略"""
try:
# 路径修正
if not os.path.isabs(download_path_config):
download_path = os.path.join(BASE_DIR, download_path_config)
else:
download_path = download_path_config
if not os.path.exists(download_path):
os.makedirs(download_path)
total, used, free = shutil.disk_usage(download_path)
free_mb = free // (1024 * 1024)
if free_mb < MIN_FREE_SPACE_MB:
logger.warning(f"磁盘空间不足 (剩余 {free_mb}MB < 阈值 {MIN_FREE_SPACE_MB}MB),启动清理...")
files_by_date = {}
for root, dirs, files in os.walk(download_path):
for name in files:
filepath = os.path.join(root, name)
mtime = os.path.getmtime(filepath)
date_str = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d')
if date_str not in files_by_date:
files_by_date[date_str] = []
files_by_date[date_str].append(filepath)
if not files_by_date:
logger.info("目录为空,无法清理。")
return
# 删除最早日期的一批文件
sorted_dates = sorted(files_by_date.keys())
oldest_date = sorted_dates[0]
logger.info(f"正在删除 {oldest_date} 的旧文件...")
for fpath in files_by_date[oldest_date]:
try:
os.remove(fpath)
logger.info(f"已删除: {fpath}")
except Exception as e:
logger.error(f"删除失败 {fpath}: {e}")
logger.info("清理完成。")
except Exception as e:
logger.error(f"磁盘维护错误: {e}")
def run_ftp_job(config):
"""执行 FTP 下载流程"""
global is_task_running
if is_task_running:
print("警告: 上一轮任务未结束,跳过本次调度。")
return
is_task_running = True
# 确保日志路径正确
logger = setup_logging(config.get('log_file_path', 'logs'))
try:
# 解析配置
parse_result = urlparse(config['parse_ftp_url'])
ftp_host = parse_result.hostname
ftp_port = parse_result.port or 21
ftp_user = parse_result.username
ftp_pass = parse_result.password
# 路径处理
local_path = config['download_path']
if not os.path.isabs(local_path):
local_path = os.path.join(BASE_DIR, local_path)
FTP_TIMEOUT = 60
# 执行磁盘检查
check_and_clean_disk_space(local_path, logger)
ftp = ftplib.FTP()
logger.info(f"正在连接 FTP: {ftp_host}:{ftp_port}")
ftp.connect(ftp_host, ftp_port, timeout=FTP_TIMEOUT)
ftp.login(ftp_user, ftp_pass)
ftp.set_pasv(True)
ftp.encoding = 'utf-8'
try:
files_list = ftp.nlst()
except ftplib.error_perm:
files_list = []
logger.info(f"服务器文件数: {len(files_list)}")
sure_files = [f for f in files_list if f.endswith('.sure')]
for sure_file in sure_files:
target_file = sure_file[:-5] # 去掉 .sure
if target_file in files_list:
local_file_path = os.path.join(local_path, target_file)
# 获取远程大小
try:
remote_size = ftp.size(target_file)
except:
remote_size = -1
# 本地存在且大小一致则跳过
if os.path.exists(local_file_path):
local_size = os.path.getsize(local_file_path)
if remote_size != -1 and local_size == remote_size:
logger.info(f"跳过已存在文件: {target_file}")
# 视需求决定是否删除服务器文件
# ftp.delete(target_file); ftp.delete(sure_file)
continue
else:
logger.warning(f"文件校验不一致,重新下载: {target_file}")
os.remove(local_file_path)
logger.info(f"开始下载: {target_file}")
temp_file_path = local_file_path + ".tmp"
download_success = False
try:
with open(temp_file_path, 'wb') as f:
ftp.retrbinary(f'RETR {target_file}', f.write)
if remote_size != -1:
if os.path.getsize(temp_file_path) != remote_size:
raise Exception("文件大小校验失败")
os.rename(temp_file_path, local_file_path)
download_success = True
logger.info(f"下载完成: {target_file}")
except Exception as dl_err:
logger.error(f"下载失败 {target_file}: {dl_err}")
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
if download_success:
try:
ftp.delete(target_file)
ftp.delete(sure_file)
logger.info(f"已清理服务器文件: {target_file}")
except Exception as del_err:
logger.error(f"删除服务器文件失败: {del_err}")
else:
logger.warning(f"孤立的标识文件: {sure_file}")
except (socket.timeout, socket.error) as net_err:
logger.error(f"网络错误: {net_err}")
except Exception as e:
logger.error(f"全局异常: {e}")
finally:
try:
ftp.quit()
except:
pass
try:
ftp.close()
except:
pass
is_task_running = False
# ================= 模块四:调度系统 =================
def schedule_runner():
"""循环调度器"""
config = load_config()
if not config:
# 如果读不到配置60秒后重试
scheduler.enter(60, 1, schedule_runner, ())
return
# 这里初始化日志是为了打印下次运行时间
logger = setup_logging(config.get('log_file_path', 'logs'))
now = datetime.datetime.now()
next_run = now + datetime.timedelta(seconds=SCAN_INTERVAL)
logger.info(f"当前任务周期结束。下次扫描时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")
# 安排下一次任务
scheduler.enter(SCAN_INTERVAL, 1, run_wrapper, (config,))
def run_wrapper(config):
"""包装器:先干活,再预约下一次"""
run_ftp_job(config)
schedule_runner()
# ================= 主程序入口 =================
if __name__ == "__main__":
# --- 卸载模式 ---
if len(sys.argv) > 1 and sys.argv[1] == '--uninstall':
remove_autostart()
sys.exit(0)
# --- 正常模式 ---
print(f"[{datetime.datetime.now()}] 程序启动 (PID: {os.getpid()})")
print(f"工作目录: {BASE_DIR}")
# 1. 尝试自我配置 (开机自启)
setup_autostart()
# 2. 读取配置
cfg = load_config()
if cfg:
print("配置加载成功,开始运行...")
print("提示: 使用 'python ftp_client.py --uninstall' 可移除开机自启。")
# 立即执行一次
run_ftp_job(cfg)
# 启动调度循环
schedule_runner()
try:
scheduler.run()
except KeyboardInterrupt:
print("\n程序已手动停止。")
else:
print("程序无法启动:缺少配置文件 conf.json")