OASC
UC4 · team LESL
MIMathon Porto 2026 · Use case 04

One canonical traffic record,
two standardized outputs.

Team LESL: a single Dolfin pivot for city-wide traffic observations, fanning out to Smart Data Models JSON-LD and DATEX II v3 XML, with a GeoJSON view on the side. Same input, three audiences, zero drift between formats.

Team: LESL
Dataset: 2598 TomTom records, Porto
Pivot language: Dolfin
Output formats: SDM JSON-LD, DATEX II v3, GeoJSON

What the data actually contains.

A CSV of TomTom city-aggregated traffic indicators for Porto. Each row is a snapshot of the whole-city state at a given timestamp. No road segment, no per-link decomposition: this is a KPI feed, not a road network model.

2598 observations · 12 columns · 1 city covered · 8 distinct timestamps
Format quirk

Timestamps lost their date.

UpdateTimeUTC is exported as MM:SS.s only, with no date part. We pass it through as an opaque source-side timestamp string. A future iteration should pair the dataset with the capture date, or read full ISO datetimes directly from the TomTom API.
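A minimal sketch of what that pairing could look like. It assumes the capture date and hour are known out of band (neither is present in the CSV), and `rebuild_timestamp` is a hypothetical helper, not part of the shipped package.

```python
from datetime import datetime, timedelta, timezone


def rebuild_timestamp(mmss: str, capture_hour: datetime) -> str:
    """Combine an 'MM:SS.s' fragment with a known capture hour (assumed input)."""
    minutes_txt, seconds_txt = mmss.split(":")
    offset = timedelta(minutes=int(minutes_txt), seconds=float(seconds_txt))
    # Snap the assumed capture hour to the top of the hour, then add the offset.
    base = capture_hour.replace(minute=0, second=0, microsecond=0)
    return (base + offset).isoformat()


# Hypothetical capture hour; the real one has to come from the data provider.
hour = datetime(2026, 1, 15, 8, tzinfo=timezone.utc)
print(rebuild_timestamp("01:30.0", hour))  # 2026-01-15T08:01:30+00:00
```

Until the capture hour is available, keeping the field as an opaque string avoids inventing a date that was never recorded.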

Format quirk

City-aggregated, no segment ID.

SDM's TrafficFlowObserved targets per-lane or per-segment observations (laneId, refRoadSegment). Our records are city-wide summaries. The canonical model bridges that gap by attaching each observation to a typed City rather than a road segment.

Observation

Rich KPI vocabulary.

Eight numeric KPIs per row: JamsDelay, TrafficIndexLive, JamsLengthInKms, JamsCount, TrafficIndexWeekAgo, TravelTimeLivePer10KmsMins, TravelTimeHistoricPer10KmsMins, MinsDelay. The canonical model preserves all eight.

Observation

Same data, two consumer ecosystems.

Public mobility platforms in Europe expect DATEX II; smart city stacks (FIWARE, OASC) expect Smart Data Models JSON-LD. UC4 is exactly about not picking one and losing the other.

Two standards, both relevant.

Smart Data Models (Transportation domain)

DATEX II v3

Pivot, then fan out

Instead of mapping TomTom directly to either format (and then re-mapping when the other consumer asks), we map once to a small canonical TrafficObservation in Dolfin, and derive both serializations from it. SDM JSON-LD and DATEX II XML stay in sync by construction. The path to a third consumer (GTFS-RT? OCIT? a proprietary integrator?) is one new writer file.
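The pivot-then-fan-out idea can be sketched in a few lines. This is an illustration only: the record is a trimmed dict rather than the shipped dataclass, the XML element names are neutral placeholders rather than DATEX II, and `WRITERS`, `to_jsonld` and `to_xml` are hypothetical names.

```python
import json
from typing import Callable, Dict
from xml.etree.ElementTree import Element, SubElement, tostring

# Trimmed stand-in for one canonical TrafficObservation (illustrative only).
record = {"localId": "PRT-Porto-00001", "jamsLengthKm": 0.2, "jamsCount": 1}


def to_jsonld(rec: dict) -> str:
    """Serialize the canonical record as a JSON node."""
    return json.dumps({"@type": "TrafficObservation", **rec}, sort_keys=True)


def to_xml(rec: dict) -> str:
    """Serialize the same canonical record as a small XML fragment."""
    root = Element("observation", {"id": rec["localId"]})
    SubElement(root, "jamsLengthKm").text = str(rec["jamsLengthKm"])
    SubElement(root, "jamsCount").text = str(rec["jamsCount"])
    return tostring(root, encoding="unicode")


# Serving one more consumer means adding one entry here; the mapping from
# the source CSV to the canonical record is untouched.
WRITERS: Dict[str, Callable[[dict], str]] = {"jsonld": to_jsonld, "xml": to_xml}

outputs = {name: write(record) for name, write in WRITERS.items()}
for name, text in outputs.items():
    print(name, text)
```

Every writer reads the same record, so a change to the source mapping reaches all outputs at once.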

SDM Transportation · DATEX II v3 · GeoJSON · one pivot, three writers

The Dolfin pivot.

package <http://mimathon.askem.eu/uc4/traffic>:
  dolfin_version "1"
  version "0.1.0"
  author "LESL (Lea, Eliott, Sattisvar & Louis), Dolfin & Askem"
  description "Canonical model for city-wide traffic observations, MIMathon Porto 2026 use case 04. Aggregated KPIs designed for dual-format output: Smart Data Models JSON-LD and DATEX II XML, from one authoritative pivot."

concept City:
  has name: one string
  has countryCode: one string
  has latitude: optional float
  has longitude: optional float

concept TrafficObservation:
  has localId: one string
  has city: one City
  has observedAt: one string
  has trafficIndex: optional float
  has trafficIndexWeekAgo: optional float
  has observedAtWeekAgo: optional string
  has jamsDelaySeconds: optional float
  has jamsLengthKm: optional float
  has jamsCount: optional int
  has travelTimePer10kmMin: optional float
  has historicTravelTimePer10kmMin: optional float
  has delayMin: optional float
  has source: optional string

Design choices

From TomTom CSV to the pivot.

Source attribute | Source example | Canonical attribute | Transformation
Country | PRT | City.countryCode | uppercase ISO 3166-1 alpha-3
City | porto | City.name | title-case, with lat/lon attached from known-city table
UpdateTimeUTC | 01:30.0 | TrafficObservation.observedAt | preserved as opaque source-side timestamp
UpdateTimeUTCWeekAgo | 01:30.0 | TrafficObservation.observedAtWeekAgo | preserved
TrafficIndexLive | 0 | TrafficObservation.trafficIndex | float
TrafficIndexWeekAgo | 0 | TrafficObservation.trafficIndexWeekAgo | float
JamsDelay | 3.3 | TrafficObservation.jamsDelaySeconds | float, seconds
JamsLengthInKms | 0.2 | TrafficObservation.jamsLengthKm | float, kilometres
JamsCount | 1 | TrafficObservation.jamsCount | int
TravelTimeLivePer10KmsMins | 11.247 | TrafficObservation.travelTimePer10kmMin | float, minutes per 10 km
TravelTimeHistoricPer10KmsMins | 11.350 | TrafficObservation.historicTravelTimePer10kmMin | float
MinsDelay | -0.103 | TrafficObservation.delayMin | float, minutes (negative possible)
(constant) | TomTom | TrafficObservation.source | provenance tag

One core, three writers.

Same architecture as UC1/UC2 (canonical model + adapter), plus a second standards writer for DATEX II and a GeoJSON view. Adding another format (GTFS-RT, OCIT, NeTEx, your own) is one new file.

CLI

python -m harmonize_traffic \
    --adapter tomtom \
    --input ../uc4-traffic-tomtom.csv \
    --output ../out/traffic.jsonld \
    --datex2 ../out/traffic.datex2.xml \
    --geojson ../out/traffic.geojson \
    --base-id "http://mimathon.askem.eu/uc4/traffic/"

Before, canonical, both outputs.

Same first record (TomTom CSV row), shown as raw source, as canonical JSON-LD, and as the DATEX II siteMeasurements fragment derived from the same canonical record.

Source · TomTom CSV row
{
  "Country": "PRT",
  "City": "porto",
  "UpdateTimeUTC": "01:30.0",
  "JamsDelay": "3.3",
  "TrafficIndexLive": "0",
  "JamsLengthInKms": "0.2",
  "JamsCount": "1",
  "TrafficIndexWeekAgo": "0",
  "UpdateTimeUTCWeekAgo": "01:30.0",
  "TravelTimeLivePer10KmsMins": "11.24699163758936",
  "TravelTimeHistoricPer10KmsMins": "11.350380364362442",
  "MinsDelay": "-0.103388727"
}
Canonical · JSON-LD node (SDM)
{
  "localId": "PRT-Porto-00001",
  "city": {
    "@type": "City",
    "name": "Porto",
    "countryCode": "PRT",
    "latitude": 41.1496,
    "longitude": -8.6109
  },
  "observedAt": "01:30.0",
  "trafficIndex": 0.0,
  "trafficIndexWeekAgo": 0.0,
  "observedAtWeekAgo": "01:30.0",
  "jamsDelaySeconds": 3.3,
  "jamsLengthKm": 0.2,
  "jamsCount": 1,
  "travelTimePer10kmMin": 11.24699163758936,
  "historicTravelTimePer10kmMin": 11.350380364362442,
  "delayMin": -0.103388727,
  "source": "TomTom",
  "@id": "http://mimathon.askem.eu/uc4/traffic/PRT-Porto-00001",
  "@type": "TrafficObservation"
}
Canonical · DATEX II fragment
<siteMeasurements>
      <measurementSiteReference id="PRT-Porto-aggregate" version="1.0"/>
      <measurementTimeDefault>01:30.0</measurementTimeDefault>
      <measuredValue index="1">
        <basicData xsi:type="TravelTimeValue">
          <travelTime>
            <duration>PT674S</duration>
            <perDistance>10</perDistance>
            <distanceUnit>KILOMETRES</distanceUnit>
          </travelTime>
        </basicData>
      </measuredValue>
      <measuredValue index="2">
        <basicData xsi:type="TrafficConcentration">
          <concentrationOfTrafficLengthInKilometres>0.2</concentrationOfTrafficLengthInKilometres>
        </basicData>
      </measuredValue>
      <measuredValue index="3">
        <basicData xsi:type="NumberOfIncidents">
          <numberOfQueues>1</numberOfQueues>
        </basicData>
      </measuredValue>
      <measuredValue index="4">
        <basicData xsi:type="DelayValue">
          <delay>-PT6S</delay>
        </basicData>
      </measuredValue>
      <trafficIndexLive>0.0</trafficIndexLive>
      <trafficIndexWeekAgo>0.0</trafficIndexWeekAgo>
    </siteMeasurements>
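The duration values come from the canonical minutes: 11.24699 min × 60 = 674.8 s, truncated to 674 s. A minimal sketch of that conversion follows; `minutes_to_iso_duration` is a hypothetical helper name, and the sign handling follows the XML Schema duration rule that a negative value carries its minus sign before the `P`.

```python
def minutes_to_iso_duration(minutes: float) -> str:
    """Format minutes as a whole-second xs:duration string."""
    seconds = int(minutes * 60)  # truncates toward zero, e.g. 674.8 -> 674
    sign = "-" if seconds < 0 else ""
    return f"{sign}PT{abs(seconds)}S"


print(minutes_to_iso_duration(11.24699163758936))  # PT674S
print(minutes_to_iso_duration(-0.103388727))       # -PT6S
```

Truncation loses sub-second detail, so a consumer reading the XML back sees 674 s rather than 674.8 s.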

The pipeline as a graph

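One canonical record feeds three writers. SDM JSON-LD and DATEX II XML are derived from the same Dolfin instance, so both always start from the same values; they can differ only in how each writer serializes them (for example, the DATEX II writer truncates durations to whole seconds).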

One canonical TrafficObservation fanning out to SDM JSON-LD, DATEX II XML, and GeoJSON outputs
Honest scope on DATEX II

DATEX II v3 is a massive spec. What we ship here is a structural projection: right namespaces, right top-level shapes, correct ISO durations and units. It is not claimed to be a fully schema-validated DATEX II document. Getting to full validation is a tractable next step from this starting point.

What changed

Read it, run it, fork it.

Full source, hosted alongside this page. Each file is also a one-click download as raw .py. The whole package is bundled as a tarball at the top of the Data section.

model.py · Canonical TrafficObservation model, mirrors traffic.dolfin
"""Canonical TrafficObservation model, mirrors traffic.dolfin."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class City:
    name: str
    countryCode: str
    latitude: Optional[float] = None
    longitude: Optional[float] = None


@dataclass
class TrafficObservation:
    localId: str
    city: City
    observedAt: str
    trafficIndex: Optional[float] = None
    trafficIndexWeekAgo: Optional[float] = None
    observedAtWeekAgo: Optional[str] = None
    jamsDelaySeconds: Optional[float] = None
    jamsLengthKm: Optional[float] = None
    jamsCount: Optional[int] = None
    travelTimePer10kmMin: Optional[float] = None
    historicTravelTimePer10kmMin: Optional[float] = None
    delayMin: Optional[float] = None
    source: Optional[str] = None

transforms.py · Reusable text helpers shared across adapters
"""Reusable text transforms shared across adapters.

Adapters compose these helpers rather than reimplementing them. Helpers
are intentionally minimal: they only do generic text work (cleanup,
regex extraction, keyword routing). Anything dataset-specific belongs
in the adapter itself.
"""
from __future__ import annotations
import re
from typing import Optional


def clean_text(value: Optional[str]) -> Optional[str]:
    """Trim, collapse internal whitespace, return None for empty input."""
    if value is None:
        return None
    txt = re.sub(r"\s+", " ", str(value)).strip()
    return txt or None


def extract_count(value: Optional[str], pattern: str = r"\((\d+)") -> Optional[int]:
    """Pull an integer out of free text, e.g. '... (12 exemplares)' -> 12."""
    if value is None:
        return None
    m = re.search(pattern, value)
    return int(m.group(1)) if m else None


def match_keywords(value: Optional[str], keyword_map: dict[str, str]) -> Optional[str]:
    """Return the first enum value whose regex key matches the input.

    keyword_map: {regex_pattern: enum_value}, e.g.
        {r"conjunto\\s+arb[óo]re[op]": "TreeCluster",
         r"isolad": "IsolatedSpecimen"}
    Patterns are evaluated in insertion order, case-insensitive.
    """
    if not value:
        return None
    for pattern, enum_value in keyword_map.items():
        if re.search(pattern, value, re.IGNORECASE):
            return enum_value
    return None


class Registry:
    """Tiny dedupe registry for value-typed entities like Authority.

    Use when source data has many spelling variants of the same entity:
        reg = Registry({"ICNF": Authority(name="...", acronym="ICNF")})
        a = reg.resolve("ICNF (Instituto da Conservação ...)", needle="ICNF")
    The canonical instance is returned, ensuring downstream graphs share
    one node per real-world entity.
    """

    def __init__(self, known: dict | None = None):
        self._known = dict(known or {})

    def resolve(self, raw, needle: str | None = None, default=None):
        if raw is None:
            return default
        text = str(raw)
        if needle is not None and needle in text and needle in self._known:
            return self._known[needle]
        for key, val in self._known.items():
            if key in text:
                return val
        return default

    def get(self, key: str):
        return self._known.get(key)

jsonld.py · JSON-LD writer, schema.org and SDM Transportation context
"""JSON-LD writer for the canonical TrafficObservation model.

Aligns where possible to Smart Data Models conventions. SDM's
TrafficFlowObserved targets per-lane/per-segment observations, while
our records are city-aggregated KPIs. We map to existing vocabulary
where it applies (`observedAt` to SDM `dateObserved`, `city` to
schema.org `location`, `source` to schema.org `provider`) and keep
the KPIs SDM does not cover in a custom namespace (`jamsLengthKm`,
`trafficIndex`, `travelTimePer10kmMin`, ...).
"""
from __future__ import annotations
from dataclasses import asdict
from typing import Iterable

from .model import TrafficObservation


NS = "http://mimathon.askem.eu/uc4/traffic#"

CONTEXT = {
    "@vocab": NS,
    "sdm": "https://smartdatamodels.org/dataModel.Transportation/",
    "schema": "https://schema.org/",
    "TrafficObservation": NS + "TrafficObservation",
    "City": NS + "City",
    "city": "schema:location",
    "observedAt": "sdm:dateObserved",
    "observedAtWeekAgo": NS + "observedAtWeekAgo",
    "trafficIndex": NS + "trafficIndex",
    "trafficIndexWeekAgo": NS + "trafficIndexWeekAgo",
    "jamsDelaySeconds": NS + "jamsDelaySeconds",
    "jamsLengthKm": NS + "jamsLengthKm",
    "jamsCount": NS + "jamsCount",
    "travelTimePer10kmMin": NS + "travelTimePer10kmMin",
    "historicTravelTimePer10kmMin": NS + "historicTravelTimePer10kmMin",
    "delayMin": NS + "delayMin",
    "source": "schema:provider",
    # The W3C WGS84 vocabulary namespace uses the http scheme, not https.
    "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#",
    "latitude": "geo:lat",
    "longitude": "geo:long",
    "name": "schema:name",
    "countryCode": "schema:addressCountry",
}


def _strip_none(d):
    if isinstance(d, dict):
        return {k: _strip_none(v) for k, v in d.items() if v is not None}
    if isinstance(d, list):
        return [_strip_none(x) for x in d]
    return d


def obs_to_node(obs: TrafficObservation, base_id: str) -> dict:
    d = asdict(obs)
    d["@id"] = f"{base_id}{obs.localId}"
    d["@type"] = "TrafficObservation"
    d["city"] = {"@type": "City", **asdict(obs.city)}
    return _strip_none(d)


def build_document(observations: Iterable[TrafficObservation], base_id: str) -> dict:
    return {
        "@context": CONTEXT,
        "@graph": [obs_to_node(o, base_id) for o in observations],
    }

datex2.py · DATEX II v3 XML writer, MeasuredDataPublication shape
"""DATEX II v3 XML writer for the canonical TrafficObservation model.

Produces a payloadPublication of MeasuredDataPublication shape, with
one siteMeasurements element per canonical record. KPIs are mapped
to DATEX II basicData where a clean equivalent exists, and to
extensible auxiliary elements otherwise.

This output is a *structural projection*: element names and the
overall payload skeleton follow the DATEX II spec, but the
document is not claimed to be fully schema-validated against the
DATEX II XSDs. The intent is to make round-tripping with a real
DATEX II consumer obvious and to demonstrate that DATEX II and
Smart Data Models JSON-LD can be derived from one canonical pivot.
"""
from __future__ import annotations
from typing import Iterable
from xml.etree.ElementTree import Element, SubElement, tostring, register_namespace
from xml.dom import minidom

from .model import TrafficObservation


DATEX2_NS = "http://datex2.eu/schema/3/3.0"
XSI_NS = "http://www.w3.org/2001/XMLSchema-instance"

register_namespace("", DATEX2_NS)
register_namespace("xsi", XSI_NS)


XSI_TYPE = f"{{{XSI_NS}}}type"


def _e(parent, tag, text=None, attribs=None, **kwattrs):
    """Create a SubElement in the DATEX II namespace, with optional text and attribs."""
    attrs = dict(attribs or {})
    attrs.update(kwattrs)
    el = SubElement(parent, f"{{{DATEX2_NS}}}{tag}", attrs)
    if text is not None:
        el.text = str(text)
    return el


def _site_measurements(parent, obs: TrafficObservation, index: int) -> None:
    site = _e(parent, "siteMeasurements")
    _e(site, "measurementSiteReference", id=f"{obs.city.countryCode}-{obs.city.name}-aggregate", version="1.0")
    _e(site, "measurementTimeDefault", text=obs.observedAt)

    mv_idx = 1
    if obs.travelTimePer10kmMin is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "TravelTimeValue"})
        tt = _e(bd, "travelTime")
        _e(tt, "duration", text=f"PT{int(obs.travelTimePer10kmMin*60)}S")
        _e(tt, "perDistance", text="10")
        _e(tt, "distanceUnit", text="KILOMETRES")
        mv_idx += 1

    if obs.jamsLengthKm is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "TrafficConcentration"})
        _e(bd, "concentrationOfTrafficLengthInKilometres", text=str(obs.jamsLengthKm))
        mv_idx += 1

    if obs.jamsCount is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "NumberOfIncidents"})
        _e(bd, "numberOfQueues", text=str(obs.jamsCount))
        mv_idx += 1

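    if obs.delayMin is not None:
        mv = _e(site, "measuredValue", index=str(mv_idx))
        bd = _e(mv, "basicData", attribs={XSI_TYPE: "DelayValue"})
        # xs:duration puts the minus sign before "P" (-PT6S); "PT-6S" is not valid.
        delay_s = int(obs.delayMin * 60)
        _e(bd, "delay", text=f"{'-' if delay_s < 0 else ''}PT{abs(delay_s)}S")
        mv_idx += 1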

    # Provider-specific extensions outside the strict DATEX II schema
    if obs.trafficIndex is not None:
        _e(site, "trafficIndexLive", text=str(obs.trafficIndex))
    if obs.trafficIndexWeekAgo is not None:
        _e(site, "trafficIndexWeekAgo", text=str(obs.trafficIndexWeekAgo))


def build_document(observations: Iterable[TrafficObservation], publication_time: str) -> str:
    root = Element(
        f"{{{DATEX2_NS}}}d2LogicalModel",
        {"modelBaseVersion": "3"},
    )

    payload = _e(root, "payloadPublication", attribs={XSI_TYPE: "MeasuredDataPublication", "lang": "en"})
    _e(payload, "publicationTime", text=publication_time)
    pub_creator = _e(payload, "publicationCreator")
    _e(pub_creator, "country", text="pt")
    _e(pub_creator, "nationalIdentifier", text="askem-mimathon-uc4")

    obs_list = list(observations)
    if obs_list:
        first = obs_list[0]
        _e(
            payload,
            "measurementSiteTablePublicationReference",
            id=f"{first.city.countryCode}-{first.city.name}-table",
            version="1.0",
        )

    for i, obs in enumerate(obs_list, start=1):
        _site_measurements(payload, obs, i)

    xml_bytes = tostring(root, encoding="utf-8", xml_declaration=True)
    return minidom.parseString(xml_bytes).toprettyxml(indent="  ")

geojson_out.py · GeoJSON FeatureCollection writer for GIS tools
"""GeoJSON writer for the canonical TrafficObservation model.

City-aggregated observations have no road geometry, so we plot one
Point per observation at the city centroid. The actual differentiation
between observations is in the time and KPI properties, not space.
For mapping/visualisation, a UI typically picks one snapshot in time
and shows the city as a single coloured marker.
"""
from __future__ import annotations
from dataclasses import asdict
from typing import Iterable

from .model import TrafficObservation


def _flatten(prefix: str, value, target: dict) -> None:
    if value is None:
        return
    if isinstance(value, dict):
        for k, v in value.items():
            _flatten(f"{prefix}.{k}" if prefix else k, v, target)
    else:
        target[prefix] = value


def obs_to_feature(obs: TrafficObservation, base_id: str) -> dict:
    props: dict = {"@id": f"{base_id}{obs.localId}", "@type": "TrafficObservation"}
    d = asdict(obs)
    city_dict = d.pop("city")
    for k, v in d.items():
        if v is not None:
            props[k] = v
    _flatten("city", city_dict, props)

    lat = obs.city.latitude
    lon = obs.city.longitude
    geom = {"type": "Point", "coordinates": [lon, lat]} if lon is not None and lat is not None else None

    # RFC 7946 requires a "geometry" member on every Feature; it is null
    # (None) when the feature has no location.
    return {"type": "Feature", "id": obs.localId, "geometry": geom, "properties": props}


def build_collection(observations: Iterable[TrafficObservation], base_id: str) -> dict:
    return {
        "type": "FeatureCollection",
        "features": [obs_to_feature(o, base_id) for o in observations],
    }

__main__.py · CLI orchestrating adapter and all three writers
r"""CLI entry point for the traffic harmonizer.

One canonical record, three output formats:

    python -m harmonize_traffic \
        --adapter tomtom \
        --input ../uc4-traffic-tomtom.csv \
        --output ../out/traffic.jsonld \
        --datex2 ../out/traffic.datex2.xml \
        --geojson ../out/traffic.geojson \
        --base-id http://mimathon.askem.eu/uc4/traffic/
"""
from __future__ import annotations
import argparse
import datetime
import importlib
import json
import sys
from pathlib import Path

from .datex2 import build_document as build_datex2
from .geojson_out import build_collection
from .jsonld import build_document as build_jsonld


def _load_adapter(name: str):
    mod = importlib.import_module(f"harmonize_traffic.adapters.{name}")
    if not hasattr(mod, "read"):
        raise SystemExit(f"adapter {name!r} has no read(path) function")
    return mod


def main(argv=None) -> int:
    p = argparse.ArgumentParser(prog="harmonize_traffic", description="Harmonize a traffic dataset to the canonical TrafficObservation model and emit JSON-LD, DATEX II XML, and optional GeoJSON.")
    p.add_argument("--adapter", required=True)
    p.add_argument("--input", required=True, type=Path)
    p.add_argument("--output", required=True, type=Path, help="Destination JSON-LD file")
    p.add_argument("--base-id", default="http://example.org/traffic/")
    p.add_argument("--datex2", type=Path, help="Also emit a DATEX II v3 XML file")
    p.add_argument("--geojson", type=Path, help="Also emit a GeoJSON FeatureCollection")
    args = p.parse_args(argv)

    adapter = _load_adapter(args.adapter)
    print(f"Reading via adapter '{args.adapter}' from {args.input}...")
    observations = list(adapter.read(args.input))
    print(f"  {len(observations)} observations read")

    print(f"Writing JSON-LD to {args.output}...")
    doc = build_jsonld(observations, base_id=args.base_id)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(doc, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"  done, {len(doc['@graph'])} entities in @graph")

    if args.datex2:
        print(f"Writing DATEX II XML to {args.datex2}...")
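        # utcnow() is deprecated since Python 3.12; use an aware UTC datetime instead.
        pub_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")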
        xml = build_datex2(observations, publication_time=pub_time)
        args.datex2.parent.mkdir(parents=True, exist_ok=True)
        args.datex2.write_text(xml, encoding="utf-8")
        print(f"  done, {len(observations)} siteMeasurements")

    if args.geojson:
        print(f"Writing GeoJSON to {args.geojson}...")
        fc = build_collection(observations, base_id=args.base_id)
        args.geojson.parent.mkdir(parents=True, exist_ok=True)
        args.geojson.write_text(json.dumps(fc, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"  done, {len(fc['features'])} features")

    return 0


if __name__ == "__main__":
    sys.exit(main())

adapters/_template.py · Skeleton, copy and rename to add a new dataset
"""Skeleton traffic adapter, copy and rename to add a new dataset.

Quick start:
    1. Copy to harmonize_traffic/adapters/<your_dataset>.py
    2. Replace the read() body with your own parsing
    3. Run:  python -m harmonize_traffic --adapter <your_dataset> ...

Contract:
    Expose a single function `read(path) -> Iterator[TrafficObservation]`.

See harmonize_traffic/adapters/tomtom.py for a worked example.
"""
from __future__ import annotations
from pathlib import Path
from typing import Iterator

from ..model import City, TrafficObservation
from ..transforms import clean_text


def read(path: str | Path) -> Iterator[TrafficObservation]:
    raise NotImplementedError("Implement read() for your dataset, see tomtom.py")

adapters/tomtom.py · Adapter for TomTom CSV (city-wide aggregated KPIs)
"""Adapter for the Porto TomTom traffic indicators CSV.

Source columns:
    Country, City, UpdateTimeUTC,
    JamsDelay, TrafficIndexLive, JamsLengthInKms, JamsCount,
    TrafficIndexWeekAgo, UpdateTimeUTCWeekAgo,
    TravelTimeLivePer10KmsMins, TravelTimeHistoricPer10KmsMins,
    MinsDelay

Maps to TrafficObservation. UpdateTimeUTC is exported as MM:SS.s only,
without a date, so we keep it as an opaque source-side timestamp
string. Country and city names are normalised lightly.
"""
from __future__ import annotations
import csv
from pathlib import Path
from typing import Iterator

from ..model import City, TrafficObservation
from ..transforms import clean_text


# Approximate centroid of Porto (Wikidata Q36433; Q45 is Portugal, the country)
_KNOWN_CITY = {
    ("PRT", "porto"): City(
        name="Porto", countryCode="PRT", latitude=41.1496, longitude=-8.6109
    ),
}


def _safe_float(s):
    if s in (None, ""):
        return None
    try:
        return float(s)
    except (ValueError, TypeError):
        return None


def _safe_int(s):
    f = _safe_float(s)
    return int(round(f)) if f is not None else None


def _resolve_city(country: str, city: str) -> City:
    key = ((country or "").strip().upper(), (city or "").strip().lower())
    if key in _KNOWN_CITY:
        return _KNOWN_CITY[key]
    return City(name=(city or "").strip().title() or "Unknown", countryCode=(country or "").strip().upper() or "??")


def read(csv_path: str | Path) -> Iterator[TrafficObservation]:
    """Yield canonical TrafficObservation records from a TomTom CSV."""
    with Path(csv_path).open(encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader, start=1):
            city = _resolve_city(row.get("Country"), row.get("City"))
            yield TrafficObservation(
                localId=f"{city.countryCode}-{city.name}-{idx:05d}",
                city=city,
                observedAt=(clean_text(row.get("UpdateTimeUTC")) or ""),
                observedAtWeekAgo=clean_text(row.get("UpdateTimeUTCWeekAgo")),
                trafficIndex=_safe_float(row.get("TrafficIndexLive")),
                trafficIndexWeekAgo=_safe_float(row.get("TrafficIndexWeekAgo")),
                jamsDelaySeconds=_safe_float(row.get("JamsDelay")),
                jamsLengthKm=_safe_float(row.get("JamsLengthInKms")),
                jamsCount=_safe_int(row.get("JamsCount")),
                travelTimePer10kmMin=_safe_float(row.get("TravelTimeLivePer10KmsMins")),
                historicTravelTimePer10kmMin=_safe_float(row.get("TravelTimeHistoricPer10KmsMins")),
                delayMin=_safe_float(row.get("MinsDelay")),
                source="TomTom",
            )

Remaining work.

Get to full DATEX II v3 schema validation

The current output has the right namespaces and shapes but is not validated against the official XSDs. Adding a validation step (xmlschema or lxml against the published DATEX II 3 schemas) and tightening the writer to satisfy it is the natural next step.
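Full XSD validation needs the published schemas and a validating library. As a lighter first gate, a stdlib-only pre-check could confirm that the output is well-formed, sits in the expected namespace, and that every `measuredValue` carries a typed `basicData`. The sketch below is an assumption about what such a gate might check, not a substitute for schema validation; `structural_problems` is a hypothetical name.

```python
import xml.etree.ElementTree as ET

DATEX2_NS = "http://datex2.eu/schema/3/3.0"  # namespace string used by datex2.py
XSI_TYPE = "{http://www.w3.org/2001/XMLSchema-instance}type"


def structural_problems(xml_text: str) -> list:
    """Return structural problems found; an empty list means the pre-check passed."""
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as exc:
        return [f"not well-formed: {exc}"]
    problems = []
    if not root.tag.startswith(f"{{{DATEX2_NS}}}"):
        problems.append(f"unexpected root namespace: {root.tag}")
    for mv in root.iter(f"{{{DATEX2_NS}}}measuredValue"):
        bd = mv.find(f"{{{DATEX2_NS}}}basicData")
        if bd is None or XSI_TYPE not in bd.attrib:
            problems.append(f"measuredValue {mv.get('index')} lacks typed basicData")
    return problems


# Trimmed sample: the second measuredValue is deliberately incomplete.
sample = (
    f'<siteMeasurements xmlns="{DATEX2_NS}" '
    'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
    '<measuredValue index="1"><basicData xsi:type="DelayValue"/></measuredValue>'
    '<measuredValue index="2"/>'
    "</siteMeasurements>"
)
print(structural_problems(sample))  # ['measuredValue 2 lacks typed basicData']
```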

Onboard a per-segment dataset

City-wide aggregates exercise only half of the canonical model. A per-segment TomTom feed, an OpenStreetMap road-network export with traffic counters, or a DATEX II native source would stress-test the City vs refRoadSegment dichotomy and likely surface the need for a sibling SegmentObservation concept.
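One possible shape for that sibling concept, sketched as Python dataclasses. Everything here is hypothetical: `SegmentObservation` does not exist in the pivot today, `CityObservation` is a trimmed stand-in for the current TrafficObservation, and only the `refRoadSegment` and `laneId` names are borrowed from the SDM attributes mentioned earlier.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class CityObservation:  # trimmed stand-in for today's TrafficObservation
    localId: str
    cityName: str
    observedAt: str


@dataclass
class SegmentObservation:  # hypothetical sibling concept, not in the pivot yet
    localId: str
    refRoadSegment: str  # SDM-style reference to a road segment
    observedAt: str
    laneId: Optional[int] = None


def location_key(obs: Union[CityObservation, SegmentObservation]) -> str:
    """Give writers one place to ask what an observation is attached to."""
    if isinstance(obs, SegmentObservation):
        return f"segment:{obs.refRoadSegment}"
    return f"city:{obs.cityName}"


print(location_key(CityObservation("PRT-Porto-00001", "Porto", "01:30.0")))
print(location_key(SegmentObservation("seg-00001", "urn:example:segment:42", "01:30.0")))
```

Whether two sibling concepts or one concept with an optional segment reference works better is exactly what a per-segment dataset would help decide.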

Round-trip DATEX II → canonical → SDM

Right now we only go TomTom CSV → canonical. A DATEX II reader (the inverse of datex2.py) would let us ingest from a real DATEX II feed and emit the SDM view, which is exactly the original UC4 brief in production: a GPS provider sends DATEX II, the city republishes as SDM Open Data.

Lower the bar for new adapters

Same goal as UC1 and UC2. For traffic feeds this matters even more: every provider has its own CSV/JSON dialect. A declarative YAML mapping and a CLI scaffolder would make ingesting a new provider a one-afternoon task.
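A sketch of what a declarative mapping might reduce to once loaded. It is shown as a Python dict rather than YAML to stay stdlib-only, it covers only a subset of the TomTom columns, and `MAPPING`, `apply_mapping`, `to_float` and `to_int` are hypothetical names.

```python
from typing import Callable, Dict, Optional, Tuple


def to_float(value: Optional[str]) -> Optional[float]:
    """Convert text to float, returning None for missing or malformed input."""
    try:
        return float(value) if value not in (None, "") else None
    except ValueError:
        return None


def to_int(value: Optional[str]) -> Optional[int]:
    """Convert text to a rounded int, returning None when no number is present."""
    number = to_float(value)
    return int(round(number)) if number is not None else None


# source column -> (canonical attribute, converter)
MAPPING: Dict[str, Tuple[str, Callable]] = {
    "TrafficIndexLive": ("trafficIndex", to_float),
    "JamsDelay": ("jamsDelaySeconds", to_float),
    "JamsLengthInKms": ("jamsLengthKm", to_float),
    "JamsCount": ("jamsCount", to_int),
    "MinsDelay": ("delayMin", to_float),
}


def apply_mapping(row: dict) -> dict:
    """Translate one source row into canonical attribute names and types."""
    return {target: convert(row.get(column)) for column, (target, convert) in MAPPING.items()}


row = {"TrafficIndexLive": "0", "JamsDelay": "3.3", "JamsLengthInKms": "0.2",
       "JamsCount": "1", "MinsDelay": ""}
print(apply_mapping(row))
```

With the mapping held as data, onboarding a new provider becomes mostly a matter of writing a new table; the conversion code stays shared.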