# SPDX-License-Identifier: Apache-2.0
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from globus_compute_sdk import Client, Executor
from parsl.config import Config
from parsl.executors import GlobusComputeExecutor, HighThroughputExecutor, MPIExecutor
from parsl.executors.base import ParslExecutor
from parsl.launchers import (
MpiExecLauncher,
SimpleLauncher,
SingleNodeLauncher,
SrunLauncher,
)
from parsl.providers import LocalProvider, PBSProProvider, SlurmProvider
from parsl.providers.base import ExecutionProvider
def parse_file(filename: str) -> Dict[str, Any]:
    """Parse a YAML resource configuration file and return its contents as a dict.

    Parameters
    ----------
    filename: str
        Name of configuration file to parse

    Returns
    -------
    Dict[str, Any]
        The parsed configuration. An empty (or comment-only) file yields an
        empty dict rather than None.

    Raises
    ------
    yaml.YAMLError
        If the file does not contain valid YAML. A short message is printed
        before the original exception is re-raised.
    OSError
        If the file cannot be opened.
    """
    # Open and parse the yaml config. An explicit encoding keeps parsing
    # independent of the host's locale settings.
    with open(filename, "r", encoding="utf-8") as stream:
        try:
            # safe_load only builds plain Python objects (dicts, lists,
            # scalars); it never instantiates arbitrary classes from tags.
            yaml_config = yaml.safe_load(stream)
        except yaml.YAMLError:
            print("Invalid yaml configuration")
            # Bare ``raise`` re-raises the same exception with its traceback.
            raise
    # yaml.safe_load returns None for an empty document; normalise to an empty
    # dict so the declared return type holds and callers can iterate it.
    if yaml_config is None:
        return {}
    return yaml_config
def create_provider(config: Dict[str, Any]) -> ExecutionProvider:
    """Create the appropriate ExecutionProvider from the given configuration.

    Parameters
    ----------
    config: Dict[str, Any]
        YAML configuration block that contains the following configuration
        options. Not all options are valid for all providers.

        Options for all providers:

        Option key           Default value
        "mpi":               False
        "provider":          "localhost"
        "init_blocks":       0
        "min_blocks":        0
        "max_blocks":        1
        "environment":       []

        Options for Slurm provider:

        "cores_per_node":    None (always None when "mpi" is True)
        "nodes_per_block":   1
        "exclusive":         True
        "partition":         None
        "queue":             None (passed to SlurmProvider as ``qos``)
        "account":           None
        "walltime":          "00:10:00"

        Options for PBSPro provider:

        "cores_per_node":    1
        "nodes_per_block":   1
        "queue":             None
        "account":           None
        "walltime":          "00:10:00"

        "environment" is a list of shell lines joined with newlines into the
        provider's ``worker_init``. A single string is treated as one line and
        a null value as an empty list.

    Returns
    -------
    ExecutionProvider
        A SlurmProvider, PBSProProvider or LocalProvider. When "mpi" is True
        the provider uses a SimpleLauncher; otherwise it uses SrunLauncher
        (Slurm), MpiExecLauncher (PBSPro) or SingleNodeLauncher (localhost).

    Raises
    ------
    ValueError
        If "provider" is not "slurm", "pbspro" or "localhost".
    """
    provider = config.get("provider", "localhost")
    mpi = config.get("mpi", False)

    # Guard against ``environment:`` left empty in YAML (None) and against a
    # single string, which "\n".join would otherwise split into characters.
    environment = config.get("environment") or []
    if isinstance(environment, str):
        environment = [environment]
    worker_init = "\n".join(environment)

    # Block-scaling and initialisation settings shared by every provider type.
    common: Dict[str, Any] = {
        "init_blocks": config.get("init_blocks", 0),
        "min_blocks": config.get("min_blocks", 0),
        "max_blocks": config.get("max_blocks", 1),
        "worker_init": worker_init,
    }

    if provider == "slurm":
        return SlurmProvider(
            # cores_per_node is deliberately left unset in MPI mode.
            cores_per_node=None if mpi else config.get("cores_per_node"),
            nodes_per_block=config.get("nodes_per_block", 1),
            exclusive=config.get("exclusive", True),
            partition=config.get("partition"),
            qos=config.get("queue"),
            account=config.get("account"),
            walltime=config.get("walltime", "00:10:00"),
            # Parsl's MPI mode expects a SimpleLauncher on the provider.
            launcher=SimpleLauncher() if mpi else SrunLauncher(),
            **common,
        )

    if provider == "pbspro":
        # Parsl's PBSProProvider places cpus_per_node into its
        # "select=<nodes>:ncpus=<cpus>" resource request, so passing None would
        # request "ncpus=None". Fall back to 1, the documented default.
        cpus_per_node = config.get("cores_per_node")
        if cpus_per_node is None:
            cpus_per_node = 1
        return PBSProProvider(
            cpus_per_node=cpus_per_node,
            nodes_per_block=config.get("nodes_per_block", 1),
            queue=config.get("queue"),
            account=config.get("account"),
            walltime=config.get("walltime", "00:10:00"),
            launcher=SimpleLauncher() if mpi else MpiExecLauncher(),
            **common,
        )

    if provider == "localhost":
        return LocalProvider(
            launcher=SimpleLauncher() if mpi else SingleNodeLauncher(),
            **common,
        )

    raise ValueError(f"Unsupported provider: {provider}")
