Process Model Integration
The advanced foreground implementation converts background process data into foreground fragment models, enabling users to build detailed, modular models that combine proprietary data with public LCA databases.
Core Process Integration Methods
Creating Process Models
create_process_model(process, ref_flow=None, StageName=None, **kwargs)
Convert a background process into a fragment model.
```python
# Create fragment model from ecoinvent process
steel_process = catalog.get('ecoinvent.3.8', 'steel production')
steel_model = query.create_process_model(
    steel_process,
    StageName='Steel Production'
)

# Specify reference flow for multi-output processes
refinery_process = catalog.get('ecoinvent.3.8', 'petroleum refinery')
gasoline_model = query.create_process_model(
    refinery_process,
    ref_flow=gasoline_flow,
    StageName='Gasoline Production'
)
```
What this creates:
- Reference fragment: Product output with correct direction and exchange value
- Balance fragment: Internal node terminated to the original process
- Metadata preservation: StageName and other process properties
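You can verify this structure directly. The following is a minimal inspection sketch that reuses accessors appearing elsewhere in this section (`child_flows`, `flow`, `direction`); exact attribute names may vary with your version:

```python
# Minimal inspection sketch; reuses accessors shown elsewhere in this
# section, so exact attribute names may vary with your version
steel_model = query.create_process_model(
    steel_process,
    StageName='Steel Production'
)
print(steel_model['StageName'])        # preserved metadata
for child in steel_model.child_flows:  # includes the internal balance node
    print(child.flow, child.direction)
```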
Extending Process Models
extend_process(fragment, scenario=None, include_elementary=False, inventory=False, **kwargs)
Build out supply chain dependencies as child fragments.
```python
# Basic extension with intermediate flows
steel_model = query.create_process_model(steel_process)
query.extend_process(steel_model)

# Include elementary flows as fragments
query.extend_process(
    steel_model,
    include_elementary=True
)

# Use process inventory instead of background routes
query.extend_process(
    steel_model,
    inventory=True  # preserves exchange metadata but may affect LCI accuracy
)

# Scenario-specific extension
query.extend_process(
    steel_model,
    scenario='renewable_energy'
)
```
Extension behavior:
- Creates child fragments for each process dependency
- Automatically terminates child fragments to background processes
- Preserves exchange values and metadata
- Respects existing child fragments, updating rather than duplicating them (see the sketch below)
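Because extension updates existing children rather than duplicating them, re-running it should leave the child count unchanged. A quick sanity check, sketched with the same accessors as the examples above:

```python
# Sketch: confirm that re-extension updates rather than duplicates children
query.extend_process(steel_model)
n_before = len(list(steel_model.child_flows))

query.extend_process(steel_model)  # second pass over the same model
n_after = len(list(steel_model.child_flows))

assert n_after == n_before, "children should be updated, not duplicated"
```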
Fragment Creation from Exchanges
fragment_from_exchanges(exchanges, parent=None, include_elementary=False, auto_anchor=True, **kwargs)
Create or update fragments from exchange lists.
```python
# Create new model from exchange list
inventory = steel_process.inventory()
steel_model = query.fragment_from_exchanges(
    inventory,
    ref='steel_production_model',
    include_elementary=True
)

# Add children to existing fragment
aluminum_model = query.frag('aluminum_production')
# aluminum_process is the corresponding process, retrieved from the catalog as above
aluminum_inputs = aluminum_process.dependencies()
query.fragment_from_exchanges(
    aluminum_inputs,
    parent=aluminum_model,
    auto_anchor=True  # automatically link to existing fragments
)

# Update existing model with new data
updated_inventory = updated_steel_process.inventory()
query.fragment_from_exchanges(
    updated_inventory,
    parent=steel_model  # updates existing children
)
```
Advanced Integration Patterns
Multi-Process System Modeling
```python
def build_integrated_system(query, processes):
    """Build an integrated system from multiple processes."""
    models = {}

    # Create an individual process model for each process
    for process_name, process in processes.items():
        model = query.create_process_model(
            process,
            StageName=process_name
        )
        models[process_name] = model

    # Extend each model and link the models together
    for name, model in models.items():
        query.extend_process(model)

        # Find linking opportunities among the models
        for child in model.child_flows:
            if child.flow in [m.flow for m in models.values()]:
                # Link to the internal model instead of the background process
                supplier = next(m for m in models.values()
                                if m.flow == child.flow)
                child.terminate(supplier)
                print(f"Linked {child.flow} to internal model")

    return models
```
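A hypothetical invocation, showing the expected shape of the `processes` argument (the process handles would come from your own catalog):

```python
# Hypothetical usage; process handles come from your own catalog
processes = {
    'Steel Production': steel_process,
    'Electricity Generation': power_process,
}
models = build_integrated_system(query, processes)
```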
Hierarchical Model Construction
```python
def create_hierarchical_model(query, main_process, detail_level=2):
    """Create a hierarchical model with configurable detail."""
    # Create the main model
    main_model = query.create_process_model(
        main_process,
        StageName='Main Process'
    )

    level = 1
    to_expand = [main_model]
    while to_expand and level <= detail_level:
        current_level = to_expand.copy()
        to_expand = []

        for model in current_level:
            # Expand this level; include elementary flows at the deepest level
            query.extend_process(model, include_elementary=(level == detail_level))

            # Prepare next level expansions
            if level < detail_level:
                for child in model.child_flows:
                    if child.termination and hasattr(child.termination, 'inventory'):
                        # Create a sub-model for process-terminated children
                        submodel = query.create_process_model(
                            child.termination,
                            StageName=f'{model["StageName"]} - {child.flow.name}'
                        )
                        child.terminate(submodel)
                        to_expand.append(submodel)

        level += 1

    return main_model
```
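Usage is a single call; note that each additional level multiplies the number of sub-models created, so keep `detail_level` small:

```python
# Two levels of detail: the main process plus one tier of sub-models
detailed_model = create_hierarchical_model(query, steel_process, detail_level=2)
```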
Process Model Customization
```python
def customize_process_model(query, process, customizations):
    """Create a process model with custom modifications."""
    # Create and extend the base model
    model = query.create_process_model(process)
    query.extend_process(model)

    # Apply customizations
    for custom in customizations:
        if custom['type'] == 'substitute_input':
            # Replace an input with an alternative supplier
            target_flow = custom['target_flow']
            replacement = custom['replacement']
            child = next(c for c in model.child_flows
                         if c.flow == target_flow and c.direction == 'Input')
            child.terminate(replacement)
            print(f"Substituted {target_flow} with {replacement}")

        elif custom['type'] == 'adjust_efficiency':
            # Modify exchange values by a scaling factor
            target_flow = custom['target_flow']
            factor = custom['factor']
            child = next(c for c in model.child_flows
                         if c.flow == target_flow)
            old_value = child.exchange_value()
            query.observe(child, exchange_value=old_value * factor)
            print(f"Adjusted {target_flow} by factor {factor}")

        elif custom['type'] == 'add_input':
            # Add a new input not present in the original process
            new_input = query.new_fragment(
                custom['flow'],
                'Input',
                parent=model,
                value=custom['value']
            )
            if 'termination' in custom:
                new_input.terminate(custom['termination'])
            print(f"Added new input: {custom['flow']}")

    return model
```
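For reference, a hypothetical `customizations` list matching the keys handled above; the flow and fragment objects are placeholders that would come from your own catalog and foreground:

```python
# Hypothetical customization list; flows and fragments are placeholders
customizations = [
    {'type': 'substitute_input', 'target_flow': grid_electricity,
     'replacement': wind_power_model},
    {'type': 'adjust_efficiency', 'target_flow': coal_input, 'factor': 0.9},
    {'type': 'add_input', 'flow': recycled_scrap, 'value': 0.25,
     'termination': scrap_supply_model},
]
model = customize_process_model(query, steel_process, customizations)
```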
Exchange Handling and Termination
Automatic Termination Logic
```python
def analyze_termination_strategy(query, exchanges):
    """Summarize how a list of exchanges would be terminated."""
    termination_types = {
        'context': 0,   # elementary flows to environmental contexts
        'cutoff': 0,    # unlinked intermediate flows
        'process': 0,   # linked to background processes
        'fragment': 0   # linked to foreground fragments
    }

    for exchange in exchanges:
        if exchange.type == 'context':
            termination_types['context'] += 1
        elif exchange.type == 'cutoff':
            # Check whether auto_anchor would find a matching fragment
            matching_frags = list(query.fragments_with_flow(
                exchange.flow,
                exchange.direction
            ))
            if matching_frags:
                termination_types['fragment'] += 1
            else:
                termination_types['cutoff'] += 1
        else:  # process termination
            termination_types['process'] += 1

    print("Termination strategy analysis:")
    for term_type, count in termination_types.items():
        print(f"  {term_type}: {count} exchanges")
    return termination_types
```
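For example, you might analyze a process inventory before deciding how to model it (`inventory()` is used the same way in the examples above):

```python
# Summarize terminations for a process inventory before modeling it
inventory = list(steel_process.inventory())
analyze_termination_strategy(query, inventory)
```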
Custom Termination Control
```python
def create_model_with_termination_control(query, process, term_dict=None):
    """Create a model with explicit termination control."""
    if term_dict is None:
        term_dict = {}

    # Get the process inventory
    inventory = process.inventory()

    # Build a termination dictionary for specific flows
    for exchange in inventory:
        flow_id = exchange.flow.external_ref
        if flow_id in term_dict:
            continue  # already specified by the caller

        # Custom termination logic
        if 'electricity' in exchange.flow.name.lower():
            # Link electricity to a specific source
            term_dict[flow_id] = query.frag('renewable_electricity')
        elif 'transport' in exchange.flow.name.lower():
            # Link transport to a specific service
            term_dict[flow_id] = query.frag('electric_transport')

    # Create the model with the custom terminations
    model = query.fragment_from_exchanges(
        inventory,
        term_dict=term_dict,
        ref=f'{process.name}_custom_model',
        auto_anchor=False  # rely on term_dict only
    )
    return model
```
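Caller-supplied entries take precedence because of the `continue` above; a hypothetical invocation pre-seeding one termination (the flow id string is a placeholder):

```python
# Pre-seed one termination; the flow id string is a placeholder
term_dict = {'natural_gas_flow_ref': query.frag('biogas_supply')}
model = create_model_with_termination_control(query, steel_process, term_dict)
```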
Process Model Validation
Model Integrity Checks
```python
def validate_process_model(query, model):
    """Validate process model integrity."""
    issues = []

    # Check for unobserved fragments
    tree = query.tree(model)
    for node in tree:
        if not hasattr(node.fragment, 'exchange_value') \
                or node.fragment.exchange_value() is None:
            issues.append(f"Unobserved fragment: {node.fragment.external_ref}")

    # Check for cutoff flows
    cutoffs = []
    for node in tree:
        if hasattr(node.fragment, 'termination'):
            term = node.fragment.termination()
            if term is None or term.is_null:
                cutoffs.append(node.fragment)
    if cutoffs:
        issues.append(f"Cutoff flows: {len(cutoffs)}")

    # Check that the model traverses without error
    try:
        traversal = query.traverse(model)
        total_flows = len(list(traversal))
        print(f"Model traverses successfully: {total_flows} flows")
    except Exception as e:
        issues.append(f"Traversal error: {e}")

    if issues:
        print("Model validation issues:")
        for issue in issues:
            print(f"  - {issue}")
    else:
        print("Model validation: PASSED")
    return len(issues) == 0
```
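A typical pattern is to validate every model in a system before reporting results (a sketch, continuing from `build_integrated_system` above):

```python
# Validate each model built earlier before reporting results
for name, model in models.items():
    print(f"Validating {name} ...")
    validate_process_model(query, model)
```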
Integration Best Practices
Process Selection
- Choose representative processes: Select processes that best represent your specific technology
- Consider system boundaries: Ensure process scope matches your modeling needs
- Verify data quality: Check process documentation and data sources
Model Structure
- Use StageName consistently: Organize models with clear stage naming conventions
- Preserve process metadata: Maintain links to original process documentation
- Document customizations: Record any modifications made to original process data
Termination Strategy
- Plan linking strategy: Decide which flows to link internally vs. leave as cutoffs
- Use auto_anchor judiciously: Automatic linking can create unintended connections
- Validate terminations: Ensure terminations make sense from a system perspective
Performance
- Limit extension depth: Deep process extensions can create very large models
- Cache process models: Reuse created models rather than recreating them (see the sketch below)
- Use scenarios for alternatives: Rather than creating multiple models, use scenarios to represent variants
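A minimal caching sketch (keying on `external_ref`, which appears elsewhere in this section, is an assumption; adapt the key to your own identifiers):

```python
# Minimal caching sketch; keying on external_ref is an assumption
_model_cache = {}

def get_or_create_model(query, process, **kwargs):
    key = process.external_ref
    if key not in _model_cache:
        _model_cache[key] = query.create_process_model(process, **kwargs)
    return _model_cache[key]
```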
Next Steps