From 458ceee13df605d910964ef30204ad71dd0ee0f7 Mon Sep 17 00:00:00 2001 From: Zepu Zhang Date: Tue, 9 Nov 2021 21:56:27 -0800 Subject: [PATCH 1/2] bump upathlib version requirement --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 7878dd8..9792278 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,7 +17,7 @@ package_dir = packages = find: python_requires = >= 3.7 install_requires= - upathlib >= 0.5.0 + upathlib >= 0.6.1 # If user needs to use Biglist with Azure or GCP blob stores, From 54ced12552693a9a22c393f41b3856dcb22bb1a5 Mon Sep 17 00:00:00 2001 From: Zepu Zhang Date: Thu, 11 Nov 2021 22:29:17 -0800 Subject: [PATCH 2/2] fine-tune --- CHANGELOG | 6 ++++++ src/biglist/__init__.py | 2 +- src/biglist/_biglist.py | 39 ++++++++++++++++++++++----------------- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 7ce4c6a..08265d7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +Release 0.6.2 +============= + +- Fix and fine-tuning related to threading. + + Release 0.6.1 ============= diff --git a/src/biglist/__init__.py b/src/biglist/__init__.py index 31cf8e9..44c56a7 100644 --- a/src/biglist/__init__.py +++ b/src/biglist/__init__.py @@ -1,7 +1,7 @@ from ._biglist import Biglist, ListView, FileView, FileIterStat -__version__ = '0.6.1' +__version__ = '0.6.2' __all__ = [ diff --git a/src/biglist/_biglist.py b/src/biglist/_biglist.py index 32be01c..e49a8c5 100644 --- a/src/biglist/_biglist.py +++ b/src/biglist/_biglist.py @@ -51,6 +51,11 @@ def __init__(self, max_workers: int = 3): self._task_file_data: Dict[Future, Tuple] = {} + def __del__(self): + if self._executor is not None: + self._executor.shutdown() + self._executor = None + def _callback(self, t): self._sem.release() del self._task_file_data[t] @@ -428,27 +433,27 @@ def __iter__(self): elif ndatafiles > 1: max_workers = min(3, ndatafiles) tasks = queue.Queue(max_workers) - executor = ThreadPoolExecutor(max_workers) - for i in range(max_workers): - t = executor.submit( - self.load_data_file, - self._data_dir / datafiles[i][0] - ) - tasks.put(t) - nfiles_queued = max_workers - - for _ in range(ndatafiles): - t = tasks.get() - data = t.result() - - if nfiles_queued < ndatafiles: + with ThreadPoolExecutor(max_workers) as executor: + for i in range(max_workers): t = executor.submit( self.load_data_file, - self._data_dir / datafiles[nfiles_queued][0] + self._data_dir / datafiles[i][0] ) tasks.put(t) - nfiles_queued += 1 - yield from data + nfiles_queued = max_workers + + for _ in range(ndatafiles): + t = tasks.get() + data = t.result() + + if nfiles_queued < ndatafiles: + t = executor.submit( + self.load_data_file, + self._data_dir / datafiles[nfiles_queued][0] + ) + tasks.put(t) + nfiles_queued += 1 + yield from data if self._append_buffer: yield from self._append_buffer