How Gap Filling Modules Work
This guide provides a detailed look at the internal mechanics of GFMs, from registration to execution.
Module Loading
When the EOS service starts, all GFMs are loaded and initialized through the GapFillingModuleLoader.
Loading Process
class GapFillingModuleLoader:
    """Discovers, instantiates, and caches every GFM factory at service start-up."""

    def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
        # Shared resources handed to each factory when it is constructed.
        self.postgres_db = postgres_db
        self.service_provider = service_provider
        # Initialized factories, keyed by their declared name.
        self.factories: dict[str, AbstractGapFillingFactory] = {}

    async def load_all_modules(self):
        """Import every GFM factory class, build it, warm its cache, and register it."""
        # Importing the package pulls every factory module into scope.
        from core.gap_filling_modules import (
            match_product_name_gfm,
            origin_gfm,
            greenhouse_gfm,
            # ... 50+ modules
        )

        for cls in self.get_all_factories():
            instance = cls(self.postgres_db, self.service_provider)
            # Each factory loads its persistent lookup data exactly once.
            await instance.init_cache()
            self.factories[instance.name] = instance
Factory Initialization
Each factory initializes once and maintains persistent resources:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
    """Factory whose workers estimate greenhouse-gas emissions for nodes."""

    def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
        super().__init__(postgres_db, service_provider)
        # Both caches stay empty until init_cache() populates them.
        self.emission_factors: dict = {}
        self.activity_cache: dict = {}

    async def init_cache(self):
        """Pull emission factors and activity data into process memory once."""
        factors = await self.load_emission_factors()
        activities = await self.load_activity_data()
        self.emission_factors = factors
        self.activity_cache = activities
Orchestrator Scheduling
The Orchestrator coordinates GFM execution through a scheduling loop.
The Scheduling Loop
class Orchestrator(AbstractGraphObserver):
    """Drives GFM execution over a calculation graph."""

    async def run(self, calc_graph: CalcGraph):
        """Seed workers for every node, then sweep the schedule until quiescent."""
        # Initialize: one worker per applicable (node, GFM) pair.
        for node in calc_graph.nodes.values():
            await self.spawn_workers_for_node(node)

        # Keep sweeping the queue while any worker remains pending.
        while self.has_pending_gfms():
            for worker in self.get_scheduled_gfms():
                state = worker.can_run_now()
                if state == GapFillingWorkerStatusEnum.READY:
                    # Dependencies satisfied: execute and retire the worker.
                    await worker.run(calc_graph)
                    self.mark_completed(worker)
                elif state == GapFillingWorkerStatusEnum.WAITING:
                    # Inputs still missing: revisit on a later sweep.
                    self.reschedule(worker)
                elif state == GapFillingWorkerStatusEnum.SKIP:
                    # Worker decided it does not apply to this node.
                    self.mark_skipped(worker)
Worker Spawning
When a node is added to the graph, the orchestrator spawns workers:
async def spawn_workers_for_node(self, node: Node):
    """Schedule a worker from every registered factory that applies to this node."""
    for factory in self.gfm_loader.factories.values():
        candidate = factory.spawn_worker(node)
        # Workers opt in; irrelevant GFMs are discarded immediately.
        if candidate.should_be_scheduled():
            self.schedule(candidate)
The Three Key Methods
Every GFM worker implements three methods that control its lifecycle.
should_be_scheduled()
Determines if this GFM is relevant for a given node:
class OriginGapFillingWorker(AbstractGapFillingWorker):
    """Worker that infers the geographic origin of a food product flow."""

    def should_be_scheduled(self) -> bool:
        """Return True when this GFM is relevant for the node."""
        # Guard 1: this GFM only understands food product flow nodes.
        if not isinstance(self.node, FoodProductFlowNode):
            return False
        # Guard 2: never overwrite an origin that is already complete.
        existing = self.node.get_property(OriginProp)
        return not (existing and existing.is_complete())
can_run_now()
Checks if all dependencies are satisfied:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
    """Report whether every upstream dependency of this worker is satisfied."""
    waiting = GapFillingWorkerStatusEnum.WAITING

    # No state property yet -> nothing has run on this node.
    state = self.node.get_property(GfmStateProp)
    if not state:
        return waiting

    # Upstream product-name matching must have finished first.
    if state.get("match_product_name_gfm") != "completed":
        return waiting

    # A category assignment is required input for origin detection.
    if not self.node.get_property(GlossaryTermProp):
        return waiting

    return GapFillingWorkerStatusEnum.READY
