Passa al contenuto principale

Come Funzionano i Gap Filling Module

Questa guida fornisce uno sguardo dettagliato alla meccanica interna dei GFM, dalla registrazione all'esecuzione.

Caricamento dei Moduli

Quando il servizio EOS si avvia, tutti i GFM vengono caricati e inizializzati attraverso il GapFillingModuleLoader.

Processo di Caricamento

class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}

async def load_all_modules(self):
"""Carica e inizializza tutte le Factory GFM."""
# Importa tutte le classi Factory GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 moduli in totale
)

# Inizializza ogni factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory

Inizializzazione Factory

Ogni factory viene inizializzata una volta e mantiene risorse persistenti:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}

async def init_cache(self):
"""Carica i fattori di emissione in memoria per accesso rapido."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()

Pianificazione dell'Orchestratore

L'Orchestratore coordina l'esecuzione dei GFM attraverso un ciclo di pianificazione.

Il Ciclo di Pianificazione

class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Ciclo di pianificazione principale."""
# Inizializza - crea worker per tutti i nodi
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)

# Ciclo di pianificazione
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()

if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)

Creazione dei Worker

Quando un nodo viene aggiunto al grafo, l'orchestratore crea i worker:

async def spawn_workers_for_node(self, node: Node):
"""Crea worker per tutti i GFM applicabili."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)

if worker.should_be_scheduled():
self.schedule(worker)

I Tre Metodi Chiave

Ogni worker GFM implementa tre metodi che controllano il suo ciclo di vita.

should_be_scheduled()

Determina se questo GFM e rilevante per un dato nodo:

class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""Questo GFM dovrebbe essere eseguito su questo nodo?"""
# Esegui solo sui nodi food product flow
if not isinstance(self.node, FoodProductFlowNode):
return False

# Esegui solo se l'origine non e gia impostata
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False

return True

can_run_now()

Verifica se tutte le dipendenze sono soddisfatte:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Le dipendenze sono soddisfatte?"""
# Verifica se le proprieta richieste sono disponibili
# (Preferisci verificare le proprieta invece dello stato GFM per accoppiamento lasco)

# Attendi l'assegnazione della categoria (impostata da match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready

run()

Esegue la logica di gap-filling:

async def run(self, calc_graph: CalcGraph) -> None:
"""Esegui il calcolo di gap-filling."""
try:
# Ottieni gli input richiesti
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)

# Esegui il calcolo
origin = await self.detect_origin(names, category)

# Aggiorna il nodo con il risultato
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)

# Aggiorna lo stato GFM
self.update_gfm_state("completed")

except Exception as e:
self.handle_error(e)

Valori di Stato del Worker

Il metodo can_run_now() restituisce uno di questi valori GapFillingWorkerStatusEnum:

StatoDescrizioneAzione Orchestratore
readyTutte le dipendenze soddisfatteEsegui immediatamente
rescheduleDipendenze non ancora disponibiliRipianifica per la prossima iterazione
cancelNon applicabile per questo nodoRimuovi dalla coda, marca come cancellato

Sistema di Proprieta

I GFM leggono e scrivono dati attraverso proprieta tipizzate sui nodi.

Lettura delle Proprieta

# Ottieni un tipo di proprieta specifico
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")

# Ottieni quantita con unita
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()

Scrittura delle Proprieta

# Imposta flussi ambientali
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)

# Imposta dati di localizzazione
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)

Tipi di Proprieta Comuni

ProprietaDescrizione
NamesPropNomi prodotto multilingue
QuantityPropQuantita con conversione unita
GlossaryTermPropLink a terminologia/categorie
OriginPropDati di origine geografica
LocationPropLocalizzazione con coordinate
EnvironmentalFlowsPropRisultati del calcolo di impatto
GfmStatePropTracciamento dello stato di esecuzione GFM

Tracciamento dello Stato GFM

Ogni nodo traccia lo stato di esecuzione di tutti i GFM:

# GfmStateProp memorizza lo stato per-modulo usando valori NodeGfmStateEnum
# "S" = scheduled, "F" = finished, "C" = canceled
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # completato
"OriginGapFillingWorker": "F", # completato
"TransportDecisionGapFillingWorker": "S", # pianificato (in corso)
"GreenhouseGapFillingWorker": "S", # pianificato (in attesa)
"ImpactAssessmentGapFillingWorker": "S" # pianificato (in sospeso)
}

Valori di Stato

Lo stato di esecuzione GFM viene tracciato usando NodeGfmStateEnum:

StatoCodiceDescrizione
scheduledSGFM pianificato per essere eseguito (include in sospeso, in attesa, in esecuzione)
finishedFGFM completato con successo
canceledCGFM cancellato (non applicabile, saltato, o fallito)
note

L'enum effettivo usa codici a singola lettera ("S", "F", "C") per efficienza di archiviazione.

Gestione delle Dipendenze

I GFM dichiarano le dipendenze attraverso la loro logica can_run_now().

Pattern di Dipendenza Comuni

2b939610788bb765423a14d53e61ee95

Verifica delle Dipendenze

L'approccio consigliato e verificare la presenza delle proprieta richieste piuttosto che attendere specifici GFM per nome. Questo rende i moduli piu debolmente accoppiati e piu facili da mantenere:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Verifica se le proprieta richieste sono disponibili."""
# Verifica i dati di origine (impostati da origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule

