Team LESL: a single Dolfin pivot for city-wide traffic observations, fanning out to Smart Data Models JSON-LD and DATEX II v3 XML, with a GeoJSON view on the side. Same input, three audiences, zero drift between formats.
A CSV of TomTom city-aggregated traffic indicators for Porto. Each row is a snapshot of the whole-city state at a given timestamp. No road segment, no per-link decomposition: this is a KPI feed, not a road network model.
UpdateTimeUTC is exported as MM:SS.s
only, with no date part. We pass it through as an opaque
source-side timestamp string. A future iteration should pair
the dataset with the capture date, or read full ISO datetimes
directly from the TomTom API.
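One way to pair the fragment with a capture time can be sketched as follows. This is a minimal sketch, not part of the shipped package: it assumes the capture hour is known from outside the file and is in UTC. The function name `pair_with_capture_hour` and the date used below are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def pair_with_capture_hour(fragment: str, capture_hour: datetime) -> str:
    """Combine an 'MM:SS.s' fragment with a known capture hour (assumed UTC)."""
    minutes_part, seconds_part = fragment.strip().split(":")
    offset = timedelta(minutes=int(minutes_part), seconds=float(seconds_part))
    # Anchor on the top of the hour, then add the minutes and seconds from the CSV.
    base = capture_hour.replace(minute=0, second=0, microsecond=0)
    return (base + offset).isoformat()


# Hypothetical capture hour: the dataset itself does not carry this information.
capture_hour = datetime(2026, 1, 15, 8, tzinfo=timezone.utc)
print(pair_with_capture_hour("01:30.0", capture_hour))
```

The canonical model would not need to change for this: observedAt stays a string, it simply carries a full ISO 8601 value instead of an opaque fragment.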
SDM's TrafficFlowObserved targets per-lane or
per-segment observations (laneId,
refRoadSegment). Our records are city-wide
summaries. The canonical model accepts both shapes by
attaching the observation to a typed City rather
than a road segment.
Eight numeric KPIs per row: JamsDelay,
TrafficIndexLive, JamsLengthInKms,
JamsCount, TrafficIndexWeekAgo,
TravelTimeLivePer10KmsMins,
TravelTimeHistoricPer10KmsMins,
MinsDelay. The canonical model preserves all eight.
Public mobility platforms in Europe expect DATEX II; smart city stacks (FIWARE, OASC) expect Smart Data Models JSON-LD. UC4 is exactly about not picking one and losing the other.
- TrafficFlowObserved exists, with dateObserved, intensity, occupancy, averageVehicleSpeed, congested...
- refRoadSegment / laneId as the typical anchor
- dateObserved ✓, congested can be derived from trafficIndex, but jamsLengthKm / trafficIndex / jamsCount are not SDM attributes
- payloadPublication[xsi:type=MeasuredDataPublication] is the right shape for periodic measured values
- siteMeasurements bundles one site's readings; basicData[xsi:type=TravelTimeValue / TrafficConcentration / DelayValue / NumberOfIncidents] carries the typed measurements
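The derivation of congested from trafficIndex could look like the sketch below. The threshold of 50.0 is an assumption made purely for illustration (the source does not define one), and `derive_congested` and `sdm_view` are hypothetical names that do not exist in the package.

```python
from typing import Optional


def derive_congested(traffic_index: Optional[float], threshold: float = 50.0) -> Optional[bool]:
    """Map a numeric traffic index to SDM's boolean `congested`.

    The default threshold is an illustrative assumption, not a TomTom or SDM value.
    """
    if traffic_index is None:
        return None
    return traffic_index >= threshold


def sdm_view(record: dict, threshold: float = 50.0) -> dict:
    """Project a canonical record onto the SDM attributes that have an equivalent."""
    view = {"type": "TrafficFlowObserved", "dateObserved": record["observedAt"]}
    congested = derive_congested(record.get("trafficIndex"), threshold)
    if congested is not None:
        view["congested"] = congested
    return view


print(sdm_view({"observedAt": "01:30.0", "trafficIndex": 0.0}))
```

KPIs without an SDM equivalent (jamsLengthKm, trafficIndex, jamsCount) are deliberately left out of this projection; they would travel under a custom namespace instead.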
Instead of mapping TomTom directly to either format (and then
re-mapping when the other consumer asks), we map once to a
small canonical TrafficObservation in Dolfin, and
derive both serializations from it. SDM JSON-LD and
DATEX II XML stay in sync by construction. The path
to a third consumer (GTFS-RT? OCIT? a proprietary integrator?)
is one new writer file.
package <http://mimathon.askem.eu/uc4/traffic>:
  dolfin_version "1"
  version "0.1.0"
  author "LESL (Lea, Eliott, Sattisvar & Louis), Dolfin & Askem"
  description "Canonical model for city-wide traffic observations, MIMathon Porto 2026 use case 04. Aggregated KPIs designed for dual-format output: Smart Data Models JSON-LD and DATEX II XML, from one authoritative pivot."

concept City:
  has name: one string
  has countryCode: one string
  has latitude: optional float
  has longitude: optional float

concept TrafficObservation:
  has localId: one string
  has city: one City
  has observedAt: one string
  has trafficIndex: optional float
  has trafficIndexWeekAgo: optional float
  has observedAtWeekAgo: optional string
  has jamsDelaySeconds: optional float
  has jamsLengthKm: optional float
  has jamsCount: optional int
  has travelTimePer10kmMin: optional float
  has historicTravelTimePer10kmMin: optional float
  has delayMin: optional float
  has source: optional string
- TrafficObservation hangs off a typed City rather than a road segment, matching city-wide aggregates without breaking per-segment cases (a future variant can add refRoadSegment)
- observedAt as a string (no native date in Dolfin), observedAtWeekAgo for the comparison baseline that SDM doesn't natively model
- source at the record level (here: "TomTom") to preserve provenance through harmonization

| Source attribute | Source example | Canonical attribute | Transformation |
|---|---|---|---|
| Country | PRT | City.countryCode | uppercase ISO 3166-1 alpha-3 |
| City | porto | City.name | title-case, with lat/lon attached from known-city table |
| UpdateTimeUTC | 01:30.0 | TrafficObservation.observedAt | preserved as opaque source-side timestamp |
| UpdateTimeUTCWeekAgo | 01:30.0 | TrafficObservation.observedAtWeekAgo | preserved |
| TrafficIndexLive | 0 | TrafficObservation.trafficIndex | float |
| TrafficIndexWeekAgo | 0 | TrafficObservation.trafficIndexWeekAgo | float |
| JamsDelay | 3.3 | TrafficObservation.jamsDelaySeconds | float, seconds |
| JamsLengthInKms | 0.2 | TrafficObservation.jamsLengthKm | float, kilometres |
| JamsCount | 1 | TrafficObservation.jamsCount | int |
| TravelTimeLivePer10KmsMins | 11.247 | TrafficObservation.travelTimePer10kmMin | float, minutes per 10 km |
| TravelTimeHistoricPer10KmsMins | 11.350 | TrafficObservation.historicTravelTimePer10kmMin | float, minutes per 10 km |
| MinsDelay | -0.103 | TrafficObservation.delayMin | float, minutes (negative possible) |
| (constant) | TomTom | TrafficObservation.source | provenance tag |
Same architecture as UC1/UC2 (canonical model + adapter), plus a second output writer for DATEX II. Adding a third format (GTFS-RT, OCIT, NeTEx, your own) is one new file.
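To make the "one new file" claim concrete, here is a minimal sketch of what a third writer might look like. The flat CSV target and the name `build_csv` are hypothetical, and the two dataclasses are trimmed stand-ins for the canonical model so that the snippet runs on its own.

```python
import csv
import io
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class City:  # trimmed stand-in for the canonical City
    name: str
    countryCode: str


@dataclass
class TrafficObservation:  # trimmed stand-in for the canonical record
    localId: str
    city: City
    observedAt: str
    trafficIndex: Optional[float] = None
    jamsCount: Optional[int] = None


def build_csv(observations: Iterable[TrafficObservation]) -> str:
    """Hypothetical third writer: one flat CSV row per canonical record."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["localId", "city", "countryCode", "observedAt", "trafficIndex", "jamsCount"])
    for obs in observations:
        # The csv module writes None as an empty field, so optional KPIs need no special casing.
        writer.writerow([obs.localId, obs.city.name, obs.city.countryCode,
                         obs.observedAt, obs.trafficIndex, obs.jamsCount])
    return buf.getvalue()


print(build_csv([TrafficObservation("PRT-Porto-00001", City("Porto", "PRT"), "01:30.0", 0.0, 1)]))
```

The writer only reads the canonical record and never touches the TomTom column names, which is what keeps the cost of a new output format at one file.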
python -m harmonize_traffic \
  --adapter tomtom \
  --input ../uc4-traffic-tomtom.csv \
  --output ../out/traffic.jsonld \
  --datex2 ../out/traffic.datex2.xml \
  --geojson ../out/traffic.geojson \
  --base-id "http://mimathon.askem.eu/uc4/traffic/"
Same first record (TomTom CSV row), shown as raw source, as
canonical JSON-LD, and as the DATEX II siteMeasurements
fragment derived from the same canonical record.
{
  "Country": "PRT",
  "City": "porto",
  "UpdateTimeUTC": "01:30.0",
  "JamsDelay": "3.3",
  "TrafficIndexLive": "0",
  "JamsLengthInKms": "0.2",
  "JamsCount": "1",
  "TrafficIndexWeekAgo": "0",
  "UpdateTimeUTCWeekAgo": "01:30.0",
  "TravelTimeLivePer10KmsMins": "11.24699163758936",
  "TravelTimeHistoricPer10KmsMins": "11.350380364362442",
  "MinsDelay": "-0.103388727"
}
{
  "localId": "PRT-Porto-00001",
  "city": {
    "@type": "City",
    "name": "Porto",
    "countryCode": "PRT",
    "latitude": 41.1496,
    "longitude": -8.6109
  },
  "observedAt": "01:30.0",
  "trafficIndex": 0.0,
  "trafficIndexWeekAgo": 0.0,
  "observedAtWeekAgo": "01:30.0",
  "jamsDelaySeconds": 3.3,
  "jamsLengthKm": 0.2,
  "jamsCount": 1,
  "travelTimePer10kmMin": 11.24699163758936,
  "historicTravelTimePer10kmMin": 11.350380364362442,
  "delayMin": -0.103388727,
  "source": "TomTom",
  "@id": "http://mimathon.askem.eu/uc4/traffic/PRT-Porto-00001",
  "@type": "TrafficObservation"
}
<siteMeasurements>
  <measurementSiteReference id="PRT-Porto-aggregate" version="1.0"/>
  <measurementTimeDefault>01:30.0</measurementTimeDefault>
  <measuredValue index="1">
    <basicData xsi:type="TravelTimeValue">
      <travelTime>
        <duration>PT674S</duration>
        <perDistance>10</perDistance>
        <distanceUnit>KILOMETRES</distanceUnit>
      </travelTime>
    </basicData>
  </measuredValue>
  <measuredValue index="2">
    <basicData xsi:type="TrafficConcentration">
      <concentrationOfTrafficLengthInKilometres>0.2</concentrationOfTrafficLengthInKilometres>
    </basicData>
  </measuredValue>
  <measuredValue index="3">
    <basicData xsi:type="NumberOfIncidents">
      <numberOfQueues>1</numberOfQueues>
    </basicData>
  </measuredValue>
  <measuredValue index="4">
    <basicData xsi:type="DelayValue">
      <delay>-PT6S</delay>
    </basicData>
  </measuredValue>
  <trafficIndexLive>0.0</trafficIndexLive>
  <trafficIndexWeekAgo>0.0</trafficIndexWeekAgo>
</siteMeasurements>
One canonical record feeds three writers. SDM JSON-LD and DATEX II XML are derived from the same Dolfin instance, so semantic drift between the two is impossible by construction.
DATEX II v3 is a massive spec. What we ship here is a structural projection: right namespaces, right top-level shapes, correct ISO durations and units. It is not claimed to be a fully schema-validated DATEX II document. Getting to full validation is a tractable next step from this starting point.
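Negative delays are the one place where ISO durations need care: in XML Schema's xs:duration the minus sign goes before the "P" (for example "-PT6S"), not inside the seconds field. The helper below is a minimal sketch of that rule. The name `minutes_to_xs_duration` is hypothetical, and rounding to whole seconds is a choice made for this sketch.

```python
def minutes_to_xs_duration(minutes: float) -> str:
    """Format a signed number of minutes as an xs:duration in whole seconds."""
    total_seconds = int(round(abs(minutes) * 60))
    # The sign precedes "P"; a value that rounds to zero carries no sign.
    sign = "-" if minutes < 0 and total_seconds else ""
    return f"{sign}PT{total_seconds}S"


print(minutes_to_xs_duration(-0.103388727))  # delayMin from the first record
print(minutes_to_xs_duration(11.24699163758936))  # travelTimePer10kmMin from the first record
```

Because this sketch rounds rather than truncates, the second call yields 675 seconds where a truncating conversion would yield 674.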
- City with lat/lon attached
- […] (jamsLengthKm, travelTimePer10kmMin, delayMin)
- source: "TomTom"
Full source, hosted alongside this page. Each file is also a
one-click download as raw .py. The whole package is
bundled as a tarball at the top of the Data section.
"""Canonical TrafficObservation model, mirrors traffic.dolfin."""
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class City:
    name: str
    countryCode: str
    latitude: Optional[float] = None
    longitude: Optional[float] = None


@dataclass
class TrafficObservation:
    localId: str
    city: City
    observedAt: str
    trafficIndex: Optional[float] = None
    trafficIndexWeekAgo: Optional[float] = None
    observedAtWeekAgo: Optional[str] = None
    jamsDelaySeconds: Optional[float] = None
    jamsLengthKm: Optional[float] = None
    jamsCount: Optional[int] = None
    travelTimePer10kmMin: Optional[float] = None
    historicTravelTimePer10kmMin: Optional[float] = None
    delayMin: Optional[float] = None
    source: Optional[str] = None
"""Reusable text transforms shared across adapters.
Adapters compose these helpers rather than reimplementing them. Helpers
are intentionally minimal: they only do generic text work (cleanup,
regex extraction, keyword routing). Anything dataset-specific belongs
in the adapter itself.
"""
from __future__ import annotations

import re
from typing import Optional


def clean_text(value: Optional[str]) -> Optional[str]:
    """Trim, collapse internal whitespace, return None for empty input."""
    if value is None:
        return None
    txt = re.sub(r"\s+", " ", str(value)).strip()
    return txt or None


def extract_count(value: Optional[str], pattern: str = r"\((\d+)") -> Optional[int]:
    """Pull an integer out of free text, e.g. '... (12 exemplares)' -> 12."""
    if value is None:
        return None
    m = re.search(pattern, value)
    return int(m.group(1)) if m else None


def match_keywords(value: Optional[str], keyword_map: dict[str, str]) -> Optional[str]:
    """Return the first enum value whose regex key matches the input.

    keyword_map: {regex_pattern: enum_value}, e.g.
        {r"conjunto\\s+arb[óo]re[op]": "TreeCluster",
         r"isolad": "IsolatedSpecimen"}
    Patterns are evaluated in insertion order, case-insensitive.
    """
    if not value:
        return None
    for pattern, enum_value in keyword_map.items():
        if re.search(pattern, value, re.IGNORECASE):
            return enum_value
    return None


class Registry:
    """Tiny dedupe registry for value-typed entities like Authority.

    Use when source data has many spelling variants of the same entity:
        reg = Registry({"ICNF": Authority(name="...", acronym="ICNF")})
        a = reg.resolve("ICNF (Instituto da Conservação ...)", needle="ICNF")
    The canonical instance is returned, ensuring downstream graphs share
    one node per real-world entity.
    """

    def __init__(self, known: dict | None = None):
        self._known = dict(known or {})

    def resolve(self, raw, needle: str | None = None, default=None):
        if raw is None:
            return default
        text = str(raw)
        # Prefer the explicit needle when it is both present in the text and known.
        if needle is not None and needle in text and needle in self._known:
            return self._known[needle]
        # Otherwise fall back to the first known key found in the text.
        for key, val in self._known.items():
            if key in text:
                return val
        return default

    def get(self, key: str):
        return self._known.get(key)
"""JSON-LD writer for the canonical TrafficObservation model.
Aligns where possible to Smart Data Models conventions. SDM's
TrafficFlowObserved targets per-lane/per-segment observations, while
our records are city-aggregated KPIs. We adopt SDM attribute names
where they apply (`dateObserved`, `congested` derived from `trafficIndex`,
`refLocation` for the city) and extend with a custom KPI namespace
for the ones SDM does not cover (`jamsLengthKm`, `trafficIndex`,
`travelTimePer10kmMin`, ...).
"""
from __future__ import annotations

from dataclasses import asdict
from typing import Iterable

from .model import TrafficObservation

NS = "http://mimathon.askem.eu/uc4/traffic#"

CONTEXT = {
    "@vocab": NS,
    "sdm": "https://smartdatamodels.org/dataModel.Transportation/",
    "schema": "https://schema.org/",
    "TrafficObservation": NS + "TrafficObservation",
    "City": NS + "City",
    "city": "schema:location",
    "observedAt": "sdm:dateObserved",
    "observedAtWeekAgo": NS + "observedAtWeekAgo",
    "trafficIndex": NS + "trafficIndex",
    "trafficIndexWeekAgo": NS + "trafficIndexWeekAgo",
    "jamsDelaySeconds": NS + "jamsDelaySeconds",
    "jamsLengthKm": NS + "jamsLengthKm",
    "jamsCount": NS + "jamsCount",
    "travelTimePer10kmMin": NS + "travelTimePer10kmMin",
    "historicTravelTimePer10kmMin": NS + "historicTravelTimePer10kmMin",
    "delayMin": NS + "delayMin",
    "source": "schema:provider",
    # The W3C Basic Geo namespace IRI uses http, and IRIs are compared literally.
    "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#",
    "latitude": "geo:lat",
    "longitude": "geo:long",
    "name": "schema:name",
    "countryCode": "schema:addressCountry",
}


def _strip_none(d):
    """Recursively drop keys whose value is None."""
    if isinstance(d, dict):
        return {k: _strip_none(v) for k, v in d.items() if v is not None}
    if isinstance(d, list):
        return [_strip_none(x) for x in d]
    return d


def obs_to_node(obs: TrafficObservation, base_id: str) -> dict:
    d = asdict(obs)
    d["@id"] = f"{base_id}{obs.localId}"
    d["@type"] = "TrafficObservation"
    d["city"] = {"@type": "City", **asdict(obs.city)}
    return _strip_none(d)


def build_document(observations: Iterable[TrafficObservation], base_id: str) -> dict:
    return {
        "@context": CONTEXT,
        "@graph": [obs_to_node(o, base_id) for o in observations],
    }
"""DATEX II v3 XML writer for the canonical TrafficObservation model.
Produces a payloadPublication of MeasuredDataPublication shape, with
one siteMeasurements element per canonical record. KPIs are mapped
to DATEX II basicData where a clean equivalent exists, and to
extensible auxiliary elements otherwise.
This output is a *structural projection*: element names and the
overall payload skeleton follow the DATEX II spec, but the
document is not claimed to be fully schema-validated against the
DATEX II XSDs. The intent is to make round-tripping with a real
DATEX II consumer obvious and to demonstrate that DATEX II and
Smart Data Models JSON-LD can be derived from one canonical pivot.
"""
from __future__ import annotations

from typing import Iterable
from xml.etree.ElementTree import Element, SubElement, tostring, register_namespace
from xml.dom import minidom

from .model import TrafficObservation

DATEX2_NS = "http://datex2.eu/schema/3/3.0"
XSI_NS = "http://www.w3.org/2001/XMLSchema-instance"
register_namespace("", DATEX2_NS)
register_namespace("xsi", XSI_NS)
XSI_TYPE = f"{{{XSI_NS}}}type"


def _e(parent, tag, text=None, attribs=None, **kwattrs):
    """Create a SubElement in the DATEX II namespace, with optional text and attribs."""
    attrs = dict(attribs or {})
    attrs.update(kwattrs)
    el = SubElement(parent, f"{{{DATEX2_NS}}}{tag}", attrs)
    if text is not None:
        el.text = str(text)
    return el


def _site_measurements(parent, obs: TrafficObservation, index: int) -> None:
    site = _e(parent, "siteMeasurements")
    _e(site, "measurementSiteReference", id=f"{obs.city.countryCode}-{obs.city.name}-aggregate", version="1.0")
    _e(site, "measurementTimeDefault", text=obs.observedAt)
    mv_idx = 1
    if obs.travelTimePer10kmMin is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "TravelTimeValue"})
        tt = _e(bd, "travelTime")
        # Minutes to whole seconds (truncated).
        _e(tt, "duration", text=f"PT{int(obs.travelTimePer10kmMin*60)}S")
        _e(tt, "perDistance", text="10")
        _e(tt, "distanceUnit", text="KILOMETRES")
        mv_idx += 1
    if obs.jamsLengthKm is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "TrafficConcentration"})
        _e(bd, "concentrationOfTrafficLengthInKilometres", text=str(obs.jamsLengthKm))
        mv_idx += 1
    if obs.jamsCount is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "NumberOfIncidents"})
        _e(bd, "numberOfQueues", text=str(obs.jamsCount))
        mv_idx += 1
    if obs.delayMin is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "DelayValue"})
        # xs:duration puts the minus sign before "P" ("-PT6S"), never inside ("PT-6S").
        delay_s = int(abs(obs.delayMin) * 60)
        sign = "-" if obs.delayMin < 0 and delay_s else ""
        _e(bd, "delay", text=f"{sign}PT{delay_s}S")
        mv_idx += 1
    # Provider-specific extensions outside the strict DATEX II schema
    if obs.trafficIndex is not None:
        _e(site, "trafficIndexLive", text=str(obs.trafficIndex))
    if obs.trafficIndexWeekAgo is not None:
        _e(site, "trafficIndexWeekAgo", text=str(obs.trafficIndexWeekAgo))


def build_document(observations: Iterable[TrafficObservation], publication_time: str) -> str:
    root = Element(
        f"{{{DATEX2_NS}}}d2LogicalModel",
        {"modelBaseVersion": "3"},
    )
    payload = _e(root, "payloadPublication", attribs={XSI_TYPE: "MeasuredDataPublication", "lang": "en"})
    _e(payload, "publicationTime", text=publication_time)
    pub_creator = _e(payload, "publicationCreator")
    _e(pub_creator, "country", text="pt")
    _e(pub_creator, "nationalIdentifier", text="askem-mimathon-uc4")
    obs_list = list(observations)
    if obs_list:
        first = obs_list[0]
        _e(
            payload,
            "measurementSiteTablePublicationReference",
            id=f"{first.city.countryCode}-{first.city.name}-table",
            version="1.0",
        )
    for i, obs in enumerate(obs_list, start=1):
        _site_measurements(payload, obs, i)
    xml_bytes = tostring(root, encoding="utf-8", xml_declaration=True)
    return minidom.parseString(xml_bytes).toprettyxml(indent="  ")
"""GeoJSON writer for the canonical TrafficObservation model.
City-aggregated observations have no road geometry, so we plot one
Point per observation at the city centroid. The actual differentiation
between observations is in the time and KPI properties, not space.
For mapping/visualisation, a UI typically picks one snapshot in time
and shows the city as a single coloured marker.
"""
from __future__ import annotations

from dataclasses import asdict
from typing import Iterable

from .model import TrafficObservation


def _flatten(prefix: str, value, target: dict) -> None:
    """Copy nested dict values into target under dotted keys, skipping None."""
    if value is None:
        return
    if isinstance(value, dict):
        for k, v in value.items():
            _flatten(f"{prefix}.{k}" if prefix else k, v, target)
    else:
        target[prefix] = value


def obs_to_feature(obs: TrafficObservation, base_id: str) -> dict:
    props: dict = {"@id": f"{base_id}{obs.localId}", "@type": "TrafficObservation"}
    d = asdict(obs)
    city_dict = d.pop("city")
    for k, v in d.items():
        if v is not None:
            props[k] = v
    _flatten("city", city_dict, props)
    lat = obs.city.latitude
    lon = obs.city.longitude
    # RFC 7946 requires a "geometry" member; it is null for unlocated features.
    geom = {"type": "Point", "coordinates": [lon, lat]} if lon is not None and lat is not None else None
    return {"type": "Feature", "id": obs.localId, "geometry": geom, "properties": props}


def build_collection(observations: Iterable[TrafficObservation], base_id: str) -> dict:
    return {
        "type": "FeatureCollection",
        "features": [obs_to_feature(o, base_id) for o in observations],
    }
"""CLI entry point for the traffic harmonizer.
One canonical record, three output formats:
python -m harmonize_traffic \
--adapter tomtom \
--input ../uc4-traffic-tomtom.csv \
--output ../out/traffic.jsonld \
--datex2 ../out/traffic.datex2.xml \
--geojson ../out/traffic.geojson \
--base-id http://mimathon.askem.eu/uc4/traffic/
"""
from __future__ import annotations

import argparse
import datetime
import importlib
import json
import sys
from pathlib import Path

from .datex2 import build_document as build_datex2
from .geojson_out import build_collection
from .jsonld import build_document as build_jsonld


def _load_adapter(name: str):
    mod = importlib.import_module(f"harmonize_traffic.adapters.{name}")
    if not hasattr(mod, "read"):
        raise SystemExit(f"adapter {name!r} has no read(path) function")
    return mod


def main(argv=None) -> int:
    p = argparse.ArgumentParser(
        prog="harmonize_traffic",
        description="Harmonize a traffic dataset to the canonical TrafficObservation model and emit JSON-LD, DATEX II XML, and optional GeoJSON.",
    )
    p.add_argument("--adapter", required=True)
    p.add_argument("--input", required=True, type=Path)
    p.add_argument("--output", required=True, type=Path, help="Destination JSON-LD file")
    p.add_argument("--base-id", default="http://example.org/traffic/")
    p.add_argument("--datex2", type=Path, help="Also emit a DATEX II v3 XML file")
    p.add_argument("--geojson", type=Path, help="Also emit a GeoJSON FeatureCollection")
    args = p.parse_args(argv)

    adapter = _load_adapter(args.adapter)
    print(f"Reading via adapter '{args.adapter}' from {args.input}...")
    observations = list(adapter.read(args.input))
    print(f"  {len(observations)} observations read")

    print(f"Writing JSON-LD to {args.output}...")
    doc = build_jsonld(observations, base_id=args.base_id)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(doc, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"  done, {len(doc['@graph'])} entities in @graph")

    if args.datex2:
        print(f"Writing DATEX II XML to {args.datex2}...")
        # Timezone-aware replacement for the deprecated datetime.utcnow(); same "...Z" output.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
        pub_time = now_utc.isoformat().replace("+00:00", "Z")
        xml = build_datex2(observations, publication_time=pub_time)
        args.datex2.parent.mkdir(parents=True, exist_ok=True)
        args.datex2.write_text(xml, encoding="utf-8")
        print(f"  done, {len(observations)} siteMeasurements")

    if args.geojson:
        print(f"Writing GeoJSON to {args.geojson}...")
        fc = build_collection(observations, base_id=args.base_id)
        args.geojson.parent.mkdir(parents=True, exist_ok=True)
        args.geojson.write_text(json.dumps(fc, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"  done, {len(fc['features'])} features")
    return 0


if __name__ == "__main__":
    sys.exit(main())
"""Skeleton traffic adapter, copy and rename to add a new dataset.
Quick start:
1. Copy to harmonize_traffic/adapters/<your_dataset>.py
2. Replace the read() body with your own parsing
3. Run: python -m harmonize_traffic --adapter <your_dataset> ...
Contract:
Expose a single function `read(path) -> Iterator[TrafficObservation]`.
See harmonize_traffic/adapters/tomtom.py for a worked example.
"""
from __future__ import annotations

from pathlib import Path
from typing import Iterator

from ..model import City, TrafficObservation
from ..transforms import clean_text


def read(path: str | Path) -> Iterator[TrafficObservation]:
    raise NotImplementedError("Implement read() for your dataset, see tomtom.py")
"""Adapter for the Porto TomTom traffic indicators CSV.
Source columns:
Country, City, UpdateTimeUTC,
JamsDelay, TrafficIndexLive, JamsLengthInKms, JamsCount,
TrafficIndexWeekAgo, UpdateTimeUTCWeekAgo,
TravelTimeLivePer10KmsMins, TravelTimeHistoricPer10KmsMins,
MinsDelay
Maps to TrafficObservation. UpdateTimeUTC is exported as MM:SS.s only,
without a date, so we keep it as an opaque source-side timestamp
string. Country and city names are normalised lightly.
"""
from __future__ import annotations

import csv
from pathlib import Path
from typing import Iterator

from ..model import City, TrafficObservation
from ..transforms import clean_text

# Approximate centroid of Porto (Wikidata Q45)
_KNOWN_CITY = {
    ("PRT", "porto"): City(
        name="Porto", countryCode="PRT", latitude=41.1496, longitude=-8.6109
    ),
}


def _safe_float(s):
    """Parse a float, returning None for empty or malformed input."""
    if s in (None, ""):
        return None
    try:
        return float(s)
    except (ValueError, TypeError):
        return None


def _safe_int(s):
    f = _safe_float(s)
    return int(round(f)) if f is not None else None


def _resolve_city(country: str, city: str) -> City:
    key = ((country or "").strip().upper(), (city or "").strip().lower())
    if key in _KNOWN_CITY:
        return _KNOWN_CITY[key]
    # Unknown city: keep normalised names, without coordinates.
    return City(name=(city or "").strip().title() or "Unknown", countryCode=(country or "").strip().upper() or "??")


def read(csv_path: str | Path) -> Iterator[TrafficObservation]:
    """Yield canonical TrafficObservation records from a TomTom CSV."""
    # newline="" is what the csv module recommends for files passed to a reader.
    with Path(csv_path).open(encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader, start=1):
            city = _resolve_city(row.get("Country"), row.get("City"))
            yield TrafficObservation(
                localId=f"{city.countryCode}-{city.name}-{idx:05d}",
                city=city,
                observedAt=(clean_text(row.get("UpdateTimeUTC")) or ""),
                observedAtWeekAgo=clean_text(row.get("UpdateTimeUTCWeekAgo")),
                trafficIndex=_safe_float(row.get("TrafficIndexLive")),
                trafficIndexWeekAgo=_safe_float(row.get("TrafficIndexWeekAgo")),
                jamsDelaySeconds=_safe_float(row.get("JamsDelay")),
                jamsLengthKm=_safe_float(row.get("JamsLengthInKms")),
                jamsCount=_safe_int(row.get("JamsCount")),
                travelTimePer10kmMin=_safe_float(row.get("TravelTimeLivePer10KmsMins")),
                historicTravelTimePer10kmMin=_safe_float(row.get("TravelTimeHistoricPer10KmsMins")),
                delayMin=_safe_float(row.get("MinsDelay")),
                source="TomTom",
            )
The current output has the right namespaces and shapes but is not validated against the official XSDs. Adding a validation step (xmlschema or lxml against the published DATEX II 3 schemas) and tightening the writer to satisfy it is the natural next step.
City-wide aggregates exercise only half of the canonical model. A per-segment TomTom feed, an OpenStreetMap road-network export with traffic counters, or a DATEX II native source would stress-test the City vs refRoadSegment dichotomy and likely surface the need for a sibling SegmentObservation concept.
Right now we only go TomTom CSV → canonical. A DATEX II reader (the inverse of datex2.py) would let us ingest from a real DATEX II feed and emit the SDM view, which is exactly the original UC4 brief in production: a GPS provider sends DATEX II, the city republishes as SDM Open Data.
Same goal as UC1 and UC2. For traffic feeds this matters even more: every provider has its own CSV/JSON dialect. A declarative YAML mapping and a CLI scaffolder would make ingesting a new provider a one-afternoon task.
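The declarative idea can be sketched in plain Python before committing to a YAML schema. In this minimal sketch the mapping is a dict from source column to canonical attribute plus a cast. `MAPPING`, `apply_mapping`, and the cast helpers are hypothetical names, and the column pairs are taken from the mapping table above.

```python
from typing import Optional


def to_float(raw: Optional[str]) -> Optional[float]:
    """Parse a float, returning None for empty or malformed input."""
    try:
        return float(raw) if raw not in (None, "") else None
    except ValueError:
        return None


def to_int(raw: Optional[str]) -> Optional[int]:
    value = to_float(raw)
    return None if value is None else int(round(value))


def to_text(raw: Optional[str]) -> Optional[str]:
    text = (raw or "").strip()
    return text or None


# source column -> (canonical attribute, cast); a YAML file could carry the same pairs.
MAPPING = {
    "UpdateTimeUTC": ("observedAt", to_text),
    "UpdateTimeUTCWeekAgo": ("observedAtWeekAgo", to_text),
    "TrafficIndexLive": ("trafficIndex", to_float),
    "TrafficIndexWeekAgo": ("trafficIndexWeekAgo", to_float),
    "JamsDelay": ("jamsDelaySeconds", to_float),
    "JamsLengthInKms": ("jamsLengthKm", to_float),
    "JamsCount": ("jamsCount", to_int),
    "TravelTimeLivePer10KmsMins": ("travelTimePer10kmMin", to_float),
    "TravelTimeHistoricPer10KmsMins": ("historicTravelTimePer10kmMin", to_float),
    "MinsDelay": ("delayMin", to_float),
}


def apply_mapping(row: dict, mapping: dict) -> dict:
    """Build canonical attributes from one source row; missing columns become None."""
    return {target: cast(row.get(source)) for source, (target, cast) in mapping.items()}


row = {"UpdateTimeUTC": "01:30.0", "TrafficIndexLive": "0", "JamsCount": "1", "MinsDelay": "-0.103388727"}
print(apply_mapping(row, MAPPING))
```

With this shape, onboarding a new provider means writing a new mapping rather than a new adapter module; city resolution and the provenance tag would remain in code.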