file-download sftp
资源内容介绍
file-download sftp #!/usr/bin/env python# encoding: utf-8import argparseimport configparserimport ftplibimport hashlibimport jsonimport loggingimport osimport reimport threadingimport timeimport tracebackfrom concurrent.futures import ThreadPoolExecutorfrom datetime import datetime, timedeltafrom enum import Enumfrom pathlib import PurePosixPathfrom posixpath import dirnamefrom queue import Queue, Emptyimport paramikoimport pymysqllogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')class Enums(Enum): # 文件类型 PATH_TYPE_UNKNOWN = ("UNKOWN", 0) PATH_TYPE_FILE = ("FILE", 1) PATH_TYPE_DIR = ("DIR", 2) # 下载方式 DOWNLOAD_MODE_FULL = ("FULL", 0) DOWNLOAD_MODE_INCREMENT = ("INCRE", 1) # 文件操作 OPERATIONS_DEL = ("DEL", 0) OPERATIONS_MV = ("MV", 1) # 数据类型 DATA_TYPE_CP = ("CP", 2) DATA_TYPE_CP_SP = ("CP_SP", 20) DATA_TYPE_FT = ("FT", 3) DATA_TYPE_FT_SP = ("FT_SP", 30) DATA_TYPE_DEFECT = ("DEFECT", 5) # 文件状态 QUEUE = ("下载成功,在解析队列中", 1) TRASH_FILE = ("垃圾文件", 14) DOWNLOAD_FAIL = ("下载失败", 15) @property def DESC(self): return self._value_[0] @property def CODE(self): return self._value_[1] @classmethod def get_code_by_desc(cls, description): for member in cls: if str(member.DESC).strip().upper() == str(description).strip().upper(): return member.CODE raise ValueError(f"No member with description '{description}' found")class ConfigLoader: """ 加载配置文件内容 """ def __init__(self, config_file_path): self.config_file_path = config_file_path self.config = configparser.ConfigParser() self.config.read(config_file_path, encoding="UTF-8") self.starrocks_host = self.config.get("starrocks", 'host') self.starrocks_port = self.config.get("starrocks", 'port') self.starrocks_http_port = self.config.get("starrocks", 'http_port') self.starrocks_database = self.config.get("starrocks", 'database') self.starrocks_user = self.config.get("starrocks", 'user') self.starrocks_password = self.config.get("starrocks", 'password') self.mysql_host = self.config.get("mysql", 'host') self.mysql_port = self.config.get("mysql", 'port') self.mysql_database = self.config.get("mysql", 'database') self.mysql_user = self.config.get("mysql", 'user') self.mysql_password = self.config.get("mysql", 'password') self.encode_list = str(self.config.get("common", 'encode_list')).split(",") self.defect_white_list = str(self.config.get("common", 'defect_white_list')) self.defect_black_list = str(self.config.get("common", 'defect_black_list')) self.defect_trash_file_delete = str(self.config.get("common", 'defect_trash_file_delete')) self.pool_size = int(self.config.get("common", 'pool_size')) def get(self, section, option): try: value = self.config.get(section, option) return value except (configparser.NoSectionError, configparser.NoOptionError): return None def set_value(self, section, option, value): if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value) def save(self): with open(self.config_file_path, 'w') as configfile: self.config.write(configfile)class Tools: def generator_id(self, string_str): """ 计算字符串ID :param string_str: 字符串 :return: """ return abs(int.from_bytes(bytes.fromhex(hashlib.md5(str(string_str).encode()).hexdigest()[-16:]), 'little', signed=True)) def calculate_md5(self, file_abs_path): """ 计算文件 md5 :param file_abs_path: 文件绝对路径 :return: """ hash_md5 = hashlib.md5() with open(file_abs_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def modify_file_name(self, lpath, md5): """ 修改所有文件名.可以根据需求自定义修改 :param lpath: :param md5: :return: """ file_name = os.path.basename(lpath) ldir = os.path.dirname(lpath) new_file_name = md5 + "::" + file_name new_lpath = os.path.join(ldir, new_file_name).replace("\\", '/') # 修改文件名 if os.path.exists(new_lpath): os.remove(new_lpath) os.rename(lpath, new_lpath) logging.info(f"Renamed: {lpath} -> {new_lpath}") return new_lpath def delete_file(self, file_abs_path): """ 判断指定路径的文件是否存在且是文件,若满足条件则删除该文件 :param file_abs_path: 要判断并删除的文件路径 :return: 无 """ if os.path.exists(file_abs_path) and os.path.isfile(file_abs_path): try: os.remove(file_abs_path) logging.warning(f"{file_abs_path}是垃圾文件, 已成功删除。") except FileNotFoundError: logging.info(f"{file_abs_path} 不存在,可能已被其他操作删除。") except PermissionError: logging.info(f"没有权限删除 {file_abs_path}。") else: logging.info(f"{file_abs_path} 不存在或不是一个文件。") def generator_file_list(self, backup_name, backup_path, bak_ftp_id, data_type, download_duration, download_end_time, download_start_time, download_status, file_list, instance_id, md5, mtime, new_lpath, rpath, size, src_ftp_id, config, tools): # 如果数据类型是cp_sp/ft_sp[cp_sp/ft_sp在一个工作流中,cp/sp数据都要录入] if str(data_type).strip().upper() == Enums.DATA_TYPE_CP_SP.DESC: # cp的录入一份 file_list.append([str(PurePosixPath(new_lpath).name), self.generator_id(str(PurePosixPath(rpath).name) + "::" + md5), Enums.DATA_TYPE_CP.CODE, 'data_source', mtime, size, md5, str(PurePosixPath(new_lpath).parent), str(PurePosixPath(rpath).parent), src_ftp_id, backup_name, backup_path, bak_ftp_id, download_duration, download_start_time, download_end_time, download_status, download_end_time, instance_id ]) # cp_sp的录入一份 file_list.append([str(PurePosixPath(new_lpath).name), self.generator_id(str(PurePosixPath(rpath).name) + "::" + md5), Enums.DATA_TYPE_CP_SP.CODE, 'data_source', mtime, size, md5, str(PurePosixPath(new_lpath).parent), str(PurePosixPath(rpath).parent), src_ftp_id, backup_name, backup_path, bak_ftp_id, download_duration, download_start_time,