Cómo Funcionan los Gap Filling Modules
Esta guía proporciona una mirada detallada a las mecánicas internas de los GFMs, desde el registro hasta la ejecución.
Carga de Módulos
Cuando el servicio EOS arranca, todos los GFMs se cargan e inicializan a través del GapFillingModuleLoader.
Proceso de Carga
class GapFillingModuleLoader:
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
self.postgres_db = postgres_db
self.service_provider = service_provider
self.factories: dict[str, AbstractGapFillingFactory] = {}
async def load_all_modules(self):
"""Cargar e inicializar todas las factories de GFM."""
# Importar todas las clases de factory de GFM
from core.gap_filling_modules import (
match_product_name_gfm,
origin_gfm,
greenhouse_gfm,
# ... 28 módulos en total
)
# Inicializar cada factory
for factory_class in self.get_all_factories():
factory = factory_class(self.postgres_db, self.service_provider)
await factory.init_cache()
self.factories[factory.name] = factory
Inicialización de Factory
Cada factory se inicializa una vez y mantiene recursos persistentes:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
super().__init__(postgres_db, service_provider)
self.emission_factors: dict = {}
self.activity_cache: dict = {}
async def init_cache(self):
"""Cargar factores de emisión en memoria para acceso rápido."""
self.emission_factors = await self.load_emission_factors()
self.activity_cache = await self.load_activity_data()
Programación del Orquestador
El Orquestador coordina la ejecución de GFM a través de un bucle de programación.
El Bucle de Programación
class Orchestrator(AbstractGraphObserver):
async def run(self, calc_graph: CalcGraph):
"""Bucle principal de programación."""
# Inicializar - generar trabajadores para todos los nodos
for node in calc_graph.nodes.values():
await self.spawn_workers_for_node(node)
# Bucle de programación
while self.has_pending_gfms():
for gfm in self.get_scheduled_gfms():
status = gfm.can_run_now()
if status == GapFillingWorkerStatusEnum.ready:
await gfm.run(calc_graph)
self.mark_finished(gfm)
elif status == GapFillingWorkerStatusEnum.reschedule:
self.reschedule(gfm)
elif status == GapFillingWorkerStatusEnum.cancel:
self.mark_canceled(gfm)
Generación de Trabajadores
Cuando un nodo se añade al grafo, el orquestador genera trabajadores:
async def spawn_workers_for_node(self, node: Node):
"""Crear trabajadores para todos los GFMs aplicables."""
for factory in self.gfm_loader.factories.values():
worker = factory.spawn_worker(node)
if worker.should_be_scheduled():
self.schedule(worker)
Los Tres Métodos Clave
Cada trabajador GFM implementa tres métodos que controlan su ciclo de vida.
should_be_scheduled()
Determina si este GFM es relevante para un nodo dado:
class OriginGapFillingWorker(AbstractGapFillingWorker):
def should_be_scheduled(self) -> bool:
"""¿Debería ejecutarse este GFM en este nodo?"""
# Solo ejecutar en nodos de flujo de producto alimentario
if not isinstance(self.node, FoodProductFlowNode):
return False
# Solo ejecutar si el origen no está ya establecido
origin_prop = self.node.get_property(OriginProp)
if origin_prop and origin_prop.is_complete():
return False
return True
can_run_now()
Comprueba si todas las dependencias están satisfechas:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""¿Están satisfechas las dependencias?"""
# Comprobar si las propiedades requeridas están disponibles
# (Preferir comprobar propiedades sobre estado de GFM para acoplamiento débil)
# Esperar asignación de categoría (establecida por match_product_name_gfm)
category_prop = self.node.get_property(GlossaryTermProp)
if not category_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
run()
Ejecuta la lógica de relleno de vacíos:
async def run(self, calc_graph: CalcGraph) -> None:
"""Ejecutar el cálculo de relleno de vacíos."""
try:
# Obtener entradas requeridas
category = self.node.get_property(GlossaryTermProp)
names = self.node.get_property(NamesProp)
# Realizar cálculo
origin = await self.detect_origin(names, category)
# Actualizar nodo con resultado
origin_prop = OriginProp(
country_code=origin.country,
confidence=origin.confidence,
source="origin_gfm"
)
self.node.set_property(origin_prop)
# Actualizar estado del GFM
self.update_gfm_state("completed")
except Exception as e:
self.handle_error(e)
Valores de Estado del Trabajador
El método can_run_now() devuelve uno de estos valores de GapFillingWorkerStatusEnum:
| Estado | Descripción | Acción del Orquestador |
|---|---|---|
ready | Todas las dependencias satisfechas | Ejecutar inmediatamente |
reschedule | Dependencias aún no disponibles | Reprogramar para siguiente iteración |
cancel | No aplicable para este nodo | Eliminar de la cola, marcar como cancelado |
Sistema de Propiedades
Los GFMs leen y escriben datos a través de propiedades tipadas en los nodos.
Lectura de Propiedades
# Obtener un tipo de propiedad específico
names_prop = self.node.get_property(NamesProp)
if names_prop:
product_name = names_prop.get_name("en")
# Obtener cantidad con unidades
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
amount_kg = quantity_prop.to_kg()
Escritura de Propiedades
# Establecer flujos ambientales
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)
# Establecer datos de ubicación
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)
Tipos de Propiedad Comunes
| Propiedad | Descripción |
|---|---|
NamesProp | Nombres de producto multilingües |
QuantityProp | Cantidades con conversión de unidades |
GlossaryTermProp | Enlaces a terminología/categorías |
OriginProp | Datos de origen geográfico |
LocationProp | Ubicación con coordenadas |
EnvironmentalFlowsProp | Resultados de cálculo de impacto |
GfmStateProp | Seguimiento de estado de ejecución de GFM |
Seguimiento de Estado de GFM
Cada nodo rastrea el estado de ejecución de todos los GFMs:
# GfmStateProp almacena estado por módulo usando valores NodeGfmStateEnum
# "S" = programado, "F" = finalizado, "C" = cancelado
gfm_state = {
"MatchProductNameGapFillingWorker": "F", # finalizado
"OriginGapFillingWorker": "F", # finalizado
"TransportDecisionGapFillingWorker": "S", # programado (en progreso)
"GreenhouseGapFillingWorker": "S", # programado (esperando)
"ImpactAssessmentGapFillingWorker": "S" # programado (pendiente)
}
Valores de Estado
El estado de ejecución de GFM se rastrea usando NodeGfmStateEnum:
| Estado | Código | Descripción |
|---|---|---|
scheduled | S | El GFM está programado para ejecutarse (incluye pendiente, esperando, ejecutando) |
finished | F | El GFM ha completado exitosamente |
canceled | C | El GFM fue cancelado (no aplicable, omitido o falló) |
El enum real usa códigos de una letra ("S", "F", "C") para eficiencia de almacenamiento.
Gestión de Dependencias
Los GFMs declaran dependencias a través de su lógica can_run_now().
Patrones de Dependencia Comunes
Verificación de Dependencias
El enfoque recomendado es comprobar la presencia de propiedades requeridas en lugar de esperar GFMs específicos por nombre. Esto hace que los módulos estén más débilmente acoplados y sean más fáciles de mantener:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
"""Comprobar si las propiedades requeridas están disponibles."""
# Comprobar datos de origen (establecidos por origin_gfm)
origin_prop = self.node.get_property(OriginProp)
if not origin_prop:
return GapFillingWorkerStatusEnum.reschedule
# Comprobar modo de transporte (establecido por transportation_decision_gfm)
transport_prop = self.node.get_property(TransportModeProp)
if not transport_prop:
return GapFillingWorkerStatusEnum.reschedule
# Comprobar datos de procesamiento (establecidos por processing_gfm)
processing_prop = self.node.get_property(ProcessingProp)
if not processing_prop:
return GapFillingWorkerStatusEnum.reschedule
return GapFillingWorkerStatusEnum.ready
Comprobar propiedades en lugar de nombres de GFM significa que tu módulo funcionará correctamente incluso si el GFM anterior es renombrado, reemplazado, o si la propiedad se establece a través de un mecanismo diferente (por ejemplo, datos proporcionados por el usuario).
Seguimiento de Calidad de Datos
Los GFMs rastrean la calidad y fuente de sus cálculos:
class DataQuality:
score: float # Puntuación de confianza 0-1
source: str # Identificador de fuente de datos
methodology: str # Método de cálculo usado
# Puntuaciones de calidad
QUALITY_LEVELS = {
"primary_data": 1.0, # Medición directa
"high_quality_proxy": 0.8, # Mismo producto, contexto diferente
"reasonable_proxy": 0.6, # Producto similar en categoría
"default_value": 0.4, # Promedio de categoría
"uncertain_estimate": 0.2 # Suposiciones amplias
}
Propagación de Calidad
La calidad se degrada a través de los cálculos:
def calculate_output_quality(self, input_qualities: list[float]) -> float:
"""La calidad combinada es el mínimo de todas las entradas."""
base_quality = min(input_qualities)
calculation_confidence = 0.95 # Fiabilidad de este GFM
return base_quality * calculation_confidence
Manejo de Errores
Los GFMs implementan manejo de errores elegante:
async def run(self, calc_graph: CalcGraph) -> None:
try:
result = await self.calculate()
self.node.set_property("result", result)
self.update_gfm_state("completed")
except Exception as e:
# Registrar error con contexto
logger.error(
"Ejecución de GFM falló",
gfm=self.__class__.__name__,
node_uid=self.node.uid,
error=str(e)
)
# Crear DataError para seguimiento
error = DataError(
node_uid=self.node.uid,
gfm_name=self.__class__.__name__,
message=str(e),
classification=ErrorClassification.calculation_error
)
self.node.add_error(error)
# Actualizar estado a fallido
self.update_gfm_state("failed")
Valores de Respaldo
Cuando un GFM falla, los GFMs dependientes pueden usar valores de respaldo:
def get_fallback_origin(self, category: str) -> OriginProp:
"""Proporcionar origen por defecto basado en categoría."""
defaults = {
"tropical_fruit": OriginProp(country_code="XX", region="ROW"),
"dairy": OriginProp(country_code="EU", region="Europe"),
"meat": OriginProp(country_code="EU", region="Europe"),
}
return defaults.get(category, OriginProp(country_code="XX"))
Estrategias de Caché
Caché a Nivel de Factory
Las factories cachean recursos compartidos:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
async def init_cache(self):
# Cachear factores de emisión (usados por todos los trabajadores)
self.emission_factors = await self.load_emission_factors()
# Cachear datos de actividad
self.activities = await self.load_activities()
def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
# Los trabajadores comparten datos cacheados de la factory
return GreenhouseGapFillingWorker(
node=node,
emission_factors=self.emission_factors,
activities=self.activities
)
Caché a Nivel de Trabajador
Los trabajadores pueden cachear resultados de cálculo:
class ClimateWorker(AbstractGapFillingWorker):
def __init__(self, node: Node, factory_cache: dict):
self.node = node
self.factory_cache = factory_cache
self.local_cache = {}
async def calculate_emissions(self, category: str, origin: str) -> float:
cache_key = f"{category}:{origin}"
if cache_key in self.local_cache:
return self.local_cache[cache_key]
result = await self._compute_emissions(category, origin)
self.local_cache[cache_key] = result
return result
Depuración y Trazado
Registro de Ejecución
El orquestador registra información detallada de ejecución:
logger.info(
"Ejecución de GFM",
gfm=worker.__class__.__name__,
node_uid=worker.node.uid,
status=status.value,
duration_ms=duration
)
Mutaciones del Grafo
Todos los cambios al CalcGraph se rastrean como mutaciones:
# Cada cambio de propiedad crea una mutación
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)
Esto proporciona:
- Auditabilidad - Historial completo de todos los cambios
- Depuración - Rastrear cómo se calcularon los valores
- Reproducibilidad - Reproducir cálculos de forma determinista
Siguientes Pasos
- Catálogo de Módulos - Módulos disponibles
- GFM SDK (próximamente) - Construir módulos personalizados