SPPAS 4.20

Module sppas.src.videodata

Class sppasVideoReader

Description

Class to wrap a VideoCapture with OpenCV.

This class is embedding a VideoCapture() object and define some

getters and setters to manage such video easily.

It was tested only to open/read videos from files, not to capture a

video stream from a camera, etc.

Example
Example
>>> # Create the instance and open the video
>>> vid = sppasVideoReader()
>>> vid.open("my_video_file.xxx")
Example
>>> # Read one frame from the current position
>>> image = vid.read()
Example
>>> # Set the current position
>>> vid.seek(frame_pos)
>>> # Get the current position
>>> vid.tell()
Example
>>> # Release the video stream
>>> vid.close()

Constructor

Create a sppasVideoReader.

View Source
def __init__(self):
    """Create a sppasVideoReader. """
    self.__video = cv2.VideoCapture()
    self.__lock = False
    self.__rotate = 0.0
    self.__gray = False
    self.__alpha = False
    self.__rgb = False

Public functions

set_rotate

Force to rotate the read image of given angle.

Parameters

angle(int) Value in range [-360,360]

View Source
def set_rotate(self, angle=0.0):
    """Force to rotate the read image of given angle.

        :param angle (int) Value in range [-360,360]

        """
    angle = float(angle)
    if int(angle) == 360 or int(angle) == -360:
        self.__rotate = 0
    elif -360 < int(angle) < 360:
        self.__rotate = angle
    else:
        raise IntervalRangeException(angle, -360, 360)
get_rotate

Return the angle to rotate read images.

View Source
def get_rotate(self):
    """Return the angle to rotate read images."""
    return self.__rotate
set_gray

Turn the read image into gray color.

Parameters
  • value: (bool) make the image gray
View Source
def set_gray(self, value=False):
    """Turn the read image into gray color.

        :param value: (bool) make the image gray

        """
    value = bool(value)
    self.__gray = value
get_gray

Return True if read images should be turned gray.

View Source
def get_gray(self):
    """Return True if read images should be turned gray."""
    return self.__gray
set_rgb

Turn the read image into RGB instead of BGR.

Parameters
  • value: (bool) make the image RGB
View Source
def set_rgb(self, value=False):
    """Turn the read image into RGB instead of BGR.

        :param value: (bool) make the image RGB

        """
    value = bool(value)
    self.__rgb = value
get_rgb

Return True if read images should be turned into RGB.

View Source
def get_rgb(self):
    """Return True if read images should be turned into RGB."""
    return self.__rgb
set_alpha

Add or remove alpha channel of the read image.

Parameters
  • value: (bool) force to add alpha or to remove it
View Source
def set_alpha(self, value=False):
    """Add or remove alpha channel of the read image.

        :param value: (bool) force to add alpha or to remove it

        """
    value = bool(value)
    self.__alpha = value
get_alpha

Return True if read images should contain alpha channel.

View Source
def get_alpha(self):
    """Return True if read images should contain alpha channel."""
    return self.__alpha
open

Initialize a VideoCapture.

The angle value to rotate the image is overridden considering OpenCV

VideoCapture properties extracted from the video file's metadata

(applicable for FFmpeg back-end only).

Parameters
  • video: (name of video file, image sequence, url or video stream, GStreamer pipeline, IP camera) The video to browse.
View Source
def open(self, video):
    """Initialize a VideoCapture.

        The angle value to rotate the image is overridden considering OpenCV
        VideoCapture properties extracted from the video file's metadata
        (applicable for FFmpeg back-end only).

        :param video: (name of video file, image sequence, url or video stream,
        GStreamer pipeline, IP camera) The video to browse.

        """
    if self.__lock is True:
        logging.error("The video can't be opened because another video stream is already opened.")
        raise VideoLockError
    self.__video = self.__open_video(video)
    success = True
    test_pos = self.get_nframes() - 10
    logging.debug(' ... Video opened successfully. Test to browse into the {} frames.'.format(self.get_nframes()))
    if test_pos > 0:
        success = self.__video.set(cv2.CAP_PROP_POS_FRAMES, test_pos)
        if success is True:
            real_pos = int(self.__video.get(cv2.CAP_PROP_POS_FRAMES))
            if real_pos != test_pos:
                logging.warning('Video failed in setting pos: {} != {}'.format(real_pos, test_pos))
    if success is False:
        self.close()
        raise VideoBrowseError(video)
    self.__video.set(cv2.CAP_PROP_POS_FRAMES, 0)
