diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml new file mode 100644 index 00000000..e665a562 --- /dev/null +++ b/.github/workflows/dependency-review.yml @@ -0,0 +1,39 @@ +# Dependency Review Action +# +# This Action will scan dependency manifest files that change as part of a Pull Request, +# surfacing known-vulnerable versions of the packages declared or updated in the PR. +# Once installed, if the workflow run is marked as required, PRs introducing known-vulnerable +# packages will be blocked from merging. +# +# Source repository: https://github.com/actions/dependency-review-action +# Public documentation: https://docs.github.com/en/code-security/supply-chain-security/understanding-your-software-supply-chain/about-dependency-review#dependency-review-enforcement +name: 'Dependency review' +on: + pull_request: + branches: [ "main", "development" ] + +# If using a dependency submission action in this workflow this permission will need to be set to: +# +# permissions: +# contents: write +# +# https://docs.github.com/en/enterprise-cloud@latest/code-security/supply-chain-security/understanding-your-software-supply-chain/using-the-dependency-submission-api +permissions: + contents: read + # Write permissions for pull-requests are required for using the `comment-summary-in-pr` option, comment out if you aren't using this option + pull-requests: write + +jobs: + dependency-review: + runs-on: ubuntu-latest + steps: + - name: 'Checkout repository' + uses: actions/checkout@v4 + - name: 'Dependency Review' + uses: actions/dependency-review-action@v4 + # Commonly enabled options, see https://github.com/actions/dependency-review-action#configuration-options for all available options. + with: + comment-summary-in-pr: always + # fail-on-severity: moderate + # deny-licenses: GPL-1.0-or-later, LGPL-2.0-or-later + # retry-on-snapshot-warnings: true diff --git a/README.md b/README.md index d8473829..5cfed5e5 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,11 @@ HackingBuddyGPT helps security researchers use LLMs to discover new attack vectors and save the world (or earn bug bounties) in 50 lines of code or less. In the long run, we hope to make the world a safer place by empowering security professionals to get more hacking done by using AI. The more testing they can do, the safer all of us will get. +**🆕 New Feature**: hackingBuddyGPT now supports both SSH connections to remote targets and local shell execution for easier testing and development! + +**⚠️ WARNING**: This software will execute commands on live environments. When using local shell mode, commands will be executed on your local system, which could potentially lead to data loss, system modification, or security vulnerabilities. Always use appropriate precautions and consider using isolated environments or virtual machines for testing. + + We aim to become **THE go-to framework for security researchers** and pen-testers interested in using LLMs or LLM-based autonomous agents for security testing. To aid their experiments, we also offer re-usable [linux priv-esc benchmarks](https://github.com/ipa-lab/benchmark-privesc-linux) and publish all our findings as open-access reports. If you want to use hackingBuddyGPT and need help selecting the best LLM for your tasks, [we have a paper comparing multiple LLMs](https://arxiv.org/abs/2310.11409). @@ -65,13 +70,14 @@ Our initial forays were focused upon evaluating the efficiency of LLMs for [linu privilege escalation attacks](https://arxiv.org/abs/2310.11409) and we are currently breaching out into evaluation the use of LLMs for web penetration-testing and web api testing. -| Name | Description | Screenshot | -|------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| [minimal](https://docs.hackingbuddy.ai/docs/dev-guide/dev-quickstart) | A minimal 50 LoC Linux Priv-Esc example. This is the usecase from [Build your own Agent/Usecase](#build-your-own-agentusecase) | ![A very minimal run](https://docs.hackingbuddy.ai/run_archive/2024-04-29_minimal.png) | -| [linux-privesc](https://docs.hackingbuddy.ai/docs/usecases/linux-priv-esc) | Given an SSH-connection for a low-privilege user, task the LLM to become the root user. This would be a typical Linux privilege escalation attack. We published two academic papers about this: [paper #1](https://arxiv.org/abs/2308.00121) and [paper #2](https://arxiv.org/abs/2310.11409) | ![Example wintermute run](https://docs.hackingbuddy.ai/run_archive/2024-04-06_linux.png) | -| [web-pentest (WIP)](https://docs.hackingbuddy.ai/docs/usecases/web) | Directly hack a webpage. Currently in heavy development and pre-alpha stage. | ![Test Run for a simple Blog Page](https://docs.hackingbuddy.ai/run_archive/2024-05-03_web.png) | -| [web-api-pentest (WIP)](https://docs.hackingbuddy.ai/docs/usecases/web-api) | Directly test a REST API. Currently in heavy development and pre-alpha stage. (Documentation and testing of REST API.) | Documentation:![web_api_documentation.png](https://docs.hackingbuddy.ai/run_archive/2024-05-15_web-api_documentation.png) Testing:![web_api_testing.png](https://docs.hackingbuddy.ai/run_archive/2024-05-15_web-api.png) | +| Name | Description | Screenshot | +|------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [minimal](https://docs.hackingbuddy.ai/docs/dev-guide/dev-quickstart) | A minimal 50 LoC Linux Priv-Esc example. This is the usecase from [Build your own Agent/Usecase](#build-your-own-agentusecase) | ![A very minimal run](https://docs.hackingbuddy.ai/run_archive/2024-04-29_minimal.png) | +| [linux-privesc](https://docs.hackingbuddy.ai/docs/usecases/linux-priv-esc) | Given a connection (SSH or local shell) for a low-privilege user, task the LLM to become the root user. This would be a typical Linux privilege escalation attack. We published two academic papers about this: [paper #1](https://arxiv.org/abs/2308.00121) and [paper #2](https://arxiv.org/abs/2310.11409) | ![Example wintermute run](https://docs.hackingbuddy.ai/run_archive/2024-04-06_linux.png) | +| [web-pentest (WIP)](https://docs.hackingbuddy.ai/docs/usecases/web) | Directly hack a webpage. Currently in heavy development and pre-alpha stage. | ![Test Run for a simple Blog Page](https://docs.hackingbuddy.ai/run_archive/2024-05-03_web.png) | +| [web-api-pentest (WIP)](https://docs.hackingbuddy.ai/docs/usecases/web-api) | Directly test a REST API. Currently in heavy development and pre-alpha stage. (Documentation and testing of REST API.) | Documentation:![web_api_documentation.png](https://docs.hackingbuddy.ai/run_archive/2024-05-15_web-api_documentation.png) Testing:![web_api_testing.png](https://docs.hackingbuddy.ai/run_archive/2024-05-15_web-api.png) | | [extended linux-privesc](https://docs.hackingbuddy.ai/docs/usecases/extended-linux-privesc) | This usecases extends linux-privesc with additional features such as retrieval augmented generation (RAG) or chain-of-thought (CoT) | ![Extended Linux Privilege Escalation Run](https://docs.hackingbuddy.ai/run_archive/2025-4-14_extended_privesc_usecase_1.png) ![Extended Linux Privilege Escalation Run](https://docs.hackingbuddy.ai/run_archive/2025-4-14_extended_privesc_usecase_2.png) | + ## Build your own Agent/Usecase So you want to create your own LLM hacking agent? We've got you covered and taken care of the tedious groundwork. @@ -79,7 +85,7 @@ So you want to create your own LLM hacking agent? We've got you covered and take Create a new usecase and implement `perform_round` containing all system/LLM interactions. We provide multiple helper and base classes so that a new experiment can be implemented in a few dozen lines of code. Tedious tasks, such as connecting to the LLM, logging, etc. are taken care of by our framework. Check our [developer quickstart quide](https://docs.hackingbuddy.ai/docs/dev-guide/dev-quickstart) for more information. -The following would create a new (minimal) linux privilege-escalation agent. Through using our infrastructure, this already uses configurable LLM-connections (e.g., for testing OpenAI or locally run LLMs), logs trace data to a local sqlite database for each run, implements a round limit (after which the agent will stop if root has not been achieved until then) and can connect to a linux target over SSH for fully-autonomous command execution (as well as password guessing). +The following would create a new (minimal) linux privilege-escalation agent. Through using our infrastructure, this already uses configurable LLM-connections (e.g., for testing OpenAI or locally run LLMs), logs trace data to a local sqlite database for each run, implements a round limit (after which the agent will stop if root has not been achieved until then) and can connect to a target system either locally or over SSH for fully-autonomous command execution (as well as password guessing). ~~~ python template_dir = pathlib.Path(__file__).parent @@ -155,7 +161,9 @@ We try to keep our python dependencies as light as possible. This should allow f 1. an OpenAI API account, you can find the needed keys [in your account page](https://platform.openai.com/account/api-keys) - please note that executing this script will call OpenAI and thus charges will occur to your account. Please keep track of those. -2. a potential target that is accessible over SSH. You can either use a deliberately vulnerable machine such as [Lin.Security.1](https://www.vulnhub.com/entry/) or a security benchmark such as our [linux priv-esc benchmark](https://github.com/ipa-lab/benchmark-privesc-linux). +2. a target environment to test against. You have two options: + - **Local Shell**: Use your local system (useful for testing and development) + - **SSH Target**: A remote machine accessible over SSH. You can use a deliberately vulnerable machine such as [Lin.Security.1](https://www.vulnhub.com/entry/) or a security benchmark such as our [linux priv-esc benchmark](https://github.com/ipa-lab/benchmark-privesc-linux). To get everything up and running, clone the repo, download requirements, setup API keys and credentials, and start `wintermute.py`: @@ -229,11 +237,45 @@ usage: src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc [--help] [--config con --conn.port='2222' (default from .env file, alternatives: 22 from builtin) ``` -### Provide a Target Machine over SSH +### Connection Options: Local Shell vs SSH + +hackingBuddyGPT now supports two connection modes: + +#### Local Shell Mode +Use your local system for testing and development. This is useful for quick experimentation without needing a separate target machine. + +**Setup Steps:** +1. First, create a new tmux session with a specific name: + ```bash + $ tmux new-session -s + ``` + +2. Once you have the tmux shell running, use hackingBuddyGPT to interact with it: + ```bash + # Local shell with tmux session + $ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --conn=local_shell --conn.tmux_session= + ``` + +**Example:** +```bash +# Step 1: Create tmux session named "hacking_session" +$ tmux new-session -s hacking_session + +# Step 2: In another terminal, run hackingBuddyGPT +$ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --conn=local_shell --conn.tmux_session=hacking_session +``` + +#### SSH Mode +Connect to a remote target machine over SSH. This is the traditional mode for testing against vulnerable VMs. + +```bash +# SSH connection (note the updated format with --conn=ssh) +$ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --conn=ssh --conn.host=192.168.122.151 --conn.username=lowpriv --conn.password=trustno1 +``` -The next important part is having a machine that we can run our agent against. In our case, the target machine will be situated at `192.168.122.151`. +When using SSH mode, the target machine should be situated at your specified IP address (e.g., `192.168.122.151` in the example above). -We are using vulnerable Linux systems running in Virtual Machines for this. Never run this against real systems. +We are using vulnerable Linux systems running in Virtual Machines for SSH testing. Never run this against real production systems. > 💡 **We also provide vulnerable machines!** > @@ -277,9 +319,13 @@ Finally we can run hackingBuddyGPT against our provided test VM. Enjoy! With that out of the way, let's look at an example hackingBuddyGPT run. Each run is structured in rounds. At the start of each round, hackingBuddyGPT asks a LLM for the next command to execute (e.g., `whoami`) for the first round. It then executes that command on the virtual machine, prints its output and starts a new round (in which it also includes the output of prior rounds) until it reaches step number 10 or becomes root: ```bash -# start wintermute, i.e., attack the configured virtual machine -$ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --llm.api_key=sk...ChangeMeToYourOpenAiApiKey --llm.model=gpt-4-turbo --llm.context_size=8192 --conn.host=192.168.122.151 --conn.username=lowpriv --conn.password=trustno1 --conn.hostname=test1 +# Example 1: Using local shell with tmux session +# First create the tmux session: tmux new-session -s hacking_session +# Then run hackingBuddyGPT: +$ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --llm.api_key=sk...ChangeMeToYourOpenAiApiKey --llm.model=gpt-4-turbo --llm.context_size=8192 --conn=local_shell --conn.tmux_session=hacking_session +# Example 2: Using SSH connection (updated format) +$ python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --llm.api_key=sk...ChangeMeToYourOpenAiApiKey --llm.model=gpt-4-turbo --llm.context_size=8192 --conn=ssh --conn.host=192.168.122.151 --conn.username=lowpriv --conn.password=trustno1 --conn.hostname=test1 # install dependencies for testing if you want to run the tests $ pip install '.[testing]' diff --git a/src/hackingBuddyGPT/capabilities/http_request.py b/src/hackingBuddyGPT/capabilities/http_request.py index 9c9961ae..874cf253 100644 --- a/src/hackingBuddyGPT/capabilities/http_request.py +++ b/src/hackingBuddyGPT/capabilities/http_request.py @@ -60,7 +60,7 @@ def __call__( allow_redirects=self.follow_redirects, ) except requests.exceptions.RequestException as e: - url = self.host + ("" if path.startswith("/") else "/") + path + ("?{query}" if query else "") + url = self.host + ("" if path.startswith("/") else "/") + path + (f"?{query}" if query else "") return f"Could not request '{url}': {e}" response_headers = "\r\n".join(f"{k}: {v}" for k, v in resp.headers.items()) diff --git a/src/hackingBuddyGPT/capabilities/local_shell.py b/src/hackingBuddyGPT/capabilities/local_shell.py new file mode 100644 index 00000000..4e90754a --- /dev/null +++ b/src/hackingBuddyGPT/capabilities/local_shell.py @@ -0,0 +1,37 @@ +import re +from dataclasses import dataclass +from typing import Tuple + +from hackingBuddyGPT.capabilities import Capability +from hackingBuddyGPT.utils.local_shell import LocalShellConnection + + +@dataclass +class LocalShellCapability(Capability): + conn: LocalShellConnection + + def describe(self) -> str: + return "give a command to be executed and I will respond with the terminal output when running this command on the shell via tmux. The given command must not require user interaction. Do not use quotation marks in front and after your command." + + def get_name(self): + return "local_exec" + + def _got_root(self, output: str) -> bool: + """Check if we got root access based on the command output.""" + if not output.strip(): + return False + + lines = output.strip().split('\n') + last_line = lines[-1] if lines else '' + + # Check for common root indicators + return ( + "root" in output.lower() or + last_line.strip().endswith("#") or + "root@" in last_line or + last_line.strip() == "#" + ) + + def __call__(self, cmd: str) -> Tuple[str, bool]: + out, _, _ = self.conn.run(cmd) # This is CORRECT - use the commented version + return out, self._got_root(out) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/privesc/linux.py b/src/hackingBuddyGPT/usecases/privesc/linux.py old mode 100644 new mode 100755 index 7b9228e6..38a2d755 --- a/src/hackingBuddyGPT/usecases/privesc/linux.py +++ b/src/hackingBuddyGPT/usecases/privesc/linux.py @@ -1,18 +1,24 @@ from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.capabilities.local_shell import LocalShellCapability from hackingBuddyGPT.usecases.base import AutonomousAgentUseCase, use_case from hackingBuddyGPT.utils import SSHConnection - +from hackingBuddyGPT.utils.local_shell import LocalShellConnection +from typing import Union from .common import Privesc class LinuxPrivesc(Privesc): - conn: SSHConnection = None + conn: Union[SSHConnection, LocalShellConnection] = None system: str = "linux" def init(self): super().init() - self.add_capability(SSHRunCommand(conn=self.conn), default=True) - self.add_capability(SSHTestCredential(conn=self.conn)) + if isinstance(self.conn, LocalShellConnection): + self.add_capability(LocalShellCapability(conn=self.conn), default=True) + self.add_capability(SSHTestCredential(conn=self.conn)) + else: + self.add_capability(SSHRunCommand(conn=self.conn), default=True) + self.add_capability(SSHTestCredential(conn=self.conn)) @use_case("Linux Privilege Escalation") diff --git a/src/hackingBuddyGPT/utils/local_shell/__init__.py b/src/hackingBuddyGPT/utils/local_shell/__init__.py new file mode 100644 index 00000000..93e07699 --- /dev/null +++ b/src/hackingBuddyGPT/utils/local_shell/__init__.py @@ -0,0 +1,3 @@ +from .local_shell import LocalShellConnection + +__all__ = ["LocalShellConnection"] diff --git a/src/hackingBuddyGPT/utils/local_shell/local_shell.py b/src/hackingBuddyGPT/utils/local_shell/local_shell.py new file mode 100755 index 00000000..0ecf913c --- /dev/null +++ b/src/hackingBuddyGPT/utils/local_shell/local_shell.py @@ -0,0 +1,335 @@ +from dataclasses import dataclass, field +from typing import Optional, Tuple +import time +import uuid +import subprocess +import re +import signal +import getpass + +from hackingBuddyGPT.utils.configurable import configurable + +@configurable("local_shell", "attaches to a running local shell inside tmux using tmux") +@dataclass +class LocalShellConnection: + tmux_session: str = field(metadata={"help": "tmux session name of the running shell inside tmux"}) + delay: float = field(default=0.5, metadata={"help": "delay between commands"}) + max_wait: int = field(default=300, metadata={"help": "maximum wait time for command completion"}) + + # Static attributes for connection info + username: str = field(default_factory=getpass.getuser, metadata={"help": "username for the connection"}) + password: str = field(default="", metadata={"help": "password for the connection"}) + host: str = field(default="localhost", metadata={"help": "host for the connection"}) + hostname: str = field(default="localhost", metadata={"help": "hostname for the connection"}) + port: Optional[int] = field(default=None, metadata={"help": "port for the connection"}) + keyfilename: str = field(default="", metadata={"help": "key filename for the connection"}) + + # Internal state + last_output_hash: Optional[int] = field(default=None, init=False) + _initialized: bool = field(default=False, init=False) + + def init(self): + if not self.check_session(): + raise RuntimeError(f"Tmux session '{self.tmux_session}' does not exist. Please create it first or use an existing session name.") + else: + print(f"Connected to existing tmux session: {self.tmux_session}") + self._initialized = True + + def new_with(self, *, tmux_session=None, delay=None, max_wait=None) -> "LocalShellConnection": + return LocalShellConnection( + tmux_session=tmux_session or self.tmux_session, + delay=delay or self.delay, + max_wait=max_wait or self.max_wait, + ) + + def run(self, cmd, *args, **kwargs) -> Tuple[str, str, int]: + """ + Run a command and return (stdout, stderr, return_code). + This is the main interface method that matches the project pattern. + """ + if not self._initialized: + self.init() + + if not cmd.strip(): + return "", "", 0 + + try: + output = self.run_with_unique_markers(cmd) + + return output, "", 0 + except Exception as e: + return "", str(e), 1 + + def send_command(self, command): + """Send a command to the tmux session.""" + try: + subprocess.run(['tmux', 'send-keys', '-t', self.tmux_session, command, 'Enter'], check=True) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to send command to tmux: {e}") + + def capture_output(self, history_lines=10000): + """Capture the entire tmux pane content including scrollback.""" + try: + # Capture with history to get more content + result = subprocess.run( + ['tmux', 'capture-pane', '-t', self.tmux_session, '-p', '-S', f'-{history_lines}'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + return result.stdout + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to capture tmux output: {e}") + + def get_cursor_position(self): + """Get cursor position to detect if command is still running.""" + try: + result = subprocess.run( + ['tmux', 'display-message', '-t', self.tmux_session, '-p', '#{cursor_x},#{cursor_y}'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + return None + + def wait_for_command_completion(self, timeout=None, check_interval=0.5): + """ + Advanced method to wait for command completion using multiple indicators. + """ + if timeout is None: + timeout = self.max_wait + + start_time = time.time() + last_output_hash = None + last_cursor_pos = None + stable_count = 0 + min_stable_time = 1.5 # Reduced for faster detection + + while time.time() - start_time < timeout: + # Use hash for large outputs to detect changes more efficiently + current_output = self.capture_output(1000) # Smaller buffer for speed + current_output_hash = hash(current_output) + current_cursor = self.get_cursor_position() + + # Check if output and cursor position are stable + if (current_output_hash == last_output_hash and + current_cursor == last_cursor_pos and + current_cursor is not None): + stable_count += 1 + + # If stable for enough cycles, check for prompt + if stable_count >= (min_stable_time / check_interval): + if self._has_prompt_at_end(current_output): + return True + else: + stable_count = 0 + + last_output_hash = current_output_hash + last_cursor_pos = current_cursor + + time.sleep(check_interval) + + return False + + def _has_prompt_at_end(self, output): + if not output.strip(): + return False + + lines = output.strip().split('\n') + if not lines: + return False + + last_line = lines[-1].strip() + + prompt_patterns = [ + r'.*[$#]\s*$', # Basic $ or # prompts + r'.*>\s*$', # > prompts + r'.*@.*:.*[$#]\s*$', # user@host:path$ format + r'.*@.*:.*>\s*$', # user@host:path> format + r'^\S+:\S*[$#]\s*$', # Simple host:path$ format + r'.*\$\s*$', # Ends with $ (catch-all) + r'.*#\s*$', # Ends with # (catch-all) + ] + + for pattern in prompt_patterns: + if re.match(pattern, last_line): + return True + + if len(last_line) < 100 and any(char in last_line for char in ['$', '#', '>', ':']): + if not any(keyword in last_line.lower() for keyword in + ['error', 'warning', 'failed', 'success', 'completed', 'finished']): + return True + + return False + + def run_with_unique_markers(self, command): + """Run command using unique markers - improved version for large outputs.""" + start_marker = f"CMDSTART{uuid.uuid4().hex[:8]}" + end_marker = f"CMDEND{uuid.uuid4().hex[:8]}" + + try: + self.send_command(f"echo '{start_marker}'") + time.sleep(0.5) + + self.send_command(command) + + if not self.wait_for_command_completion(): + raise RuntimeError(f"Command timed out after {self.max_wait}s") + + self.send_command(f"echo '{end_marker}'") + time.sleep(0.8) + + final_output = self.capture_output(50000) + + # Extract content between markers + result = self._extract_between_markers(final_output, start_marker, end_marker, command) + return result + + except Exception as e: + return self.run_simple_fallback(command) + + def _extract_between_markers(self, output, start_marker, end_marker, original_command): + lines = output.splitlines() + start_idx = -1 + end_idx = -1 + + for i, line in enumerate(lines): + if start_marker in line: + start_idx = i + elif end_marker in line and start_idx != -1: + end_idx = i + break + + if start_idx == -1 or end_idx == -1: + return self.run_simple_fallback(original_command) + + extracted_lines = [] + for i in range(start_idx + 1, end_idx): + line = lines[i] + if not self._is_command_echo(line, original_command): + extracted_lines.append(line) + + return '\n'.join(extracted_lines).strip() + + def _is_command_echo(self, line, command): + stripped = line.strip() + if not stripped: + return False + + for prompt_char in ['$', '#', '>']: + if prompt_char in stripped: + after_prompt = stripped.split(prompt_char, 1)[-1].strip() + if after_prompt == command: + return True + + return stripped == command + + def run_simple_fallback(self, command): + try: + subprocess.run(['tmux', 'set-option', '-t', self.tmux_session, 'history-limit', '50000'], + capture_output=True) + + clear_marker = f"__CLEAR_{uuid.uuid4().hex[:8]}__" + self.send_command('clear') + time.sleep(0.3) + self.send_command(f'echo "{clear_marker}"') + time.sleep(0.3) + + self.send_command(command) + + self.wait_for_command_completion() + + end_marker = f"__END_{uuid.uuid4().hex[:8]}__" + self.send_command(f'echo "{end_marker}"') + time.sleep(0.5) + + output = self.capture_output(50000) + + lines = output.splitlines() + start_idx = -1 + end_idx = -1 + + for i, line in enumerate(lines): + if clear_marker in line: + start_idx = i + elif end_marker in line and start_idx != -1: + end_idx = i + break + + if start_idx != -1 and end_idx != -1: + result_lines = lines[start_idx + 1:end_idx] + if result_lines and command in result_lines[0]: + result_lines = result_lines[1:] + result = '\n'.join(result_lines).strip() + else: + result = self._extract_recent_output(output, command) + + subprocess.run(['tmux', 'set-option', '-t', self.tmux_session, 'history-limit', '10000'], + capture_output=True) + + return result + + except Exception as e: + subprocess.run(['tmux', 'set-option', '-t', self.tmux_session, 'history-limit', '10000'], + capture_output=True) + raise RuntimeError(f"Error executing command: {e}") + + def _extract_recent_output(self, output, command): + lines = output.splitlines() + + for i in range(len(lines) - 1, -1, -1): + line = lines[i] + if command in line and any(prompt in line for prompt in ['$', '#', '>', '└─']): + return '\n'.join(lines[i + 1:]).strip() + + return '\n'.join(lines[-50:]).strip() if lines else "" + + def run_with_timeout(self, command, timeout=60): + old_max_wait = self.max_wait + self.max_wait = timeout + try: + return self.run(command) + finally: + self.max_wait = old_max_wait + + def interrupt_command(self): + try: + subprocess.run(['tmux', 'send-keys', '-t', self.tmux_session, 'C-c'], check=True) + time.sleep(1) + return True + except subprocess.CalledProcessError: + return False + + def check_session(self): + try: + result = subprocess.run( + ['tmux', 'list-sessions', '-F', '#{session_name}'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + session_names = result.stdout.strip().split('\n') + return self.tmux_session in session_names + + except subprocess.CalledProcessError: + return False + + def get_session_info(self): + try: + result = subprocess.run( + ['tmux', 'display-message', '-t', self.tmux_session, '-p', + 'Session: #{session_name}, Window: #{window_name}, Pane: #{pane_index}'], + stdout=subprocess.PIPE, + text=True, + check=True + ) + return result.stdout.strip() + except subprocess.CalledProcessError: + return "Session info unavailable" +