Skip to content

Commit

Permalink
Support KML track import created by Pentax K-1
Browse files Browse the repository at this point in the history
  • Loading branch information
jedie committed Aug 1, 2024
1 parent 15ee6b1 commit d2ed53a
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 40 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ Because this is a project and not really a reuse-able-app ;)

[comment]: <> (✂✂✂ auto generated history start ✂✂✂)

* [**dev**](https://github.com/jedie/django-for-runners/compare/v0.17.4...main)
* [v0.18.0.dev1](https://github.com/jedie/django-for-runners/compare/v0.17.4...v0.18.0.dev1)
* 2024-08-01 - Support KML track import created by Pentax K-1
* 2024-08-01 - Catch metaweather.com error
* 2024-07-31 - Project updates
* 2024-07-31 - Update requirements
* 2024-01-18 - +typeguard +manageprojects updates
Expand Down
2 changes: 1 addition & 1 deletion for_runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
Store your GPX tracks of your running (or other sports activity) in django.
"""

__version__ = '0.17.4'
__version__ = '0.18.0.dev1'
__author__ = 'Jens Diemer <[email protected]>'
9 changes: 8 additions & 1 deletion for_runners/gpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import gpxpy
from gpxpy.geo import distance as geo_distance
from gpxpy.gpx import GPX

# https://github.com/jedie/django-for-runners
from for_runners.exceptions import GpxDataError
Expand All @@ -20,12 +21,18 @@
)


def get_identifier(gpxpy_instance):
def get_identifier(gpxpy_instance: GPX) -> Identifier:
"""
:return: Identifier named tuple
"""
time_bounds = gpxpy_instance.get_time_bounds()

if not time_bounds.start_time:
raise GpxDataError("No start time found!")

if not time_bounds.end_time:
raise GpxDataError("No end time found!")

try:
first_track = gpxpy_instance.tracks[0]
except IndexError:
Expand Down
Empty file.
109 changes: 109 additions & 0 deletions for_runners/gpx_tools/kml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import dataclasses
import datetime
import logging
import re

import gpxpy.gpx
from gpxpy.gpx import GPX, GPXTrackPoint
from lxml import etree
from lxml.etree import Element


log = logging.getLogger(__name__)


def get_single_text(node: Element, xpath: str, namespaces) -> str | None:
if elements := node.xpath(xpath, namespaces=namespaces):
assert len(elements) == 1, f'Expected 1 element, got {len(elements)}'
return elements[0].text


@dataclasses.dataclass
class Coordinates:
longitude: float
latitude: float
altitude: float


def parse_coordinates(coordinates: str) -> Coordinates | None:
"""
>>> parse_coordinates('2.444563,51.052540,8.0')
Coordinates(longitude=2.444563, latitude=51.05254, altitude=8.0)
"""
match = re.match(r'\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*', coordinates)
if match:
lon, lat, alt = map(float, match.groups())
return Coordinates(longitude=lon, latitude=lat, altitude=alt)


def get_coordinates(placemark: Element, namespaces) -> Coordinates | None:
if coordinates := get_single_text(placemark, './/kml:coordinates', namespaces=namespaces):
return parse_coordinates(coordinates)


def parse_datetime(datetime_str) -> datetime.datetime | None:
dt_part, tz_part = datetime_str.rsplit(' ', 1)

try:
dt = datetime.datetime.strptime(dt_part, '%Y/%m/%d %H:%M:%S')
except ValueError:
log.exception('Failed to parse datetime string %r', datetime_str)
return None

if not tz_part.startswith('UTC'):
log.warning('Timezone not in UTC format: %r', tz_part)
return None

sign = 1 if tz_part[3] == '+' else -1
try:
hours_offset = int(tz_part[4:6])
minutes_offset = int(tz_part[7:9])
except ValueError:
log.exception('Failed to parse timezone offset %r', tz_part)
return None

tz_offset = datetime.timedelta(hours=sign * hours_offset, minutes=sign * minutes_offset)
dt = dt.replace(tzinfo=datetime.timezone(tz_offset))
return dt


def datetime_from_description(placemark: Element, namespaces):
if description := get_single_text(placemark, 'kml:description', namespaces=namespaces):
dt_str = description.partition('<br>')[0]
return parse_datetime(dt_str)


def kml2gpx(kml_file) -> GPX:
"""
Convert a KML file to a GPX object.
Notes:
* Only tested with KML files from a Pentax K-1 camera!
"""
gpx = GPX()
track = gpxpy.gpx.GPXTrack()
gpx.tracks.append(track)

segment = gpxpy.gpx.GPXTrackSegment()
track.segments.append(segment)

root = etree.parse(kml_file).getroot()
namespaces = {'kml': root.nsmap.get(None)}

for placemark in root.xpath('//kml:Placemark', namespaces=namespaces):
dt = datetime_from_description(placemark, namespaces=namespaces)
if not dt:
continue

coordinates = get_coordinates(placemark, namespaces=namespaces)
if not coordinates:
continue

point = GPXTrackPoint(
latitude=coordinates.latitude,
longitude=coordinates.longitude,
elevation=coordinates.altitude,
time=dt,
)
segment.points.append(point)

return gpx
Empty file.
84 changes: 84 additions & 0 deletions for_runners/gpx_tools/tests/test_kml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from datetime import datetime, timedelta, timezone

from bx_py_utils.test_utils.datetime import parse_dt
from django.test import SimpleTestCase
from gpxpy.gpx import GPX

from for_runners.gpx_tools.kml import Coordinates, kml2gpx, parse_coordinates, parse_datetime
from for_runners.tests.fixture_files import get_fixture_path


class KmlTestCase(SimpleTestCase):

def test_parse_coordinates(self):
self.assertEqual(
parse_coordinates('2.444563,51.052540,8.0'),
Coordinates(longitude=2.444563, latitude=51.05254, altitude=8.0),
)
self.assertEqual(
parse_coordinates('-2.444563,-51.052540,-8.0'),
Coordinates(longitude=-2.444563, latitude=-51.05254, altitude=-8.0),
)
self.assertEqual(
parse_coordinates(' 2.444563 , 51.052540 , 8.0 '),
Coordinates(longitude=2.444563, latitude=51.05254, altitude=8.0),
)
self.assertIsNone(parse_coordinates('2.444563,51.052540'))
self.assertIsNone(parse_coordinates('abc,def,ghi'))

def test_parse_datetime(self):
self.assertEqual(
parse_datetime('2024/07/21 14:30:24 UTC+01:00'),
datetime(
2024,
7,
21,
14,
30,
24,
tzinfo=timezone(timedelta(hours=1)),
),
)
self.assertEqual(
parse_datetime('2024/07/21 14:30:24 UTC-05:30'),
datetime(
2024,
7,
21,
14,
30,
24,
tzinfo=timezone(timedelta(hours=-5, minutes=-30)),
),
)

with self.assertLogs('for_runners.gpx_tools.kml', 'WARNING') as cm:
self.assertIsNone(parse_datetime('2024/07/21 14:30:24'))
self.assertIn('Failed to parse datetime string', '\n'.join(cm.output))

with self.assertLogs('for_runners.gpx_tools.kml', 'WARNING') as cm:
self.assertIsNone(parse_datetime('21/07/2024 14:30:24 UTC+01:00'))
self.assertIn('Failed to parse datetime string', '\n'.join(cm.output))

with self.assertLogs('for_runners.gpx_tools.kml', 'WARNING') as cm:
self.assertIsNone(parse_datetime('2024/07/21 14:30:24 UTC+1:00'))
self.assertIn('Failed to parse timezone offset', '\n'.join(cm.output))

def test_kml2gpx(self):
fixture_path = get_fixture_path('PentaxK1.KML')

with self.assertLogs('for_runners.gpx_tools.kml', 'WARNING'):
gpx = kml2gpx(fixture_path)
self.assertIsInstance(gpx, GPX)

first_point = gpx.tracks[0].segments[0].points[0]
self.assertEqual(first_point.latitude, 51.052540)
self.assertEqual(first_point.longitude, 2.444563)
self.assertEqual(first_point.elevation, 8.0)
self.assertEqual(first_point.time, parse_dt('2024-07-21T14:30:24+01:00'))

last_point = gpx.tracks[0].segments[0].points[-1]
self.assertEqual(last_point.latitude, 50.944859)
self.assertEqual(last_point.longitude, 1.847900)
self.assertEqual(last_point.elevation, 14.0)
self.assertEqual(last_point.time, parse_dt('2024-07-21T21:28:31+01:00'))
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


class Command(BaseCommand):
help = "Import GPS files (*.gpx)"
help = "Import GPS track files (*.gpx|*.kml) for a user"

def add_arguments(self, parser):
parser.add_argument(
Expand All @@ -27,7 +27,7 @@ def add_arguments(self, parser):
required=True,
help="The user to assign to the imported files",
)
parser.add_argument("path", help="Path to *.gpx files")
parser.add_argument("path", help="Path to *.gpx/*.kml files")

def handle(self, *args, **options):
username = options.get("username")
Expand All @@ -44,15 +44,20 @@ def handle(self, *args, **options):
path = Path(options.get("path"))
path = path.expanduser()
path = path.resolve()

if path.is_file():
self.stderr.write(f"ERROR: Given path '{path}' is a file, but must be a directory that conains the files!")
sys.exit(4)

if not path.is_dir():
self.stderr.write(f"ERROR: Given path '{path}' is not a existing directory!")
sys.exit(4)
sys.exit(5)

self.stdout.write(f"Read directory: {path}")
self.stdout.write("\n")

new_tracks = 0
for no, instance in enumerate(add_from_files(gpx_files_file_path=path, user=user), 1):
for no, instance in enumerate(add_from_files(tracks_path=path, user=user), 1):
self.stdout.write(self.style.SUCCESS("%i - Add new track: %s" % (no, instance)))
new_tracks += 1

Expand Down
55 changes: 34 additions & 21 deletions for_runners/services/gpx_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,38 @@
"""

import logging
import os

from django.db import IntegrityError, transaction

# https://github.com/jedie/django-tools
from django_tools.unittest_utils.assertments import assert_is_dir, assert_is_file
from gpxpy.gpx import GPXException
from gpxpy.gpx import GPX

# https://github.com/jedie/django-for-runners
from for_runners.exceptions import GpxDataError
from for_runners.gpx import get_identifier, parse_gpx
from for_runners.gpx_tools.kml import kml2gpx
from for_runners.models import GpxModel


log = logging.getLogger(__name__)


def add_gpx(*, gpx_content, user):
def add_gpx(*, gpx: GPX, user) -> GpxModel | None:
"""
Create a new for_runners.models.GpxModel entry
:param gpx_content: String content of the new gpx file
:return: GpxModel instance
"""
try:
gpxpy_instance = parse_gpx(gpx_content)
except GPXException as err:
log.exception(f"Invalid GPX Data: {err}")
return

identifier = get_identifier(gpxpy_instance)
identifier = get_identifier(gpx)

try:
instance = GpxModel.objects.get_by_identifier(identifier)
except GpxModel.DoesNotExist:
log.debug("Create new track for user: %s", user)
gpx_content = gpx.to_xml()
instance = GpxModel.objects.create(gpx=gpx_content, tracked_by=user)
return instance
else:
Expand All @@ -50,32 +47,48 @@ def add_gpx(*, gpx_content, user):
return


def add_from_file(*, gpx_file_file_path, user):
def add_from_file(*, track_path, user):
"""
Read content from gpx file <gpx_files_file_path> and add to <user>
"""
assert_is_file(gpx_file_file_path)
assert_is_file(track_path)

file_suffix = track_path.suffix.lower()
log.info(f'Add track file: {track_path} ({file_suffix=})')
if file_suffix == '.kml':
gpx: GPX = kml2gpx(track_path)
elif file_suffix == '.gpx':
gpx_content = track_path.read_text()
gpx: GPX = parse_gpx(gpx_content)
else:
raise GpxDataError(f"Unknown file extension: {track_path}")

return add_gpx(gpx=gpx, user=user)

log.info(f'Add GPX file: {gpx_file_file_path}')
with gpx_file_file_path.open("r") as f:
gpx_content = f.read()

return add_gpx(gpx_content=gpx_content, user=user)
def multi_glob(path, *, extensions: tuple):
with os.scandir(path) as it:
for entry in it:
if entry.is_file():
file_path = path / entry.name
if file_path.suffix.lower() in extensions:
yield file_path


def add_from_files(*, gpx_files_file_path, user, skip_errors=True):
def add_from_files(*, tracks_path, user, skip_errors=True):
"""
Add all *.gpx files from <gpx_files_file_path> to <user>
"""
assert_is_dir(gpx_files_file_path)
assert_is_dir(tracks_path)

tracks = multi_glob(tracks_path, extensions=('.gpx', '.kml'))

gpx_files = gpx_files_file_path.glob("**/*.gpx")
for gpx_file_file_path in sorted(gpx_files):
for track_path in sorted(tracks):
try:
with transaction.atomic():
instance = add_from_file(gpx_file_file_path=gpx_file_file_path, user=user)
instance = add_from_file(track_path=track_path, user=user)
except (IntegrityError, GpxDataError) as err:
log.error("Skip .gpx file: %s", err)
log.exception("Skip track file: %s", err)
if not skip_errors:
raise
else:
Expand Down
Loading

0 comments on commit d2ed53a

Please sign in to comment.