is_opened

Return True if a video is already opened and ready to write.

View Source
def is_opened(self):
    """Return True if a video is already opened and ready to write."""
    return self.__lock
close

Release the flow taken by the reading of the video.

View Source
def close(self):
    """Release the flow taken by the reading of the video."""
    self.__video.release()
    self.__lock = False
read_frame

Read a frame of the video.

Returns
  • (sppasImage, None)
Parameters
  • process_image
View Source
def read_frame(self, process_image=True):
    """Read a frame of the video.

        :return: (sppasImage, None)

        """
    if self.__lock is False:
        return None
    success, img = self.__video.read()
    if img is None or success is False:
        return None
    if process_image is False:
        return sppasImage(input_array=img)
    return self._preprocess_image_options(img)
read

Browse a sequence of a video.

If both frompos and topos are -1, only one frame is read.

Parameters
  • from_pos: (int) frameID value to start reading. -1 means the current position.
  • to_pos: (int) frameID value to stop reading. -1 means the last frame of the video.
  • process_image: (bool) convert the image to sppasImage and apply options
Returns
  • None, an image or a list of images(numpy.ndarray).
View Source
def read(self, from_pos=-1, to_pos=-1, process_image=True):
    """Browse a sequence of a video.

        If both from_pos and to_pos are -1, only one frame is read.

        :param from_pos: (int) frameID value to start reading. -1 means the current position.
        :param to_pos: (int) frameID value to stop reading. -1 means the last frame of the video.
        :param process_image: (bool) convert the image to sppasImage and apply options
        :returns: None, an image or a list of images(numpy.ndarray).

        """
    if self.__lock is False:
        return None
    if from_pos == -1 and to_pos == -1:
        return self.read_frame(process_image)
    if to_pos == -1:
        to_pos = self.get_nframes()
    else:
        to_pos = self.check_frame(to_pos)
    if from_pos == -1:
        from_pos = self.tell()
    else:
        from_pos = self.check_frame(from_pos)
    if from_pos >= to_pos:
        raise RangeBoundsException(from_pos, to_pos)
    images = list()
    self.seek(from_pos)
    for i in range(to_pos - from_pos):
        frame = self.read_frame(process_image)
        if frame is None:
            break
        images.append(frame)
    if len(images) == 0:
        return None
    if len(images) == 1:
        return images[0]
    return images
tell

Return the index of the current frame position.

View Source
def tell(self):
    """Return the index of the current frame position."""
    if self.__lock is False:
        return 0
    return int(self.__video.get(cv2.CAP_PROP_POS_FRAMES))
seek

Set a new frame position in the video.

Parameters
  • value: (int)
Raises
  • IOError

IntervalRangeException

Returns
  • (bool) Success condition
View Source
def seek(self, value):
    """Set a new frame position in the video.

        :param value: (int)
        :raises: IOError:
        :raises: IntervalRangeException
        :return: (bool) Success condition

        """
    if self.__lock is False:
        raise IOError('No video is opened: seek is not possible.')
    value = self.check_frame(value)
    nb_frames_diff = int(value - self.__video.get(cv2.CAP_PROP_POS_FRAMES))
    if nb_frames_diff == 0:
        return True
    if nb_frames_diff < 0 or nb_frames_diff > 20:
        success = self.__video.set(cv2.CAP_PROP_POS_FRAMES, value)
        if success is False:
            raise IOError('Seek is not supported by your platform for this video.')
    else:
        for i in range(nb_frames_diff):
            ret = self.__video.grab()
            if ret is False:
                return False
    return True
check_frame

