Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: use a shared loop between sanic and asyncio components #364

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ attrs==19.3.0 # via aiohttp, pytest
chardet==3.0.4 # via aiohttp
coverage==5.0.3 # via pytest-cov
idna-ssl==1.1.0 # via aiohttp
idna==2.8 # via idna-ssl, yarl
idna==2.9 # via idna-ssl, yarl
importlib-metadata==1.5.0 # via pluggy, pytest
more-itertools==8.2.0 # via pytest
multidict==4.7.4 # via aiohttp, yarl
Expand All @@ -27,4 +27,4 @@ six==1.14.0 # via packaging
typing-extensions==3.7.4.1 # via aiohttp
wcwidth==0.1.8 # via pytest
yarl==1.4.2 # via aiohttp
zipp==2.1.0 # via importlib-metadata
zipp==3.0.0 # via importlib-metadata
18 changes: 9 additions & 9 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ cachetools==4.0.0 # via google-auth
certifi==2019.11.28 # via httpx, kubernetes, requests
chardet==3.0.4 # via httpx, requests
contextvars==2.4 # via sniffio
google-auth==1.11.0 # via kubernetes
grpcio==1.26.0
google-auth==1.11.2 # via kubernetes
grpcio==1.27.2
h11==0.8.1 # via httpx
h2==3.1.1 # via httpx
h2==3.2.0 # via httpx
hpack==3.0.0 # via h2
hstspreload==2020.1.30 # via httpx
httptools==0.0.13 # via sanic
hstspreload==2020.2.20 # via httpx
httptools==0.1.1 # via sanic
httpx==0.9.3 # via sanic
hyperframe==5.2.0 # via h2
idna==2.8 # via httpx, requests
idna==2.9 # via httpx, requests
immutables==0.11 # via contextvars
kubernetes==10.0.1
multidict==4.7.4 # via sanic
oauthlib==3.1.0 # via requests-oauthlib
prometheus-client==0.7.1
protobuf==3.11.2 # via synse-grpc
protobuf==3.11.3 # via synse-grpc
pyasn1-modules==0.2.8 # via google-auth
pyasn1==0.4.8 # via pyasn1-modules, rsa
python-dateutil==2.8.1 # via kubernetes
pyyaml==5.3
requests-oauthlib==1.3.0 # via kubernetes
requests==2.22.0 # via kubernetes, requests-oauthlib
requests==2.23.0 # via kubernetes, requests-oauthlib
rfc3986==1.3.2 # via httpx
rsa==4.0 # via google-auth
sanic==19.12.2
Expand All @@ -48,4 +48,4 @@ websocket-client==0.57.0 # via kubernetes
websockets==8.1

# The following packages are considered to be unsafe in a requirements file:
# setuptools==45.1.0 # via google-auth, kubernetes, protobuf
# setuptools==45.2.0 # via google-auth, kubernetes, protobuf
6 changes: 6 additions & 0 deletions synse_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
"""

import argparse
import asyncio
import os
import sys

import uvloop

import synse_server
from synse_server.server import Synse


def main() -> None:
uvloop.install()
asyncio.set_event_loop(uvloop.new_event_loop())

parser = argparse.ArgumentParser(
description='API server for the Synse platform',
)
Expand Down
183 changes: 119 additions & 64 deletions synse_server/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ async def version(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.version(),
)
try:
return utils.http_json_response(
await cmd.version(),
)
except Exception:
logger.exception('failed to get version info')
raise


@v3.route('/config')
Expand All @@ -87,9 +91,13 @@ async def config(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.config(),
)
try:
return utils.http_json_response(
await cmd.config(),
)
except Exception:
logger.exception('failed to get server config')
raise


@v3.route('/plugin')
Expand All @@ -104,11 +112,15 @@ async def plugins(request: Request) -> HTTPResponse:

refresh = request.args.get('refresh', 'false').lower() == 'true'

return utils.http_json_response(
await cmd.plugins(
refresh=refresh,
),
)
try:
return utils.http_json_response(
await cmd.plugins(
refresh=refresh,
),
)
except Exception:
logger.exception('failed to get plugins')
raise


@v3.route('/plugin/<plugin_id>')
Expand All @@ -125,9 +137,13 @@ async def plugin(request: Request, plugin_id: str) -> HTTPResponse:
"""
log_request(request, id=plugin_id)

return utils.http_json_response(
await cmd.plugin(plugin_id),
)
try:
return utils.http_json_response(
await cmd.plugin(plugin_id),
)
except Exception:
logger.exception('failed to get plugin info', id=plugin_id)
raise


@v3.route('/plugin/health')
Expand All @@ -140,9 +156,13 @@ async def plugin_health(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.plugin_health(),
)
try:
return utils.http_json_response(
await cmd.plugin_health(),
)
except Exception:
logger.exception('failed to get plugin health')
raise


@v3.route('/scan')
Expand Down Expand Up @@ -203,14 +223,18 @@ async def scan(request: Request) -> HTTPResponse:
)
sort_keys = param_sort[0]

return utils.http_json_response(
await cmd.scan(
ns=namespace,
tag_groups=tag_groups,
force=force,
sort=sort_keys,
),
)
try:
return utils.http_json_response(
await cmd.scan(
ns=namespace,
tag_groups=tag_groups,
force=force,
sort=sort_keys,
),
)
except Exception:
logger.exception('failed to get devices (scan)')
raise


