From 036d097c60861ee58cceb14742fcd13efa6d6d77 Mon Sep 17 00:00:00 2001 From: OrangePie <125730489+OranPie@users.noreply.github.com> Date: Sat, 12 Oct 2024 16:57:01 +0800 Subject: [PATCH] Add some utility to IO class (#137) --- cyaron/io.py | 73 ++++++++++++++++++++++++++++++++--------- cyaron/tests/io_test.py | 56 +++++++++++++++++++++++++++++-- 2 files changed, 112 insertions(+), 17 deletions(-) diff --git a/cyaron/io.py b/cyaron/io.py index 1ddb9ee..bbfa7f9 100644 --- a/cyaron/io.py +++ b/cyaron/io.py @@ -22,7 +22,8 @@ def __init__(self, input_file: Optional[Union[IOBase, str, int]] = None, output_file: Optional[Union[IOBase, str, int]] = None, data_id: Optional[int] = None, - disable_output: bool = False): + disable_output: bool = False, + make_dirs: bool = False): ... @overload @@ -31,7 +32,8 @@ def __init__(self, file_prefix: Optional[str] = None, input_suffix: str = '.in', output_suffix: str = '.out', - disable_output: bool = False): + disable_output: bool = False, + make_dirs: bool = False): ... def __init__( # type: ignore @@ -42,20 +44,22 @@ def __init__( # type: ignore file_prefix: Optional[str] = None, input_suffix: str = '.in', output_suffix: str = '.out', - disable_output: bool = False): + disable_output: bool = False, + make_dirs: bool = False): """ Args: - input_file (optional): input file object or filename or file descriptor. + input_file (optional): input file object or filename or file descriptor. If it's None, make a temp file. Defaults to None. - output_file (optional): input file object or filename or file descriptor. + output_file (optional): input file object or filename or file descriptor. If it's None, make a temp file. Defaults to None. - data_id (optional): the id of the data. It will be add after + data_id (optional): the id of the data. It will be add after `input_file` and `output_file` when they are str. If it's None, the file names will not contain the id. Defaults to None. file_prefix (optional): the prefix for the input and output files. Defaults to None. input_suffix (optional): the suffix of the input file. Defaults to '.in'. output_suffix (optional): the suffix of the output file. Defaults to '.out'. disable_output (optional): set to True to disable output file. Defaults to False. + make_dirs (optional): set to True to create dir if path is not found. Defaults to False. Examples: >>> IO("a","b") # create input file "a" and output file "b" @@ -75,7 +79,12 @@ def __init__( # type: ignore # create input file "data2.in" and output file "data2.out" >>> IO(open('data.in', 'w+'), open('data.out', 'w+')) # input file "data.in" and output file "data.out" + >>> IO("./io/data.in", "./io/data.out", disable_output = True) + # input file "./io/data.in" and output file "./io/data.out" + # if the dir "./io" not found it will be created """ + self.__closed = False + self.input_file, self.output_file = None, None if file_prefix is not None: # legacy mode input_file = '{}{{}}{}'.format(self.__escape_format(file_prefix), @@ -85,16 +94,16 @@ def __init__( # type: ignore self.__escape_format(output_suffix)) self.input_filename, self.output_filename = None, None self.__input_temp, self.__output_temp = False, False - self.__init_file(input_file, data_id, "i") + self.__init_file(input_file, data_id, "i", make_dirs) if not disable_output: - self.__init_file(output_file, data_id, "o") + self.__init_file(output_file, data_id, "o", make_dirs) else: self.output_file = None - self.__closed = False self.is_first_char = {} - def __init_file(self, f: Union[IOBase, str, int, None], - data_id: Union[int, None], file_type: str): + def __init_file(self, f: Union[IOBase, str, int, + None], data_id: Union[int, None], + file_type: str, make_dirs: bool): if isinstance(f, IOBase): # consider ``f`` as a file object if file_type == "i": @@ -104,11 +113,11 @@ def __init_file(self, f: Union[IOBase, str, int, None], elif isinstance(f, int): # consider ``f`` as a file descor self.__init_file(open(f, 'w+', encoding="utf-8", newline='\n'), - data_id, file_type) + data_id, file_type, make_dirs) elif f is None: # consider wanna temp file fd, self.input_filename = tempfile.mkstemp() - self.__init_file(fd, data_id, file_type) + self.__init_file(fd, data_id, file_type, make_dirs) if file_type == "i": self.__input_temp = True else: @@ -116,18 +125,24 @@ def __init_file(self, f: Union[IOBase, str, int, None], else: # consider ``f`` as filename template filename = f.format(data_id or "") + # be sure dir is existed + if make_dirs: + self.__make_dirs(filename) if file_type == "i": self.input_filename = filename else: self.output_filename = filename self.__init_file( open(filename, 'w+', newline='\n', encoding='utf-8'), data_id, - file_type) + file_type, make_dirs) def __escape_format(self, st: str): """replace "{}" to "{{}}" """ return re.sub(r"\{", "{{", re.sub(r"\}", "}}", st)) + def __make_dirs(self, pth: str): + os.makedirs(os.path.dirname(pth), exist_ok=True) + def __del_files(self): """delete files""" if self.__input_temp and self.input_filename is not None: @@ -170,7 +185,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __write(self, file: IOBase, *args, **kwargs): """ - Write every element in *args into file. If the element isn't "\n", insert `separator`. + Write every element in *args into file. If the element isn't "\n", insert `separator`. It will convert every element into str. """ separator = kwargs.get("separator", " ") @@ -185,6 +200,17 @@ def __write(self, file: IOBase, *args, **kwargs): if arg == "\n": self.is_first_char[file] = True + def __clear(self, file: IOBase, pos: int = 0): + """ + Clear the content use truncate() + Args: + file: Which file to clear + pos: Where file will truncate. + """ + file.truncate(pos) + self.is_first_char[file] = True + file.seek(pos) + def input_write(self, *args, **kwargs): """ Write every element in *args into the input file. Splits with `separator`. @@ -208,6 +234,15 @@ def input_writeln(self, *args, **kwargs): args.append("\n") self.input_write(*args, **kwargs) + def input_clear_content(self, pos: int = 0): + """ + Clear the content of input + Args: + pos: Where file will truncate. + """ + + self.__clear(self.input_file, pos) + def output_gen(self, shell_cmd, time_limit=None): """ Run the command `shell_cmd` (usually the std program) and send it the input file as stdin. @@ -268,6 +303,14 @@ def output_writeln(self, *args, **kwargs): args.append("\n") self.output_write(*args, **kwargs) + def output_clear_content(self, pos: int = 0): + """ + Clear the content of output + Args: + pos: Where file will truncate + """ + self.__clear(self.output_file, pos) + def flush_buffer(self): """Flush the input file""" self.input_file.flush() diff --git a/cyaron/tests/io_test.py b/cyaron/tests/io_test.py index b83dc03..42cb996 100644 --- a/cyaron/tests/io_test.py +++ b/cyaron/tests/io_test.py @@ -56,8 +56,10 @@ def test_write_stuff(self): input = f.read() with open("test_write.out") as f: output = f.read() - self.assertEqual(input.split(), ["1", "2", "3", "4", "5", "6", "7", "8", "9"]) - self.assertEqual(output.split(), ["9", "8", "7", "6", "5", "4", "3", "2", "1"]) + self.assertEqual(input.split(), + ["1", "2", "3", "4", "5", "6", "7", "8", "9"]) + self.assertEqual(output.split(), + ["9", "8", "7", "6", "5", "4", "3", "2", "1"]) self.assertEqual(input.count("\n"), 2) self.assertEqual(output.count("\n"), 2) @@ -111,3 +113,53 @@ def test_init_overload(self): with IO(fin, fout) as test: self.assertEqual(test.input_file, fin) self.assertEqual(test.output_file, fout) + + def test_make_dirs(self): + mkdir_true = False + with IO("./automkdir_true/data.in", + "./automkdir_true/data.out", + make_dirs=True): + mkdir_true = os.path.exists("./automkdir_true") + self.assertEqual(mkdir_true, True) + + mkdir_false = False + try: + with IO( + "./automkdir_false/data.in", + "./automkdir_false/data.out", + ): + pass + except FileNotFoundError: + mkdir_false = True + mkdir_false &= not os.path.exists("./automkdir_false") + self.assertEqual(mkdir_false, True) + + def test_output_clear_content(self): + with IO("test_clear.in", "test_clear.out") as test: + test.input_write("This is a test.") + test.input_clear_content() + test.input_write("Cleared content.") + test.output_write("This is a test.") + test.output_clear_content() + test.output_write("Cleared content.") + with open("test_clear.in", encoding="utf-8") as f: + input_text = f.read() + with open("test_clear.out", encoding="utf-8") as f: + output_text = f.read() + self.assertEqual(input_text, "Cleared content.") + self.assertEqual(output_text, "Cleared content.") + + def test_output_clear_content_with_position(self): + with IO("test_clear_pos.in", "test_clear_pos.out") as test: + test.input_write("This is a test.") + test.input_clear_content(5) + test.input_write("Cleared content.") + test.output_write("This is a test.") + test.output_clear_content(5) + test.output_write("Cleared content.") + with open("test_clear_pos.in", encoding="utf-8") as f: + input_text = f.read() + with open("test_clear_pos.out", encoding="utf-8") as f: + output_text = f.read() + self.assertEqual(input_text, "This Cleared content.") + self.assertEqual(output_text, "This Cleared content.")