Zum Hauptinhalt springen

Funktionsweise von Gap Filling Modules

Diese Anleitung bietet einen detaillierten Einblick in die interne Mechanik von GFMs, von der Registrierung bis zur Ausführung.

Modul-Laden

Wenn der EOS-Service startet, werden alle GFMs durch den GapFillingModuleLoader geladen und initialisiert.

Ladeprozess

class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}

async def load_all_modules(self):
"""Alle GFM-Factories laden und initialisieren."""
# Alle GFM-Factory-Klassen importieren
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... insgesamt 28 Module
)

# Jede Factory initialisieren
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory

Factory-Initialisierung

Jede Factory wird einmal initialisiert und verwaltet persistente Ressourcen:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}

async def init_cache(self):
"""Emissionsfaktoren für schnellen Zugriff in Speicher laden."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()

Orchestrator-Planung

Der Orchestrator koordiniert die GFM-Ausführung durch eine Planungsschleife.

Die Planungsschleife

class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Hauptplanungsschleife."""
# Initialisieren - Worker für alle Knoten erzeugen
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)

# Planungsschleife
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()

if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)

Worker-Erzeugung

Wenn ein Knoten zum Graphen hinzugefügt wird, erzeugt der Orchestrator Worker:

async def spawn_workers_for_node(self, node: Node):
"""Worker für alle anwendbaren GFMs erstellen."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)

if worker.should_be_scheduled():
self.schedule(worker)

Die drei Schlüsselmethoden

Jeder GFM-Worker implementiert drei Methoden, die seinen Lebenszyklus steuern.

should_be_scheduled()

Bestimmt, ob dieses GFM für einen gegebenen Knoten relevant ist:

class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Soll dieses GFM auf diesem Knoten laufen?"""
# Nur auf Lebensmittelprodukt-Flussknoten ausführen
if not isinstance(self.node, FoodProductFlowNode):
return False

# Nur ausführen, wenn Herkunft noch nicht gesetzt ist
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False

return True

can_run_now()

Prüft, ob alle Abhängigkeiten erfüllt sind:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Sind Abhängigkeiten erfüllt?"""
# Prüfen, ob erforderliche Eigenschaften verfügbar sind
# (Bevorzuge Eigenschaftsprüfung statt GFM-Zustand für lose Kopplung)

# Auf Kategoriezuweisung warten (gesetzt durch match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready

run()

Führt die Gap-Filling-Logik aus:

async def run(self, calc_graph: CalcGraph) -> None:
"""Gap-Filling-Berechnung ausführen."""
try:
# Erforderliche Eingaben holen
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)

# Berechnung durchführen
origin = await self.detect_origin(names, category)

# Knoten mit Ergebnis aktualisieren
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)

# GFM-Zustand aktualisieren
self.update_gfm_state("completed")

except Exception as e:
self.handle_error(e)

Worker-Statuswerte

Die Methode can_run_now() gibt einen dieser GapFillingWorkerStatusEnum-Werte zurück:

StatusBeschreibungOrchestrator-Aktion
readyAlle Abhängigkeiten erfülltSofort ausführen
rescheduleAbhängigkeiten noch nicht verfügbarFür nächste Iteration neu planen
cancelNicht anwendbar für diesen KnotenAus Warteschlange entfernen, als abgebrochen markieren

Eigenschaftssystem

GFMs lesen und schreiben Daten durch typisierte Eigenschaften auf Knoten.

Eigenschaften lesen

# Spezifischen Eigenschaftstyp holen
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("de")

# Menge mit Einheiten holen
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()

Eigenschaften schreiben

# Umweltflüsse setzen
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)

# Standortdaten setzen
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)

Gängige Eigenschaftstypen

EigenschaftBeschreibung
NamesPropMehrsprachige Produktnamen
QuantityPropMengen mit Einheitenumrechnung
GlossaryTermPropVerknüpfungen zu Terminologie/Kategorien
OriginPropGeografische Herkunftsdaten
LocationPropStandort mit Koordinaten
EnvironmentalFlowsPropAuswirkungsberechnungsergebnisse
GfmStatePropGFM-Ausführungsstatus-Verfolgung

GFM-Zustandsverfolgung

Jeder Knoten verfolgt den Ausführungszustand aller GFMs:

# GfmStateProp speichert pro-Modul-Zustand mit NodeGfmStateEnum-Werten
# "S" = geplant, "F" = abgeschlossen, "C" = abgebrochen
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # abgeschlossen
"OriginGapFillingWorker": "F", # abgeschlossen
"TransportDecisionGapFillingWorker": "S", # geplant (in Arbeit)
"GreenhouseGapFillingWorker": "S", # geplant (wartend)
"ImpactAssessmentGapFillingWorker": "S" # geplant (ausstehend)
}

Zustandswerte

Der GFM-Ausführungszustand wird mit NodeGfmStateEnum verfolgt:

ZustandCodeBeschreibung
scheduledSGFM ist zur Ausführung geplant (umfasst ausstehend, wartend, laufend)
finishedFGFM wurde erfolgreich abgeschlossen
canceledCGFM wurde abgebrochen (nicht anwendbar, übersprungen oder fehlgeschlagen)
hinweis

Die tatsächliche Enumeration verwendet Einzelbuchstaben-Codes ("S", "F", "C") für Speichereffizienz.

Abhängigkeitsverwaltung

GFMs deklarieren Abhängigkeiten durch ihre can_run_now()-Logik.

Gängige Abhängigkeitsmuster

54ab5918297fb5f93e6dbd4d2a7ca62b

Abhängigkeiten prüfen

Der empfohlene Ansatz ist, auf das Vorhandensein erforderlicher Eigenschaften zu prüfen, anstatt auf spezifische GFMs nach Namen zu warten. Dies macht Module loser gekoppelt und einfacher zu warten:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Prüfen, ob erforderliche Eigenschaften verfügbar sind."""
# Auf Herkunftsdaten prüfen (gesetzt durch origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule

