Source code for libdebug.utils.pipe_manager

#
# This file is part of libdebug Python library (https://github.com/libdebug/libdebug).
# Copyright (c) 2023-2024 Gabriele Digregorio, Roberto Alessandro Bertolini. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for details.
#

from __future__ import annotations

import os
import time
from select import select

from libdebug.liblog import liblog


[docs] class PipeManager: """Class for managing pipes of the child process.""" _instance = None timeout_default: int = 2 def __init__(self: PipeManager, stdin_write: int, stdout_read: int, stderr_read: int) -> None: """Initializes the PipeManager class. Args: stdin_write (int): file descriptor for stdin write. stdout_read (int): file descriptor for stdout read. stderr_read (int): file descriptor for stderr read. """ self.stdin_write: int = stdin_write self.stdout_read: int = stdout_read self.stderr_read: int = stderr_read def _recv( self: PipeManager, numb: int | None = None, timeout: float = timeout_default, stderr: bool = False, ) -> bytes: """Receives at most numb bytes from the child process. Args: numb (int, optional): number of bytes to receive. Defaults to None. timeout (float, optional): timeout in seconds. Defaults to timeout_default. stderr (bool, optional): receive from stderr. Defaults to False. Returns: bytes: received bytes from the child process stdout. Raises: ValueError: numb is negative. RuntimeError: no stdout pipe of the child process. """ pipe_read: int = self.stderr_read if stderr else self.stdout_read if not pipe_read: raise RuntimeError("No pipe of the child process") # Buffer for the received data data_buffer = b"" if numb: # Checking the numb if numb < 0: raise ValueError("The number of bytes to receive must be positive") # Setting the alarm end_time = time.time() + timeout while numb > 0: if end_time is not None and time.time() > end_time: # Timeout reached break # Adjust the timeout for select to the remaining time remaining_time = None if end_time is None else max(0, end_time - time.time()) ready, _, _ = select([pipe_read], [], [], remaining_time) if not ready: # No data ready within the remaining timeout break try: data = os.read(pipe_read, numb) except OSError as e: raise RuntimeError("Broken pipe. Is the child process still running?") from e if not data: # No more data available break numb -= len(data) data_buffer += data else: ready, _, _ = select([pipe_read], [], [], timeout) if ready: # Read all available bytes up to 4096 data = os.read(pipe_read, 4096) data_buffer += data liblog.pipe(f"Received {len(data_buffer)} bytes from the child process: {data_buffer!r}") return data_buffer
[docs] def close(self: PipeManager) -> None: """Closes all the pipes of the child process.""" os.close(self.stdin_write) os.close(self.stdout_read) os.close(self.stderr_read)
[docs] def recv(self: PipeManager, numb: int | None = None, timeout: int = timeout_default) -> bytes: """Receives at most numb bytes from the child process stdout. Args: numb (int, optional): number of bytes to receive. Defaults to None. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received bytes from the child process stdout. """ return self._recv(numb=numb, timeout=timeout, stderr=False)
[docs] def recverr(self: PipeManager, numb: int | None = None, timeout: int = timeout_default) -> bytes: """Receives at most numb bytes from the child process stderr. Args: numb (int, optional): number of bytes to receive. Defaults to None. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received bytes from the child process stderr. """ return self._recv(numb=numb, timeout=timeout, stderr=True)
def _recvonceuntil( self: PipeManager, delims: bytes, drop: bool = False, timeout: float = timeout_default, stderr: bool = False, ) -> bytes: """Receives data from the child process until the delimiters are found. Args: delims (bytes): delimiters where to stop. drop (bool, optional): drop the delimiter. Defaults to False. timeout (float, optional): timeout in seconds. Defaults to timeout_default. stderr (bool, optional): receive from stderr. Defaults to False. Returns: bytes: received data from the child process stdout. Raises: RuntimeError: no stdout pipe of the child process. TimeoutError: timeout reached. """ if isinstance(delims, str): liblog.warning("The delimiters are a string, converting to bytes") delims = delims.encode() # Buffer for the received data data_buffer = b"" # Setting the alarm end_time = time.time() + timeout while True: if end_time is not None and time.time() > end_time: # Timeout reached raise TimeoutError("Timeout reached") # Adjust the timeout for select to the remaining time remaining_time = None if end_time is None else max(0, end_time - time.time()) data = self._recv(numb=1, timeout=remaining_time, stderr=stderr) data_buffer += data if delims in data_buffer: # Delims reached if drop: data_buffer = data_buffer[: -len(delims)] break return data_buffer def _recvuntil( self: PipeManager, delims: bytes, occurences: int = 1, drop: bool = False, timeout: float = timeout_default, stderr: bool = False, ) -> bytes: """Receives data from the child process until the delimiters are found occurences time. Args: delims (bytes): delimiters where to stop. occurences (int, optional): number of delimiters to find. Defaults to 1. drop (bool, optional): drop the delimiter. Defaults to False. timeout (float, optional): timeout in seconds. Defaults to timeout_default. stderr (bool, optional): receive from stderr. Defaults to False. Returns: bytes: received data from the child process stdout. """ if occurences <= 0: raise ValueError("The number of occurences to receive must be positive") # Buffer for the received data data_buffer = b"" # Setting the alarm end_time = time.time() + timeout for _ in range(occurences): # Adjust the timeout for select to the remaining time remaining_time = None if end_time is None else max(0, end_time - time.time()) data_buffer += self._recvonceuntil(delims=delims, drop=drop, timeout=remaining_time, stderr=stderr) return data_buffer
[docs] def recvuntil( self: PipeManager, delims: bytes, occurences: int = 1, drop: bool = False, timeout: int = timeout_default, ) -> bytes: """Receives data from the child process stdout until the delimiters are found. Args: delims (bytes): delimiters where to stop. occurences (int, optional): number of delimiters to find. Defaults to 1. drop (bool, optional): drop the delimiter. Defaults to False. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received data from the child process stdout. """ return self._recvuntil( delims=delims, occurences=occurences, drop=drop, timeout=timeout, stderr=False, )
[docs] def recverruntil( self: PipeManager, delims: bytes, occurences: int = 1, drop: bool = False, timeout: int = timeout_default, ) -> bytes: """Receives data from the child process stderr until the delimiters are found. Args: delims (bytes): delimiters where to stop. occurences (int, optional): number of delimiters to find. Defaults to 1. drop (bool, optional): drop the delimiter. Defaults to False. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received data from the child process stderr. """ return self._recvuntil( delims=delims, occurences=occurences, drop=drop, timeout=timeout, stderr=True, )
[docs] def recvline(self: PipeManager, numlines: int = 1, drop: bool = True, timeout: int = timeout_default) -> bytes: """Receives numlines lines from the child process stdout. Args: numlines (int, optional): number of lines to receive. Defaults to 1. drop (bool, optional): drop the line ending. Defaults to True. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received lines from the child process stdout. """ return self.recvuntil(delims=b"\n", occurences=numlines, drop=drop, timeout=timeout)
[docs] def recverrline(self: PipeManager, numlines: int = 1, drop: bool = True, timeout: int = timeout_default) -> bytes: """Receives numlines lines from the child process stderr. Args: numlines (int, optional): number of lines to receive. Defaults to 1. drop (bool, optional): drop the line ending. Defaults to True. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received lines from the child process stdout. """ return self.recverruntil(delims=b"\n", occurences=numlines, drop=drop, timeout=timeout)
[docs] def send(self: PipeManager, data: bytes) -> int: """Sends data to the child process stdin. Args: data (bytes): data to send. Returns: int: number of bytes sent. Raises: RuntimeError: no stdin pipe of the child process. """ if not self.stdin_write: raise RuntimeError("No stdin pipe of the child process") liblog.pipe(f"Sending {len(data)} bytes to the child process: {data!r}") if isinstance(data, str): liblog.warning("The input data is a string, converting to bytes") data = data.encode() return os.write(self.stdin_write, data)
[docs] def sendline(self: PipeManager, data: bytes) -> int: """Sends data to the child process stdin and append a newline. Args: data (bytes): data to send. Returns: int: number of bytes sent. """ if isinstance(data, str): liblog.warning("The input data is a string, converting to bytes") data = data.encode() return self.send(data=data + b"\n")
[docs] def sendafter( self: PipeManager, delims: bytes, data: bytes, occurences: int = 1, drop: bool = False, timeout: int = timeout_default, ) -> tuple[bytes, int]: """Sends data to the child process stdin after the delimiters are found. Args: delims (bytes): delimiters where to stop. data (bytes): data to send. occurences (int, optional): number of delimiters to find. Defaults to 1. drop (bool, optional): drop the delimiter. Defaults to False. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received data from the child process stdout. int: number of bytes sent. """ received = self.recvuntil(delims=delims, occurences=occurences, drop=drop, timeout=timeout) sent = self.send(data) return (received, sent)
[docs] def sendlineafter( self: PipeManager, delims: bytes, data: bytes, occurences: int = 1, drop: bool = False, timeout: int = timeout_default, ) -> tuple[bytes, int]: """Sends line to the child process stdin after the delimiters are found. Args: delims (bytes): delimiters where to stop. data (bytes): data to send. occurences (int, optional): number of delimiters to find. Defaults to 1. drop (bool, optional): drop the delimiter. Defaults to False. timeout (int, optional): timeout in seconds. Defaults to timeout_default. Returns: bytes: received data from the child process stdout. int: number of bytes sent. """ received = self.recvuntil(delims=delims, occurences=occurences, drop=drop, timeout=timeout) sent = self.sendline(data) return (received, sent)