Source code for chiltepin.data

# SPDX-License-Identifier: Apache-2.0

"""Data transfer and management for Chiltepin workflows.

This module provides specialized tasks for transferring and deleting data between
Globus data transfer endpoints. These tasks integrate seamlessly with Parsl workflows.

Available Tasks
---------------
- :func:`transfer_task`: Transfer files/directories between Globus data endpoints
- :func:`delete_task`: Delete files/directories from Globus data endpoints

Available Functions
-------------------
- :func:`transfer`: Synchronous data transfer using Globus
- :func:`delete`: Synchronous data deletion using Globus

For comprehensive usage examples and best practices, see the :doc:`data` documentation.

Examples
--------
Stage, process, and cleanup data in a workflow::

    from chiltepin.data import transfer_task, delete_task
    from chiltepin.tasks import python_task

    @python_task
    def process_data(input_path):
        # Process the data
        return result

    # Stage data to compute resource
    stage = transfer_task(
        src_ep="my-laptop",
        dst_ep="hpc-scratch",
        src_path="/data/input.dat",
        dst_path="/scratch/input.dat",
        executor="local"
    )

    # Process the staged data
    result = process_data("/scratch/input.dat", executor="compute", dependencies=stage)

    # Clean up
    cleanup = delete_task(
        src_ep="hpc-scratch",
        src_path="/scratch/input.dat",
        executor="local",
        dependencies=result
    )
"""

from typing import Optional

from globus_sdk import TransferClient

import chiltepin.endpoint as endpoint
from chiltepin.tasks import python_task


