Skip to content

Commit

Permalink
Merge pull request #3 from zpz/zepu
Browse files Browse the repository at this point in the history
Zepu
  • Loading branch information
zpz authored Nov 12, 2021
2 parents e370ee7 + 54ced12 commit 110160e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Release 0.6.2
=============

- Fixes and fine-tuning related to threading.


Release 0.6.1
=============

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package_dir =
packages = find:
python_requires = >= 3.7
install_requires=
upathlib >= 0.5.0
upathlib >= 0.6.1


# If user needs to use Biglist with Azure or GCP blob stores,
Expand Down
2 changes: 1 addition & 1 deletion src/biglist/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ._biglist import Biglist, ListView, FileView, FileIterStat


__version__ = '0.6.1'
__version__ = '0.6.2'


__all__ = [
Expand Down
39 changes: 22 additions & 17 deletions src/biglist/_biglist.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def __init__(self, max_workers: int = 3):

self._task_file_data: Dict[Future, Tuple] = {}

def __del__(self):
    """Best-effort cleanup: shut down the thread-pool executor when
    this object is garbage collected.

    ``getattr`` is used because ``__del__`` can run even if ``__init__``
    raised before ``self._executor`` was assigned; an ``AttributeError``
    here would otherwise surface as an ignored-exception warning during
    garbage collection or interpreter shutdown.
    """
    executor = getattr(self, '_executor', None)
    if executor is not None:
        # Wait for any queued tasks to finish, then release the
        # worker threads; drop the reference so repeated GC passes
        # (or an explicit second __del__ call) are no-ops.
        executor.shutdown()
        self._executor = None

def _callback(self, t):
    # Done-callback attached to each submitted Future `t`:
    # release one slot on the submission-throttling semaphore and
    # drop the bookkeeping entry kept for the now-completed task.
    # NOTE(review): presumably registered via Future.add_done_callback
    # in code outside this view — confirm against the submit site.
    self._sem.release()
    del self._task_file_data[t]
Expand Down Expand Up @@ -428,27 +433,27 @@ def __iter__(self):
elif ndatafiles > 1:
max_workers = min(3, ndatafiles)
tasks = queue.Queue(max_workers)
executor = ThreadPoolExecutor(max_workers)
for i in range(max_workers):
t = executor.submit(
self.load_data_file,
self._data_dir / datafiles[i][0]
)
tasks.put(t)
nfiles_queued = max_workers

for _ in range(ndatafiles):
t = tasks.get()
data = t.result()

if nfiles_queued < ndatafiles:
with ThreadPoolExecutor(max_workers) as executor:
for i in range(max_workers):
t = executor.submit(
self.load_data_file,
self._data_dir / datafiles[nfiles_queued][0]
self._data_dir / datafiles[i][0]
)
tasks.put(t)
nfiles_queued += 1
yield from data
nfiles_queued = max_workers

for _ in range(ndatafiles):
t = tasks.get()
data = t.result()

if nfiles_queued < ndatafiles:
t = executor.submit(
self.load_data_file,
self._data_dir / datafiles[nfiles_queued][0]
)
tasks.put(t)
nfiles_queued += 1
yield from data

if self._append_buffer:
yield from self._append_buffer
Expand Down

0 comments on commit 110160e

Please sign in to comment.