Funktionsweise von Gap Filling Modules
Diese Anleitung bietet einen detaillierten Einblick in die interne Mechanik von GFMs, von der Registrierung bis zur Ausführung.
Modul-Laden
Wenn der EOS-Service startet, werden alle GFMs durch den GapFillingModuleLoader geladen und initialisiert.
Ladeprozess
class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}
async def load_all_modules(self):
"""Alle GFM-Factories laden und initialisieren."""
# Alle GFM-Factory-Klassen importieren
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... insgesamt 28 Module
)
# Jede Factory initialisieren
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory
Factory-Initialisierung
Jede Factory wird einmal initialisiert und verwaltet persistente Ressourcen:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}
async def init_cache(self):
"""Emissionsfaktoren für schnellen Zugriff in Speicher laden."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()
Orchestrator-Planung
Der Orchestrator koordiniert die GFM-Ausführung durch eine Planungsschleife.
Die Planungsschleife
class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Hauptplanungsschleife."""
# Initialisieren - Worker für alle Knoten erzeugen
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)
# Planungsschleife
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()
if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)
Worker-Erzeugung
Wenn ein Knoten zum Graphen hinzugefügt wird, erzeugt der Orchestrator Worker:
async def spawn_workers_for_node(self, node: Node):
"""Worker für alle anwendbaren GFMs erstellen."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)
if worker.should_be_scheduled():
self.schedule(worker)
Die drei Schlüsselmethoden
Jeder GFM-Worker implementiert drei Methoden, die seinen Lebenszyklus steuern.
should_be_scheduled()
Bestimmt, ob dieses GFM für einen gegebenen Knoten relevant ist:
class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Soll dieses GFM auf diesem Knoten laufen?"""
# Nur auf Lebensmittelprodukt-Flussknoten ausführen
if not isinstance(self.node, FoodProductFlowNode):
return False
# Nur ausführen, wenn Herkunft noch nicht gesetzt ist
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False
return True
can_run_now()
Prüft, ob alle Abhängigkeiten erfüllt sind:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Sind Abhängigkeiten erfüllt?"""
# Prüfen, ob erforderliche Eigenschaften verfügbar sind
# (Bevorzuge Eigenschaftsprüfung statt GFM-Zustand für lose Kopplung)
# Auf Kategoriezuweisung warten (gesetzt durch match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
run()
Führt die Gap-Filling-Logik aus:
async def run(self, calc_graph: CalcGraph) -> None:
"""Gap-Filling-Berechnung ausführen."""
try:
# Erforderliche Eingaben holen
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)
# Berechnung durchführen
origin = await self.detect_origin(names, category)
# Knoten mit Ergebnis aktualisieren
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)
# GFM-Zustand aktualisieren
self.update_gfm_state("completed")
except Exception as e:
self.handle_error(e)
Worker-Statuswerte
Die Methode can_run_now() gibt einen dieser GapFillingWorkerStatusEnum-Werte zurück:
| Status | Beschreibung | Orchestrator-Aktion |
|---|---|---|
ready | Alle Abhängigkeiten erfüllt | Sofort ausführen |
reschedule | Abhängigkeiten noch nicht verfügbar | Für nächste Iteration neu planen |
cancel | Nicht anwendbar für diesen Knoten | Aus Warteschlange entfernen, als abgebrochen markieren |
Eigenschaftssystem
GFMs lesen und schreiben Daten durch typisierte Eigenschaften auf Knoten.
Eigenschaften lesen
# Spezifischen Eigenschaftstyp holen
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("de")
# Menge mit Einheiten holen
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()
Eigenschaften schreiben
# Umweltflüsse setzen
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)
# Standortdaten setzen
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)
Gängige Eigenschaftstypen
| Eigenschaft | Beschreibung |
|---|---|
NamesProp | Mehrsprachige Produktnamen |
QuantityProp | Mengen mit Einheitenumrechnung |
GlossaryTermProp | Verknüpfungen zu Terminologie/Kategorien |
OriginProp | Geografische Herkunftsdaten |
LocationProp | Standort mit Koordinaten |
EnvironmentalFlowsProp | Auswirkungsberechnungsergebnisse |
GfmStateProp | GFM-Ausführungsstatus-Verfolgung |
GFM-Zustandsverfolgung
Jeder Knoten verfolgt den Ausführungszustand aller GFMs:
# GfmStateProp speichert pro-Modul-Zustand mit NodeGfmStateEnum-Werten
# "S" = geplant, "F" = abgeschlossen, "C" = abgebrochen
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # abgeschlossen
"OriginGapFillingWorker": "F", # abgeschlossen
"TransportDecisionGapFillingWorker": "S", # geplant (in Arbeit)
"GreenhouseGapFillingWorker": "S", # geplant (wartend)
"ImpactAssessmentGapFillingWorker": "S" # geplant (ausstehend)
}
Zustandswerte
Der GFM-Ausführungszustand wird mit NodeGfmStateEnum verfolgt:
| Zustand | Code | Beschreibung |
|---|---|---|
scheduled | S | GFM ist zur Ausführung geplant (umfasst ausstehend, wartend, laufend) |
finished | F | GFM wurde erfolgreich abgeschlossen |
canceled | C | GFM wurde abgebrochen (nicht anwendbar, übersprungen oder fehlgeschlagen) |
Die tatsächliche Enumeration verwendet Einzelbuchstaben-Codes ("S", "F", "C") für Speichereffizienz.
Abhängigkeitsverwaltung
GFMs deklarieren Abhängigkeiten durch ihre can_run_now()-Logik.
Gängige Abhängigkeitsmuster
Abhängigkeiten prüfen
Der empfohlene Ansatz ist, auf das Vorhandensein erforderlicher Eigenschaften zu prüfen, anstatt auf spezifische GFMs nach Namen zu warten. Dies macht Module loser gekoppelt und einfacher zu warten:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Prüfen, ob erforderliche Eigenschaften verfügbar sind."""
# Auf Herkunftsdaten prüfen (gesetzt durch origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule
# Auf Transportmodus prüfen (gesetzt durch transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule
# Auf Verarbeitungsdaten prüfen (gesetzt durch processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
Die Prüfung auf Eigenschaften statt GFM-Namen bedeutet, dass Ihr Modul korrekt funktioniert, auch wenn das vorgelagerte GFM umbenannt, ersetzt wird oder die Eigenschaft durch einen anderen Mechanismus gesetzt wird (z.B. vom Benutzer bereitgestellte Daten).
Datenqualitätsverfolgung
GFMs verfolgen die Qualität und Quelle ihrer Berechnungen:
class DataQuality:
score: float # 0-1 Vertrauensbewertung
source: str # Datenquellen-Identifikator
methodology: str # Verwendete Berechnungsmethode
# Qualitätsbewertungen
QUALITY_LEVELS = {
"primary_data": 1.0, # Direkte Messung
"high_quality_proxy": 0.8, # Gleiches Produkt, anderer Kontext
"reasonable_proxy": 0.6, # Ähnliches Produkt in Kategorie
"default_value": 0.4, # Kategoriedurchschnitt
"uncertain_estimate": 0.2 # Breite Annahmen
}
Qualitätspropagation
Qualität verschlechtert sich durch Berechnungen:
def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""Kombinierte Qualität ist das Minimum aller Eingaben."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Zuverlässigkeit dieses GFMs
return base_quality * calculation_confidence
Fehlerbehandlung
GFMs implementieren sorgfältige Fehlerbehandlung:
async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")
except Exception as e:
# Fehler mit Kontext protokollieren
logger.error(
"GFM-Ausführung fehlgeschlagen",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)
# DataError zur Verfolgung erstellen
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)
# Zustand auf fehlgeschlagen aktualisieren
self.update_gfm_state("failed")
Fallback-Werte
Wenn ein GFM fehlschlägt, können abhängige GFMs Fallback-Werte verwenden:
def get_fallback_origin(self, category: str) -> OriginProp:
"""Standard-Herkunft basierend auf Kategorie bereitstellen."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))
Caching-Strategien
Factory-Level-Caching
Factories cachen gemeinsam genutzte Ressourcen:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Emissionsfaktoren cachen (von allen Workern verwendet)
self.emission_factors = await self.load_emission_factors()
# Aktivitätsdaten cachen
self.activities = await self.load_activities()
def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Worker teilen sich die gecachten Daten der Factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)
Worker-Level-Caching
Worker können Berechnungsergebnisse cachen:
class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}
async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"
if cache_key in self.local_cache:
return self.local_cache[cache_key]
result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result
return result
Debugging und Tracing
Ausführungsprotokollierung
Der Orchestrator protokolliert detaillierte Ausführungsinformationen:
logger.info(
"GFM-Ausführung",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)
Graph-Mutationen
Alle Änderungen am CalcGraph werden als Mutationen verfolgt:
# Jede Eigenschaftsänderung erstellt eine Mutation
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)
Dies bietet:
- Nachvollziehbarkeit - Vollständige Historie aller Änderungen
- Debugging - Nachverfolgen, wie Werte berechnet wurden
- Reproduzierbarkeit - Berechnungen deterministisch wiederholen
Nächste Schritte
- Modulkatalog - Verfügbare Module
- GFM SDK (in Kürze) - Eigene Module erstellen