Saltar al contenido principal

Cómo Funcionan los Gap Filling Modules

Esta guía proporciona una mirada detallada a las mecánicas internas de los GFMs, desde el registro hasta la ejecución.

Carga de Módulos

Cuando el servicio EOS arranca, todos los GFMs se cargan e inicializan a través del GapFillingModuleLoader.

Proceso de Carga

class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}

async def load_all_modules(self):
"""Cargar e inicializar todas las factories de GFM."""
# Importar todas las clases de factory de GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 módulos en total
)

# Inicializar cada factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory

Inicialización de Factory

Cada factory se inicializa una vez y mantiene recursos persistentes:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}

async def init_cache(self):
"""Cargar factores de emisión en memoria para acceso rápido."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()

Programación del Orquestador

El Orquestador coordina la ejecución de GFM a través de un bucle de programación.

El Bucle de Programación

class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Bucle principal de programación."""
# Inicializar - generar trabajadores para todos los nodos
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)

# Bucle de programación
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()

if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)

Generación de Trabajadores

Cuando un nodo se añade al grafo, el orquestador genera trabajadores:

async def spawn_workers_for_node(self, node: Node):
"""Crear trabajadores para todos los GFMs aplicables."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)

if worker.should_be_scheduled():
self.schedule(worker)

Los Tres Métodos Clave

Cada trabajador GFM implementa tres métodos que controlan su ciclo de vida.

should_be_scheduled()

Determina si este GFM es relevante para un nodo dado:

class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""¿Debería ejecutarse este GFM en este nodo?"""
# Solo ejecutar en nodos de flujo de producto alimentario
if not isinstance(self.node, FoodProductFlowNode):
return False

# Solo ejecutar si el origen no está ya establecido
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False

return True

can_run_now()

Comprueba si todas las dependencias están satisfechas:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""¿Están satisfechas las dependencias?"""
# Comprobar si las propiedades requeridas están disponibles
# (Preferir comprobar propiedades sobre estado de GFM para acoplamiento débil)

# Esperar asignación de categoría (establecida por match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready

run()

Ejecuta la lógica de relleno de vacíos:

async def run(self, calc_graph: CalcGraph) -> None:
"""Ejecutar el cálculo de relleno de vacíos."""
try:
# Obtener entradas requeridas
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)

# Realizar cálculo
origin = await self.detect_origin(names, category)

# Actualizar nodo con resultado
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)

# Actualizar estado del GFM
self.update_gfm_state("completed")

except Exception as e:
self.handle_error(e)

Valores de Estado del Trabajador

El método can_run_now() devuelve uno de estos valores de GapFillingWorkerStatusEnum:

EstadoDescripciónAcción del Orquestador
readyTodas las dependencias satisfechasEjecutar inmediatamente
rescheduleDependencias aún no disponiblesReprogramar para siguiente iteración
cancelNo aplicable para este nodoEliminar de la cola, marcar como cancelado

Sistema de Propiedades

Los GFMs leen y escriben datos a través de propiedades tipadas en los nodos.

Lectura de Propiedades

# Obtener un tipo de propiedad específico
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")

# Obtener cantidad con unidades
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()

Escritura de Propiedades

# Establecer flujos ambientales
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)

# Establecer datos de ubicación
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)

Tipos de Propiedad Comunes

PropiedadDescripción
NamesPropNombres de producto multilingües
QuantityPropCantidades con conversión de unidades
GlossaryTermPropEnlaces a terminología/categorías
OriginPropDatos de origen geográfico
LocationPropUbicación con coordenadas
EnvironmentalFlowsPropResultados de cálculo de impacto
GfmStatePropSeguimiento de estado de ejecución de GFM

Seguimiento de Estado de GFM

Cada nodo rastrea el estado de ejecución de todos los GFMs:

# GfmStateProp almacena estado por módulo usando valores NodeGfmStateEnum
# "S" = programado, "F" = finalizado, "C" = cancelado
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # finalizado
"OriginGapFillingWorker": "F", # finalizado
"TransportDecisionGapFillingWorker": "S", # programado (en progreso)
"GreenhouseGapFillingWorker": "S", # programado (esperando)
"ImpactAssessmentGapFillingWorker": "S" # programado (pendiente)
}

Valores de Estado

El estado de ejecución de GFM se rastrea usando NodeGfmStateEnum:

EstadoCódigoDescripción
scheduledSEl GFM está programado para ejecutarse (incluye pendiente, esperando, ejecutando)
finishedFEl GFM ha completado exitosamente
canceledCEl GFM fue cancelado (no aplicable, omitido o falló)
nota

El enum real usa códigos de una letra ("S", "F", "C") para eficiencia de almacenamiento.

Gestión de Dependencias

Los GFMs declaran dependencias a través de su lógica can_run_now().

Patrones de Dependencia Comunes

2a8f72fc8db9146cac1431b373a03d4e

Verificación de Dependencias

El enfoque recomendado es comprobar la presencia de propiedades requeridas en lugar de esperar GFMs específicos por nombre. Esto hace que los módulos estén más débilmente acoplados y sean más fáciles de mantener:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Comprobar si las propiedades requeridas están disponibles."""
# Comprobar datos de origen (establecidos por origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule

