Skip to content

Cameras

RoSys provides instant camera access for object detection, remote operation and similar use cases.

Setup

USB camera devices are discovered through video4linux (v4l) and accessed with openCV. Therefore the program v4l2ctl and openCV (including python bindings) must be available. We recommend to use the RoSys Docker image which provides the full required software stack. Make sure the container can access the USB devices by starting it with --privileged or explicitly passing the specific --devices.

Show Captured Images

Using rosys.ui you can show the latest captured images from each camera:

#!/usr/bin/env python3
from nicegui import ui

from rosys.vision import SimulatedCamera

camera = SimulatedCamera(id='test_cam', width=800, height=600)

image = ui.interactive_image()
ui.timer(0.3, lambda: image.set_source(camera.get_latest_image_url()))

ui.run(title='RoSys')

The ui.timer regularly updates the source property of the ui.image. The cameras latest_image_uri property provides the URI to the latest captured image.

This example uses a SimulatedCamera for demonstration. You can directly replace the camera with a UsbCamera or RtspCamera if you know their ids or use their respective providers to discover them automatically.

Remote Operation

A fairly often required use case on real mobile robots is the remote operation. In a simple use case you may only need to visualize one camera and have some steering controls. Here we use the NEW_CAMERA event to display the first camera to control real Hardware:

#!/usr/bin/env python3
from nicegui import ui

import rosys

if rosys.hardware.SerialCommunication.is_possible():
    communication = rosys.hardware.SerialCommunication()
    robot_brain = rosys.hardware.RobotBrain(communication)
    can = rosys.hardware.CanHardware(robot_brain)
    wheels = rosys.hardware.WheelsHardware(robot_brain, can=can)
    robot = rosys.hardware.RobotHardware([can, wheels], robot_brain)
    camera_provider = rosys.vision.UsbCameraProvider()
else:
    wheels = rosys.hardware.WheelsSimulation()
    robot = rosys.hardware.RobotSimulation([wheels])
    rosys.vision.SimulatedCameraProvider.USE_PERSISTENCE = False
    camera_provider = rosys.vision.SimulatedCameraProvider()
    camera = rosys.vision.SimulatedCamera(id='test_cam', width=800, height=600)
    rosys.on_startup(lambda: camera_provider.add_camera(camera))
steerer = rosys.driving.Steerer(wheels)
odometer = rosys.driving.Odometer(wheels)


async def add_main_camera(camera: rosys.vision.Camera) -> None:
    camera_card.clear()  # remove "seeking camera" label
    with camera_card:
        main_cam = ui.interactive_image()
        ui.timer(0.1, lambda: main_cam.set_source(camera.get_latest_image_url()))

camera_provider.CAMERA_ADDED.register_ui(add_main_camera)

with ui.card().tight().style('width:30em') as camera_card:
    ui.label('seeking main camera').classes('m-8 text-center')

with ui.card().tight().style('width:30em'):
    with ui.row():
        with ui.card().tight():
            rosys.driving.joystick(steerer)
            rosys.driving.keyboard_control(steerer)
        ui.markdown('steer with joystick on the left or<br />SHIFT + arrow keys') \
            .classes('m-8 text-center')

ui.run(title='RoSys')

By adding a Joystick and KeyboardControl the robot is ready to go for remote operation.

Controlling the Camera

The following example creates a web interface for controlling multiple camera types. It displays cameras in a grid, showing their live feeds along with controls to connect/disconnect and adjust settings like FPS, quality, exposure, and color. The demo supports RTSP, MJPEG, USB, and simulated cameras. It automatically updates every 0.1 seconds to detect and display new cameras, and initializes with one simulated camera.

#!/usr/bin/env python3
import logging

from nicegui import ui

import rosys.vision