@v3.route('/tags')
Expand Down Expand Up @@ -242,12 +266,16 @@ async def tags(request: Request) -> HTTPResponse:

include_ids = request.args.get('ids', 'false').lower() == 'true'

return utils.http_json_response(
await cmd.tags(
namespaces,
with_id_tags=include_ids,
),
)
try:
return utils.http_json_response(
await cmd.tags(
namespaces,
with_id_tags=include_ids,
),
)
except Exception:
logger.exception('failed to get device tags')
raise


@v3.route('/info/<device_id>')
Expand All @@ -264,9 +292,13 @@ async def info(request: Request, device_id: str) -> HTTPResponse:
"""
log_request(request, id=device_id)

return utils.http_json_response(
await cmd.info(device_id),
)
try:
return utils.http_json_response(
await cmd.info(device_id),
)
except Exception:
logger.exception('failed to get device info', id=device_id)
raise


@v3.route('/read')
Expand Down Expand Up @@ -308,12 +340,16 @@ async def read(request: Request) -> HTTPResponse:
for group in param_tags:
tag_groups.append(group.split(','))

return utils.http_json_response(
await cmd.read(
ns=namespace,
tag_groups=tag_groups,
),
)
try:
return utils.http_json_response(
await cmd.read(
ns=namespace,
tag_groups=tag_groups,
),
)
except Exception:
logger.exception('failed to read device(s)', namespace=namespace, tag_groups=tag_groups)
raise


@v3.route('/readcache')
Expand Down Expand Up @@ -365,9 +401,8 @@ async def response_streamer(response):
try:
async for reading in cmd.read_cache(start, end):
await response.write(ujson.dumps(reading) + '\n')
except Exception as e:
logger.error('failure when streaming cached readings')
logger.exception(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

except Exception:
logger.exception('failure when streaming cached readings')

return stream(response_streamer, content_type='application/json; charset=utf-8')

Expand All @@ -390,9 +425,13 @@ async def read_device(request: Request, device_id: str) -> HTTPResponse:
"""
log_request(request, id=device_id)

return utils.http_json_response(
await cmd.read_device(device_id),
)
try:
return utils.http_json_response(
await cmd.read_device(device_id),
)
except Exception:
logger.exception('failed to read device', id=device_id)
raise


@v3.route('/write/<device_id>', methods=['POST'])
Expand Down Expand Up @@ -432,12 +471,16 @@ async def async_write(request: Request, device_id: str) -> HTTPResponse:
'invalid json: key "action" is required in payload, but not found'
)

return utils.http_json_response(
await cmd.write_async(
device_id=device_id,
payload=data,
),
)
try:
return utils.http_json_response(
await cmd.write_async(
device_id=device_id,
payload=data,
),
)
except Exception:
logger.exception('failed to write asynchronously', id=device_id, payload=data)
raise


@v3.route('/write/wait/<device_id>', methods=['POST'])
Expand Down Expand Up @@ -477,12 +520,16 @@ async def sync_write(request: Request, device_id: str) -> HTTPResponse:
'invalid json: key "action" is required in payload, but not found'
)

return utils.http_json_response(
await cmd.write_sync(
device_id=device_id,
payload=data,
),
)
try:
return utils.http_json_response(
await cmd.write_sync(
device_id=device_id,
payload=data,
),
)
except Exception:
logger.exception('failed to write synchronously', id=device_id, payload=data)
raise


@v3.route('/transaction')
Expand All @@ -495,9 +542,13 @@ async def transactions(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.transactions(),
)
try:
return utils.http_json_response(
await cmd.transactions(),
)
except Exception:
logger.exception('failed to list transactions')
raise


@v3.route('/transaction/<transaction_id>')
Expand All @@ -514,9 +565,13 @@ async def transaction(request: Request, transaction_id: str) -> HTTPResponse:
"""
log_request(request, id=transaction_id)

return utils.http_json_response(
await cmd.transaction(transaction_id),
)
try:
return utils.http_json_response(
await cmd.transaction(transaction_id),
)
except Exception:
logger.exception('failed to get transaction info', id=transaction_id)
raise


@v3.route('/device/<device_id>', methods=['GET', 'POST'])
Expand Down
9 changes: 6 additions & 3 deletions synse_server/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from structlog import get_logger
from synse_grpc import api

from synse_server import config, plugin
from synse_server import config, loop, plugin
from synse_server.i18n import _

logger = get_logger()
Expand All @@ -35,8 +35,8 @@
namespace=NS_ALIAS,
)

device_cache_lock = asyncio.Lock()
alias_cache_lock = asyncio.Lock()
device_cache_lock = asyncio.Lock(loop=loop.synse_loop)
alias_cache_lock = asyncio.Lock(loop=loop.synse_loop)


async def get_transaction(transaction_id: str) -> dict:
Expand Down Expand Up @@ -216,7 +216,10 @@ async def get_device(device_id: str) -> Union[api.V3Device, None]:
# empty, return the first element of the list - there should only be
# one element; otherwise, return None.
if result:
logger.debug(_('got device from cache'))
device = result[0]
else:
logger.debug(_('failed to lookup device from cache'), id=device_id)

# No device was found from an ID lookup. Try looking up the ID in the
# alias cache.
Expand Down
Loading