Come Funzionano i Gap Filling Module
Questa guida fornisce uno sguardo dettagliato alla meccanica interna dei GFM, dalla registrazione all'esecuzione.
Caricamento dei Moduli
Quando il servizio EOS si avvia, tutti i GFM vengono caricati e inizializzati attraverso il GapFillingModuleLoader.
Processo di Caricamento
class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}
async def load_all_modules(self):
"""Carica e inizializza tutte le Factory GFM."""
# Importa tutte le classi Factory GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 moduli in totale
)
# Inizializza ogni factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory
Inizializzazione Factory
Ogni factory viene inizializzata una volta e mantiene risorse persistenti:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}
async def init_cache(self):
"""Carica i fattori di emissione in memoria per accesso rapido."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()
Pianificazione dell'Orchestratore
L'Orchestratore coordina l'esecuzione dei GFM attraverso un ciclo di pianificazione.
Il Ciclo di Pianificazione
class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Ciclo di pianificazione principale."""
# Inizializza - crea worker per tutti i nodi
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)
# Ciclo di pianificazione
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()
if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)
Creazione dei Worker
Quando un nodo viene aggiunto al grafo, l'orchestratore crea i worker:
async def spawn_workers_for_node(self, node: Node):
"""Crea worker per tutti i GFM applicabili."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)
if worker.should_be_scheduled():
self.schedule(worker)
I Tre Metodi Chiave
Ogni worker GFM implementa tre metodi che controllano il suo ciclo di vita.
should_be_scheduled()
Determina se questo GFM e rilevante per un dato nodo:
class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Questo GFM dovrebbe essere eseguito su questo nodo?"""
# Esegui solo sui nodi food product flow
if not isinstance(self.node, FoodProductFlowNode):
return False
# Esegui solo se l'origine non e gia impostata
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False
return True
can_run_now()
Verifica se tutte le dipendenze sono soddisfatte:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Le dipendenze sono soddisfatte?"""
# Verifica se le proprieta richieste sono disponibili
# (Preferisci verificare le proprieta invece dello stato GFM per accoppiamento lasco)
# Attendi l'assegnazione della categoria (impostata da match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
run()
Esegue la logica di gap-filling:
async def run(self, calc_graph: CalcGraph) -> None:
"""Esegui il calcolo di gap-filling."""
try:
# Ottieni gli input richiesti
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)
# Esegui il calcolo
origin = await self.detect_origin(names, category)
# Aggiorna il nodo con il risultato
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)
# Aggiorna lo stato GFM
self.update_gfm_state("completed")
except Exception as e:
self.handle_error(e)
Valori di Stato del Worker
Il metodo can_run_now() restituisce uno di questi valori GapFillingWorkerStatusEnum:
| Stato | Descrizione | Azione Orchestratore |
|---|---|---|
ready | Tutte le dipendenze soddisfatte | Esegui immediatamente |
reschedule | Dipendenze non ancora disponibili | Ripianifica per la prossima iterazione |
cancel | Non applicabile per questo nodo | Rimuovi dalla coda, marca come cancellato |
Sistema di Proprieta
I GFM leggono e scrivono dati attraverso proprieta tipizzate sui nodi.
Lettura delle Proprieta
# Ottieni un tipo di proprieta specifico
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")
# Ottieni quantita con unita
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()
Scrittura delle Proprieta
# Imposta flussi ambientali
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)
# Imposta dati di localizzazione
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)
Tipi di Proprieta Comuni
| Proprieta | Descrizione |
|---|---|
NamesProp | Nomi prodotto multilingue |
QuantityProp | Quantita con conversione unita |
GlossaryTermProp | Link a terminologia/categorie |
OriginProp | Dati di origine geografica |
LocationProp | Localizzazione con coordinate |
EnvironmentalFlowsProp | Risultati del calcolo di impatto |
GfmStateProp | Tracciamento dello stato di esecuzione GFM |
Tracciamento dello Stato GFM
Ogni nodo traccia lo stato di esecuzione di tutti i GFM:
# GfmStateProp memorizza lo stato per-modulo usando valori NodeGfmStateEnum
# "S" = scheduled, "F" = finished, "C" = canceled
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # completato
"OriginGapFillingWorker": "F", # completato
"TransportDecisionGapFillingWorker": "S", # pianificato (in corso)
"GreenhouseGapFillingWorker": "S", # pianificato (in attesa)
"ImpactAssessmentGapFillingWorker": "S" # pianificato (in sospeso)
}
Valori di Stato
Lo stato di esecuzione GFM viene tracciato usando NodeGfmStateEnum:
| Stato | Codice | Descrizione |
|---|---|---|
scheduled | S | GFM pianificato per essere eseguito (include in sospeso, in attesa, in esecuzione) |
finished | F | GFM completato con successo |
canceled | C | GFM cancellato (non applicabile, saltato, o fallito) |
L'enum effettivo usa codici a singola lettera ("S", "F", "C") per efficienza di archiviazione.
Gestione delle Dipendenze
I GFM dichiarano le dipendenze attraverso la loro logica can_run_now().
Pattern di Dipendenza Comuni
Verifica delle Dipendenze
L'approccio consigliato e verificare la presenza delle proprieta richieste piuttosto che attendere specifici GFM per nome. Questo rende i moduli piu debolmente accoppiati e piu facili da mantenere:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Verifica se le proprieta richieste sono disponibili."""
# Verifica i dati di origine (impostati da origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule
# Verifica la modalita di trasporto (impostata da transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule
# Verifica i dati di trasformazione (impostati da processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
Verificare le proprieta invece dei nomi GFM significa che il tuo modulo funzionera correttamente anche se il GFM a monte viene rinominato, sostituito, o se la proprieta viene impostata attraverso un meccanismo diverso (es. dati forniti dall'utente).
Tracciamento della Qualita dei Dati
I GFM tracciano la qualita e la fonte dei loro calcoli:
class DataQuality:
score: float # Punteggio di confidenza 0-1
source: str # Identificatore della fonte dati
methodology: str # Metodo di calcolo utilizzato
# Punteggi di qualita
QUALITY_LEVELS = {
"primary_data": 1.0, # Misurazione diretta
"high_quality_proxy": 0.8, # Stesso prodotto, contesto diverso
"reasonable_proxy": 0.6, # Prodotto simile nella categoria
"default_value": 0.4, # Media della categoria
"uncertain_estimate": 0.2 # Ipotesi generiche
}
Propagazione della Qualita
La qualita si degrada attraverso i calcoli:
def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La qualita combinata e il minimo di tutti gli input."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Affidabilita di questo GFM
return base_quality * calculation_confidence
Gestione degli Errori
I GFM implementano una gestione degli errori robusta:
async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")
except Exception as e:
# Registra l'errore con contesto
logger.error(
"Esecuzione GFM fallita",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)
# Crea DataError per tracciamento
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)
# Aggiorna lo stato a fallito
self.update_gfm_state("failed")
Valori di Fallback
Quando un GFM fallisce, i GFM dipendenti possono usare valori di fallback:
def get_fallback_origin(self, category: str) -> OriginProp:
"""Fornisce origine predefinita basata sulla categoria."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))
Strategie di Caching
Caching a Livello Factory
Le factory memorizzano in cache le risorse condivise:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Cache dei fattori di emissione (usati da tutti i worker)
self.emission_factors = await self.load_emission_factors()
# Cache dei dati di attivita
self.activities = await self.load_activities()
def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# I worker condividono i dati in cache della factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)
Caching a Livello Worker
I worker possono memorizzare in cache i risultati dei calcoli:
class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}
async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"
if cache_key in self.local_cache:
return self.local_cache[cache_key]
result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result
return result
Debug e Tracciamento
Logging dell'Esecuzione
L'orchestratore registra informazioni dettagliate sull'esecuzione:
logger.info(
"Esecuzione GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)
Mutazioni del Grafo
Tutte le modifiche al CalcGraph vengono tracciate come mutazioni:
# Ogni modifica di proprieta crea una mutazione
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)
Questo fornisce:
- Verificabilita - Storia completa di tutte le modifiche
- Debug - Traccia come sono stati calcolati i valori
- Riproducibilita - Riproduzione deterministica dei calcoli
Prossimi Passi
- Catalogo Moduli - Moduli disponibili
- GFM SDK (in arrivo) - Costruisci moduli personalizzati