# Verifica la modalita di trasporto (impostata da transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule

# Verifica i dati di trasformazione (impostati da processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready
Buona Pratica

Verificare le proprieta invece dei nomi GFM significa che il tuo modulo funzionera correttamente anche se il GFM a monte viene rinominato, sostituito, o se la proprieta viene impostata attraverso un meccanismo diverso (es. dati forniti dall'utente).

Tracciamento della Qualita dei Dati

I GFM tracciano la qualita e la fonte dei loro calcoli:

class DataQuality:
score: float # Punteggio di confidenza 0-1
source: str # Identificatore della fonte dati
methodology: str # Metodo di calcolo utilizzato

# Punteggi di qualita
QUALITY_LEVELS = {
"primary_data": 1.0, # Misurazione diretta
"high_quality_proxy": 0.8, # Stesso prodotto, contesto diverso
"reasonable_proxy": 0.6, # Prodotto simile nella categoria
"default_value": 0.4, # Media della categoria
"uncertain_estimate": 0.2 # Ipotesi generiche
}

Propagazione della Qualita

La qualita si degrada attraverso i calcoli:

def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La qualita combinata e il minimo di tutti gli input."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Affidabilita di questo GFM

return base_quality * calculation_confidence

Gestione degli Errori

I GFM implementano una gestione degli errori robusta:

async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")

except Exception as e:
# Registra l'errore con contesto
logger.error(
"Esecuzione GFM fallita",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)

# Crea DataError per tracciamento
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)

# Aggiorna lo stato a fallito
self.update_gfm_state("failed")

Valori di Fallback

Quando un GFM fallisce, i GFM dipendenti possono usare valori di fallback:

def get_fallback_origin(self, category: str) -> OriginProp:
"""Fornisce origine predefinita basata sulla categoria."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))

Strategie di Caching

Caching a Livello Factory

Le factory memorizzano in cache le risorse condivise:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Cache dei fattori di emissione (usati da tutti i worker)
self.emission_factors = await self.load_emission_factors()

# Cache dei dati di attivita
self.activities = await self.load_activities()

def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# I worker condividono i dati in cache della factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)

Caching a Livello Worker

I worker possono memorizzare in cache i risultati dei calcoli:

class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}

async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"

if cache_key in self.local_cache:
return self.local_cache[cache_key]

result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result

return result

Debug e Tracciamento

Logging dell'Esecuzione

L'orchestratore registra informazioni dettagliate sull'esecuzione:

logger.info(
"Esecuzione GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)

Mutazioni del Grafo

Tutte le modifiche al CalcGraph vengono tracciate come mutazioni:

# Ogni modifica di proprieta crea una mutazione
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)

Questo fornisce:

  • Verificabilita - Storia completa di tutte le modifiche
  • Debug - Traccia come sono stati calcolati i valori
  • Riproducibilita - Riproduzione deterministica dei calcoli

Prossimi Passi

  • Catalogo Moduli - Moduli disponibili
  • GFM SDK (in arrivo) - Costruisci moduli personalizzati