ubuntu/linux 服务器操作面板
资源内容介绍
ubuntu/linux 服务器操作面板 from flask import Flask, request, redirect, url_for, render_template, send_from_directory, abort, flashfrom apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.triggers.cron import CronTriggerfrom apscheduler.triggers.interval import IntervalTriggerimport osimport mimetypesfrom docx import Documentimport subprocessimport loggingimport sysapp = Flask(__name__)app.config['UPLOAD_FOLDER'] = 'uploads'app.config['MAX_CONTENT_PATH'] = 16 * 1024 * 1024 # 最大文件大小:16MBapp.secret_key = 'supersecretkey' # 用于flash消息# 确保上传文件夹存在if not os.path.exists(app.config['UPLOAD_FOLDER']): os.makedirs(app.config['UPLOAD_FOLDER'])# 配置调度器scheduler = BackgroundScheduler()scheduler.start()# 配置日志log_filename = os.path.join(app.config['UPLOAD_FOLDER'], 'scheduler.log')# 创建日志记录器logger = logging.getLogger(__name__)logger.setLevel(logging.DEBUG) # 设置为最低级别,以便所有级别的日志都可以被记录# 创建文件处理器并指定编码file_handler = logging.FileHandler(log_filename, encoding='utf-8')file_handler.setLevel(logging.DEBUG)# 创建日志格式器formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)# 确保处理器唯一if not logger.hasHandlers(): logger.addHandler(file_handler)def enable_logging(): logger.setLevel(logging.DEBUG)def disable_logging(): logger.setLevel(logging.CRITICAL)names={}@app.route('/')def index(): files = os.listdir(app.config['UPLOAD_FOLDER']) jobs = scheduler.get_jobs() return render_template('index.html', files=files, jobs=jobs,names=names)@app.route('/upload', methods=['GET', 'POST'])def upload_file(): if request.method == 'POST': file = request.files['file'] if file: filename = file.filename file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) return redirect(url_for('index')) return render_template('upload.html')@app.route('/uploads/<filename>')def uploaded_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename)@app.route('/delete/<filename>', methods=['POST'])def delete_file(filename): try: file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): os.remove(file_path) except: folder_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(folder_path) and os.path.isdir(folder_path): os.rmdir(folder_path) flash(f'文件夹 {filename} 已删除', 'success') else: flash(f'文件夹 {filename} 不存在', 'error') return redirect(url_for('index'))@app.route('/view/<filename>')def view_file(filename): file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if not os.path.exists(file_path): abort(404) mimetype, _ = mimetypes.guess_type(file_path) if mimetype and mimetype.startswith('text') or filename.endswith('.log'): with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return render_template('view_text.html', content=content, filename=filename) elif mimetype and mimetype.startswith('image'): return render_template('view_image.html', filename=filename) elif mimetype == 'application/pdf': return render_template('view_pdf.html', filename=filename) elif mimetype in ['application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document']: if filename.endswith('.docx'): content = read_docx(file_path) elif filename.endswith('.doc'): content = convert_doc_to_text(file_path) return render_template('view_text.html', content=content, filename=filename) else: return redirect(url_for('uploaded_file', filename=filename))@app.route('/command', methods=['GET', 'POST'])def command(): if request.method == 'POST': cmd = request.form['cmd'] try: result = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) except subprocess.CalledProcessError as e: result = e.output return render_template('command.html', cmd=cmd, result=result) return render_template('command.html')@app.route('/schedule', methods=['GET', 'POST'])def schedule(): if request.method == 'POST': schedule_type = request.form['schedule_type'] script = request.form['script'] if schedule_type == 'interval': interval_seconds = int(request.form['interval_seconds']) job = scheduler.add_job(run_script, IntervalTrigger(seconds=interval_seconds), args=[script]) names[job.id]=(script) flash(f'Scheduled {script} to run every {interval_seconds} seconds.', 'success') elif schedule_type == 'cron': hour = request.form['hour'] minute = request.form['minute'] job = scheduler.add_job(run_script, CronTrigger(hour=hour, minute=minute), args=[script]) names[job.id]=(script) flash(f'Scheduled {script} to run at {hour}:{minute} every day.', 'success') return redirect(url_for('index')) scripts = [f for f in os.listdir(app.config['UPLOAD_FOLDER']) if f.endswith('.py')] return render_template('schedule.html', scripts=scripts)@app.route('/remove_job/<job_id>', methods=['POST'])def remove_job(job_id): scheduler.remove_job(job_id) flash(f'Removed job {job_id}.', 'success') return redirect(url_for('index'))@app.route('/create_file', methods=['POST'])def create_file(): file_name = request.form['file_name'] if not file_name: flash('文件名不能为空', 'error') return redirect(url_for('index')) file_path = os.path.join(app.config['UPLOAD_FOLDER'], file_name) with open(file_path, 'w') as f: f.write('') flash(f'文件 {file_name} 已创建', 'success') return redirect(url_for('index'))@app.route('/create_folder', methods=['POST'])def create_folder(): folder_name = request.form['folder_name'] if not folder_name: flash('文件夹名不能为空', 'error') return redirect(url_for('index')) folder_path = os.path.join(app.config['UPLOAD_FOLDER'], folder_name) os.makedirs(folder_path, exist_ok=True) flash(f'文件夹 {folder_name} 已创建', 'success') return redirect(url_for('index'))def run_script(script): try: script_path = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'], script)) result = subprocess.check_output(['python', script_path], stderr=subprocess.STDOUT, universal_newlines=True, encoding='utf-8', errors='ignore') enable_logging() logger.info(f"Script {script} executed successfully: {result}") disable_logging() except subprocess.CalledProcessError as e: enable_logging() logger.error(f"Error executing script {script}: {e.output}") disable_logging() except UnicodeDecodeError as e: enable_logging() logger.error(f"Unicode decoding error: {e}") disable_logging() except FileNotFoundError as e: enable_logging() logger.error(f"File not found: {e}") disable_logging()def read_docx(file_path): doc = Document(file_path) full_text = [] for para in doc.paragraphs: full_text.append(para.text) return '\n'.join(full_text)def convert_doc_to_text(file_path): # 使用libreoffice将.doc文件转换为.txt txt_path = file_path + '.txt' subprocess.run(['libreoff