# Comprobar modo de transporte (establecido por transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule

# Comprobar datos de procesamiento (establecidos por processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule

return GapFillingWorkerStatusEnum.ready
Mejor Práctica

Comprobar propiedades en lugar de nombres de GFM significa que tu módulo funcionará correctamente incluso si el GFM anterior es renombrado, reemplazado, o si la propiedad se establece a través de un mecanismo diferente (por ejemplo, datos proporcionados por el usuario).

Seguimiento de Calidad de Datos

Los GFMs rastrean la calidad y fuente de sus cálculos:

class DataQuality:
score: float # Puntuación de confianza 0-1
source: str # Identificador de fuente de datos
methodology: str # Método de cálculo usado

# Puntuaciones de calidad
QUALITY_LEVELS = {
"primary_data": 1.0, # Medición directa
"high_quality_proxy": 0.8, # Mismo producto, contexto diferente
"reasonable_proxy": 0.6, # Producto similar en categoría
"default_value": 0.4, # Promedio de categoría
"uncertain_estimate": 0.2 # Suposiciones amplias
}

Propagación de Calidad

La calidad se degrada a través de los cálculos:

def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La calidad combinada es el mínimo de todas las entradas."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Fiabilidad de este GFM

return base_quality * calculation_confidence

Manejo de Errores

Los GFMs implementan manejo de errores elegante:

async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")

except Exception as e:
# Registrar error con contexto
logger.error(
"Ejecución de GFM falló",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)

# Crear DataError para seguimiento
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)

# Actualizar estado a fallido
self.update_gfm_state("failed")

Valores de Respaldo

Cuando un GFM falla, los GFMs dependientes pueden usar valores de respaldo:

def get_fallback_origin(self, category: str) -> OriginProp:
"""Proporcionar origen por defecto basado en categoría."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))

Estrategias de Caché

Caché a Nivel de Factory

Las factories cachean recursos compartidos:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Cachear factores de emisión (usados por todos los trabajadores)
self.emission_factors = await self.load_emission_factors()

# Cachear datos de actividad
self.activities = await self.load_activities()

def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Los trabajadores comparten datos cacheados de la factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)

Caché a Nivel de Trabajador

Los trabajadores pueden cachear resultados de cálculo:

class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}

async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"

if cache_key in self.local_cache:
return self.local_cache[cache_key]

result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result

return result

Depuración y Trazado

Registro de Ejecución

El orquestador registra información detallada de ejecución:

logger.info(
"Ejecución de GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)

Mutaciones del Grafo

Todos los cambios al CalcGraph se rastrean como mutaciones:

# Cada cambio de propiedad crea una mutación
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)

Esto proporciona:

  • Auditabilidad - Historial completo de todos los cambios
  • Depuración - Rastrear cómo se calcularon los valores
  • Reproducibilidad - Reproducir cálculos de forma determinista

Siguientes Pasos

  • Catálogo de Módulos - Módulos disponibles
  • GFM SDK (próximamente) - Construir módulos personalizados