#
# This file is part of libdebug Python library (https://github.com/libdebug/libdebug).
# Copyright (c) 2023-2024 Roberto Alessandro Bertolini, Gabriele Digregorio, Francesco Panebianco. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for details.
#
from __future__ import annotations
import functools
import os
import signal
from pathlib import Path
from queue import Queue
from subprocess import Popen
from threading import Thread, current_thread
from typing import TYPE_CHECKING
import psutil
from libdebug.architectures.syscall_hijacking_provider import syscall_hijacking_provider
from libdebug.builtin.antidebug_syscall_hook import on_enter_ptrace, on_exit_ptrace
from libdebug.builtin.pretty_print_syscall_hook import pprint_on_enter, pprint_on_exit
from libdebug.data.breakpoint import Breakpoint
from libdebug.data.memory_view import MemoryView
from libdebug.data.signal_hook import SignalHook
from libdebug.data.syscall_hook import SyscallHook
from libdebug.debugger.internal_debugger_instance_manager import (
extend_internal_debugger,
link_to_internal_debugger,
)
from libdebug.interfaces.interface_helper import provide_debugging_interface
from libdebug.liblog import liblog
from libdebug.state.resume_context import ResumeContext
from libdebug.utils.debugger_wrappers import (
background_alias,
change_state_function_process,
change_state_function_thread,
)
from libdebug.utils.debugging_utils import (
check_absolute_address,
normalize_and_validate_address,
resolve_symbol_in_maps,
)
from libdebug.utils.libcontext import libcontext
from libdebug.utils.print_style import PrintStyle
from libdebug.utils.signal_utils import (
resolve_signal_name,
resolve_signal_number,
)
from libdebug.utils.syscall_utils import (
get_all_syscall_numbers,
resolve_syscall_name,
resolve_syscall_number,
)
if TYPE_CHECKING:
from collections.abc import Callable
from libdebug.data.memory_map import MemoryMap
from libdebug.interfaces.debugging_interface import DebuggingInterface
from libdebug.state.thread_context import ThreadContext
from libdebug.utils.pipe_manager import PipeManager
THREAD_TERMINATE = -1
GDB_GOBACK_LOCATION = str((Path(__file__).parent / "utils" / "gdb.py").resolve())
[docs]
class InternalDebugger:
"""A class that holds the global debugging state."""
aslr_enabled: bool
"""A flag that indicates if ASLR is enabled or not."""
argv: list[str]
"""The command line arguments of the debugged process."""
env: dict[str, str] | None
"""The environment variables of the debugged process."""
escape_antidebug: bool
"""A flag that indicates if the debugger should escape anti-debugging techniques."""
autoreach_entrypoint: bool
"""A flag that indicates if the debugger should automatically reach the entry point of the debugged process."""
auto_interrupt_on_command: bool
"""A flag that indicates if the debugger should automatically interrupt the debugged process when a command is issued."""
breakpoints: dict[int, Breakpoint]
"""A dictionary of all the breakpoints set on the process.
Key: the address of the breakpoint."""
syscall_hooks: dict[int, SyscallHook]
"""A dictionary of all the syscall hooks set on the process.
Key: the syscall number."""
signal_hooks: dict[int, SignalHook]
"""A dictionary of all the signal hooks set on the process.
Key: the signal number."""
signals_to_block: list[int]
"""The signals to not forward to the process."""
syscalls_to_pprint: list[int] | None
"""The syscalls to pretty print."""
syscalls_to_not_pprint: list[int] | None
"""The syscalls to not pretty print."""
threads: list[ThreadContext]
"""A list of all the threads of the debugged process."""
process_id: int
"""The PID of the debugged process."""
pipe_manager: PipeManager
"""The pipe manager used to communicate with the debugged process."""
memory: MemoryView
"""The memory view of the debugged process."""
debugging_interface: DebuggingInterface
"""The debugging interface used to communicate with the debugged process."""
instanced: bool = False
"""Whether the process was started and has not been killed yet."""
pprint_syscalls: bool
"""A flag that indicates if the debugger should pretty print syscalls."""
resume_context: ResumeContext
"""Context that indicates if the debugger should resume the debugged process."""
_polling_thread: Thread | None
"""The background thread used to poll the process for state change."""
_polling_thread_command_queue: Queue | None
"""The queue used to send commands to the background thread."""
_polling_thread_response_queue: Queue | None
"""The queue used to receive responses from the background thread."""
_is_running: bool
"""The overall state of the debugged process. True if the process is running, False otherwise."""
def __init__(self: InternalDebugger) -> None:
"""Initialize the context."""
# These must be reinitialized on every call to "debugger"
self.aslr_enabled = False
self.autoreach_entrypoint = True
self.argv = []
self.env = {}
self.escape_antidebug = False
self.breakpoints = {}
self.syscall_hooks = {}
self.signal_hooks = {}
self.syscalls_to_pprint = None
self.syscalls_to_not_pprint = None
self.signals_to_block = []
self.pprint_syscalls = False
self.pipe_manager = None
self.process_id = 0
self.threads = list()
self.instanced = False
self._is_running = False
self.resume_context = ResumeContext()
self.__polling_thread_command_queue = Queue()
self.__polling_thread_response_queue = Queue()
[docs]
def clear(self: InternalDebugger) -> None:
"""Reinitializes the context, so it is ready for a new run."""
# These must be reinitialized on every call to "run"
self.breakpoints.clear()
self.syscall_hooks.clear()
self.signal_hooks.clear()
self.syscalls_to_pprint = None
self.syscalls_to_not_pprint = None
self.signals_to_block.clear()
self.pprint_syscalls = False
self.pipe_manager = None
self.process_id = 0
self.threads.clear()
self.instanced = False
self._is_running = False
self.resume_context.clear()
[docs]
def start_up(self: InternalDebugger) -> None:
"""Starts up the context."""
# The context is linked to itself
link_to_internal_debugger(self, self)
self.start_processing_thread()
with extend_internal_debugger(self):
self.debugging_interface = provide_debugging_interface()
self.memory = MemoryView(self._peek_memory, self._poke_memory)
[docs]
def start_processing_thread(self: InternalDebugger) -> None:
"""Starts the thread that will poll the traced process for state change."""
# Set as daemon so that the Python interpreter can exit even if the thread is still running
self.__polling_thread = Thread(
target=self.__polling_thread_function,
name="libdebug__polling_thread",
daemon=True,
)
self.__polling_thread.start()
def _background_invalid_call(self: InternalDebugger) -> None:
"""Raises an error when an invalid call is made in background mode."""
raise RuntimeError("This method is not available in a callback.")
[docs]
def run(self: InternalDebugger) -> None:
"""Starts the process and waits for it to stop."""
if not self.argv:
raise RuntimeError("No binary file specified.")
if not Path(self.argv[0]).is_file():
raise RuntimeError(f"File {self.argv[0]} does not exist.")
if not os.access(self.argv[0], os.X_OK):
raise RuntimeError(
f"File {self.argv[0]} is not executable.",
)
if self.instanced:
liblog.debugger("Process already running, stopping it before restarting.")
self.kill()
if self.threads:
self.clear()
self.debugging_interface.reset()
self.instanced = True
if not self.__polling_thread_command_queue.empty():
raise RuntimeError("Polling thread command queue not empty.")
self.__polling_thread_command_queue.put((self.__threaded_run, ()))
if self.escape_antidebug:
liblog.debugger("Enabling anti-debugging escape mechanism.")
self._enable_antidebug_escaping()
self._join_and_check_status()
if not self.pipe_manager:
raise RuntimeError("Something went wrong during pipe initialization.")
return self.pipe_manager
[docs]
def attach(self: InternalDebugger, pid: int) -> None:
"""Attaches to an existing process."""
if self.instanced:
liblog.debugger("Process already running, stopping it before restarting.")
self.kill()
if self.threads:
self.clear()
self.debugging_interface.reset()
self.instanced = True
if not self.__polling_thread_command_queue.empty():
raise RuntimeError("Polling thread command queue not empty.")
self.__polling_thread_command_queue.put((self.__threaded_attach, (pid,)))
self._join_and_check_status()
[docs]
def detach(self: InternalDebugger) -> None:
"""Detaches from the process."""
if not self.instanced:
raise RuntimeError("Process not running, cannot detach.")
self._ensure_process_stopped()
self.__polling_thread_command_queue.put((self.__threaded_detach, ()))
self._join_and_check_status()
[docs]
@background_alias(_background_invalid_call)
def kill(self: InternalDebugger) -> None:
"""Kills the process."""
try:
self._ensure_process_stopped()
except OSError:
# This exception might occur if the process has already died
liblog.debugger("OSError raised during kill")
self.__polling_thread_command_queue.put((self.__threaded_kill, ()))
self.instanced = False
if self.pipe_manager:
self.pipe_manager.close()
self._join_and_check_status()
[docs]
def terminate(self: InternalDebugger) -> None:
"""Terminates the background thread.
The debugger object cannot be used after this method is called.
This method should only be called to free up resources when the debugger object is no longer needed.
"""
if self.__polling_thread is not None:
self.__polling_thread_command_queue.put((THREAD_TERMINATE, ()))
self.__polling_thread.join()
del self.__polling_thread
self.__polling_thread = None
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def cont(self: InternalDebugger) -> None:
"""Continues the process.
Args:
auto_wait (bool, optional): Whether to automatically wait for the process to stop after continuing. Defaults to True.
"""
self.__polling_thread_command_queue.put((self.__threaded_cont, ()))
self._join_and_check_status()
self.__polling_thread_command_queue.put((self.__threaded_wait, ()))
[docs]
@background_alias(_background_invalid_call)
def interrupt(self: InternalDebugger) -> None:
"""Interrupts the process."""
if not self.instanced:
raise RuntimeError("Process not running, cannot interrupt.")
# We have to ensure that at least one thread is alive before executing the method
if self.threads[0].dead:
raise RuntimeError("All threads are dead.")
if not self.running:
return
self.resume_context.force_interrupt = True
os.kill(self.process_id, signal.SIGSTOP)
self.wait()
[docs]
@background_alias(_background_invalid_call)
def wait(self: InternalDebugger) -> None:
"""Waits for the process to stop."""
if not self.instanced:
raise RuntimeError("Process not running, cannot wait.")
self._join_and_check_status()
if self.threads[0].dead or not self.running:
# Most of the time the function returns here, as there was a wait already
# queued by the previous command
return
self.__polling_thread_command_queue.put((self.__threaded_wait, ()))
self._join_and_check_status()
[docs]
def maps(self: InternalDebugger) -> list[MemoryMap]:
"""Returns the memory maps of the process."""
self._ensure_process_stopped()
return self.debugging_interface.maps()
[docs]
def print_maps(self: InternalDebugger) -> None:
"""Prints the memory maps of the process."""
self._ensure_process_stopped()
maps = self.maps()
for memory_map in maps:
if "x" in memory_map.permissions:
print(f"{PrintStyle.RED}{memory_map}{PrintStyle.RESET}")
elif "w" in memory_map.permissions:
print(f"{PrintStyle.YELLOW}{memory_map}{PrintStyle.RESET}")
elif "r" in memory_map.permissions:
print(f"{PrintStyle.GREEN}{memory_map}{PrintStyle.RESET}")
else:
print(memory_map)
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def breakpoint(
self: InternalDebugger,
position: int | str,
hardware: bool = False,
condition: str | None = None,
length: int = 1,
callback: None | Callable[[ThreadContext, Breakpoint], None] = None,
file: str = "default",
) -> Breakpoint:
"""Sets a breakpoint at the specified location.
Args:
position (int | bytes): The location of the breakpoint.
hardware (bool, optional): Whether the breakpoint should be hardware-assisted or purely software.
Defaults to False.
condition (str, optional): The trigger condition for the breakpoint. Defaults to None.
length (int, optional): The length of the breakpoint. Only for watchpoints. Defaults to 1.
callback (Callable[[ThreadContext, Breakpoint], None], optional): A callback to be called when the
breakpoint is hit. Defaults to None.
file (str, optional): The user-defined backing file to resolve the address in. Defaults to "default"
(libdebug will first try to solve the address as an absolute address, then as a relative address w.r.t.
the "binary" map file).
"""
if isinstance(position, str):
address = self.resolve_symbol(position, file)
else:
address = self.resolve_address(position, file)
position = hex(address)
if condition:
if not hardware:
raise ValueError(
"Breakpoint condition is supported only for hardware watchpoints.",
)
if condition.lower() not in ["w", "rw", "x"]:
raise ValueError(
"Invalid condition for watchpoints. Supported conditions are 'r', 'rw', 'x'.",
)
if length not in [1, 2, 4, 8]:
raise ValueError(
"Invalid length for watchpoints. Supported lengths are 1, 2, 4, 8.",
)
if hardware and not condition:
condition = "x"
bp = Breakpoint(address, position, 0, hardware, callback, condition, length)
link_to_internal_debugger(bp, self)
self.__polling_thread_command_queue.put((self.__threaded_breakpoint, (bp,)))
self._join_and_check_status()
# the breakpoint should have been set by interface
if address not in self.breakpoints:
raise RuntimeError("Something went wrong while inserting the breakpoint.")
return bp
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def hook_signal(
self: InternalDebugger,
signal_to_hook: int | str,
callback: None | Callable[[ThreadContext, int], None] = None,
hook_hijack: bool = True,
) -> SignalHook:
"""Hooks a signal in the target process.
Args:
signal_to_hook (int | str): The signal to hook.
callback (Callable[[ThreadContext, int], None], optional): A callback to be called when the signal is
received. Defaults to None.
hook_hijack (bool, optional): Whether to execute the hook/hijack of the new signal after an hijack or not.
Defaults to False.
"""
if callback is None:
raise ValueError("A callback must be specified.")
if isinstance(signal_to_hook, str):
signal_number = resolve_signal_number(signal_to_hook)
elif isinstance(signal_to_hook, int):
signal_number = signal_to_hook
else:
raise TypeError("signal must be an int or a str")
match signal_number:
case signal.SIGKILL:
raise ValueError(
f"Cannot hook SIGKILL ({signal_number}) as it cannot be caught or ignored. This is a kernel restriction."
)
case signal.SIGSTOP:
raise ValueError(
f"Cannot hook SIGSTOP ({signal_number}) as it is used by the debugger or ptrace for their internal operations."
)
case signal.SIGTRAP:
raise ValueError(
f"Cannot hook SIGTRAP ({signal_number}) as it is used by the debugger or ptrace for their internal operations."
)
if signal_number in self.signal_hooks:
liblog.warning(
f"Signal {resolve_signal_name(signal_number)} ({signal_number}) is already hooked. Overriding it.",
)
self.unhook_signal(self.signal_hooks[signal_number])
if not isinstance(hook_hijack, bool):
raise TypeError("hook_hijack must be a boolean")
hook = SignalHook(signal_number, callback, hook_hijack)
link_to_internal_debugger(hook, self)
self.__polling_thread_command_queue.put((self.__threaded_signal_hook, (hook,)))
self._join_and_check_status()
return hook
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def unhook_signal(self: InternalDebugger, hook: SignalHook) -> None:
"""Unhooks a signal in the target process.
Args:
hook (SignalHook): The signal hook to unhook.
"""
if hook.signal_number not in self.signal_hooks:
raise ValueError(f"Signal {hook.signal_number} is not hooked.")
hook = self.signal_hooks[hook.signal_number]
self.__polling_thread_command_queue.put((self.__threaded_signal_unhook, (hook,)))
self._join_and_check_status()
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def hijack_signal(
self: InternalDebugger,
original_signal: int | str,
new_signal: int | str,
hook_hijack: bool = True,
) -> None:
"""Hijacks a signal in the target process.
Args:
original_signal (int | str): The signal to hijack.
new_signal (int | str): The signal to replace the original signal with.
hook_hijack (bool, optional): Whether to execute the hook/hijack of the new signal after the hijack or not.
Defaults to True.
"""
if isinstance(original_signal, str):
original_signal_number = resolve_signal_number(original_signal)
else:
original_signal_number = original_signal
new_signal_number = resolve_signal_number(new_signal) if isinstance(new_signal, str) else new_signal
if original_signal_number == new_signal_number:
raise ValueError(
"The original signal and the new signal must be different during hijacking.",
)
def callback(thread: ThreadContext, _: int) -> None:
"""The callback to execute when the signal is received."""
thread.signal = new_signal_number
return self.hook_signal(original_signal_number, callback, hook_hijack)
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def hook_syscall(
self: InternalDebugger,
syscall: int | str,
on_enter: Callable[[ThreadContext, int], None] | None = None,
on_exit: Callable[[ThreadContext, int], None] | None = None,
hook_hijack: bool = True,
) -> SyscallHook:
"""Hooks a syscall in the target process.
Args:
syscall (int | str): The syscall name or number to hook.
on_enter (Callable[[ThreadContext, int], None], optional): The callback to execute when the syscall is entered. Defaults to None.
on_exit (Callable[[ThreadContext, int], None], optional): The callback to execute when the syscall is exited. Defaults to None.
hook_hijack (bool, optional): Whether the syscall after the hijack should be hooked. Defaults to True.
Returns:
SyscallHook: The syscall hook object.
"""
if on_enter is None and on_exit is None:
raise ValueError(
"At least one callback between on_enter and on_exit should be specified.",
)
syscall_number = resolve_syscall_number(syscall) if isinstance(syscall, str) else syscall
if not isinstance(hook_hijack, bool):
raise TypeError("hook_hijack must be a boolean")
# Check if the syscall is already hooked (by the user or by the pretty print hook)
if syscall_number in self.syscall_hooks:
hook = self.syscall_hooks[syscall_number]
if hook.on_enter_user or hook.on_exit_user:
liblog.warning(
f"Syscall {resolve_syscall_name(syscall_number)} is already hooked by a user-defined hook. Overriding it.",
)
hook.on_enter_user = on_enter
hook.on_exit_user = on_exit
hook.hook_hijack = hook_hijack
hook.enabled = True
else:
hook = SyscallHook(
syscall_number,
on_enter,
on_exit,
None,
None,
hook_hijack,
)
link_to_internal_debugger(hook, self)
self.__polling_thread_command_queue.put(
(self.__threaded_syscall_hook, (hook,)),
)
self._join_and_check_status()
return hook
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def unhook_syscall(self: InternalDebugger, hook: SyscallHook) -> None:
"""Unhooks a syscall in the target process.
Args:
hook (SyscallHook): The syscall hook to unhook.
"""
if hook.syscall_number not in self.syscall_hooks:
raise ValueError(f"Syscall {hook.syscall_number} is not hooked.")
hook = self.syscall_hooks[hook.syscall_number]
if hook.on_enter_pprint or hook.on_exit_pprint:
hook.on_enter_user = None
hook.on_exit_user = None
else:
self.__polling_thread_command_queue.put(
(self.__threaded_syscall_unhook, (hook,)),
)
self._join_and_check_status()
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def hijack_syscall(
self: InternalDebugger,
original_syscall: int | str,
new_syscall: int | str,
hook_hijack: bool = True,
**kwargs: int,
) -> SyscallHook:
"""Hijacks a syscall in the target process.
Args:
original_syscall (int | str): The syscall name or number to hijack.
new_syscall (int | str): The syscall name or number to replace the original syscall with.
hook_hijack (bool, optional): Whether the syscall after the hijack should be hooked. Defaults to True.
**kwargs: (int, optional): The arguments to pass to the new syscall.
Returns:
SyscallHook: The syscall hook object.
"""
if set(kwargs) - syscall_hijacking_provider().allowed_args:
raise ValueError("Invalid keyword arguments in syscall hijack")
if isinstance(original_syscall, str):
original_syscall_number = resolve_syscall_number(original_syscall)
else:
original_syscall_number = original_syscall
new_syscall_number = resolve_syscall_number(new_syscall) if isinstance(new_syscall, str) else new_syscall
if original_syscall_number == new_syscall_number:
raise ValueError(
"The original syscall and the new syscall must be different during hijacking.",
)
on_enter = syscall_hijacking_provider().create_hijacker(
new_syscall_number,
**kwargs,
)
# Check if the syscall is already hooked (by the user or by the pretty print hook)
if original_syscall_number in self.syscall_hooks:
hook = self.syscall_hooks[original_syscall_number]
if hook.on_enter_user or hook.on_exit_user:
liblog.warning(
f"Syscall {original_syscall_number} is already hooked by a user-defined hook. Overriding it.",
)
hook.on_enter_user = on_enter
hook.on_exit_user = None
hook.hook_hijack = hook_hijack
hook.enabled = True
else:
hook = SyscallHook(
original_syscall_number,
on_enter,
None,
None,
None,
hook_hijack,
)
link_to_internal_debugger(hook, self)
self.__polling_thread_command_queue.put(
(self.__threaded_syscall_hook, (hook,)),
)
self._join_and_check_status()
return hook
[docs]
@background_alias(_background_invalid_call)
@change_state_function_process
def migrate_to_gdb(self: InternalDebugger, open_in_new_process: bool = True) -> None:
"""Migrates the current debugging session to GDB."""
# TODO: not needed?
self.interrupt()
self.__polling_thread_command_queue.put((self.__threaded_migrate_to_gdb, ()))
self._join_and_check_status()
if open_in_new_process and libcontext.terminal:
self._open_gdb_in_new_process()
else:
if open_in_new_process:
liblog.warning(
"Cannot open in a new process. Please configure the terminal in libcontext.terminal.",
)
self._open_gdb_in_shell()
self.__polling_thread_command_queue.put((self.__threaded_migrate_from_gdb, ()))
self._join_and_check_status()
# We have to ignore a SIGSTOP signal that is sent by GDB
# TODO: once we have signal handling, we should remove this
self.step()
def _craft_gdb_migration_command(self: InternalDebugger) -> list[str]:
"""Crafts the command to migrate to GDB."""
gdb_command = [
"/bin/gdb",
"-q",
"--pid",
str(self.process_id),
"-ex",
"source " + GDB_GOBACK_LOCATION,
"-ex",
"ni",
"-ex",
"ni",
]
bp_args = []
for bp in self.breakpoints.values():
if bp.enabled:
bp_args.append("-ex")
if bp.hardware and bp.condition == "rw":
bp_args.append(f"awatch *(int{bp.length * 8}_t *) {bp.address:0x}")
elif bp.hardware and bp.condition == "w":
bp_args.append(f"watch *(int{bp.length * 8}_t *) {bp.address:0x}")
elif bp.hardware:
bp_args.append("hb *" + hex(bp.address))
else:
bp_args.append("b *" + hex(bp.address))
if self.instruction_pointer == bp.address:
# We have to enqueue an additional continue
bp_args.append("-ex")
bp_args.append("ni")
return gdb_command + bp_args
def _open_gdb_in_new_process(self: InternalDebugger) -> None:
"""Opens GDB in a new process following the configuration in libcontext.terminal."""
args = self._craft_gdb_migration_command()
initial_pid = Popen(libcontext.terminal + args).pid
os.waitpid(initial_pid, 0)
liblog.debugger("Waiting for GDB process to terminate...")
for proc in psutil.process_iter():
try:
cmdline = proc.cmdline()
except psutil.ZombieProcess:
# This is a zombie process, which psutil tracks but we cannot interact with
continue
if args == cmdline:
gdb_process = proc
break
else:
raise RuntimeError("GDB process not found.")
while gdb_process.is_running() and gdb_process.status() != psutil.STATUS_ZOMBIE:
# As the GDB process is in a different group, we do not have the authority to wait on it
# So we must keep polling it until it is no longer running
pass
def _open_gdb_in_shell(self: InternalDebugger) -> None:
"""Open GDB in the current shell."""
gdb_pid = os.fork()
if gdb_pid == 0: # This is the child process.
args = self._craft_gdb_migration_command()
os.execv("/bin/gdb", args)
else: # This is the parent process.
os.waitpid(gdb_pid, 0) # Wait for the child process to finish.
def _background_step(self: InternalDebugger, thread: ThreadContext) -> None:
"""Executes a single instruction of the process.
Args:
thread (ThreadContext): The thread to step. Defaults to None.
"""
self.__threaded_step(thread)
self.__threaded_wait()
[docs]
@background_alias(_background_step)
@change_state_function_thread
def step(self: InternalDebugger, thread: ThreadContext) -> None:
"""Executes a single instruction of the process.
Args:
thread (ThreadContext): The thread to step. Defaults to None.
"""
self._ensure_process_stopped()
self.__polling_thread_command_queue.put((self.__threaded_step, (thread,)))
self.__polling_thread_command_queue.put((self.__threaded_wait, ()))
self._join_and_check_status()
def _background_step_until(
self: InternalDebugger,
thread: ThreadContext,
position: int | str,
max_steps: int = -1,
file: str = "default",
) -> None:
"""Executes instructions of the process until the specified location is reached.
Args:
thread (ThreadContext): The thread to step. Defaults to None.
position (int | bytes): The location to reach.
max_steps (int, optional): The maximum number of steps to execute. Defaults to -1.
file (str, optional): The user-defined backing file to resolve the address in. Defaults to "default"
(libdebug will first try to solve the address as an absolute address, then as a relative address w.r.t.
the "binary" map file).
"""
if isinstance(position, str):
address = self.resolve_symbol(position, file)
else:
address = self.resolve_address(position, file)
self.__threaded_step_until(thread, address, max_steps)
[docs]
@background_alias(_background_step_until)
@change_state_function_thread
def step_until(
self: InternalDebugger,
thread: ThreadContext,
position: int | str,
max_steps: int = -1,
file: str = "default",
) -> None:
"""Executes instructions of the process until the specified location is reached.
Args:
thread (ThreadContext): The thread to step. Defaults to None.
position (int | bytes): The location to reach.
max_steps (int, optional): The maximum number of steps to execute. Defaults to -1.
file (str, optional): The user-defined backing file to resolve the address in. Defaults to "default"
(libdebug will first try to solve the address as an absolute address, then as a relative address w.r.t.
the "binary" map file).
"""
if isinstance(position, str):
address = self.resolve_symbol(position, file)
else:
address = self.resolve_address(position, file)
arguments = (
thread,
address,
max_steps,
)
self.__polling_thread_command_queue.put((self.__threaded_step_until, arguments))
self._join_and_check_status()
def _background_finish(
self: InternalDebugger,
thread: ThreadContext,
heuristic: str = "backtrace",
) -> None:
"""Continues execution until the current function returns or the process stops.
The command requires a heuristic to determine the end of the function. The available heuristics are:
- `backtrace`: The debugger will place a breakpoint on the saved return address found on the stack and continue execution on all threads.
- `step-mode`: The debugger will step on the specified thread until the current function returns. This will be slower.
Args:
thread (ThreadContext): The thread to finish.
heuristic (str, optional): The heuristic to use. Defaults to "backtrace".
"""
self.__threaded_finish(thread, heuristic)
[docs]
@background_alias(_background_finish)
@change_state_function_thread
def finish(self: InternalDebugger, thread: ThreadContext, heuristic: str = "backtrace") -> None:
"""Continues execution until the current function returns or the process stops.
The command requires a heuristic to determine the end of the function. The available heuristics are:
- `backtrace`: The debugger will place a breakpoint on the saved return address found on the stack and continue execution on all threads.
- `step-mode`: The debugger will step on the specified thread until the current function returns. This will be slower.
Args:
thread (ThreadContext): The thread to finish.
heuristic (str, optional): The heuristic to use. Defaults to "backtrace".
"""
self.__polling_thread_command_queue.put(
(self.__threaded_finish, (thread, heuristic)),
)
self._join_and_check_status()
[docs]
def enable_pretty_print(
self: InternalDebugger,
) -> SyscallHook:
"""Hooks a syscall in the target process to pretty prints its arguments and return value."""
self._ensure_process_stopped()
syscall_numbers = get_all_syscall_numbers()
for syscall_number in syscall_numbers:
# Check if the syscall is already hooked (by the user or by the pretty print hook)
if syscall_number in self.syscall_hooks:
hook = self.syscall_hooks[syscall_number]
if syscall_number not in (self.syscalls_to_not_pprint or []) and syscall_number in (
self.syscalls_to_pprint or syscall_numbers
):
hook.on_enter_pprint = pprint_on_enter
hook.on_exit_pprint = pprint_on_exit
else:
# Remove the pretty print hook from previous pretty print calls
hook.on_enter_pprint = None
hook.on_exit_pprint = None
elif syscall_number not in (self.syscalls_to_not_pprint or []) and syscall_number in (
self.syscalls_to_pprint or syscall_numbers
):
hook = SyscallHook(
syscall_number,
None,
None,
pprint_on_enter,
pprint_on_exit,
)
link_to_internal_debugger(hook, self)
self.__polling_thread_command_queue.put(
(self.__threaded_syscall_hook, (hook,)),
)
self._join_and_check_status()
[docs]
def disable_pretty_print(self: InternalDebugger) -> None:
"""Unhooks all syscalls that are pretty printed."""
self._ensure_process_stopped()
installed_hooks = list(self.syscall_hooks.values())
for hook in installed_hooks:
if hook.on_enter_pprint or hook.on_exit_pprint:
if hook.on_enter_user or hook.on_exit_user:
hook.on_enter_pprint = None
hook.on_exit_pprint = None
else:
self.__polling_thread_command_queue.put(
(self.__threaded_syscall_unhook, (hook,)),
)
self._join_and_check_status()
[docs]
def insert_new_thread(self: InternalDebugger, thread: ThreadContext) -> None:
"""Insert a new thread in the context.
Args:
thread (ThreadContext): the thread to insert.
"""
if thread in self.threads:
raise RuntimeError("Thread already registered.")
self.threads.append(thread)
[docs]
def set_thread_as_dead(
self: InternalDebugger,
thread_id: int,
exit_code: int | None,
exit_signal: int | None,
) -> None:
"""Set a thread as dead and update its exit code and exit signal.
Args:
thread_id (int): the ID of the thread to set as dead.
exit_code (int, optional): the exit code of the thread.
exit_signal (int, optional): the exit signal of the thread.
"""
for thread in self.threads:
if thread.thread_id == thread_id:
thread.set_as_dead()
thread._exit_code = exit_code
thread._exit_signal = exit_signal
break
[docs]
def get_thread_by_id(self: InternalDebugger, thread_id: int) -> ThreadContext:
"""Get a thread by its ID.
Args:
thread_id (int): the ID of the thread to get.
Returns:
ThreadContext: the thread with the specified ID.
"""
for thread in self.threads:
if thread.thread_id == thread_id and not thread.dead:
return thread
return None
[docs]
def resolve_address(self: InternalDebugger, address: int, backing_file: str) -> int:
"""Normalizes and validates the specified address.
Args:
address (int): The address to normalize and validate.
backing_file (str): The backing file to resolve the address in.
Returns:
int: The normalized and validated address.
Raises:
ValueError: If the substring `backing_file` is present in multiple backing files.
"""
maps = self.debugging_interface.maps()
if backing_file in ["default", "absolute"]:
if check_absolute_address(address, maps):
# If the address is absolute, we can return it directly
return address
elif backing_file == "absolute":
# The address is explicitly an absolute address but we did not find it
raise ValueError(
"The specified absolute address does not exist. Check the address or specify a backing file.",
)
else:
# If the address was not found and the backing file is not "absolute",
# we have to assume it is in the main map
backing_file = self._get_process_full_path()
liblog.debugger(
f"No backing file specified and no correspondant absolute address for {hex(address)}. Assuming {backing_file}."
)
elif (
backing_file == (full_backing_path := self._get_process_full_path())
or backing_file == "binary"
or backing_file == self._get_process_name()
):
backing_file = full_backing_path
filtered_maps = []
unique_files = set()
for vmap in maps:
if backing_file in vmap.backing_file:
filtered_maps.append(vmap)
unique_files.add(vmap.backing_file)
if len(unique_files) > 1:
raise ValueError(
f"The substring {backing_file} is present in multiple, different backing files. The address resolution cannot be accurate."
)
if not filtered_maps:
raise ValueError(f"The specified string {backing_file} does not correspond to any backing file.")
return normalize_and_validate_address(address, filtered_maps)
[docs]
def resolve_symbol(self: InternalDebugger, symbol: str, backing_file: str) -> int:
"""Resolves the address of the specified symbol.
Args:
symbol (str): The symbol to resolve.
backing_file (str): The backing file to resolve the symbol in.
Returns:
int: The address of the symbol.
"""
maps = self.debugging_interface.maps()
if backing_file == "absolute":
raise ValueError("Cannot use `absolute` backing file with symbols.")
if backing_file == "default":
# If no explicit backing file is specified, we have to assume it is in the main map
backing_file = self._get_process_full_path()
liblog.debugger(f"No backing file specified for the symbol {symbol}. Assuming {backing_file}.")
elif (
backing_file == (full_backing_path := self._get_process_full_path())
or backing_file == "binary"
or backing_file == self._get_process_name()
):
backing_file = full_backing_path
filtered_maps = []
unique_files = set()
for vmap in maps:
if backing_file in vmap.backing_file:
filtered_maps.append(vmap)
unique_files.add(vmap.backing_file)
if len(unique_files) > 1:
raise ValueError(
f"The substring {backing_file} is present in multiple, different backing files. The address resolution cannot be accurate."
)
if not filtered_maps:
raise ValueError(f"The specified string {backing_file} does not correspond to any backing file.")
return resolve_symbol_in_maps(symbol, filtered_maps)
def _background_ensure_process_stopped(self: InternalDebugger) -> None:
"""Validates the state of the process."""
# In background mode, there shouldn't be anything to do here
@background_alias(_background_ensure_process_stopped)
def _ensure_process_stopped(self: InternalDebugger) -> None:
"""Validates the state of the process."""
if not self.running:
return
if self.auto_interrupt_on_command:
self.interrupt()
self._join_and_check_status()
def _is_in_background(self: InternalDebugger) -> None:
return current_thread() == self.__polling_thread
def __polling_thread_function(self: InternalDebugger) -> None:
"""This function is run in a thread. It is used to poll the process for state change."""
while True:
# Wait for the main thread to signal a command to execute
command, args = self.__polling_thread_command_queue.get()
if command == THREAD_TERMINATE:
# Signal that the command has been executed
self.__polling_thread_command_queue.task_done()
return
# Execute the command
try:
return_value = command(*args)
except BaseException as e:
return_value = e
if return_value is not None:
self.__polling_thread_response_queue.put(return_value)
# Signal that the command has been executed
self.__polling_thread_command_queue.task_done()
if return_value is not None:
self.__polling_thread_response_queue.join()
def _join_and_check_status(self: InternalDebugger) -> None:
# Wait for the background thread to signal "task done" before returning
# We don't want any asynchronous behaviour here
self.__polling_thread_command_queue.join()
# Check for any exceptions raised by the background thread
if not self.__polling_thread_response_queue.empty():
response = self.__polling_thread_response_queue.get()
self.__polling_thread_response_queue.task_done()
if response is not None:
raise response
@functools.cache
def _get_process_full_path(self: InternalDebugger) -> str:
"""Get the full path of the process.
Returns:
str: the full path of the process.
"""
return str(Path(f"/proc/{self.process_id}/exe").readlink())
@functools.cache
def _get_process_name(self: InternalDebugger) -> str:
"""Get the name of the process.
Returns:
str: the name of the process.
"""
with Path(f"/proc/{self.process_id}/comm").open() as f:
return f.read().strip()
def __threaded_run(self: InternalDebugger) -> None:
liblog.debugger("Starting process %s.", self.argv[0])
self.debugging_interface.run()
self.set_stopped()
def __threaded_attach(self: InternalDebugger, pid: int) -> None:
liblog.debugger("Attaching to process %d.", pid)
self.debugging_interface.attach(pid)
self.set_stopped()
def __threaded_detach(self: InternalDebugger) -> None:
liblog.debugger("Detaching from process %d.", self.process_id)
self.debugging_interface.detach()
self.set_stopped()
def __threaded_kill(self: InternalDebugger) -> None:
if self.argv:
liblog.debugger(
"Killing process %s (%d).",
self.argv[0],
self.process_id,
)
else:
liblog.debugger("Killing process %d.", self.process_id)
self.debugging_interface.kill()
def __threaded_cont(self: InternalDebugger) -> None:
if self.argv:
liblog.debugger(
"Continuing process %s (%d).",
self.argv[0],
self.process_id,
)
else:
liblog.debugger("Continuing process %d.", self.process_id)
self.set_running()
self.debugging_interface.cont()
def __threaded_wait(self: InternalDebugger) -> None:
if self.argv:
liblog.debugger(
"Waiting for process %s (%d) to stop.",
self.argv[0],
self.process_id,
)
else:
liblog.debugger("Waiting for process %d to stop.", self.process_id)
while True:
if self.threads[0].dead:
# All threads are dead
liblog.debugger("All threads dead")
break
self.resume_context.resume = True
self.debugging_interface.wait()
if self.resume_context.resume:
self.debugging_interface.cont()
else:
break
self.set_stopped()
def __threaded_breakpoint(self: InternalDebugger, bp: Breakpoint) -> None:
liblog.debugger("Setting breakpoint at 0x%x.", bp.address)
self.debugging_interface.set_breakpoint(bp)
def __threaded_syscall_hook(self: InternalDebugger, hook: SyscallHook) -> None:
liblog.debugger(f"Hooking syscall {hook.syscall_number}.")
self.debugging_interface.set_syscall_hook(hook)
def __threaded_signal_hook(self: InternalDebugger, hook: SignalHook) -> None:
liblog.debugger(
f"Hooking signal {resolve_signal_name(hook.signal_number)} ({hook.signal_number}).",
)
self.debugging_interface.set_signal_hook(hook)
def __threaded_syscall_unhook(self: InternalDebugger, hook: SyscallHook) -> None:
liblog.debugger(f"Unhooking syscall {hook.syscall_number}.")
self.debugging_interface.unset_syscall_hook(hook)
def __threaded_signal_unhook(self: InternalDebugger, hook: SignalHook) -> None:
liblog.debugger(f"Unhooking syscall {hook.signal_number}.")
self.debugging_interface.unset_signal_hook(hook)
def __threaded_step(self: InternalDebugger, thread: ThreadContext) -> None:
liblog.debugger("Stepping thread %s.", thread.thread_id)
self.debugging_interface.step(thread)
self.set_running()
def __threaded_step_until(
self: InternalDebugger,
thread: ThreadContext,
address: int,
max_steps: int,
) -> None:
liblog.debugger("Stepping thread %s until 0x%x.", thread.thread_id, address)
self.debugging_interface.step_until(thread, address, max_steps)
self.set_stopped()
def __threaded_finish(self: InternalDebugger, thread: ThreadContext, heuristic: str) -> None:
prefix = heuristic.capitalize()
liblog.debugger(f"{prefix} finish on thread %s", thread.thread_id)
self.debugging_interface.finish(thread, heuristic=heuristic)
self.set_stopped()
def __threaded_migrate_to_gdb(self: InternalDebugger) -> None:
self.debugging_interface.migrate_to_gdb()
def __threaded_migrate_from_gdb(self: InternalDebugger) -> None:
self.debugging_interface.migrate_from_gdb()
def __threaded_peek_memory(self: InternalDebugger, address: int) -> bytes | BaseException:
try:
value = self.debugging_interface.peek_memory(address)
# TODO: this is only for amd64
return value.to_bytes(8, "little")
except BaseException as e:
return e
def __threaded_poke_memory(self: InternalDebugger, address: int, data: bytes) -> None:
int_data = int.from_bytes(data, "little")
self.debugging_interface.poke_memory(address, int_data)
@background_alias(__threaded_peek_memory)
def _peek_memory(self: InternalDebugger, address: int) -> bytes:
"""Reads memory from the process."""
if not self.instanced:
raise RuntimeError("Process not running, cannot step.")
if self.running:
# Reading memory while the process is running could lead to concurrency issues
# and corrupted values
liblog.debugger(
"Process is running. Waiting for it to stop before reading memory.",
)
self._ensure_process_stopped()
self.__polling_thread_command_queue.put(
(self.__threaded_peek_memory, (address,)),
)
# We cannot call _join_and_check_status here, as we need the return value which might not be an exception
self.__polling_thread_command_queue.join()
value = self.__polling_thread_response_queue.get()
self.__polling_thread_response_queue.task_done()
if isinstance(value, BaseException):
raise value
return value
@background_alias(__threaded_poke_memory)
def _poke_memory(self: InternalDebugger, address: int, data: bytes) -> None:
"""Writes memory to the process."""
if not self.instanced:
raise RuntimeError("Process not running, cannot step.")
if self.running:
# Reading memory while the process is running could lead to concurrency issues
# and corrupted values
liblog.debugger(
"Process is running. Waiting for it to stop before writing to memory.",
)
self._ensure_process_stopped()
self.__polling_thread_command_queue.put(
(self.__threaded_poke_memory, (address, data)),
)
self._join_and_check_status()
def _enable_antidebug_escaping(self: InternalDebugger) -> None:
"""Enables the anti-debugging escape mechanism."""
hook = SyscallHook(
resolve_syscall_number("ptrace"),
on_enter_ptrace,
on_exit_ptrace,
None,
None,
)
link_to_internal_debugger(hook, self)
self.__polling_thread_command_queue.put((self.__threaded_syscall_hook, (hook,)))
# setup hidden state for the hook
hook._traceme_called = False
hook._command = None
@property
def running(self: InternalDebugger) -> bool:
"""Get the state of the process.
Returns:
bool: True if the process is running, False otherwise.
"""
return self._is_running
[docs]
def set_running(self: InternalDebugger) -> None:
"""Set the state of the process to running."""
self._is_running = True
[docs]
def set_stopped(self: InternalDebugger) -> None:
"""Set the state of the process to stopped."""
self._is_running = False