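# NOTE: the lines below are residue from the repository web viewer, not part of the module.
# You cannot select more than 25 topics. Topics must start with a letter or number,
# can include dashes ('-') and can be up to 35 characters long.
#
# 108 lines
# 3.5 KiB
# Python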

# Copyright (C) 2020-2021 Intel Corporation
#
# SPDX-License-Identifier: MIT
import ast
import cv2 as cv
from collections import namedtuple
import hashlib
import importlib
import sys
import traceback
import subprocess
import os
from av import VideoFrame
from PIL import Image
from django.core.exceptions import ValidationError
# One name bound by an import statement: the module it comes from (an empty
# list for a plain ``import x``), the imported name and its optional alias.
Import = namedtuple("Import", ["module", "name", "alias"])


def parse_imports(source_code: str):
    """Yield an ``Import`` record for each name in a top-level import.

    Only direct children of the module node are inspected, so imports nested
    inside functions, classes or conditionals are not reported.

    For ``import x [as y]`` the ``module`` field is an empty list; for
    ``from m import x [as y]`` it is ``m`` (``None`` for ``from . import x``).
    """
    tree = ast.parse(source_code)
    for statement in ast.iter_child_nodes(tree):
        if isinstance(statement, ast.ImportFrom):
            origin = statement.module
        elif isinstance(statement, ast.Import):
            # Falsy placeholder: a plain import has no source module.
            origin = []
        else:
            continue

        for alias_node in statement.names:
            yield Import(origin, alias_node.name, alias_node.asname)
def import_modules(source_code: str):
    """Import everything named by the top-level imports of ``source_code``.

    Returns a dict mapping each bound name (the alias when one is given,
    otherwise the imported name) to the loaded module or attribute.

    Raises whatever ``importlib.import_module`` raises for a missing module,
    and ``AttributeError`` when ``from m import x`` names something that is
    neither an attribute nor an importable submodule of ``m``.
    """
    results = {}
    for import_ in parse_imports(source_code):
        if import_.module:
            # ``from module import name``: decide on the presence of a source
            # module, not on ``name != module``. The latter breaks for
            # ``from datetime import datetime`` and similar, where both
            # strings are equal and the module itself would be returned.
            package = importlib.import_module(import_.module)
            try:
                loaded = getattr(package, import_.name)
            except AttributeError as missing:
                # Like the import statement, fall back to a submodule that
                # the package has not loaded yet.
                try:
                    loaded = importlib.import_module(
                        "{}.{}".format(import_.module, import_.name))
                except ImportError:
                    # Keep the exception type callers saw before the fallback.
                    raise missing
        else:
            # Plain ``import name``.
            loaded = importlib.import_module(import_.name)

        results[import_.alias or import_.name] = loaded
    return results
class InterpreterError(Exception):
    """Raised by ``execute_python_code`` when the executed source fails."""


def _error_details(err):
    """Return the first exception argument, or '' when there is none."""
    return err.args[0] if err.args else ""


def execute_python_code(source_code, global_vars=None, local_vars=None):
    """Run ``source_code`` through ``exec`` and wrap any failure.

    ``global_vars`` and ``local_vars`` are passed to ``exec`` unchanged.

    Raises:
        InterpreterError: for any ``Exception`` raised while compiling or
            running the code. The message is ``"<Class> at line <n>: <arg>"``,
            or just ``"AssertionError"`` for failed assertions. The original
            exception is attached as ``__cause__``.
    """
    try:
        # NOTE(review): exec runs arbitrary code; confirm callers only pass
        # trusted or sandboxed source.
        # pylint: disable=exec-used
        exec(source_code, global_vars, local_vars)
    except SyntaxError as err:
        raise InterpreterError("{} at line {}: {}".format(
            err.__class__.__name__, err.lineno, _error_details(err))) from err
    except AssertionError as err:
        # AssertionError doesn't contain any args and line number
        raise InterpreterError("{}".format(err.__class__.__name__)) from err
    except Exception as err:
        # Line of the innermost frame in which the exception was raised.
        line_number = traceback.extract_tb(err.__traceback__)[-1][1]
        # Exceptions raised without arguments (e.g. ``raise ValueError()``)
        # used to escape as IndexError from ``err.args[0]``.
        raise InterpreterError("{} at line {}: {}".format(
            err.__class__.__name__, line_number, _error_details(err))) from err
def av_scan_paths(*paths):
    """Scan ``paths`` with ``clamscan`` when the ``CLAM_AV`` env var is 'yes'.

    Does nothing unless ``CLAM_AV`` is exactly ``'yes'``. Otherwise runs
    ``clamscan`` on all given paths and raises ``ValidationError`` carrying
    the captured stdout if the process exits with a non-zero return code.
    """
    if os.environ.get('CLAM_AV') != 'yes':
        return

    scan_cmd = ['clamscan', '--no-summary', '-i', '-o', *paths]
    outcome = subprocess.run(scan_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # nosec
    if outcome.returncode:
        raise ValidationError(outcome.stdout)
def rotate_image(image, angle):
    """Rotate ``image`` about its centre, enlarging the canvas to fit.

    ``image`` must expose ``shape`` with height and width as its first two
    entries. ``angle`` is passed straight to ``cv.getRotationMatrix2D``
    (presumably degrees; confirm against the OpenCV documentation).

    Returns the result of ``cv.warpAffine`` on an output canvas sized to the
    bounding box of the rotated image.
    """
    rows, cols = image.shape[:2]
    center = (cols/2, rows/2)
    transform = cv.getRotationMatrix2D(center, angle, 1.)

    # Magnitudes of the first-row rotation terms, used to size the new canvas.
    cos_mag = abs(transform[0,0])
    sin_mag = abs(transform[0,1])
    new_width = int(rows * sin_mag + cols * cos_mag)
    new_height = int(rows * cos_mag + cols * sin_mag)

    # Shift the translation so the old centre maps to the new canvas centre.
    transform[0, 2] += new_width/2 - center[0]
    transform[1, 2] += new_height/2 - center[1]

    return cv.warpAffine(image, transform, (new_width, new_height))
def md5_hash(frame):
    """Return the hex MD5 digest of a frame's raw bytes.

    ``frame`` may be:
      * an ``av.VideoFrame``, converted with ``to_image()`` first;
      * a ``str``, treated as a path and opened with ``PIL.Image.open``;
      * any other object that provides ``tobytes()``.

    The hashlib call is marked ``# nosec``, which suggests the digest is a
    content fingerprint and not a security control (confirm with callers).
    """
    if isinstance(frame, VideoFrame):
        frame = frame.to_image()
    elif isinstance(frame, str):
        # Close the file handle deterministically; the image object opened
        # here was previously never closed.
        with Image.open(frame, 'r') as image:
            return hashlib.md5(image.tobytes()).hexdigest() # nosec
    return hashlib.md5(frame.tobytes()).hexdigest() # nosec
def parse_specific_attributes(specific_attributes):
    """Parse a ``key=value&key2=value2`` string into a dict.

    Keys and values are stripped of surrounding whitespace. Each item is split
    on its first ``=`` only, so values may contain ``=`` themselves. Empty
    items (for example from a trailing ``&``) are skipped, and an item with no
    ``=`` maps to an empty string. An empty input returns an empty dict.

    Raises:
        AssertionError: if ``specific_attributes`` is not a string.
    """
    assert isinstance(specific_attributes, str), 'Specific attributes must be a string'

    parsed = {}
    for item in specific_attributes.split('&'):
        if not item.strip():
            # Nothing between two separators, or an empty input string.
            continue
        key, _, value = item.partition('=')
        parsed[key.strip()] = value.strip()
    return parsed