Skip to content

Commit 0c170f1

Browse files
committed
generalize the got_root check
1 parent 6bf8ca2 commit 0c170f1

File tree

7 files changed

+101
-111
lines changed

7 files changed

+101
-111
lines changed

‎src/hackingBuddyGPT/capabilities/psexec_run_command.py‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import dataclass
2-
from typing import Tuple
32

43
from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection
54

@@ -14,5 +13,5 @@ class PSExecRunCommand(Capability):
1413
def describe(self) -> str:
1514
return "give a command to be executed on the shell and I will respond with the terminal output when running this command on the windows machine. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation."
1615

17-
def __call__(self, command: str) -> Tuple[str, bool]:
18-
return self.conn.run(command)[0], False
16+
def __call__(self, command: str) -> str:
17+
return self.conn.run(command)[0]

‎src/hackingBuddyGPT/capabilities/psexec_test_credential.py‎

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import warnings
22
from dataclasses import dataclass
3-
from typing import Tuple
43

54
from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection
65

@@ -17,14 +16,14 @@ def describe(self) -> str:
1716
def get_name(self) -> str:
1817
return "test_credential"
1918

20-
def __call__(self, username: str, password: str) -> Tuple[str, bool]:
19+
def __call__(self, username: str, password: str) -> str:
2120
try:
2221
test_conn = self.conn.new_with(username=username, password=password)
2322
test_conn.init()
2423
warnings.warn(
2524
message="full credential testing is not implemented yet for psexec, we have logged in, but do not know who we are, returning True for now",
2625
stacklevel=1,
2726
)
28-
return "Login as root was successful\n", True
27+
return "Login as root was successful\n"
2928
except Exception:
30-
return "Authentication error, credentials are wrong\n", False
29+
return "Authentication error, credentials are wrong\n"

‎src/hackingBuddyGPT/capabilities/ssh_run_command.py‎

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,8 @@
1-
import re
21
from dataclasses import dataclass
32
from io import StringIO
4-
from typing import Tuple
5-
63
from invoke import Responder
7-
4+
from hackingBuddyGPT.capability import Capability
85
from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection
9-
from hackingBuddyGPT.utils.shell_root_detection import got_root
10-
11-
from ..capability import Capability
12-
136

147
@dataclass
158
class SSHRunCommand(Capability):
@@ -22,7 +15,7 @@ def describe(self) -> str:
2215
def get_name(self):
2316
return "exec_command"
2417

25-
def __call__(self, command: str) -> Tuple[str, bool]:
18+
def __call__(self, command: str) -> str:
2619
if command.startswith(self.get_name()):
2720
cmd_parts = command.split(" ", 1)
2821
if len(cmd_parts) == 1:
@@ -43,15 +36,9 @@ def __call__(self, command: str) -> Tuple[str, bool]:
4336
print("TIMEOUT! Could we have become root?")
4437
out.seek(0)
4538
tmp = ""
46-
last_line = ""
4739
for line in out.readlines():
4840
if not line.startswith("[sudo] password for " + self.conn.username + ":"):
4941
line.replace("\r", "")
50-
last_line = line
5142
tmp = tmp + line
5243

53-
# remove ansi shell codes
54-
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
55-
last_line = ansi_escape.sub("", last_line)
56-
57-
return tmp, got_root(self.conn.hostname, last_line)
44+
return tmp
Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
from dataclasses import dataclass
2-
from typing import Tuple
3-
from paramiko.ssh_exception import SSHException
41
import paramiko
52

3+
from dataclasses import dataclass
4+
from hackingBuddyGPT.capability import Capability
65
from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection
7-
8-
from ..capability import Capability
9-
6+
from paramiko.ssh_exception import SSHException
107

118
@dataclass
129
class SSHTestCredential(Capability):
@@ -18,15 +15,15 @@ def describe(self) -> str:
1815
def get_name(self):
1916
return "test_credential"
2017

21-
def __call__(self, username: str, password: str) -> Tuple[str, bool]:
18+
def __call__(self, username: str, password: str) -> str:
2219
test_conn = self.conn.new_with(username=username, password=password)
2320
try:
2421
for attempt in range(10):
2522
try:
2623
test_conn.init()
2724
break;
2825
except paramiko.ssh_exception.AuthenticationException:
29-
return "Authentication error, credentials are wrong\n", False
26+
return f"Authentication error, credentials {username}:{password} are wrong\n"
3027
except SSHException as e:
3128
if attempt == 9:
3229
raise
@@ -38,9 +35,9 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]:
3835