# Auf Transportmodus prüfen (gesetzt durch transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule

# Auf Verarbeitungsdaten prüfen (gesetzt durch processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready
Bewährte Praxis

Die Prüfung auf Eigenschaften statt GFM-Namen bedeutet, dass Ihr Modul korrekt funktioniert, auch wenn das vorgelagerte GFM umbenannt, ersetzt wird oder die Eigenschaft durch einen anderen Mechanismus gesetzt wird (z.B. vom Benutzer bereitgestellte Daten).

Datenqualitätsverfolgung

GFMs verfolgen die Qualität und Quelle ihrer Berechnungen:

class DataQuality:
score: float # 0-1 Vertrauensbewertung
source: str # Datenquellen-Identifikator
methodology: str # Verwendete Berechnungsmethode

# Qualitätsbewertungen
QUALITY_LEVELS = {
"primary_data": 1.0, # Direkte Messung
"high_quality_proxy": 0.8, # Gleiches Produkt, anderer Kontext
"reasonable_proxy": 0.6, # Ähnliches Produkt in Kategorie
"default_value": 0.4, # Kategoriedurchschnitt
"uncertain_estimate": 0.2 # Breite Annahmen
}

Qualitätspropagation

Qualität verschlechtert sich durch Berechnungen:

def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""Kombinierte Qualität ist das Minimum aller Eingaben."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Zuverlässigkeit dieses GFMs

return base_quality * calculation_confidence

Fehlerbehandlung

GFMs implementieren sorgfältige Fehlerbehandlung:

async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")

except Exception as e:
# Fehler mit Kontext protokollieren
logger.error(
"GFM-Ausführung fehlgeschlagen",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)

# DataError zur Verfolgung erstellen
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)

# Zustand auf fehlgeschlagen aktualisieren
self.update_gfm_state("failed")

Fallback-Werte

Wenn ein GFM fehlschlägt, können abhängige GFMs Fallback-Werte verwenden:

def get_fallback_origin(self, category: str) -> OriginProp:
"""Standard-Herkunft basierend auf Kategorie bereitstellen."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))

Caching-Strategien

Factory-Level-Caching

Factories cachen gemeinsam genutzte Ressourcen:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Emissionsfaktoren cachen (von allen Workern verwendet)
self.emission_factors = await self.load_emission_factors()

# Aktivitätsdaten cachen
self.activities = await self.load_activities()

def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Worker teilen sich die gecachten Daten der Factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)

Worker-Level-Caching

Worker können Berechnungsergebnisse cachen:

class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}

async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"

if cache_key in self.local_cache:
return self.local_cache[cache_key]

result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result

return result

Debugging und Tracing

Ausführungsprotokollierung

Der Orchestrator protokolliert detaillierte Ausführungsinformationen:

logger.info(
"GFM-Ausführung",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)

Graph-Mutationen

Alle Änderungen am CalcGraph werden als Mutationen verfolgt:

# Jede Eigenschaftsänderung erstellt eine Mutation
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)

Dies bietet:

  • Nachvollziehbarkeit - Vollständige Historie aller Änderungen
  • Debugging - Nachverfolgen, wie Werte berechnet wurden
  • Reproduzierbarkeit - Berechnungen deterministisch wiederholen

Nächste Schritte

  • Modulkatalog - Verfügbare Module
  • GFM SDK (in Kürze) - Eigene Module erstellen