Skip to content

Commit e8dbc5e

Browse files
committed
Add KmlPlaceNameTag and refactor ExifTag a bit
1 parent a4d4d25 commit e8dbc5e

3 files changed

Lines changed: 208 additions & 40 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ isodate = "^0.6.1"
3636
gpxpy = "1.5.0"
3737
Pint = "^0.21.0"
3838
geopy = "^2.4.1"
39+
fastkml = "^1.1.0"
3940

4041
[tool.poetry.group.dev.dependencies]
4142
pytest = "^8.3.5"

tempren/tags/geo.py

Lines changed: 139 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,144 @@
1-
from tempren.primitives import Tag
1+
import itertools
2+
import logging
3+
from dataclasses import dataclass
4+
from pathlib import Path
5+
from typing import List, Optional
26

7+
import fastkml.kml
8+
from geopy import Point
9+
from geopy.distance import great_circle
310

4-
class PlaceNameTag(Tag):
5-
"""Convert"""
11+
from tempren.primitives import File, Tag
612

7-
require_context = False
13+
14+
@dataclass
15+
class Place:
16+
name: str
17+
latitude: float
18+
longitude: float
19+
radius: Optional[float]
20+
21+
@property
22+
def position(self) -> Point:
23+
return Point(self.latitude, self.longitude)
24+
25+
def __str__(self):
26+
return f"{self.name}, {self.latitude}, {self.longitude}, {self.radius}"
27+
28+
29+
class KmlPlaceNameTag(Tag):
30+
"""Convert GPS latitude/longitude coordinates into names based on the file"""
31+
32+
require_context = True
33+
34+
log: logging.Logger
35+
36+
# TODO: Introduce global cache for storing data between different files processing
37+
places: List[Place]
38+
default: str
39+
40+
def __init__(self):
41+
self.log = logging.getLogger(self.__class__.__name__)
42+
43+
def configure(
44+
self,
45+
kml: str,
46+
use_look_at: bool = True,
47+
use_folders: bool = True,
48+
default: str = "",
49+
) -> None:
50+
"""
51+
:param kml: Path to the KML file containing the places
52+
:param use_look_at: Use LookAt attributes (instead of Point) of the placemark when looking for best matches
53+
:param use_folders: Add folder itself (using its LookAt) as POI
54+
:param default: Default value used when there is no matching POI
55+
Given coordinates, select best matching (closest) placemark and return its name.
56+
When placemarks are grouped into folders, returned name will be prefixed with it, i.e.
57+
FolderName/PlacemarkName
58+
"""
59+
60+
if not Path(kml).exists():
61+
raise FileNotFoundError(kml)
62+
63+
self.places = self._load_file(kml, use_look_at, use_folders)
64+
self.default = default
65+
66+
def _load_file(
67+
self, kml_path: str, use_look_at: bool, use_folders: bool
68+
) -> List[Place]:
69+
self.log.debug("Loading %s", kml_path)
70+
kml_file = fastkml.kml.KML.parse(kml_path)
71+
72+
def _collect_places(kml, prefix: List[str]) -> List[Place]:
73+
if isinstance(kml, fastkml.containers.Folder):
74+
if use_folders and kml.view is not None:
75+
folder_place = Place(
76+
"/".join(prefix + [kml.name]),
77+
kml.view.latitude,
78+
kml.view.longitude,
79+
radius=kml.view.range,
80+
)
81+
prefix = prefix + [kml.name]
82+
return [folder_place] + list(
83+
itertools.chain.from_iterable(
84+
[_collect_places(f, prefix) for f in kml.features]
85+
)
86+
)
87+
prefix = prefix + [kml.name]
88+
elif isinstance(kml, fastkml.containers.Placemark):
89+
if use_look_at and kml.view is not None:
90+
return [
91+
Place(
92+
"/".join(prefix + [kml.name]),
93+
kml.view.latitude,
94+
kml.view.longitude,
95+
radius=kml.view.range,
96+
)
97+
]
98+
else:
99+
latitude = kml.kml_geometry.kml_coordinates.coords[0][0]
100+
longitude = kml.kml_geometry.kml_coordinates.coords[0][1]
101+
radius = kml.view.range if kml.view is not None else None
102+
return [
103+
Place(
104+
"/".join(prefix + [kml.name]),
105+
latitude,
106+
longitude,
107+
radius=radius,
108+
)
109+
]
110+
111+
if hasattr(kml, "features"):
112+
return list(
113+
itertools.chain.from_iterable(
114+
[_collect_places(f, prefix) for f in kml.features]
115+
)
116+
)
117+
return []
118+
119+
places = _collect_places(kml_file, [])
120+
121+
self.log.debug("Found %d places:", len(places))
122+
for place in places:
123+
self.log.debug("%s", place)
124+
125+
return places
8126

9127
def process(self, file: File, context: Optional[str]) -> str:
10-
assert context is None
11-
return _calculate_hash(hashlib.md5(), file.absolute_path, CHUNK_SIZE)
128+
assert context is not None
129+
coordinates = Point.from_string(context)
130+
place = self._find_best_match(self.places, coordinates)
131+
if place is None:
132+
return self.default
133+
return place.name
134+
135+
@staticmethod
136+
def _find_best_match(places: List[Place], point: Point) -> Optional[Place]:
137+
gc = great_circle()
138+
min_distance_place: Optional[Place] = None
139+
for place in places:
140+
if min_distance_place is None or gc.measure(
141+
place.position, point
142+
) < gc.measure(min_distance_place.position, point):
143+
min_distance_place = place
144+
return min_distance_place

tempren/tags/image.py

