Skip to content

Commit

Permalink
Update "update_attachment_storage_bytes" to support no lock
Browse files Browse the repository at this point in the history
  • Loading branch information
noliveleger committed Nov 27, 2024
1 parent 6df6fa5 commit c207c62
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 56 deletions.
2 changes: 2 additions & 0 deletions kobo/apps/openrosa/apps/logger/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY = 'kobo:update_attachment_storage_bytes:heartbeat'
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from __future__ import annotations

import time

from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
from django.db.models import Sum, OuterRef, Subquery
from django_redis import get_redis_connection

from kobo.apps.openrosa.apps.logger.constants import (
SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY
)
from kobo.apps.openrosa.apps.logger.models.attachment import Attachment
from kobo.apps.openrosa.apps.logger.models.xform import XForm
from kobo.apps.openrosa.apps.main.models.user_profile import UserProfile
from kobo.apps.openrosa.libs.utils.jsonbfield_helper import ReplaceValues


class Command(BaseCommand):

help = (
'Retroactively calculate the total attachment file storage '
'per xform and user profile'
Expand All @@ -22,6 +29,7 @@ def __init__(self, *args, **kwargs):
self._verbosity = 0
self._force = False
self._sync = False
self._redis_client = get_redis_connection()

def add_arguments(self, parser):
parser.add_argument(
Expand Down Expand Up @@ -52,10 +60,14 @@ def add_arguments(self, parser):
)

parser.add_argument(
'-l', '--skip-lock-release',
'-nl', '--no-lock',
action='store_true',
default=False,
help='Do not attempts to remove submission lock on user profiles. Default is False',
help=(
'Do not lock accounts from receiving submissions while updating '
'storage counters.\n'
'WARNING: This may result in discrepancies. The default value is False.'
)
)

def handle(self, *args, **kwargs):
Expand All @@ -65,7 +77,7 @@ def handle(self, *args, **kwargs):
self._sync = kwargs['sync']
chunks = kwargs['chunks']
username = kwargs['username']
skip_lock_release = kwargs['skip_lock_release']
no_lock = kwargs['no_lock']

if self._force and self._sync:
self.stderr.write(
Expand All @@ -89,7 +101,7 @@ def handle(self, *args, **kwargs):
'`force` option has been enabled'
)

if not skip_lock_release:
if not no_lock:
self._release_locks()

profile_queryset = self._reset_user_profile_counters()
Expand All @@ -112,57 +124,62 @@ def handle(self, *args, **kwargs):
)
continue

self._lock_user_profile(user)
if not no_lock:
self._lock_user_profile(user)

for xform in user_xforms.iterator(chunk_size=chunks):
try:
for xform in user_xforms.iterator(chunk_size=chunks):

# write out xform progress
if self._verbosity > 1:
self.stdout.write(
f"Calculating attachments for xform_id #{xform['pk']}"
f" (user {user.username})"
)
# aggregate total media file size for all media per xform
form_attachments = Attachment.objects.filter(
instance__xform_id=xform['pk'],
).aggregate(total=Sum('media_file_size'))

