Cameras¶
RoSys provides instant camera access for object detection, remote operation and similar use cases.
Setup¶
USB camera devices are discovered through video4linux (v4l) and accessed with openCV.
Therefore the program v4l2ctl
and openCV (including python bindings) must be available.
We recommend to use the RoSys Docker image which provides the full required software stack.
Make sure the container can access the USB devices by starting it with --privileged
or explicitly passing the specific --device
s.
Show Captured Images¶
Using rosys.ui
you can show the latest captured images from each camera:
#!/usr/bin/env python3
from nicegui import ui
from rosys.vision import SimulatedCamera
camera = SimulatedCamera(id='test_cam', width=800, height=600)
image = ui.interactive_image()
ui.timer(0.3, lambda: image.set_source(camera.get_latest_image_url()))
ui.run(title='RoSys')
The ui.timer
regularly updates the source property of the ui.image
.
The cameras latest_image_uri
property provides the URI to the latest captured image.
This example uses a SimulatedCamera
for demonstration.
You can directly replace the camera with a UsbCamera
or RtspCamera
if you know their ids
or use their respective providers to discover them automatically.
Remote Operation¶
A fairly often required use case on real mobile robots is the remote operation.
In a simple use case you may only need to visualize one camera and have some steering controls.
Here we use the NEW_CAMERA
event to display the first camera to control real Hardware:
#!/usr/bin/env python3
from nicegui import ui
import rosys
if rosys.hardware.SerialCommunication.is_possible():
communication = rosys.hardware.SerialCommunication()
robot_brain = rosys.hardware.RobotBrain(communication)
can = rosys.hardware.CanHardware(robot_brain)
wheels = rosys.hardware.WheelsHardware(robot_brain, can=can)
robot = rosys.hardware.RobotHardware([can, wheels], robot_brain)
camera_provider = rosys.vision.UsbCameraProvider()
else:
wheels = rosys.hardware.WheelsSimulation()
robot = rosys.hardware.RobotSimulation([wheels])
rosys.vision.SimulatedCameraProvider.USE_PERSISTENCE = False
camera_provider = rosys.vision.SimulatedCameraProvider()
camera = rosys.vision.SimulatedCamera(id='test_cam', width=800, height=600)
rosys.on_startup(lambda: camera_provider.add_camera(camera))
steerer = rosys.driving.Steerer(wheels)
odometer = rosys.driving.Odometer(wheels)
async def add_main_camera(camera: rosys.vision.Camera) -> None:
camera_card.clear() # remove "seeking camera" label
with camera_card:
main_cam = ui.interactive_image()
ui.timer(0.1, lambda: main_cam.set_source(camera.get_latest_image_url()))
camera_provider.CAMERA_ADDED.register_ui(add_main_camera)
with ui.card().tight().style('width:30em') as camera_card:
ui.label('seeking main camera').classes('m-8 text-center')
with ui.card().tight().style('width:30em'):
with ui.row():
with ui.card().tight():
rosys.driving.joystick(steerer)
rosys.driving.keyboard_control(steerer)
ui.markdown('steer with joystick on the left or<br />SHIFT + arrow keys') \
.classes('m-8 text-center')
ui.run(title='RoSys')
By adding a Joystick
and KeyboardControl
the robot is ready to go for remote operation.
Controlling the Camera¶
The following example creates a web interface for controlling multiple camera types. It displays cameras in a grid, showing their live feeds along with controls to connect/disconnect and adjust settings like FPS, quality, exposure, and color. The demo supports RTSP, MJPEG, USB, and simulated cameras. It automatically updates every 0.1 seconds to detect and display new cameras, and initializes with one simulated camera.
#!/usr/bin/env python3
import logging
from nicegui import ui
import rosys.vision
def add_card(camera: rosys.vision.Camera, container: ui.element) -> None:
uid = camera.id
if uid not in streams:
with container:
camera_card = ui.card().tight()
camera_cards[uid] = camera_card
print(f'adding card for {uid}')
with camera_grid:
with camera_card:
streams[uid] = ui.interactive_image()
ui.label(uid).classes('m-2')
with ui.row():
ui.button('disconnect', on_click=camera.disconnect) \
.bind_enabled_from(camera, 'is_connected')
if isinstance(camera, rosys.vision.ConfigurableCamera):
create_camera_settings_panel(camera)
streams[uid].set_source(camera.get_latest_image_url())
def create_camera_settings_panel(camera: rosys.vision.ConfigurableCamera) -> None:
camera_parameters = camera.get_capabilities()
parameter_names = [parameter.name for parameter in camera_parameters]
with ui.expansion('Settings').classes('w-full') \
.bind_enabled_from(camera, 'is_connected'):
if isinstance(camera, rosys.vision.RtspCamera):
ui.label('URL') \
.bind_text_from(camera, 'url',
backward=lambda x: x or 'URL not available')
if 'fps' in parameter_names:
with ui.card(), ui.row():
ui.label('FPS:')
ui.select(options=list(range(1, 31)),
on_change=lambda e: camera.set_parameters({'fps': e.value})) \
.bind_value_from(camera, 'parameters',
backward=lambda params: params['fps'])
if 'substream' in parameter_names:
with ui.card():
ui.switch('High Quality',
on_change=lambda e: camera.set_parameters({'substream': 0 if e.value else 1}))
if 'exposure' in parameter_names:
with ui.card(), ui.row():
ui.label('Exposure:')
ui.select(options=list(range(0, 255)),
on_change=lambda e: camera.set_parameters({'exposure': e.value})) \
.bind_value_from(camera, 'parameters',
backward=lambda params: params['exposure'])
if 'auto_exposure' in parameter_names:
with ui.card():
ui.switch('Auto Exposure',
on_change=lambda e: camera.set_parameters({'auto_exposure': e.value})) \
.bind_value_from(camera, 'parameters',
backward=lambda params: params['auto_exposure'])
if 'color' in parameter_names:
with ui.card(), ui.row():
ui.label('Color:')
with ui.button(icon='colorize'):
ui.color_picker(on_pick=lambda e: camera.set_parameters({'color': e.color}))
def update_camera_cards() -> None:
providers: list[rosys.vision.CameraProvider] = [
rtsp_camera_provider,
mjpeg_camera_provider,
usb_camera_provider,
simulated_camera_provider,
]
for provider in providers:
for camera in provider.cameras.values():
add_card(camera, camera_grid)
logging.basicConfig(level=logging.INFO)
streams: dict[str, ui.interactive_image] = {}
camera_cards: dict[str, ui.card] = {}
camera_grid = ui.row()
rtsp_camera_provider = rosys.vision.RtspCameraProvider()
mjpeg_camera_provider = rosys.vision.MjpegCameraProvider()
usb_camera_provider = rosys.vision.UsbCameraProvider()
simulated_camera_provider = rosys.vision.SimulatedCameraProvider()
ui.timer(0.1, update_camera_cards)
simulated_camera_provider.add_cameras(1)
ui.run(title='RoSys', port=8080)
Streaming RTSP Cameras¶
The following example shows how to stream images from an RTSP camera.
#!/usr/bin/env python3
from nicegui import ui
import rosys
camera_provider = rosys.vision.RtspCameraProvider()
def refresh() -> None:
for uid, camera in camera_provider.cameras.items():
if uid not in streams:
with camera_grid:
with ui.card().tight():
streams[uid] = ui.interactive_image()
ui.label(uid).classes('m-2')
streams[uid].set_source(camera.get_latest_image_url())
streams: dict[str, ui.interactive_image] = {}
camera_grid = ui.row()
ui.timer(0.01, refresh)
ui.run(title='RoSys')