Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some utility to IO class #137

Merged
merged 15 commits into from
Oct 12, 2024
73 changes: 58 additions & 15 deletions cyaron/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def __init__(self,
input_file: Optional[Union[IOBase, str, int]] = None,
output_file: Optional[Union[IOBase, str, int]] = None,
data_id: Optional[int] = None,
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
...

@overload
Expand All @@ -31,7 +32,8 @@ def __init__(self,
file_prefix: Optional[str] = None,
input_suffix: str = '.in',
output_suffix: str = '.out',
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
...

def __init__( # type: ignore
Expand All @@ -42,20 +44,22 @@ def __init__( # type: ignore
file_prefix: Optional[str] = None,
input_suffix: str = '.in',
output_suffix: str = '.out',
disable_output: bool = False):
disable_output: bool = False,
make_dirs: bool = False):
"""
Args:
input_file (optional): input file object or filename or file descriptor.
input_file (optional): input file object or filename or file descriptor.
If it's None, make a temp file. Defaults to None.
output_file (optional): input file object or filename or file descriptor.
output_file (optional): input file object or filename or file descriptor.
If it's None, make a temp file. Defaults to None.
data_id (optional): the id of the data. It will be add after
data_id (optional): the id of the data. It will be add after
`input_file` and `output_file` when they are str.
If it's None, the file names will not contain the id. Defaults to None.
file_prefix (optional): the prefix for the input and output files. Defaults to None.
input_suffix (optional): the suffix of the input file. Defaults to '.in'.
output_suffix (optional): the suffix of the output file. Defaults to '.out'.
disable_output (optional): set to True to disable output file. Defaults to False.
make_dirs (optional): set to True to create dir if path is not found. Defaults to False.
Examples:
>>> IO("a","b")
# create input file "a" and output file "b"
Expand All @@ -75,7 +79,12 @@ def __init__( # type: ignore
# create input file "data2.in" and output file "data2.out"
>>> IO(open('data.in', 'w+'), open('data.out', 'w+'))
# input file "data.in" and output file "data.out"
>>> IO("./io/data.in", "./io/data.out", disable_output = True)
# input file "./io/data.in" and output file "./io/data.out"
# if the dir "./io" not found it will be created
"""
self.__closed = False
self.input_file, self.output_file = None, None
if file_prefix is not None:
# legacy mode
input_file = '{}{{}}{}'.format(self.__escape_format(file_prefix),
Expand All @@ -85,16 +94,16 @@ def __init__( # type: ignore
self.__escape_format(output_suffix))
self.input_filename, self.output_filename = None, None
self.__input_temp, self.__output_temp = False, False
self.__init_file(input_file, data_id, "i")
self.__init_file(input_file, data_id, "i", make_dirs)
if not disable_output:
self.__init_file(output_file, data_id, "o")
self.__init_file(output_file, data_id, "o", make_dirs)
else:
self.output_file = None
self.__closed = False
self.is_first_char = {}

def __init_file(self, f: Union[IOBase, str, int, None],
data_id: Union[int, None], file_type: str):
def __init_file(self, f: Union[IOBase, str, int,
None], data_id: Union[int, None],
file_type: str, make_dirs: bool):
if isinstance(f, IOBase):
# consider ``f`` as a file object
if file_type == "i":
Expand All @@ -104,30 +113,36 @@ def __init_file(self, f: Union[IOBase, str, int, None],
elif isinstance(f, int):
# consider ``f`` as a file descor
self.__init_file(open(f, 'w+', encoding="utf-8", newline='\n'),
data_id, file_type)
data_id, file_type, make_dirs)
elif f is None:
# consider wanna temp file
fd, self.input_filename = tempfile.mkstemp()
self.__init_file(fd, data_id, file_type)
self.__init_file(fd, data_id, file_type, make_dirs)
if file_type == "i":
self.__input_temp = True
else:
self.__output_temp = True
else:
# consider ``f`` as filename template
filename = f.format(data_id or "")
# be sure dir is existed
if make_dirs:
self.__make_dirs(filename)
if file_type == "i":
self.input_filename = filename
else:
self.output_filename = filename
self.__init_file(
open(filename, 'w+', newline='\n', encoding='utf-8'), data_id,
file_type)
file_type, make_dirs)

def __escape_format(self, st: str):
"""replace "{}" to "{{}}" """
return re.sub(r"\{", "{{", re.sub(r"\}", "}}", st))

def __make_dirs(self, pth: str):
os.makedirs(os.path.dirname(pth), exist_ok=True)

def __del_files(self):
"""delete files"""
if self.__input_temp and self.input_filename is not None:
Expand Down Expand Up @@ -170,7 +185,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def __write(self, file: IOBase, *args, **kwargs):
"""
Write every element in *args into file. If the element isn't "\n", insert `separator`.
Write every element in *args into file. If the element isn't "\n", insert `separator`.
It will convert every element into str.
"""
separator = kwargs.get("separator", " ")
Expand All @@ -185,6 +200,17 @@ def __write(self, file: IOBase, *args, **kwargs):
if arg == "\n":
self.is_first_char[file] = True

def __clear(self, file: IOBase, pos: int = 0):
"""
Clear the content use truncate()
Args:
file: Which file to clear
pos: Where file will truncate.
"""
file.truncate(pos)
self.is_first_char[file] = True
file.seek(pos)

def input_write(self, *args, **kwargs):
"""
Write every element in *args into the input file. Splits with `separator`.
Expand All @@ -208,6 +234,15 @@ def input_writeln(self, *args, **kwargs):
args.append("\n")
self.input_write(*args, **kwargs)

def input_clear_content(self, pos: int = 0):
"""
Clear the content of input
Args:
pos: Where file will truncate.
"""

self.__clear(self.input_file, pos)

def output_gen(self, shell_cmd, time_limit=None):
"""
Run the command `shell_cmd` (usually the std program) and send it the input file as stdin.
Expand Down Expand Up @@ -268,6 +303,14 @@ def output_writeln(self, *args, **kwargs):
args.append("\n")
self.output_write(*args, **kwargs)

def output_clear_content(self, pos: int = 0):
"""
Clear the content of output
Args:
pos: Where file will truncate
"""
self.__clear(self.output_file, pos)

def flush_buffer(self):
"""Flush the input file"""
self.input_file.flush()
56 changes: 54 additions & 2 deletions cyaron/tests/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ def test_write_stuff(self):
input = f.read()
with open("test_write.out") as f:
output = f.read()
self.assertEqual(input.split(), ["1", "2", "3", "4", "5", "6", "7", "8", "9"])
self.assertEqual(output.split(), ["9", "8", "7", "6", "5", "4", "3", "2", "1"])
self.assertEqual(input.split(),
["1", "2", "3", "4", "5", "6", "7", "8", "9"])
self.assertEqual(output.split(),
["9", "8", "7", "6", "5", "4", "3", "2", "1"])
self.assertEqual(input.count("\n"), 2)
self.assertEqual(output.count("\n"), 2)

Expand Down Expand Up @@ -111,3 +113,53 @@ def test_init_overload(self):
with IO(fin, fout) as test:
self.assertEqual(test.input_file, fin)
self.assertEqual(test.output_file, fout)

def test_make_dirs(self):
mkdir_true = False
with IO("./automkdir_true/data.in",
"./automkdir_true/data.out",
make_dirs=True):
mkdir_true = os.path.exists("./automkdir_true")
self.assertEqual(mkdir_true, True)

mkdir_false = False
try:
with IO(
"./automkdir_false/data.in",
"./automkdir_false/data.out",
):
pass
except FileNotFoundError:
mkdir_false = True
mkdir_false &= not os.path.exists("./automkdir_false")
self.assertEqual(mkdir_false, True)

def test_output_clear_content(self):
with IO("test_clear.in", "test_clear.out") as test:
test.input_write("This is a test.")
test.input_clear_content()
test.input_write("Cleared content.")
test.output_write("This is a test.")
test.output_clear_content()
test.output_write("Cleared content.")
with open("test_clear.in", encoding="utf-8") as f:
input_text = f.read()
with open("test_clear.out", encoding="utf-8") as f:
output_text = f.read()
self.assertEqual(input_text, "Cleared content.")
self.assertEqual(output_text, "Cleared content.")

def test_output_clear_content_with_position(self):
with IO("test_clear_pos.in", "test_clear_pos.out") as test:
test.input_write("This is a test.")
test.input_clear_content(5)
test.input_write("Cleared content.")
test.output_write("This is a test.")
test.output_clear_content(5)
test.output_write("Cleared content.")
with open("test_clear_pos.in", encoding="utf-8") as f:
input_text = f.read()
with open("test_clear_pos.out", encoding="utf-8") as f:
output_text = f.read()
self.assertEqual(input_text, "This Cleared content.")
self.assertEqual(output_text, "This Cleared content.")
Loading