def add_card(camera: rosys.vision.Camera, container: ui.element) -> None:
    uid = camera.id
    if uid not in streams:
        with container:
            camera_card = ui.card().tight()
            camera_cards[uid] = camera_card
            print(f'adding card for {uid}')
            with camera_grid:
                with camera_card:
                    streams[uid] = ui.interactive_image()
                    ui.label(uid).classes('m-2')
                    with ui.row():
                        ui.button('disconnect', on_click=camera.disconnect) \
                            .bind_enabled_from(camera, 'is_connected')
                    if isinstance(camera, rosys.vision.ConfigurableCamera):
                        create_camera_settings_panel(camera)

    streams[uid].set_source(camera.get_latest_image_url())


def create_camera_settings_panel(camera: rosys.vision.ConfigurableCamera) -> None:
    camera_parameters = camera.get_capabilities()
    parameter_names = [parameter.name for parameter in camera_parameters]
    with ui.expansion('Settings').classes('w-full') \
            .bind_enabled_from(camera, 'is_connected'):
        if isinstance(camera, rosys.vision.RtspCamera):
            ui.label('URL') \
                .bind_text_from(camera, 'url',
                                backward=lambda x: x or 'URL not available')
        if 'fps' in parameter_names:
            with ui.card(), ui.row():
                ui.label('FPS:')
                ui.select(options=list(range(1, 31)),
                          on_change=lambda e: camera.set_parameters({'fps': e.value})) \
                    .bind_value_from(camera, 'parameters',
                                     backward=lambda params: params['fps'])
        if 'substream' in parameter_names:
            with ui.card():
                ui.switch('High Quality',
                          on_change=lambda e: camera.set_parameters({'substream': 0 if e.value else 1}))
        if 'exposure' in parameter_names:
            with ui.card(), ui.row():
                ui.label('Exposure:')
                ui.select(options=list(range(0, 255)),
                          on_change=lambda e: camera.set_parameters({'exposure': e.value})) \
                    .bind_value_from(camera, 'parameters',
                                     backward=lambda params: params['exposure'])
        if 'auto_exposure' in parameter_names:
            with ui.card():
                ui.switch('Auto Exposure',
                          on_change=lambda e: camera.set_parameters({'auto_exposure': e.value})) \
                    .bind_value_from(camera, 'parameters',
                                     backward=lambda params: params['auto_exposure'])
        if 'color' in parameter_names:
            with ui.card(), ui.row():
                ui.label('Color:')
                with ui.button(icon='colorize'):
                    ui.color_picker(on_pick=lambda e: camera.set_parameters({'color': e.color}))


def update_camera_cards() -> None:
    providers: list[rosys.vision.CameraProvider] = [
        rtsp_camera_provider,
        mjpeg_camera_provider,
        usb_camera_provider,
        simulated_camera_provider,
    ]
    for provider in providers:
        for camera in provider.cameras.values():
            add_card(camera, camera_grid)


logging.basicConfig(level=logging.INFO)
streams: dict[str, ui.interactive_image] = {}
camera_cards: dict[str, ui.card] = {}
camera_grid = ui.row()

rtsp_camera_provider = rosys.vision.RtspCameraProvider()
mjpeg_camera_provider = rosys.vision.MjpegCameraProvider()
usb_camera_provider = rosys.vision.UsbCameraProvider()
simulated_camera_provider = rosys.vision.SimulatedCameraProvider()

ui.timer(0.1, update_camera_cards)

simulated_camera_provider.add_cameras(1)

ui.run(title='RoSys', port=8080)

Streaming RTSP Cameras

The following example shows how to stream images from an RTSP camera.

#!/usr/bin/env python3
from nicegui import ui

import rosys

camera_provider = rosys.vision.RtspCameraProvider()


def refresh() -> None:
    for uid, camera in camera_provider.cameras.items():
        if uid not in streams:
            with camera_grid:
                with ui.card().tight():
                    streams[uid] = ui.interactive_image()
                    ui.label(uid).classes('m-2')
        streams[uid].set_source(camera.get_latest_image_url())


streams: dict[str, ui.interactive_image] = {}
camera_grid = ui.row()
ui.timer(0.01, refresh)

ui.run(title='RoSys')