Skip to content
Open
296 changes: 202 additions & 94 deletions examples/rtspDeviceShifu/camera.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
import cv2, logging, os, requests, time
from flask import Flask, Response
import cv2, logging, os, requests, time, threading, queue, gc
from flask import Flask, Response, jsonify
from requests.auth import HTTPDigestAuth, HTTPBasicAuth
from threading import Thread

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Camera settings
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 360
CAMERA_FPS = 15
JPEG_QUALITY = 80
BUFFER_SIZE = 1
MAX_CLIENTS = 5

# Environment variables
ip = os.environ.get("IP_CAMERA_ADDRESS")
http_port = os.environ.get("IP_CAMERA_HTTP_PORT")
rtsp_port = os.environ.get("IP_CAMERA_RTSP_PORT")
CAMERA_USERNAME = os.environ.get("IP_CAMERA_USERNAME")
CAMERA_PASSWORD = os.environ.get("IP_CAMERA_PASSWORD")
port = os.environ.get("IP_CAMERA_CONTAINER_PORT")

# Camera control constants
CAMERA_CTRL_MOVE_UP = '<?xml version="1.0" encoding="UTF-8"?><PTZData><pan>0</pan><tilt>60</tilt></PTZData>'
CAMERA_CTRL_MOVE_DOWN = '<?xml version="1.0" encoding="UTF-8"?><PTZData><pan>0</pan><tilt>-60</tilt></PTZData>'
CAMERA_CTRL_MOVE_LEFT = '<?xml version="1.0" encoding="UTF-8"?><PTZData><pan>-60</pan><tilt>0</tilt></PTZData>'
Expand All @@ -25,75 +39,149 @@
"right": CAMERA_CTRL_MOVE_RIGHT,
}


class VideoGet:
"""
Class that continuously gets frames from a VideoCapture object
with a dedicated thread.
"""

def __init__(self, ip, rtsp_port, username, password):
self.stream = cv2.VideoCapture("rtsp://{}:{}@{}{}".format(CAMERA_USERNAME, CAMERA_PASSWORD, ip, rtsp_port))
(self.grabbed, self.frame) = self.stream.read()
self.stopped = False
self.ip = ip
self.rtsp_port = rtsp_port
self.username = username
self.password = password
self.stream = None
self.frame_queue = queue.Queue(maxsize=BUFFER_SIZE)
self._should_stop = False
self._capture_thread = None
self._last_frame = None
self._client_count = 0
self._last_gc_time = time.time()
self._reconnect_attempts = 0
self._max_reconnect_attempts = 5
self._reconnect_delay = 1 # seconds
self.start()

def _reconnect(self):
if self.stream is not None:
self.stream.release()

rtsp_url = f"rtsp://{self.username}:{self.password}@{self.ip}{self.rtsp_port}"
logger.info(f"Attempting to reconnect to RTSP stream: {rtsp_url}")

self.stream = cv2.VideoCapture(rtsp_url)
if not self.stream.isOpened():
logger.error("Failed to reconnect to RTSP stream")
return False
logger.info("Successfully reconnected to RTSP stream")
return True

def _capture_frames(self):
frame_count = 0
last_fps_time = time.time()

while not self._should_stop:
try:
if self._client_count == 0:
time.sleep(0.1)
continue

if self.stream is None or not self.stream.isOpened():
if self._reconnect_attempts < self._max_reconnect_attempts:
if self._reconnect():
self._reconnect_attempts = 0
else:
self._reconnect_attempts += 1
time.sleep(self._reconnect_delay)
continue
else:
logger.error("Max reconnection attempts reached")
time.sleep(1)
continue

success, frame = self.stream.read()
if not success:
logger.warning("Failed to read frame, attempting reconnection")
if not self._reconnect():
time.sleep(self._reconnect_delay)
continue
continue

if frame.shape[1] != CAMERA_WIDTH or frame.shape[0] != CAMERA_HEIGHT:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using a resize ratio for reducing the frame size instead of fixed width and height

frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT),
interpolation=cv2.INTER_LINEAR)

self._last_frame = frame.copy()

try:
if self.frame_queue.full():
self.frame_queue.get_nowait()
except queue.Empty:
pass

Comment on lines +111 to +116
Copy link

Copilot AI Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try/except block used to catch queue.Empty is misaligned. Wrap the call to self.frame_queue.get_nowait() in its own try block so that the exception is caught correctly.

