Skip to content

Commit

Permalink
Add some utility to IO class (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
OranPie authored Oct 12, 2024
1 parent c92f60d commit 036d097
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 17 deletions.
73 changes: 58 additions & 15 deletions cyaron/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def __init__(self,
input_file: Optional[Union[IOBase, str, int]] = None,
output_file: Optional[Union[IOBase, str, int]] = None,
data_id: Optional[int] = None,
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
...

@overload
Expand All @@ -31,7 +32,8 @@ def __init__(self,
file_prefix: Optional[str] = None,
input_suffix: str = '.in',
output_suffix: str = '.out',
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
...

def __init__( # type: ignore
Expand All @@ -42,20 +44,22 @@ def __init__( # type: ignore
file_prefix: Optional[str] = None,
input_suffix: str = '.in',
output_suffix: str = '.out',
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
"""
Args:
input_file (optional): input file object or filename or file descriptor.
input_file (optional): input file object or filename or file descriptor.
If it's None, make a temp file. Defaults to None.
output_file (optional): input file object or filename or file descriptor.
output_file (optional): input file object or filename or file descriptor.
If it's None, make a temp file. Defaults to None.
data_id (optional): the id of the data. It will be add after
data_id (optional): the id of the data. It will be add after
`input_file` and `output_file` when they are str.
If it's None, the file names will not contain the id. Defaults to None.
file_prefix (optional): the prefix for the input and output files. Defaults to None.
input_suffix (optional): the suffix of the input file. Defaults to '.in'.
output_suffix (optional): the suffix of the output file. Defaults to '.out'.
disable_output (optional): set to True to disable output file. Defaults to False.
make_dirs (optional): set to True to create dir if path is not found. Defaults to False.
Examples:
>>> IO("a","b")
# create input file "a" and output file "b"
Expand All @@ -75,7 +79,12 @@ def __init__( # type: ignore
# create input file "data2.in" and output file "data2.out"
>>> IO(open('data.in', 'w+'), open('data.out', 'w+'))
# input file "data.in" and output file "data.out"
>>> IO("./io/data.in", "./io/data.out", disable_output = True)
# input file "./io/data.in" and output file "./io/data.out"
# if the dir "./io" not found it will be created
"""
self.__closed = False
self.input_file, self.output_file = None, None
if file_prefix is not None:
# legacy mode
input_file = '{}{{}}{}'.format(self.__escape_format(file_prefix),
Expand All @@ -85,16 +94,16 @@ def __init__( # type: ignore
self.__escape_format(output_suffix))
self.input_filename, self.output_filename = None, None
self.__input_temp, self.__output_temp = False, False
self.__init_file(input_file, data_id, "i")
self.__init_file(input_file, data_id, "i", make_dirs)
if not disable_output:
self.__init_file(output_file, data_id, "o")
self.__init_file(output_file, data_id, "o", make_dirs)
else:
self.output_file = None
self.__closed = False
self.is_first_char = {}

def __init_file(self, f: Union[IOBase, str, int, None],
data_id: Union[int, None], file_type: str):
def __init_file(self, f: Union[IOBase, str, int,
None], data_id: Union[int, None],
file_type: str, make_dirs: bool):
if isinstance(f, IOBase):
# consider ``f`` as a file object
if file_type == "i":
Expand All @@ -104,30 +113,36 @@ def __init_file(self, f: Union[IOBase, str, int, None],
elif isinstance(f, int):
# consider ``f`` as a file descor
self.__init_file(open(f, 'w+', encoding="utf-8", newline='\n'),
data_id, file_type)
data_id, file_type, make_dirs)
elif f is None:
# consider wanna temp file
fd, self.input_filename = tempfile.mkstemp()
self.__init_file(fd, data_id, file_type)
self.__init_file(fd, data_id, file_type, make_dirs)
if file_type == "i":
self.__input_temp = True
else:
self.__output_temp = True
else:
# consider ``f`` as filename template
filename = f.format(data_id or "")
# be sure dir is existed
if make_dirs:
self.__make_dirs(filename)
if file_type == "i":
self.input_filename = filename
else:
self.output_filename = filename
self.__init_file(
open(filename, 'w+', newline='\n', encoding='utf-8'), data_id,
file_type)
file_type, make_dirs)

def __escape_format(self, st: str):
"""replace "{}" to "{{}}" """
return re.sub(r"\{", "{{", re.sub(r"\}", "}}", st))

def __make_dirs(self, pth: str):
os.makedirs(os.path.dirname(pth), exist_ok=True)

def __del_files(self):
"""delete files"""
if self.__input_temp and self.input_filename is not None:
Expand Down Expand Up @@ -170,7 +185,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def __write(self, file: IOBase, *args, **kwargs):
"""
Write every element in *args into file. If the element isn't "\n", insert `separator`.
Write every element in *args into file. If the element isn't "\n", insert `separator`.
It will convert every element into str.
"""
separator = kwargs.get("separator", " ")
Expand All @@ -185,6 +200,17 @@ def __write(self, file: IOBase, *args, **kwargs):
if arg == "\n":
self.is_first_char[file] = True

def __clear(self, file: IOBase, pos: int = 0):
"""
Clear the content use truncate()
Args:
file: Which file to clear
pos: Where file will truncate.
"""
file.truncate(pos)
self.is_first_char[file] = True
file.seek(pos)

def input_write(self, *args, **kwargs):
"""
Write every element in *args into the input file. Splits with `separator`.
Expand All @@ -208,6 +234,15 @@ def input_writeln(self, *args, **kwargs):
args.append("\n")
self.input_write(*args, **kwargs)

def input_clear_content(self, pos: int = 0):
"""
Clear the content of input
Args:
pos: Where file will truncate.
"""

self.__clear(self.input_file, pos)

def output_gen(self, shell_cmd, time_limit=None):
"""
Run the command `shell_cmd` (usually the std program) and send it the input file as stdin.
Expand Down Expand Up @@ -268,6 +303,14 @@ def output_writeln(self, *args, **kwargs):
args.append("\n")
self.output_write(*args, **kwargs)

def output_clear_content(self, pos: int = 0):
"""
Clear the content of output
Args:
pos: Where file will truncate
"""
self.__clear(self.output_file, pos)

def flush_buffer(self):
"""Flush the input file"""
self.input_file.flush()
56 changes: 54 additions & 2 deletions cyaron/tests/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ def test_write_stuff(self):
input = f.read()
with open("test_write.out") as f:
output = f.read()
self.assertEqual(input.split(), ["1", "2", "3", "4", "5", "6", "7", "8", "9"])
self.assertEqual(output.split(), ["9", "8", "7", "6", "5", "4", "3", "2", "1"])
self.assertEqual(input.split(),
["1", "2", "3", "4", "5", "6", "7", "8", "9"])
self.assertEqual(output.split(),
["9", "8", "7", "6", "5", "4", "3", "2", "1"])
self.assertEqual(input.count("\n"), 2)
self.assertEqual(output.count("\n"), 2)

Expand Down Expand Up @@ -111,3 +113,53 @@ def test_init_overload(self):
with IO(fin, fout) as test:
self.assertEqual(test.input_file, fin)
self.assertEqual(test.output_file, fout)

def test_make_dirs(self):
mkdir_true = False
with IO("./automkdir_true/data.in",
"./automkdir_true/data.out",
make_dirs=True):
mkdir_true = os.path.exists("./automkdir_true")
self.assertEqual(mkdir_true, True)

mkdir_false = False
try:
with IO(
"./automkdir_false/data.in",
"./automkdir_false/data.out",
):
pass
except FileNotFoundError:
mkdir_false = True
mkdir_false &= not os.path.exists("./automkdir_false")
self.assertEqual(mkdir_false, True)

def test_output_clear_content(self):
with IO("test_clear.in", "test_clear.out") as test:
test.input_write("This is a test.")
test.input_clear_content()
test.input_write("Cleared content.")
test.output_write("This is a test.")
test.output_clear_content()
test.output_write("Cleared content.")
with open("test_clear.in", encoding="utf-8") as f:
input_text = f.read()
with open("test_clear.out", encoding="utf-8") as f:
output_text = f.read()
self.assertEqual(input_text, "Cleared content.")
self.assertEqual(output_text, "Cleared content.")

def test_output_clear_content_with_position(self):
with IO("test_clear_pos.in", "test_clear_pos.out") as test:
test.input_write("This is a test.")
test.input_clear_content(5)
test.input_write("Cleared content.")
test.output_write("This is a test.")
test.output_clear_content(5)
test.output_write("Cleared content.")
with open("test_clear_pos.in", encoding="utf-8") as f:
input_text = f.read()
with open("test_clear_pos.out", encoding="utf-8") as f:
output_text = f.read()
self.assertEqual(input_text, "This Cleared content.")
self.assertEqual(output_text, "This Cleared content.")

0 comments on commit 036d097

Please sign in to comment.