How Gap Filling Modules Work

This guide provides a detailed look at the internal mechanics of gap filling modules (GFMs), from registration to execution.

Module Loading

When the EOS service starts, all GFMs are loaded and initialized through the GapFillingModuleLoader.

Loading Process

class GapFillingModuleLoader:
    def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
        self.postgres_db = postgres_db
        self.service_provider = service_provider
        self.factories: dict[str, AbstractGapFillingFactory] = {}

    async def load_all_modules(self):
        """Load and initialize all GFM factories."""
        # Import all GFM factory classes
        from core.gap_filling_modules import (
            match_product_name_gfm,
            origin_gfm,
            greenhouse_gfm,
            # ... 50+ modules
        )

        # Initialize each factory
        for factory_class in self.get_all_factories():
            factory = factory_class(self.postgres_db, self.service_provider)
            await factory.init_cache()
            self.factories[factory.name] = factory

Factory Initialization

Each factory initializes once and maintains persistent resources:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
    def __init__(self, postgres_db: PostgresDb, service_provider: ServiceProvider):
        super().__init__(postgres_db, service_provider)
        self.emission_factors: dict = {}
        self.activity_cache: dict = {}

    async def init_cache(self):
        """Load emission factors into memory for fast access."""
        self.emission_factors = await self.load_emission_factors()
        self.activity_cache = await self.load_activity_data()

Orchestrator Scheduling

The Orchestrator coordinates GFM execution through a scheduling loop.

The Scheduling Loop

class Orchestrator(AbstractGraphObserver):
    async def run(self, calc_graph: CalcGraph):
        """Main scheduling loop."""
        # Initialize - spawn workers for all nodes
        for node in calc_graph.nodes.values():
            await self.spawn_workers_for_node(node)

        # Scheduling loop
        while self.has_pending_gfms():
            for gfm in self.get_scheduled_gfms():
                status = gfm.can_run_now()

                if status == GapFillingWorkerStatusEnum.READY:
                    await gfm.run(calc_graph)
                    self.mark_completed(gfm)
                elif status == GapFillingWorkerStatusEnum.WAITING:
                    self.reschedule(gfm)
                elif status == GapFillingWorkerStatusEnum.SKIP:
                    self.mark_skipped(gfm)
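The loop's behavior can be illustrated with a runnable toy version. The `DemoWorker` class, `run_loop` helper, and the single-dependency model below are illustrative stand-ins, not part of the real orchestrator:

```python
from enum import Enum

class Status(Enum):
    READY = "ready"
    WAITING = "waiting"
    SKIP = "skip"

class DemoWorker:
    """Toy worker: READY once its (single) dependency has completed."""
    def __init__(self, name, depends_on, completed):
        self.name = name
        self.depends_on = depends_on    # name of a prerequisite GFM, or None
        self.completed = completed      # shared set of finished GFM names

    def can_run_now(self):
        if self.depends_on and self.depends_on not in self.completed:
            return Status.WAITING
        return Status.READY

    def run(self):
        self.completed.add(self.name)

def run_loop(workers):
    """Iterate until every worker has run; WAITING workers are rescheduled."""
    order, pending = [], list(workers)
    while pending:
        still_waiting = []
        for w in pending:
            if w.can_run_now() is Status.READY:
                w.run()
                order.append(w.name)
            else:
                still_waiting.append(w)  # rescheduled for the next iteration
        pending = still_waiting
    return order

done = set()
workers = [
    DemoWorker("origin_gfm", "match_product_name_gfm", done),
    DemoWorker("match_product_name_gfm", None, done),
]
print(run_loop(workers))  # ['match_product_name_gfm', 'origin_gfm']
```

Even though `origin_gfm` was scheduled first, it waits one iteration until its dependency has completed, which is exactly the reordering the real loop produces.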

Worker Spawning

When a node is added to the graph, the orchestrator spawns workers:

async def spawn_workers_for_node(self, node: Node):
    """Create workers for all applicable GFMs."""
    for factory in self.gfm_loader.factories.values():
        worker = factory.spawn_worker(node)

        if worker.should_be_scheduled():
            self.schedule(worker)

The Three Key Methods

Every GFM worker implements three methods that control its lifecycle.

should_be_scheduled()

Determines if this GFM is relevant for a given node:

class OriginGapFillingWorker(AbstractGapFillingWorker):
    def should_be_scheduled(self) -> bool:
        """Should this GFM run on this node?"""
        # Only run on food product flow nodes
        if not isinstance(self.node, FoodProductFlowNode):
            return False

        # Only run if origin is not already set
        origin_prop = self.node.get_property(OriginProp)
        if origin_prop and origin_prop.is_complete():
            return False

        return True

can_run_now()

Checks if all dependencies are satisfied:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
    """Are dependencies satisfied?"""
    # Fetch per-module execution state
    gfm_state = self.node.get_property(GfmStateProp)

    if not gfm_state:
        return GapFillingWorkerStatusEnum.WAITING

    # Wait for match_product_name_gfm to complete
    if gfm_state.get("match_product_name_gfm") != "completed":
        return GapFillingWorkerStatusEnum.WAITING

    # Wait for category assignment
    category_prop = self.node.get_property(GlossaryTermProp)
    if not category_prop:
        return GapFillingWorkerStatusEnum.WAITING

    return GapFillingWorkerStatusEnum.READY

run()

Executes the gap-filling logic:

async def run(self, calc_graph: CalcGraph) -> None:
    """Execute the gap-filling calculation."""
    try:
        # Get required inputs
        category = self.node.get_property(GlossaryTermProp)
        names = self.node.get_property(NamesProp)

        # Perform calculation
        origin = await self.detect_origin(names, category)

        # Update node with result
        origin_prop = OriginProp(
            country_code=origin.country,
            confidence=origin.confidence,
            source="origin_gfm"
        )
        self.node.set_property(origin_prop)

        # Update GFM state
        self.update_gfm_state("completed")

    except Exception as e:
        self.handle_error(e)

Worker Status Values

The can_run_now() method returns one of these status values:

| Status  | Description                    | Orchestrator Action           |
|---------|--------------------------------|-------------------------------|
| READY   | All dependencies satisfied     | Execute immediately           |
| WAITING | Dependencies not yet available | Reschedule for next iteration |
| SKIP    | Not applicable for this node   | Remove from queue             |
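A minimal sketch of the enum behind these statuses; only the member names appear in the code above, so the string values here are assumptions:

```python
from enum import Enum

class GapFillingWorkerStatusEnum(Enum):
    READY = "ready"      # all dependencies satisfied; execute immediately
    WAITING = "waiting"  # dependencies missing; reschedule next iteration
    SKIP = "skip"        # not applicable for this node; remove from queue
```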

Property System

GFMs read and write data through typed properties on nodes.

Reading Properties

# Get a specific property type
names_prop = self.node.get_property(NamesProp)
if names_prop:
    product_name = names_prop.get_name("en")

# Get quantity with units
quantity_prop = self.node.get_property(QuantityProp)
if quantity_prop:
    amount_kg = quantity_prop.to_kg()

Writing Properties

# Set environmental flows
flows_prop = EnvironmentalFlowsProp(
co2_eq=calculated_emissions,
water_l=water_footprint,
land_m2=land_use
)
self.node.set_property(flows_prop)

# Set location data
location_prop = LocationProp(
country_code="CH",
region="Europe",
coordinates=(47.3769, 8.5417)
)
self.node.set_property(location_prop)

Common Property Types

| Property               | Description                     |
|------------------------|---------------------------------|
| NamesProp              | Multi-language product names    |
| QuantityProp           | Amounts with unit conversion    |
| GlossaryTermProp       | Links to terminology/categories |
| OriginProp             | Geographic origin data          |
| LocationProp           | Location with coordinates       |
| EnvironmentalFlowsProp | Impact calculation results      |
| GfmStateProp           | GFM execution status tracking   |
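The lookup-by-type pattern these properties rely on can be sketched with a minimal stand-in node; this `DemoNode` and the stripped-down `NamesProp` are simplified illustrations, not the real API:

```python
class NamesProp:
    def __init__(self, names: dict):
        self.names = names  # e.g. {"en": "Tomato"}

    def get_name(self, lang: str):
        return self.names.get(lang)

class DemoNode:
    """Holds at most one property instance per property class."""
    def __init__(self):
        self._props = {}

    def set_property(self, prop):
        self._props[type(prop)] = prop     # keyed by the property's class

    def get_property(self, prop_type):
        return self._props.get(prop_type)  # None when the property is absent

node = DemoNode()
node.set_property(NamesProp({"en": "Tomato", "de": "Tomate"}))
print(node.get_property(NamesProp).get_name("de"))  # Tomate
```

Keying the store by class is what lets GFMs ask for `OriginProp` or `QuantityProp` without knowing which module produced it.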

GFM State Tracking

Each node tracks the execution state of all GFMs:

# GfmStateProp stores per-module state
gfm_state = {
"match_product_name_gfm": "completed",
"origin_gfm": "completed",
"transportation_decision_gfm": "running",
"greenhouse_gfm": "waiting",
"impact_assessment_gfm": "pending"
}

State Values

| State     | Description                |
|-----------|----------------------------|
| pending   | Not yet evaluated          |
| waiting   | Dependencies not satisfied |
| running   | Currently executing        |
| completed | Successfully finished      |
| skipped   | Not applicable             |
| failed    | Execution failed           |
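One way to make these states explicit is a transition map; the allowed transitions below are an assumption for illustration, not the engine's documented rules:

```python
# Allowed transitions between GFM states (assumed for illustration):
# terminal states (completed, skipped, failed) have no outgoing edges.
TRANSITIONS = {
    "pending": {"waiting", "running", "skipped"},
    "waiting": {"running", "skipped"},
    "running": {"completed", "failed"},
    "completed": set(),
    "skipped": set(),
    "failed": set(),
}

def can_transition(old: str, new: str) -> bool:
    return new in TRANSITIONS.get(old, set())

print(can_transition("running", "completed"))  # True
print(can_transition("completed", "running"))  # False
```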

Dependency Management

GFMs declare dependencies through their can_run_now() logic.

Common Dependency Patterns

(Diagram: common dependency patterns between GFMs.)

Checking Dependencies

The recommended approach is to check for the presence of required properties rather than waiting for specific GFMs by name. This makes modules more loosely coupled and easier to maintain:

def can_run_now(self) -> GapFillingWorkerStatusEnum:
    """Check if required properties are available."""
    # Check for origin data (set by origin_gfm)
    origin_prop = self.node.get_property(OriginProp)
    if not origin_prop:
        return GapFillingWorkerStatusEnum.WAITING

    # Check for transport mode (set by transportation_decision_gfm)
    transport_prop = self.node.get_property(TransportModeProp)
    if not transport_prop:
        return GapFillingWorkerStatusEnum.WAITING

    # Check for processing data (set by processing_gfm)
    processing_prop = self.node.get_property(ProcessingProp)
    if not processing_prop:
        return GapFillingWorkerStatusEnum.WAITING

    return GapFillingWorkerStatusEnum.READY

Best Practice

Checking for properties instead of GFM names means your module will work correctly even if the upstream GFM is renamed, replaced, or if the property is set through a different mechanism (e.g., user-provided data).

Data Quality Tracking

GFMs track the quality and source of their calculations:

class DataQuality:
    score: float      # 0-1 confidence score
    source: str       # Data source identifier
    methodology: str  # Calculation method used

# Quality scores
QUALITY_LEVELS = {
    "primary_data": 1.0,        # Direct measurement
    "high_quality_proxy": 0.8,  # Same product, different context
    "reasonable_proxy": 0.6,    # Similar product in category
    "default_value": 0.4,       # Category average
    "uncertain_estimate": 0.2   # Broad assumptions
}

Propagating Quality

Quality degrades through calculations:

def calculate_output_quality(self, input_qualities: list[float]) -> float:
    """Combined quality is the minimum of all inputs."""
    base_quality = min(input_qualities)
    calculation_confidence = 0.95  # This GFM's reliability

    return base_quality * calculation_confidence
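For instance, a node that combines one category-average input (0.4) with one primary measurement (1.0) ends up at the floor set by its weakest input, further degraded by the step's own reliability. A standalone version of the method above:

```python
def calculate_output_quality(input_qualities, calculation_confidence=0.95):
    # Combined quality is the minimum input, degraded by this step's reliability
    return min(input_qualities) * calculation_confidence

print(round(calculate_output_quality([0.4, 1.0]), 2))  # 0.38
```

The high-quality input cannot compensate for the low-quality one; quality only ratchets downward through a calculation chain.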

Error Handling

GFMs implement graceful error handling:

async def run(self, calc_graph: CalcGraph) -> None:
    try:
        result = await self.calculate()
        self.node.set_property(result)  # result is a property instance
        self.update_gfm_state("completed")

    except Exception as e:
        # Log error with context
        logger.error(
            "GFM execution failed",
            gfm=self.__class__.__name__,
            node_uid=self.node.uid,
            error=str(e)
        )

        # Create DataError for tracking
        error = DataError(
            node_uid=self.node.uid,
            gfm_name=self.__class__.__name__,
            message=str(e),
            classification=ErrorClassification.calculation_error
        )
        self.node.add_error(error)

        # Update state to failed
        self.update_gfm_state("failed")

Fallback Values

When a GFM fails, dependent GFMs may use fallback values:

def get_fallback_origin(self, category: str) -> OriginProp:
    """Provide default origin based on category."""
    defaults = {
        "tropical_fruit": OriginProp(country_code="XX", region="ROW"),
        "dairy": OriginProp(country_code="EU", region="Europe"),
        "meat": OriginProp(country_code="EU", region="Europe"),
    }
    return defaults.get(category, OriginProp(country_code="XX"))

Caching Strategies

Factory-Level Caching

Factories cache shared resources:

class GreenhouseGapFillingFactory(AbstractGapFillingFactory):
    async def init_cache(self):
        # Cache emission factors (used by all workers)
        self.emission_factors = await self.load_emission_factors()

        # Cache activity data
        self.activities = await self.load_activities()

    def spawn_worker(self, node: Node) -> GreenhouseGapFillingWorker:
        # Workers share factory's cached data
        return GreenhouseGapFillingWorker(
            node=node,
            emission_factors=self.emission_factors,
            activities=self.activities
        )

Worker-Level Caching

Workers can cache calculation results:

class ClimateWorker(AbstractGapFillingWorker):
    def __init__(self, node: Node, factory_cache: dict):
        self.node = node
        self.factory_cache = factory_cache
        self.local_cache = {}

    async def calculate_emissions(self, category: str, origin: str) -> float:
        cache_key = f"{category}:{origin}"

        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        result = await self._compute_emissions(category, origin)
        self.local_cache[cache_key] = result

        return result

Debugging and Tracing

Execution Logging

The orchestrator logs detailed execution information:

logger.info(
    "GFM execution",
    gfm=worker.__class__.__name__,
    node_uid=worker.node.uid,
    status=status.value,
    duration_ms=duration
)

Graph Mutations

All changes to the CalcGraph are tracked as mutations:

# Every property change creates a mutation
mutation = PropMutation(
node_uid=self.node.uid,
property_type="OriginProp",
old_value=None,
new_value=origin_prop,
gfm_name="origin_gfm"
)
calc_graph.apply_mutation(mutation)

This provides:

  • Auditability - Complete history of all changes
  • Debugging - Trace how values were computed
  • Reproducibility - Replay calculations deterministically
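Deterministic replay follows directly from the mutation log: re-applying the recorded mutations in order reconstructs the same graph state. A simplified sketch, where `DemoGraph` and the two-field `PropMutation` are stand-ins for the real CalcGraph and mutation classes:

```python
from dataclasses import dataclass, field

@dataclass
class PropMutation:
    node_uid: str
    property_type: str
    new_value: object
    gfm_name: str

@dataclass
class DemoGraph:
    # node_uid -> {property_type: value}
    nodes: dict = field(default_factory=dict)
    log: list = field(default_factory=list)

    def apply_mutation(self, m: PropMutation):
        self.nodes.setdefault(m.node_uid, {})[m.property_type] = m.new_value
        self.log.append(m)  # complete, ordered history of all changes

def replay(mutations):
    """Rebuild a graph from scratch by re-applying a recorded mutation list."""
    g = DemoGraph()
    for m in mutations:
        g.apply_mutation(m)
    return g

history = [
    PropMutation("n1", "OriginProp", "CH", "origin_gfm"),
    PropMutation("n1", "QuantityProp", 2.5, "quantity_gfm"),
]
print(replay(history).nodes["n1"]["OriginProp"])  # CH
```

Because every change goes through `apply_mutation`, the log doubles as the audit trail: each entry records which GFM wrote which property on which node.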

Next Steps

  • Module Catalog - Available modules
  • GFM SDK (coming soon) - Build custom modules