[WIP] Asynchronous corpus updates #46

Open
wants to merge 11 commits into master
27 changes: 27 additions & 0 deletions allofplos/article_class.py
@@ -2,9 +2,12 @@
import re
import subprocess

from io import BytesIO

import lxml.etree as et
import requests


from allofplos.transformations import (filename_to_doi, EXT_URL_TMP, INT_URL_TMP,
BASE_URL_ARTICLE_LANDING_PAGE)
from allofplos.plos_regex import (validate_doi, corpusdir)
@@ -1097,3 +1100,27 @@ def from_filename(cls, filename):
"""Initiate an article object using a local XML file.
"""
return cls(filename_to_doi(filename))

@classmethod
def from_bytes(cls, resp, directory=corpusdir, write=False, overwrite=True):
    """Initiate an article object from the raw bytes of an article XML
    document, e.g. the body of an HTTP response.
    """
    tree = et.parse(BytesIO(resp))
    root = tree.getroot()
    tag_path = ["/",
                "article",
                "front",
                "article-meta",
                "article-id"]
    tag_location = '/'.join(tag_path)
    article_ids = root.xpath(tag_location)
    temp = None  # stays None if the XML contains no DOI article-id
    for art_id in article_ids:
        if art_id.get('pub-id-type') == 'doi':
            temp = cls(art_id.text, directory=directory)
            temp._tree = tree
            if write and (not os.path.isfile(temp.filename) or overwrite):
                with open(temp.filename, 'w') as file:
                    file.write(et.tostring(tree, method='xml', encoding='unicode'))
            break
    return temp
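
A quick usage sketch of the new classmethod (not part of this diff): the DOI and target directory below are made-up placeholders, and URL_TMP is the same template the async test script imports.

import requests
from allofplos import Article
from allofplos.transformations import URL_TMP

# Hypothetical example: the DOI and directory are placeholders, not real values.
resp = requests.get(URL_TMP.format('10.1371/journal.pone.0000000'))
article = Article.from_bytes(resp.content,            # raw bytes of the article XML
                             directory='corpus_dir',  # where the file would be written
                             write=True)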



Empty file.
143 changes: 143 additions & 0 deletions allofplos/async_utils/fetch_test.py
@@ -0,0 +1,143 @@
import asyncio
import aiohttp
import requests
import time
import os
import shutil


import lxml.etree as et
from timeit import default_timer

from allofplos.plos_corpus import listdir_nohidden
from allofplos.plos_regex import ALLOFPLOS_DIR_PATH, corpusdir
from allofplos.transformations import URL_TMP, url_to_doi
from allofplos.samples.corpus_analysis import get_all_local_dois
from allofplos import Article

MIN_DELAY = 1.0  # minimum wait before starting the next HTTP request (in seconds)
MIN_FILES = 9990  # index of the first local file to process
NUM_FILES = 10  # number of files to process

ASYNC_DIRECTORY = os.path.join(ALLOFPLOS_DIR_PATH, "async_test_dir")
SYNC_DIRECTORY = os.path.join(ALLOFPLOS_DIR_PATH, "sync_test_dir")

async def fetch(doi, session):
    """Given a doi, fetch the corresponding article URL using the given
    aiohttp ClientSession.

    Returns the Article created from the content of the response.

    NB: this needs better error handling for when the url fails or points to
    an invalid xml file.
    """
    url = URL_TMP.format(doi)
    async with session.get(url) as response:
        resp = await response.read()
        article = Article.from_bytes(resp,
                                     directory=ASYNC_DIRECTORY,
                                     write=True,
                                     overwrite=True)
    return article
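
The docstring above flags the missing error handling. A minimal sketch of one way to guard the request, assuming a failed DOI should simply yield None (the wrapper name fetch_safe is hypothetical and not part of this PR):

async def fetch_safe(doi, session):
    # Sketch only: returns None instead of raising when the request fails.
    url = URL_TMP.format(doi)
    try:
        async with session.get(url) as response:
            response.raise_for_status()  # raises aiohttp.ClientResponseError on 4xx/5xx
            resp = await response.read()
    except aiohttp.ClientError as err:
        print('Failed to fetch {}: {}'.format(doi, err))
        return None
    return Article.from_bytes(resp, directory=ASYNC_DIRECTORY, write=True, overwrite=True)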

async def fetch_all(dois, max_rate=MIN_DELAY, limit_per_host=3):
    """Launch requests for each doi.

    All of the dois passed in are fetched first. The results are then checked
    for correction articles, and the articles they correct are downloaded as
    well.
    """
    tasks = []
    conn = aiohttp.TCPConnector(limit_per_host=limit_per_host)
    async with aiohttp.ClientSession(connector=conn) as session:
        for doi in dois:
            await asyncio.sleep(max_rate)  # throttle: at most one new request every max_rate seconds
            task = asyncio.ensure_future(fetch(doi, session))
            tasks.append(task)  # collect the tasks

        first_batch = await asyncio.gather(*tasks)  # wait for the first round of downloads
        corrected_dois = [article.related_doi
                          for article in first_batch
                          if article.type_ == "correction"]
        for doi in corrected_dois:
            await asyncio.sleep(max_rate)  # throttle the follow-up requests as well
            task = asyncio.ensure_future(fetch(doi, session))
            tasks.append(task)

        await asyncio.gather(*tasks)  # wait for the correction downloads too



def sequential_fetch(doi):
    """Fetch the article for a single doi as one step of a sequential
    (blocking) download process.

    Returns the Article created from the content of the response.

    NB: this needs better error handling for when the url fails or points to
    an invalid xml file.
    """
    url = URL_TMP.format(doi)
    response = requests.get(url)
    time.sleep(MIN_DELAY)
    article = Article.from_bytes(response.text.encode('utf-8'),
                                 directory=SYNC_DIRECTORY,  # write into the sequential test directory
                                 write=True)
    return article

def demo_sequential(dois):
    """Download the articles associated with dois to SYNC_DIRECTORY, one
    request at a time.

    Side-effect: prints how long the run took.
    """
    recreate_dir(SYNC_DIRECTORY)
    start_time = default_timer()
    for doi in dois:
        article = sequential_fetch(doi)
        if article.type_ == "correction":
            sequential_fetch(article.related_doi)  # also download the article being corrected

    tot_elapsed = default_timer() - start_time
    print(' TOTAL SECONDS: '.rjust(30, '-') + '{0:5.2f}'.format(tot_elapsed))


def demo_async(dois):
    """Download the articles associated with dois to ASYNC_DIRECTORY using
    the asynchronous functions above.

    Side-effect: prints how long the run took.
    """
    recreate_dir(ASYNC_DIRECTORY)
    start_time = default_timer()
    loop = asyncio.get_event_loop()  # event loop
    future = asyncio.ensure_future(fetch_all(dois))  # schedule all the downloads
    loop.run_until_complete(future)  # run until every download has finished
    loop.run_until_complete(asyncio.sleep(0))  # give pending callbacks a chance to run before closing
    loop.close()
    tot_elapsed = default_timer() - start_time
    print(' TOTAL SECONDS: '.rjust(30, '-') + '{0:5.2f}'.format(tot_elapsed))
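
As an aside on the event-loop boilerplate above: on Python 3.7+ it could be collapsed into a single call, though that is newer than the >=3.5 floor this PR sets in setup.py, so this is a sketch only.

# Sketch only: asyncio.run() is available from Python 3.7 onward.
asyncio.run(fetch_all(dois))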

def recreate_dir(directory):
"""Removes and recreates the directory.
"""
if os.path.isdir(directory):
shutil.rmtree(directory)
os.makedirs(directory, exist_ok=True)

def main():
"""Main loop for running and comparing the different appraoches.
"""

dois = get_all_local_dois(corpusdir)[MIN_FILES:MIN_FILES+NUM_FILES]
demo_sequential(dois)
demo_async(dois)

if __name__ == '__main__':
main()
23 changes: 5 additions & 18 deletions allofplos/plos_corpus.py
@@ -218,10 +218,7 @@ def repo_download(dois, tempdir, ignore_existing=True, plos_network=False):
:param ignore_existing: Don't re-download to tempdir if already downloaded
"""
# make temporary directory, if needed
try:
os.mkdir(tempdir)
except FileExistsError:
pass
os.makedirs(tempdir, exist_ok=True)

if ignore_existing:
existing_articles = [filename_to_doi(file) for file in listdir_nohidden(tempdir)]
@@ -423,10 +420,7 @@ def download_updated_xml(article_file,
:return: boolean for whether update was available & downloaded
"""
doi = filename_to_doi(article_file)
try:
os.mkdir(tempdir)
except FileExistsError:
pass
os.makedirs(tempdir, exist_ok=True)
url = URL_TMP.format(doi)
articletree_remote = et.parse(url)
articleXML_remote = et.tostring(articletree_remote, method='xml', encoding='unicode')
@@ -693,10 +687,7 @@ def remote_proofs_direct_check(tempdir=newarticledir, article_list=None, plos_ne
:param article-list: list of uncorrected proofs to check for updates.
:return: list of all articles with updated vor
"""
try:
os.mkdir(tempdir)
except FileExistsError:
pass
os.makedirs(tempdir, exist_ok=True)
proofs_download_list = []
if article_list is None:
article_list = get_uncorrected_proofs_list()
@@ -866,9 +857,7 @@ def create_local_plos_corpus(corpusdir=corpusdir, rm_metadata=True):
:param rm_metadata: COMPLETE HERE
:return: None
"""
if os.path.isdir(corpusdir) is False:
os.mkdir(corpusdir)
print('Creating folder for article xml')
os.makedirs(corpusdir, exist_ok=True)
zip_date, zip_size, metadata_path = get_zip_metadata()
zip_path = download_file_from_google_drive(zip_id, local_zip, file_size=zip_size)
unzip_articles(file_path=zip_path)
@@ -885,9 +874,7 @@ def create_test_plos_corpus(corpusdir=corpusdir):
:param corpusdir: directory where the corpus is to be downloaded and extracted
:return: None
"""
if os.path.isdir(corpusdir) is False:
os.mkdir(corpusdir)
print('Creating folder for article xml')
os.makedirs(corpusdir, exist_ok=True)
zip_path = download_file_from_google_drive(test_zip_id, local_test_zip)
unzip_articles(file_path=zip_path, extract_directory=corpusdir)

5 changes: 1 addition & 4 deletions allofplos/samples/corpus_analysis.py
@@ -290,10 +290,7 @@ def revisiondate_sanity_check(article_list=None, tempdir=newarticledir, director
article_list = sorted(pubdates, key=pubdates.__getitem__, reverse=True)
article_list = article_list[:30000]

try:
os.mkdir(tempdir)
except FileExistsError:
pass
os.makedirs(tempdir, exist_ok=True)
articles_different_list = []
max_value = len(article_list)
bar = progressbar.ProgressBar(redirect_stdout=True, max_value=max_value)
9 changes: 4 additions & 5 deletions setup.py
@@ -3,9 +3,9 @@
import sys

if sys.version_info.major < 3:
sys.exit('Sorry, Python < 3.4 is not supported')
elif sys.version_info.minor < 4:
sys.exit('Sorry, Python < 3.4 is not supported')
sys.exit('Sorry, Python < 3.5 is not supported')
elif sys.version_info.minor < 5:
sys.exit('Sorry, Python < 3.5 is not supported')

here = path.abspath(path.dirname(__file__))

@@ -27,7 +27,6 @@
'Intended Audience :: Science/Research',
'Topic :: Scientific/Engineering',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
],
@@ -50,7 +49,7 @@
'tqdm==4.17.1',
'urllib3==1.22',
],
python_requires='>=3.4',
python_requires='>=3.5',
# If there are data files included in your packages that need to be
# installed, specify them here. If using Python 2.6 or less, then these
# have to be included in MANIFEST.in as well.