run()
Executes the gap-filling logic:
async def run(self, calc_graph: CalcGraph) -> None:
    """Detect the product's origin and attach it to the node as a property."""
    try:
        # Inputs produced by upstream GFMs.
        category = self.node.get_property(GlossaryTermProp)
        names = self.node.get_property(NamesProp)

        detected = await self.detect_origin(names, category)

        # Write the result back onto the node as a typed property.
        self.node.set_property(
            OriginProp(
                country_code=detected.country,
                confidence=detected.confidence,
                source="origin_gfm",
            )
        )
        # Record completion so dependent workers become runnable.
        self.update_gfm_state("completed")
    except Exception as e:
        self.handle_error(e)
Worker Status Values
The can_run_now() method returns one of these status values:
| Status | Description | Orchestrator Action |
|---|---|---|
READY | All dependencies satisfied | Execute immediately |
WAITING | Dependencies not yet available | Reschedule for next iteration |
SKIP | Not applicable for this node | Remove from queue |
Property System
GFMs read and write data through typed properties on nodes.
Reading Properties
# Fetch typed properties from the node; get_property returns a falsy value
# when the property is absent.
names_prop = self.node.get_property(NamesProp)
if names_prop:
    # Names are multi-language; pull the English entry.
    product_name = names_prop.get_name("en")

# Quantities carry their own unit-conversion helpers.
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
    amount_kg = quantity_prop.to_kg()
Writing Properties
# Attach impact-calculation results to the node.
flows_prop = EnvironmentalFlowsProp(
    co2_eq=calculated_emissions,
    water_l=water_footprint,
    land_m2=land_use,
)
self.node.set_property(flows_prop)

# Attach location data (ISO country code, region, lat/lon pair).
location_prop = LocationProp(
    country_code="CH",
    region="Europe",
    coordinates=(47.3769, 8.5417),
)
self.node.set_property(location_prop)
Common Property Types
| Property | Description |
|---|---|
NamesProp | Multi-language product names |
QuantityProp | Amounts with unit conversion |
GlossaryTermProp | Links to terminology/categories |
OriginProp | Geographic origin data |
LocationProp | Location with coordinates |
EnvironmentalFlowsProp | Impact calculation results |
GfmStateProp | GFM execution status tracking |
GFM State Tracking
Each node tracks the execution state of all GFMs:
# GfmStateProp stores per-module state: one entry per GFM that has been
# (or will be) evaluated for this node.
gfm_state = {
    "match_product_name_gfm": "completed",
    "origin_gfm": "completed",
    "transportation_decision_gfm": "running",
    "greenhouse_gfm": "waiting",
    "impact_assessment_gfm": "pending",
}
State Values
| State | Description |
|---|---|
pending | Not yet evaluated |
waiting | Dependencies not satisfied |
running | Currently executing |
completed | Successfully finished |
skipped | Not applicable |
failed | Execution failed |
Dependency Management
GFMs declare dependencies through their can_run_now() logic.
Common Dependency Patterns
Checking Dependencies
The recommended approach is to check for the presence of required properties rather than waiting for specific GFMs by name. This makes modules more loosely coupled and easier to maintain:
def can_run_now(self) -> GapFillingWorkerStatusEnum:
    """Wait until every required input property exists on the node."""
    # Set by origin_gfm, transportation_decision_gfm, and processing_gfm
    # respectively — but we depend on the *properties*, not the GFM names.
    required = (OriginProp, TransportModeProp, ProcessingProp)
    for prop_type in required:
        if not self.node.get_property(prop_type):
            return GapFillingWorkerStatusEnum.WAITING
    return GapFillingWorkerStatusEnum.READY
Checking for properties instead of GFM names means your module will work correctly even if the upstream GFM is renamed, replaced, or if the property is set through a different mechanism (e.g., user-provided data).
Data Quality Tracking
GFMs track the quality and source of their calculations:
class DataQuality:
    """Provenance record attached to a calculated value."""
    score: float        # 0-1 confidence score
    source: str         # Data source identifier
    methodology: str    # Calculation method used