Raise an exception if the given value is an invalid frameID.

Parameters
  • value: (int)
Raises

ValueError

Returns
  • (int) -1 if no video is opened
View Source
def check_frame(self, value):
    """Raise an exception if the given value is an invalid frameID.

        :param value: (int)
        :raise: ValueError
        :return: (int) -1 if no video is opened

        """
    if self.__lock is False:
        return -1
    value = int(value)
    if value < 0:
        raise NegativeValueError(value)
    if self.is_opened() is True and value > self.get_nframes():
        raise IntervalRangeException(value, 1, self.get_nframes())
    return value
get_duration

Return the duration of the video in seconds (float).

View Source
def get_duration(self):
    """Return the duration of the video in seconds (float)."""
    if self.__lock is False:
        return 0.0
    return float(self.get_nframes()) * (1.0 / self.get_framerate())
get_framerate

Return the FPS of the current video (float).

View Source
def get_framerate(self):
    """Return the FPS of the current video (float)."""
    if self.__lock is False:
        return 0
    return float(self.__video.get(cv2.CAP_PROP_FPS))
get_width

Return the width of the frames in the video.

View Source
def get_width(self):
    """Return the width of the frames in the video."""
    if self.__lock is False:
        return 0
    return int(self.__video.get(cv2.CAP_PROP_FRAME_WIDTH))
get_height

Return the height of the frames in the video.

View Source
def get_height(self):
    """Return the height of the frames in the video."""
    if self.__lock is False:
        return 0
    return int(self.__video.get(cv2.CAP_PROP_FRAME_HEIGHT))
get_size

Return the (w,h) of the frames in the video.

View Source
def get_size(self):
    """Return the (w,h) of the frames in the video."""
    if self.__lock is False:
        return 0
    w = int(self.__video.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(self.__video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    return (w, h)
get_nframes

Return the number of frames in the video.

View Source
def get_nframes(self):
    """Return the number of frames in the video."""
    if self.__lock is False:
        return 0
    return int(self.__video.get(cv2.CAP_PROP_FRAME_COUNT))

Private functions

_preprocess_image_options

Process the image to apply options.

Parameters
  • image: (np.array)
Returns
  • (sppasImage)
View Source
def _preprocess_image_options(self, image):
    """Process the image to apply options.

        :param image: (np.array)
        :return: (sppasImage)

        """
    image = np.array(image, dtype=np.uint8)
    _, _, c = image.shape
    if self.__gray is True:
        avg = 0
        if c == 4:
            avg = np.average(self, weights=[0.114, 0.587, 0.2989, 1], axis=2)
        elif c == 3:
            avg = np.average(self, weights=[0.114, 0.587, 0.2989], axis=2)
        if avg > 0:
            image[:, :, 0] = avg
            image[:, :, 1] = avg
            image[:, :, 2] = avg
    elif self.__rgb is True:
        if c == 3:
            image = image[:, :, [2, 1, 0]]
        elif c == 4:
            image = image[:, :, [2, 1, 0, 3]]
    if c == 3 and self.__alpha is True:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2RGBA)
    if c == 4 and self.__alpha is False:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
    image = sppasImage(input_array=image)
    if int(self.__rotate) != 0:
        image = image.irotate(self.__rotate)
    return image

Protected functions

__open_video

Return a VideoCapture object or raise an Exception.

Parameters
  • device
View Source
def __open_video(self, device):
    """Return a VideoCapture object or raise an Exception."""
    message = ''
    self.__lock = False
    try:
        video = cv2.VideoCapture(device, cv2.CAP_FFMPEG)
        self.__rotate = video.get(cv2.CAP_PROP_ORIENTATION_META)
    except:
        try:
            self.__rotate = 0.0
            video = cv2.VideoCapture(device, cv2.CAP_ANY)
        except Exception as e:
            video = cv2.VideoCapture()
            message = str(e)
    if video.isOpened() is True:
        self.__lock = True
        return video
    logging.error("Video {} can't be opened. CV2 library message: {}".format(device, message))
    raise VideoOpenError(device)