vision

Autoupload

Bases: Enum

Configuration options for image auto-upload behavior to the Learning Loop

ALL class-attribute instance-attribute

ALL = 'all'

Upload mode where every processed image is uploaded

DISABLED class-attribute instance-attribute

DISABLED = 'disabled'

Upload mode where no images are auto-uploaded

FILTERED class-attribute instance-attribute

FILTERED = 'filtered'

Upload mode for images with novel detections within uncertainty thresholds (default)
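
Because the members carry plain string values, a mode can be looked up from a configuration string. The sketch below uses a local stand-in enum that mirrors the documented members rather than importing the library class, and `parse_mode` is a hypothetical helper that is not part of the API.

```python
from enum import Enum


class Autoupload(Enum):
    """Local stand-in mirroring the documented members and values."""
    ALL = 'all'
    DISABLED = 'disabled'
    FILTERED = 'filtered'


def parse_mode(text: str) -> Autoupload:
    """Hypothetical helper: map a config string to a mode, falling back to FILTERED."""
    try:
        return Autoupload(text.strip().lower())
    except ValueError:
        # Unknown strings fall back to the documented default mode.
        return Autoupload.FILTERED
```

Falling back to `FILTERED` is a design choice of this sketch; raising an error for unknown strings would be equally reasonable.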

Events

Name Description
NEW_DETECTIONS detection on an image is completed (argument: image)

CameraProjector

CameraProjector(
    camera_provider: CalibratableCameraProvider,
    *,
    interval: float = 1.0
)

The camera projector computes a grid of projected image points on the ground plane.

It is mainly used for visualization purposes.

CameraProvider

CameraProvider()

Bases: Generic[T], Persistable

A camera provider holds a dictionary of cameras and manages additions and removals.

The camera dictionary should not be modified directly but by using the camera provider's methods. This way respective events are emitted and consistency can be taken care of.

The camera provider also creates an HTTP route to access camera images.

Events

Name Description
CAMERA_ADDED a new camera has been added (argument: camera)
CAMERA_REMOVED a camera has been removed (argument: camera id)
NEW_IMAGE a new image is available from any camera (argument: image)
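
The reason for routing changes through the provider's methods can be illustrated with a small stand-in. Everything below (`TinyProvider`, `add_camera`, `remove_camera`, the callback lists) is hypothetical and only mimics the idea; the real class and its event mechanism may look different.

```python
from typing import Callable


class TinyProvider:
    """Hypothetical stand-in, not the library class: a camera dict guarded by methods."""

    def __init__(self) -> None:
        self._cameras: dict = {}
        self.on_added: list = []    # callbacks taking the camera
        self.on_removed: list = []  # callbacks taking the camera id

    @property
    def cameras(self) -> dict:
        # Hand out a copy so callers cannot bypass add/remove.
        return dict(self._cameras)

    def add_camera(self, camera_id: str, camera: object) -> None:
        self._cameras[camera_id] = camera
        for callback in self.on_added:
            callback(camera)  # argument: camera

    def remove_camera(self, camera_id: str) -> None:
        del self._cameras[camera_id]
        for callback in self.on_removed:
            callback(camera_id)  # argument: camera id


log: list = []
provider = TinyProvider()
provider.on_added.append(lambda camera: log.append(f'added {camera}'))
provider.on_removed.append(lambda camera_id: log.append(f'removed {camera_id}'))
provider.add_camera('cam1', 'front camera')
provider.remove_camera('cam1')
```

Mutating the dictionary directly would skip the callbacks, which is the inconsistency the provider's methods are meant to prevent.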

ConfigurableCamera

ConfigurableCamera(**kwargs)

Bases: Camera

A generalized interface for adjusting camera parameters like exposure, brightness or fps.

Detector

Detector(*, name: str | None = None)

Bases: ABC

A detector allows detecting objects in images.

It also holds an upload queue for sending images with uncertain results to an active learning infrastructure like the Zauberzeug Learning Loop.

detect abstractmethod async

detect(
    image: Image,
    *,
    autoupload: Autoupload = Autoupload.FILTERED,
    tags: list[str] | None = None,
    source: str | None = None,
    creation_date: datetime | str | None = None
) -> Detections | None

Runs detections on the image and fills the image.detections property.

The parameters tags, source, and creation_date are added as metadata if the image is uploaded.

Note that the hardware detector schedules inference tasks lazily, using a queue with a maximum size of 1. If the detector is busy, a new image is not processed immediately but queued. If detect is called again before the queued image has been processed, the queued image is dropped and the new image is queued instead. For the dropped image, this method returns None.
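
The queueing behavior described above can be sketched with plain asyncio. `LatestOnlyWorker` and `submit` are hypothetical names used for illustration only; this is not the detector's actual implementation, and the sleep merely stands in for inference.

```python
import asyncio
from typing import Optional


class LatestOnlyWorker:
    """Hypothetical stand-in: one item in progress, at most one item waiting."""

    def __init__(self, delay: float = 0.01) -> None:
        self.delay = delay
        self._busy = False
        self._waiting: Optional[asyncio.Future] = None

    async def submit(self, item: str) -> Optional[str]:
        if self._busy:
            # Drop whatever is already waiting; that caller will get None.
            if self._waiting is not None and not self._waiting.done():
                self._waiting.set_result(False)
            ticket = asyncio.get_running_loop().create_future()
            self._waiting = ticket
            if not await ticket:
                return None  # replaced by a newer item while waiting
        self._busy = True
        try:
            await asyncio.sleep(self.delay)  # stands in for inference
            return f'detections for {item}'
        finally:
            ticket, self._waiting = self._waiting, None
            if ticket is not None and not ticket.done():
                ticket.set_result(True)  # hand over to the waiting item, stay busy
            else:
                self._busy = False


async def demo() -> list:
    worker = LatestOnlyWorker()
    # Three calls arrive back to back: 'a' runs, 'b' is queued, 'c' replaces 'b'.
    return await asyncio.gather(worker.submit('a'), worker.submit('b'), worker.submit('c'))


results = asyncio.run(demo())
print(results)  # ['detections for a', None, 'detections for c']
```

Callers should therefore treat a `None` result as "skipped", not as "no objects found".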

Returns:

Type               Description
Detections | None  the detections found in the image.

Raises:

Type               Description
DetectorException  if the detection fails.

fetch_detector_info abstractmethod async

fetch_detector_info() -> DetectorInfo

Retrieve information about the detector.

Returns:

Type          Description
DetectorInfo  information about the detector.

Raises:

Type               Description
DetectorException  if the detector information cannot be retrieved.

fetch_model_version_info abstractmethod async

fetch_model_version_info() -> ModelVersioningInfo

Retrieve information about the model version and versioning mode.

Returns:

Type                 Description
ModelVersioningInfo  information about the model versioning as a data class.

Raises:

Type               Description
DetectorException  if the detector is not connected or the information cannot be retrieved.

set_model_version abstractmethod async

set_model_version(
    version: Literal["follow_loop", "pause"] | str,
) -> None

Set the model version or versioning mode.

Set to "follow_loop" to automatically update the model version to the latest version in the learning loop. Set to "pause" to stop automatic updates and keep the current model version. Set to a version number (e.g. "1.2") to use a specific version.
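
The three kinds of accepted values can be told apart before calling the method. The helper below is a hypothetical sketch; in particular, the assumption that explicit versions consist of digits separated by dots (as in "1.2") is inferred from the example above and is not confirmed by the API.

```python
import re


def classify_version(version: str) -> str:
    """Hypothetical helper: distinguish versioning modes from explicit version numbers."""
    if version in ('follow_loop', 'pause'):
        return 'mode'
    # Assumption: explicit versions look like "1.2" (digits separated by dots).
    if re.fullmatch(r'\d+(\.\d+)*', version):
        return 'version'
    raise ValueError(f'not a versioning mode or version number: {version!r}')
```

A check like this only catches malformed strings early; whether a given version actually exists is still decided by the detector.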

Raises:

Type               Description
DetectorException  if the version control mode is not valid or the version could not be set.

upload abstractmethod async

upload(
    image: Image,
    *,
    tags: list[str] | None = None,
    source: str | None = None,
    creation_date: datetime | str | None = None,
    annotations: Annotations | None = None
) -> None

Uploads an image (and its annotations) to the Learning Loop.

The parameters tags, source, and creation_date are added as metadata. If the image has detections, they are also uploaded.

Raises:

Type               Description
DetectorException  if the upload fails.

Events

Name Description
NEW_DETECTIONS detection on an image is completed (argument: image)

DetectorHardware

DetectorHardware(
    *,
    host: str = "localhost",
    port: int = 8004,
    name: str | None = None,
    auto_disconnect: bool = True,
    sio_timeout: int = 5
)

Bases: Detector

This detector communicates with a Zauberzeug detector node via SocketIO.

It automatically connects and reconnects, submits and receives detections and sends images that should be uploaded to the Zauberzeug Learning Loop.

Note: Images must be smaller than MAX_IMAGE_SIZE bytes (default: 64 MB).

soft_reload async

soft_reload() -> None

Trigger a soft reload of the detector.

Raises:

Type               Description
DetectorException  if the communication fails.

DetectorSimulation

DetectorSimulation(
    camera_provider: CalibratableCameraProvider,
    *,
    noise: float = 1.0,
    detection_delay: float = 0.4,
    name: str | None = None
)

Bases: Detector

This detector simulates object detection.

It requires a camera provider in order to check visibility using the cameras' calibrations. Individual camera IDs can be added to a set of blocked_cameras to simulate occlusions during pytests. A list of simulated_objects can be filled to define what can be detected. An optional noise parameter controls the spatial accuracy in pixels. An optional detection_delay parameter simulates the time it takes to process an image.

ImageRecorder

ImageRecorder(
    camera_provider: CameraProvider, data_dir: Path
)

This module saves images from a CameraProvider to disk.

Parameters:

Name             Type            Description                                                          Default
camera_provider  CameraProvider  the camera provider to record images from                            required
data_dir         Path            the directory to save images to. Images are saved in subdirectories  required

set_recording

set_recording(value: bool)

Enable or disable recording images from the camera provider.

MultiCameraProvider

MultiCameraProvider(*camera_providers: CameraProvider)

Bases: CameraProvider

A multi-camera provider combines multiple camera providers into one.

This is useful if another module requires a single camera provider but the robot has multiple camera sources like USB and WiFi cameras.

ReplayCameraProvider

ReplayCameraProvider(
    replay_folder: Path, replay_interval: float = 0.01
)

Bases: CameraProvider[ReplayCamera]

This module collects and simulates cameras by looping over images in a drive.

NOTE: The first image of each camera will be emitted at any time before the timestamp of the second image. Therefore, setting the time to 0.0 will cause all cameras to emit their first image.
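
One way to read this note: for a given replay time, each camera shows its latest image at or before that time, and falls back to its first image if the time lies before all of its timestamps. The helper below is a hypothetical sketch of that selection rule, not the provider's actual code.

```python
from bisect import bisect_right


def image_index_at(timestamps: list, time: float) -> int:
    """Hypothetical helper: index of the image a camera would show at `time`.

    `timestamps` must be sorted in ascending order and non-empty.
    """
    # Latest image at or before `time`; clamp to 0 so the first image is
    # used for any time before the recording starts.
    return max(0, bisect_right(timestamps, time) - 1)


timestamps = [100.0, 100.5, 101.0]
print(image_index_at(timestamps, 0.0))    # 0
print(image_index_at(timestamps, 100.7))  # 1
```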

Parameters:

Name             Type   Description                                                            Default
replay_folder    Path   path to the root replay folder containing the camera folders           required
replay_interval  float  the interval at which the provider checks for new images (in seconds)  0.01

create_ui

create_ui(
    *,
    skip_time: float = 2.0,
    time_label_format: str = "%d.%m.%Y %H:%M:%S"
) -> None

Create a simple UI for controlling the replay.

Parameters:

Name               Type   Description                                                                   Default
skip_time          float  the amount of time to skip when pressing the skip buttons (in seconds)        2.0
time_label_format  str    the format to be used for the current time label (used by datetime.strftime)  '%d.%m.%Y %H:%M:%S'
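
Since time_label_format is passed to datetime.strftime, its effect can be previewed with the standard library alone. The date below is an arbitrary example value.

```python
from datetime import datetime

moment = datetime(2024, 3, 5, 14, 7, 9)  # arbitrary example timestamp

# The documented default format: day.month.year hours:minutes:seconds.
label = moment.strftime('%d.%m.%Y %H:%M:%S')
print(label)  # 05.03.2024 14:07:09

# A shorter alternative showing only the time of day.
short_label = moment.strftime('%H:%M')
print(short_label)  # 14:07
```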

jump_to

jump_to(percent: float) -> None

Jump to a specific point in the replay.

Parameters:

Name     Type   Description                                                                 Default
percent  float  a value between 0.0 and 100.0, where 0.0 is the start and 100.0 is the end  required
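
The percent scale can be pictured as a linear mapping onto the recorded time span. The helper below is a hypothetical sketch that assumes such a linear mapping between the first and last timestamp; the provider may compute the target position differently.

```python
def percent_to_time(percent: float, start: float, end: float) -> float:
    """Hypothetical helper: map 0..100 percent linearly onto [start, end]."""
    if not 0.0 <= percent <= 100.0:
        raise ValueError('percent must be between 0.0 and 100.0')
    return start + (end - start) * percent / 100.0


print(percent_to_time(50.0, 100.0, 200.0))  # 150.0
```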

jump_to_time

jump_to_time(time: float) -> None

Jump to a specific time in the replay.

Parameters:

Name  Type   Description      Default
time  float  time in seconds  required

set_speed

set_speed(speed: float) -> None

Set the playback speed.

1.0 is real-time, 2.0 is double speed, 0.5 is half speed. You may use negative values to play backwards.
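
The meaning of the speed factor can be sketched as scaling the elapsed wall-clock time before adding it to the replay clock. `advance` is a hypothetical helper for illustration and makes no claim about how the provider keeps time internally.

```python
def advance(replay_time: float, wall_dt: float, speed: float) -> float:
    """Hypothetical helper: move the replay clock by wall time scaled by speed."""
    return replay_time + wall_dt * speed


print(advance(10.0, 1.0, 2.0))   # 12.0 (double speed)
print(advance(10.0, 1.0, -1.0))  # 9.0 (playing backwards)
```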

RtspCameraProvider

RtspCameraProvider(
    *,
    frame_rate: int = 6,
    substream: int = 0,
    avdec: Literal["h264", "h265"] = "h264",
    network_interface: str | None = None,
    auto_scan: bool = True
)

Bases: CameraProvider[RtspCamera]

This module collects and provides real RTSP streaming cameras.

SimulatedCameraProvider

SimulatedCameraProvider(
    *,
    simulate_failing: bool = False,
    auto_scan: bool = True
)

Bases: CameraProvider[SimulatedCamera]

This module collects and simulates cameras and generates synthetic images.

In the current implementation the images only contain the camera ID and the current time.

scan_for_cameras async

scan_for_cameras() -> AsyncGenerator[str, Any]

Simulates device discovery by returning the IDs of all cameras.

If simulate_device_failure is set, disconnected cameras are returned with a fixed probability.

UsbCameraProvider

UsbCameraProvider(*, auto_scan: bool = True)

Bases: CameraProvider[UsbCamera]

This module collects and provides real USB cameras.

Camera devices are discovered through video4linux (v4l) and accessed with OpenCV. Therefore the program v4l2-ctl and OpenCV (including Python bindings) must be available.

camera_objects

camera_objects(
    camera_provider: CalibratableCameraProvider,
    camera_projector: CameraProjector,
    *,
    px_per_m: float = 10000,
    debug: bool = False,
    interval: float = 1.0
)

Bases: Group

This module provides a UI element for displaying cameras in a 3D scene.

It requires a camera provider as a source of cameras as well as a camera projector to show the current images projected on the ground plane. The px_per_m argument can be used to scale the camera frustums. With debug=True camera IDs are shown (default: False).