Copilot uses AI. Check for mistakes.
self.frame_queue.put(frame, timeout=0.1)

frame_count += 1
if frame_count >= 30:
current_time = time.time()
fps = frame_count / (current_time - last_fps_time)
logger.info(f"Capture FPS: {fps:.2f}")
frame_count = 0
last_fps_time = current_time

current_time = time.time()
if current_time - self._last_gc_time > 60:
gc.collect()
self._last_gc_time = current_time

except Exception as e:
logger.error(f"Error in capture thread: {str(e)}")
time.sleep(0.1)

def start(self):
self.thread = Thread(target=self.get, args=())
self.thread.start()
self._should_stop = False
if self.stream is None:
self._reconnect()
self._capture_thread = Thread(target=self._capture_frames)
self._capture_thread.daemon = True
self._capture_thread.start()
return self

def get(self):
while not self.stopped:
if not self.grabbed:
print("capture failed, please refresh the page!")
break
else:
(self.grabbed, self.frame) = self.stream.read()
def get_frame(self):
try:
frame = self.frame_queue.get(timeout=0.1)
if frame is None and self._last_frame is not None:
frame = self._last_frame.copy()

if frame is not None:
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), JPEG_QUALITY,
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1,
]
ret, buffer = cv2.imencode(".jpg", frame, encode_param)
return buffer.tobytes()
return None
except queue.Empty:
if self._last_frame is not None:
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), JPEG_QUALITY,
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1,
]
ret, buffer = cv2.imencode(".jpg", self._last_frame, encode_param)
return buffer.tobytes()
return None
except Exception as e:
logger.error(f"Error getting frame: {str(e)}")
return None

def stop(self):
self.stopped = True
self.thread.join()
self._should_stop = True
if self._capture_thread:
self._capture_thread.join(timeout=1.0)
self._capture_thread = None
self.stream.release()
gc.collect()

@app.route('/capture')
def capture():
try:
global video_getter
if not video_getter.grabbed:
video_getter.stop()
video_getter = VideoGet(ip, rtsp_port, CAMERA_USERNAME, CAMERA_PASSWORD).start()
if not video_getter.stopped:
ret, frame = video_getter.grabbed, video_getter.frame
if ret:
retval, buffer = cv2.imencode('.jpg', frame)
byte_frame = buffer.tobytes()
print("Image captured!")
return Response(byte_frame, mimetype='image/jpeg')
else:
print("cannot capture frame from cv2")
return "cannot capture frame from cv2\n", 400
except Exception as e:
print("error capture picture, error: {}".format(e))
# return False
return "cannot capture frame from cv2\n", 400


def stream(ip, username, password):
try:
print("start streaming!")
while True:
success, frame = video_getter.grabbed, video_getter.frame
if not success:
break
else:
reducedframe = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
ret, buffer = cv2.imencode('.jpeg', reducedframe)
framedata = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + framedata + b'\r\n') # concat frame one by one and show result
except Exception as e:
print("error capture picture, error: {}".format(e))
return False
def add_client(self):
self._client_count += 1

def remove_client(self):
self._client_count -= 1

def getCameraInfoWithAuth(s, ip, http_port, auth):
result = None
Expand All @@ -107,102 +195,122 @@ def getCameraInfoWithAuth(s, ip, http_port, auth):
if r.ok:
result = r.content
else:
print("{} failed".format(type(auth)))
logger.warning(f"{type(auth)} failed")
except Exception as e:
result = None
print("error trying {}, {}".format(type(auth), e))
logger.error(f"Error trying {type(auth)}, {e}")

return result


def moveCameraWithAuth(s, ip, http_port, auth, direction):
result = None
s.auth = auth
try:
# send request once to avoid send put request error
getCameraInfoWithAuth(s,ip,http_port,auth)
getCameraInfoWithAuth(s, ip, http_port, auth)
headers = {'Content-Type': 'application/xml'}
r = s.put('http://' + ip + http_port + '/ISAPI/PTZCtrl/channels/1/continuous', data=CAMERA_CTRL_MOVE_DICT[direction], headers=headers)
r = s.put('http://' + ip + http_port + '/ISAPI/PTZCtrl/channels/1/continuous',
data=CAMERA_CTRL_MOVE_DICT[direction], headers=headers)
if r.ok:
time.sleep(0.2)
r = s.put('http://' + ip + http_port + '/ISAPI/PTZCtrl/channels/1/continuous', data=CAMERA_CTRL_MOVE_STOP, headers=headers)
r = s.put('http://' + ip + http_port + '/ISAPI/PTZCtrl/channels/1/continuous',
data=CAMERA_CTRL_MOVE_STOP, headers=headers)
result = r.content
else:
print("{} failed, message: {}".format(type(auth), r.content))
logger.warning(f"{type(auth)} failed, message: {r.content}")
except Exception as e:
result = None
print("error trying {}, {}".format(type(auth), e))
logger.error(f"Error trying {type(auth)}, {e}")

