Server

icon.server.api

This module defines the API layer of ICON, implemented as a pydase.DataService.

The main entry point is the APIService, which is exposed by the IconServer. The IconServer itself is a pydase.Server hosting the API.
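
A minimal hosting sketch, assuming pydase's documented Server(service).run() pattern; the queue setup is illustrative only, since the IconServer performs this wiring itself:

import multiprocessing

import pydase

from icon.server.api.api_service import APIService

# Illustrative: one update queue per pre-processing worker process.
pre_processing_event_queues = [multiprocessing.Queue() for _ in range(2)]

service = APIService(pre_processing_event_queues=pre_processing_event_queues)
pydase.Server(service).run()  # serves the API until interrupted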

Structure

The APIService aggregates multiple “controller” services as attributes. Each controller is itself a pydase.DataService exposing related API methods.

Background tasks

Controllers can define periodic pydase tasks, which are asyncio tasks automatically started with the service.
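
A minimal sketch of this pattern, assuming pydase's task decorator (pydase.task.decorator.task); the controller name and interval are made up for illustration:

import asyncio

import pydase
from pydase.task.decorator import task


class ExampleController(pydase.DataService):
    @task(autostart=True)
    async def refresh_metadata(self) -> None:
        # Started automatically as an asyncio task when the hosting service starts.
        while True:
            ...  # e.g. re-scan the experiment library and push updates
            await asyncio.sleep(10)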

api_service

APIService

APIService(
    pre_processing_event_queues: list[Queue[UpdateQueue]],
)

Bases: DataService

Aggregates ICON’s API controllers and manages background tasks.

The APIService groups multiple controllers, each of which is a pydase.DataService exposing related API methods. It also defines background tasks for keeping experiment and parameter metadata in sync with the experiment library and InfluxDB.

Note

Controllers are pydase.DataService instances exposed as attributes to group related API methods. Background tasks are implemented with pydase tasks.

Parameters:

  • pre_processing_event_queues (list[Queue[UpdateQueue]], required): Queues used by ScansController to notify pre-processing workers.
Source code in src/icon/server/api/api_service.py
def __init__(
    self, pre_processing_event_queues: list[multiprocessing.Queue[UpdateQueue]]
) -> None:
    """
    Args:
        pre_processing_event_queues: Queues used by `ScansController` to notify
            pre-processing workers.
    """

    super().__init__()

    self.devices = DevicesController()
    """Controller for managing external pydase-based devices."""
    self.scheduler = SchedulerController(devices_controller=self.devices)
    """Controller to submit, inspect, and cancel scheduled jobs."""
    self.experiments = ExperimentsController()
    """Controller for experiment metadata."""
    self.parameters = ParametersController()
    """Controller for parameter metadata and shared parameter values."""
    self.config = ConfigurationController()
    """Controller for managing and updating the application's configuration."""
    self.data = ExperimentDataController()
    """Controller for accessing stored experiment data."""
    self.scans = ScansController(
        pre_processing_update_queues=pre_processing_event_queues
    )
    """Controller for triggering update events for jobs across multiple worker
    processes."""
    self.status = StatusController()
    """Controller for system status monitoring."""
config instance-attribute

Controller for managing and updating the application’s configuration.

data instance-attribute

Controller for accessing stored experiment data.

devices instance-attribute
devices = DevicesController()

Controller for managing external pydase-based devices.

experiments instance-attribute
experiments = ExperimentsController()

Controller for experiment metadata.

parameters instance-attribute
parameters = ParametersController()

Controller for parameter metadata and shared parameter values.

scans instance-attribute
scans = ScansController(
    pre_processing_update_queues=pre_processing_event_queues
)

Controller for triggering update events for jobs across multiple worker processes.

scheduler instance-attribute
scheduler = SchedulerController(devices_controller=devices)

Controller to submit, inspect, and cancel scheduled jobs.

status instance-attribute
status = StatusController()

Controller for system status monitoring.

configuration_controller

ConfigurationController

Bases: DataService

Controller for managing and updating the application’s configuration.

This class provides an API to get and update the configuration, validate it, and save the updated configuration back to the source file.

get_config
get_config() -> dict[str, Any]

Get current configuration dictionary.

Source code in src/icon/server/api/configuration_controller.py
def get_config(self) -> dict[str, Any]:
    """Get current configuration dictionary."""

    return get_config().model_dump()
update_config_option
update_config_option(key: str, value: Any) -> bool

Update a specific configuration option.

Traverses the configuration using the dot-separated key, updates the specified value, validates the entire configuration, and saves the changes.

Parameters:

  • key (str, required): The dot-separated key of the configuration option (e.g., “experiment_library.git_repository”).
  • value (Any, required): The new value for the configuration option.

Returns:

  • bool: True if the update is successful, False otherwise.

Source code in src/icon/server/api/configuration_controller.py
def update_config_option(self, key: str, value: Any) -> bool:
    """Update a specific configuration option.

    Traverses the configuration using the dot-separated key, updates the specified
    value, validates the entire configuration, and saves the changes.

    Args:
        key:
            The dot-separated key of the configuration option (e.g.,
            "experiment_library.git_repository").
        value:
            The new value for the configuration option.

    Returns:
        True if the update is successful, False otherwise.
    """

    try:
        # Traverse to the nested key
        fields = key.split(".")
        current_config = get_config().model_dump()
        current = current_config
        for field in fields[:-1]:
            if field not in current:
                raise KeyError(f"Key {key!r} not found in configuration.")
            current = current[field]

        # Update the value
        current[fields[-1]] = value

        # Validate the updated configuration
        updated_config = ServiceConfigV1(config_sources=DataSource(current_config))

        # Save the updated configuration back to the file
        self._save_configuration(updated_config)
        emit_queue.put(
            {"event": "config.update", "data": updated_config.model_dump()}
        )
        return True
    except KeyError as e:
        logger.exception("Failed to update configuration: %s", e)
        return False
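
A usage sketch calling this method through a pydase client proxy; the server URL and repository value are hypothetical:

import pydase

client = pydase.Client(url="ws://localhost:8001")  # hypothetical IconServer address
ok = client.proxy.config.update_config_option(
    "experiment_library.git_repository",
    "git@example.com:my-group/experiment-library.git",  # illustrative value
)
print(ok)  # True on success, False if the key does not exist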

devices_controller

DeviceParameterValueyType module-attribute

DeviceParameterValueyType = int | bool | float

Allowed primitive types for device parameter values.

A parameter value sent to or retrieved from a device may be one of these basic types. Quantities with units are handled separately via pydase.units.Quantity.

DevicesController

DevicesController()

Bases: DataService

Controller for managing external pydase-based devices.

Maintains client connections to configured devices, exposes helpers to add/update device entries in SQLite, and provides async accessors for device parameter values through pydase proxies. Also discovers scannable device parameters for integration with ICON scans.

Source code in src/icon/server/api/devices_controller.py
def __init__(self) -> None:
    super().__init__()
    self._devices: dict[str, pydase.Client] = {}
    self.device_proxies: dict[str, ProxyClass] = {}
    """Live pydase proxies keyed by device name."""
device_proxies instance-attribute
device_proxies: dict[str, ProxyClass] = {}

Live pydase proxies keyed by device name.

add_device
add_device(
    *,
    name: str,
    url: str,
    status: Literal["disabled", "enabled"] = "enabled",
    description: str | None = None,
    retry_delay_seconds: float = 0.0,
    retry_attempts: int = 3,
) -> Device

Create a device record in SQLite and (optionally) connect to it.

If status=="enabled", a non-blocking pydase client is created and its proxy is registered.

Parameters:

  • name (str, required): Unique device name.
  • url (str, required): pydase server URL of the device.
  • status (Literal['disabled', 'enabled'], default 'enabled'): Whether the device should be connected immediately.
  • description (str | None, default None): Optional human-readable description.
  • retry_delay_seconds (float, default 0.0): Backoff delay used by device-side logic.
  • retry_attempts (int, default 3): Number of retries used by device-side logic.

Returns:

  • Device: The Device SQLAlchemy model.

Source code in src/icon/server/api/devices_controller.py
def add_device(  # noqa: PLR0913
    self,
    *,
    name: str,
    url: str,
    status: Literal["disabled", "enabled"] = "enabled",
    description: str | None = None,
    retry_delay_seconds: float = 0.0,
    retry_attempts: int = 3,
) -> Device:
    """Create a device record in SQLite and (optionally) connect to it.

    If `status=="enabled"`, a non-blocking pydase client is created and its
    proxy is registered.

    Args:
        name: Unique device name.
        url: pydase server URL of the device.
        status: Whether the device should be connected immediately.
        description: Optional human-readable description.
        retry_delay_seconds: Backoff delay used by device-side logic.
        retry_attempts: Number of retries used by device-side logic.

    Returns:
        The `Device` SQLAlchemy model.
    """

    device = DeviceRepository.add_device(
        device=Device(
            name=name,
            url=url,
            status=DeviceStatus(status),
            description=description,
            retry_delay_seconds=retry_delay_seconds,
            retry_attempts=retry_attempts,
        )
    )

    if status == "enabled":
        client = pydase.Client(
            url=device.url,
            client_id="icon-devices-controller",
            block_until_connected=False,
        )
        self._devices[name] = client
        self.device_proxies[name] = client.proxy

    return device
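
A usage sketch through a pydase client proxy; the device name, URLs, and parameter path below are hypothetical:

import pydase

client = pydase.Client(url="ws://localhost:8001")  # hypothetical IconServer address

# Register a device; with the default status="enabled" the controller connects to it immediately.
client.proxy.devices.add_device(
    name="dds-board-1",
    url="ws://192.168.1.50:8001",
    description="Example DDS board",
)

# Read a parameter from the connected device.
value = client.proxy.devices.get_parameter_value(
    name="dds-board-1", parameter_id="channels[0].frequency"
)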
get_devices_by_status
get_devices_by_status(
    *, status: DeviceStatus | None = None
) -> dict[str, DeviceDict]

List devices (optionally filtered by status) with reachability & scan info.

Augments each device entry with

  • reachable: Whether a live proxy is connected.
  • scannable_params: Flat list of scannable parameter access paths.

Parameters:

  • status (DeviceStatus | None, default None): Optional filter (ENABLED, DISABLED, or None for all).

Returns:

  • dict[str, DeviceDict]: Mapping from device name to a DeviceDict payload suitable for the API.

Source code in src/icon/server/api/devices_controller.py
def get_devices_by_status(
    self, *, status: DeviceStatus | None = None
) -> dict[str, DeviceDict]:
    """List devices (optionally filtered by status) with reachability & scan info.

    Augments each device entry with

    - `reachable`: Whether a live proxy is connected.
    - `scannable_params`: Flat list of scannable parameter access paths.

    Args:
        status: Optional filter (`ENABLED`, `DISABLED`, or `None` for all).

    Returns:
        Mapping from device name to a `DeviceDict` payload suitable for the API.
    """

    device_dict: dict[str, DeviceDict] = {
        device.name: SQLAlchemyDictEncoder.encode(device)
        for device in DeviceRepository.get_devices_by_status(status=status)
    }

    for name, value in device_dict.items():
        client = self._devices.get(name, None)
        value["reachable"] = False
        value["scannable_params"] = []

        if client is not None:
            value["reachable"] = client.proxy.connected
            value["scannable_params"] = get_scannable_params_list(
                client.proxy.serialize(),
                prefix=f'devices.device_proxies["{name}"].',
            )

    return device_dict
get_parameter_value async
get_parameter_value(*, name: str, parameter_id: str) -> Any

Get a parameter value from a connected device.

Logs a warning if the device is not connected or not found.

Parameters:

  • name (str, required): Device name.
  • parameter_id (str, required): Access path on the device service.

Returns:

  • Any: The parameter value as returned by the device, or None if the device is unreachable or unknown.

Source code in src/icon/server/api/devices_controller.py
async def get_parameter_value(self, *, name: str, parameter_id: str) -> Any:
    """Get a parameter value from a connected device.

    Logs a warning if the device is not connected or not found.

    Args:
        name: Device name.
        parameter_id: Access path on the device service.

    Returns:
        The parameter value as returned by the device, or `None` if the device is
            unreachable or unknown.
    """

    try:
        return await asyncio.to_thread(
            self._devices[name].get_value, access_path=parameter_id
        )
    except BadNamespaceError:
        logger.warning(
            'Could not get %r. Device %r at ("%s") is not connected.',
            parameter_id,
            name,
            self._devices[name]._url,
        )
    except KeyError:
        logger.warning("Device with name %r not found. Is it enabled?", name)
update_device
update_device(
    *,
    name: str,
    status: Literal["disabled", "enabled"] | None = None,
    url: str | None = None,
    retry_attempts: int | None = None,
    retry_delay_seconds: float | None = None,
) -> Device

Update a device record and its live connection.

When transitioning to disabled, the client is disconnected and removed. When transitioning to enabled, a client is (re)created and registered.

Parameters:

  • name (str, required): Device name.
  • status (Literal['disabled', 'enabled'] | None, default None): Target enable/disable status.
  • url (str | None, default None): Updated pydase URL.
  • retry_attempts (int | None, default None): Updated retry attempts metadata.
  • retry_delay_seconds (float | None, default None): Updated retry delay metadata.

Returns:

  • Device: The updated Device model.

Source code in src/icon/server/api/devices_controller.py
def update_device(
    self,
    *,
    name: str,
    status: Literal["disabled", "enabled"] | None = None,
    url: str | None = None,
    retry_attempts: int | None = None,
    retry_delay_seconds: float | None = None,
) -> Device:
    """Update a device record and its live connection.

    When transitioning to `disabled`, the client is disconnected and removed.
    When transitioning to `enabled`, a client is (re)created and registered.

    Args:
        name: Device name.
        status: Target enable/disable status.
        url: Updated pydase URL.
        retry_attempts: Updated retry attempts metadata.
        retry_delay_seconds: Updated retry delay metadata.

    Returns:
        The updated `Device` model.
    """

    device = DeviceRepository.update_device(
        name=name,
        url=url,
        status=DeviceStatus(status) if status is not None else None,
        retry_attempts=retry_attempts,
        retry_delay_seconds=retry_delay_seconds,
    )

    if status == "disabled" and name in self._devices:
        if name in self.device_proxies:
            self.device_proxies.pop(name)
        if name in self._devices:
            client = self._devices.pop(name)
            client.disconnect()
    elif status == "enabled":
        client = pydase.Client(
            url=device.url,
            client_id="icon-devices-controller",
            block_until_connected=False,
        )
        self._devices[name] = client
        self.device_proxies[device.name] = client.proxy

    return device
update_parameter_value async
update_parameter_value(
    *,
    name: str,
    parameter_id: str,
    new_value: DeviceParameterValueyType | QuantityDict,
    type_: Literal["float", "int", "Quantity"],
) -> None

Set a parameter value on a connected device.

Performs type-normalization (float, int, or Quantity) before delegating to the device client.

Logs a warning if the device is not connected or not found.

Parameters:

  • name (str, required): Device name.
  • parameter_id (str, required): Access path on the device service.
  • new_value (DeviceParameterValueyType | QuantityDict, required): New value (native type or quantity dict).
  • type_ (Literal['float', 'int', 'Quantity'], required): Expected type of the value for normalization.
Source code in src/icon/server/api/devices_controller.py
async def update_parameter_value(
    self,
    *,
    name: str,
    parameter_id: str,
    new_value: DeviceParameterValueyType | u.QuantityDict,
    type_: Literal["float", "int", "Quantity"],
) -> None:
    """Set a parameter value on a connected device.

    Performs type-normalization (`float`, `int`, or `Quantity`) before delegating
    to the device client.

    Logs a warning if the device is not connected or not found.

    Args:
        name: Device name.
        parameter_id: Access path on the device service.
        new_value: New value (native type or quantity dict).
        type_: Expected type of the value for normalization.
    """

    if type_ == "float" and not isinstance(new_value, dict):
        new_value = float(new_value)
    elif type_ == "int" and not isinstance(new_value, dict):
        new_value = int(new_value)
    elif type_ == "Quantity" and isinstance(new_value, dict):
        new_value = u.Quantity(new_value["magnitude"], new_value["unit"])  # type: ignore

    try:
        await asyncio.to_thread(
            self._devices[name].update_value,
            access_path=parameter_id,
            new_value=new_value,
        )
    except BadNamespaceError:
        logger.warning(
            'Could not set %r. Device %r at ("%s") is not connected.',
            parameter_id,
            name,
            self._devices[name]._url,
        )
    except KeyError:
        logger.warning("Device with name %r not found. Is it enabled?", name)

experiment_data_controller

ExperimentDataController

Bases: DataService

Controller for accessing stored experiment data.

Provides API methods to fetch experiment data associated with jobs.

get_experiment_data_by_job_id async
get_experiment_data_by_job_id(
    job_id: int,
) -> ExperimentData

Return experiment data for a given job.

Parameters:

  • job_id (int, required): The unique identifier of the job.

Returns:

  • ExperimentData: The experiment data linked to the job.

Source code in src/icon/server/api/experiment_data_controller.py
async def get_experiment_data_by_job_id(self, job_id: int) -> ExperimentData:
    """Return experiment data for a given job.

    Args:
        job_id: The unique identifier of the job.

    Returns:
        The experiment data linked to the job.
    """

    return ExperimentDataRepository.get_experiment_data_by_job_id(job_id=job_id)

experiments_controller

ExperimentsController

ExperimentsController()

Bases: DataService

Controller for experiment metadata.

Stores the current set of experiments and exposes them to the API. Updates are compared against the existing metadata and, if changes are detected, an update event is pushed to the Socket.IO emit queue.

Source code in src/icon/server/api/experiments_controller.py
def __init__(self) -> None:
    super().__init__()
    self._experiments: ExperimentDict = {}
get_experiments
get_experiments() -> ExperimentDict

Return the current experiment metadata.

Returns:

  • ExperimentDict: Mapping of experiment IDs to their metadata.

Source code in src/icon/server/api/experiments_controller.py
def get_experiments(self) -> ExperimentDict:
    """Return the current experiment metadata.

    Returns:
        Mapping of experiment IDs to their metadata.
    """

    return self._experiments

models

device_dict

DeviceDict

Bases: TypedDict

Dictionary representation of a device returned by the API.

created instance-attribute
created: str

Creation timestamp in ISO format.

description instance-attribute
description: str | None

Optional human-readable description.

id instance-attribute
id: int

Database identifier of the device.

name instance-attribute
name: str

Unique device name.

reachable instance-attribute
reachable: bool

Whether the device is currently connected.

scannable_params instance-attribute
scannable_params: list[str]

List of scannable parameter access paths.

status instance-attribute
status: str

Device status, e.g. “enabled” or “disabled”.

url instance-attribute
url: str

pydase server URL of the device.

experiment_dict

ExperimentDict module-attribute
ExperimentDict = dict[str, ExperimentMetadata]

Dictionary mapping the unique experiment identifier to its metadata.

Example
experiment_dict: ExperimentDict = {
    "experiment_library.experiments.my_experiment.MyExperiment (Cool Det)": {
        "class_name": "MyExperiment",
        "constructor_kwargs": {
            "name": "Cool Det",
        },
        "parameters": {
            "Local Parameters": {
                "namespace='experiment_library.experiments.my_experiment.MyExperiment.Cool Det' parameter_group='default' param_type='ParameterTypes.AMPLITUDE'": {
                    "allowed_values": None,
                    "default_value": 0.0,
                    "display_name": "amplitude",
                    "max_value": 100.0,
                    "min_value": 0.0,
                    "unit": "%",
                },
            },
            "ParameterGroup": {
                "namespace='experiment_library.globals.global_parameters' parameter_group='ParameterGroup' param_type='ParameterTypes.AMPLITUDE'": {
                    "allowed_values": None,
                    "default_value": 0.0,
                    "display_name": "amplitude",
                    "max_value": 100.0,
                    "min_value": 0.0,
                    "unit": "%",
                },
            },
        },
    },
}
ExperimentMetadata

Bases: TypedDict

Metadata for a single experiment.

class_name instance-attribute
class_name: str

Name of the experiment class.

constructor_kwargs instance-attribute
constructor_kwargs: dict[str, Any]

Constructor keyword arguments used to instantiate the experiment.

parameters instance-attribute
parameters: dict[str, dict[str, ParameterMetadata]]

Mapping of display groups to parameter metadata.

parameter_metadata

ParameterMetadata

Bases: TypedDict

Metadata describing a single parameter.

allowed_values instance-attribute
allowed_values: list[Any] | None

Explicit list of allowed values (for ComboboxParameters), otherwise None.

default_value instance-attribute
default_value: float | int

Default value assigned to the parameter.

display_name instance-attribute
display_name: str

Human-readable name of the parameter.

max_value instance-attribute
max_value: float | None

Maximum allowed value for the parameter.

min_value instance-attribute
min_value: float | None

Minimum allowed value for the parameter.

unit instance-attribute
unit: str

Unit in which the parameter value is expressed.

scan_parameter

ScanParameter

Bases: TypedDict

Specification of a parameter to scan during a job.

device_name instance-attribute
device_name: NotRequired[str]

Name of the device this parameter belongs to.

If omitted, the parameter is assumed to be stored in InfluxDB.

id instance-attribute
id: str

Unique identifier of the parameter.

values instance-attribute
values: list[Any]

List of explicit values to scan for this parameter.
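
An illustrative payload (all identifiers and values are made up): the first entry scans an InfluxDB-backed shared parameter, the second a device parameter addressed through the DevicesController proxy.

scan_parameters = [
    # An InfluxDB-backed shared parameter (no device_name).
    {
        "id": "namespace='experiment_library.globals.global_parameters' "
        "parameter_group='ParameterGroup' param_type='ParameterTypes.AMPLITUDE'",
        "values": [0.0, 10.0, 20.0],
    },
    # A device parameter; device_name refers to a registered device.
    {
        "id": 'devices.device_proxies["dds-board-1"].channels[0].frequency',
        "device_name": "dds-board-1",
        "values": [1.0e6, 2.0e6, 3.0e6],
    },
]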

parameters_controller

ParametersController

ParametersController()

Bases: DataService

Controller for parameter metadata and shared parameter values.

Maintains metadata for all parameters and their display groups, exposes read/write access to parameter values via the API, and ensures parameters are initialized in the InfluxDB backend.

Source code in src/icon/server/api/parameters_controller.py
def __init__(self) -> None:
    super().__init__()
    self._all_parameter_metadata: dict[str, ParameterMetadata] = {}
    self._display_group_metadata: dict[str, dict[str, ParameterMetadata]] = {}
get_all_parameters
get_all_parameters() -> dict[str, DatabaseValueType]

Return the current values of all shared parameters.

Returns:

  • dict[str, DatabaseValueType]: Mapping of parameter IDs to their values.

Source code in src/icon/server/api/parameters_controller.py
def get_all_parameters(self) -> dict[str, DatabaseValueType]:
    """Return the current values of all shared parameters.

    Returns:
        Mapping of parameter IDs to their values.
    """

    return dict(ParametersRepository.get_shared_parameters())
get_display_groups
get_display_groups() -> dict[
    str, dict[str, ParameterMetadata]
]

Return metadata grouped by display group.

Returns:

  • dict[str, dict[str, ParameterMetadata]]: Mapping from display group names to parameter metadata.

Source code in src/icon/server/api/parameters_controller.py
def get_display_groups(self) -> dict[str, dict[str, ParameterMetadata]]:
    """Return metadata grouped by display group.

    Returns:
        Mapping from display group names to parameter metadata.
    """

    return self._display_group_metadata
initialise_parameters_repository
initialise_parameters_repository() -> None

Initialize the global ParametersRepository.

Loads existing parameters from InfluxDB, populates the shared parameters dict in the shared resource manager, and marks the ParametersRepository as initialized.

Source code in src/icon/server/api/parameters_controller.py
def initialise_parameters_repository(self) -> None:
    """Initialize the global `ParametersRepository`.

    Loads existing parameters from InfluxDB, populates the shared parameters dict in
    the shared resource manager, and marks the `ParametersRepository` as
    initialized.
    """

    icon.server.shared_resource_manager.parameters_dict.update(
        ParametersRepository.get_influxdb_parameters()
    )
    ParametersRepository.initialize(
        shared_parameters=icon.server.shared_resource_manager.parameters_dict
    )
    logger.info("ParametersRepository successfully initialised.")
update_parameter_by_id
update_parameter_by_id(
    parameter_id: str, value: Any
) -> None

Update a single parameter value in InfluxDB.

Parameters:

  • parameter_id (str, required): The unique identifier of the parameter.
  • value (Any, required): The new value to assign.
Source code in src/icon/server/api/parameters_controller.py
def update_parameter_by_id(self, parameter_id: str, value: Any) -> None:
    """Update a single parameter value in InfluxDB.

    Args:
        parameter_id: The unique identifier of the parameter.
        value: The new value to assign.
    """

    ParametersRepository.update_parameters(parameter_mapping={parameter_id: value})

get_added_removed_and_updated_keys

get_added_removed_and_updated_keys(
    new_dict: dict[str, Any], cached_dict: dict[str, Any]
) -> tuple[list[str], list[str], list[str]]

Compare two dictionaries and return added, removed, and updated keys.

Parameters:

  • new_dict (dict[str, Any], required): The latest dictionary state.
  • cached_dict (dict[str, Any], required): The previously cached dictionary state.

Returns:

  • tuple[list[str], list[str], list[str]]: A tuple of three lists: added keys, removed keys, and updated keys (present in both but with changed values).
Source code in src/icon/server/api/parameters_controller.py
def get_added_removed_and_updated_keys(
    new_dict: dict[str, Any], cached_dict: dict[str, Any]
) -> tuple[list[str], list[str], list[str]]:
    """Compare two dictionaries and return added, removed, and updated keys.

    Args:
        new_dict: The latest dictionary state.
        cached_dict: The previously cached dictionary state.

    Returns:
        A tuple of three lists:

            - added keys
            - removed keys
            - updated keys (present in both but with changed values)
    """

    keys1 = set(cached_dict)
    keys2 = set(new_dict)

    added_keys = keys2 - keys1
    removed_keys = keys1 - keys2

    intersect_keys = keys1 & keys2
    updated_keys = {key for key in intersect_keys if new_dict[key] != cached_dict[key]}

    return list(added_keys), list(removed_keys), list(updated_keys)
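
For example:

added, removed, updated = get_added_removed_and_updated_keys(
    new_dict={"a": 1, "b": 2, "c": 3},
    cached_dict={"a": 1, "b": 5, "d": 4},
)
# added == ["c"], removed == ["d"], updated == ["b"] (list order is not guaranteed)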

scans_controller

ScansController

ScansController(
    pre_processing_update_queues: list[Queue[UpdateQueue]],
)

Bases: DataService

Controller for triggering update events for jobs across multiple worker processes.

Each worker process has its own update queue (a multiprocessing.Queue), which this controller writes to when an update event is triggered.
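
A sketch of the consumer side of such a queue; the worker loop shown here is illustrative and not ICON's actual pre-processing worker:

import multiprocessing
import queue


def worker_loop(update_queue: multiprocessing.Queue) -> None:
    # Illustrative consumer: ICON's real workers do more than this.
    while True:
        try:
            event = update_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        if event["event"] == "update_parameters":
            job_id = event["job_id"]  # None means "all jobs"
            ...  # reload parameters for the affected job(s)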

Source code in src/icon/server/api/scans_controller.py
def __init__(
    self,
    pre_processing_update_queues: list[multiprocessing.Queue[UpdateQueue]],
) -> None:
    super().__init__()
    self._pre_processing_update_queues = pre_processing_update_queues
trigger_update_job_params async
trigger_update_job_params(
    *, job_id: int | None = None
) -> None

Triggers an ‘update_parameters’ event for the given job ID.

Parameters:

  • job_id (int | None, default None): The ID of the job whose parameters should be updated. If None, all jobs will update their parameters.
Source code in src/icon/server/api/scans_controller.py
async def trigger_update_job_params(self, *, job_id: int | None = None) -> None:
    """Triggers an 'update_parameters' event for the given job ID.

    Args:
        job_id: The ID of the job whose parameters should be updated. If None, all
            jobs will update their parameters.
    """

    for pre_processing_update_queue in self._pre_processing_update_queues:
        pre_processing_update_queue.put(
            {
                "event": "update_parameters",
                "job_id": job_id,
            }
        )

scheduler_controller

SchedulerController

SchedulerController(devices_controller: DevicesController)

Bases: DataService

Controller to submit, inspect, and cancel scheduled jobs.

Provides methods to submit new jobs, cancel pending or running jobs, and query jobs or runs by ID or status. Ensures scan parameters are cast to the correct runtime type before persisting them.

Parameters:

  • devices_controller (DevicesController, required): Reference to the devices controller. Used to read current values of device parameters when casting scan values.
Source code in src/icon/server/api/scheduler_controller.py
def __init__(self, devices_controller: DevicesController) -> None:
    """
    Args:
        devices_controller: Reference to the devices controller. Used to read
            current values of device parameters when casting scan values.
    """

    super().__init__()
    self._devices_controller = devices_controller
cancel_job
cancel_job(*, job_id: int) -> None

Cancel a queued or running job.

The following status updates are performed:

  • Job: PROCESSING/SUBMITTED → PROCESSED
  • JobRun: PENDING/PROCESSING → CANCELLED

Parameters:

  • job_id (int, required): ID of the job to cancel.
Source code in src/icon/server/api/scheduler_controller.py
def cancel_job(self, *, job_id: int) -> None:
    """Cancel a queued or running job.

    The following status updates are performed:

    - Job: PROCESSING/SUBMITTED → PROCESSED
    - JobRun: PENDING/PROCESSING → CANCELLED

    Args:
        job_id: ID of the job to cancel.
    """

    job = JobRepository.get_job_by_id(job_id=job_id)
    if job.status in (JobStatus.PROCESSING, JobStatus.SUBMITTED):
        JobRepository.update_job_status(job=job, status=JobStatus.PROCESSED)
        job_run = JobRunRepository.get_run_by_job_id(job_id=job_id)
        if job_run.status in (JobRunStatus.PENDING, JobRunStatus.PROCESSING):
            JobRunRepository.update_run_by_id(
                run_id=job_run.id,
                status=JobRunStatus.CANCELLED,
                log="Cancelled through user interaction.",
            )
get_job_by_id
get_job_by_id(*, job_id: int) -> Job

Fetch a job with its experiment source and scan parameters.

Parameters:

  • job_id (int, required): Job identifier.

Returns:

  • Job: The job record.

Source code in src/icon/server/api/scheduler_controller.py
def get_job_by_id(self, *, job_id: int) -> Job:
    """Fetch a job with its experiment source and scan parameters.

    Args:
        job_id: Job identifier.

    Returns:
        The job record.
    """

    return JobRepository.get_job_by_id(
        job_id=job_id, load_experiment_source=True, load_scan_parameters=True
    )
get_job_run_by_id
get_job_run_by_id(*, job_id: int) -> JobRun

Fetch the run record for a given job.

Parameters:

  • job_id (int, required): Job identifier.

Returns:

  • JobRun: The associated run record.

Source code in src/icon/server/api/scheduler_controller.py
def get_job_run_by_id(self, *, job_id: int) -> JobRun:
    """Fetch the run record for a given job.

    Args:
        job_id: Job identifier.

    Returns:
        The associated run record.
    """

    return JobRunRepository.get_run_by_job_id(job_id=job_id)
get_scheduled_jobs
get_scheduled_jobs(
    *,
    status: JobStatus | None = None,
    start: str | None = None,
    stop: str | None = None,
) -> dict[int, Job]

List jobs filtered by status and optional ISO timeframe.

Parameters:

  • status (JobStatus | None, default None): Optional job status filter.
  • start (str | None, default None): Optional ISO8601 start timestamp (inclusive).
  • stop (str | None, default None): Optional ISO8601 stop timestamp (exclusive).

Returns:

  • dict[int, Job]: Mapping from job ID to job record.

Source code in src/icon/server/api/scheduler_controller.py
def get_scheduled_jobs(
    self,
    *,
    status: JobStatus | None = None,
    start: str | None = None,
    stop: str | None = None,
) -> dict[int, Job]:
    """List jobs filtered by status and optional ISO timeframe.

    Args:
        status: Optional job status filter.
        start: Optional ISO8601 start timestamp (inclusive).
        stop: Optional ISO8601 stop timestamp (exclusive).

    Returns:
        Mapping from job ID to job record.
    """

    start_date = datetime.fromisoformat(start) if start is not None else None
    stop_date = datetime.fromisoformat(stop) if stop is not None else None

    return {
        job.id: job
        for job in JobRepository.get_jobs_by_status_and_timeframe(
            status=status, start=start_date, stop=stop_date
        )
    }
submit_job async
submit_job(
    *,
    experiment_id: str,
    scan_parameters: list[ScanParameter],
    priority: int = 20,
    local_parameters_timestamp: datetime | None = None,
    repetitions: int = 1,
    number_of_shots: int = 50,
    git_commit_hash: str | None = None,
    auto_calibration: bool = False,
) -> int

Create and submit a job with typed scan parameters.

Each scan parameter’s values are cast to the current type of the target parameter (device parameter via DevicesController or shared parameter via ParametersRepository).

Parameters:

  • experiment_id (str, required): Experiment identifier (from experiment library).
  • scan_parameters (list[ScanParameter], required): List of scan parameter specs (id, values, optional device_name).
  • priority (int, default 20): Higher values run sooner.
  • local_parameters_timestamp (datetime | None, default None): ISO timestamp to snapshot local parameters; defaults to datetime.now(tz=timezone).
  • repetitions (int, default 1): Number of experiment repetitions.
  • number_of_shots (int, default 50): Shots per data point.
  • git_commit_hash (str | None, default None): Git commit to associate with the job; if None, the job is marked debug_mode=True.
  • auto_calibration (bool, default False): Whether to run auto-calibration for the job.

Returns:

  • int: The persisted job ID.

Source code in src/icon/server/api/scheduler_controller.py
async def submit_job(  # noqa: PLR0913
    self,
    *,
    experiment_id: str,
    scan_parameters: list[ScanParameter],
    priority: int = 20,
    local_parameters_timestamp: datetime | None = None,
    repetitions: int = 1,
    number_of_shots: int = 50,
    git_commit_hash: str | None = None,
    auto_calibration: bool = False,
) -> int:
    """Create and submit a job with typed scan parameters.

    Each scan parameter's values are cast to the current type of the target
    parameter (device parameter via `DevicesController` or shared parameter via
    `ParametersRepository`).

    Args:
        experiment_id: Experiment identifier (from experiment library).
        scan_parameters: List of scan parameter specs (id, values, optional
            device_name).
        priority: Higher values run sooner.
        local_parameters_timestamp: ISO timestamp to snapshot local parameters;
            defaults to `datetime.now(tz=timezone)`.
        repetitions: Number of experiment repetitions.
        number_of_shots: Shots per data point.
        git_commit_hash: Git commit to associate with the job; if `None`, job is
            marked `debug_mode=True`.
        auto_calibration: Whether to run auto-calibration for the job.

    Returns:
        The persisted job ID.
    """

    if local_parameters_timestamp is None:
        local_parameters_timestamp = datetime.now(tz=timezone)

    experiment_source = ExperimentSource(experiment_id=experiment_id)

    experiment_source = ExperimentSourceRepository.get_or_create_experiment(
        experiment_source=experiment_source
    )
    sqlite_scan_parameters = []

    for param in scan_parameters:
        scan_values = await self._cast_scan_values_to_param_type(
            scan_parameter=param
        )

        if len(scan_values) == 0:
            raise RuntimeError(f"Scan value of {param['id']} are empty")

        sqlite_scan_parameters.append(
            sqlite_scan_parameter.ScanParameter(
                variable_id=param["id"],
                scan_values=scan_values,
                device_id=DeviceRepository.get_device_by_name(
                    name=param["device_name"]
                ).id
                if "device_name" in param
                else None,
            )
        )
    job = Job(
        experiment_source=experiment_source,
        priority=priority,
        local_parameters_timestamp=local_parameters_timestamp,
        scan_parameters=sqlite_scan_parameters,
        repetitions=repetitions,
        git_commit_hash=git_commit_hash,
        number_of_shots=number_of_shots,
        auto_calibration=auto_calibration,
        debug_mode=git_commit_hash is None,
    )
    job = JobRepository.submit_job(job=job)

    return job.id
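
A submission sketch through a pydase client proxy, which blocks until the server returns the job ID; the URL, experiment ID, and scan values are illustrative:

import pydase

client = pydase.Client(url="ws://localhost:8001")  # hypothetical IconServer address
job_id = client.proxy.scheduler.submit_job(
    experiment_id="experiment_library.experiments.my_experiment.MyExperiment (Cool Det)",
    scan_parameters=[
        {
            "id": "namespace='experiment_library.globals.global_parameters' "
            "parameter_group='ParameterGroup' param_type='ParameterTypes.AMPLITUDE'",
            "values": [0.0, 25.0, 50.0],
        }
    ],
    repetitions=2,
    number_of_shots=100,
)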

status_controller

StatusController

StatusController()

Bases: DataService

Controller for system status monitoring.

Periodically checks availability of InfluxDB and hardware and emits status events via the Socket.IO queue.

Source code in src/icon/server/api/status_controller.py
def __init__(self) -> None:
    super().__init__()
    self.__hardware_controller = HardwareController(connect=False)
    self._influxdb_available = False
    self._hardware_available = False
check_hardware_status async
check_hardware_status() -> None

Check hardware connection and reconnect if necessary.

Ensures the hardware controller matches the configured host/port and reconnects in a background thread if required.

Emits a "status.hardware" event to the Socket.IO queue.

Source code in src/icon/server/api/status_controller.py
async def check_hardware_status(self) -> None:
    """Check hardware connection and reconnect if necessary.

    Ensures the hardware controller matches the configured host/port and reconnects
    in a background thread if required.

    Emits a `"status.hardware"` event to the Socket.IO queue.
    """

    status = self.__hardware_controller.connected

    if (
        not status
        or self.__hardware_controller._host != get_config().hardware.host
        or self.__hardware_controller._port != get_config().hardware.port
    ):
        await asyncio.to_thread(self.__hardware_controller.connect)

    self._hardware_available = status
    emit_queue.put({"event": "status.hardware", "data": status})
check_influxdb_status
check_influxdb_status() -> None

Check if InfluxDB is responsive and update status.

Emits a "status.influxdb" event to the Socket.IO queue.

Source code in src/icon/server/api/status_controller.py
def check_influxdb_status(self) -> None:
    """Check if InfluxDB is responsive and update status.

    Emits a `"status.influxdb"` event to the Socket.IO queue.
    """

    status = influxdb_v1.is_responsive()

    self._influxdb_available = status
    emit_queue.put({"event": "status.influxdb", "data": status})
get_status
get_status() -> dict[str, bool]

Return the current system status flags.

Returns:

  • dict[str, bool]: A dictionary with "influxdb" (whether InfluxDB is responsive) and "hardware" (whether the hardware connection is active).
Source code in src/icon/server/api/status_controller.py
def get_status(self) -> dict[str, bool]:
    """Return the current system status flags.

    Returns:
        A dictionary with:

            - `"influxdb"`: Whether InfluxDB is responsive.
            - `"hardware"`: Whether the hardware connection is active.
    """

    return {
        "influxdb": self._influxdb_available,
        "hardware": self._hardware_available,
    }

icon.server.data_access.models.enums

This module defines enums used by the SQLAlchemy models.

These enums represent database-level states for jobs, job runs, and devices. They are stored as strings in the database and used throughout ICON’s scheduling and device management logic.

DeviceStatus

Bases: Enum

Operational states of a device.

DISABLED class-attribute instance-attribute

DISABLED = 'disabled'

Device is disabled and should not be used.

ENABLED class-attribute instance-attribute

ENABLED = 'enabled'

Device is enabled and may be connected.

JobRunStatus

Bases: Enum

Lifecycle states of a job run.

CANCELLED class-attribute instance-attribute

CANCELLED = 'cancelled'

Run was cancelled before completion.

DONE class-attribute instance-attribute

DONE = 'done'

Run completed successfully.

FAILED class-attribute instance-attribute

FAILED = 'failed'

Run ended unsuccessfully due to an error.

PENDING class-attribute instance-attribute

PENDING = 'pending'

Run is queued but has not started yet.

PROCESSING class-attribute instance-attribute

PROCESSING = 'processing'

Run is currently executing.

JobStatus

Bases: Enum

Lifecycle states of a job submission.

PROCESSED class-attribute instance-attribute

PROCESSED = 'processed'

Job has finished or was cancelled and is no longer active.

PROCESSING class-attribute instance-attribute

PROCESSING = 'processing'

Job has been put into the pre-processing task queue.

SUBMITTED class-attribute instance-attribute

SUBMITTED = 'submitted'

Job has been created and is waiting to be scheduled.

icon.server.data_access.models.sqlite

This module contains the SQLAlchemy models for ICON.

All models must be imported and added to the __all__ list here so that Alembic can correctly detect them during schema autogeneration. Alembic inspects Base.metadata, which is only populated with models that are actually imported at runtime.

Base

Bases: DeclarativeBase

Base class for all SQLAlchemy ORM models in ICON.

This class configures the declarative mapping and provides a datetime type mapping for all models that inherit from it.

type_annotation_map class-attribute

type_annotation_map: dict[type, Any] = {
    datetime: TIMESTAMP(timezone=True)
}

Custom type mapping used when interpreting Python type annotations.

Currently, datetime.datetime is mapped to sqlalchemy.TIMESTAMP(timezone=True) to ensure timezone-aware timestamps across all models.
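
As a sketch, a model inheriting from Base can annotate a column with plain datetime and receive a timezone-aware TIMESTAMP column; the model below is hypothetical and not part of ICON:

from datetime import datetime, timezone

from sqlalchemy.orm import Mapped, mapped_column

from icon.server.data_access.models.sqlite import Base


class AuditEntry(Base):  # hypothetical model, for illustration only
    __tablename__ = "audit_entries"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # Annotated as datetime, so it is stored as TIMESTAMP(timezone=True).
    created: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(tz=timezone.utc)
    )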

Device

Bases: Base

SQLAlchemy model for a registered device.

Represents an external device accessible via a pydase service. Stores configuration, connection details, and retry behaviour. A device may be linked to multiple scan parameters.

created class-attribute instance-attribute

created: Mapped[datetime] = mapped_column(
    default=lambda: now(timezone)
)

Timestamp when the device entry was created.

description class-attribute instance-attribute

description: Mapped[str | None] = mapped_column(
    default=None
)

Optional human-readable description of the device.

id class-attribute instance-attribute

id: Mapped[int] = mapped_column(
    primary_key=True, autoincrement=True
)

Primary key identifier for the device.

name class-attribute instance-attribute

name: Mapped[str] = mapped_column(unique=True, index=True)

Unique name of the device.

retry_attempts class-attribute instance-attribute

retry_attempts: Mapped[int] = mapped_column(
    default=3, nullable=False
)

Number of attempts to verify the device value was set correctly.

retry_delay_seconds class-attribute instance-attribute

retry_delay_seconds: Mapped[float] = mapped_column(
    default=0.0, nullable=False
)

Delay in seconds between retry attempts.

scan_parameters class-attribute instance-attribute

scan_parameters: Mapped[list[ScanParameter]] = relationship(
    "ScanParameter", back_populates="device"
)

Relationship to scan parameters linked to this device.

status class-attribute instance-attribute

status: Mapped[DeviceStatus] = mapped_column(
    default=ENABLED, index=True
)

Current status of the device (enabled or disabled).

url class-attribute instance-attribute

pydase service URL of the device.

ExperimentSource

Bases: Base

SQLAlchemy model for experiment sources.

Represents a unique experiment identifier from the experiment library. Each experiment source may be linked to multiple jobs.

experiment_id class-attribute instance-attribute

experiment_id: Mapped[str] = mapped_column()

Unique experiment identifier string (as defined in the experiment library).

id class-attribute instance-attribute

id: Mapped[int] = mapped_column(
    primary_key=True, autoincrement=True
)

Primary key identifier for the experiment source.

jobs class-attribute instance-attribute

jobs: Mapped[list[Job]] = relationship(
    back_populates="experiment_source"
)

Relationship to jobs associated with this experiment source.

Job

Bases: Base

SQLAlchemy model for experiment jobs.

Represents a scheduled or running experiment job, including its metadata, status, and relationships to experiment sources, runs, and scan parameters.

Constraints
  • priority must be between 0 and 20.
  • Indexed by (experiment_source_id, status, priority, created).

auto_calibration class-attribute instance-attribute

auto_calibration: Mapped[bool] = mapped_column(
    default=False
)

Whether auto-calibration is enabled for this job. Currently unused.

created class-attribute instance-attribute

created: Mapped[datetime] = mapped_column(
    default=lambda: now(timezone)
)

Timestamp when the job was created. This cannot be set manually.

debug_mode class-attribute instance-attribute

debug_mode: Mapped[bool] = mapped_column(default=False)

Whether the job was submitted in debug mode (no commit hash).

experiment_source class-attribute instance-attribute

experiment_source: Mapped[ExperimentSource] = relationship(
    back_populates="jobs"
)

Relationship to the experiment source.

experiment_source_id class-attribute instance-attribute

experiment_source_id: Mapped[int] = mapped_column(
    ForeignKey("experiment_sources.id")
)

Foreign key referencing the associated experiment source.

git_commit_hash class-attribute instance-attribute

git_commit_hash: Mapped[str | None] = mapped_column(
    default=None
)

Git commit hash of the experiment code associated with the job.

id class-attribute instance-attribute

id: Mapped[int] = mapped_column(
    primary_key=True, autoincrement=True
)

Primary key identifier for the job.

local_parameters_timestamp class-attribute instance-attribute

local_parameters_timestamp: Mapped[datetime] = (
    mapped_column(default=now(timezone))
)

Timestamp of the local parameter snapshot used for this job.

number_of_shots class-attribute instance-attribute

number_of_shots: Mapped[int] = mapped_column(default=50)

Number of shots per repetition.

parent_job class-attribute instance-attribute

parent_job: Mapped[Job | None] = relationship(
    "Job",
    remote_side=[id],
    back_populates="resubmitted_jobs",
)

Relationship to the parent job from which this job was resubmitted.

parent_job_id class-attribute instance-attribute

parent_job_id: Mapped[int | None] = mapped_column(
    ForeignKey("job_submissions.id"), nullable=True
)

Foreign key referencing the original job if this job was resubmitted.

priority class-attribute instance-attribute

priority: Mapped[int] = mapped_column(default=20)

Job priority, between 0 (lowest) and 20 (highest).

repetitions class-attribute instance-attribute

repetitions: Mapped[int] = mapped_column(default=1)

Number of times the experiment should be repeated.

resubmitted_jobs class-attribute instance-attribute

resubmitted_jobs: Mapped[list[Job]] = relationship(
    "Job", back_populates="parent_job"
)

List of jobs resubmitted from this job.

run class-attribute instance-attribute

run: Mapped[JobRun] = relationship(back_populates='job')

Relationship to the job run associated with this job.

scan_parameters class-attribute instance-attribute

scan_parameters: Mapped[list[ScanParameter]] = relationship(
    back_populates="job"
)

List of scan parameters associated with this job.

status class-attribute instance-attribute

Current status of the job (submitted, processing, etc.).

JobRun

Bases: Base

SQLAlchemy model for job runs.

Represents the execution of a job, including its scheduled time, current status, and log messages.

Constraints
  • Indexed by (job_id, status, scheduled_time).
  • scheduled_time must be unique across runs.

id class-attribute instance-attribute

id: Mapped[int] = mapped_column(
    primary_key=True, autoincrement=True
)

Primary key identifier for the job run.

job class-attribute instance-attribute

job: Mapped[Job] = relationship(back_populates='run')

Relationship to the job associated with this run.

job_id class-attribute instance-attribute

job_id: Mapped[int] = mapped_column(
    ForeignKey("job_submissions.id")
)

Foreign key referencing the job being executed.

log class-attribute instance-attribute

log: Mapped[str | None] = mapped_column(default=None)

Optional log message for this run (e.g., cancellation reason).

scheduled_time class-attribute instance-attribute

scheduled_time: Mapped[datetime] = mapped_column(
    default=now(timezone)
)

Time when the run was scheduled to start.

status class-attribute instance-attribute

status: Mapped[JobRunStatus] = mapped_column(
    default=PENDING
)

Current status of the run (pending, processing, cancelled, etc.).

ScanParameter

Bases: Base

SQLAlchemy model for scan parameters.

Represents a parameter scanned during a job execution. Each parameter is linked to a job and optionally to a device.

device class-attribute instance-attribute

device: Mapped[Device | None] = relationship(
    back_populates="scan_parameters", lazy="joined"
)

Relationship to the device associated with this parameter.

device_id class-attribute instance-attribute

device_id: Mapped[int | None] = mapped_column(
    ForeignKey("devices.id"), nullable=True
)

Foreign key referencing the associated device, if any.

id class-attribute instance-attribute

id: Mapped[int] = mapped_column(
    primary_key=True, autoincrement=True
)

Primary key identifier for the scan parameter.

job class-attribute instance-attribute

job: Mapped[Job] = relationship(
    back_populates="scan_parameters"
)

Relationship to the job.

job_id class-attribute instance-attribute

job_id: Mapped[int] = mapped_column(
    ForeignKey("job_submissions.id")
)

Foreign key referencing the job this parameter belongs to.

scan_values class-attribute instance-attribute

scan_values: Mapped[list[DatabaseValueType]] = (
    mapped_column(JSONEncodedList, nullable=False)
)

List of values scanned for this parameter (stored as JSON).

variable_id class-attribute instance-attribute

variable_id: Mapped[str] = mapped_column()

Identifier of the parameter being scanned.

unique_id

unique_id() -> str

Return a unique identifier for the parameter.

Returns:

  • str: "Device(<device_name>) <variable_id>" if a device is associated, otherwise just <variable_id>.

Source code in src/icon/server/data_access/models/sqlite/scan_parameter.py
def unique_id(self) -> str:
    """Return a unique identifier for the parameter.

    Returns:
        `"Device(<device_name>) <variable_id>"` if a device is associated, otherwise
            just `<variable_id>`.
    """

    return (
        f"Device({self.device.name}) {self.variable_id}"
        if self.device is not None
        else self.variable_id
    )

icon.server.data_access.repositories

This module contains the repository layer for ICON’s data access.

Repositories encapsulate database access logic and hide the underlying persistence technology (SQLAlchemy sessions, InfluxDB queries, etc.) from the rest of the application. They expose simple, intention-revealing methods for creating, retrieving, and updating domain objects, while emitting Socket.IO events when relevant.

By using repositories, controllers and services can work with high-level operations (e.g. “submit a job”, “update a device”) without needing to know how the data is stored or which database backend is used. This keeps the codebase modular, easier to maintain, and allows the persistence layer to evolve independently of business logic.
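
An illustrative call site using the repositories documented below (the device name is hypothetical):

from icon.server.data_access.models.enums import DeviceStatus
from icon.server.data_access.repositories.device_repository import DeviceRepository

# A controller works at the level of intent, without touching sessions directly.
enabled_devices = DeviceRepository.get_devices_by_status(status=DeviceStatus.ENABLED)
device = DeviceRepository.get_device_by_name(name="dds-board-1")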

device_repository

DeviceRepository

Repository for Device entities.

Provides methods to create, update, and query devices in the SQLite database. All methods open their own SQLAlchemy session and return detached ORM objects.

add_device staticmethod
add_device(*, device: Device) -> Device

Insert a new device into the database.

Parameters:

  • device (Device, required): Device instance to persist.

Returns:

  • Device: The persisted device with database-generated fields (e.g., id) populated.

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def add_device(*, device: Device) -> Device:
    """Insert a new device into the database.

    Args:
        device: Device instance to persist.

    Returns:
        The persisted device with database-generated fields (e.g., `id`) populated.
    """

    with sqlalchemy.orm.session.Session(engine) as session:
        session.add(device)
        session.commit()
        session.refresh(device)
        logger.debug("Added new device %s", device)

    emit_queue.put(
        {
            "event": "device.new",
            "data": {
                "device": SQLAlchemyDictEncoder.encode(obj=device),
            },
        }
    )

    return device
get_all_device_names staticmethod
get_all_device_names() -> Sequence[str]

Return the names of all devices.

Returns:

  • Sequence[str]: List of device names.

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def get_all_device_names() -> Sequence[str]:
    """Return the names of all devices.

    Returns:
        List of device names.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = sqlalchemy.select(Device.name)
        return session.execute(stmt).scalars().all()
get_device_by_id staticmethod
get_device_by_id(*, id: int) -> Device

Return a device by database ID.

Parameters:

  • id (int, required): Primary key identifier of the device.

Returns:

  • Device: The matching device.

Raises:

  • NoResultFound: If no device exists with the given ID.

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def get_device_by_id(*, id: int) -> Device:
    """Return a device by database ID.

    Args:
        id: Primary key identifier of the device.

    Returns:
        The matching device.

    Raises:
        NoResultFound: If no device exists with the given ID.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = sqlalchemy.select(Device).where(Device.id == id)
        return session.execute(stmt).scalar_one()
get_device_by_name staticmethod
get_device_by_name(*, name: str) -> Device

Return a device by unique name.

Parameters:

  • name (str, required): Device name.

Returns:

  • Device: The matching device.

Raises:

  • NoDeviceFoundError: If no device exists with the given name.

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def get_device_by_name(*, name: str) -> Device:
    """Return a device by unique name.

    Args:
        name: Device name.

    Returns:
        The matching device.

    Raises:
        NoDeviceFoundError: If no device exists with the given name.
    """

    try:
        with sqlalchemy.orm.Session(engine) as session:
            stmt = sqlalchemy.select(Device).where(Device.name == name)
            return session.execute(stmt).scalar_one()
    except sqlalchemy.exc.NoResultFound:
        raise NoDeviceFoundError(
            f"Device with name {name!r} does not exist.",
        )
get_devices_by_status staticmethod
get_devices_by_status(
    *, status: DeviceStatus | None = None
) -> Sequence[Device]

Return devices filtered by status.

Parameters:

Name Type Description Default
status DeviceStatus | None

Optional device status to filter on.

None

Returns:

Type Description
Sequence[Device]

All devices matching the filter (or all devices if no filter is given).

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def get_devices_by_status(
    *,
    status: DeviceStatus | None = None,
) -> Sequence[Device]:
    """Return devices filtered by status.

    Args:
        status: Optional device status to filter on.

    Returns:
        All devices matching the filter (or all devices if no filter is given).
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = sqlalchemy.select(Device)

        if status is not None:
            stmt = stmt.where(Device.status == status)

        return session.execute(stmt).scalars().all()
update_device staticmethod
update_device(
    *,
    name: str,
    url: str | None = None,
    status: DeviceStatus | None = None,
    retry_attempts: int | None = None,
    retry_delay_seconds: float | None = None,
) -> Device

Update an existing device by name.

Parameters:

Name Type Description Default
name str

Unique device name.

required
url str | None

New device URL (cannot change if the device is enabled).

None
status DeviceStatus | None

New device status (enabled/disabled).

None
retry_attempts int | None

Updated retry attempt count.

None
retry_delay_seconds float | None

Updated retry delay in seconds.

None

Returns:

Type Description
Device

The updated device.

Raises:

Type Description
RuntimeError

If attempting to change the URL of an enabled device.

NoDeviceFoundError

If no device with the given name exists.

Source code in src/icon/server/data_access/repositories/device_repository.py
@staticmethod
def update_device(
    *,
    name: str,
    url: str | None = None,
    status: DeviceStatus | None = None,
    retry_attempts: int | None = None,
    retry_delay_seconds: float | None = None,
) -> Device:
    """Update an existing device by name.

    Args:
        name: Unique device name.
        url: New device URL (cannot change if the device is enabled).
        status: New device status (enabled/disabled).
        retry_attempts: Updated retry attempt count.
        retry_delay_seconds: Updated retry delay in seconds.

    Returns:
        The updated device.

    Raises:
        RuntimeError: If attempting to change the URL of an enabled device.
        NoDeviceFoundError: If no device with the given name exists.
    """

    updated_properties = {
        name: new_value
        for name, new_value in {
            "url": url,
            "status": status if status is not None else None,
            "retry_attempts": retry_attempts,
            "retry_delay_seconds": retry_delay_seconds,
        }.items()
        if new_value is not None
    }

    if "url" in updated_properties:
        device = DeviceRepository.get_device_by_name(name=name)
        if device.status == DeviceStatus.ENABLED:
            raise RuntimeError("Cannot change url of an enabled device")

    with sqlalchemy.orm.Session(engine) as session:
        session.execute(
            update(Device).where(Device.name == name).values(updated_properties)
        )
        session.commit()

        device = session.execute(
            select(Device).where(Device.name == name)
        ).scalar_one()
        session.expunge(device)

        logger.debug("Updated device %s", device)

    serialized_properties = {
        key: value.value if isinstance(value, enum.Enum) else value
        for key, value in updated_properties.items()
    }

    if "status" in updated_properties:
        serialized_properties["reachable"] = False

    emit_queue.put(
        {
            "event": "device.update",
            "data": {
                "device_name": device.name,
                "updated_properties": serialized_properties,
            },
        }
    )

    return device

NoDeviceFoundError

Bases: Exception

Raised when a device could not be found by the given identifier.

experiment_data_repository

ExperimentData

Bases: TypedDict

Container for all experiment data returned to the API.

json_sequences instance-attribute
json_sequences: list[list[int | str]]

List of [index, sequence_json] pairs (stored as lists rather than tuples for pydase JSON compatibility).

plot_windows instance-attribute
plot_windows: PlotWindowsDict

Plot window metadata grouped by channel class.

result_channels instance-attribute
result_channels: dict[str, dict[int, float]]

Result channels as channel_name -> {index -> value}.

scan_parameters instance-attribute
scan_parameters: dict[str, dict[int, str | float]]

Scan parameters as param_id -> {index -> value/timestamp}.

shot_channels instance-attribute
shot_channels: dict[str, dict[int, list[int]]]

Shot channels as channel_name -> {index -> values}.

vector_channels instance-attribute
vector_channels: dict[str, dict[int, list[float]]]

Vector channels as channel_name -> {index -> values}.

ExperimentDataPoint

Bases: ResultDict

A single data point with its context.

index instance-attribute
index: int

Sequential index of this data point.

scan_params instance-attribute
scan_params: dict[str, DatabaseValueType]

Parameter values that produced this data point.

sequence_json instance-attribute
sequence_json: str

Serialized sequence JSON used for this data point.

timestamp instance-attribute
timestamp: str

Acquisition timestamp (ISO string).
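
A hand-written example of a complete data point (all values are illustrative):

from icon.server.data_access.repositories.experiment_data_repository import (
    ExperimentDataPoint,
)

data_point: ExperimentDataPoint = {
    "index": 0,
    "timestamp": "2024-01-01T12:00:00.000000",
    "scan_params": {"detuning": 1.5},
    "sequence_json": '{"pulses": []}',
    "result_channels": {"PMT counts": 17.0},
    "shot_channels": {"PMT": [8, 9]},
    "vector_channels": {"trace": [0.1, 0.2, 0.3]},
}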

ExperimentDataRepository

Repository for HDF5-based experiment data.

Manages HDF5 file creation and updates (metadata, results, parameters), with file-level locking to support concurrent writers.
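
A typical write/read cycle might look like the following sketch. It assumes job 42 already exists in the jobs database and that data_point is an ExperimentDataPoint like the one shown above; channel names are illustrative, and the plot window lists would contain PlotWindowMetadata entries as documented below.

from icon.server.data_access.repositories.experiment_data_repository import (
    ExperimentDataRepository,
    ReadoutMetadata,
)

readout_metadata: ReadoutMetadata = {
    "readout_channel_names": ["PMT counts"],
    "readout_channel_windows": [],
    "shot_channel_names": ["PMT"],
    "shot_channel_windows": [],
    "vector_channel_names": ["trace"],
    "vector_channel_windows": [],
}

# 1. Create the HDF5 file and its metadata once per job.
ExperimentDataRepository.update_metadata_by_job_id(
    job_id=42,
    number_of_shots=2,
    repetitions=1,
    readout_metadata=readout_metadata,
)

# 2. Append data points as they are acquired.
# data_point: ExperimentDataPoint, as illustrated above.
ExperimentDataRepository.write_experiment_data_by_job_id(job_id=42, data_point=data_point)

# 3. Read everything back in the shape expected by the API.
data = ExperimentDataRepository.get_experiment_data_by_job_id(job_id=42)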

get_experiment_data_by_job_id staticmethod
get_experiment_data_by_job_id(
    *, job_id: int
) -> ExperimentData

Load all stored data for a job from its HDF5 file.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required

Returns:

Type Description
ExperimentData

Experiment data payload suitable for the API.

Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
@staticmethod
def get_experiment_data_by_job_id(
    *,
    job_id: int,
) -> ExperimentData:
    """Load all stored data for a job from its HDF5 file.

    Args:
        job_id: Job identifier.

    Returns:
        Experiment data payload suitable for the API.
    """

    data: ExperimentData = {
        "plot_windows": {
            "result_channels": [],
            "shot_channels": [],
            "vector_channels": [],
        },
        "shot_channels": {},
        "result_channels": {},
        "vector_channels": {},
        "scan_parameters": {},
        "json_sequences": [],
    }

    filename = get_filename_by_job_id(job_id)
    file = f"{get_config().data.results_dir}/{filename}"

    if not os.path.exists(file):
        logger.warning("The file %s does not exist.", file)
        return data

    lock_path = (
        f"{get_config().data.results_dir}/.{filename}"
        f"{ExperimentDataRepository.LOCK_EXTENSION}"
    )
    with FileLock(lock_path), h5py.File(file, "r") as h5file:
        scan_parameters: npt.NDArray = h5file["scan_parameters"][:]  # type: ignore
        data["scan_parameters"] = {
            param: {
                index: value[0].item().decode()
                if isinstance(value[0], np.bytes_)
                else value[0].item()
                for index, value in enumerate(scan_parameters[param])
            }
            for param in cast("tuple[str, ...]", scan_parameters.dtype.names)
        }

        result_channel_dataset = cast("h5py.Dataset", h5file["result_channels"])
        data["plot_windows"]["result_channels"] = json.loads(
            cast("str", result_channel_dataset.attrs["Plot window metadata"])
        )
        result_channels = cast("npt.NDArray", result_channel_dataset[:])  # type: ignore
        data["result_channels"] = {
            channel_name: dict(
                enumerate(
                    cast("list[float]", result_channels[channel_name].tolist())
                )
            )
            for channel_name in cast("tuple[str, ...]", result_channels.dtype.names)
        }

        # Convert shot channels into dicts with index as key
        shot_channels_group = cast("h5py.Group", h5file["shot_channels"])
        data["plot_windows"]["shot_channels"] = json.loads(
            cast("str", shot_channels_group.attrs["Plot window metadata"])
        )
        data["shot_channels"] = {
            key: dict(enumerate(value[:].tolist()))  # type: ignore
            for key, value in cast(
                "Sequence[tuple[str, h5py.Dataset]]", shot_channels_group.items()
            )
        }

        vector_channels_group = cast("h5py.Group", h5file["vector_channels"])
        data["plot_windows"]["vector_channels"] = json.loads(
            cast("str", vector_channels_group.attrs["Plot window metadata"])
        )
        data["vector_channels"] = {
            channel_name: {
                int(data_point): vector_dataset[:].tolist()
                for data_point, vector_dataset in cast(
                    "Sequence[tuple[str, h5py.Dataset]]", vector_group.items()
                )
            }
            for channel_name, vector_group in cast(
                "Sequence[tuple[str, h5py.Group]]", vector_channels_group.items()
            )
        }

        sequence_json_dataset = cast("h5py.Dataset", h5file["sequence_json"])
        data["json_sequences"] = [
            [cast("np.int32", entry["index"]).item(), entry["Sequence"].decode()]
            for entry in sequence_json_dataset
        ]
        return data
update_metadata_by_job_id staticmethod
update_metadata_by_job_id(
    *,
    job_id: int,
    number_of_shots: int,
    repetitions: int,
    readout_metadata: ReadoutMetadata,
    local_parameter_timestamp: datetime | None = None,
    parameters: list[ScanParameter] = [],
) -> None

Create or update HDF5 metadata for a job.

Initializes datasets, sets file-level attributes, and stores plot window metadata for result/shot/vector channels.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required
number_of_shots int

Shots per data point.

required
repetitions int

Number of repetitions.

required
readout_metadata ReadoutMetadata

Plot/window/channel metadata.

required
local_parameter_timestamp datetime | None

Optional timestamp for local parameters.

None
parameters list[ScanParameter]

Scan parameters.

[]
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
@staticmethod
def update_metadata_by_job_id(  # noqa: PLR0913
    *,
    job_id: int,
    number_of_shots: int,
    repetitions: int,
    readout_metadata: ReadoutMetadata,
    local_parameter_timestamp: datetime | None = None,
    parameters: list[ScanParameter] = [],
) -> None:
    """Create or update HDF5 metadata for a job.

    Initializes datasets, sets file-level attributes, and stores plot window
    metadata for result/shot/vector channels.

    Args:
        job_id: Job identifier.
        number_of_shots: Shots per data point.
        repetitions: Number of repetitions.
        readout_metadata: Plot/window/channel metadata.
        local_parameter_timestamp: Optional timestamp for local parameters.
        parameters: Scan parameters.
    """

    filename = get_filename_by_job_id(job_id)
    file = f"{get_config().data.results_dir}/{filename}"

    job = JobRepository.get_job_by_id(job_id=job_id, load_experiment_source=True)

    lock_path = (
        f"{get_config().data.results_dir}/.{filename}"
        f"{ExperimentDataRepository.LOCK_EXTENSION}"
    )
    with FileLock(lock_path), h5py.File(file, "a") as h5file:
        h5file.attrs["number_of_data_points"] = 0
        h5file.attrs["number_of_shots"] = number_of_shots
        h5file.attrs["experiment_id"] = job.experiment_source.experiment_id
        h5file.attrs["job_id"] = job_id
        h5file.attrs["repetitions"] = repetitions
        if local_parameter_timestamp is not None:
            h5file.attrs["local_parameter_timestamp"] = local_parameter_timestamp

        scan_parameter_dtype = [
            ("timestamp", "S26"),
            *[(param.variable_id, np.float64) for param in parameters],
        ]
        h5file.create_dataset(
            "scan_parameters",
            shape=(0, 1),
            maxshape=(None, 1),
            chunks=True,
            dtype=scan_parameter_dtype,
            compression="gzip",
            compression_opts=9,
        )

        for parameter in parameters:
            if parameter.device is not None:
                h5file["scan_parameters"].attrs[parameter.unique_id()] = (
                    f"name={parameter.device.name} url={parameter.device.url}"
                    f"description={parameter.device.description}"
                )

        result_dataset = get_result_channels_dataset(
            h5file=h5file, result_channels=readout_metadata["readout_channel_names"]
        )

        result_dataset.attrs["Plot window metadata"] = json.dumps(
            readout_metadata["readout_channel_windows"]
        )
        shot_group = h5file.require_group("shot_channels")
        shot_group.attrs["Plot window metadata"] = json.dumps(
            readout_metadata["shot_channel_windows"]
        )
        vector_group = h5file.require_group("vector_channels")
        vector_group.attrs["Plot window metadata"] = json.dumps(
            readout_metadata["vector_channel_windows"]
        )

    emit_queue.put(
        {
            "event": f"experiment_{job_id}_metadata",
            "data": {
                "readout_metadata": {
                    "result_channels": readout_metadata["readout_channel_windows"],
                    "shot_channels": readout_metadata["shot_channel_windows"],
                    "vector_channels": readout_metadata["vector_channel_windows"],
                },
            },
        }
    )
write_experiment_data_by_job_id staticmethod
write_experiment_data_by_job_id(
    *, job_id: int, data_point: ExperimentDataPoint
) -> None

Append a complete data point to the HDF5 file and emit an event.

Writes scan parameters, result/shot/vector channels, and sequence JSON.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required
data_point ExperimentDataPoint

Data point payload to append.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
@staticmethod
def write_experiment_data_by_job_id(
    *,
    job_id: int,
    data_point: ExperimentDataPoint,
) -> None:
    """Append a complete data point to the HDF5 file and emit an event.

    Writes scan parameters, result/shot/vector channels, and sequence JSON.

    Args:
        job_id: Job identifier.
        data_point: Data point payload to append.
    """

    filename = get_filename_by_job_id(job_id)
    file = f"{get_config().data.results_dir}/{filename}"

    lock_path = (
        f"{get_config().data.results_dir}/.{filename}"
        f"{ExperimentDataRepository.LOCK_EXTENSION}"
    )
    with FileLock(lock_path), h5py.File(file, "a") as h5file:
        try:
            number_of_shots = cast("int", h5file.attrs["number_of_shots"])
            number_of_data_points = cast(
                "int", h5file.attrs["number_of_data_points"]
            )
        except KeyError:
            raise Exception(
                "Metadata does not contain relevant information. Please use "
                "ExperimentDataRepository.update_metadata_by_job_id first!"
            )

        write_scan_parameters_and_timestamp_to_dataset(
            h5file=h5file,
            data_point_index=data_point["index"],
            scan_params=data_point["scan_params"],
            timestamp=data_point["timestamp"],
            number_of_data_points=number_of_data_points,
        )

        write_results_to_dataset(
            h5file=h5file,
            data_point_index=data_point["index"],
            result_channels=data_point["result_channels"],
            number_of_data_points=number_of_data_points,
        )

        write_shot_channels_to_datasets(
            h5file=h5file,
            data_point_index=data_point["index"],
            shot_channels=data_point["shot_channels"],
            number_of_data_points=number_of_data_points,
            number_of_shots=number_of_shots,
        )

        write_vector_channels_to_datasets(
            h5file=h5file,
            data_point_index=data_point["index"],
            vector_channels=data_point["vector_channels"],
        )

        write_sequence_json_to_dataset(
            h5file=h5file,
            data_point_index=data_point["index"],
            sequence_json=data_point["sequence_json"],
        )

        if data_point["index"] >= number_of_data_points:
            h5file.attrs["number_of_data_points"] = data_point["index"] + 1

        logger.debug("Appended data to %s", file)

    emit_queue.put(
        {
            "event": f"experiment_{job_id}",
            "data": data_point,
        }
    )
write_parameter_update_by_job_id staticmethod
write_parameter_update_by_job_id(
    *,
    job_id: int,
    timestamp: str,
    parameter_values: dict[str, str | int | float | bool],
) -> None

Append parameter updates under the ‘parameters’ group.

Creates a dataset per parameter storing (timestamp, value) entries. Appends only when the value changed from the last entry.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required
timestamp str

ISO timestamp string.

required
parameter_values dict[str, str | int | float | bool]

Mapping of parameter id to value.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
@staticmethod
def write_parameter_update_by_job_id(
    *,
    job_id: int,
    timestamp: str,
    parameter_values: dict[str, str | int | float | bool],
) -> None:
    """Append parameter updates under the 'parameters' group.

    Creates a dataset per parameter storing (timestamp, value) entries.
    Appends only when the value changed from the last entry.

    Args:
        job_id: Job identifier.
        timestamp: ISO timestamp string.
        parameter_values: Mapping of parameter id to value.
    """

    filename = get_filename_by_job_id(job_id)
    file = f"{get_config().data.results_dir}/{filename}"
    lock_path = (
        f"{get_config().data.results_dir}/.{filename}"
        f"{ExperimentDataRepository.LOCK_EXTENSION}"
    )

    with FileLock(lock_path), h5py.File(file, "a") as h5file:
        parameters_group = h5file.require_group("parameters")

        for param_id, value in parameter_values.items():
            dtype = [("timestamp", "S26"), ("value", get_hdf5_dtype(value))]

            if param_id in parameters_group:
                ds = cast("h5py.Dataset", parameters_group[param_id])
                if ds.shape[0] > 0:
                    last_entry = ds[-1]
                    last_value = last_entry["value"]
                    if isinstance(value, str):
                        if last_value.decode() == value:
                            continue
                    elif last_value == value:
                        continue

                index = ds.shape[0]
                resize_dataset(ds, next_index=index, axis=0)
            else:
                ds = parameters_group.create_dataset(
                    param_id,
                    shape=(1,),
                    maxshape=(None,),
                    dtype=dtype,
                )
                index = 0

            ds[index] = (timestamp.encode(), value)

        logger.debug(
            "Wrote parameter update for job %d at %s",
            job_id,
            timestamp,
        )
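
A minimal call sketch (job ID, parameter ID, and value are illustrative):

from icon.server.data_access.repositories.experiment_data_repository import (
    ExperimentDataRepository,
)

ExperimentDataRepository.write_parameter_update_by_job_id(
    job_id=42,
    timestamp="2024-01-01T12:00:01.000000",
    parameter_values={"detuning": 1.5},
)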

PlotWindowMetadata

Bases: TypedDict

Metadata describing a single plot window for visualization in the frontend.

This metadata includes the plot’s index within its type, the type of plot (e.g., vector, histogram, or readout), and the list of channel names that are to be plotted in the respective window.

channel_names instance-attribute
channel_names: list[str]

A list of channel names to be plotted in this window

index instance-attribute
index: int

The order of the plot window within its type (e.g., 0, 1, 2…)

name instance-attribute
name: str

The name of the plot window

type instance-attribute
type: Literal['vector', 'histogram', 'readout']

The type of the plot window
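
An illustrative window description (the window name and channel name are made up):

from icon.server.data_access.repositories.experiment_data_repository import (
    PlotWindowMetadata,
)

window: PlotWindowMetadata = {
    "index": 0,
    "name": "Readout window",
    "type": "readout",
    "channel_names": ["PMT counts"],
}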

PlotWindowsDict

Bases: TypedDict

Grouping of plot window metadata by channel type.

result_channels instance-attribute
result_channels: list[PlotWindowMetadata]

Plot window metadata for result channels.

shot_channels instance-attribute
shot_channels: list[PlotWindowMetadata]

Plot window metadata for shot channels.

vector_channels instance-attribute
vector_channels: list[PlotWindowMetadata]

Plot window metadata for vector channels.

ReadoutMetadata

Bases: TypedDict

Metadata describing readout/shot/vector channels and their plot windows.

readout_channel_names instance-attribute
readout_channel_names: list[str]

A list of all readout channel names

readout_channel_windows instance-attribute
readout_channel_windows: list[PlotWindowMetadata]

List of PlotWindowMetadata of result channels

shot_channel_names instance-attribute
shot_channel_names: list[str]

A list of all shot channel names

shot_channel_windows instance-attribute
shot_channel_windows: list[PlotWindowMetadata]

List of PlotWindowMetadata of shot channels

vector_channel_names instance-attribute
vector_channel_names: list[str]

A list of all vector channel names

vector_channel_windows instance-attribute
vector_channel_windows: list[PlotWindowMetadata]

List of PlotWindowMetadata of vector channels

ResultDict

Bases: TypedDict

Scalar/vector/shot readouts for a single data point.

result_channels instance-attribute
result_channels: dict[str, float]

Mapping from result channel name to scalar value.

shot_channels instance-attribute
shot_channels: dict[str, list[int]]

Mapping from shot channel name to per-shot integers.

vector_channels instance-attribute
vector_channels: dict[str, list[float]]

Mapping from vector channel name to list of floats.

get_filename_by_job_id

get_filename_by_job_id(job_id: int) -> str

Return the HDF5 filename for a job.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required

Returns:

Type Description
str

Filename derived from the job’s scheduled time (e.g., “<iso>.h5”).

Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def get_filename_by_job_id(job_id: int) -> str:
    """Return the HDF5 filename for a job.

    Args:
        job_id: Job identifier.

    Returns:
        Filename derived from the job's scheduled time (e.g., "<iso>.h5").
    """

    scheduled_time = JobRunRepository.get_scheduled_time_by_job_id(job_id=job_id)
    return f"{scheduled_time}.h5"

resize_dataset

resize_dataset(
    dataset: Dataset, next_index: int, axis: int
) -> None

Resize a dataset to accommodate writing at a target index.

Parameters:

Name Type Description Default
dataset Dataset

HDF5 dataset to resize.

required
next_index int

Index that must be writable.

required
axis int

Axis along which to grow.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def resize_dataset(dataset: h5py.Dataset, next_index: int, axis: int) -> None:
    """Resize a dataset to accommodate writing at a target index.

    Args:
        dataset: HDF5 dataset to resize.
        next_index: Index that must be writable.
        axis: Axis along which to grow.
    """

    dataset.resize(next_index + 1, axis)

write_results_to_dataset

write_results_to_dataset(
    h5file: File,
    data_point_index: int,
    result_channels: dict[str, float],
    number_of_data_points: int,
) -> None

Write scalar result channels into the ‘result_channels’ dataset.

Parameters:

Name Type Description Default
h5file File

Open HDF5 file handle.

required
data_point_index int

Index of the current data point.

required
result_channels dict[str, float]

Mapping of channel name to float value.

required
number_of_data_points int

Current total number of stored data points.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def write_results_to_dataset(
    h5file: h5py.File,
    data_point_index: int,
    result_channels: dict[str, float],
    number_of_data_points: int,
) -> None:
    """Write scalar result channels into the 'result_channels' dataset.

    Args:
        h5file: Open HDF5 file handle.
        data_point_index: Index of the current data point.
        result_channels: Mapping of channel name to float value.
        number_of_data_points: Current total number of stored data points.
    """

    sorted_keys = sorted(result_channels)

    result_dataset = get_result_channels_dataset(
        h5file=h5file,
        result_channels=sorted_keys,
        number_of_data_points=number_of_data_points,
    )

    if set(result_dataset.dtype.names) != set(sorted_keys):
        raise RuntimeError(
            f"Result channels changed from {list(result_dataset.dtype.names)} to "
            f"{sorted_keys}"
        )

    if data_point_index >= number_of_data_points:
        resize_dataset(result_dataset, next_index=data_point_index, axis=0)

    result_dataset[data_point_index] = tuple(result_channels[k] for k in sorted_keys)

write_scan_parameters_and_timestamp_to_dataset

write_scan_parameters_and_timestamp_to_dataset(
    h5file: File,
    data_point_index: int,
    scan_params: dict[str, DatabaseValueType],
    timestamp: str,
    number_of_data_points: int,
) -> None

Write scan parameters and timestamp to the ‘scan_parameters’ dataset.

Parameters:

Name Type Description Default
h5file File

Open HDF5 file handle.

required
data_point_index int

Index of the current data point.

required
scan_params dict[str, DatabaseValueType]

Parameter values for this data point.

required
timestamp str

Acquisition timestamp (ISO string).

required
number_of_data_points int

Current total number of stored data points.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def write_scan_parameters_and_timestamp_to_dataset(
    h5file: h5py.File,
    data_point_index: int,
    scan_params: dict[str, DatabaseValueType],
    timestamp: str,
    number_of_data_points: int,
) -> None:
    """Write scan parameters and timestamp to the 'scan_parameters' dataset.

    Args:
        h5file: Open HDF5 file handle.
        data_point_index: Index of the current data point.
        scan_params: Parameter values for this data point.
        timestamp: Acquisition timestamp (ISO string).
        number_of_data_points: Current total number of stored data points.
    """

    scan_parameter_dtype = [
        ("timestamp", "S26"),  # timestamps are strings of length 26
        *[(key, np.float64) for key in scan_params],
    ]
    scan_params_dataset = h5file.require_dataset(
        "scan_parameters",
        shape=(number_of_data_points, 1),
        maxshape=(None, 1),
        chunks=True,
        dtype=scan_parameter_dtype,
        compression="gzip",
        compression_opts=9,
    )

    if data_point_index >= number_of_data_points:
        resize_dataset(scan_params_dataset, next_index=data_point_index, axis=0)

    parameter_values = tuple(scan_params[key] for key in scan_params)
    scan_params_dataset[data_point_index] = (
        timestamp,
        *parameter_values,
    )

write_sequence_json_to_dataset

write_sequence_json_to_dataset(
    h5file: File, data_point_index: int, sequence_json: str
) -> None

Append sequence JSON if it changed since the last entry.

Parameters:

Name Type Description Default
h5file File

Open HDF5 file handle.

required
data_point_index int

Index of the current data point.

required
sequence_json str

Serialized sequence JSON to append.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def write_sequence_json_to_dataset(
    h5file: h5py.File,
    data_point_index: int,
    sequence_json: str,
) -> None:
    """Append sequence JSON if it changed since the last entry.

    Args:
        h5file: Open HDF5 file handle.
        data_point_index: Index of the current data point.
        sequence_json: Serialized sequence JSON to append.
    """

    sequence_json_dtype = [
        ("index", np.int32),
        ("Sequence", h5py.string_dtype()),
    ]
    sequence_json_dataset = h5file.require_dataset(
        "sequence_json",
        shape=(0,),
        maxshape=(None,),
        chunks=True,
        dtype=sequence_json_dtype,
        compression="gzip",
        compression_opts=9,
    )

    index = sequence_json_dataset.shape[0]
    if index > 0:
        _, sequence_json_old = cast(
            "tuple[int, bytes]", sequence_json_dataset[index - 1]
        )
        if sequence_json_old.decode() == sequence_json:
            logger.debug("Sequence JSON didn't change.")
            return

    resize_dataset(sequence_json_dataset, next_index=index, axis=0)

    sequence_json_dataset[index] = (data_point_index, sequence_json)

write_shot_channels_to_datasets

write_shot_channels_to_datasets(
    h5file: File,
    data_point_index: int,
    shot_channels: dict[str, list[int]],
    number_of_data_points: int,
    number_of_shots: int,
) -> None

Write per-shot data into datasets under the ‘shot_channels’ group.

Parameters:

Name Type Description Default
h5file File

Open HDF5 file handle.

required
data_point_index int

Index of the current data point.

required
shot_channels dict[str, list[int]]

Mapping of channel to per-shot integers.

required
number_of_data_points int

Current total number of stored data points.

required
number_of_shots int

Expected number of shots per channel.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def write_shot_channels_to_datasets(
    h5file: h5py.File,
    data_point_index: int,
    shot_channels: dict[str, list[int]],
    number_of_data_points: int,
    number_of_shots: int,
) -> None:
    """Write per-shot data into datasets under the 'shot_channels' group.

    Args:
        h5file: Open HDF5 file handle.
        data_point_index: Index of the current data point.
        shot_channels: Mapping of channel to per-shot integers.
        number_of_data_points: Current total number of stored data points.
        number_of_shots: Expected number of shots per channel.
    """

    shot_group = h5file.require_group("shot_channels")
    for key, value in shot_channels.items():
        shot_dataset = shot_group.require_dataset(
            key,
            shape=(number_of_data_points, number_of_shots),
            maxshape=(None, number_of_shots),
            chunks=True,
            dtype=np.float64,
            compression="gzip",
            compression_opts=9,
        )

        if data_point_index >= number_of_data_points:
            resize_dataset(shot_dataset, next_index=data_point_index, axis=0)
        shot_dataset[data_point_index] = value

write_vector_channels_to_datasets

write_vector_channels_to_datasets(
    h5file: File,
    data_point_index: int,
    vector_channels: dict[str, list[float]],
) -> None

Write vector channel data under the ‘vector_channels’ group.

Creates one dataset per channel per data point.

Parameters:

Name Type Description Default
h5file File

Open HDF5 file handle.

required
data_point_index int

Index of the current data point.

required
vector_channels dict[str, list[float]]

Mapping of channel to vector of floats.

required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
def write_vector_channels_to_datasets(
    h5file: h5py.File,
    data_point_index: int,
    vector_channels: dict[str, list[float]],
) -> None:
    """Write vector channel data under the 'vector_channels' group.

    Creates one dataset per channel per data point.

    Args:
        h5file: Open HDF5 file handle.
        data_point_index: Index of the current data point.
        vector_channels: Mapping of channel to vector of floats.
    """

    vector_group = h5file.require_group("vector_channels")
    for channel_name, vector in vector_channels.items():
        channel_group = vector_group.require_group(channel_name)
        if str(data_point_index) not in channel_group:
            channel_group.create_dataset(
                str(data_point_index),
                data=vector,
                compression="gzip",
                compression_opts=9,
            )

experiment_source_repository

ExperimentSourceRepository

Repository for ExperimentSource entities.

Provides methods to query and persist experiment sources in the database. Encapsulates the SQLAlchemy session and query logic.
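
A minimal usage sketch (the ExperimentSource import path and constructor field are assumptions):

from icon.server.data_access.repositories.experiment_source_repository import (
    ExperimentSourceRepository,
)

# Hypothetical import path for the ORM model.
from icon.server.data_access.models import ExperimentSource

source = ExperimentSourceRepository.get_or_create_experiment(
    experiment_source=ExperimentSource(experiment_id="my_experiments.Rabi")
)
print(source.id)  # database-generated primary key of the (possibly new) row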

get_or_create_experiment staticmethod
get_or_create_experiment(
    *, experiment_source: ExperimentSource
) -> ExperimentSource

Return an existing experiment source or create it if not found.

Parameters:

Name Type Description Default
experiment_source ExperimentSource

The experiment source to look up by experiment_id. If no matching row exists, this instance is inserted into the database.

required

Returns:

Type Description
ExperimentSource

The existing or newly created experiment source.

Source code in src/icon/server/data_access/repositories/experiment_source_repository.py
@staticmethod
def get_or_create_experiment(
    *,
    experiment_source: ExperimentSource,
) -> ExperimentSource:
    """Return an existing experiment source or create it if not found.

    Args:
        experiment_source: The experiment source to look up by `experiment_id`. If
            no matching row exists, this instance is inserted into the database.

    Returns:
        The existing or newly created experiment source.
    """

    with sqlalchemy.orm.Session(engine) as session:
        experiment = (
            session.query(ExperimentSource)
            .filter_by(experiment_id=experiment_source.experiment_id)
            .first()
        )

        if not experiment:
            experiment = experiment_source
            session.add(experiment)
            session.commit()
            session.refresh(experiment)  # Refresh to get the ID
            logger.debug("Inserted new experiment %s", experiment)

    return experiment

job_repository

JobRepository

Repository for Job entities.

Encapsulates SQLAlchemy session/query logic and emits Socket.IO events on changes. All methods open their own session and return detached ORM objects.
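
A minimal usage sketch (the Job model import path and constructor fields are assumptions):

from icon.server.data_access.repositories.job_repository import JobRepository

# Hypothetical import path for the ORM model.
from icon.server.data_access.models import Job

# Submit a new job for an existing experiment source.
job = JobRepository.submit_job(job=Job(experiment_source_id=1, priority=10))

# Fetch it again later with its relationships eagerly loaded.
job = JobRepository.get_job_by_id(
    job_id=job.id,
    load_experiment_source=True,
    load_scan_parameters=True,
)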

get_job_by_experiment_source_and_status staticmethod
get_job_by_experiment_source_and_status(
    *,
    experiment_source_id: int,
    status: JobStatus | None = None,
) -> Sequence[Row[tuple[Job]]]

List jobs for an experiment source, optionally filtered by status.

Parameters:

Name Type Description Default
experiment_source_id int

Foreign key of the experiment source.

required
status JobStatus | None

Optional status filter.

None

Returns:

Type Description
Sequence[Row[tuple[Job]]]

Rows containing Job objects, ordered by priority then creation time.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def get_job_by_experiment_source_and_status(
    *,
    experiment_source_id: int,
    status: JobStatus | None = None,
) -> Sequence[sqlalchemy.Row[tuple[Job]]]:
    """List jobs for an experiment source, optionally filtered by status.

    Args:
        experiment_source_id: Foreign key of the experiment source.
        status: Optional status filter.

    Returns:
        Rows containing `Job` objects, ordered by priority then creation time.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = select(Job).where(Job.experiment_source_id == experiment_source_id)

        if status:
            stmt = stmt.where(Job.status == status)

        stmt = stmt.options(
            sqlalchemy.orm.joinedload(Job.experiment_source)
        ).order_by(Job.priority.asc(), Job.created.asc())

        jobs = session.execute(stmt).all()
        logger.debug("Got jobs by experiment_source_id %s", experiment_source_id)
    return jobs
get_job_by_id staticmethod
get_job_by_id(
    *,
    job_id: int,
    load_experiment_source: bool = False,
    load_scan_parameters: bool = False,
) -> Job

Fetch a job by ID with optional eager-loading.

Parameters:

Name Type Description Default
job_id int

Job identifier.

required
load_experiment_source bool

If True, eager-load experiment_source.

False
load_scan_parameters bool

If True, eager-load scan_parameters.

False

Returns:

Type Description
Job

The requested job.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def get_job_by_id(
    *,
    job_id: int,
    load_experiment_source: bool = False,
    load_scan_parameters: bool = False,
) -> Job:
    """Fetch a job by ID with optional eager-loading.

    Args:
        job_id: Job identifier.
        load_experiment_source: If True, eager-load `experiment_source`.
        load_scan_parameters: If True, eager-load `scan_parameters`.

    Returns:
        The requested job.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = select(Job).where(Job.id == job_id)

        if load_experiment_source:
            stmt = stmt.options(sqlalchemy.orm.joinedload(Job.experiment_source))
        if load_scan_parameters:
            stmt = stmt.options(sqlalchemy.orm.joinedload(Job.scan_parameters))

        return session.execute(stmt).unique().scalar_one()
get_jobs_by_status_and_timeframe staticmethod
get_jobs_by_status_and_timeframe(
    *,
    status: JobStatus | None = None,
    start: datetime | None = None,
    stop: datetime | None = None,
) -> Sequence[Job]

List jobs filtered by status and optional creation time window.

Parameters:

Name Type Description Default
status JobStatus | None

Optional status filter.

None
start datetime | None

Inclusive start timestamp.

None
stop datetime | None

Exclusive stop timestamp.

None

Returns:

Type Description
Sequence[Job]

Matching jobs ordered by priority then creation time.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def get_jobs_by_status_and_timeframe(
    *,
    status: JobStatus | None = None,
    start: datetime.datetime | None = None,
    stop: datetime.datetime | None = None,
) -> Sequence[Job]:
    """List jobs filtered by status and optional creation time window.

    Args:
        status: Optional status filter.
        start: Inclusive start timestamp.
        stop: Exclusive stop timestamp.

    Returns:
        Matching jobs ordered by priority then creation time.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = (
            select(Job)
            .options(sqlalchemy.orm.joinedload(Job.experiment_source))
            .options(sqlalchemy.orm.joinedload(Job.scan_parameters))
            .order_by(Job.priority.asc())
            .order_by(Job.created.asc())
        )

        if status is not None:
            stmt = stmt.where(Job.status == status)
        if start is not None:
            stmt = stmt.where(Job.created >= start)
        if stop is not None:
            stmt = stmt.where(Job.created < stop)

        return session.execute(stmt).unique().scalars().all()
resubmit_job_by_id staticmethod
resubmit_job_by_id(*, job_id: int) -> Job

Clone an existing job as a new submission.

If the source job is not itself a resubmission, the new job’s parent_job_id is set to the original job’s id.

Parameters:

Name Type Description Default
job_id int

ID of the job to clone.

required

Returns:

Type Description
Job

The newly created job.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def resubmit_job_by_id(*, job_id: int) -> Job:
    """Clone an existing job as a new submission.

    If the source job is not itself a resubmission, the new job's `parent_job_id` is
    set to the original job's id.

    Args:
        job_id: ID of the job to clone.

    Returns:
        The newly created job.
    """

    with sqlalchemy.orm.Session(engine) as session:
        job = session.execute(
            sqlalchemy.select(Job).where(Job.id == job_id)
        ).scalar_one()
        sqlalchemy.orm.make_transient(job)

        if not job.parent_job_id:
            job.parent_job_id = job.id

        # PK and created are set by DB on insert
        job.id = None  # type: ignore
        job.created = None  # type: ignore

        session.add(job)
        session.commit()
        session.refresh(job)
        session.expunge(job)

    emit_queue.put(
        {
            "event": "job.new",
            "data": {
                "job": SQLAlchemyDictEncoder.encode(
                    JobRepository.get_job_by_id(
                        job_id=job.id,
                        load_experiment_source=True,
                        load_scan_parameters=True,
                    )
                ),
            },
        }
    )

    return job
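
For example, resubmitting a job twice still points both clones at the original job (job ID 7 is illustrative):

from icon.server.data_access.repositories.job_repository import JobRepository

original = JobRepository.get_job_by_id(job_id=7)
clone = JobRepository.resubmit_job_by_id(job_id=original.id)
assert clone.parent_job_id == original.id

# Resubmitting the clone keeps pointing at the original, not at the clone.
clone_of_clone = JobRepository.resubmit_job_by_id(job_id=clone.id)
assert clone_of_clone.parent_job_id == original.id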
submit_job staticmethod
submit_job(*, job: Job) -> Job

Insert a new job and emit a creation event.

Parameters:

Name Type Description Default
job Job

The job instance to persist.

required

Returns:

Type Description
Job

The persisted job with generated fields populated.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def submit_job(*, job: Job) -> Job:
    """Insert a new job and emit a creation event.

    Args:
        job: The job instance to persist.

    Returns:
        The persisted job with generated fields populated.
    """
    with sqlalchemy.orm.Session(engine) as session:
        session.add(job)
        session.commit()
        session.refresh(job)
        session.expunge(job)

        logger.debug("Submitted new job %s", job)

    emit_queue.put(
        {
            "event": "job.new",
            "data": {
                "job": SQLAlchemyDictEncoder.encode(
                    JobRepository.get_job_by_id(
                        job_id=job.id,
                        load_experiment_source=True,
                        load_scan_parameters=True,
                    )
                ),
            },
        }
    )

    return job
update_job_status staticmethod
update_job_status(*, job: Job, status: JobStatus) -> Job

Update a job’s status and emit an update event.

Parameters:

Name Type Description Default
job Job

Job to update (identified by its id).

required
status JobStatus

New job status.

required

Returns:

Type Description
Job

The updated job with relationships loaded.

Source code in src/icon/server/data_access/repositories/job_repository.py
@staticmethod
def update_job_status(*, job: Job, status: JobStatus) -> Job:
    """Update a job's status and emit an update event.

    Args:
        job: Job to update (identified by its `id`).
        status: New job status.

    Returns:
        The updated job with relationships loaded.
    """

    with sqlalchemy.orm.Session(engine) as session:
        session.execute(update(Job).where(Job.id == job.id).values(status=status))
        session.commit()

        job = (
            session.execute(
                select(Job)
                .where(Job.id == job.id)
                .options(
                    sqlalchemy.orm.joinedload(Job.experiment_source),
                    sqlalchemy.orm.joinedload(Job.scan_parameters),
                )
            )
            .unique()
            .scalar_one()
        )
        session.expunge(job)

        logger.debug("Updated job %s", job)

    emit_queue.put(
        {
            "event": "job.update",
            "data": {
                "job_id": job.id,
                "updated_properties": {"status": status.value},
            },
        }
    )

    return job

job_run_repository

JobRunRepository

Repository for JobRun entities.

Provides methods to insert, update, and query job runs from the database. Emits Socket.IO events when job runs are created or updated.
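
A minimal usage sketch (the JobRun model import path and constructor fields are assumptions):

from datetime import datetime

from icon.server.data_access.repositories.job_run_repository import JobRunRepository

# Hypothetical import path for the ORM model and status enum.
from icon.server.data_access.models import JobRun, JobRunStatus

run = JobRunRepository.insert_run(
    run=JobRun(job_id=7, scheduled_time=datetime.now())
)
JobRunRepository.update_run_by_id(
    run_id=run.id,
    status=JobRunStatus.FAILED,
    log="Hardware timeout",  # illustrative failure reason
)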

get_run_by_job_id staticmethod
get_run_by_job_id(
    *, job_id: int, load_job: bool = False
) -> JobRun

Return the run associated with a given job ID.

Parameters:

Name Type Description Default
job_id int

ID of the job.

required
load_job bool

If True, eagerly load the related Job.

False

Returns:

Type Description
JobRun

The run linked to the given job.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
@staticmethod
def get_run_by_job_id(*, job_id: int, load_job: bool = False) -> JobRun:
    """Return the run associated with a given job ID.

    Args:
        job_id: ID of the job.
        load_job: If True, eagerly load the related `Job`.

    Returns:
        The run linked to the given job.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = select(JobRun).where(JobRun.job_id == job_id)

        if load_job:
            stmt = stmt.options(sqlalchemy.orm.joinedload(JobRun.job))

        run = session.execute(stmt).scalar_one()
        logger.debug("Got JobRun by job_id %s", job_id)
    return run
get_runs_by_status staticmethod
get_runs_by_status(
    *,
    status: JobRunStatus | list[JobRunStatus],
    load_job: bool = False,
) -> Sequence[JobRun]

Return job runs filtered by status.

Parameters:

Name Type Description Default
status JobRunStatus | list[JobRunStatus]

Single or list of run statuses to filter on.

required
load_job bool

If True, eagerly load the related Job.

False

Returns:

Type Description
Sequence[JobRun]

All matching runs.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
@staticmethod
def get_runs_by_status(
    *,
    status: JobRunStatus | list[JobRunStatus],
    load_job: bool = False,
) -> Sequence[JobRun]:
    """Return job runs filtered by status.

    Args:
        status: Single or list of run statuses to filter on.
        load_job: If True, eagerly load the related `Job`.

    Returns:
        All matching runs.
    """

    if not isinstance(status, list):
        status = [status]

    with sqlalchemy.orm.Session(engine) as session:
        stmt = (
            select(JobRun)
            .where(JobRun.status.in_(status))
            .order_by(JobRun.scheduled_time.asc())
        )

        if load_job:
            stmt = stmt.options(sqlalchemy.orm.joinedload(JobRun.job))

        return session.execute(stmt).scalars().all()
get_scheduled_time_by_job_id staticmethod
get_scheduled_time_by_job_id(*, job_id: int) -> datetime

Return the scheduled time of a run by job ID.

Parameters:

Name Type Description Default
job_id int

ID of the job.

required

Returns:

Type Description
datetime

The scheduled start time of the run.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
@staticmethod
def get_scheduled_time_by_job_id(*, job_id: int) -> datetime:
    """Return the scheduled time of a run by job ID.

    Args:
        job_id: ID of the job.

    Returns:
        The scheduled start time of the run.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = select(JobRun.scheduled_time).where(JobRun.job_id == job_id)

        scheduled_time = session.execute(stmt).scalar_one()
        logger.debug("Got scheduled time for job_id %s", job_id)
    return scheduled_time
insert_run staticmethod
insert_run(*, run: JobRun) -> JobRun

Insert a new job run and emit a creation event.

Parameters:

Name Type Description Default
run JobRun

The job run instance to persist.

required

Returns:

Type Description
JobRun

The persisted job run with generated fields populated.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
@staticmethod
def insert_run(*, run: JobRun) -> JobRun:
    """Insert a new job run and emit a creation event.

    Args:
        run: The job run instance to persist.

    Returns:
        The persisted job run with generated fields populated.
    """

    with sqlalchemy.orm.Session(engine) as session:
        session.add(run)
        session.commit()
        session.refresh(run)
        logger.debug("Created new run %s", run)

    emit_queue.put(
        {
            "event": "job_run.new",
            "data": {
                "job_run": SQLAlchemyDictEncoder.encode(obj=run),
            },
        }
    )

    return run
update_run_by_id staticmethod
update_run_by_id(
    *,
    run_id: int,
    status: JobRunStatus,
    log: str | None = None,
) -> JobRun

Update a job run by ID and emit an update event.

Parameters:

Name Type Description Default
run_id int

The ID of the job run to update.

required
status JobRunStatus

New status of the run.

required
log str | None

Optional log message (e.g. failure reason).

None

Returns:

Type Description
JobRun

The updated job run.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
@staticmethod
def update_run_by_id(
    *,
    run_id: int,
    status: JobRunStatus,
    log: str | None = None,
) -> JobRun:
    """Update a job run by ID and emit an update event.

    Args:
        run_id: The ID of the job run to update.
        status: New status of the run.
        log: Optional log message (e.g. failure reason).

    Returns:
        The updated job run.
    """

    with sqlalchemy.orm.Session(engine) as session:
        stmt = (
            update(JobRun)
            .where(JobRun.id == run_id)
            .values(status=status, log=log)
            .returning(JobRun)
        )
        run = session.execute(stmt).scalar_one()
        session.commit()

        logger.debug("Updated run %s", run)

    emit_queue.put(
        {
            "event": "job_run.update",
            "data": {
                "run_id": run_id,
                "updated_properties": {
                    "status": status.value,
                    "log": log,
                },
            },
        }
    )

    return run

job_run_cancelled_or_failed

job_run_cancelled_or_failed(job_id: int) -> bool

Check if a job’s run was cancelled or failed.

Parameters:

Name Type Description Default
job_id int

ID of the job whose run should be checked.

required

Returns:

Type Description
bool

True if the run status is CANCELLED or FAILED, False otherwise.

Source code in src/icon/server/data_access/repositories/job_run_repository.py
def job_run_cancelled_or_failed(job_id: int) -> bool:
    """Check if a job's run was cancelled or failed.

    Args:
        job_id: ID of the job whose run should be checked.

    Returns:
        True if the run status is CANCELLED or FAILED, False otherwise.
    """

    job_run = JobRunRepository.get_run_by_job_id(job_id=job_id)
    if job_run.status in (JobRunStatus.CANCELLED, JobRunStatus.FAILED):
        logger.info(
            "JobRun with id %s %s.",
            job_run.id,
            job_run.status.value,
        )
        return True
    return False
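
This is useful as an early-exit guard in acquisition loops, e.g. in a hypothetical worker function:

from icon.server.data_access.repositories.job_run_repository import (
    job_run_cancelled_or_failed,
)


def acquire_data_points(job_id: int, number_of_points: int) -> None:
    """Hypothetical acquisition loop that stops early for cancelled/failed runs."""
    for index in range(number_of_points):
        if job_run_cancelled_or_failed(job_id):
            break  # stop producing data for this run
        ...  # acquire and store data point `index`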

parameters_repository

NotInitialisedError

Bases: Exception

Raised when repository methods are called before initialization.

ParametersRepository

Repository for parameter values and metadata.

Provides methods to read and update shared parameter state (via a multiprocessing.Manager dict) and to persist/retrieve parameters from InfluxDB. Emits Socket.IO events on updates.
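
A minimal usage sketch (the parameter ID is illustrative; the InfluxDB write assumes a configured database):

import multiprocessing

from icon.server.data_access.repositories.parameters_repository import (
    ParametersRepository,
)

manager = multiprocessing.Manager()
ParametersRepository.initialize(shared_parameters=manager.dict())

# Writes to the shared dict and to InfluxDB, and emits a Socket.IO update event.
ParametersRepository.update_parameters(parameter_mapping={"detuning": 1.5})

value = ParametersRepository.get_shared_parameter_by_id(parameter_id="detuning")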

get_influxdb_parameter_by_id staticmethod
get_influxdb_parameter_by_id(
    parameter_id: str,
) -> DatabaseValueType | None

Return a single parameter value from InfluxDB.

Parameters:

Name Type Description Default
parameter_id str

ID of the parameter.

required

Returns:

Type Description
DatabaseValueType | None

The parameter value, or None if not found.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
@staticmethod
def get_influxdb_parameter_by_id(parameter_id: str) -> DatabaseValueType | None:
    """Return a single parameter value from InfluxDB.

    Args:
        parameter_id: ID of the parameter.

    Returns:
        The parameter value, or None if not found.
    """

    with InfluxDBv1Session() as influxdb:
        result_dict = influxdb.query(
            measurement=get_config().databases.influxdbv1.measurement,
            field=parameter_id,
        )
        if result_dict is None:
            logger.error(
                "Could not find parameter with id %s in database %s",
                parameter_id,
                get_config().databases.influxdbv1.measurement,
            )
            return None
        return result_dict[parameter_id]
get_influxdb_parameter_keys staticmethod
get_influxdb_parameter_keys() -> list[str]

Return all parameter field keys from InfluxDB v1.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
@staticmethod
def get_influxdb_parameter_keys() -> list[str]:
    """Return all parameter field keys from InfluxDB v1."""

    with InfluxDBv1Session() as influxdbv1:
        return influxdbv1.get_field_keys(
            get_config().databases.influxdbv1.measurement
        )
get_influxdb_parameters staticmethod
get_influxdb_parameters(
    *,
    before: str | None = None,
    namespace: str | None = None,
) -> dict[str, DatabaseValueType]

Return the latest parameter values from InfluxDB.

Parameters:

Name Type Description Default
before str | None

Optional ISO timestamp to query parameters before.

None
namespace str | None

Optional namespace filter.

None

Returns:

Type Description
dict[str, DatabaseValueType]

Mapping of parameter IDs to values.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
@staticmethod
def get_influxdb_parameters(
    *, before: str | None = None, namespace: str | None = None
) -> dict[str, DatabaseValueType]:
    """Return the latest parameter values from InfluxDB.

    Args:
        before: Optional ISO timestamp to query parameters before.
        namespace: Optional namespace filter.

    Returns:
        Mapping of parameter IDs to values.
    """

    with InfluxDBv1Session() as influxdbv1:
        return influxdbv1.query_last(
            get_config().databases.influxdbv1.measurement,
            before=before,
            namespace=namespace,
        )
get_shared_parameter_by_id classmethod
get_shared_parameter_by_id(
    *, parameter_id: str
) -> DatabaseValueType | None

Return a single parameter value from shared state.

Parameters:

Name Type Description Default
parameter_id str

ID of the parameter.

required

Returns:

Type Description
DatabaseValueType | None

The parameter value, or None if not set.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
@classmethod
def get_shared_parameter_by_id(
    cls,
    *,
    parameter_id: str,
) -> DatabaseValueType | None:
    """Return a single parameter value from shared state.

    Args:
        parameter_id: ID of the parameter.

    Returns:
        The parameter value, or None if not set.
    """

    cls._check_initialised()

    return cls._shared_parameters.get(parameter_id, None)
get_shared_parameters classmethod
get_shared_parameters() -> DictProxy[
    str, DatabaseValueType
]

Return the full shared parameter dictionary.

Returns:

Type Description
DictProxy[str, DatabaseValueType]

Proxy dictionary of parameters.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
@classmethod
def get_shared_parameters(cls) -> DictProxy[str, DatabaseValueType]:
    """Return the full shared parameter dictionary.

    Returns:
        Proxy dictionary of parameters.
    """

    cls._check_initialised()

    return cls._shared_parameters
initialize classmethod
initialize(
    *, shared_parameters: DictProxy[str, DatabaseValueType]
) -> None

Initialize the repository with a shared parameters dict.

Parameters:

Name Type Description Default
shared_parameters DictProxy[str, DatabaseValueType]

Proxy dictionary used to store shared state.

required
Source code in src/icon/server/data_access/repositories/parameters_repository.py
@classmethod
def initialize(
    cls, *, shared_parameters: DictProxy[str, DatabaseValueType]
) -> None:
    """Initialize the repository with a shared parameters dict.

    Args:
        shared_parameters: Proxy dictionary used to store shared state.
    """

    cls._shared_parameters = shared_parameters
    cls.initialised = True
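A minimal lifecycle sketch, again assuming the enclosing class is called ParametersRepository: a multiprocessing manager provides the DictProxy, initialize registers it, and the shared accessors then read from it.

import multiprocessing

# Hypothetical usage; the repository class name is an assumption.
from icon.server.data_access.repositories.parameters_repository import (
    ParametersRepository,
)

manager = multiprocessing.Manager()
shared = manager.dict()  # DictProxy shared between worker processes

ParametersRepository.initialize(shared_parameters=shared)
value = ParametersRepository.get_shared_parameter_by_id(parameter_id="some_id")
print(value)  # None until a value has been stored for "some_id"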
update_parameters classmethod
update_parameters(
    *, parameter_mapping: dict[str, DatabaseValueType]
) -> None

Update parameters in both shared state and InfluxDB.

Parameters:

Name Type Description Default
parameter_mapping dict[str, DatabaseValueType]

Mapping of parameter IDs to values.

required
Source code in src/icon/server/data_access/repositories/parameters_repository.py
@classmethod
def update_parameters(
    cls,
    *,
    parameter_mapping: dict[str, DatabaseValueType],
) -> None:
    """Update parameters in both shared state and InfluxDB.

    Args:
        parameter_mapping: Mapping of parameter IDs to values.
    """

    for key, value in parameter_mapping.items():
        if (
            isinstance(value, int)
            and not isinstance(value, bool)
            and "ParameterTypes.INT" not in key
        ):
            parameter_mapping[key] = float(value)

    cls._update_shared_parameters(parameter_mapping=parameter_mapping)
    cls._update_influxdb_parameters(parameter_mapping=parameter_mapping)
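Note the coercion step above: integer values are converted to float before being stored unless the parameter key contains "ParameterTypes.INT"; booleans are left untouched. A small self-contained illustration of that rule (the key names are made up):

values = {
    "ParameterTypes.INT shots": 3,  # key marks an integer parameter: stays 3
    "some_float_parameter": 3,      # coerced to 3.0
    "some_flag": True,              # bools are excluded from the coercion
}
for key, value in values.items():
    if (
        isinstance(value, int)
        and not isinstance(value, bool)
        and "ParameterTypes.INT" not in key
    ):
        values[key] = float(value)

print(values)
# {'ParameterTypes.INT shots': 3, 'some_float_parameter': 3.0, 'some_flag': True}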

get_specifiers_from_parameter_identifier

get_specifiers_from_parameter_identifier(
    parameter_identifier: str,
) -> dict[str, str]

Extract specifiers from a parameter identifier string.

Parameter identifiers encode metadata as key='value' pairs. This helper parses them into a dictionary.

Parameters:

Name Type Description Default
parameter_identifier str

Identifier string to parse.

required

Returns:

Type Description
dict[str, str]

Mapping of specifier keys to values.

Source code in src/icon/server/data_access/repositories/parameters_repository.py
def get_specifiers_from_parameter_identifier(
    parameter_identifier: str,
) -> dict[str, str]:
    """Extract specifiers from a parameter identifier string.

    Parameter identifiers encode metadata as `key='value'` pairs. This helper parses
    them into a dictionary.

    Args:
        parameter_identifier: Identifier string to parse.

    Returns:
        Mapping of specifier keys to values.
    """

    pattern = re.compile(r"(\w+)='([^']*)'")
    matches = pattern.findall(parameter_identifier)

    return dict(matches)
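For example (the identifier string below is illustrative; real identifiers may use different specifier keys):

from icon.server.data_access.repositories.parameters_repository import (
    get_specifiers_from_parameter_identifier,
)

specifiers = get_specifiers_from_parameter_identifier(
    "namespace='calibration' name='detuning'"
)
print(specifiers)  # {'namespace': 'calibration', 'name': 'detuning'}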

pycrystal_library_repository

ParameterMetadataDict module-attribute

ParameterMetadataDict = TypedDict(
    "ParameterMetadataDict",
    {
        "all parameters": dict[str, ParameterMetadata],
        "display groups": dict[
            str, dict[str, ParameterMetadata]
        ],
    },
)

Dictionary of parameter metadata.

ParameterAndExperimentMetadata

Bases: TypedDict

Combined metadata for experiments and parameters.

experiment_metadata instance-attribute
experiment_metadata: ExperimentDict

Dictionary mapping the unique experiment identifier to its metadata.

parameter_metadata instance-attribute
parameter_metadata: ParameterMetadataDict

Dictionary of parameter metadata.

PycrystalLibraryRepository

Repository for interacting with the pycrystal experiment library.

Provides methods to fetch experiment and parameter metadata and to generate sequences by executing helper scripts inside the experiment library’s virtual environment.

generate_json_sequence async staticmethod
generate_json_sequence(
    *,
    exp_module_name: str,
    exp_instance_name: str,
    parameter_dict: dict[str, DatabaseValueType],
) -> str

Generate a JSON sequence for an experiment.

Parameters:

Name Type Description Default
exp_module_name str

Module name of the experiment.

required
exp_instance_name str

Name of the experiment instance.

required
parameter_dict dict[str, DatabaseValueType]

Mapping of parameter IDs to values.

required

Returns:

Type Description
str

JSON string containing the generated sequence.

Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
@staticmethod
async def generate_json_sequence(
    *,
    exp_module_name: str,
    exp_instance_name: str,
    parameter_dict: dict[str, DatabaseValueType],
) -> str:
    """Generate a JSON sequence for an experiment.

    Args:
        exp_module_name: Module name of the experiment.
        exp_instance_name: Name of the experiment instance.
        parameter_dict: Mapping of parameter IDs to values.

    Returns:
        JSON string containing the generated sequence.
    """

    template_vars = {
        "key_val_dict": parameter_dict,
        "module_name": exp_module_name,
        "exp_instance_name": exp_instance_name,
    }

    code = PycrystalLibraryRepository._get_code(
        Path(__file__).parent.parent / "templates/generate_pycrystal_sequence.py",
        **template_vars,
    )
    return await PycrystalLibraryRepository._run_code(code)
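A hedged usage sketch: the module, instance, and parameter names below are placeholders, and the experiment library’s virtual environment must be available for the helper script to run.

import asyncio

from icon.server.data_access.repositories.pycrystal_library_repository import (
    PycrystalLibraryRepository,
)


async def main() -> None:
    sequence_json = await PycrystalLibraryRepository.generate_json_sequence(
        exp_module_name="experiment_library.experiments.exp_name",  # placeholder
        exp_instance_name="Instance name",                          # placeholder
        parameter_dict={"some_parameter_id": 1.0},                  # placeholder
    )
    print(sequence_json)


asyncio.run(main())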
get_experiment_and_parameter_metadata async staticmethod
get_experiment_and_parameter_metadata() -> (
    ParameterAndExperimentMetadata
)

Fetch the experiment and parameter metadata.

Returns:

Type Description
ParameterAndExperimentMetadata

Dictionary with experiment metadata and parameter metadata.

Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
@staticmethod
async def get_experiment_and_parameter_metadata() -> ParameterAndExperimentMetadata:
    """Fetch the experiment and parameter metadata.

    Returns:
        Dictionary with experiment metadata and parameter metadata.
    """

    code = PycrystalLibraryRepository._get_code(
        Path(__file__).parent.parent
        / "templates/get_pycrystal_experiment_and_parameter_metadata.py"
    )
    stdout = await PycrystalLibraryRepository._run_code(code)
    return json.loads(stdout)
get_experiment_readout_metadata async staticmethod
get_experiment_readout_metadata(
    *,
    exp_module_name: str,
    exp_instance_name: str,
    parameter_dict: dict[str, DatabaseValueType],
) -> ReadoutMetadata

Fetch readout metadata for an experiment.

Parameters:

Name Type Description Default
exp_module_name str

Module name of the experiment.

required
exp_instance_name str

Name of the experiment instance.

required
parameter_dict dict[str, DatabaseValueType]

Mapping of parameter IDs to values.

required

Returns:

Type Description
ReadoutMetadata

Dictionary containing readout metadata for the experiment.

Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
@staticmethod
async def get_experiment_readout_metadata(
    *,
    exp_module_name: str,
    exp_instance_name: str,
    parameter_dict: dict[str, DatabaseValueType],
) -> ReadoutMetadata:
    """Fetch readout metadata for an experiment.

    Args:
        exp_module_name: Module name of the experiment.
        exp_instance_name: Name of the experiment instance.
        parameter_dict: Mapping of parameter IDs to values.

    Returns:
        Dictionary containing readout metadata for the experiment.
    """

    template_vars = {
        "key_val_dict": parameter_dict,
        "module_name": exp_module_name,
        "exp_instance_name": exp_instance_name,
    }

    code = PycrystalLibraryRepository._get_code(
        Path(__file__).parent.parent
        / "templates/get_experiment_readout_windows.py",
        **template_vars,
    )
    stdout = await PycrystalLibraryRepository._run_code(code)
    return json.loads(stdout)

icon.server.hardware_processing

Modules:

Name Description
hardware_controller
task
worker

hardware_controller

Classes:

Name Description
HardwareController

Attributes:

Name Type Description
logger

logger module-attribute

logger = getLogger(__name__)

HardwareController

HardwareController(connect: bool = True)

Methods:

Name Description
connect
run

Attributes:

Name Type Description
connected bool
Source code in src/icon/server/hardware_processing/hardware_controller.py
def __init__(self, connect: bool = True) -> None:
    self._host = get_config().hardware.host
    self._port = get_config().hardware.port
    self._zedboard: tiqi_zedboard.zedboard.Zedboard | None = None
    if connect:
        self.connect()
connected property
connected: bool
connect
connect() -> None
Source code in src/icon/server/hardware_processing/hardware_controller.py
def connect(self) -> None:
    logger.info("Connecting to the Zedboard")
    self._host = get_config().hardware.host
    self._port = get_config().hardware.port
    self._zedboard = tiqi_zedboard.zedboard.Zedboard(
        hostname=self._host, port=self._port
    )
    if not self.connected:
        logger.warning("Failed to connect to the Zedboard")
run
run(*, sequence: str, number_of_shots: int) -> ResultDict
Source code in src/icon/server/hardware_processing/hardware_controller.py
def run(self, *, sequence: str, number_of_shots: int) -> ResultDict:
    if not self.connected:
        self.connect()

    if not self.connected:
        raise RuntimeError("Could not connect to the Zedboard")

    self._update_zedboard_sequence(sequence=sequence)
    self._update_number_of_shots(number_of_shots=number_of_shots)
    self._zedboard.sequence_JSON_parser.Parse_JSON_Header()  # type: ignore
    results: tiqi_zedboard.zedboard.Result = self._zedboard.sequence_JSON_parser()  # type: ignore

    return {
        "result_channels": results.result_channels,
        "vector_channels": results.vector_channels
        if results.vector_channels is not None
        else {},
        "shot_channels": results.shot_channels,
    }
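A minimal usage sketch, assuming a reachable Zedboard at the configured hardware host and port; the sequence JSON and shot count are placeholders.

from icon.server.hardware_processing.hardware_controller import HardwareController

sequence_json = "..."  # e.g. produced by PycrystalLibraryRepository.generate_json_sequence()

controller = HardwareController()  # connects on construction by default
result = controller.run(sequence=sequence_json, number_of_shots=50)
print(result["result_channels"])
print(result["shot_channels"])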

task

Classes:

Name Description
HardwareProcessingTask

HardwareProcessingTask

Bases: BaseModel

Methods:

Name Description
__lt__

Attributes:

Name Type Description
created datetime
data_point_index int
data_points_to_process Queue[tuple[int, dict[str, DatabaseValueType]]]
global_parameter_timestamp datetime
model_config
pre_processing_task PreProcessingTask
priority int
processed_data_points Queue[HardwareProcessingTask]
scanned_params dict[str, DatabaseValueType]
sequence_json str
src_dir str
created instance-attribute
created: datetime
data_point_index instance-attribute
data_point_index: int
data_points_to_process instance-attribute
data_points_to_process: Queue[
    tuple[int, dict[str, DatabaseValueType]]
]
global_parameter_timestamp instance-attribute
global_parameter_timestamp: datetime
model_config class-attribute instance-attribute
model_config = ConfigDict(arbitrary_types_allowed=True)
pre_processing_task instance-attribute
pre_processing_task: PreProcessingTask
priority instance-attribute
priority: int
processed_data_points instance-attribute
processed_data_points: Queue[HardwareProcessingTask]
scanned_params instance-attribute
scanned_params: dict[str, DatabaseValueType]
sequence_json instance-attribute
sequence_json: str
src_dir instance-attribute
src_dir: str
__lt__
__lt__(other: HardwareProcessingTask) -> bool
Source code in src/icon/server/hardware_processing/task.py
def __lt__(self, other: HardwareProcessingTask) -> bool:
    # Order by priority first, then by creation time as a tie-breaker.
    return (self.priority, self.created) < (other.priority, other.created)
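Tasks implement __lt__ so that a queue.PriorityQueue can order them; the item that compares smallest is dequeued first. A simplified stand-in sketch (the real HardwareProcessingTask carries queues, sequence JSON, and scan parameters as well):

import queue
from dataclasses import dataclass


@dataclass
class _Task:
    # Simplified stand-in for HardwareProcessingTask.
    priority: int

    def __lt__(self, other: "_Task") -> bool:
        return self.priority < other.priority


q: queue.PriorityQueue[_Task] = queue.PriorityQueue()
q.put(_Task(priority=10))
q.put(_Task(priority=0))
print(q.get().priority)  # 0 -- the smallest item is retrieved first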

worker

Classes:

Name Description
HardwareProcessingWorker

Functions:

Name Description
parse_parameter_id

Parses a parameter ID string into a device name and variable ID.

Attributes:

Name Type Description
logger
timezone

logger module-attribute

logger = getLogger(__name__)

timezone module-attribute

timezone = timezone(timezone)

HardwareProcessingWorker

HardwareProcessingWorker(
    hardware_processing_queue: PriorityQueue[
        HardwareProcessingTask
    ],
    post_processing_queue: PriorityQueue[
        PostProcessingTask
    ],
    manager: SharedResourceManager,
)

Bases: Process

Methods:

Name Description
run
Source code in src/icon/server/hardware_processing/worker.py
def __init__(
    self,
    hardware_processing_queue: queue.PriorityQueue[HardwareProcessingTask],
    post_processing_queue: queue.PriorityQueue[PostProcessingTask],
    manager: SharedResourceManager,
) -> None:
    super().__init__()
    self._queue = hardware_processing_queue
    self._post_processing_queue = post_processing_queue
    self._manager = manager
    self._pydase_clients: dict[str, pydase.Client] = {}

    self._hardware_controller = HardwareController()
run
run() -> None
Source code in src/icon/server/hardware_processing/worker.py
def run(self) -> None:
    self._pydase_clients = {
        device.name: pydase.Client(
            url=device.url, block_until_connected=False, auto_update_proxy=False
        )
        for device in DeviceRepository.get_devices_by_status(
            status=DeviceStatus.ENABLED
        )
    }

    while True:
        task = self._queue.get()

        if job_run_cancelled_or_failed(
            job_id=task.pre_processing_task.job.id,
        ):
            continue

        try:
            self._set_pydase_service_values(scanned_params=task.scanned_params)

            timestamp = datetime.now(timezone)
            result = self._hardware_controller.run(
                sequence=task.sequence_json,
                number_of_shots=task.pre_processing_task.job.number_of_shots,
            )

            experiment_data_point: ExperimentDataPoint = {
                "index": task.data_point_index,
                "scan_params": task.scanned_params,
                "result_channels": result["result_channels"],
                "shot_channels": result["shot_channels"],
                "vector_channels": result["vector_channels"],
                "timestamp": timestamp.isoformat(),
                "sequence_json": task.sequence_json,
            }

            post_processing_task = PostProcessingTask(
                priority=task.priority,
                pre_processing_task=task.pre_processing_task,
                data_point=experiment_data_point,
                src_dir=task.src_dir,
                created=task.created,
            )

            self._post_processing_queue.put(post_processing_task)
        except Exception as e:
            logger.exception(e)
            JobRunRepository.update_run_by_id(
                run_id=task.pre_processing_task.job_run.id,
                status=JobRunStatus.FAILED,
                log=str(e),
            )
        finally:
            task.processed_data_points.put(task)

parse_parameter_id

parse_parameter_id(param_id: str) -> tuple[str | None, str]

Parses a parameter ID string into a device name and variable ID.

If the input string is in the format “Device(device_name) variable_id”, the device name and variable ID are returned as a tuple.

Parameters:

Name Type Description Default
param_id str

The parameter identifier string.

required

Returns:

Type Description
tuple[str | None, str]

A tuple (device_name, variable_id). If the input does not match the expected format, device_name is None and the entire param_id is returned as the variable_id.

Examples:

>>> parse_parameter_id("Device(my_device) my_param")
('my_device', 'my_param')
>>> parse_parameter_id("bare_param")
(None, 'bare_param')
Source code in src/icon/server/hardware_processing/worker.py
def parse_parameter_id(param_id: str) -> tuple[str | None, str]:
    """Parses a parameter ID string into a device name and variable ID.

    If the input string is in the format "Device(device_name) variable_id",
    the device name and variable ID are returned as a tuple.

    Parameters:
        param_id: The parameter identifier string.

    Returns:
        A tuple (device_name, variable_id). If the input does not match the expected
        format, device_name is None and the entire param_id is returned as the
        variable_id.

    Examples:
        >>> parse_parameter_id("Device(my_device) my_param")
        ('my_device', 'my_param')

        >>> parse_parameter_id("bare_param")
        (None, 'bare_param')
    """

    match = re.match(r"^Device\(([^)]+)\) (.*)$", param_id)
    if match:
        return match[1], match[2]
    return None, param_id

icon.server.post_processing

Modules:

Name Description
task
worker

task

Classes:

Name Description
PostProcessingTask

PostProcessingTask

Bases: BaseModel

Methods:

Name Description
__lt__

Attributes:

Name Type Description
created datetime
data_point ExperimentDataPoint
pre_processing_task PreProcessingTask
priority int
src_dir str
created instance-attribute
created: datetime
data_point instance-attribute
data_point: ExperimentDataPoint
pre_processing_task instance-attribute
pre_processing_task: PreProcessingTask
priority instance-attribute
priority: int
src_dir instance-attribute
src_dir: str
__lt__
__lt__(other: PostProcessingTask) -> bool
Source code in src/icon/server/post_processing/task.py
def __lt__(self, other: PostProcessingTask) -> bool:
    return self.priority < other.priority

worker

Classes:

Name Description
PostProcessingWorker

Attributes:

Name Type Description
logger

logger module-attribute

logger = getLogger(__name__)

PostProcessingWorker

PostProcessingWorker(
    post_processing_queue: PriorityQueue[
        PostProcessingTask
    ],
)

Bases: Process

Methods:

Name Description
run
Source code in src/icon/server/post_processing/worker.py
def __init__(
    self,
    post_processing_queue: queue.PriorityQueue[PostProcessingTask],
) -> None:
    super().__init__()
    self._post_processing_queue = post_processing_queue
run
run() -> None
Source code in src/icon/server/post_processing/worker.py
def run(self) -> None:
    logger.info("Post-processing worker started")

    while True:
        task = self._post_processing_queue.get()

        if job_run_cancelled_or_failed(
            job_id=task.pre_processing_task.job.id,
        ):
            continue

        ExperimentDataRepository.write_experiment_data_by_job_id(
            job_id=task.pre_processing_task.job.id,
            data_point=task.data_point,
        )

icon.server.pre_processing

Modules:

Name Description
task
worker

task

Classes:

Name Description
PreProcessingTask

PreProcessingTask

Bases: BaseModel

Methods:

Name Description
__lt__

Attributes:

Name Type Description
auto_calibration bool
debug_mode bool
git_commit_hash str | None
job Job
job_run JobRun
local_parameters_timestamp str
model_config
priority int
repetitions int
scan_parameters list[ScanParameter]
auto_calibration instance-attribute
auto_calibration: bool
debug_mode class-attribute instance-attribute
debug_mode: bool = False
git_commit_hash class-attribute instance-attribute
git_commit_hash: str | None = None
job instance-attribute
job: Job
job_run instance-attribute
job_run: JobRun
local_parameters_timestamp instance-attribute
local_parameters_timestamp: str
model_config class-attribute instance-attribute
model_config = ConfigDict(arbitrary_types_allowed=True)
priority class-attribute instance-attribute
priority: int = Field(ge=0, le=20)
repetitions class-attribute instance-attribute
repetitions: int = 1
scan_parameters instance-attribute
scan_parameters: list[ScanParameter]
__lt__
__lt__(other: PreProcessingTask) -> bool
Source code in src/icon/server/pre_processing/task.py
def __lt__(self, other: "PreProcessingTask") -> bool:
    return self.priority < other.priority

worker

Classes:

Name Description
ParamUpdateMode
PreProcessingWorker

Functions:

Name Description
change_process_priority

Changes process priority. Only superusers can decrease the niceness of a process.

get_scan_combinations

Generates all combinations of scan parameters for a given job. Repeats each combination job.repetitions times.

parse_experiment_identifier

Parses an experiment identifier into a module path, an experiment class name, and an experiment instance name.

prepare_experiment_library_folder

Attributes:

Name Type Description
logger
timezone

logger module-attribute

logger = getLogger(__name__)

timezone module-attribute

timezone = timezone(timezone)

ParamUpdateMode

Bases: str, Enum

Attributes:

Name Type Description
ALL_FROM_TIMESTAMP
ALL_UP_TO_DATE
LOCALS_FROM_TS_GLOBALS_LATEST
ONLY_NEW_PARAMETERS
ALL_FROM_TIMESTAMP class-attribute instance-attribute
ALL_FROM_TIMESTAMP = 'all_from_timestamp'
ALL_UP_TO_DATE class-attribute instance-attribute
ALL_UP_TO_DATE = 'all_up_to_date'
LOCALS_FROM_TS_GLOBALS_LATEST class-attribute instance-attribute
LOCALS_FROM_TS_GLOBALS_LATEST = 'locals_ts_globals_now'
ONLY_NEW_PARAMETERS class-attribute instance-attribute
ONLY_NEW_PARAMETERS = 'only_new_parameters'

PreProcessingWorker

PreProcessingWorker(
    worker_number: int,
    pre_processing_queue: PriorityQueue[PreProcessingTask],
    update_queue: Queue[UpdateQueue],
    hardware_processing_queue: PriorityQueue[
        HardwareProcessingTask
    ],
    manager: SharedResourceManager,
)

Bases: Process

Methods:

Name Description
run
Source code in src/icon/server/pre_processing/worker.py
def __init__(
    self,
    worker_number: int,
    pre_processing_queue: queue.PriorityQueue[PreProcessingTask],
    update_queue: multiprocessing.Queue[UpdateQueue],
    hardware_processing_queue: queue.PriorityQueue[HardwareProcessingTask],
    manager: SharedResourceManager,
) -> None:
    super().__init__()
    self._queue = pre_processing_queue
    self._update_queue = update_queue
    self._hw_processing_queue = hardware_processing_queue
    self._worker_number = worker_number
    self._manager = manager
    self._data_points_to_process: queue.Queue[
        tuple[int, dict[str, DatabaseValueType]]
    ]
    self._processed_data_points: queue.Queue[HardwareProcessingTask]
    self._pre_processing_task: PreProcessingTask
    self._src_dir: str
    self._exp_module_name: str
    self._exp_class_name: str
    self._exp_instance_name: str
    self._scan_parameter_value_combinations: list[dict[str, DatabaseValueType]]
    self._parameter_dict: dict[str, DatabaseValueType] = {}
    self._tmp_dir: str
run
run() -> None
Source code in src/icon/server/pre_processing/worker.py
def run(self) -> None:
    with tempfile.TemporaryDirectory() as tmp_dir:
        self._tmp_dir = tmp_dir
        logger.debug("Created temporary directory %s", tmp_dir)

        while True:
            self._pre_processing_task = self._queue.get()

            self._data_points_to_process = self._manager.Queue()
            self._processed_data_points = self._manager.Queue()

            try:
                self._process_task()

                logger.info(
                    "JobRun with id '%s' finished",
                    self._pre_processing_task.job_run.id,
                )

                if (
                    JobRunRepository.get_run_by_job_id(
                        job_id=self._pre_processing_task.job.id
                    ).status
                    == JobRunStatus.PROCESSING
                ):
                    JobRunRepository.update_run_by_id(
                        run_id=self._pre_processing_task.job_run.id,
                        status=JobRunStatus.DONE,
                    )
            except Exception as e:
                logger.exception(
                    "JobRun with id '%s' failed with error: %s",
                    self._pre_processing_task.job_run.id,
                    e,
                )

                if (
                    JobRunRepository.get_run_by_job_id(
                        job_id=self._pre_processing_task.job.id
                    ).status
                    == JobRunStatus.PROCESSING
                ):
                    JobRunRepository.update_run_by_id(
                        run_id=self._pre_processing_task.job_run.id,
                        status=JobRunStatus.FAILED,
                        log=str(e),
                    )
            finally:
                JobRepository.update_job_status(
                    job=self._pre_processing_task.job, status=JobStatus.PROCESSED
                )

change_process_priority

change_process_priority(priority: int) -> None

Changes process priority. Only superusers can decrease the niceness of a process.

Source code in src/icon/server/pre_processing/worker.py
def change_process_priority(priority: int) -> None:
    """Changes process priority. Only superusers can decrease the niceness of a
    process."""

    if os.getuid() == 0:
        p = psutil.Process(os.getpid())

        p.nice(priority)

get_scan_combinations

get_scan_combinations(
    job: Job,
) -> list[dict[str, DatabaseValueType]]

Generates all combinations of scan parameters for a given job. Repeats each combination job.repetitions times.

Parameters:

Name Type Description Default
job Job

The job containing scan parameters.

required

Returns:

Type Description
list[dict[str, DatabaseValueType]]

A list of dictionaries, where each dictionary represents a combination of parameter values.

Source code in src/icon/server/pre_processing/worker.py
def get_scan_combinations(job: Job) -> list[dict[str, DatabaseValueType]]:
    """Generates all combinations of scan parameters for a given job. Repeats each
    combination `job.repetitions` times.

    Args:
        job:
            The job containing scan parameters.

    Returns:
        A list of dictionaries, where each dictionary represents a combination of
        parameter values.
    """

    # Extract variable IDs and their scan values from the job's scan parameters
    parameter_values: dict[str, Any] = {}
    for scan_param in job.scan_parameters:
        parameter_values[scan_param.unique_id()] = scan_param.scan_values

    # Generate combinations using itertools.product
    keys = list(parameter_values.keys())
    values = [parameter_values[key] for key in keys]

    if values == []:
        return []

    combinations = itertools.product(*values)

    # Map each combination back to variable IDs
    return [
        dict(zip(keys, combination)) for combination in combinations
    ] * job.repetitions
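The combination logic can be illustrated with plain dictionaries (the parameter IDs and scan values below are made up):

import itertools

# Stand-in for the job's scan parameters: parameter ID -> scan values.
parameter_values = {
    "param_a": [0.0, 1.0],
    "param_b": [10, 20],
}
repetitions = 2

keys = list(parameter_values)
values = [parameter_values[key] for key in keys]
combinations = itertools.product(*values)
result = [dict(zip(keys, combo)) for combo in combinations] * repetitions

print(len(result))  # 8 = 2 values x 2 values x 2 repetitions
print(result[0])    # {'param_a': 0.0, 'param_b': 10}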

parse_experiment_identifier

parse_experiment_identifier(
    identifier: str,
) -> tuple[str, str, str]

Parses an experiment identifier and returns:

- the module path (e.g. ‘experiment_library.experiments.exp_name’)
- the experiment class name (e.g. ‘ClassName’)
- the experiment instance name (e.g. ‘Instance name’)

Example

“experiment_library.experiments.exp_name.ClassName (Instance name)” -> (“experiment_library.experiments.exp_name”, “ClassName”, “Instance name”)

Source code in src/icon/server/pre_processing/worker.py
def parse_experiment_identifier(identifier: str) -> tuple[str, str, str]:
    """
    Parses an experiment identifier and returns:
    - the module path (e.g. 'experiment_library.experiments.exp_name')
    - the experiment class name (e.g. 'ClassName')
    - the experiment instance name (e.g. 'Instance name')

    Example:
        "experiment_library.experiments.exp_name.ClassName (Instance name)"
        -> ("experiment_library.experiments.exp_name", "ClassName", "Instance name")
    """

    match = re.match(r"^(.*)\.([^. ]+) \(([^)]+)\)$", identifier)
    if match:
        return match.group(1), match.group(2), match.group(3)
    raise ValueError("Unexpected format of experiment identifier: ", identifier)

prepare_experiment_library_folder

prepare_experiment_library_folder(
    src_dir: str, pre_processing_task: PreProcessingTask
) -> None
Source code in src/icon/server/pre_processing/worker.py
def prepare_experiment_library_folder(
    src_dir: str, pre_processing_task: PreProcessingTask
) -> None:
    import icon.server.utils.git_helpers

    if not icon.server.utils.git_helpers.local_repo_exists(
        repository_dir=src_dir,
        repository=get_config().experiment_library.git_repository,
    ):
        icon.server.utils.git_helpers.git_clone(
            repository=get_config().experiment_library.git_repository,
            dir=src_dir,
        )

    icon.server.utils.git_helpers.checkout_commit(
        git_hash=pre_processing_task.git_commit_hash, cwd=src_dir
    )

icon.server.scheduler

Modules:

Name Description
scheduler

scheduler

Classes:

Name Description
Scheduler

Functions:

Name Description
initialise_job_tables
should_exit

Scheduler

Scheduler(
    pre_processing_queue: PriorityQueue[PreProcessingTask],
    **kwargs: Any,
)

Bases: Process

Methods:

Name Description
run

Attributes:

Name Type Description
kwargs
Source code in src/icon/server/scheduler/scheduler.py
def __init__(
    self,
    pre_processing_queue: queue.PriorityQueue[PreProcessingTask],
    **kwargs: Any,
) -> None:
    super().__init__()
    self.kwargs = kwargs
    self._pre_processing_queue = pre_processing_queue
kwargs instance-attribute
kwargs = kwargs
run
run() -> None
Source code in src/icon/server/scheduler/scheduler.py
def run(self) -> None:
    initialise_job_tables()
    while not should_exit():
        jobs = JobRepository.get_jobs_by_status_and_timeframe(
            status=JobStatus.SUBMITTED
        )
        for job_ in jobs:
            job = JobRepository.update_job_status(
                job=job_, status=JobStatus.PROCESSING
            )
            run = JobRun(job_id=job.id, scheduled_time=datetime.now(tz=timezone))
            run = JobRunRepository.insert_run(run=run)

            self._pre_processing_queue.put(
                PreProcessingTask(
                    job=job,
                    job_run=run,
                    git_commit_hash=job.git_commit_hash,
                    scan_parameters=job.scan_parameters,
                    local_parameters_timestamp=job.local_parameters_timestamp.astimezone(
                        tz=timezone
                    ).isoformat(),
                    priority=job.priority,
                    auto_calibration=job.auto_calibration,
                    debug_mode=job.debug_mode,
                    repetitions=job.repetitions,
                )
            )
        time.sleep(0.1)

initialise_job_tables

initialise_job_tables() -> None
Source code in src/icon/server/scheduler/scheduler.py
def initialise_job_tables() -> None:
    # update job_runs table
    job_runs = JobRunRepository.get_runs_by_status(
        status=[JobRunStatus.PENDING, JobRunStatus.PROCESSING]
    )
    for job_run in job_runs:
        JobRunRepository.update_run_by_id(
            run_id=job_run.id,
            status=JobRunStatus.CANCELLED,
            log="Cancelled during scheduler initialization.",
        )

    # update jobs table
    jobs = JobRepository.get_jobs_by_status_and_timeframe(status=JobStatus.PROCESSING)
    for job in jobs:
        JobRepository.update_job_status(job=job, status=JobStatus.PROCESSED)

should_exit

should_exit() -> bool
Source code in src/icon/server/scheduler/scheduler.py
def should_exit() -> bool:
    return False

icon.server.utils.types

Classes:

Name Description
UpdateQueue

UpdateQueue

Bases: TypedDict

Attributes:

Name Type Description
event Literal['update_parameters', 'calibration']
job_id NotRequired[int | None]
new_parameters NotRequired[dict[str, DatabaseValueType]]

event instance-attribute

event: Literal['update_parameters', 'calibration']

job_id instance-attribute

job_id: NotRequired[int | None]

new_parameters instance-attribute

new_parameters: NotRequired[dict[str, DatabaseValueType]]
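UpdateQueue payloads are plain dictionaries; a hedged construction example (the job ID, parameter ID, and value are placeholders):

from icon.server.utils.types import UpdateQueue

event: UpdateQueue = {
    "event": "update_parameters",
    "job_id": 42,
    "new_parameters": {"some_parameter_id": 1.0},
}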

icon.server.web_server

Modules:

Name Description
icon_server
sio_setup
socketio_emit_queue

icon_server

Classes:

Name Description
IconServer

Attributes:

Name Type Description
logger

logger module-attribute

logger = getLogger(__name__)

IconServer

Bases: Server

Methods:

Name Description
post_startup
post_startup async
post_startup() -> None
Source code in src/icon/server/web_server/icon_server.py
async def post_startup(self) -> None:
    sio = self._web_server._sio

    async def emit_worker() -> None:
        while not self.should_exit:
            try:
                emit_event = await asyncio.to_thread(emit_queue.get, timeout=1.0)
            except queue.Empty:
                continue
            await sio.emit(
                event=emit_event["event"],
                data=emit_event.get("data", None),
                room=emit_event.get("room", None),
            )

    asyncio.create_task(emit_worker())

    def devices_callback(
        full_access_path: str, value: Any, cached_value_dict: SerializedObject
    ) -> None:
        """This callback handles structural changes of devices. If the structure of
        a device changes, it will re-calculate the scannable parameters and emit
        them to the interested clients."""

        if full_access_path.startswith("devices.device_proxies"):
            emit_scannable_device_params_change(
                self._observer, full_access_path, value, cached_value_dict
            )

    self._observer.add_notification_callback(devices_callback)
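The emit_worker above bridges the process-wide emit_queue (see socketio_emit_queue below) to socket.io clients, so events can be emitted from synchronous code by enqueueing an EmitEvent. A hedged sketch with an illustrative event name and payload:

from icon.server.web_server.socketio_emit_queue import emit_queue

emit_queue.put(
    {
        "event": "some_event",   # placeholder event name
        "data": {"value": 1.0},  # placeholder payload
        # "room": "some_room",   # optional: restrict the emit to one room
    }
)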

sio_setup

Classes:

Name Description
AsyncServer

Functions:

Name Description
patch_sio_setup
setup_sio_events

Attributes:

Name Type Description
logger
pydase_setup_sio_events

logger module-attribute

logger = getLogger(__name__)

pydase_setup_sio_events module-attribute

pydase_setup_sio_events = setup_sio_events

AsyncServer

Bases: AsyncServer

Attributes:

Name Type Description
controlling_sid str | None

Socketio SID of the client controlling the frontend.

controlling_sid class-attribute instance-attribute
controlling_sid: str | None = None

Socketio SID of the client controlling the frontend.

patch_sio_setup

patch_sio_setup() -> None
Source code in src/icon/server/web_server/sio_setup.py
def patch_sio_setup() -> None:
    import pydase.server.web_server.sio_setup  # noqa: PLC0415

    pydase.server.web_server.sio_setup.setup_sio_events = setup_sio_events

setup_sio_events

setup_sio_events(
    sio: AsyncServer, state_manager: StateManager
) -> None
Source code in src/icon/server/web_server/sio_setup.py
def setup_sio_events(
    sio: AsyncServer,
    state_manager: pydase.data_service.state_manager.StateManager,
) -> None:
    pydase_setup_sio_events(sio, state_manager)

    sio.controlling_sid = None

    @sio.event
    async def connect(sid: str, environ: Any) -> None:
        client_id_header = environ.get("HTTP_X_CLIENT_ID", None)
        remote_username_header = environ.get("HTTP_REMOTE_USER", None)

        if remote_username_header is not None:
            log_id = f"user={click.style(remote_username_header, fg='cyan')}"
        elif client_id_header is not None:
            log_id = f"id={click.style(client_id_header, fg='cyan')}"
        else:
            log_id = f"sid={click.style(sid, fg='cyan')}"

        # send current controlling state to the newly connected client
        await sio.emit(
            "control_state", {"controlling_sid": sio.controlling_sid}, to=sid
        )

        async with sio.session(sid) as session:
            session["client_id"] = log_id
            logger.info("Client [%s] connected", session["client_id"])

    @sio.event
    async def disconnect(sid: str) -> None:
        if sid == sio.controlling_sid:
            sio.controlling_sid = None
            await sio.emit("control_state", {"controlling_sid": None})

        async with sio.session(sid) as session:
            logger.info("Client [%s] disconnected", session["client_id"])

    @sio.event
    async def take_control(sid: str) -> None:
        sio.controlling_sid = sid
        await sio.emit("control_state", {"controlling_sid": sio.controlling_sid})

    @sio.event
    async def release_control(sid: str) -> None:
        if sio.controlling_sid == sid:
            sio.controlling_sid = None
            await sio.emit("control_state", {"controlling_sid": None})

socketio_emit_queue

Classes:

Name Description
EmitEvent

Attributes:

Name Type Description
emit_queue Queue[EmitEvent]

emit_queue module-attribute

emit_queue: Queue[EmitEvent] = Queue()

EmitEvent

Bases: TypedDict

Attributes:

Name Type Description
data Any
event str
room NotRequired[str]
data instance-attribute
data: Any
event instance-attribute
event: str
room instance-attribute