def create_htex_executor(name: str, config: Dict[str, Any]) -> HighThroughputExecutor:
    """Construct a HighThroughputExecutor from the input configuration.

    Parameters
    ----------
    name: str
        A label that will be assigned to the returned HighThroughputExecutor
        for naming purposes
    config: Dict[str, Any]
        YAML configuration block. Options read directly by this function:

        Option key               Default value
        "cores_per_worker":      1
        "max_workers_per_node":  None

        The whole block is also passed to ``create_provider`` to build the
        executor's provider. Provider options ("provider", "nodes_per_block",
        "init_blocks", "min_blocks", "max_blocks", "cores_per_node",
        "exclusive", "partition", "queue", "account", "walltime",
        "environment", "mpi") and their defaults are documented there.

    Returns
    -------
    HighThroughputExecutor
    """
    return HighThroughputExecutor(
        label=name,
        cores_per_worker=config.get("cores_per_worker", 1),
        # None is forwarded as-is; HighThroughputExecutor applies its own
        # handling for an unset worker cap.
        max_workers_per_node=config.get("max_workers_per_node"),
        provider=create_provider(config),
    )
def create_mpi_executor(
    name: str,
    config: Dict[str, Any],
) -> MPIExecutor:
    """Construct a MPIExecutor from the input configuration.

    The provider is always built in MPI mode (as if ``"mpi": True`` were set),
    whatever the block's own "mpi" value is, so that calling this function
    directly produces the same provider as dispatching through
    ``create_executor``. The caller's dict is not modified.

    Parameters
    ----------
    name: str
        A label that will be assigned to the returned MPIExecutor
        for naming purposes
    config: Dict[str, Any]
        YAML configuration block. Options read directly by this function:

        Option key          Default value
        "max_mpi_apps":     1 (passed as ``max_workers_per_block``)
        "mpi_launcher":     "srun" when "provider" is "slurm", otherwise "mpiexec"

        The block is also passed to ``create_provider`` to build the provider.
        Provider options ("provider", "nodes_per_block", "init_blocks",
        "min_blocks", "max_blocks", "cores_per_node", "exclusive",
        "partition", "queue", "account", "walltime", "environment") and their
        defaults are documented there.

    Returns
    -------
    MPIExecutor
    """
    provider_name = config.get("provider", "localhost")
    default_launcher = "srun" if provider_name == "slurm" else "mpiexec"
    # Force MPI mode for the provider (SimpleLauncher, no Slurm core pinning)
    # even if the caller omitted "mpi"; copy so the input dict is untouched.
    provider_config = {**config, "mpi": True}
    return MPIExecutor(
        label=name,
        mpi_launcher=config.get("mpi_launcher", default_launcher),
        max_workers_per_block=config.get("max_mpi_apps", 1),
        provider=create_provider(provider_config),
    )
