Aller au contenu principal

Fonctionnement des Gap Filling Modules

Ce guide fournit un apercu detaille des mecanismes internes des GFM, de l'enregistrement a l'execution.

Chargement des modules

Lorsque le service EOS demarre, tous les GFM sont charges et initialises via le GapFillingModuleLoader.

Processus de chargement

class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}

async def load_all_modules(self):
"""Charger et initialiser toutes les factories GFM."""
# Importer toutes les classes de factory GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 modules au total
)

# Initialiser chaque factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory

Initialisation de la Factory

Chaque factory s'initialise une fois et maintient des ressources persistantes :

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}

async def init_cache(self):
"""Charger les facteurs d'emission en memoire pour un acces rapide."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()

Planification de l'orchestrateur

L'orchestrateur coordonne l'execution des GFM via une boucle de planification.

La boucle de planification

class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Boucle de planification principale."""
# Initialiser - creer des workers pour tous les noeuds
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)

# Boucle de planification
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()

if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)

Creation des workers

Lorsqu'un noeud est ajoute au graphe, l'orchestrateur cree des workers :

async def spawn_workers_for_node(self, node: Node):
"""Creer des workers pour tous les GFM applicables."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)

if worker.should_be_scheduled():
self.schedule(worker)

Les trois methodes cles

Chaque worker GFM implemente trois methodes qui controlent son cycle de vie.

should_be_scheduled()

Determine si ce GFM est pertinent pour un noeud donne :

class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Ce GFM doit-il s'executer sur ce noeud ?"""
# S'executer uniquement sur les noeuds de flux de produit alimentaire
if not isinstance(self.node, FoodProductFlowNode):
return False

# S'executer uniquement si l'origine n'est pas deja definie
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False

return True

can_run_now()

Verifie si toutes les dependances sont satisfaites :

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Les dependances sont-elles satisfaites ?"""
# Verifier si les proprietes requises sont disponibles
# (Preferer verifier les proprietes plutot que l'etat du GFM pour un couplage lache)

# Attendre l'attribution de categorie (definie par match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready

run()

Execute la logique de comblement des lacunes :

async def run(self, calc_graph: CalcGraph) -> None:
"""Executer le calcul de comblement des lacunes."""
try:
# Obtenir les entrees requises
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)

# Effectuer le calcul
origin = await self.detect_origin(names, category)

# Mettre a jour le noeud avec le resultat
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)

# Mettre a jour l'etat du GFM
self.update_gfm_state("completed")

except Exception as e:
self.handle_error(e)

Valeurs d'etat du worker

La methode can_run_now() retourne l'une de ces valeurs GapFillingWorkerStatusEnum :

EtatDescriptionAction de l'orchestrateur
readyToutes les dependances satisfaitesExecuter immediatement
rescheduleDependances pas encore disponiblesReplanifier pour la prochaine iteration
cancelNon applicable pour ce noeudRetirer de la file, marquer comme annule

Systeme de proprietes

Les GFM lisent et ecrivent des donnees via des proprietes typees sur les noeuds.

Lecture des proprietes

# Obtenir un type de propriete specifique
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")

# Obtenir la quantite avec unites
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()

Ecriture des proprietes

# Definir les flux environnementaux
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)

# Definir les donnees de localisation
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)

Types de proprietes courants

ProprieteDescription
NamesPropNoms de produits multilingues
QuantityPropQuantites avec conversion d'unites
GlossaryTermPropLiens vers la terminologie/categories
OriginPropDonnees d'origine geographique
LocationPropLocalisation avec coordonnees
EnvironmentalFlowsPropResultats du calcul d'incidence
GfmStatePropSuivi de l'etat d'execution du GFM

Suivi de l'etat du GFM

Chaque noeud suit l'etat d'execution de tous les GFM :

# GfmStateProp stocke l'etat par module a l'aide des valeurs NodeGfmStateEnum
# "S" = planifie, "F" = termine, "C" = annule
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # termine
"OriginGapFillingWorker": "F", # termine
"TransportDecisionGapFillingWorker": "S", # planifie (en cours)
"GreenhouseGapFillingWorker": "S", # planifie (en attente)
"ImpactAssessmentGapFillingWorker": "S" # planifie (en attente)
}

Valeurs d'etat

L'etat d'execution du GFM est suivi a l'aide de NodeGfmStateEnum :

EtatCodeDescription
scheduledSLe GFM est planifie pour s'executer (inclut en attente, attente, en cours)
finishedFLe GFM s'est termine avec succes
canceledCLe GFM a ete annule (non applicable, ignore ou echec)
remarque

L'enum reel utilise des codes a une lettre ("S", "F", "C") pour l'efficacite du stockage.

Gestion des dependances

Les GFM declarent leurs dependances via leur logique can_run_now().

Patrons de dependance courants

efe7383014e3e742a793e0d393d08698

Verification des dependances

L'approche recommandee est de verifier la presence des proprietes requises plutot que d'attendre des GFM specifiques par nom. Cela rend les modules plus faiblement couples et plus faciles a maintenir :

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Verifier si les proprietes requises sont disponibles."""
# Verifier les donnees d'origine (definies par origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule

