Merge pull request #132 from zpz/zepu
Zepu
zpz authored Jun 21, 2024
2 parents 25a10f5 + a99a7b1 commit 88d05f7
Showing 3 changed files with 51 additions and 26 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 
+## [0.9.1] - 2024-06-20
+
+- Fine-tuning in `Biglist.flush`.
+
+
 ## [0.9.0] - 2024-06-18
 
 - Removed parameter `keep_files`. `__del__` no longer calls `destroy`; instead, it always tries to call `flush`.
2 changes: 1 addition & 1 deletion src/biglist/__init__.py
@@ -60,4 +60,4 @@
     Slicer,
 )
 
-__version__ = '0.9.0'
+__version__ = '0.9.1'
70 changes: 45 additions & 25 deletions src/biglist/_biglist.py
@@ -25,7 +25,7 @@
 
 import pyarrow
 from typing_extensions import Self
-from upathlib import LocalUpath, Path, PathType, Upath, resolve_path, serializer
+from upathlib import LocalUpath, PathType, Upath, resolve_path, serializer
 
 from ._parquet import ParquetFileReader, make_parquet_schema
 from ._util import Element, FileReader, Seq
@@ -791,7 +791,7 @@ def __init__(
             ff.write_json(self.info, overwrite=True)
 
     def __del__(self) -> None:
-        if self._info_file.is_file():  # otherwise `destroy()` has been called
+        if self._info_file.is_file():  # otherwise `destroy()` may have been called
             self._warn_flush()
             self.flush()
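
For context, a minimal usage sketch of the pattern this hunk touches (the path is illustrative; `Biglist.new`, `append`, and `flush` follow the package's documented API). `__del__` triggers a `flush` as a safety net, but an explicit call is the recommended ending of a writing session:

```python
from biglist import Biglist

mylist = Biglist.new('/tmp/mydata')  # illustrative path
for x in range(10_000):
    mylist.append(x)

# Relying on garbage collection to reach __del__ -> flush() can work,
# but an explicit flush at the end of the writing session is what the
# library recommends (see `_warn_flush` below).
mylist.flush()
```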
@@ -816,9 +816,17 @@ def storage_version(self) -> int:
 
     def _warn_flush(self):
         if self._append_buffer or self._append_files_buffer:
+            # This is not the only situation that can be suspicious.
+            # If you used `flush(eager=True)` without a `flush()` later,
+            # both `self._append_buffer` and `self._append_files_buffer` can be
+            # empty yet the on-disk info is not fully integrated.
+            #
+            # Unless you know what you are doing, don't use `flush(eager=True)`.
             warnings.warn(
                 f"did you forget to flush {self.__class__.__name__} at '{self.path}'?"
            )
+            return True
+        return False
 
     def __len__(self) -> int:
         """
@@ -1028,6 +1036,7 @@ def flush(
         Note that `flush` is called automatically when a Biglist object is garbage collected.
         However, user is strongly recommended to explicitly call `flush` at the end of their writing session.
+        (See :meth:`_warn_flush`.)
         On the other hand, you should **not** call `flush` frequently "just to be safe".
         It has I/O overhead, and it may create small data files because it flushes the append buffer
@@ -1055,32 +1064,44 @@
         # appends by other workers. The last call to ``flush`` across all workers
         # will get the final meta info right.
 
+        data = []
+
         if self._append_files_buffer:
-            # Saving file meta data without merging it into `info.json`.
-            # This puts the data structure in a transitional state.
-            filename = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S.%f')}_{str(uuid4()).replace('-', '')[:10]}"
-            (self.path / '_flush_eager' / filename).write_json(
-                self._append_files_buffer, overwrite=False
-            )
-            self._append_files_buffer.clear()
+            if eager:
+                # Saving file meta data without merging it into `info.json`.
+                # This puts the data structure in a transitional state.
+                filename = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S.%f')}_{str(uuid4()).replace('-', '')[:10]}"
+                (self.path / '_flush_eager' / filename).write_json(
+                    self._append_files_buffer, overwrite=False
+                )
+                self._append_files_buffer.clear()
+            else:
+                data.extend(self._append_files_buffer)
 
         if not eager:
             # Merge file meta data into `info.json`, finalizing the data structure.
-            with self._info_file.lock(timeout=lock_timeout) as ff:
-                data = []
-                for f in (self.path / '_flush_eager').iterdir():
-                    z = f.read_json()
-                    data.extend(z)
-                    f.remove_file()
-                if data:
-                    self.info.update(ff.read_json())
-                    z0 = self.info['data_files_info']
-                    z = sorted(set((*(tuple(v[:2]) for v in z0), *map(tuple, data))))
-                    # TODO: maybe a merge sort can be more efficient.
-                    cum = list(itertools.accumulate(v[1] for v in z))
-                    z = [(a, b, c) for (a, b), c in zip(z, cum)]
-                    self.info['data_files_info'] = z
-                    ff.write_json(self.info, overwrite=True)
+            empty = True
+            if not data:
+                for f in (self.path / '_flush_eager').iterdir():
+                    empty = False
+                    break
+            if data or not empty:
+                with self._info_file.lock(timeout=lock_timeout) as ff:
+                    for f in (self.path / '_flush_eager').iterdir():
+                        z = f.read_json()
+                        data.extend(z)
+                        f.remove_file()
+                    if data:
+                        self.info.update(ff.read_json())
+                        z0 = self.info['data_files_info']
+                        z = sorted(
+                            set((*(tuple(v[:2]) for v in z0), *map(tuple, data)))
+                        )
+                        # TODO: maybe a merge sort can be more efficient.
+                        cum = list(itertools.accumulate(v[1] for v in z))
+                        z = [(a, b, c) for (a, b), c in zip(z, cum)]
+                        self.info['data_files_info'] = z
+                        ff.write_json(self.info, overwrite=True)
 
     def reload(self) -> None:
         """
@@ -1405,8 +1426,7 @@ def new(
         **kwargs
             additional arguments are passed on to :meth:`__init__`.
         """
-        if isinstance(data_path, (str, Path, Upath)):
-            # TODO: in py 3.10, we will be able to do `isinstance(data_path, PathType)`
+        if isinstance(data_path, PathType):
            data_path = [resolve_path(data_path)]
        else:
            data_path = [resolve_path(p) for p in data_path]
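
The resolved TODO relies on a Python 3.10 feature: `isinstance` accepts PEP 604 unions (`X | Y`) directly, so an alias like upathlib's `PathType` can serve as the second argument, provided it is such a union (or a plain tuple of classes). A small sketch with a hypothetical alias:

```python
from pathlib import Path

PathLike = str | Path  # hypothetical alias; a PEP 604 union (Python 3.10+)

def to_path_list(p):
    # Accept a single path or an iterable of paths; normalize to a list.
    if isinstance(p, PathLike):  # valid: PathLike is a types.UnionType
        return [Path(p)]
    return [Path(x) for x in p]

print(to_path_list('/tmp/a'))              # [PosixPath('/tmp/a')]
print(to_path_list(['/tmp/a', '/tmp/b']))  # [PosixPath('/tmp/a'), PosixPath('/tmp/b')]
```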