Fonctionnement des Gap Filling Modules
Ce guide fournit un apercu detaille des mecanismes internes des GFM, de l'enregistrement a l'execution.
Chargement des modules
Lorsque le service EOS demarre, tous les GFM sont charges et initialises via le GapFillingModuleLoader.
Processus de chargement
class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}
async def load_all_modules(self):
"""Charger et initialiser toutes les factories GFM."""
# Importer toutes les classes de factory GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 modules au total
)
# Initialiser chaque factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory
Initialisation de la Factory
Chaque factory s'initialise une fois et maintient des ressources persistantes :
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}
async def init_cache(self):
"""Charger les facteurs d'emission en memoire pour un acces rapide."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()
Planification de l'orchestrateur
L'orchestrateur coordonne l'execution des GFM via une boucle de planification.
La boucle de planification
class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Boucle de planification principale."""
# Initialiser - creer des workers pour tous les noeuds
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)
# Boucle de planification
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()
if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)
Creation des workers
Lorsqu'un noeud est ajoute au graphe, l'orchestrateur cree des workers :
async def spawn_workers_for_node(self, node: Node):
"""Creer des workers pour tous les GFM applicables."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)
if worker.should_be_scheduled():
self.schedule(worker)
Les trois methodes cles
Chaque worker GFM implemente trois methodes qui controlent son cycle de vie.
should_be_scheduled()
Determine si ce GFM est pertinent pour un noeud donne :
class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Ce GFM doit-il s'executer sur ce noeud ?"""
# S'executer uniquement sur les noeuds de flux de produit alimentaire
if not isinstance(self.node, FoodProductFlowNode):
return False
# S'executer uniquement si l'origine n'est pas deja definie
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False
return True
can_run_now()
Verifie si toutes les dependances sont satisfaites :
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Les dependances sont-elles satisfaites ?"""
# Verifier si les proprietes requises sont disponibles
# (Preferer verifier les proprietes plutot que l'etat du GFM pour un couplage lache)
# Attendre l'attribution de categorie (definie par match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
run()
Execute la logique de comblement des lacunes :
async def run(self, calc_graph: CalcGraph) -> None:
"""Executer le calcul de comblement des lacunes."""
try:
# Obtenir les entrees requises
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)
# Effectuer le calcul
origin = await self.detect_origin(names, category)
# Mettre a jour le noeud avec le resultat
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)
# Mettre a jour l'etat du GFM
self.update_gfm_state("completed")
except Exception as e:
self.handle_error(e)
Valeurs d'etat du worker
La methode can_run_now() retourne l'une de ces valeurs GapFillingWorkerStatusEnum :
| Etat | Description | Action de l'orchestrateur |
|---|---|---|
ready | Toutes les dependances satisfaites | Executer immediatement |
reschedule | Dependances pas encore disponibles | Replanifier pour la prochaine iteration |
cancel | Non applicable pour ce noeud | Retirer de la file, marquer comme annule |
Systeme de proprietes
Les GFM lisent et ecrivent des donnees via des proprietes typees sur les noeuds.
Lecture des proprietes
# Obtenir un type de propriete specifique
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")
# Obtenir la quantite avec unites
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()
Ecriture des proprietes
# Definir les flux environnementaux
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)
# Definir les donnees de localisation
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)
Types de proprietes courants
| Propriete | Description |
|---|---|
NamesProp | Noms de produits multilingues |
QuantityProp | Quantites avec conversion d'unites |
GlossaryTermProp | Liens vers la terminologie/categories |
OriginProp | Donnees d'origine geographique |
LocationProp | Localisation avec coordonnees |
EnvironmentalFlowsProp | Resultats du calcul d'incidence |
GfmStateProp | Suivi de l'etat d'execution du GFM |
Suivi de l'etat du GFM
Chaque noeud suit l'etat d'execution de tous les GFM :
# GfmStateProp stocke l'etat par module a l'aide des valeurs NodeGfmStateEnum
# "S" = planifie, "F" = termine, "C" = annule
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # termine
"OriginGapFillingWorker": "F", # termine
"TransportDecisionGapFillingWorker": "S", # planifie (en cours)
"GreenhouseGapFillingWorker": "S", # planifie (en attente)
"ImpactAssessmentGapFillingWorker": "S" # planifie (en attente)
}
Valeurs d'etat
L'etat d'execution du GFM est suivi a l'aide de NodeGfmStateEnum :
| Etat | Code | Description |
|---|---|---|
scheduled | S | Le GFM est planifie pour s'executer (inclut en attente, attente, en cours) |
finished | F | Le GFM s'est termine avec succes |
canceled | C | Le GFM a ete annule (non applicable, ignore ou echec) |
L'enum reel utilise des codes a une lettre ("S", "F", "C") pour l'efficacite du stockage.
Gestion des dependances
Les GFM declarent leurs dependances via leur logique can_run_now().
Patrons de dependance courants
Verification des dependances
L'approche recommandee est de verifier la presence des proprietes requises plutot que d'attendre des GFM specifiques par nom. Cela rend les modules plus faiblement couples et plus faciles a maintenir :
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Verifier si les proprietes requises sont disponibles."""
# Verifier les donnees d'origine (definies par origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule
# Verifier le mode de transport (defini par transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule
# Verifier les donnees de transformation (definies par processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
Verifier les proprietes au lieu des noms de GFM signifie que votre module fonctionnera correctement meme si le GFM en amont est renomme, remplace ou si la propriete est definie par un mecanisme different (par exemple, donnees fournies par l'utilisateur).
Suivi de la qualite des donnees
Les GFM suivent la qualite et la source de leurs calculs :
class DataQuality:
score: float # Score de confiance 0-1
source: str # Identifiant de la source de donnees
methodology: str # Methode de calcul utilisee
# Scores de qualite
QUALITY_LEVELS = {
"primary_data": 1.0, # Mesure directe
"high_quality_proxy": 0.8, # Meme produit, contexte different
"reasonable_proxy": 0.6, # Produit similaire dans la categorie
"default_value": 0.4, # Moyenne de la categorie
"uncertain_estimate": 0.2 # Hypotheses larges
}
Propagation de la qualite
La qualite se degrade a travers les calculs :
def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La qualite combinee est le minimum de toutes les entrees."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Fiabilite de ce GFM
return base_quality * calculation_confidence
Gestion des erreurs
Les GFM implementent une gestion gracieuse des erreurs :
async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")
except Exception as e:
# Journaliser l'erreur avec contexte
logger.error(
"Echec de l'execution du GFM",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)
# Creer une DataError pour le suivi
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)
# Mettre a jour l'etat en echec
self.update_gfm_state("failed")
Valeurs de repli
Lorsqu'un GFM echoue, les GFM dependants peuvent utiliser des valeurs de repli :
def get_fallback_origin(self, category: str) -> OriginProp:
"""Fournir une origine par defaut basee sur la categorie."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))
Strategies de mise en cache
Mise en cache au niveau Factory
Les factories mettent en cache les ressources partagees :
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Mettre en cache les facteurs d'emission (utilises par tous les workers)
self.emission_factors = await self.load_emission_factors()
# Mettre en cache les donnees d'activite
self.activities = await self.load_activities()
def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Les workers partagent les donnees en cache de la factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)
Mise en cache au niveau Worker
Les workers peuvent mettre en cache les resultats de calcul :
class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}
async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"
if cache_key in self.local_cache:
return self.local_cache[cache_key]
result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result
return result
Debogage et tracage
Journalisation de l'execution
L'orchestrateur journalise des informations d'execution detaillees :
logger.info(
"Execution du GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)
Mutations du graphe
Toutes les modifications du CalcGraph sont suivies comme mutations :
# Chaque changement de propriete cree une mutation
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)
Cela fournit :
- Auditabilite - Historique complet de tous les changements
- Debogage - Tracer comment les valeurs ont ete calculees
- Reproductibilite - Rejouer les calculs de maniere deterministe
Prochaines etapes
- Catalogue des modules - Modules disponibles
- SDK GFM (bientot disponible) - Construire des modules personnalises