1+ import abc
2+ from dataclasses import dataclass
3+ import datetime
4+ from typing import Optional
5+ import re
6+
7+ from mako .template import Template
8+
9+ from hackingBuddyGPT .capabilities .capability import capabilities_to_simple_text_handler
10+ from hackingBuddyGPT .usecases .base import UseCase
11+ from hackingBuddyGPT .utils import llm_util
12+ from hackingBuddyGPT .utils .cli_history import SlidingCliHistory
13+ from hackingBuddyGPT .utils .openai .openai_llm import OpenAIConnection
14+ from hackingBuddyGPT .utils .logging import log_conversation , Logger , log_param , log_section
15+ from hackingBuddyGPT .utils .capability_manager import CapabilityManager
16+ from hackingBuddyGPT .utils .shell_root_detection import got_root
17+
18+ @dataclass
19+ class CommandStrategy (UseCase , abc .ABC ):
20+
21+ _capabilities : CapabilityManager = None
22+
23+ _sliding_history : SlidingCliHistory = None
24+
25+ _max_history_size : int = 0
26+
27+ _template : Template = None
28+
29+ _template_params = {}
30+
31+ max_turns : int = 10
32+
33+ llm : OpenAIConnection = None
34+
35+ log : Logger = log_param
36+
37+ disable_history : bool = False
38+
39+ def before_run (self ):
40+ pass
41+
42+ def after_run (self ):
43+ pass
44+
45+ def after_round (self , cmd , result , got_root ):
46+ pass
47+
48+ def get_space_for_history (self ):
49+ pass
50+
51+ def init (self ):
52+ super ().init ()
53+
54+ self ._capabilities = CapabilityManager (self .log )
55+
56+ self ._sliding_history = SlidingCliHistory (self .llm )
57+
58+ @log_section ("Asking LLM for a new command..." )
59+ def get_next_command (self ) -> tuple [str , int ]:
60+ history = ""
61+ if not self .disable_history :
62+ history = self ._sliding_history .get_history (self ._max_history_size - self .get_state_size ())
63+
64+ self ._template_params .update ({"history" : history })
65+ cmd = self .llm .get_response (self ._template , ** self ._template_params )
66+ message_id = self .log .call_response (cmd )
67+
68+ return llm_util .cmd_output_fixer (cmd .result ), message_id
69+
70+ @log_section ("Executing that command..." )
71+ def run_command (self , cmd , message_id ) -> tuple [Optional [str ], bool ]:
72+ _capability_descriptions , parser = capabilities_to_simple_text_handler (self ._capabilities ._capabilities , default_capability = self ._capabilities ._default_capability )
73+ start_time = datetime .datetime .now ()
74+ success , * output = parser (cmd )
75+ if not success :
76+ self .log .add_tool_call (message_id , tool_call_id = 0 , function_name = "" , arguments = cmd , result_text = output [0 ], duration = 0 )
77+ return output [0 ], False
78+
79+ assert len (output ) == 1
80+ capability , cmd , (result , got_root ) = output [0 ]
81+ duration = datetime .datetime .now () - start_time
82+ self .log .add_tool_call (message_id , tool_call_id = 0 , function_name = capability , arguments = cmd , result_text = result , duration = duration )
83+
84+ return result , got_root
85+
86+ def check_success (self , cmd , result ) -> bool :
87+ ansi_escape = re .compile (r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])" )
88+ last_line = result .split ("\n " )[- 1 ] if result else ""
89+ last_line = ansi_escape .sub ("" , last_line )
90+ return got_root (self .conn .hostname , last_line )
91+
92+
93+ @log_conversation ("Asking LLM for a new command..." )
94+ def perform_round (self , turn : int ) -> bool :
95+ # get the next command and run it
96+ cmd , message_id = self .get_next_command ()
97+ result , task_successful = self .run_command (cmd , message_id )
98+
99+ # maybe move the 'got root' detection here?
100+ # TODO: also can I use llm-as-judge for that? or do I have to do this
101+ # on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action?
102+ task_successful2 = self .check_success (cmd , result )
103+ assert (task_successful == task_successful2 )
104+
105+ self .after_round (cmd , result , task_successful )
106+
107+ # store the results in our local history
108+ if not self .disable_history :
109+ self ._sliding_history .add_command (cmd , result )
110+
111+ # signal if we were successful in our task
112+ return task_successful
113+
114+ @log_conversation ("Starting run..." )
115+ def run (self , configuration ):
116+
117+ self .configuration = configuration
118+ self .log .start_run (self .get_name (), self .serialize_configuration (configuration ))
119+
120+ self ._template_params ["capabilities" ] = self ._capabilities .get_capability_block ()
121+
122+
123+ # calculate sizes
124+ self ._max_history_size = self .llm .context_size - llm_util .SAFETY_MARGIN - self .llm .count_tokens (self ._template .source )
125+
126+ self .before_run ()
127+
128+ got_root = False
129+
130+ turn = 1
131+ try :
132+ while turn <= self .max_turns and not got_root :
133+ with self .log .section (f"round { turn } " ):
134+ self .log .console .log (f"[yellow]Starting turn { turn } of { self .max_turns } " )
135+
136+ got_root = self .perform_round (turn )
137+
138+ turn += 1
139+
140+ self .after_run ()
141+
142+ # write the final result to the database and console
143+ if got_root :
144+ self .log .run_was_success ()
145+ else :
146+ self .log .run_was_failure ("maximum turn number reached" )
147+
148+ return got_root
149+ except Exception :
150+ import traceback
151+ self .log .run_was_failure ("exception occurred" , details = f":\n \n { traceback .format_exc ()} " )
152+ raise
0 commit comments