[docs]@python_task def transfer_task( src_ep: str, dst_ep: str, src_path: str, dst_path: str, timeout: int = 3600, polling_interval: int = 30, client: Optional[TransferClient] = None, recursive: bool = False, ): """Transfer data asynchronously in a Parsl task This wraps synchronous Globus data transfer into a Parsl python_app task. Calling this function will immediately return a future. The result of the future will be True if the transfer completed successfully, or False if it did not. Parameters ---------- src_ep: str Name of the source endpoint for the transfer. Can be a display name or a UUID string. dst_ep: str Name of the destination endpoint for the transfer. Can be a display name or a UUID string. src_path: str Path to the file or directory on the source endpoint that is to be transferred. dst_path: str Path to the file or directory on the destination endpoint where the data is to be transferred. timeout: int Number of seconds to wait for the transfer to complete. polling_interval: int Number of seconds to wait between checking the status of the transfer client: TransferClient | None Transfer client to use for submitting the transfers. If None, one will be retreived via the login process. If a login has already been performed, no login flow prompts will be issued. recursive: bool Whether or not a recursive transfer should be performed """ # Run the transfer (executes in remote Parsl worker) completed = transfer( # pragma: no cover src_ep, dst_ep, src_path, dst_path, timeout=timeout, polling_interval=polling_interval, client=client, recursive=recursive, ) return completed # pragma: no cover
[docs]@python_task def delete_task( src_ep: str, src_path: str, timeout: int = 3600, polling_interval: int = 30, client: Optional[TransferClient] = None, recursive: bool = False, ): """Delete data asynchronously in a Parsl task This wraps synchronous Globus data deletion into a Parsl python_app task. Calling this function will immediately return a future. The result of the future will be True if the deletion completed successfully, or False if it did not. Parameters ---------- src_ep: str Name of the source endpoint for the data to be deleted. Can be a display name or a UUID string. src_path: str Path to the file or directory on the source endpoint that is to be deleted. timeout: int Number of seconds to wait for the deletion to complete. polling_interval: int Number of seconds to wait between checking the status of the deletion client: TransferClient | None Transfer client to use for submitting the deletion. If None, one will be retreived via the login process. If a login has already been performed, no login flow prompts will be issued. NOTE: Yes, deletion is done using a TransferClient. recursive: bool Whether or not a recursive deletion should be performed """ # Run the deletion (executes in remote Parsl worker) completed = delete( # pragma: no cover src_ep, src_path, timeout=timeout, polling_interval=polling_interval, client=client, recursive=recursive, ) return completed # pragma: no cover
[docs]def transfer( src_ep: str, dst_ep: str, src_path: str, dst_path: str, timeout: int = 3600, polling_interval: int = 30, client: Optional[TransferClient] = None, recursive: bool = False, ): """Transfer data synchronously with Globus This performs a Globus transfer of data from one Globus transfer endpoint to another. This function will not return until the transfer completes or fails. Parameters ---------- src_ep: str Name of the source endpoint for the transfer. Can be a display name or a UUID string. dst_ep: str Name of the destination endpoint for the transfer. Can be a display name or a UUID string. src_path: str Path to the file or directory on the source endpoint that is to be transferred. dst_path: str Path to the file or directory on the destination endpoint where the data is to be transferred. timeout: int Number of seconds to wait for the transfer to complete. polling_interval: int Number of seconds to wait between checking the status of the transfer client: TransferClient | None Transfer client to use for submitting the transfers. If None, one will be retreived via the login process. If a login has already been performed, no login flow prompts will be issued. recursive: bool Whether or not a recursive transfer should be performed """ import globus_sdk # Get transfer client if not client: clients = endpoint.login() client = clients["transfer"] # Get the source endpoint src_id = None for ep in client.endpoint_search(src_ep, filter_non_functional=False): if ep["display_name"] == src_ep or ep["id"] == src_ep: src_id = ep["id"] if not src_id: raise RuntimeError(f"Source endpoint '{src_ep}' could not be found") # Get the destination endpoint dst_id = None for ep in client.endpoint_search(dst_ep, filter_non_functional=False): if ep["display_name"] == dst_ep or ep["id"] == dst_ep: dst_id = ep["id"] if not dst_id: raise RuntimeError(f"Destination endpoint '{dst_ep}' could not be found") # Add data access scopes for both endpoints (just in case) # client.add_app_data_access_scope([src_id, dst_id]) # Build the transfer data task_data = globus_sdk.TransferData( client, source_endpoint=src_id, destination_endpoint=dst_id, ) task_data.add_item( src_path, dst_path, recursive=recursive, ) # Submit the transfer request try: task_doc = client.submit_transfer(task_data) task_id = task_doc["task_id"] done = client.task_wait( task_id, timeout=timeout, polling_interval=polling_interval ) return done except globus_sdk.TransferAPIError as err: if err.info.consent_required: raise RuntimeError( "Encountered a ConsentRequired error.\n" "You must login a second time to grant consents.\n\n" "err.info" ) else: raise RuntimeError(err)
[docs]def delete( src_ep: str, src_path: str, timeout: int = 3600, polling_interval: int = 30, client: Optional[TransferClient] = None, recursive: bool = False, ): """Delete data synchronously with Globus. This deletes data from a Globus endpoint. This function will not return until the deletion has completed or failed. Parameters ---------- src_ep: str Name of the source endpoint for the data to be deleted. Can be a display name or a UUID string. src_path: str Path to the file or directory on the source endpoint that is to be deleted. timeout: int Number of seconds to wait for the deletion to complete. polling_interval: int Number of seconds to wait between checking the status of the deletion client: TransferClient | None Transfer client to use for submitting the deletion. If None, one will be retreived via the login process. If a login has already been performed, no login flow prompts will be issued. NOTE: Yes, deletion is done using a TransferClient. recursive: bool Whether or not a recursive deletion should be performed """ import globus_sdk # Get transfer client if not client: clients = endpoint.login() client = clients["transfer"] # Get the source endpoint src_id = None for ep in client.endpoint_search(src_ep, filter_non_functional=False): if ep["display_name"] == src_ep or ep["id"] == src_ep: src_id = ep["id"] if not src_id: raise RuntimeError(f"Source endpoint '{src_ep}' could not be found") # Add data access scopes for both endpoints (just in case) # client.add_app_data_access_scope([src_id, dst_id]) # Build the delete data payload task_data = globus_sdk.DeleteData(client, src_id, recursive=True) task_data.add_item(src_path) # Submit the deletion request try: task_doc = client.submit_delete(task_data) task_id = task_doc["task_id"] done = client.task_wait( task_id, timeout=timeout, polling_interval=polling_interval, ) return done except globus_sdk.TransferAPIError as err: if err.info.consent_required: raise RuntimeError( "Encountered a ConsentRequired error.\n" "You must login a second time to grant consents.\n\n" "err.info" ) else: raise RuntimeError(err)