Server¶
icon.server.api
icon.server.data_access.models.enums
icon.server.data_access.models.sqlite
icon.server.data_access.repositories
icon.server.hardware_processing
icon.server.post_processing
icon.server.pre_processing
icon.server.scheduler
icon.server.utils.types
icon.server.web_server
icon.server.api¶
This module defines the API layer of ICON, implemented as a pydase.DataService. The main entry point is the APIService, which is exposed by the IconServer. The IconServer itself is a pydase.Server hosting the API.
Structure¶
The APIService aggregates multiple “controller” services as attributes. Each controller is itself a pydase.DataService exposing related API methods.
Background tasks¶
Controllers can define periodic pydase tasks, which are asyncio tasks automatically started with the service.
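A minimal sketch of such a task, assuming pydase's task decorator (pydase.task.decorator.task); the controller name and loop body are illustrative only:

import asyncio

import pydase
from pydase.task.decorator import task


class MetadataSyncController(pydase.DataService):
    @task(autostart=True)
    async def sync_metadata(self) -> None:
        # Started automatically as an asyncio task with the service.
        while True:
            # ... fetch metadata and push updates here ...
            await asyncio.sleep(10)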
api_service
¶
APIService
¶
APIService(
pre_processing_event_queues: list[Queue[UpdateQueue]],
)
Bases: DataService
Aggregates ICON’s API controllers and manages background tasks.
The APIService groups multiple controllers, each of which is a pydase.DataService exposing related API methods. It also defines background tasks for keeping experiment and parameter metadata in sync with the experiment library and InfluxDB.
Note
Controllers are pydase.DataService instances exposed as attributes to group related API methods. Background tasks are implemented with pydase tasks.
Parameters:
Name | Type | Description | Default
---|---|---|---
pre_processing_event_queues | list[Queue[UpdateQueue]] | Queues used by … | required
Source code in src/icon/server/api/api_service.py
config
instance-attribute
¶
config = ConfigurationController()
Controller for managing and updating the application’s configuration.
data
instance-attribute
¶
data = ExperimentDataController()
Controller for accessing stored experiment data.
devices
instance-attribute
¶
devices = DevicesController()
Controller for managing external pydase-based devices.
experiments
instance-attribute
¶
experiments = ExperimentsController()
Controller for experiment metadata.
parameters
instance-attribute
¶
parameters = ParametersController()
Controller for parameter metadata and shared parameter values.
scans
instance-attribute
¶
scans = ScansController(
pre_processing_update_queues=pre_processing_event_queues
)
Controller for triggering update events for jobs across multiple worker processes.
scheduler
instance-attribute
¶
scheduler = SchedulerController(devices_controller=devices)
Controller to submit, inspect, and cancel scheduled jobs.
configuration_controller
¶
ConfigurationController
¶
Bases: DataService
Controller for managing and updating the application’s configuration.
This class provides an API to get and update the configuration, validate it, and save the updated configuration back to the source file.
get_config
¶
update_config_option
¶
Update a specific configuration option.
Traverses the configuration using the dot-separated key, updates the specified value, validates the entire configuration, and saves the changes.
Parameters:
Name | Type | Description | Default
---|---|---|---
key | str | The dot-separated key of the configuration option (e.g., "experiment_library.git_repository"). | required
value | Any | The new value for the configuration option. | required

Returns:

Type | Description
---|---
bool | True if the update is successful, False otherwise.
Source code in src/icon/server/api/configuration_controller.py
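A hypothetical usage sketch; the key comes from the docstring example above, while the repository URL value is illustrative:

controller = ConfigurationController()

# Traverses the configuration via the dot-separated key, validates the
# whole configuration, and saves it back to the source file on success.
if controller.update_config_option(
    key="experiment_library.git_repository",
    value="git@example.com:my-group/experiment-library.git",
):
    print("Configuration updated and saved.")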
devices_controller
¶
DeviceParameterValueyType
module-attribute
¶
Allowed primitive types for device parameter values.
A parameter value sent to or retrieved from a device may be one of these basic types. Quantities with units are handled separately via pydase.units.Quantity.
DevicesController
¶
Bases: DataService
Controller for managing external pydase-based devices.
Maintains client connections to configured devices, exposes helpers to add/update device entries in SQLite, and provides async accessors for device parameter values through pydase proxies. Also discovers scannable device parameters for integration with ICON scans.
Source code in src/icon/server/api/devices_controller.py
device_proxies
instance-attribute
¶
Live pydase proxies keyed by device name.
add_device
¶
add_device(
*,
name: str,
url: str,
status: Literal["disabled", "enabled"] = "enabled",
description: str | None = None,
retry_delay_seconds: float = 0.0,
retry_attempts: int = 3,
) -> Device
Create a device record in SQLite and (optionally) connect to it.
If status=="enabled"
, a non-blocking pydase client is created and its
proxy is registered.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Unique device name. | required
url | str | pydase server URL of the device. | required
status | Literal['disabled', 'enabled'] | Whether the device should be connected immediately. | 'enabled'
description | str | None | Optional human-readable description. | None
retry_delay_seconds | float | Backoff delay used by device-side logic. | 0.0
retry_attempts | int | Number of retries used by device-side logic. | 3

Returns:

Type | Description
---|---
Device | The …
Source code in src/icon/server/api/devices_controller.py
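A hypothetical usage sketch; the device name and URL are illustrative:

device = devices_controller.add_device(
    name="dac-1",
    url="ws://localhost:8001",
    description="Main DAC",
)
# With the default status="enabled", a non-blocking pydase client is
# created and its proxy is registered under device_proxies["dac-1"].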
get_devices_by_status
¶
get_devices_by_status(
*, status: DeviceStatus | None = None
) -> dict[str, DeviceDict]
List devices (optionally filtered by status) with reachability & scan info.
Augments each device entry with:
- reachable: Whether a live proxy is connected.
- scannable_params: Flat list of scannable parameter access paths.
Parameters:
Name | Type | Description | Default
---|---|---|---
status | DeviceStatus | None | Optional filter (…). | None

Returns:

Type | Description
---|---
dict[str, DeviceDict] | Mapping from device name to a …
Source code in src/icon/server/api/devices_controller.py
get_parameter_value
async
¶
Get a parameter value from a connected device.
Logs a warning if the device is not connected or not found.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Device name. | required
parameter_id | str | Access path on the device service. | required

Returns:

Type | Description
---|---
Any | The parameter value as returned by the device, or …
Source code in src/icon/server/api/devices_controller.py
update_device
¶
update_device(
*,
name: str,
status: Literal["disabled", "enabled"] | None = None,
url: str | None = None,
retry_attempts: int | None = None,
retry_delay_seconds: float | None = None,
) -> Device
Update a device record and its live connection.
When transitioning to disabled, the client is disconnected and removed. When transitioning to enabled, a client is (re)created and registered.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Device name. | required
status | Literal['disabled', 'enabled'] | None | Target enable/disable status. | None
url | str | None | Updated pydase URL. | None
retry_attempts | int | None | Updated retry attempts metadata. | None
retry_delay_seconds | float | None | Updated retry delay metadata. | None

Returns:

Type | Description
---|---
Device | The updated …
Source code in src/icon/server/api/devices_controller.py
update_parameter_value
async
¶
update_parameter_value(
*,
name: str,
parameter_id: str,
new_value: DeviceParameterValueyType | QuantityDict,
type_: Literal["float", "int", "Quantity"],
) -> None
Set a parameter value on a connected device.
Performs type normalization (float, int, or Quantity) before delegating to the device client.
Logs a warning if the device is not connected or not found.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Device name. | required
parameter_id | str | Access path on the device service. | required
new_value | DeviceParameterValueyType | QuantityDict | New value (native type or quantity dict). | required
type_ | Literal['float', 'int', 'Quantity'] | Expected type of the value for normalization. | required
Source code in src/icon/server/api/devices_controller.py
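A hypothetical usage sketch; the device name and access path are illustrative:

async def set_voltage(devices_controller: DevicesController) -> None:
    await devices_controller.update_parameter_value(
        name="dac-1",
        parameter_id="channels[0].voltage",
        new_value=1.25,
        type_="float",  # the value is normalized to float before delegation
    )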
experiment_data_controller
¶
ExperimentDataController
¶
Bases: DataService
Controller for accessing stored experiment data.
Provides API methods to fetch experiment data associated with jobs.
get_experiment_data_by_job_id
async
¶
get_experiment_data_by_job_id(
job_id: int,
) -> ExperimentData
Return experiment data for a given job.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | The unique identifier of the job. | required

Returns:

Type | Description
---|---
ExperimentData | The experiment data linked to the job.
Source code in src/icon/server/api/experiment_data_controller.py
experiments_controller
¶
ExperimentsController
¶
Bases: DataService
Controller for experiment metadata.
Stores the current set of experiments and exposes them to the API. Updates are compared against the existing metadata and, if changes are detected, an update event is pushed to the Socket.IO emit queue.
Source code in src/icon/server/api/experiments_controller.py
get_experiments
¶
get_experiments() -> ExperimentDict
Return the current experiment metadata.
Returns:
Type | Description
---|---
ExperimentDict | Mapping of experiment IDs to their metadata.
models
¶
device_dict
¶
experiment_dict
¶
ExperimentDict
module-attribute
¶
ExperimentDict = dict[str, ExperimentMetadata]
Dictionary mapping the unique experiment identifier to its metadata.
Example
experiment_dict: ExperimentDict = {
    "experiment_library.experiments.my_experiment.MyExperiment (Cool Det)": {
        "class_name": "MyExperiment",
        "constructor_kwargs": {
            "name": "Cool Det",
        },
        "parameters": {
            "Local Parameters": {
                "namespace='experiment_library.experiments.my_experiment.MyExperiment.Cool Det' parameter_group='default' param_type='ParameterTypes.AMPLITUDE'": {
                    "allowed_values": None,
                    "default_value": 0.0,
                    "display_name": "amplitude",
                    "max_value": 100.0,
                    "min_value": 0.0,
                    "unit": "%",
                },
            },
            "ParameterGroup": {
                "namespace='experiment_library.globals.global_parameters' parameter_group='ParameterGroup' param_type='ParameterTypes.AMPLITUDE'": {
                    "allowed_values": None,
                    "default_value": 0.0,
                    "display_name": "amplitude",
                    "max_value": 100.0,
                    "min_value": 0.0,
                    "unit": "%",
                },
            },
        },
    },
}
parameter_metadata
¶
parameters_controller
¶
ParametersController
¶
Bases: DataService
Controller for parameter metadata and shared parameter values.
Maintains metadata for all parameters and their display groups, exposes read/write access to parameter values via the API, and ensures parameters are initialized in the InfluxDB backend.
Source code in src/icon/server/api/parameters_controller.py
get_all_parameters
¶
get_display_groups
¶
get_display_groups() -> dict[
str, dict[str, ParameterMetadata]
]
Return metadata grouped by display group.
Returns:
Type | Description
---|---
dict[str, dict[str, ParameterMetadata]] | Mapping from display group names to parameter metadata.
Source code in src/icon/server/api/parameters_controller.py
initialise_parameters_repository
¶
Initialize the global ParametersRepository.
Loads existing parameters from InfluxDB, populates the shared parameters dict in the shared resource manager, and marks the ParametersRepository as initialized.
Source code in src/icon/server/api/parameters_controller.py
get_added_removed_and_updated_keys
¶
get_added_removed_and_updated_keys(
new_dict: dict[str, Any], cached_dict: dict[str, Any]
) -> tuple[list[str], list[str], list[str]]
Compare two dictionaries and return added, removed, and updated keys.
Parameters:
Name | Type | Description | Default
---|---|---|---
new_dict | dict[str, Any] | The latest dictionary state. | required
cached_dict | dict[str, Any] | The previously cached dictionary state. | required

Returns:

Type | Description
---|---
tuple[list[str], list[str], list[str]] | A tuple of three lists: …
Source code in src/icon/server/api/parameters_controller.py
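A minimal sketch of such a comparison, assuming “updated” means a key present in both dictionaries whose value changed (not necessarily the actual implementation):

from typing import Any


def get_added_removed_and_updated_keys(
    new_dict: dict[str, Any], cached_dict: dict[str, Any]
) -> tuple[list[str], list[str], list[str]]:
    added = [key for key in new_dict if key not in cached_dict]
    removed = [key for key in cached_dict if key not in new_dict]
    updated = [
        key
        for key in new_dict
        if key in cached_dict and new_dict[key] != cached_dict[key]
    ]
    return added, removed, updated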
scans_controller
¶
ScansController
¶
ScansController(
pre_processing_update_queues: list[Queue[UpdateQueue]],
)
Bases: DataService
Controller for triggering update events for jobs across multiple worker processes.
Each worker process has its own update queue (multiprocessing.Queue), which this controller writes to when an update event is triggered.
Source code in src/icon/server/api/scans_controller.py
trigger_update_job_params
async
¶
trigger_update_job_params(
*, job_id: int | None = None
) -> None
Triggers an ‘update_parameters’ event for the given job ID.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | None | The ID of the job whose parameters should be updated. If None, all jobs will update their parameters. | None
Source code in src/icon/server/api/scans_controller.py
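A sketch of the fan-out this implies, assuming the UpdateQueue TypedDict documented under icon.server.utils.types; the queue setup is illustrative:

from __future__ import annotations

from multiprocessing import Queue

from icon.server.utils.types import UpdateQueue

# One update queue per pre-processing worker process (illustrative).
queues: list[Queue[UpdateQueue]] = [Queue(), Queue()]

# job_id=None signals that all jobs should update their parameters.
event: UpdateQueue = {"event": "update_parameters", "job_id": None}
for queue in queues:
    queue.put(event)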
scheduler_controller
¶
SchedulerController
¶
SchedulerController(devices_controller: DevicesController)
Bases: DataService
Controller to submit, inspect, and cancel scheduled jobs.
Provides methods to submit new jobs, cancel pending or running jobs, and query jobs or runs by ID or status. Ensures scan parameters are cast to the correct runtime type before persisting them.
Parameters:
Name | Type | Description | Default
---|---|---|---
devices_controller | DevicesController | Reference to the devices controller. Used to read current values of device parameters when casting scan values. | required
Source code in src/icon/server/api/scheduler_controller.py
cancel_job
¶
cancel_job(*, job_id: int) -> None
Cancel a queued or running job.
The following status updates are performed:
- Job: PROCESSING/SUBMITTED → PROCESSED
- JobRun: PENDING/PROCESSING → CANCELLED
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | ID of the job to cancel. | required
Source code in src/icon/server/api/scheduler_controller.py
get_job_by_id
¶
get_job_run_by_id
¶
get_scheduled_jobs
¶
get_scheduled_jobs(
*,
status: JobStatus | None = None,
start: str | None = None,
stop: str | None = None,
) -> dict[int, Job]
List jobs filtered by status and optional ISO timeframe.
Parameters:
Name | Type | Description | Default
---|---|---|---
status | JobStatus | None | Optional job status filter. | None
start | str | None | Optional ISO8601 start timestamp (inclusive). | None
stop | str | None | Optional ISO8601 stop timestamp (exclusive). | None

Returns:

Type | Description
---|---
dict[int, Job] | Mapping from job ID to job record.
Source code in src/icon/server/api/scheduler_controller.py
submit_job
async
¶
submit_job(
*,
experiment_id: str,
scan_parameters: list[ScanParameter],
priority: int = 20,
local_parameters_timestamp: datetime | None = None,
repetitions: int = 1,
number_of_shots: int = 50,
git_commit_hash: str | None = None,
auto_calibration: bool = False,
) -> int
Create and submit a job with typed scan parameters.
Each scan parameter’s values are cast to the current type of the target parameter (device parameter via DevicesController or shared parameter via ParametersRepository).
Parameters:
Name | Type | Description | Default
---|---|---|---
experiment_id | str | Experiment identifier (from experiment library). | required
scan_parameters | list[ScanParameter] | List of scan parameter specs (id, values, optional device_name). | required
priority | int | Higher values run sooner. | 20
local_parameters_timestamp | datetime | None | ISO timestamp to snapshot local parameters; defaults to … | None
repetitions | int | Number of experiment repetitions. | 1
number_of_shots | int | Shots per data point. | 50
git_commit_hash | str | None | Git commit to associate with the job; if … | None
auto_calibration | bool | Whether to run auto-calibration for the job. | False

Returns:

Type | Description
---|---
int | The persisted job ID.
Source code in src/icon/server/api/scheduler_controller.py
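A hypothetical usage sketch, given a SchedulerController instance scheduler_controller; the experiment ID follows the format shown in the ExperimentDict example above, and the scan parameter spec shape is an assumption based on the documented fields (id, values, optional device_name):

async def submit_example() -> None:
    job_id = await scheduler_controller.submit_job(
        experiment_id=(
            "experiment_library.experiments.my_experiment.MyExperiment (Cool Det)"
        ),
        scan_parameters=[
            {"id": "…", "values": [0.0, 25.0, 50.0]},  # illustrative scan spec
        ],
        number_of_shots=50,
        repetitions=1,
    )
    print(f"Submitted job {job_id}")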
status_controller
¶
StatusController
¶
Bases: DataService
Controller for system status monitoring.
Periodically checks availability of InfluxDB and hardware and emits status events via the Socket.IO queue.
Source code in src/icon/server/api/status_controller.py
check_hardware_status
async
¶
Check hardware connection and reconnect if necessary.
Ensures the hardware controller matches the configured host/port and reconnects in a background thread if required.
Emits a "status.hardware"
event to the Socket.IO queue.
Source code in src/icon/server/api/status_controller.py
check_influxdb_status
¶
Check if InfluxDB is responsive and update status.
Emits a "status.influxdb"
event to the Socket.IO queue.
Source code in src/icon/server/api/status_controller.py
get_status
¶
Return the current system status flags.
Returns:
Type | Description
---|---
dict[str, bool] | A dictionary with: …
Source code in src/icon/server/api/status_controller.py
icon.server.data_access.models.enums¶
This module defines enums used by the SQLAlchemy models.
These enums represent database-level states for jobs, job runs, and devices. They are stored as strings in the database and used throughout ICON’s scheduling and device management logic.
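A minimal sketch of the pattern, assuming string member values (the actual values are not shown on this page):

import enum


class JobStatus(enum.Enum):
    # Stored as strings in the database; the exact values are assumptions.
    SUBMITTED = "submitted"
    PROCESSING = "processing"
    PROCESSED = "processed"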
DeviceStatus
¶
JobRunStatus
¶
Bases: Enum
Lifecycle states of a job run.
CANCELLED
class-attribute
instance-attribute
¶
Run was cancelled before completion.
FAILED
class-attribute
instance-attribute
¶
Run ended unsuccessfully due to an error.
PENDING
class-attribute
instance-attribute
¶
Run is queued but has not started yet.
PROCESSING
class-attribute
instance-attribute
¶
Run is currently executing.
JobStatus
¶
Bases: Enum
Lifecycle states of a job submission.
PROCESSED
class-attribute
instance-attribute
¶
Job has finished or was cancelled and is no longer active.
PROCESSING
class-attribute
instance-attribute
¶
Job has been put into the pre-processing task queue.
SUBMITTED
class-attribute
instance-attribute
¶
Job has been created and is waiting to be scheduled.
icon.server.data_access.models.sqlite¶
This module contains the SQLAlchemy models for ICON.
All models must be imported and added to the __all__ list here so that Alembic can correctly detect them during schema autogeneration. Alembic inspects Base.metadata, which is only populated with models that are actually imported at runtime.
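A sketch of the re-export pattern this describes; the submodule paths are assumptions:

# Import every model so that Base.metadata is populated for Alembic.
from icon.server.data_access.models.sqlite.device import Device
from icon.server.data_access.models.sqlite.job import Job

__all__ = [
    "Device",
    "Job",
    # ... every other model must be listed here as well ...
]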
Base
¶
Bases: DeclarativeBase
Base class for all SQLAlchemy ORM models in ICON.
This class configures the declarative mapping and provides a datetime type mapping for all models that inherit from it.
type_annotation_map
class-attribute
¶
Custom type mapping used when interpreting Python type annotations.
Currently, datetime.datetime is mapped to sqlalchemy.TIMESTAMP(timezone=True) to ensure timezone-aware timestamps across all models.
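A minimal sketch of this mapping, following the SQLAlchemy 2.0 declarative API:

import datetime

import sqlalchemy
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    # datetime annotations become timezone-aware TIMESTAMP columns.
    type_annotation_map = {
        datetime.datetime: sqlalchemy.TIMESTAMP(timezone=True),
    }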
Device
¶
Bases: Base
SQLAlchemy model for a registered device.
Represents an external device accessible via a pydase service. Stores configuration, connection details, and retry behaviour. A device may be linked to multiple scan parameters.
created
class-attribute
instance-attribute
¶
created: Mapped[datetime] = mapped_column(
default=lambda: now(timezone)
)
Timestamp when the device entry was created.
description
class-attribute
instance-attribute
¶
description: Mapped[str | None] = mapped_column(
default=None
)
Optional human-readable description of the device.
id
class-attribute
instance-attribute
¶
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True
)
Primary key identifier for the device.
name
class-attribute
instance-attribute
¶
name: Mapped[str] = mapped_column(unique=True, index=True)
Unique name of the device.
retry_attempts
class-attribute
instance-attribute
¶
retry_attempts: Mapped[int] = mapped_column(
default=3, nullable=False
)
Number of attempts to verify the device value was set correctly.
retry_delay_seconds
class-attribute
instance-attribute
¶
retry_delay_seconds: Mapped[float] = mapped_column(
default=0.0, nullable=False
)
Delay in seconds between retry attempts.
scan_parameters
class-attribute
instance-attribute
¶
scan_parameters: Mapped[list[ScanParameter]] = relationship(
"ScanParameter", back_populates="device"
)
Relationship to scan parameters linked to this device.
status
class-attribute
instance-attribute
¶
status: Mapped[DeviceStatus] = mapped_column(
default=ENABLED, index=True
)
Current status of the device (enabled or disabled).
url
class-attribute
instance-attribute
¶
url: Mapped[str] = mapped_column()
pydase service URL of the device.
ExperimentSource
¶
Bases: Base
SQLAlchemy model for experiment sources.
Represents a unique experiment identifier from the experiment library. Each experiment source may be linked to multiple jobs.
experiment_id
class-attribute
instance-attribute
¶
experiment_id: Mapped[str] = mapped_column()
Unique experiment identifier string (as defined in the experiment library).
id
class-attribute
instance-attribute
¶
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True
)
Primary key identifier for the experiment source.
jobs
class-attribute
instance-attribute
¶
jobs: Mapped[list[Job]] = relationship(
back_populates="experiment_source"
)
Relationship to jobs associated with this experiment source.
Job
¶
Bases: Base
SQLAlchemy model for experiment jobs.
Represents a scheduled or running experiment job, including its metadata, status, and relationships to experiment sources, runs, and scan parameters.
Constraints
- priority must be between 0 and 20.
- Indexed by (experiment_source_id, status, priority, created).
auto_calibration
class-attribute
instance-attribute
¶
auto_calibration: Mapped[bool] = mapped_column(
default=False
)
Whether auto-calibration is enabled for this job. Currently unused.
created
class-attribute
instance-attribute
¶
created: Mapped[datetime] = mapped_column(
default=lambda: now(timezone)
)
Timestamp when the job was created. This cannot be set manually.
debug_mode
class-attribute
instance-attribute
¶
debug_mode: Mapped[bool] = mapped_column(default=False)
Whether the job was submitted in debug mode (no commit hash).
experiment_source
class-attribute
instance-attribute
¶
experiment_source: Mapped[ExperimentSource] = relationship(
back_populates="jobs"
)
Relationship to the experiment source.
experiment_source_id
class-attribute
instance-attribute
¶
experiment_source_id: Mapped[int] = mapped_column(
ForeignKey("experiment_sources.id")
)
Foreign key referencing the associated experiment source.
git_commit_hash
class-attribute
instance-attribute
¶
git_commit_hash: Mapped[str | None] = mapped_column(
default=None
)
Git commit hash of the experiment code associated with the job.
id
class-attribute
instance-attribute
¶
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True
)
Primary key identifier for the job.
local_parameters_timestamp
class-attribute
instance-attribute
¶
local_parameters_timestamp: Mapped[datetime] = (
mapped_column(default=now(timezone))
)
Timestamp of the local parameter snapshot used for this job.
number_of_shots
class-attribute
instance-attribute
¶
number_of_shots: Mapped[int] = mapped_column(default=50)
Number of shots per repetition.
parent_job
class-attribute
instance-attribute
¶
parent_job: Mapped[Job | None] = relationship(
"Job",
remote_side=[id],
back_populates="resubmitted_jobs",
)
Relationship to the parent job from which this job was resubmitted.
parent_job_id
class-attribute
instance-attribute
¶
parent_job_id: Mapped[int | None] = mapped_column(
ForeignKey("job_submissions.id"), nullable=True
)
Foreign key referencing the original job if this job was resubmitted.
priority
class-attribute
instance-attribute
¶
priority: Mapped[int] = mapped_column(default=20)
Job priority, between 0 (lowest) and 20 (highest).
repetitions
class-attribute
instance-attribute
¶
repetitions: Mapped[int] = mapped_column(default=1)
Number of times the experiment should be repeated.
resubmitted_jobs
class-attribute
instance-attribute
¶
resubmitted_jobs: Mapped[list[Job]] = relationship(
"Job", back_populates="parent_job"
)
List of jobs resubmitted from this job.
run
class-attribute
instance-attribute
¶
run: Mapped[JobRun] = relationship(back_populates='job')
Relationship to the job run associated with this job.
scan_parameters
class-attribute
instance-attribute
¶
scan_parameters: Mapped[list[ScanParameter]] = relationship(
back_populates="job"
)
List of scan parameters associated with this job.
status
class-attribute
instance-attribute
¶
status: Mapped[JobStatus] = mapped_column(default=SUBMITTED)
Current status of the job (submitted, processing, etc.).
JobRun
¶
Bases: Base
SQLAlchemy model for job runs.
Represents the execution of a job, including its scheduled time, current status, and log messages.
Constraints
- Indexed by (job_id, status, scheduled_time).
- scheduled_time must be unique across runs.
id
class-attribute
instance-attribute
¶
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True
)
Primary key identifier for the job run.
job
class-attribute
instance-attribute
¶
job: Mapped[Job] = relationship(back_populates='run')
Relationship to the job associated with this run.
job_id
class-attribute
instance-attribute
¶
job_id: Mapped[int] = mapped_column(
ForeignKey("job_submissions.id")
)
Foreign key referencing the job being executed.
log
class-attribute
instance-attribute
¶
log: Mapped[str | None] = mapped_column(default=None)
Optional log message for this run (e.g., cancellation reason).
scheduled_time
class-attribute
instance-attribute
¶
scheduled_time: Mapped[datetime] = mapped_column(
default=now(timezone)
)
Time when the run was scheduled to start.
status
class-attribute
instance-attribute
¶
status: Mapped[JobRunStatus] = mapped_column(
default=PENDING
)
Current status of the run (pending, processing, cancelled, etc.).
ScanParameter
¶
Bases: Base
SQLAlchemy model for scan parameters.
Represents a parameter scanned during a job execution. Each parameter is linked to a job and optionally to a device.
device
class-attribute
instance-attribute
¶
device: Mapped[Device | None] = relationship(
back_populates="scan_parameters", lazy="joined"
)
Relationship to the device associated with this parameter.
device_id
class-attribute
instance-attribute
¶
device_id: Mapped[int | None] = mapped_column(
ForeignKey("devices.id"), nullable=True
)
Foreign key referencing the associated device, if any.
id
class-attribute
instance-attribute
¶
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True
)
Primary key identifier for the scan parameter.
job
class-attribute
instance-attribute
¶
job: Mapped[Job] = relationship(
back_populates="scan_parameters"
)
Relationship to the job.
job_id
class-attribute
instance-attribute
¶
job_id: Mapped[int] = mapped_column(
ForeignKey("job_submissions.id")
)
Foreign key referencing the job this parameter belongs to.
scan_values
class-attribute
instance-attribute
¶
scan_values: Mapped[list[DatabaseValueType]] = (
mapped_column(JSONEncodedList, nullable=False)
)
List of values scanned for this parameter (stored as JSON).
variable_id
class-attribute
instance-attribute
¶
variable_id: Mapped[str] = mapped_column()
Identifier of the parameter being scanned.
icon.server.data_access.repositories¶
This module contains the repository layer for ICON’s data access.
Repositories encapsulate database access logic and hide the underlying persistence technology (SQLAlchemy sessions, InfluxDB queries, etc.) from the rest of the application. They expose simple, intention-revealing methods for creating, retrieving, and updating domain objects, while emitting Socket.IO events when relevant.
By using repositories, controllers and services can work with high-level operations (e.g. “submit a job”, “update a device”) without needing to know how the data is stored or which database backend is used. This keeps the codebase modular, easier to maintain, and allows the persistence layer to evolve independently of business logic.
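A sketch of what this looks like from a caller's perspective, using the JobRepository documented below; the model fields are illustrative:

from icon.server.data_access.models.sqlite import Job
from icon.server.data_access.repositories.job_repository import JobRepository

# "Submit a job" without touching sessions or SQL directly.
job = Job(experiment_source_id=1, priority=20)  # illustrative fields
persisted = JobRepository.submit_job(job=job)   # also emits a Socket.IO event
print(persisted.id)  # database-generated primary key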
device_repository
¶
DeviceRepository
¶
Repository for Device entities.
Provides methods to create, update, and query devices in the SQLite database. All methods open their own SQLAlchemy session and return detached ORM objects.
add_device
staticmethod
¶
Insert a new device into the database.
Parameters:
Name | Type | Description | Default
---|---|---|---
device | Device | Device instance to persist. | required

Returns:

Type | Description
---|---
Device | The persisted device with database-generated fields (e.g., …).
Source code in src/icon/server/data_access/repositories/device_repository.py
get_all_device_names
staticmethod
¶
get_device_by_id
staticmethod
¶
Return a device by database ID.
Parameters:
Name | Type | Description | Default
---|---|---|---
id | int | Primary key identifier of the device. | required

Returns:

Type | Description
---|---
Device | The matching device.

Raises:

Type | Description
---|---
NoResultFound | If no device exists with the given ID.
Source code in src/icon/server/data_access/repositories/device_repository.py
get_device_by_name
staticmethod
¶
Return a device by unique name.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Device name. | required

Returns:

Type | Description
---|---
Device | The matching device.

Raises:

Type | Description
---|---
NoDeviceFoundError | If no device exists with the given name.
Source code in src/icon/server/data_access/repositories/device_repository.py
get_devices_by_status
staticmethod
¶
get_devices_by_status(
*, status: DeviceStatus | None = None
) -> Sequence[Device]
Return devices filtered by status.
Parameters:
Name | Type | Description | Default
---|---|---|---
status | DeviceStatus | None | Optional device status to filter on. | None

Returns:

Type | Description
---|---
Sequence[Device] | All devices matching the filter (or all devices if no filter is given).
Source code in src/icon/server/data_access/repositories/device_repository.py
update_device
staticmethod
¶
update_device(
*,
name: str,
url: str | None = None,
status: DeviceStatus | None = None,
retry_attempts: int | None = None,
retry_delay_seconds: float | None = None,
) -> Device
Update an existing device by name.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Unique device name. | required
url | str | None | New device URL (cannot change if the device is enabled). | None
status | DeviceStatus | None | New device status (enabled/disabled). | None
retry_attempts | int | None | Updated retry attempt count. | None
retry_delay_seconds | float | None | Updated retry delay in seconds. | None

Returns:

Type | Description
---|---
Device | The updated device.

Raises:

Type | Description
---|---
RuntimeError | If attempting to change the URL of an enabled device.
NoDeviceFoundError | If no device with the given name exists.
Source code in src/icon/server/data_access/repositories/device_repository.py
experiment_data_repository
¶
ExperimentData
¶
Bases: TypedDict
Container for all experiment data returned to the API.
json_sequences
instance-attribute
¶
List of [index, sequence_json] pairs (list for pydase JSON compatibility).
plot_windows
instance-attribute
¶
plot_windows: PlotWindowsDict
Plot window metadata grouped by channel class.
result_channels
instance-attribute
¶
Result channels as channel_name -> {index -> value}.
scan_parameters
instance-attribute
¶
Scan parameters as param_id -> {index -> value/timestamp}.
shot_channels
instance-attribute
¶
Shot channels as channel_name -> {index -> values}.
ExperimentDataPoint
¶
Bases: ResultDict
A single data point with its context.
ExperimentDataRepository
¶
Repository for HDF5-based experiment data.
Manages HDF5 file creation and updates (metadata, results, parameters), with file-level locking to support concurrent writers.
get_experiment_data_by_job_id
staticmethod
¶
get_experiment_data_by_job_id(
*, job_id: int
) -> ExperimentData
Load all stored data for a job from its HDF5 file.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required

Returns:

Type | Description
---|---
ExperimentData | Experiment data payload suitable for the API.
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
update_metadata_by_job_id
staticmethod
¶
update_metadata_by_job_id(
*,
job_id: int,
number_of_shots: int,
repetitions: int,
readout_metadata: ReadoutMetadata,
local_parameter_timestamp: datetime | None = None,
parameters: list[ScanParameter] = [],
) -> None
Create or update HDF5 metadata for a job.
Initializes datasets, sets file-level attributes, and stores plot window metadata for result/shot/vector channels.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required
number_of_shots | int | Shots per data point. | required
repetitions | int | Number of repetitions. | required
readout_metadata | ReadoutMetadata | Plot/window/channel metadata. | required
local_parameter_timestamp | datetime | None | Optional timestamp for local parameters. | None
parameters | list[ScanParameter] | Scan parameters. | []
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_experiment_data_by_job_id
staticmethod
¶
write_experiment_data_by_job_id(
*, job_id: int, data_point: ExperimentDataPoint
) -> None
Append a complete data point to the HDF5 file and emit an event.
Writes scan parameters, result/shot/vector channels, and sequence JSON.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required
data_point | ExperimentDataPoint | Data point payload to append. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_parameter_update_by_job_id
staticmethod
¶
write_parameter_update_by_job_id(
*,
job_id: int,
timestamp: str,
parameter_values: dict[str, str | int | float | bool],
) -> None
Append parameter updates under the ‘parameters’ group.
Creates a dataset per parameter storing (timestamp, value) entries. Appends only when the value changed from the last entry.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required
timestamp | str | ISO timestamp string. | required
parameter_values | dict[str, str | int | float | bool] | Mapping of parameter id to value. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
PlotWindowMetadata
¶
Bases: TypedDict
Metadata describing a single plot window for visualization in the frontend.
This metadata includes the plot’s index within its type, the type of plot (e.g., vector, histogram, or readout), and the list of channel names that are to be plotted in the respective window.
channel_names
instance-attribute
¶
A list of channel names to be plotted in this window
PlotWindowsDict
¶
Bases: TypedDict
Grouping of plot window metadata by channel type.
result_channels
instance-attribute
¶
result_channels: list[PlotWindowMetadata]
Plot window metadata for result channels.
shot_channels
instance-attribute
¶
shot_channels: list[PlotWindowMetadata]
Plot window metadata for shot channels.
vector_channels
instance-attribute
¶
vector_channels: list[PlotWindowMetadata]
Plot window metadata for vector channels.
ReadoutMetadata
¶
Bases: TypedDict
Metadata describing readout/shot/vector channels and their plot windows.
readout_channel_names
instance-attribute
¶
A list of all readout channel names
readout_channel_windows
instance-attribute
¶
readout_channel_windows: list[PlotWindowMetadata]
List of PlotWindowMetadata of result channels
shot_channel_names
instance-attribute
¶
A list of all shot channel names
shot_channel_windows
instance-attribute
¶
shot_channel_windows: list[PlotWindowMetadata]
List of PlotWindowMetadata of shot channels
vector_channel_names
instance-attribute
¶
A list of all vector channel names
vector_channel_windows
instance-attribute
¶
vector_channel_windows: list[PlotWindowMetadata]
List of PlotWindowMetadata of vector channels
ResultDict
¶
get_filename_by_job_id
¶
Return the HDF5 filename for a job.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required

Returns:

Type | Description
---|---
str | Filename derived from the job’s scheduled time (e.g., "…").
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
resize_dataset
¶
Resize a dataset to accommodate writing at a target index.
Parameters:
Name | Type | Description | Default
---|---|---|---
dataset | Dataset | HDF5 dataset to resize. | required
next_index | int | Index that must be writable. | required
axis | int | Axis along which to grow. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
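A minimal sketch of such a helper using h5py, assuming datasets are created resizable along the growth axis; the implementation is illustrative:

import h5py


def resize_dataset(dataset: h5py.Dataset, next_index: int, axis: int) -> None:
    # Grow along `axis` only if `next_index` falls outside the current shape.
    if next_index >= dataset.shape[axis]:
        dataset.resize(next_index + 1, axis=axis)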
write_results_to_dataset
¶
write_results_to_dataset(
h5file: File,
data_point_index: int,
result_channels: dict[str, float],
number_of_data_points: int,
) -> None
Write scalar result channels into the ‘result_channels’ dataset.
Parameters:
Name | Type | Description | Default
---|---|---|---
h5file | File | Open HDF5 file handle. | required
data_point_index | int | Index of the current data point. | required
result_channels | dict[str, float] | Mapping of channel name to float value. | required
number_of_data_points | int | Current total number of stored data points. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_scan_parameters_and_timestamp_to_dataset
¶
write_scan_parameters_and_timestamp_to_dataset(
h5file: File,
data_point_index: int,
scan_params: dict[str, DatabaseValueType],
timestamp: str,
number_of_data_points: int,
) -> None
Write scan parameters and timestamp to the ‘scan_parameters’ dataset.
Parameters:
Name | Type | Description | Default
---|---|---|---
h5file | File | Open HDF5 file handle. | required
data_point_index | int | Index of the current data point. | required
scan_params | dict[str, DatabaseValueType] | Parameter values for this data point. | required
timestamp | str | Acquisition timestamp (ISO string). | required
number_of_data_points | int | Current total number of stored data points. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_sequence_json_to_dataset
¶
Append sequence JSON if it changed since the last entry.
Parameters:
Name | Type | Description | Default
---|---|---|---
h5file | File | Open HDF5 file handle. | required
data_point_index | int | Index of the current data point. | required
sequence_json | str | Serialized sequence JSON to append. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_shot_channels_to_datasets
¶
write_shot_channels_to_datasets(
h5file: File,
data_point_index: int,
shot_channels: dict[str, list[int]],
number_of_data_points: int,
number_of_shots: int,
) -> None
Write per-shot data into datasets under the ‘shot_channels’ group.
Parameters:
Name | Type | Description | Default
---|---|---|---
h5file | File | Open HDF5 file handle. | required
data_point_index | int | Index of the current data point. | required
shot_channels | dict[str, list[int]] | Mapping of channel to per-shot integers. | required
number_of_data_points | int | Current total number of stored data points. | required
number_of_shots | int | Expected number of shots per channel. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
write_vector_channels_to_datasets
¶
write_vector_channels_to_datasets(
h5file: File,
data_point_index: int,
vector_channels: dict[str, list[float]],
) -> None
Write vector channel data under the ‘vector_channels’ group.
Creates one dataset per channel per data point.
Parameters:
Name | Type | Description | Default
---|---|---|---
h5file | File | Open HDF5 file handle. | required
data_point_index | int | Index of the current data point. | required
vector_channels | dict[str, list[float]] | Mapping of channel to vector of floats. | required
Source code in src/icon/server/data_access/repositories/experiment_data_repository.py
experiment_source_repository
¶
ExperimentSourceRepository
¶
Repository for ExperimentSource entities.
Provides methods to query and persist experiment sources in the database. Encapsulates the SQLAlchemy session and query logic.
get_or_create_experiment
staticmethod
¶
get_or_create_experiment(
*, experiment_source: ExperimentSource
) -> ExperimentSource
Return an existing experiment source or create it if not found.
Parameters:
Name | Type | Description | Default
---|---|---|---
experiment_source | ExperimentSource | The experiment source to look up by … | required

Returns:

Type | Description
---|---
ExperimentSource | The existing or newly created experiment source.
Source code in src/icon/server/data_access/repositories/experiment_source_repository.py
job_repository
¶
JobRepository
¶
Repository for Job entities.
Encapsulates SQLAlchemy session/query logic and emits Socket.IO events on changes. All methods open their own session and return detached ORM objects.
get_job_by_experiment_source_and_status
staticmethod
¶
get_job_by_experiment_source_and_status(
*,
experiment_source_id: int,
status: JobStatus | None = None,
) -> Sequence[Row[tuple[Job]]]
List jobs for an experiment source, optionally filtered by status.
Parameters:
Name | Type | Description | Default
---|---|---|---
experiment_source_id | int | Foreign key of the experiment source. | required
status | JobStatus | None | Optional status filter. | None

Returns:

Type | Description
---|---
Sequence[Row[tuple[Job]]] | Rows containing …
Source code in src/icon/server/data_access/repositories/job_repository.py
get_job_by_id
staticmethod
¶
get_job_by_id(
*,
job_id: int,
load_experiment_source: bool = False,
load_scan_parameters: bool = False,
) -> Job
Fetch a job by ID with optional eager-loading.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | Job identifier. | required
load_experiment_source | bool | If True, eager-load … | False
load_scan_parameters | bool | If True, eager-load … | False

Returns:

Type | Description
---|---
Job | The requested job.
Source code in src/icon/server/data_access/repositories/job_repository.py
get_jobs_by_status_and_timeframe
staticmethod
¶
get_jobs_by_status_and_timeframe(
*,
status: JobStatus | None = None,
start: datetime | None = None,
stop: datetime | None = None,
) -> Sequence[Job]
List jobs filtered by status and optional creation time window.
Parameters:
Name | Type | Description | Default
---|---|---|---
status | JobStatus | None | Optional status filter. | None
start | datetime | None | Inclusive start timestamp. | None
stop | datetime | None | Exclusive stop timestamp. | None

Returns:

Type | Description
---|---
Sequence[Job] | Matching jobs ordered by priority then creation time.
Source code in src/icon/server/data_access/repositories/job_repository.py
resubmit_job_by_id
staticmethod
¶
Clone an existing job as a new submission.
If the source job is not itself a resubmission, the new job’s parent_job_id is set to the original job’s id.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | ID of the job to clone. | required

Returns:

Type | Description
---|---
Job | The newly created job.
Source code in src/icon/server/data_access/repositories/job_repository.py
submit_job
staticmethod
¶
Insert a new job and emit a creation event.
Parameters:
Name | Type | Description | Default
---|---|---|---
job | Job | The job instance to persist. | required

Returns:

Type | Description
---|---
Job | The persisted job with generated fields populated.
Source code in src/icon/server/data_access/repositories/job_repository.py
update_job_status
staticmethod
¶
Update a job’s status and emit an update event.
Parameters:
Name | Type | Description | Default
---|---|---|---
job | Job | Job to update (identified by its …). | required
status | JobStatus | New job status. | required

Returns:

Type | Description
---|---
Job | The updated job with relationships loaded.
Source code in src/icon/server/data_access/repositories/job_repository.py
job_run_repository
¶
JobRunRepository
¶
Repository for JobRun entities.
Provides methods to insert, update, and query job runs from the database. Emits Socket.IO events when job runs are created or updated.
get_run_by_job_id
staticmethod
¶
Return the run associated with a given job ID.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | ID of the job. | required
load_job | bool | If True, eagerly load the related … | False

Returns:

Type | Description
---|---
JobRun | The run linked to the given job.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
get_runs_by_status
staticmethod
¶
get_runs_by_status(
*,
status: JobRunStatus | list[JobRunStatus],
load_job: bool = False,
) -> Sequence[JobRun]
Return job runs filtered by status.
Parameters:
Name | Type | Description | Default
---|---|---|---
status | JobRunStatus | list[JobRunStatus] | Single or list of run statuses to filter on. | required
load_job | bool | If True, eagerly load the related … | False

Returns:

Type | Description
---|---
Sequence[JobRun] | All matching runs.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
get_scheduled_time_by_job_id
staticmethod
¶
Return the scheduled time of a run by job ID.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | ID of the job. | required

Returns:

Type | Description
---|---
datetime | The scheduled start time of the run.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
insert_run
staticmethod
¶
Insert a new job run and emit a creation event.
Parameters:
Name | Type | Description | Default
---|---|---|---
run | JobRun | The job run instance to persist. | required

Returns:

Type | Description
---|---
JobRun | The persisted job run with generated fields populated.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
update_run_by_id
staticmethod
¶
update_run_by_id(
*,
run_id: int,
status: JobRunStatus,
log: str | None = None,
) -> JobRun
Update a job run by ID and emit an update event.
Parameters:
Name | Type | Description | Default
---|---|---|---
run_id | int | The ID of the job run to update. | required
status | JobRunStatus | New status of the run. | required
log | str | None | Optional log message (e.g. failure reason). | None

Returns:

Type | Description
---|---
JobRun | The updated job run.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
job_run_cancelled_or_failed
¶
Check if a job’s run was cancelled or failed.
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | int | ID of the job whose run should be checked. | required

Returns:

Type | Description
---|---
bool | True if the run status is CANCELLED or FAILED, False otherwise.
Source code in src/icon/server/data_access/repositories/job_run_repository.py
parameters_repository
¶
NotInitialisedError
¶
Bases: Exception
Raised when repository methods are called before initialization.
ParametersRepository
¶
Repository for parameter values and metadata.
Provides methods to read and update shared parameter state (via a multiprocessing.Manager dict) and to persist/retrieve parameters from InfluxDB. Emits Socket.IO events on updates.
get_influxdb_parameter_by_id
staticmethod
¶
get_influxdb_parameter_by_id(
parameter_id: str,
) -> DatabaseValueType | None
Return a single parameter value from InfluxDB.
Parameters:
Name | Type | Description | Default
---|---|---|---
parameter_id | str | ID of the parameter. | required

Returns:

Type | Description
---|---
DatabaseValueType | None | The parameter value, or None if not found.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
get_influxdb_parameter_keys
staticmethod
¶
Return all parameter field keys from InfluxDB v1.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
get_influxdb_parameters
staticmethod
¶
get_influxdb_parameters(
*,
before: str | None = None,
namespace: str | None = None,
) -> dict[str, DatabaseValueType]
Return the latest parameter values from InfluxDB.
Parameters:
Name | Type | Description | Default
---|---|---|---
before | str | None | Optional ISO timestamp to query parameters before. | None
namespace | str | None | Optional namespace filter. | None

Returns:

Type | Description
---|---
dict[str, DatabaseValueType] | Mapping of parameter IDs to values.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
get_shared_parameter_by_id
classmethod
¶
get_shared_parameter_by_id(
*, parameter_id: str
) -> DatabaseValueType | None
Return a single parameter value from shared state.
Parameters:
Name | Type | Description | Default
---|---|---|---
parameter_id | str | ID of the parameter. | required

Returns:

Type | Description
---|---
DatabaseValueType | None | The parameter value, or None if not set.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
get_shared_parameters
classmethod
¶
get_shared_parameters() -> DictProxy[
str, DatabaseValueType
]
Return the full shared parameter dictionary.
Returns:
Type | Description
---|---
DictProxy[str, DatabaseValueType] | Proxy dictionary of parameters.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
initialize
classmethod
¶
initialize(
*, shared_parameters: DictProxy[str, DatabaseValueType]
) -> None
Initialize the repository with a shared parameters dict.
Parameters:
Name | Type | Description | Default
---|---|---|---
shared_parameters | DictProxy[str, DatabaseValueType] | Proxy dictionary used to store shared state. | required
Source code in src/icon/server/data_access/repositories/parameters_repository.py
update_parameters
classmethod
¶
Update parameters in both shared state and InfluxDB.
Parameters:
Name | Type | Description | Default
---|---|---|---
parameter_mapping | dict[str, DatabaseValueType] | Mapping of parameter IDs to values. | required
Source code in src/icon/server/data_access/repositories/parameters_repository.py
get_specifiers_from_parameter_identifier
¶
Extract specifiers from a parameter identifier string.
Parameter identifiers encode metadata as key='value' pairs. This helper parses them into a dictionary.
Parameters:
Name | Type | Description | Default
---|---|---|---
parameter_identifier | str | Identifier string to parse. | required

Returns:

Type | Description
---|---
dict[str, str] | Mapping of specifier keys to values.
Source code in src/icon/server/data_access/repositories/parameters_repository.py
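A sketch of such a parser, assuming the key='value' format visible in the ExperimentDict example above; the regex is illustrative:

import re


def get_specifiers(parameter_identifier: str) -> dict[str, str]:
    # Capture key='value' pairs; values may contain spaces but not quotes.
    return dict(re.findall(r"(\w+)='([^']*)'", parameter_identifier))


specifiers = get_specifiers(
    "namespace='experiment_library.globals.global_parameters' "
    "parameter_group='ParameterGroup' param_type='ParameterTypes.AMPLITUDE'"
)
# {'namespace': 'experiment_library.globals.global_parameters', ...}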
pycrystal_library_repository
¶
ParameterMetadataDict
module-attribute
¶
ParameterMetadataDict = TypedDict(
"ParameterMetadataDict",
{
"all parameters": dict[str, ParameterMetadata],
"display groups": dict[
str, dict[str, ParameterMetadata]
],
},
)
Dictionary of parameter metadata.
ParameterAndExperimentMetadata
¶
Bases: TypedDict
Combined metadata for experiments and parameters.
experiment_metadata
instance-attribute
¶
experiment_metadata: ExperimentDict
Dictionary mapping the unique experiment identifier to its metadata.
parameter_metadata
instance-attribute
¶
parameter_metadata: ParameterMetadataDict
Dictionary of parameter metadata.
PycrystalLibraryRepository
¶
Repository for interacting with the pycrystal experiment library.
Provides methods to fetch experiment and parameter metadata and to generate sequences by executing helper scripts inside the experiment library’s virtual environment.
generate_json_sequence
async
staticmethod
¶
generate_json_sequence(
*,
exp_module_name: str,
exp_instance_name: str,
parameter_dict: dict[str, DatabaseValueType],
) -> str
Generate a JSON sequence for an experiment.
Parameters:
Name | Type | Description | Default
---|---|---|---
exp_module_name | str | Module name of the experiment. | required
exp_instance_name | str | Name of the experiment instance. | required
parameter_dict | dict[str, DatabaseValueType] | Mapping of parameter IDs to values. | required

Returns:

Type | Description
---|---
str | JSON string containing the generated sequence.
Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
get_experiment_and_parameter_metadata
async
staticmethod
¶
get_experiment_and_parameter_metadata() -> (
ParameterAndExperimentMetadata
)
Fetch the experiment and parameter metadata.
Returns:
Type | Description
---|---
ParameterAndExperimentMetadata | Dictionary with experiment metadata and parameter metadata.
Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
get_experiment_readout_metadata
async
staticmethod
¶
get_experiment_readout_metadata(
*,
exp_module_name: str,
exp_instance_name: str,
parameter_dict: dict[str, DatabaseValueType],
) -> ReadoutMetadata
Fetch readout metadata for an experiment.
Parameters:
Name | Type | Description | Default
---|---|---|---
exp_module_name | str | Module name of the experiment. | required
exp_instance_name | str | Name of the experiment instance. | required
parameter_dict | dict[str, DatabaseValueType] | Mapping of parameter IDs to values. | required

Returns:

Type | Description
---|---
ReadoutMetadata | Dictionary containing readout metadata for the experiment.
Source code in src/icon/server/data_access/repositories/pycrystal_library_repository.py
icon.server.hardware_processing¶
Modules:
Name | Description
---|---
hardware_controller |
task |
worker |
hardware_controller
¶
Classes:
Name | Description
---|---
HardwareController |

Attributes:

Name | Type | Description
---|---|---
logger | |
HardwareController
¶
HardwareController(connect: bool = True)
Methods:
Name | Description
---|---
connect |
run |

Attributes:

Name | Type | Description
---|---|---
connected | bool |
Source code in src/icon/server/hardware_processing/hardware_controller.py
connect
¶
Source code in src/icon/server/hardware_processing/hardware_controller.py
run
¶
run(*, sequence: str, number_of_shots: int) -> ResultDict
Source code in src/icon/server/hardware_processing/hardware_controller.py
task
¶
Classes:
Name | Description
---|---
HardwareProcessingTask |
HardwareProcessingTask
¶
Bases: BaseModel
Methods:
Name | Description
---|---
__lt__ |

Attributes:

Name | Type | Description
---|---|---
created | datetime |
data_point_index | int |
data_points_to_process | Queue[tuple[int, dict[str, DatabaseValueType]]] |
global_parameter_timestamp | datetime |
model_config | |
pre_processing_task | PreProcessingTask |
priority | int |
processed_data_points | Queue[HardwareProcessingTask] |
scanned_params | dict[str, DatabaseValueType] |
sequence_json | str |
src_dir | str |
data_points_to_process
instance-attribute
¶
model_config
class-attribute
instance-attribute
¶
model_config = ConfigDict(arbitrary_types_allowed=True)
__lt__
¶
__lt__(other: HardwareProcessingTask) -> bool
worker
¶
Classes:
Name | Description
---|---
HardwareProcessingWorker |

Functions:

Name | Description
---|---
parse_parameter_id | Parses a parameter ID string into a device name and variable ID.

Attributes:

Name | Type | Description
---|---|---
logger | |
timezone | |
HardwareProcessingWorker
¶
HardwareProcessingWorker(
hardware_processing_queue: PriorityQueue[
HardwareProcessingTask
],
post_processing_queue: PriorityQueue[
PostProcessingTask
],
manager: SharedResourceManager,
)
Bases: Process
Methods:
Name | Description
---|---
run |
Source code in src/icon/server/hardware_processing/worker.py
run
¶
Source code in src/icon/server/hardware_processing/worker.py
parse_parameter_id
¶
Parses a parameter ID string into a device name and variable ID.
If the input string is in the format “Device(device_name) variable_id”, the device name and variable ID are returned as a tuple.
Parameters:
Name | Type | Description | Default
---|---|---|---
param_id | str | The parameter identifier string. | required

Returns:

Type | Description
---|---
tuple[str | None, str] | A tuple (device_name, variable_id). If the input does not match the expected format, device_name is None and the entire param_id is returned as the variable_id.
Examples:
Source code in src/icon/server/hardware_processing/worker.py
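A sketch of the described parsing, based on the “Device(device_name) variable_id” format from the docstring:

import re


def parse_parameter_id(param_id: str) -> tuple[str | None, str]:
    # "Device(dac-1) channels[0].voltage" -> ("dac-1", "channels[0].voltage")
    match = re.match(r"^Device\((?P<device>[^)]+)\)\s+(?P<rest>.+)$", param_id)
    if match is None:
        return None, param_id  # no device prefix: whole string is the variable ID
    return match["device"], match["rest"]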
icon.server.post_processing¶
Modules:
Name | Description
---|---
task |
worker |
task
¶
Classes:
Name | Description
---|---
PostProcessingTask |
PostProcessingTask
¶
Bases: BaseModel
Methods:
Name | Description
---|---
__lt__ |

Attributes:

Name | Type | Description
---|---|---
created | datetime |
data_point | ExperimentDataPoint |
pre_processing_task | PreProcessingTask |
priority | int |
src_dir | str |
__lt__
¶
__lt__(other: PostProcessingTask) -> bool
worker
¶
Classes:
Name | Description
---|---
PostProcessingWorker |

Attributes:

Name | Type | Description
---|---|---
logger | |
PostProcessingWorker
¶
PostProcessingWorker(
post_processing_queue: PriorityQueue[
PostProcessingTask
],
)
Bases: Process
Methods:
Name | Description
---|---
run |
Source code in src/icon/server/post_processing/worker.py
run
¶
Source code in src/icon/server/post_processing/worker.py
icon.server.pre_processing¶
Modules:
Name | Description
---|---
task |
worker |
task
¶
Classes:
Name | Description
---|---
PreProcessingTask |
PreProcessingTask
¶
Bases: BaseModel
Methods:
Name | Description
---|---
__lt__ |

Attributes:

Name | Type | Description
---|---|---
auto_calibration | bool |
debug_mode | bool |
git_commit_hash | str | None |
job | Job |
job_run | JobRun |
local_parameters_timestamp | str |
model_config | |
priority | int |
repetitions | int |
scan_parameters | list[ScanParameter] |
model_config
class-attribute
instance-attribute
¶
model_config = ConfigDict(arbitrary_types_allowed=True)
__lt__
¶
__lt__(other: PreProcessingTask) -> bool
worker
¶
Classes:
Name | Description
---|---
ParamUpdateMode |
PreProcessingWorker |

Functions:

Name | Description
---|---
change_process_priority | Changes process priority. Only superusers can decrease the niceness of a …
get_scan_combinations | Generates all combinations of scan parameters for a given job. Repeats each …
parse_experiment_identifier | Parses an experiment identifier and returns: …
prepare_experiment_library_folder |

Attributes:

Name | Type | Description
---|---|---
logger | |
timezone | |
ParamUpdateMode
¶
Attributes:
Name | Type | Description
---|---|---
ALL_FROM_TIMESTAMP | |
ALL_UP_TO_DATE | |
LOCALS_FROM_TS_GLOBALS_LATEST | |
ONLY_NEW_PARAMETERS | |
PreProcessingWorker
¶
PreProcessingWorker(
worker_number: int,
pre_processing_queue: PriorityQueue[PreProcessingTask],
update_queue: Queue[UpdateQueue],
hardware_processing_queue: PriorityQueue[
HardwareProcessingTask
],
manager: SharedResourceManager,
)
Bases: Process
Methods:
Name | Description
---|---
run |
Source code in src/icon/server/pre_processing/worker.py
run
¶
Source code in src/icon/server/pre_processing/worker.py
change_process_priority
¶
change_process_priority(priority: int) -> None
Changes process priority. Only superusers can decrease the niceness of a process.
get_scan_combinations
¶
Generates all combinations of scan parameters for a given job. Repeats each combination job.repetitions times.
Parameters:
Name | Type | Description | Default
---|---|---|---
job | Job | The job containing scan parameters. | required

Returns:

Type | Description
---|---
list[dict[str, DatabaseValueType]] | A list of dictionaries, where each dictionary represents a combination of parameter values.
Source code in src/icon/server/pre_processing/worker.py
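A sketch of generating such combinations with itertools.product; the ScanParameter fields (variable_id, scan_values) follow the SQLite model documented above, and the placement of the repetitions is an assumption:

from __future__ import annotations

import itertools


def get_scan_combinations(job: Job) -> list[dict[str, object]]:
    names = [param.variable_id for param in job.scan_parameters]
    value_lists = [param.scan_values for param in job.scan_parameters]
    combos = [
        dict(zip(names, values)) for values in itertools.product(*value_lists)
    ]
    # Repeat each combination job.repetitions times.
    return [combo for combo in combos for _ in range(job.repetitions)]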
parse_experiment_identifier
¶
Parses an experiment identifier and returns:
- the module path (e.g. ‘experiment_library.experiments.exp_name’)
- the experiment class name (e.g. ‘ClassName’)
- the experiment instance name (e.g. ‘Instance name’)
Example
“experiment_library.experiments.exp_name.ClassName (Instance name)” -> (“experiment_library.experiments.exp_name”, “ClassName”, “Instance name”)
Source code in src/icon/server/pre_processing/worker.py
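A sketch matching the example above; string partitioning is an assumption about the implementation:

def parse_experiment_identifier(identifier: str) -> tuple[str, str, str]:
    # "pkg.module.ClassName (Instance name)"
    # -> ("pkg.module", "ClassName", "Instance name")
    path, _, instance = identifier.partition(" (")
    module_path, _, class_name = path.rpartition(".")
    return module_path, class_name, instance.rstrip(")")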
prepare_experiment_library_folder
¶
prepare_experiment_library_folder(
src_dir: str, pre_processing_task: PreProcessingTask
) -> None
Source code in src/icon/server/pre_processing/worker.py
icon.server.scheduler¶
Modules:
Name | Description
---|---
scheduler |
scheduler
¶
Classes:
Name | Description
---|---
Scheduler |

Functions:

Name | Description
---|---
initialise_job_tables |
should_exit |
Scheduler
¶
Scheduler(
pre_processing_queue: PriorityQueue[PreProcessingTask],
**kwargs: Any,
)
Bases: Process
Methods:
Name | Description
---|---
run |

Attributes:

Name | Type | Description
---|---|---
kwargs | |
Source code in src/icon/server/scheduler/scheduler.py
run
¶
Source code in src/icon/server/scheduler/scheduler.py
initialise_job_tables
¶
Source code in src/icon/server/scheduler/scheduler.py
icon.server.utils.types¶
Classes:
Name | Description
---|---
UpdateQueue |
UpdateQueue
¶
Bases: TypedDict
Attributes:
Name | Type | Description
---|---|---
event | Literal['update_parameters', 'calibration'] |
job_id | NotRequired[int | None] |
new_parameters | NotRequired[dict[str, DatabaseValueType]] |
icon.server.web_server¶
Modules:
Name | Description
---|---
icon_server |
sio_setup |
socketio_emit_queue |
icon_server
¶
Classes:
Name | Description
---|---
IconServer |

Attributes:

Name | Type | Description
---|---|---
logger | |
IconServer
¶
Bases: Server
Methods:
Name | Description
---|---
post_startup |
post_startup
async
¶
Source code in src/icon/server/web_server/icon_server.py
sio_setup
¶
Classes:
Name | Description
---|---
AsyncServer |

Functions:

Name | Description
---|---
patch_sio_setup |
setup_sio_events |

Attributes:

Name | Type | Description
---|---|---
logger | |
pydase_setup_sio_events | |
AsyncServer
¶
Bases: AsyncServer
Attributes:
Name | Type | Description
---|---|---
controlling_sid | str | None | Socketio SID of the client controlling the frontend.
patch_sio_setup
¶
setup_sio_events
¶
setup_sio_events(
sio: AsyncServer, state_manager: StateManager
) -> None