Real-Time Vision for Mobile Robotics: Low-Latency H.264 Streaming
RAiV was designed to be developer friendly, and H.264 streaming is probably its easiest feature to use. RAiV includes a hardware H.264 encoder and transmits the encoded stream over the WebSocket protocol for low latency and flexibility. Here, we will show how you can control the H.264 stream.
H.264 Streaming in 22 Lines
There is only one function to call, setEncoderStatus(), to control RAiV's H.264 stream. The rest of the process is handled under the hood.
You can find this example, with all the necessary modules, in our GitHub repository. Please download the example code and upload it to RAiV via the web interface.
# For command listener
import qCU_Net
# For H.264 streamer
import qCU_Stream

def main():
    # Start TCP server for listening on streaming commands
    qCU_Net.start_tcp_server(host='192.168.10.55', port=12345, json_handler=enc_json_handler)

# Incoming message payload parser
def enc_json_handler(client_socket, json_payload, client_address):
    # Check live streaming request
    if isinstance(json_payload, dict):
        if 'live_isStart' in json_payload and 'live_isStereo' in json_payload:
            isStart = json_payload['live_isStart']
            isStereo = json_payload['live_isStereo']
            print(f"Live setting start: {isStart} stereo: {isStereo}")
            qCU_Stream.setEncoderStatus(isStart, isStereo)

if __name__ == "__main__":
    main()
Live Action
Now, on the PC side, start the code from our GitHub repository to control the H.264 stream remotely.
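At its core, the PC-side controller is just a TCP client that sends a small JSON payload with the keys enc_json_handler checks. Below is a minimal sketch; the newline-delimited JSON framing and the integer encoding of the flags are assumptions, so check the repository code for the exact wire format:

```python
import json
import socket

def build_live_command(is_start, is_stereo):
    """Build the payload that enc_json_handler expects on the RAiV side."""
    return {"live_isStart": int(is_start), "live_isStereo": int(is_stereo)}

def send_command(payload, host="192.168.10.55", port=12345):
    """Send one JSON command to RAiV (newline framing is an assumption)."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall((json.dumps(payload) + "\n").encode("utf-8"))

if __name__ == "__main__":
    # Start a stereo H.264 stream on the device
    send_command(build_live_command(is_start=True, is_stereo=True))
```

Sending `build_live_command(False, False)` the same way would stop the stream again.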
H.264 Streaming with Auto Exposure & Gain
Automatic exposure and gain adjustment is a little more complex than plain H.264 streaming. Because RAiV can be used in many different situations, we leave the exposure/gain adjustment algorithm to users, so they can select and/or implement the algorithm best suited to their use case.
You can find this example, with all the necessary modules, in our GitHub repository. Please download the example code and upload it to RAiV via the web interface.
# For command listener
import qCU_Net
# For H.264 streamer
import qCU_Stream
# For camera control
from qCU_CCtrl import qCU_CCtrl
# For accessing data pipeline
from qCU_Data import qCUData
# For calculating exposure and gain values
from autoExpoGain import AutoExposureGain
# For image colorspace and scale modification
import cv2

def get_latest_frame():
    if gQCUData is None:
        raise RuntimeError("Data interface was not initialized")
    # Get current frame
    frame = gQCUData.getDataFrame()
    if frame is None:
        return None
    # Convert right image from RGB to grayscale
    gray_frame = cv2.cvtColor(frame[1], cv2.COLOR_RGB2GRAY)
    # Halve the image height; note that shape is (height, width)
    # while cv2.resize expects (width, height)
    monoHeight, monoWidth = gray_frame.shape
    resized_gray_frame = cv2.resize(gray_frame, (monoWidth, monoHeight // 2))
    return resized_gray_frame

def enc_json_handler(client_socket, json_payload, client_address):
    if not isinstance(json_payload, dict):
        return
    # Check live streaming request
    if 'live_isStart' in json_payload and 'live_isStereo' in json_payload:
        isStart = json_payload['live_isStart']
        isStereo = json_payload['live_isStereo']
        print(f"Live setting start: {isStart} stereo: {isStereo}")
        qCU_Stream.setEncoderStatus(isStart, isStereo)
    # Check camera control parameter read request
    if 'get_camCtrl' in json_payload:
        # Initialize the camera control object
        cameraCtrl = qCU_CCtrl()
        # Get camera control parameters
        expo, gain, ret_code = cameraCtrl.get_expo_gain()
        print(f"{expo} {gain} {ret_code}")
        payload = {
            "getCCtrl_expo": expo,
            "getCCtrl_gain": gain
        }
        # Send camera control parameters to client
        qCU_Net.send_response_to_client(client_socket, payload)
        # Delete the camera control object
        del cameraCtrl
    # Check camera control parameter write request
    if 'set_camCtrl' in json_payload:
        curExpo = json_payload['expo']
        curGain = json_payload['gain']
        # Initialize the camera control object
        cameraCtrl = qCU_CCtrl()
        # Set camera control parameters
        ret_code = cameraCtrl.set_expo_gain(curExpo, curGain)
        print(f"{curExpo} {curGain} {ret_code}")
        payload = {
            "set_camCtrl_status": ret_code
        }
        # Send the result back to the client
        qCU_Net.send_response_to_client(client_socket, payload)
        # Delete the camera control object
        del cameraCtrl
    # Check automatic camera control request
    if 'auto_camCtrl' in json_payload:
        isAutoCamCtrl = json_payload['auto_camCtrl']
        autoPeriodMs = json_payload.get('auto_period_ms')
        if isAutoCamCtrl == 1:
            # Start the periodic auto exposure/gain loop
            gAutoEG.start(data_func=get_latest_frame, interval_ms=autoPeriodMs)
        else:
            # Stop the loop
            gAutoEG.stop()

def main():
    # Declare global variables
    global gQCUData
    global gAutoEG
    # Create the data interface
    gQCUData = qCUData()
    if not gQCUData.init():
        print("Failed to initialize shared memory")
        return
    # Query camera control ranges to initialize auto exposure/gain
    tmpCCtrl = qCU_CCtrl()
    camCtrlRanges = tmpCCtrl.get_cam_ctrl_ranges()
    print(f"Camera control ranges: {camCtrlRanges}")
    del tmpCCtrl
    gAutoEG = AutoExposureGain(cctrl_ranges=camCtrlRanges, target_brightness=128, tolerance=10, adjustment_factor=0.5)
    # Start TCP server for listening on streaming and camera control commands
    qCU_Net.start_tcp_server(host='192.168.10.55', port=12345, json_handler=enc_json_handler)

if __name__ == "__main__":
    main()
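Since the autoExpoGain module is left to the user, here is one possible shape for it: a minimal proportional brightness controller matching the constructor and start()/stop() calls used above. The layout of the cctrl_ranges dictionary, the apply_func hook, the timer-based loop, and all tuning values are assumptions for illustration only:

```python
import threading
import numpy as np

class AutoExposureGain:
    """Illustrative proportional brightness controller (not the official module)."""

    def __init__(self, cctrl_ranges, target_brightness=128, tolerance=10,
                 adjustment_factor=0.5, apply_func=None):
        # Assumed layout: cctrl_ranges = {'expo': (min, max), 'gain': (min, max)}
        self.expo_min, self.expo_max = cctrl_ranges['expo']
        self.gain_min, self.gain_max = cctrl_ranges['gain']
        self.target = target_brightness
        self.tolerance = tolerance
        self.factor = adjustment_factor
        # Hook for pushing new values to the camera, e.g. qCU_CCtrl().set_expo_gain
        self.apply_func = apply_func
        self.expo = (self.expo_min + self.expo_max) // 2
        self.gain = self.gain_min
        self._timer = None

    def step(self, frame):
        """One proportional update from a grayscale frame; returns (expo, gain)."""
        error = self.target - float(np.mean(frame))
        if abs(error) > self.tolerance:
            self.expo = int(np.clip(self.expo + self.factor * error,
                                    self.expo_min, self.expo_max))
            # Fall back to gain once exposure saturates at a limit
            if self.expo in (self.expo_min, self.expo_max):
                self.gain = int(np.clip(self.gain + self.factor * error / 10.0,
                                        self.gain_min, self.gain_max))
            if self.apply_func is not None:
                self.apply_func(self.expo, self.gain)
        return self.expo, self.gain

    def start(self, data_func, interval_ms=200):
        """Periodically pull a frame via data_func and adjust exposure/gain."""
        interval_ms = interval_ms or 200

        def tick():
            frame = data_func()
            if frame is not None:
                self.step(frame)
            self._timer = threading.Timer(interval_ms / 1000.0, tick)
            self._timer.daemon = True
            self._timer.start()

        tick()

    def stop(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
```

A plain proportional step is the simplest reasonable choice here; smoother convergence could be had with exponential smoothing or a full PID loop, which is exactly the kind of swap this design leaves open to the user.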
Live Action
Now, on the PC side, start the code from our GitHub repository to control the H.264 stream with auto exposure & gain.
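The camera control commands ride on the same JSON mechanism as the live stream commands. The payloads below mirror the keys that enc_json_handler checks; the field values are illustrative, and the actual units for expo and gain depend on the ranges reported by get_cam_ctrl_ranges():

```python
import json

# Read the current exposure/gain (key presence is what triggers the handler)
get_cmd = {"get_camCtrl": 1}
# Set exposure/gain manually (values here are placeholders)
set_cmd = {"set_camCtrl": 1, "expo": 5000, "gain": 8}
# Start the auto exposure/gain loop with a 200 ms update period
auto_on = {"auto_camCtrl": 1, "auto_period_ms": 200}
# Stop the loop
auto_off = {"auto_camCtrl": 0}

for cmd in (get_cmd, set_cmd, auto_on, auto_off):
    print(json.dumps(cmd))
```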
What is Next?
Check our Python SDK:
RAiV Python SDK
Check our GitHub repository for sample code:
Our GitHub Repository