def create_globus_compute_executor(
    name: str,
    config: Dict[str, Any],
    client: Optional[Client] = None,
) -> GlobusComputeExecutor:
    """Construct a GlobusComputeExecutor from the input configuration.

    Parameters
    ----------
    name: str
        A label that will be assigned to the returned GlobusComputeExecutor
        for naming purposes
    config: Dict[str, Any]
        YAML configuration block. "endpoint" selects the Globus Compute
        endpoint. All other options below are sent to that endpoint as
        ``user_endpoint_config``, with these defaults filled in:

        Option key          Default value
        "endpoint":         No default, this option is required
        "mpi":              False
        "max_mpi_apps":     1
        "mpi_launcher":     "srun" when "provider" is "slurm", otherwise "mpiexec"
        "provider":         "localhost"
        "cores_per_node":   1
        "nodes_per_block":  1
        "init_blocks":      0
        "min_blocks":       0
        "max_blocks":       1
        "exclusive":        True
        "partition":        ""
        "queue":            ""
        "account":          ""
        "walltime":         "00:10:00"
        "environment":      [] (sent newline-joined as "worker_init"; a single
                            string is one line, a null value is empty)
    client: Client | None
        The Globus Compute client to use for instantiating the GlobusComputeExecutor.
        If not specified, Globus Compute will instantiate and use a default client.

    Returns
    -------
    GlobusComputeExecutor

    Raises
    ------
    KeyError
        If ``config`` has no "endpoint" entry.
    """
    endpoint_id = config["endpoint"]
    provider = config.get("provider", "localhost")
    default_launcher = "srun" if provider == "slurm" else "mpiexec"

    # Guard against ``environment:`` left empty in YAML (None) and against a
    # single string, which "\n".join would otherwise split into characters.
    environment = config.get("environment") or []
    if isinstance(environment, str):
        environment = [environment]

    user_endpoint_config = {
        "mpi": config.get("mpi", False),
        "max_mpi_apps": config.get("max_mpi_apps", 1),
        "mpi_launcher": config.get("mpi_launcher", default_launcher),
        "provider": provider,
        "cores_per_node": config.get("cores_per_node", 1),
        "nodes_per_block": config.get("nodes_per_block", 1),
        "init_blocks": config.get("init_blocks", 0),
        "min_blocks": config.get("min_blocks", 0),
        "max_blocks": config.get("max_blocks", 1),
        "exclusive": config.get("exclusive", True),
        "partition": config.get("partition", ""),
        "queue": config.get("queue", ""),
        "account": config.get("account", ""),
        "walltime": config.get("walltime", "00:10:00"),
        "worker_init": "\n".join(environment),
    }

    return GlobusComputeExecutor(
        label=name,
        executor=Executor(
            endpoint_id=endpoint_id,
            client=client,
            user_endpoint_config=user_endpoint_config,
        ),
    )
def create_executor(
    name: str,
    config: Dict[str, Any],
    client: Optional[Client] = None,
) -> ParslExecutor:
    """Build the Parsl executor described by one resource configuration block.

    Dispatch rules, checked in order:

    1. A truthy "endpoint" entry selects a GlobusComputeExecutor.
    2. Otherwise a truthy "mpi" entry selects an MPIExecutor.
    3. Otherwise a HighThroughputExecutor is built.

    Parameters
    ----------
    name: str
        The name of the resource, used as the executor label
    config: Dict[str, Any]
        YAML configuration block that contains the resource's configuration
    client: Client | None
        A Globus Compute client to use when instantiating Globus Compute
        resources. Defaults to None, in which case one is created
        automatically. Ignored for non-Globus Compute resources.

    Returns
    -------
    ParslExecutor
    """
    has_endpoint = config.get("endpoint")
    if has_endpoint:
        return create_globus_compute_executor(name, config, client)

    wants_mpi = config.get("mpi", False)
    if wants_mpi:
        return create_mpi_executor(name, config)

    return create_htex_executor(name, config)
def load(
    config: Dict[str, Any],
    include: Optional[List[str]] = None,
    client: Optional[Client] = None,
    run_dir: Optional[str] = None,
) -> Config:
    """Build a Parsl Config from a dictionary of resource configurations.

    The returned Config is meant to be passed to ``parsl.load(config)``. It
    always contains a built-in HighThroughputExecutor labelled "local",
    followed by one executor per selected resource, in iteration order.

    Parameters
    ----------
    config: Dict[str, Any]
        Mapping of resource label to that resource's YAML configuration block
    include: List[str] | None
        Labels of the resources to load. Defaults to None, which loads every
        resource in ``config``. Otherwise only the listed resources are loaded.
    client: Client | None
        A Globus Compute client to use when instantiating Globus Compute
        resources. Defaults to None, in which case one is created
        automatically.
    run_dir: str | None
        Directory for Parsl runtime files. Defaults to None, which leaves
        Parsl's default runinfo location in place.

    Returns
    -------
    Config

    Raises
    ------
    RuntimeError
        If ``include`` names a label that is not present in ``config``.
    """
    # Treated as the project root: three path components up from this file.
    # Workers of the "local" executor get it appended to PYTHONPATH.
    root_dir = Path(__file__).parent.parent.parent.resolve()
    local_provider = LocalProvider(
        init_blocks=0,
        max_blocks=1,
        worker_init=f"export PYTHONPATH=${{PYTHONPATH}}:{root_dir}",
    )
    local_executor = HighThroughputExecutor(
        label="local",
        worker_debug=True,
        cores_per_worker=1,
        max_workers_per_node=1,
        provider=local_provider,
    )

    if include is None:
        selected = config
    else:
        # Fail early if any requested label is unknown.
        unknown = [label for label in include if label not in config]
        if unknown:
            raise RuntimeError(
                f"Resources specified in include list not found in config: {unknown}"
            )
        selected = {label: config[label] for label in include}

    executors = [local_executor]
    executors.extend(
        create_executor(label, block, client) for label, block in selected.items()
    )

    if run_dir is None:
        return Config(executors=executors)
    return Config(executors=executors, run_dir=run_dir)