# Confidence assigned by data provenance, from best (1.0) to worst (0.2).
QUALITY_LEVELS = {
    "primary_data": 1.0,         # Direct measurement
    "high_quality_proxy": 0.8,   # Same product, different context
    "reasonable_proxy": 0.6,     # Similar product in category
    "default_value": 0.4,        # Category average
    "uncertain_estimate": 0.2,   # Broad assumptions
}
Propagating Quality
Quality scores degrade as values propagate through calculations — an output can never be more trustworthy than its weakest input:
def calculate_output_quality(self, input_qualities: list[float]) -> float:
    """Combine input quality scores: the weakest input bounds the output.

    The result is the minimum input quality discounted by this GFM's own
    reliability factor.
    """
    calculation_confidence = 0.95  # This GFM's reliability
    weakest = min(input_qualities)
    return weakest * calculation_confidence
Error Handling
GFMs implement graceful error handling:
async def run(self, calc_graph: CalcGraph) -> None:
    """Run the calculation, converting any failure into node-level error state."""
    try:
        result = await self.calculate()
        self.node.set_property("result", result)
        self.update_gfm_state("completed")
    except Exception as e:
        # Structured log entry keeps the failing GFM and node identifiable.
        logger.error(
            "GFM execution failed",
            gfm=self.__class__.__name__,
            node_uid=self.node.uid,
            error=str(e),
        )
        # Persist a DataError on the node so the failure is queryable later.
        self.node.add_error(
            DataError(
                node_uid=self.node.uid,
                gfm_name=self.__class__.__name__,
                message=str(e),
                classification=ErrorClassification.calculation_error,
            )
        )
        # Dependent workers now see this module as failed, not pending.
        self.update_gfm_state("failed")
Fallback Values
When a GFM fails, dependent GFMs may use fallback values:
def get_fallback_origin(self, category: str) -> OriginProp:
    """Return a category-based default origin for use when origin_gfm fails.

    "XX" / "ROW" denote unknown country / rest-of-world.
    """
    fallbacks = {
        "tropical_fruit": OriginProp(country_code="XX", region="ROW"),
        "dairy": OriginProp(country_code="EU", region="Europe"),
        "meat": OriginProp(country_code="EU", region="Europe"),
    }
    # Unrecognized categories fall back to a fully unknown origin.
    return fallbacks.get(category, OriginProp(country_code="XX"))
Caching Strategies
Factory-Level Caching
Factories cache shared resources:
class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
    """Factory that front-loads data shared by every worker it spawns."""

    async def init_cache(self):
        """Load the lookup tables once at start-up; workers reuse them."""
        # Emission factors are consumed by all workers.
        self.emission_factors = await self.load_emission_factors()
        # Activity data likewise lives on the factory, not the worker.
        self.activities = await self.load_activities()

    def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
        """Create a worker that borrows (does not copy) the factory's caches."""
        return GreenhouseGapFillingWorker(
            node=node,
            emission_factors=self.emission_factors,
            activities=self.activities,
        )
Worker-Level Caching
Workers can cache calculation results:
class ClimateWorker(AbstractGapFillingWorker):
    """Worker with a per-instance memo layered on the factory-wide cache."""

    def __init__(self, node: Node, factory_cache: dict):
        self.node = node
        self.factory_cache = factory_cache
        # Memoized results belonging to this worker only.
        self.local_cache = {}

    async def calculate_emissions(self, category: str, origin: str) -> float:
        """Compute (or reuse a memoized) emission value for one pair."""
        key = f"{category}:{origin}"
        if key in self.local_cache:
            return self.local_cache[key]
        value = await self._compute_emissions(category, origin)
        self.local_cache[key] = value
        return value
Debugging and Tracing
Execution Logging
The orchestrator logs detailed execution information:
# One structured log line per worker execution, for tracing and timing.
logger.info(
    "GFM execution",
    gfm=worker.__class__.__name__,
    node_uid=worker.node.uid,
    status=status.value,
    duration_ms=duration,
)
Graph Mutations
All changes to the CalcGraph are tracked as mutations:
# Every property change creates a mutation, giving the graph a full
# audit trail of who wrote what and when.
mutation = PropMutation(
    node_uid=self.node.uid,
    property_type="OriginProp",
    old_value=None,  # the node had no origin before this GFM ran
    new_value=origin_prop,
    gfm_name="origin_gfm",
)
calc_graph.apply_mutation(mutation)
This provides:
- Auditability - Complete history of all changes
- Debugging - Trace how values were computed
- Reproducibility - Replay calculations deterministically
Next Steps
- Module Catalog - Available modules
- GFM SDK (coming soon) - Build custom modules