Skip to content
2 changes: 1 addition & 1 deletion CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cff-version: 1.2.0
message: If you use this software, please cite it using the metadata from this file.
type: software
title: 'pycoupler: dynamic model coupling of LPJmL'
version: 1.6.5
version: 1.7.0
date-released: '2025-09-22'
abstract: An LPJmL-Python interface for operating LPJmL in a Python environment
and coupling it with Python models, programmes or simple programming scripts.
Expand Down
1 change: 0 additions & 1 deletion pycoupler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
detect_io_type,
)


__all__ = [
"LpjmlConfig",
"CoupledConfig",
Expand Down
30 changes: 16 additions & 14 deletions pycoupler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ def set_coupled(
coupled_input,
coupled_output,
sim_name="coupled",
coupled_config=None,
dependency=None,
coupled_year=None,
temporal_resolution="annual",
write_output=[],
write_file_format="cdf",
append_output=True,
Comment thread
jnnsbrr marked this conversation as resolved.
model_name="copan:CORE",
):
"""
Set configuration required for coupled model runs.
Expand All @@ -306,6 +306,8 @@ def set_coupled(
Provide output ID as identifier.
sim_name : str, default "coupled"
Name of the simulation.
coupled_config : str, optional
Path to coupled config file.
dependency : str, optional
Name of simulation to depend on (e.g., transient run).
coupled_year : int, optional
Expand All @@ -323,8 +325,6 @@ def set_coupled(
append_output : bool, default True
If True, defined output entries are appended. If False, existing
outputs are overwritten.
model_name : str, default "copan:CORE"
Name of the coupled model.
"""
self.sim_name = sim_name
self.sim_path = create_subdirs(sim_path, self.sim_name)
Expand All @@ -350,14 +350,19 @@ def set_coupled(
)
# set coupling parameters
self._set_coupling(
inputs=coupled_input,
outputs=coupled_output,
start_year=coupled_year,
model_name=model_name,
inputs=coupled_input, outputs=coupled_output, start_year=coupled_year
)
# set start from directory to start from historic run
self._set_startfrom(path=f"{sim_path}/restart", dependency=dependency)

# add coupled config if provided
if coupled_config:
self.add_config(coupled_config)
if hasattr(self.coupled_config, "model") and isinstance(
self.coupled_config.model, str
):
self.coupled_model = self.coupled_config.model

def _set_output(
self,
output_path,
Expand Down Expand Up @@ -540,7 +545,7 @@ def _set_grid_explicitly(self, only_all=True):
)

def _set_coupling(
self, inputs, outputs, start_year=None, model_name="copan:CORE"
self, inputs, outputs, start_year=None, model_name="copan"
): # noqa
"""Coupled settings - no spinup, not write restart file and set sockets"""
self.write_restart = False
Expand Down Expand Up @@ -1135,12 +1140,9 @@ def __repr__(self, sub_repr=1, order=1):

for key, value in self.__dict__.items():
if isinstance(value, SubConfig):
summary += (
f"""{' ' * sub_repr}* {key}: {value.__repr__(
sub_repr + 1, order + 1
)}""".strip()
+ spacing
)
summary += f"""{' ' * sub_repr}* {key}: {value.__repr__(
sub_repr + 1, order + 1
)}""".strip() + spacing
else:
summary += (
f"{' ' * sub_repr}* {key:<20} {value}".strip() + spacing
Expand Down
127 changes: 107 additions & 20 deletions pycoupler/coupler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import tempfile
import copy
import warnings
import subprocess
Comment thread
jnnsbrr marked this conversation as resolved.
import atexit
from contextlib import contextmanager

import numpy as np
import pandas as pd
Expand All @@ -25,6 +28,57 @@
)
from pycoupler.utils import get_countries

# Port cleanup utilities ==================================================== #


def kill_process_on_port(port):
    """Kill any process listening on the specified TCP/UDP port.

    Uses ``lsof -ti :<port>`` to discover the PIDs bound to *port* and
    sends each one ``SIGKILL`` directly via :func:`os.kill` (no extra
    ``kill`` subprocess per PID, and a failed signal is not miscounted
    as a kill).

    Parameters
    ----------
    port : int
        Port number to free.

    Returns
    -------
    int
        Number of processes successfully signalled, ``0`` if the port
        was already free, or ``-1`` if ``lsof`` is unavailable or the
        lookup failed/timed out.
    """
    # Local imports keep this helper self-contained; re-importing
    # already-loaded stdlib modules is a cheap no-op.
    import os
    import signal

    try:
        # Find processes using the port
        result = subprocess.run(
            ["lsof", "-ti", f":{port}"], capture_output=True, text=True, timeout=5
        )
    except (
        subprocess.TimeoutExpired,
        subprocess.CalledProcessError,
        FileNotFoundError,
    ):
        # lsof missing or hung — signal "could not check" to the caller
        return -1

    if result.returncode != 0 or not result.stdout.strip():
        # Non-zero exit from lsof (no matches) or empty output: port is free
        return 0

    killed_count = 0
    for pid in result.stdout.split():
        try:
            os.kill(int(pid), signal.SIGKILL)
            killed_count += 1
        except (ValueError, ProcessLookupError, PermissionError):
            # Garbled PID, process already gone, or not ours to kill —
            # skip it without counting it as killed.
            pass
    return killed_count


def cleanup_port_on_exit(port):
    """Arrange for *port* to be freed at interpreter shutdown.

    Registers :func:`kill_process_on_port` as an ``atexit`` handler with
    *port* bound as its argument, so any process still occupying the
    port is killed on normal interpreter exit. Each call stacks another
    handler; none are ever unregistered.
    """
    # atexit.register accepts the callable's arguments directly,
    # so no wrapper closure is needed.
    atexit.register(kill_process_on_port, port)


@contextmanager
def safe_port_binding(host, port):
    """Context manager for safe port binding with automatic cleanup.

    Kills whatever currently occupies ``port`` before yielding it to the
    caller, then repeats the cleanup when the ``with`` block finishes —
    whether it exited normally or via an exception. ``host`` is accepted
    for signature symmetry with socket APIs but is not used here.
    """
    kill_process_on_port(port)  # make sure the port starts out free
    try:
        yield port
    finally:
        kill_process_on_port(port)  # ...and is freed again on the way out
Comment thread
jnnsbrr marked this conversation as resolved.


# class for testing purposes
class test_channel:
Expand Down Expand Up @@ -206,6 +260,9 @@ def opentdt(host, port):
if hasattr(sys, "_called_from_test"):
channel = test_channel()
else:
# Clean up any existing processes on the port first
kill_process_on_port(port)

# create an INET, STREAMing socket
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand Down Expand Up @@ -439,7 +496,7 @@ def get_historic_years(self, match_period=True):
generator
Generator for all historic years.
"""
start_year = self._sim_year
start_year = self._config.firstyear
end_year = self.config.start_coupling
if match_period and start_year >= end_year:
raise ValueError(
Expand Down Expand Up @@ -552,27 +609,33 @@ def read_historic_output(self, to_xarray=True):
Dictionary with output keys and corresponding output as numpy arrays
"""
# read all historic outputs
hist_years = list()
output_years = list()
for year in self.get_historic_years():
hist_years.append(year)
if year == self._config.outputyear:
output_dict = self.read_output(year=year, to_xarray=False)
output_years.append(year)
elif year > self._config.outputyear:
output_dict = append_to_dict(
output_dict, self.read_output(year=year, to_xarray=False)
)
output_years.append(year)

if not output_dict:
raise ValueError(
f"No historic output found for year {self._config.outputyear}"
)
Comment thread
jnnsbrr marked this conversation as resolved.

for key in output_dict:
if key in output_dict:
index = [
item[0] for item in self._output_ids.items() if item[1] == key
][0]
lpjml_output = self._create_xarray_template(
index, time_length=len(hist_years)
index, time_length=len(output_years)
)

lpjml_output.coords["time"] = pd.date_range(
start=str(hist_years[0]), end=str(hist_years[-1] + 1), freq="YE"
start=str(output_years[0]), end=str(output_years[-1] + 1), freq="YE"
)
lpjml_output.data = output_dict[key]
output_dict[key] = lpjml_output
Expand All @@ -583,8 +646,20 @@ def read_historic_output(self, to_xarray=True):
return output_dict

def close(self):
    """Close the socket channel and free the coupling port.

    Idempotent: the channel reference is dropped after the first close,
    so a repeated call (e.g. explicit ``close()`` followed by the
    ``__del__`` destructor) does not touch an already-closed socket.
    Every step is guarded with ``getattr``/``hasattr`` so partially
    initialised instances do not raise.
    """
    channel = getattr(self, "_channel", None)
    if channel:
        channel.close()
        # Drop the reference so a second close() is a no-op.
        self._channel = None

    # Clean up any processes still using the port so a subsequent
    # run can rebind immediately.
    config = getattr(self, "_config", None)
    if config is not None and hasattr(config, "coupled_port"):
        kill_process_on_port(config.coupled_port)

def __del__(self):
    """Destructor to ensure cleanup on object deletion.

    Delegates to ``close()``. All exceptions are swallowed deliberately:
    ``__del__`` may run during interpreter shutdown, when module globals
    and the socket machinery can already be torn down, so raising here
    would only produce noisy, unactionable tracebacks.
    """
    try:
        self.close()
    except Exception:
        pass  # Ignore errors during cleanup

def send_input(self, input_dict, year):
"""Send input data of iterated year as dictionary to LPJmL.
Expand Down Expand Up @@ -1349,11 +1424,26 @@ def _send_input_data(self, data, year):
index = read_int(self._channel)
if isinstance(data, LPJmLDataSet):
data = data.to_numpy()
elif not isinstance(data[self._input_ids[index]], np.ndarray):

# Get the input data array
input_name = self._input_ids[index]
input_data = data[input_name]

# Convert to numpy array if it's not already
if not isinstance(input_data, np.ndarray):
if hasattr(input_data, "values"):
input_data = input_data.values
elif hasattr(input_data, "to_numpy"):
input_data = input_data.to_numpy()
else:
input_data = np.array(input_data)

# Ensure it's a numpy array
if not isinstance(input_data, np.ndarray):
self.close()
raise TypeError(
"Unsupported object type. Please supply a numpy "
+ "array with the dimension of (ncells, nband)."
f"Input data for '{input_name}' could not be converted to numpy array. "
+ f"Got type: {type(input_data)}"
)

# type check conversion
Expand All @@ -1362,10 +1452,10 @@ def _send_input_data(self, data, year):
elif self._input_types[index].type == int:
type_check = np.integer

if not np.issubdtype(data[self._input_ids[index]].dtype, type_check):
if not np.issubdtype(input_data.dtype, type_check):
self.close()
raise TypeError(
f"Unsupported type: {data[self._input_ids[index]].dtype} "
f"Unsupported type: {input_data.dtype} "
+ "Please supply a numpy array with data type: "
+ f"{np.dtype(self._input_types[index].type)}."
)
Expand All @@ -1376,21 +1466,18 @@ def _send_input_data(self, data, year):
if index in self._input_ids.keys():
# get corresponding number of bands from LPJmLInputType class
bands = LPJmLInputType(id=index).nband
if not np.shape(data[self._input_ids[index]]) == (self._ncell, bands):
if (
bands == 1
and not np.shape(data[self._input_ids[index]])[0] == self._ncell
): # noqa
if not np.shape(input_data) == (self._ncell, bands):
if bands == 1 and not np.shape(input_data)[0] == self._ncell: # noqa
self.close()
raise ValueError(
"The dimensions of the supplied data: "
+ f"{(self._ncell, bands)} does not match the "
+ f"needed dimensions for {self._input_ids[index]}"
+ f"{np.shape(input_data)} does not match the "
+ f"needed dimensions for {input_name}"
+ f": {(self._ncell, bands)}."
)
# execute sending values method to actually send the input to
# socket
self._send_input_values(data[self._input_ids[index]])
self._send_input_values(input_data)

def _send_input_values(self, data):
"""Iterate over all values to be sent to the socket. Recursive iteration
Expand Down
Loading
Loading