Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updates to device cache rebuild and plugin refresh #366

Merged
merged 2 commits into from
Feb 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 55 additions & 36 deletions synse_server/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,55 @@ async def update_device_cache() -> None:
logger.debug(_('refreshing plugins prior to updating device cache'))
plugin.manager.refresh()

# A temporary dicts used to collect the data for rebuilding the device cache.
alias_map = {}
tags_map = {}

for p in plugin.manager:
if not p.active:
logger.debug(
_('plugin not active, will not get its devices'),
plugin=p.tag, plugin_id=p.id,
)
continue

try:
with p as client:
device_count = 0
for device in client.devices(): # all devices
# Get updates for device alias
if device.alias:
if device.alias in alias_map:
logger.error(
'alias already exists... not updating alias map',
alias=device.alias, device=device.id,
)
else:
alias_map[device.alias] = device

# Get updates for device tags
for tag in device.tags:
key = synse_grpc.utils.tag_string(tag)
tag_devices = tags_map.get(key)
if tag_devices:
tag_devices.append(device)
else:
tags_map[key] = [device]

device_count += 1
logger.debug(
_('got devices from plugin'),
plugin=p.tag, plugin_id=p.id, device_count=device_count,
)

except grpc.RpcError as e:
logger.warning(_('failed to get device(s)'), plugin=p.tag, plugin_id=p.id, error=e)
continue
except Exception:
logger.exception(
_('unexpected error when updating devices for plugin'), plugin_id=p.id)
raise

async with device_cache_lock:
# IMPORTANT (etd): `clear` must be called with the namespace. It seems weird
# to require the namespace since the cache instance has an associated namespace,
Expand All @@ -162,43 +211,13 @@ async def update_device_cache() -> None:
# Opened an issue to track.
# https://github.com/argaen/aiocache/issues/479
await device_cache.clear(NS_DEVICE)
await alias_cache.clear(NS_ALIAS)

for p in plugin.manager:
if not p.active:
logger.debug(
_('plugin not active, will not get its devices'),
plugin=p.tag, plugin_id=p.id,
)
continue
try:
with p as client:
device_count = 0
for device in client.devices():
# Update the alias cache if the device has an alias.
if device.alias:
await add_alias(device.alias, device)

# Update the device cache, mapping each tag to the device.
for tag in device.tags:
key = synse_grpc.utils.tag_string(tag)
val = await device_cache.get(key)
if val is None:
await device_cache.set(key, [device])
else:
await device_cache.set(key, val + [device])
device_count += 1
logger.debug(
_('got devices from plugin'),
plugin=p.tag, plugin_id=p.id, device_count=device_count,
)

except grpc.RpcError as e:
logger.warning(_('failed to get device(s)'), plugin=p.tag, plugin_id=p.id, error=e)
continue
except Exception:
logger.exception(
_('unexpected error when updating devices for plugin'), plugin_id=p.id)
raise
for k, v in alias_map.items():
await add_alias(k, v)

for k, v in tags_map.items():
await device_cache.set(k, v)


async def get_device(device_id: str) -> Union[api.V3Device, None]:
Expand Down
77 changes: 47 additions & 30 deletions synse_server/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ class PluginManager:
of the manager should operate on the same plugin state. The PluginManager
is also iterable, where iterating over the manager will provide the
snapshot of currently registered plugins.

Attributes:
is_refreshing: A state flag determining whether the manager is
currently performing a plugin refresh. Since plugin refresh
may be started via async task or API call, this state variable
is used to prevent two refreshes from happening simultaneously.
"""

plugins: Dict[str, 'Plugin'] = {}

def __init__(self):
self.is_refreshing = False

def __iter__(self) -> 'PluginManager':
self._snapshot = list(self.plugins.values())
self._idx = 0
Expand Down Expand Up @@ -179,36 +188,44 @@ def refresh(self) -> None:
initialization. New plugins may only be added at runtime via plugin
discovery mechanisms.
"""
logger.debug(_('refreshing plugin manager'))
start = time.time()

for address, protocol in self.load():
try:
self.register(address=address, protocol=protocol)
except Exception as e:
# Do not raise. This could happen if we can't communicate with
# the configured plugin. Future refreshes will attempt to re-register
# in this case. Log the failure and continue trying to register
# any remaining plugins.
logger.warning(
_('failed to register configured plugin - will attempt re-registering later'),
address=address, protocol=protocol, error=e,
)
continue

for address, protocol in self.discover():
try:
self.register(address=address, protocol=protocol)
except Exception as e:
# Do not raise. This could happen if we can't communicate with
# the configured plugin. Future refreshes will attempt to re-register
# in this case. Log the failure and continue trying to register
# any remaining plugins.
logger.warning(
_('failed to register discovered plugin - will attempt re-registering later'),
address=address, protocol=protocol, error=e,
)
continue
if self.is_refreshing:
logger.debug(_('manager is already refreshing'))
return

try:
self.is_refreshing = True
logger.debug(_('refreshing plugin manager'))
start = time.time()

for address, protocol in self.load():
try:
self.register(address=address, protocol=protocol)
except Exception as e:
# Do not raise. This could happen if we can't communicate with
# the configured plugin. Future refreshes will attempt to re-register
# in this case. Log the failure and continue trying to register
# any remaining plugins.
logger.warning(
_('failed to register configured plugin - will attempt re-registering later'), # noqa
address=address, protocol=protocol, error=e,
)
continue

for address, protocol in self.discover():
try:
self.register(address=address, protocol=protocol)
except Exception as e:
# Do not raise. This could happen if we can't communicate with
# the configured plugin. Future refreshes will attempt to re-register
# in this case. Log the failure and continue trying to register
# any remaining plugins.
logger.warning(
_('failed to register discovered plugin - will attempt re-registering later'), # noqa
address=address, protocol=protocol, error=e,
)
continue
finally:
self.is_refreshing = False

logger.debug(
_('plugin manager refresh complete'),
Expand Down