if form_attachments['total']:
if (
xform['attachment_storage_bytes']
== form_attachments['total']
):
if self._verbosity > 2:
self.stdout.write(
'\tSkipping xform update! '
'Attachment storage is already accurate'
self._heartbeat(user)

# write out xform progress
if self._verbosity > 1:
self.stdout.write(
f"Calculating attachments for xform_id #{xform['pk']}"
f" (user {user.username})"
)
# aggregate total media file size for all media per xform
form_attachments = Attachment.objects.filter(
instance__xform_id=xform['pk'],
).aggregate(total=Sum('media_file_size'))

if form_attachments['total']:
if (
xform['attachment_storage_bytes']
== form_attachments['total']
):
if self._verbosity > 2:
self.stdout.write(
'\tSkipping xform update! '
'Attachment storage is already accurate'
)
else:
if self._verbosity > 2:
self.stdout.write(
f'\tUpdating xform attachment storage to '
f"{form_attachments['total']} bytes"
)

XForm.all_objects.filter(
pk=xform['pk']
).update(
attachment_storage_bytes=form_attachments['total']
)

else:
if self._verbosity > 2:
self.stdout.write(
f'\tUpdating xform attachment storage to '
f"{form_attachments['total']} bytes"
self.stdout.write('\tNo attachments found')
if not xform['attachment_storage_bytes'] == 0:
XForm.all_objects.filter(
pk=xform['pk']
).update(
attachment_storage_bytes=0
)

XForm.all_objects.filter(
pk=xform['pk']
).update(
attachment_storage_bytes=form_attachments['total']
)

else:
if self._verbosity > 2:
self.stdout.write('\tNo attachments found')
if not xform['attachment_storage_bytes'] == 0:
XForm.all_objects.filter(
pk=xform['pk']
).update(
attachment_storage_bytes=0
)

# need to call `update_user_profile()` one more time outside the loop
# because the last user profile will not be up-to-date otherwise
self._update_user_profile(user)
self._update_user_profile(user)
finally:
if not no_lock:
self._release_lock(user)

if self._verbosity >= 1:
self.stdout.write('Done!')
Expand All @@ -187,6 +204,13 @@ def _get_queryset(self, profile_queryset, username):

return users.order_by('pk')

def _heartbeat(self, user: settings.AUTH_USER_MODEL):
self._redis_client.hset(
SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY, mapping={
user.username: int(time.time())
}
)

def _lock_user_profile(self, user: settings.AUTH_USER_MODEL):
# Retrieve or create user's profile.
(
Expand All @@ -204,8 +228,24 @@ def _lock_user_profile(self, user: settings.AUTH_USER_MODEL):
# new submissions from coming in while the
# `attachment_storage_bytes` is being calculated.
user_profile.metadata['submissions_suspended'] = True
user_profile.metadata['attachments_counting_status'] = 'not-completed'
user_profile.save(update_fields=['metadata'])

self._heartbeat(user)

def _release_lock(self, user: settings.AUTH_USER_MODEL):
# Release any locks on the users' profile from getting submissions
if self._verbosity > 1:
self.stdout.write(f'Releasing submission lock for {user.username}…')

UserProfile.objects.filter(user_id=user.pk).update(
metadata=ReplaceValues(
'metadata',
updates={'submissions_suspended': False},
),
)
self._redis_client.hdel(SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY, user.username)

def _release_locks(self):
# Release any locks on the users' profile from getting submissions
if self._verbosity > 1:
Expand Down Expand Up @@ -249,9 +289,8 @@ def _update_user_profile(self, user: settings.AUTH_USER_MODEL):
f'{user.username}’s profile'
)

# Update user's profile (and lock the related row)
# Update user's profile
updates = {
'submissions_suspended': False,
'attachments_counting_status': 'complete',
}

Expand Down
89 changes: 84 additions & 5 deletions kobo/apps/openrosa/apps/logger/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# coding: utf-8
import csv
import datetime
import logging
import time
import zipfile
from collections import defaultdict
from datetime import timedelta
Expand All @@ -11,14 +12,19 @@
from django.conf import settings
from django.core.management import call_command
from django.utils import timezone
from django_redis import get_redis_connection

from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.openrosa.libs.utils.jsonbfield_helper import ReplaceValues
from kobo.celery import celery_app
from kpi.deployment_backends.kc_access.storage import (
default_kobocat_storage as default_storage,
)
from kpi.utils.log import logging
from .constants import SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY
from .models.daily_xform_submission_counter import DailyXFormSubmissionCounter
from .models import Instance, XForm
from ..main.models import UserProfile


@celery_app.task()
Expand Down Expand Up @@ -46,7 +52,10 @@ def fix_root_node_names(**kwargs):
# #### END ISSUE 242 FIX ######


@shared_task(soft_time_limit=3600, time_limit=3630)
@shared_task(
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
)
def generate_stats_zip(output_filename):
# Limit to last month and this month
now = datetime.datetime.now()
Expand Down Expand Up @@ -121,6 +130,76 @@ def list_created_by_month(model, date_field):
zip_file.close()


@celery_app.task()
def sync_storage_counters():
call_command('update_attachment_storage_bytes', verbosity=3, sync=True)
@celery_app.task
def fix_stale_submissions_suspended_flag():
"""
Task to fix stale `submissions_suspended` flag to ensure that accounts are
not indefinitely locked, preventing users from accessing or collecting their
data.
Note:
- This task is **not** automatically included in the periodic tasks.
- If the task `sync_storage_counters` is added to the periodic tasks,
this task should also be manually added to ensure consistency
in the system's storage management and cleanup process.
"""

redis_client = get_redis_connection()
lock = redis_client.hgetall(SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY)
if not lock:
return

usernames = []

for username, timestamp in lock.items():
username = username.decode()
timestamp = int(timestamp.decode())

if timestamp + settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT <= int(
time.time()
):
logging.info(
f'Removing `submission_suspended` flag on user #{username}’s profile'
)
usernames.append(username)

if usernames:
UserProfile.objects.filter(user__username__in=usernames).update(
metadata=ReplaceValues(
'metadata',
updates={'submissions_suspended': False},
),
)
redis_client.hdel(SUBMISSIONS_SUSPENDED_HEARTBEAT_KEY, *usernames)


@celery_app.task(
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
)
def sync_storage_counters(**kwargs):
"""
Task to synchronize the "storage" counters for user profiles and their projects (XForm).
This task ensures consistency between the storage usage tracked at the profile level
and the cumulative storage used by all associated projects. The total storage usage
calculated from the projects should match the storage counter of the corresponding profile.
Note:
- This task is **not** automatically included in the periodic tasks.
- If this task is added to periodic tasks, ensure that the
`fix_stale_submissions_suspended_flag` task is also scheduled to maintain
system integrity and prevent stale data issues.
"""

# The `no_lock` option is not hard-coded when calling the command, allowing
# superusers to control the lock behaviour from the admin interface without
# requiring a redeployment.
no_lock = kwargs.get('no_lock', False)

call_command(
'update_attachment_storage_bytes',
verbosity=3,
sync=True,
no_lock=no_lock,
)
1 change: 0 additions & 1 deletion kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def enketo_flush_cached_preview(server_url, form_id):
response.raise_for_status()



@celery_app.task(time_limit=LIMIT_HOURS_23, soft_time_limit=LIMIT_HOURS_23)
def perform_maintenance():
"""
Expand Down

0 comments on commit c207c62

Please sign in to comment.