Camera live streaming using MJPEG format
MJPEG streaming is a simple method to stream videos on the internet with a low latency. It sends JPEG images over the network and display that sequence of images on the user's webpage. However, it consumes a lot of bandwidth due to the size of every complete image. This post guides to implement a camera live stream using MJPEG format. A server can be easily made by PiCamera and Python HTTP.
Last update: 2022-06-29
Table of Content
Big Buck Bunny movie, © 2008, Blender Foundation
There are many methods to implement a streaming server using MJPEG (MJPG) format. The basic principle is to send a series of JPEG (JPG) image to the user’s webpage and display it in an image <img>
tag. An example is the mjpg-streamer.
This post shows a method to develop a streaming system, starting with a Python package named PiCamera and a simple Python HTTP server.
To setup picamera
package, please read more in the Setup Camera post.
PiCamera also has an example to stream MJPEG at Web streaming section.
The basic structure of this MJPEG streaming server is as below. PiCamera will capture JPEG images to a buffer that will be sent to user’s web browser via an endless multipart/x-mixed-replace
content when the webpage requests to show an image in a <img>
element.
Record video to a stream#
This is a basic step to write a video stream to a buffered memory. Python has the io
package which expects bytes-like objects and produces bytes
objects. No encoding, decoding, or newline translation is performed, because PiCamera requests to V4L2 driver to handle the encoding in hardware.
from io import BytesIO
from picamera import PiCamera
# create in-memory stream
stream = BytesIO()
# create camera object (instance)
camera = PiCamera()
# config camera
camera.resolution = (640, 480)
# start recording to stream
camera.start_recording(stream, format='mjpeg')
# wait
camera.wait_recording(15)
# stop recording
camera.stop_recording()
Frame buffer#
Next step is to create a custom output to used in PiCamera.start_recording()
method. Refer to Custom outputs.
A file-like object (as far as PiCamera is concerned) is simply an object with:
- A
write()
method which must accept a single parameter consisting of a byte-string, and which can optionally return the number of bytes written. - A
flush()
method with no parameters, which will be called at the end of output.
In write()
method, it can implement code that reacts to each and every frame. The write()
method is called frequently, so its implementation must be sufficiently rapid that it doesn’t stall the encoding flow.
Let’s write a class FrameBuffer()
which checks the JPEG Magic Number 0xFF
0xD8
at the beginning of an JPEG image:
import io
class FrameBuffer(object):
def __init__(self):
# store each frame
self.frame = None
# buffer to hold incoming frame
self.buffer = io.BytesIO()
def write(self, buf):
# if it's a JPEG image
if buf.startswith(b'\xff\xd8'):
# write to buffer
self.buffer.seek(0)
self.buffer.write(buf)
# extract frame
self.buffer.truncate()
self.frame = self.buffer.getvalue()
Note that FrameBuffer.frame
will be used to send the frame to user’s webpage.
Then, use the FrameBuffer
instead of the buffered memory:
# create buffer
frame_buffer = FrameBuffer()
# write to framebuffer
camera.start_recording(frame_buffer, format='mjpeg')
Streaming Web server#
Python has a built-in simple HTTP Server, which is ready to run by providing a server address and a request handler class.
from http.server import HTTPServer, BaseHTTPRequestHandler
def run(server_class=HTTPServer, handler_class=BaseHTTPRequestHandler):
server_address = ('', 8000)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()
Now, look at some pre-defined Request Handler classes:
class http.server.BaseHTTPRequestHandler
-
This class is used to handle the HTTP requests that arrive at the server. By itself, it cannot respond to any actual HTTP requests;
BaseHTTPRequestHandler
just provides a number of class and instance variables, and methods for use by subclasses. It must be sub-classed to handle each request method (e.g. GET or POST).The handler will parse the request and the headers, then call a method specific to the request type. The method name is constructed from the request. For example, for the request method SPAM, the
do_SPAM()
method will be called with no arguments. All the relevant information is stored in instance variables of the handler. Subclasses should not need to override or extend the__init__()
method. class http.server.SimpleHTTPRequestHandler
- This class serves files from the current directory and below, directly mapping the directory structure to HTTP requests. A lot of the work, such as parsing the request, is done by the base class
BaseHTTPRequestHandler
. This class implements thedo_GET()
anddo_HEAD()
functions. class http.server.CGIHTTPRequestHandler
-
This class is used to serve either files or output of CGI scripts from the current directory and below. Note that mapping HTTP hierarchic structure to local directory structure is exactly as in
SimpleHTTPRequestHandler
.The class will however, run the CGI script, instead of serving it as a file, if it guesses it to be a CGI script. Only directory-based CGI are used — the other common server configuration is to treat special extensions as denoting CGI scripts.
The
do_GET()
anddo_HEAD()
functions are modified to run CGI scripts and serve the output, instead of serving files, if the request leads to somewhere below thecgi_directories
path.
Let’s start with SimpleHTTPRequestHandler
which has some implemented features.
Request Handler#
Based on SimpleHTTPRequestHandler
, create a new class StreamingHandler
and only override do_GET()
method to just print requested path
and then call the base method as it is already implemented.
from http.server import SimpleHTTPRequestHandler
class StreamingHandler(SimpleHTTPRequestHandler):
def do_GET(self):
print(self.path)
# call to the base method implemented in SimpleHTTPRequestHandler
super().do_GET()
The SimpleHTTPRequestHandler
will serve files in GET requests, and it will look for index.html
for the homepage.
To display image, create an image <img>
tag which will request a file named stream.mjpg
.
<html>
<head>
<title>Picamea MJPEG Live Stream</title>
</head>
<body>
<!-- Request MJPEG stream -->
<img src="stream.mjpg" />
</body>
</html>
There is no actual stream.mjpg
file!.
When the web page request stream.mjpg
, web server should return a stream, not a single file, therefore a special sequence is needed to handle this special request of stream.mjpg
file in the do_GET()
method:
-
Send response with HTTP Status Code 200 (Successful responses)
-
Send header with information to notify web client about type of responded content, which is
multipart/x-mixed-replace
-
Send the content in a stream format (loop forever!): send the boundary
FRAME
, send content type of each frameimage/jpeg
, send the length of the content, and then send the actual image data
from http.server import SimpleHTTPRequestHandler
class StreamingHandler(SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == '/stream.mjpg':
# response
self.send_response(200)
# header
self.send_header('Age', 0)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
frame = frame_buffer.frame # need frame_buffer as global
self.wfile.write(b'--FRAME\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(frame))
self.end_headers()
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
print(str(e))
else:
super().do_GET()
Finally, wrap them up by creating an instance of FrameBuffer
, PiCamera
, HTTPServer
to start streaming:
frame_buffer = FrameBuffer()
camera = PiCamera(resolution='640x480', framerate=24)
camera.start_recording(frame_buffer, format='mjpeg')
server_address = ('', 8000)
handler_class = StreamingHandler # alias
try:
httpd = HTTPServer(server_address, handler_class)
httpd.serve_forever()
finally:
camera.stop_recording()
Bug: Hangup stream
When run the above code, the web page shows up but with only one frame displayed, CPU is locked up at 100%, because the block while True:
loop causes the problem.
Need to find a way to synchronize between camera thread and web server thread: send a frame only when it is available.
Synchronize between threads#
Python has implemented a lock mechanism between threads:
class threading.Condition(lock=None)
-
This class implements condition variable objects. A condition variable allows one or more threads to wait until they are notified by another thread. If the
lock
argument is given and notNone
, it must be aLock
orRLock
object, and it is used as the underlying lock. Otherwise, a newRLock
object is created and used as the underlying lock.wait(timeout=None)
-
Wait until notified or until a timeout occurs. If the calling thread has not acquired the
lock
when this method is called, aRuntimeError
is raised.This method releases the underlying
lock
, and then blocks until it is awakened by anotify()
ornotify_all()
call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires thelock
and returns. notify_all()
- Wake up all threads waiting on this condition. This method acts like
notify()
, but wakes up all waiting threads instead of one. If the calling thread has not acquired thelock
when this method is called, aRuntimeError
is raised.
Then add a Condition
object in FrameBuffer
, and use it in StreamingHandler
:
from threading import Condition
class FrameBuffer(object):
def __init__(self):
self.frame = None
self.buffer = io.BytesIO()
# synchronize between threads
self.condition = Condition()
def write(self, buf):
if buf.startswith(b'\xff\xd8'):
with self.condition:
self.buffer.seek(0)
self.buffer.write(buf)
self.buffer.truncate()
self.frame = self.buffer.getvalue()
# notify other threads
self.condition.notify_all()
class StreamingHandler(SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == '/stream.mjpg':
...
try:
while True:
with frame_buffer.condition:
# wait for a new frame
frame_buffer.condition.wait()
frame = frame_buffer.frame # access global variable, need to change later
Wow, it works!!! the latency is just about 200ms which is unachievable with HLS/ MPEG-DASH streaming. However, the CPU usage is quite high, Pi Zero W only can handle 6 clients at the same time with video quality at 640x480 @25fps.
Hint
Above sections are enough to create a simple MJPEG streaming server.
Below sections are for an advanced implementation which need some advanced Python programming to create multiple buffers in an application, which can be used to merge or manipulate the image before sending to user’s browsers.
Some updates in the script#
The instance frame_buffer
is used as a global variable in the StreamingHandler
, it is not good if there is another FrameBuffer
used for another stream in a same script.
Here is an advanced method to have multiple frame buffers by passing an instance of FrameBuffer
into an instance of StreamingHandler
. It can be done by adding an Instance variable that holds reference to an instance of FrameBuffer
, but can not be done using Class variable.
Let’s check how they work.
Class variable#
Class variable is shared by all instance, therefore it acts like a global static attribute of the class.
class StreamingHandler(SimpleHTTPRequestHandler):
# class variable refers to an instance of FrameBuffer
my_frame_buffer = None
def do_GET(self):
...
frame = self.my_frame_buffer.frame
# create an instance of FrameBuffer
frame_buffer = FrameBuffer()
handler_class = StreamingHandler # alias
# assign class variable
handler_class.my_frame_buffer = frame_buffer
# all instance will share class variables
first_handler = StreamingHandler()
second_handler = StreamingHandler()
# first_handler.my_frame_buffer will be the same as second_handler.my_frame_buffer
Instance variable#
Instance variables are for the data unique to each instance, they are created in the __init()__
constructor of that class:
class StreamingHandler(SimpleHTTPRequestHandler):
def __init__(self, frame_buffer, request, client_address, server, directory=None):
self.my_frame_buffer = frame_buffer
super().__init__(request, client_address, server, directory)
def do_GET():
...
However, with this modification, script cannot use StreamingHandler
to initialize ThreadingHTTPServer
anymore, because it expects to call a request handler with only required positional arguments (request, client_address, server)
, without a new argument frame_buffer
.
Therefore, write a function that convert expected parameters list to new parameters list:
frame_buffer = FrameBuffer()
def getStreamingHandler(request, client_address, server):
return StreamingHandler(frame_buffer, request, client_address, server)
httpd = ThreadingHTTPServer(address, getStreamingHandler)
Well, it works, but the convert function actually drop the parameter directory
which is an optional parameter in original constructor of SimpleHTTPRequestHandler
. To solve this problem, let’s use special *args
and **kwargs
parameters.
*args
and **kwargs
#
The special *args
and **kwargs
parameters allow passing multiple arguments or keyword arguments to a function. Read about them in here.
So, change the parameter list (request, client_address, server, ...)
to *args
in code, then it looks better:
class StreamingHandler(SimpleHTTPRequestHandler):
def __init__(self, frame_buffer, *args):
self.my_frame_buffer = frame_buffer
super().__init__(*args)
frame_buffer = FrameBuffer()
def getStreamingHandler(*args):
return StreamingHandler(frame_buffer, *args)
httpd = ThreadingHTTPServer(address, getStreamingHandler)
Lambda function#
Python and other languages like Java, C#, and even C++ have had lambda functions added to their syntax, whereas languages like LISP or the ML family of languages, Haskell, OCaml, and F#, use lambdas as a core concept. Read more in here
So, reduce the function getStreamingHandler
to a lambda function which can be declared in-line when creating ThreadingHTTPServer
instance:
frame_buffer = FrameBuffer()
httpd = ThreadingHTTPServer(address, lambda *args: StreamingHandler(frame_buffer, *args))
Measure FPS#
In the while loop of sending frames, use frame_count
variable to count the number of processed frames. With time
package, it is easy to calculate FPS over a defined period, for example, 5 seconds in below code:
try:
# tracking serving time
start_time = time.time()
frame_count = 0
# endless stream
while True:
with self.frames_buffer.condition:
# wait for a new frame
self.frames_buffer.condition.wait()
# it's available, pick it up
frame = self.frames_buffer.frame
# send it
...
# count frames
frame_count += 1
# calculate FPS every 5s
if (time.time() - start_time) > 5:
print("FPS: ", frame_count / (time.time() - start_time))
frame_count = 0
start_time = time.time()
...
Some lines of code to handle exception are also needed, for full source code, please download by clicking on the download button at the beginning of this post.