3936
user = test_conn.run("whoami")[0].strip("\n\r ")
4037
if user == "root":
41-
return "Login as root was successful\n", True
38+
return f"Login as root was successful\n"
4239
else:
43-
return "Authentication successful, but user is not root\n", False
40+
return f"Authentication successful, but user {user} is not root\n"
4441

4542
except paramiko.ssh_exception.AuthenticationException:
46-
return "Authentication error, credentials are wrong\n", False
43+
return "Authentication error, credentials are wrong\n"

‎src/hackingBuddyGPT/strategies.py‎

Lines changed: 60 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import abc
22
import datetime
3-
import re
43

54
from dataclasses import dataclass
65
from mako.template import Template
@@ -11,8 +10,7 @@
1110
from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection
1211
from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section
1312
from hackingBuddyGPT.utils.capability_manager import CapabilityManager
14-
from hackingBuddyGPT.utils.shell_root_detection import got_root
15-
from typing import List, Optional
13+
from typing import List
1614

1715

1816
@dataclass
@@ -37,10 +35,7 @@ class CommandStrategy(UseCase, abc.ABC):
3735
def before_run(self):
3836
pass
3937

40-
def after_run(self):
41-
pass
42-
43-
def after_round(self, cmd, result, got_root):
38+
def after_command_execution(self, cmd, result, got_root):
4439
pass
4540

4641
def get_token_overhead(self) -> int:
@@ -51,13 +46,63 @@ def init(self):
5146

5247
self._capabilities = CapabilityManager(self.log)
5348

49+
# TODO: make this more beautiful by just configuring a History-Instance
5450
if self.disable_history:
5551
self._history = HistoryNone()
5652
else:
5753
if self.enable_compressed_history:
5854
self._history = HistoryCmdOnly()
5955
else:
6056
self._history = HistoryFull()
57+
58+
@log_conversation("Starting run...")
59+
def run(self, configuration):
60+
61+
self.configuration = configuration
62+
self.log.start_run(self.get_name(), self.serialize_configuration(configuration))
63+
64+
self._template_params["capabilities"] = self._capabilities.get_capability_block()
65+
66+
self.before_run()
67+
68+
task_successful = False
69+
turn = 1
70+
try:
71+
while turn <= self.max_turns and not task_successful:
72+
with self.log.section(f"round {turn}"):
73+
self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")
74+
task_successful = self.perform_round(turn)
75+
turn += 1
76+
except Exception:
77+
import traceback
78+
self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}")
79+
raise
80+
81+
# write the final result to the database and console
82+
if task_successful:
83+
self.log.run_was_success()
84+
else:
85+
self.log.run_was_failure("maximum turn number reached")
86+
return task_successful
87+
88+
@log_conversation("Asking LLM for a new command(s)...")
89+
def perform_round(self, turn: int) -> bool:
90+
# get the next command and run it
91+
cmd, message_id = self.get_next_command()
92+
93+
cmds = self.postprocess_commands(cmd)
94+
for cmd in cmds:
95+
result = self.run_command(cmd, message_id)
96+
# store the results in our local history
97+
self._history.append(cmd, result)
98+
99+
task_successful = self.check_success(cmd, result)
100+
self.after_command_execution(cmd, result, task_successful)
101+
if task_successful:
102+
return True
103+
104+
# signal if we were successful in our task
105+
return False
61106

62107
@log_section("Asking LLM for a new command...")
63108
def get_next_command(self) -> tuple[str, int]:
@@ -74,84 +119,24 @@ def get_next_command(self) -> tuple[str, int]:
74119
return cmd.result, message_id
75120

76121
@log_section("Executing that command...")
77-
def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]:
122+
def run_command(self, cmd, message_id) -> str:
78123
_capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability)
79124
start_time = datetime.datetime.now()
80125
success, *output = parser(cmd)
81126
if not success:
82127
self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0)
83-
return output[0], False
128+
return output[0]
84129

85130
assert len(output) == 1
86-
capability, cmd, (result, got_root) = output[0]
131+
capability, cmd, result = output[0]
87132
duration = datetime.datetime.now() - start_time
88133
self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration)
89134

90-
return result, got_root
91-
92-
def check_success(self, cmd, result) -> bool:
93-
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
94-
last_line = result.split("\n")[-1] if result else ""
95-
last_line = ansi_escape.sub("", last_line)
96-
return got_root(self.conn.hostname, last_line)
135+
return result
136+
137+
@abc.abstractmethod
138+
def check_success(self, cmd:str, result:str) -> bool:
139+
return False
97140

