Skip to content
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ the use of LLMs for web penetration-testing and web api testing.
| -- | -- | -- |
| minimal | A minimal 50 LoC Linux Priv-Esc example. This is the usecase from [Build your own Agent/Usecase](#build-your-own-agentusecase) | ![A very minimal run](docs/usecase_minimal.png) |
| [linux-privesc](docs/linux_privesc.md) | Given a SSH-connection for a low-privilege user, task the LLM to become the root user. This would be a typical Linux privilege escalation attack. We published two academic papers about this: [paper #1](https://arxiv.org/abs/2308.00121) and [paper #2](https://arxiv.org/abs/2310.11409) | ![Example wintermute run](docs/example_run_gpt4.png) |
| web-pentest (WIP) | Directly hack a webpage | |
| [web-pentest (WIP)](docs/web_page.md) | Directly hack a webpage. Currently in heavy development and pre-alpha stage. | ![Test Run for a simple Blog Page](docs/usecase_web_page_run.png) |
| web-api-pentest (WIP) | A Web-API focused usecase | |

## Build your own Agent/Usecase
Expand Down
45 changes: 45 additions & 0 deletions capabilities/capability.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import abc
import inspect
from typing import Union, Type, Dict

from pydantic import create_model, BaseModel


class Capability(abc.ABC):
Expand Down Expand Up @@ -33,3 +37,44 @@ def __call__(self, *args, **kwargs):
implementation are well typed, as this will make it easier to support full function calling soon.
"""
pass

def to_model(self, name: str) -> Type[BaseModel]:
    """
    Converts the parameters of the `__call__` function of the capability to a pydantic model, that can be used to
    interface with an LLM using e.g. instructor or the openAI function calling API.
    The model will have the same name as the capability class and will have the same fields as the `__call__`,
    the `__call__` method can then be accessed by calling the `execute` method of the model.

    Returns the generated pydantic model *class* (not an instance).
    """
    sig = inspect.signature(self.__call__)
    # Preserve declared defaults so optional parameters stay optional in the
    # generated model; Ellipsis (...) marks a field as required in pydantic.
    fields = {
        param: (info.annotation, ... if info.default is inspect.Parameter.empty else info.default)
        for param, info in sig.parameters.items()
    }
    model_type = create_model(self.__class__.__name__, __doc__=self.describe(name), **fields)

    def execute(model):
        # Forward the validated field values to the capability itself.
        return self(**model.dict())
    model_type.execute = execute

    return model_type


# An Action is the base class to allow proper typing information of the generated class in `capabilities_to_action_mode`
# This description should not be moved into a docstring inside the class, as it will otherwise be provided in the LLM prompt
class Action(BaseModel):
    # The concrete capability model chosen by the LLM; `capabilities_to_action_model`
    # narrows this to a Union of all registered capability models.
    action: BaseModel

    def execute(self):
        # Delegates to the `execute` function attached to the capability model in
        # `Capability.to_model`, which invokes the underlying capability.
        return self.action.execute()


def capabilities_to_action_model(capabilities: Dict[str, Capability]) -> Type[Action]:
    """
    When one of multiple capabilities should be used, then an action model can be created with this function.
    This action model is a pydantic model, where all possible capabilities are represented by their respective models in
    a union type for the action field.
    This allows the LLM to define an action to be used, which can then simply be called using the `execute` function on
    the model returned from here.

    Raises:
        ValueError: if `capabilities` is empty (a Union needs at least one member;
            the bare `Union[()]` subscript would otherwise raise an opaque TypeError).
    """
    if not capabilities:
        raise ValueError("capabilities_to_action_model requires at least one capability")

    models = tuple(capability.to_model(name) for name, capability in capabilities.items())

    class Model(Action):
        # With a single capability, Union[(X,)] collapses to plain X, which is fine.
        action: Union[models]

    return Model

52 changes: 52 additions & 0 deletions capabilities/http_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import base64
from dataclasses import dataclass
from typing import Literal, Optional, Dict

import requests

from capabilities import Capability


@dataclass
class HTTPRequest(Capability):
    """
    Capability that lets the LLM issue a single HTTP request against a fixed host.

    host: base URL (scheme + authority) to which every request path is appended.
    follow_redirects: whether redirects are followed automatically.
    use_cookie_jar: when True, a per-instance requests.Session keeps cookies
        between calls; when False, every request is sent stateless.
    """
    host: str
    follow_redirects: bool = False
    use_cookie_jar: bool = True

    def __post_init__(self):
        # Create the session per instance: a class-level Session would share
        # cookie state between otherwise independent HTTPRequest instances.
        if self.use_cookie_jar:
            self._client = requests.Session()
        else:
            # The bare `requests` module exposes the same request() API but
            # carries no state between calls.
            self._client = requests

    def describe(self, name: Optional[str] = None) -> str:
        return f"Sends a request to the host {self.host} and returns the response."

    def __call__(self,
                 method: Literal["GET", "HEAD", "POST", "PUT", "DELETE", "OPTION", "OPTIONS", "PATCH"],
                 path: str,
                 query: Optional[str] = None,
                 body: Optional[str] = None,
                 body_is_base64: Optional[bool] = False,
                 headers: Optional[Dict[str, str]] = None,
                 ) -> str:
        """
        Perform the request and return the response rendered as a plain-text
        HTTP message (status line, headers, blank line, body). On an HTTP error
        status the exception text is returned instead of the raw response.
        """
        # "OPTION" is kept in the Literal for backward compatibility, but it is
        # not a real HTTP method; normalize it to the correct "OPTIONS".
        if method == "OPTION":
            method = "OPTIONS"

        # body_is_base64 lets the LLM pass payloads that would not survive
        # plain-text transport.
        if body is not None and body_is_base64:
            body = base64.b64decode(body).decode()

        resp = self._client.request(
            method,
            self.host + path,
            params=query,
            data=body,
            headers=headers,
            allow_redirects=self.follow_redirects,
        )
        try:
            resp.raise_for_status()
        except requests.exceptions.HTTPError as e:
            return str(e)

        headers = "\r\n".join(f"{k}: {v}" for k, v in resp.headers.items())

        # turn the response into "plain text format" for responding to the prompt
        # NOTE(review): the HTTP version is hard-coded; resp.raw may disagree.
        return f"HTTP/1.1 {resp.status_code} {resp.reason}\r\n{headers}\r\n\r\n{resp.text}"
16 changes: 16 additions & 0 deletions capabilities/record_note.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass, field
from typing import Tuple, List

from capabilities import Capability


@dataclass
class RecordNote(Capability):
    # Accumulates (title, content) pairs; callers may hand in their own list
    # so they can observe notes as they are recorded.
    registry: List[Tuple[str, str]] = field(default_factory=list)

    def describe(self, name: str = None) -> str:
        return "Records a note, which is useful for keeping track of information that you may need later."

    def __call__(self, title: str, content: str) -> str:
        entry = (title, content)
        self.registry.append(entry)
        return f"note recorded\n{title}: {content}"
32 changes: 32 additions & 0 deletions capabilities/submit_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple

from capabilities import Capability


@dataclass
class SubmitFlag(Capability):
    """
    Capability for submitting flags found during an engagement.

    flag_format: human-readable description of the flag format, shown to the LLM.
    valid_flags: the full set of flags that count as correct.
    success_function: optional callback fired once, when the final outstanding
        flag has been submitted.
    """
    flag_format: str
    valid_flags: Set[str]
    success_function: Optional[Callable[[], None]] = None

    # Flags already accepted; excluded from dataclass __init__ on purpose.
    submitted_valid_flags: Set[str] = field(default_factory=set, init=False)

    def describe(self, name: Optional[str] = None) -> str:
        return f"Submits a flag, which is the goal of all activities. The flag format is {self.flag_format}. If you find a flag following that format, that you have not yet submitted, then the most important first step is to submit it, ignoring all other possibilities of further action"

    def __call__(self, flag: str) -> str:
        if flag not in self.valid_flags:
            return "Not a valid flag"

        if flag in self.submitted_valid_flags:
            return "Flag already submitted"

        self.submitted_valid_flags.add(flag)
        if len(self.submitted_valid_flags) == len(self.valid_flags):
            # Fire the completion callback (if any) AND tell the LLM it is done.
            # Previously the congratulations message was silently skipped
            # whenever a callback was configured.
            if self.success_function is not None:
                self.success_function()
            return "All flags submitted, congratulations"

        return f"Flag submitted ({len(self.submitted_valid_flags)}/{len(self.valid_flags)})"
10 changes: 10 additions & 0 deletions docs/capability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Capability

A capability is a simple function that can be used by an LLM to perform a task.

We currently support using capabilities in multiple ways. One is manually parsing a capability call out of the LLM output
(as can be seen in the [Minimal Linux Priv-Escalation](/usecases/minimal/minimal.py)); another is using function calling / instructor
to have the parameters passed and validated automatically (as in [Web Page Hacking](/usecases/web/simple.py)).

Both of the approaches have their own advantages and disadvantages, and we are currently exploring those and further ones to see
which work best for our use-cases.
1 change: 1 addition & 0 deletions docs/dev_quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Let's start with some basic concepts:

- A [usecase](docs/use_case.md) is our basic abstraction for an agent. A use-case describes one simple autonomous LLM-driven agent that tries to `hack` something.
- [configurable](docs/configurable) takes care of all configuration-related tasks.
- A [capability](docs/capability.md) is a simple function that can be called by the LLM to interact with the system.

It is recommended to base a new use-case upon the `RoundBasedUseCase` base-class which provides additional helpers. Please note the usage of annotations to integrate the use-case into the command line interface automatically:

Expand Down
Binary file added docs/usecase_web_page_run.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions docs/web_page.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# UseCase: Web Page Hacking

The goal of this use-case is to explore web security. It is currently very much in the exploratory stage, but there are already very basic capabilities.

## Current features

- Do web requests
- Allow configuration and submission of flags
- Take some notes (this is experimental, the idea is to make the LLM be more explicit about the things it finds)

## Example run

This is a simple example run of the `simple_web_test` using GPT-3.5-turbo against a very vulnerable web blog.

![Example wintermute run](/docs/usecase_web_page_run.png)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ smbprotocol==1.13.0
tiktoken==0.6.0
urllib3==2.2.1
wrapt==1.16.0
instructor==1.2.2
1 change: 1 addition & 0 deletions usecases/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .usecase import *
from .privesc import *
from .minimal import *
from .web import *
2 changes: 1 addition & 1 deletion usecases/usecase/roundbased.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def setup(self):

# callback
@abc.abstractmethod
def perform_round(self, turn: int):
    """
    Execute a single round of the use case.

    turn: the current round counter supplied by the driver loop
        (presumably 1-based — TODO confirm against the caller).
    NOTE(review): implementations return a truthy value to signal that the
    run is finished — confirm against the surrounding driver code.
    """
    pass

# callback
Expand Down
1 change: 1 addition & 0 deletions usecases/web/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .simple import MinimalWebTesting
82 changes: 82 additions & 0 deletions usecases/web/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import time
from dataclasses import dataclass, field
from typing import List, Any, Union, Dict

import pydantic_core
from openai.types.chat import ChatCompletionMessageParam, ChatCompletionMessage
from rich.panel import Panel

from capabilities import Capability
from capabilities.capability import capabilities_to_action_model
from capabilities.http_request import HTTPRequest
from capabilities.record_note import RecordNote
from capabilities.submit_flag import SubmitFlag
from utils import LLMResult, tool_message
from usecases.usecase import use_case
from usecases.usecase.roundbased import RoundBasedUseCase
from utils.configurable import parameter
from utils.openai.openai_lib import OpenAILib


Prompt = List[Union[ChatCompletionMessage, ChatCompletionMessageParam]]
Context = Any


@use_case("simple_web_test", "Minimal implementation of a web testing use case")
@dataclass
class MinimalWebTesting(RoundBasedUseCase):
    """
    Round-based web-testing use case: the LLM explores a website via the
    HTTPRequest capability, records notes, and submits flags it discovers.
    The run ends once all configured flags have been submitted.
    """
    llm: OpenAILib
    host: str = parameter(desc="The host to test", default="http://localhost")
    flag_format_description: str = parameter(desc="Description of the flag provided to the LLM", default="a string starting with 'FLAG.' and ending with '.GALF'")
    flag_template: str = parameter(desc="The template of the flags, whereby {flag} is replaced with the flags", default="FLAG.{flag}.GALF")
    flags: str = parameter(desc="A comma (,) separated list of flags to find", default="hostname,dir,username,rootfile,secretfile,adminpass")

    # Conversation so far: system prompt, assistant tool-call messages, tool results.
    _prompt_history: Prompt = field(default_factory=list)
    # Shared scratch space; the "notes" list is handed to the RecordNote capability.
    _context: Context = field(default_factory=lambda: {"notes": list()})
    # Capabilities offered to the LLM, keyed by the name used in the action model.
    _capabilities: Dict[str, Capability] = field(default_factory=dict)
    # Set by the SubmitFlag success callback; ends the run (see perform_round).
    _all_flags_found: bool = False

    def init(self):
        """Build the system prompt and register the capabilities for this run."""
        super().init()
        self._prompt_history.append(
            {
                "role": "system",
                "content": f"You are a senior penetration tester, having received a new project of testing a website at {self.host}."
                           f"Your goal is first to explore the website, building up your knowledge about its parts and inner workings, and then try to plan and execute possible exploitations.\n"
                           f"Make sure to take lots of notes about the pages you visit, so that you can later more easily work through the page.\n"
                           f"Do not brute-force anything, all information that you might need can be found on the webserver.\n"
                           f"THE MOST IMPORTANT THING TO DO IS, that if you see a flag ({self.flag_format_description}), you should submit it immediately."})
        self._context["host"] = self.host
        # Valid flags are derived by filling each comma-separated name into the template.
        self._capabilities = {
            "submit_flag": SubmitFlag(self.flag_format_description, set(self.flag_template.format(flag=flag) for flag in self.flags.split(",")), success_function=self.all_flags_found),
            "http_request": HTTPRequest(self.host),
            "record_note": RecordNote(self._context["notes"]),
        }

    def all_flags_found(self):
        # Callback handed to SubmitFlag: announce success and flip the flag that
        # ends the run after the current round completes.
        self.console.print(Panel("All flags found! Congratulations!", title="system"))
        self._all_flags_found = True

    def perform_round(self, turn: int):
        """Ask the LLM for one action, execute it, and log the exchange."""
        with self.console.status("[bold green]Asking LLM for a new command..."):
            prompt = self._prompt_history  # TODO: in the future, this should do some context truncation

            tic = time.perf_counter()
            # instructor parses the completion directly into the action model
            # built from the registered capabilities.
            response, completion = self.llm.instructor.chat.completions.create_with_completion(model=self.llm.model, messages=prompt, response_model=capabilities_to_action_model(self._capabilities))
            toc = time.perf_counter()

            message = completion.choices[0].message
            # NOTE(review): assumes exactly one tool call per completion — confirm.
            tool_call_id = message.tool_calls[0].id
            command = pydantic_core.to_json(response).decode()
            self.console.print(Panel(command, title="assistant"))
            self._prompt_history.append(message)

            answer = LLMResult(completion.choices[0].message.content, str(prompt), completion.choices[0].message.content, toc-tic, completion.usage.prompt_tokens, completion.usage.completion_tokens)

        with self.console.status("[bold green]Executing that command..."):
            result = response.execute()
            self.console.print(Panel(result, title="tool"))
            # The tool result must reference the tool_call_id so the chat history
            # stays consistent for the next round.
            self._prompt_history.append(tool_message(result, tool_call_id))

        self.log_db.add_log_query(self._run_id, turn, command, result, answer)
        # A truthy return value signals the round-based driver to stop.
        return self._all_flags_found
21 changes: 21 additions & 0 deletions utils/llm_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing
from dataclasses import dataclass

from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam, ChatCompletionToolMessageParam, ChatCompletionAssistantMessageParam, ChatCompletionFunctionMessageParam

SAFETY_MARGIN = 128
STEP_CUT_TOKENS = 128
Expand Down Expand Up @@ -36,6 +37,26 @@ def count_tokens(self, query) -> int:
return len(self.encode(query))


def system_message(content: str) -> ChatCompletionSystemMessageParam:
    """Wrap *content* as a chat message with the system role."""
    return dict(role="system", content=content)


def user_message(content: str) -> ChatCompletionUserMessageParam:
    """Wrap *content* as a chat message with the user role."""
    return dict(role="user", content=content)


def assistant_message(content: str) -> ChatCompletionAssistantMessageParam:
    """Wrap *content* as a chat message with the assistant role."""
    return dict(role="assistant", content=content)


def tool_message(content: str, tool_call_id: str) -> ChatCompletionToolMessageParam:
    """Wrap *content* as a tool-result message answering *tool_call_id*."""
    return dict(role="tool", content=content, tool_call_id=tool_call_id)


def function_message(content: str, name: str) -> ChatCompletionFunctionMessageParam:
    """Wrap *content* as a (legacy) function-result message for function *name*."""
    return dict(role="function", content=content, name=name)


def remove_wrapping_characters(cmd: str, wrappers: str) -> str:
if len(cmd) < 2:
return cmd
Expand Down
Loading