Lines changed: 68 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import piexif
77
import PIL
8+
from geopy import Point
89
from piexif import TAGS
910
from piexif import TYPES as TAG_TYPES
1011
from PIL.Image import Image
@@ -122,48 +123,62 @@ def extract_metadata(self, image: Image) -> bool:
122123
raise NotImplementedError()
123124

124125

126+
def extract_exif_value(exif_dict, tag_name: str):
127+
tag_id, tag_type = tag_name_to_id_type(tag_name)
128+
for src in exif_dict.values():
129+
if isinstance(src, dict) and tag_id in src:
130+
tag_value = src[tag_id]
131+
return convert_tag_value(tag_type, tag_value)
132+
else:
133+
raise MissingMetadataError()
134+
135+
136+
def tag_name_to_id_type(tag_name: str) -> Tuple[int, int]:
137+
for _, tags in TAGS.items():
138+
for tag_id, description in tags.items():
139+
if description["name"] == tag_name:
140+
return tag_id, description["type"]
141+
raise ValueError(f"Could not find tag id for '{tag_name}'")
142+
143+
144+
def convert_tag_value(tag_type, tag_value):
145+
if tag_type in (TAG_TYPES.Rational, TAG_TYPES.SRational):
146+
if isinstance(tag_value[0], tuple):
147+
if len(tag_value) == 3:
148+
# We are probably dealing with GPS coordinates
149+
(
150+
degrees,
151+
minutes,
152+
seconds,
153+
) = (convert_tag_value(tag_type, v) for v in tag_value)
154+
return f"{degrees}°{minutes}{seconds}″"
155+
return " ".join(str(convert_tag_value(tag_type, v)) for v in tag_value)
156+
else:
157+
if tag_value[1] == 1:
158+
return tag_value[0]
159+
else:
160+
return tag_value[0] / tag_value[1]
161+
if tag_type == TAG_TYPES.Ascii:
162+
return tag_value.decode("ascii")
163+
return tag_value
164+
165+
125166
class ExifTag(Tag):
126167
"""Extract value of any EXIF tag"""
127168

128-
tag_id: int
129-
tag_type: int
169+
tag_name: str
130170

131171
def configure(self, tag_name: str): # type: ignore
132172
"""
133173
:param tag_name: name of the tag to extract (e.g. 'FocalLength', 'DateTime', 'FNumber')
134174
"""
135175
# TODO: generate list of supported tags dynamically
136176
assert tag_name, "expected non empty tag name"
137-
self.tag_id, self.tag_type = self._tag_name_to_id_type(tag_name)
138-
139-
@staticmethod
140-
def _tag_name_to_id_type(tag_name: str) -> Tuple[int, int]:
141-
for _, tags in TAGS.items():
142-
for tag_id, description in tags.items():
143-
if description["name"] == tag_name:
144-
return tag_id, description["type"]
145-
raise ValueError(f"Could not find tag id for '{tag_name}'")
177+
self.tag_name = tag_name
146178

147179
def process(self, file: File, context: Optional[str]) -> Any:
148180
exif_dict = piexif.load(str(file.absolute_path))
149-
for src in exif_dict.values():
150-
if isinstance(src, dict) and self.tag_id in src:
151-
return self._extract_value(self.tag_type, src[self.tag_id])
152-
else:
153-
raise MissingMetadataError()
154-
155-
def _extract_value(self, tag_type, tag_value):
156-
if tag_type in (TAG_TYPES.Rational, TAG_TYPES.SRational):
157-
if isinstance(tag_value[0], tuple):
158-
return " ".join(str(self._extract_value(v)) for v in tag_value)
159-
else:
160-
if tag_value[1] == 1:
161-
return tag_value[0]
162-
else:
163-
return round(tag_value[0] / tag_value[1], 1)
164-
if tag_type == TAG_TYPES.Ascii:
165-
return tag_value.decode("ascii")
166-
return tag_value
181+
return extract_exif_value(exif_dict, self.tag_name)
167182

168183

169184
_tag_type_map = {
@@ -195,13 +210,32 @@ class ResolutionTagAlias(TagAlias):
195210
"""%Image.Width()x%Image.Height()"""
196211

197212

198-
class GpsPositionTag(ExifTag):
213+
class GpsPositionTag(Tag):
199214
"""Latitude and longitude of place where photo was taken"""
200215

201-
def configure(self):
202-
self.latitude_tag_id, self.latitude_tag_type = self._tag_name_to_id_type(
203-
"GpsLatitude"
204-
)
216+
require_context = False
217+
218+
use_decimal: bool
219+
220+
def configure(self, decimal: bool = True):
221+
"""
222+
:param decimal: Use simpler decimal notation instead of degrees-minutes-seconds
223+
"""
224+
self.use_decimal = decimal
225+
226+
def process(self, file: File, context: Optional[str]) -> Any:
227+
assert context is None
228+
229+
exif_dict = piexif.load(str(file.absolute_path))
230+
latitude = extract_exif_value(exif_dict, "GPSLatitude")
231+
latitude_ref = extract_exif_value(exif_dict, "GPSLatitudeRef")
232+
longitude = extract_exif_value(exif_dict, "GPSLongitude")
233+
longitude_ref = extract_exif_value(exif_dict, "GPSLongitudeRef")
234+
235+
degrees_notation = f"{latitude}{latitude_ref}, {longitude}{longitude_ref}"
236+
if self.use_decimal:
237+
return Point.from_string(degrees_notation).format_decimal()
238+
return degrees_notation
205239

206240

207241
class HasGpsPositionTagAlias(TagAlias):

0 commit comments

Comments
 (0)