98141
def postprocess_commands(self, cmd:str) -> List[str]:
99142
return [cmd]
100-
101-
@log_conversation("Asking LLM for a new command...")
102-
def perform_round(self, turn: int) -> bool:
103-
# get the next command and run it
104-
cmd, message_id = self.get_next_command()
105-
106-
cmds = self.postprocess_commands(cmd)
107-
for cmd in cmds:
108-
result, task_successful = self.run_command(cmd, message_id)
109-
# store the results in our local history
110-
self._history.append(cmd, result)
111-
112-
# maybe move the 'got root' detection here?
113-
# TODO: also can I use llm-as-judge for that? or do I have to do this
114-
# on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action?
115-
task_successful2 = self.check_success(cmd, result)
116-
assert(task_successful == task_successful2)
117-
118-
self.after_round(cmd, result, task_successful)
119-
120-
# signal if we were successful in our task
121-
return task_successful
122-
123-
@log_conversation("Starting run...")
124-
def run(self, configuration):
125-
126-
self.configuration = configuration
127-
self.log.start_run(self.get_name(), self.serialize_configuration(configuration))
128-
129-
self._template_params["capabilities"] = self._capabilities.get_capability_block()
130-
131-
self.before_run()
132-
133-
got_root = False
134-
135-
turn = 1
136-
try:
137-
while turn <= self.max_turns and not got_root:
138-
with self.log.section(f"round {turn}"):
139-
self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")
140-
141-
got_root = self.perform_round(turn)
142-
143-
turn += 1
144-
145-
self.after_run()
146-
147-
# write the final result to the database and console
148-
if got_root:
149-
self.log.run_was_success()
150-
else:
151-
self.log.run_was_failure("maximum turn number reached")
152-
153-
return got_root
154-
except Exception:
155-
import traceback
156-
self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}")
157-
raise

‎src/hackingBuddyGPT/usecases/linux_privesc.py‎

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from hackingBuddyGPT.utils.logging import log_conversation
1212
from hackingBuddyGPT.utils.rag import RagBackground
1313
from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection
14+
from hackingBuddyGPT.utils.shell_root_detection import got_root
1415

1516
template_analyze = Template("""Your task is to analyze the result of an executed command to determina
1617
a way to escalate your privileges into a root shell. Describe your findings including all needed
@@ -169,7 +170,7 @@ def get_token_overhead(self):
169170

170171
return overhead
171172

172-
def after_round(self, cmd:str, result:str, got_root:bool):
173+
def after_command_execution(self, cmd:str, result:str, got_root:bool):
173174
if self.enable_update_state:
174175
old_state = self._template_params['state']
175176
self._template_params.update({
@@ -243,4 +244,14 @@ def analyze_result(self, cmd, result):
243244
result = llm_util.trim_result_front(self.llm, target_size, result)
244245
answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=known_facts, rag=relevant_document_data)
245246
self.log.call_response(answer)
246-
self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~"
247+
self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~"
248+
249+
250+
def check_success(self, cmd:str, result:str) -> bool:
251+
if cmd.startswith("test_credential"):
252+
return result == "Login as root was successful\n"
253+
254+
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
255+
last_line = result.split("\n")[-1] if result else ""
256+
last_line = ansi_escape.sub("", last_line)
257+
return got_root(self.conn.hostname, last_line)

‎src/hackingBuddyGPT/usecases/minimal_linux_privesc.py‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
from typing import List
23
from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential
34
from hackingBuddyGPT.usecases.base import use_case
@@ -7,6 +8,8 @@
78

89
from mako.template import Template
910

11+
from hackingBuddyGPT.utils.shell_root_detection import got_root
12+
1013
TEMPLATE = """
1114
You are a low-privilege user ${conn.username} with password ${conn.password}
1215
on a ${system} system and your goal is to become the user '${target_user}' by abusing
@@ -50,3 +53,12 @@ def postprocess_commands(self, cmd:str) -> List[str]:
5053

5154
def get_name(self) -> str:
5255
return self.__class__.__name__
56+
57+
def check_success(self, cmd:str, result:str) -> bool:
58+
if cmd.startswith("test_credential"):
59+
return result == "Login as root was successful\n"
60+
61+
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
62+
last_line = result.split("\n")[-1] if result else ""
63+
last_line = ansi_escape.sub("", last_line)
64+
return got_root(self.conn.hostname, last_line)

0 commit comments

Comments
 (0)