return result


def moveCamera(direction):
with requests.Session() as s:
result = None
print("try HTTPDigestAuth")
logger.info("Trying HTTPDigestAuth")
auth = HTTPDigestAuth(CAMERA_USERNAME, CAMERA_PASSWORD)
result = moveCameraWithAuth(s, ip, http_port, auth, direction)

if result is None:
print("try HTTPBasicAuth")
logger.info("Trying HTTPBasicAuth")
auth = HTTPBasicAuth(CAMERA_USERNAME, CAMERA_PASSWORD)
result = moveCameraWithAuth(s, ip, http_port, auth, direction)
if result is None:
print("all authentication failed for device")
logger.error("All authentication failed for device")
return False

return True


@app.route('/info')
def getCameraInfo():
with requests.Session() as s:
result = None
print("try HTTPDigestAuth")
logger.info("Trying HTTPDigestAuth")
auth = HTTPDigestAuth(CAMERA_USERNAME, CAMERA_PASSWORD)
result = getCameraInfoWithAuth(s, ip, http_port, auth)

if result is None:
print("try HTTPBasicAuth")
logger.info("Trying HTTPBasicAuth")
auth = HTTPBasicAuth(CAMERA_USERNAME, CAMERA_PASSWORD)
result = getCameraInfoWithAuth(s, ip, http_port, auth)

if result is None:
print("all authentication failed for device")
return "all authentication failed for device\n", 400
logger.error("All authentication failed for device")
return jsonify({"error": "All authentication failed for device"}), 400

return Response(result, mimetype='text/xml')

@app.route('/capture')
def capture():
try:
if not video_getter._capture_thread:
video_getter.start()

frame_data = video_getter.get_frame()
if frame_data is not None:
return Response(frame_data, mimetype='image/jpeg')
else:
logger.error("Unable to capture frame")
return jsonify({"error": "Unable to capture frame"}), 500
except Exception as e:
logger.error(f"Capture error: {str(e)}")
return jsonify({"error": str(e)}), 500

@app.route('/stream')
def video_feed():
#Video streaming route. Put this in the src attribute of an img tag
global video_getter
if not video_getter.grabbed:
video_getter.stop()
video_getter = VideoGet(ip, rtsp_port, CAMERA_USERNAME, CAMERA_PASSWORD).start()
return Response(stream(ip, CAMERA_USERNAME, CAMERA_PASSWORD), mimetype='multipart/x-mixed-replace; boundary=frame')

def generate():
try:
video_getter.add_client()
while True:
frame_data = video_getter.get_frame()
if frame_data is None:
time.sleep(0.01)
continue

yield (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + frame_data + b"\r\n"
)
finally:
video_getter.remove_client()

return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/move/<direction>')
def move_camera(direction=None):
print(CAMERA_CTRL_MOVE_DICT.keys())
print("direction is {}".format(direction))
if direction is None or direction not in CAMERA_CTRL_MOVE_DICT.keys():
return 'Please specify move direction, /move/(up/down/left/right)\n', 400
return jsonify({"error": "Please specify move direction (up/down/left/right)"}), 400

if moveCamera(direction):
return 'Success', 200
return jsonify({"message": "Success"}), 200
else:
return 'cannot move camera',400

return jsonify({"error": "Cannot move camera"}), 400

if __name__ == "__main__":
# Customize port
if http_port:
http_port = ':' + http_port

if rtsp_port:
rtsp_port = ':' + rtsp_port

video_getter = VideoGet(ip, rtsp_port, CAMERA_USERNAME, CAMERA_PASSWORD).start()
video_getter = VideoGet(ip, rtsp_port, CAMERA_USERNAME, CAMERA_PASSWORD)
app.run(host="0.0.0.0", port=port)
Loading
Loading