# Verifier le mode de transport (defini par transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule

# Verifier les donnees de transformation (definies par processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready
Bonne pratique

Verifier les proprietes au lieu des noms de GFM signifie que votre module fonctionnera correctement meme si le GFM en amont est renomme, remplace ou si la propriete est definie par un mecanisme different (par exemple, donnees fournies par l'utilisateur).

Suivi de la qualite des donnees

Les GFM suivent la qualite et la source de leurs calculs :

class DataQuality:
score: float # Score de confiance 0-1
source: str # Identifiant de la source de donnees
methodology: str # Methode de calcul utilisee

# Scores de qualite
QUALITY_LEVELS = {
"primary_data": 1.0, # Mesure directe
"high_quality_proxy": 0.8, # Meme produit, contexte different
"reasonable_proxy": 0.6, # Produit similaire dans la categorie
"default_value": 0.4, # Moyenne de la categorie
"uncertain_estimate": 0.2 # Hypotheses larges
}

Propagation de la qualite

La qualite se degrade a travers les calculs :

def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La qualite combinee est le minimum de toutes les entrees."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Fiabilite de ce GFM

return base_quality * calculation_confidence

Gestion des erreurs

Les GFM implementent une gestion gracieuse des erreurs :

async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")

except Exception as e:
# Journaliser l'erreur avec contexte
logger.error(
"Echec de l'execution du GFM",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)

# Creer une DataError pour le suivi
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)

# Mettre a jour l'etat en echec
self.update_gfm_state("failed")

Valeurs de repli

Lorsqu'un GFM echoue, les GFM dependants peuvent utiliser des valeurs de repli :

def get_fallback_origin(self, category: str) -> OriginProp:
"""Fournir une origine par defaut basee sur la categorie."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))

Strategies de mise en cache

Mise en cache au niveau Factory

Les factories mettent en cache les ressources partagees :

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Mettre en cache les facteurs d'emission (utilises par tous les workers)
self.emission_factors = await self.load_emission_factors()

# Mettre en cache les donnees d'activite
self.activities = await self.load_activities()

def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Les workers partagent les donnees en cache de la factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)

Mise en cache au niveau Worker

Les workers peuvent mettre en cache les resultats de calcul :

class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}

async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"

if cache_key in self.local_cache:
return self.local_cache[cache_key]

result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result

return result

Debogage et tracage

Journalisation de l'execution

L'orchestrateur journalise des informations d'execution detaillees :

logger.info(
"Execution du GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)

Mutations du graphe

Toutes les modifications du CalcGraph sont suivies comme mutations :

# Chaque changement de propriete cree une mutation
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)

Cela fournit :

  • Auditabilite - Historique complet de tous les changements
  • Debogage - Tracer comment les valeurs ont ete calculees
  • Reproductibilite - Rejouer les calculs de maniere deterministe

Prochaines etapes

  • Catalogue des modules - Modules disponibles
  • SDK GFM (bientot disponible) - Construire des modules personnalises