def resize_image(image: ndarray, scales: list[float], stz: list[stream.Event], stream: stream.Stream) -> ndarray:
    """Fit a channel-first image onto an IMAGE_SIZE x IMAGE_SIZE canvas.

    The work is issued while ``stream`` is the active context manager. When
    either spatial axis (axis 1 or axis 2) is longer than ``IMAGE_SIZE`` the
    image is shrunk with ``zoom`` using one common factor for both axes;
    otherwise it is left untouched. The image is then padded at the end of
    axis 1 and axis 2 up to ``IMAGE_SIZE``.

    Args:
        image: Array indexed as (channel, axis 1, axis 2).
        scales: Output list; the scale factor that was applied is appended.
        stz: Output list; the result of ``stream.record()`` is appended so the
            caller can wait for the work issued here.
        stream: Stream used as the context for the resize and pad calls.

    Returns:
        The resized and padded image.
    """
    with stream:
        longest_side = max(image.shape[1], image.shape[2])
        if longest_side > IMAGE_SIZE:
            # NOTE(review): the target is IMAGE_SIZE - 1, presumably so the
            # rounded zoom output can never exceed the canvas - confirm.
            scale = ((IMAGE_SIZE - 1) / longest_side)
            image = zoom(image, (1, scale, scale), mode="constant")
        else:
            scale = 1.0
        missing_rows = IMAGE_SIZE - image.shape[1]
        missing_cols = IMAGE_SIZE - image.shape[2]
        ret = pad(image, pad_width=[(0, 0), (0, missing_rows), (0, missing_cols)])
    scales.append(scale)
    stz.append(stream.record())
    return ret
# Anchor layout and box-coding variances used by post_process.
_MIN_SIZES = [[16, 32], [64, 128], [256, 512]]
_STEPS = [8, 16, 32]
_VARIANCES = [0.1, 0.2]


def prior_box(min_sizes: list[list[int]], steps: list[int], clip: bool, image_sizes: list[int]) -> np.ndarray:
    """Build the anchor (prior) boxes for every feature-map level.

    For level ``k`` the feature map has ``ceil(image_sizes[0] / steps[k])``
    rows and ``ceil(image_sizes[1] / steps[k])`` columns. Each cell emits one
    anchor per entry of ``min_sizes[k]``, centred on the cell.

    Args:
        min_sizes: Anchor side lengths per level, in pixels.
        steps: Stride of each level, in pixels.
        clip: When true, clamp every value to the range [0, 1].
        image_sizes: ``[height, width]`` of the network input.

    Returns:
        Array of shape ``(num_anchors, 4)`` holding ``[cx, cy, w, h]``, each
        normalised by the image width or height.
    """
    height, width = image_sizes[0], image_sizes[1]
    anchors = []
    for k, step in enumerate(steps):
        level_sizes = min_sizes[k]
        rows = math.ceil(height / step)
        cols = math.ceil(width / step)
        for i in range(rows):
            cy = (i + 0.5) * step / height
            for j in range(cols):
                cx = (j + 0.5) * step / width
                for min_size in level_sizes:
                    anchors.append([cx, cy, min_size / width, min_size / height])

    # reshape keeps the (N, 4) layout even when no anchors were generated.
    output = np.array(anchors, dtype=np.float64).reshape(-1, 4)
    if clip:
        # ndarray.clip returns a new array; the result has to be kept.
        output = output.clip(0.0, 1.0)
    return output


def loc_decode(loc: np.ndarray, priors: np.ndarray, variances: list[float]):
    """Turn box regression offsets into corner-form boxes.

    Args:
        loc: ``(N, 4)`` offsets; columns 0-1 shift the centre, columns 2-3
            scale the size exponentially.
        priors: ``(N, 4)`` anchors as ``[cx, cy, w, h]``.
        variances: Two scaling factors, for the centre and size terms.

    Returns:
        ``(N, 4)`` array of ``[x1, y1, x2, y2]`` in the units of ``priors``.
    """
    centers = priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:]
    sizes = priors[:, 2:] * np.exp(loc[:, 2:] * variances[1])
    top_left = centers - sizes / 2
    return np.concatenate((top_left, sizes + top_left), 1)


def decode_landm(pre: np.ndarray, priors: np.ndarray, variances: list[float]):
    """Turn landmark regression offsets into point coordinates.

    Args:
        pre: ``(N, 10)`` offsets, five ``(x, y)`` pairs per anchor.
        priors: ``(N, 4)`` anchors as ``[cx, cy, w, h]``.
        variances: Scaling factors; only ``variances[0]`` is used.

    Returns:
        ``(N, 10)`` array of five ``(x, y)`` points in the units of ``priors``.
    """
    anchor_center = priors[:, :2]
    anchor_size = priors[:, 2:]
    points = [
        anchor_center + pre[:, 2 * k:2 * k + 2] * variances[0] * anchor_size
        for k in range(5)
    ]
    return np.concatenate(points, axis=1)


def py_cpu_nms(dets: np.ndarray, thresh: float):
    """Greedy non-maximum suppression.

    Args:
        dets: ``(N, 5)`` array of ``[x1, y1, x2, y2, score]``.
        thresh: Boxes whose IoU with an already kept box exceeds this value
            are dropped.

    Returns:
        List of kept row indices, highest score first.
    """
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]

    # The "+ 1" treats coordinates as inclusive pixel indices.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        rest = order[1:]
        keep.append(i)

        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[rest] - inter)

        # Indices in `ovr` are relative to `rest`, hence the +1 shift.
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]

    return keep


def softmax(x, axis=-1):
    """Return the softmax of ``x`` along ``axis``.

    The per-slice maximum is subtracted before exponentiating so large inputs
    do not overflow.
    """
    x_max = np.max(x, axis=axis, keepdims=True)
    exp_x = np.exp(x - x_max)
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)


def post_process(landms: np.ndarray, conf: np.ndarray, loc: np.ndarray, image_sizes: list[int], resize_scale: float,
                 confidence_threshold: float, top_k: int,
                 nms_threshold: float, keep_top_k: int, priors: "np.ndarray | None" = None) -> np.ndarray:
    """Convert the raw network outputs for one image into detections.

    Args:
        landms: ``(N, 10)`` landmark offsets.
        conf: ``(N, 2)`` class logits; column 1 is used as the detection score.
        loc: ``(N, 4)`` box offsets.
        image_sizes: ``[height, width]`` of the network input.
        resize_scale: Factor that was applied to the source image; results are
            divided by it to return to source-image coordinates.
        confidence_threshold: Minimum score for a detection to be kept.
        top_k: Maximum number of candidates passed to NMS.
        nms_threshold: IoU threshold used by NMS.
        keep_top_k: Maximum number of detections returned.
        priors: Optional pre-computed anchors for ``image_sizes``. Building
            them is a pure-Python loop, so callers handling many images of the
            same size should compute them once and pass them in.

    Returns:
        ``(M, 15)`` array: ``[x1, y1, x2, y2, score]`` followed by five
        ``(x, y)`` landmark points, ordered by descending score.
    """
    if priors is None:
        priors = prior_box(min_sizes=_MIN_SIZES, steps=_STEPS, clip=False, image_sizes=image_sizes)

    width_height = [image_sizes[1], image_sizes[0]]
    boxes = loc_decode(loc, priors, _VARIANCES) * np.array(width_height * 2) / resize_scale
    scores = softmax(conf)[:, 1]
    landms = decode_landm(landms, priors, _VARIANCES) * np.array(width_height * 5) / resize_scale

    # Drop low scores, then keep the top_k best candidates in score order.
    inds = np.where(scores > confidence_threshold)[0]
    boxes = boxes[inds]
    landms = landms[inds]
    scores = scores[inds]

    order = scores.argsort()[::-1][:top_k]
    boxes = boxes[order]
    landms = landms[order]
    scores = scores[order]

    dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
    keep = py_cpu_nms(dets, nms_threshold)[:keep_top_k]
    return np.concatenate((dets[keep, :], landms[keep]), axis=1)


def post_loop(landms: np.ndarray, conf: np.ndarray, loc: np.ndarray, resize_scales: list[float]):
    """Run post_process on every image of a batch.

    Args:
        landms: ``(B, N, 10)`` landmark offsets.
        conf: ``(B, N, 2)`` class logits.
        loc: ``(B, N, 4)`` box offsets.
        resize_scales: One resize factor per image.

    Returns:
        List with one detection array per image.
    """
    image_sizes = [IMAGE_SIZE, IMAGE_SIZE]
    # Every image shares the same input size, so the anchors are built once.
    priors = prior_box(min_sizes=_MIN_SIZES, steps=_STEPS, clip=False, image_sizes=image_sizes)
    res = []
    # Iterate over what was actually supplied rather than the global
    # BATCH_SIZE, so a short final batch cannot index out of range.
    for image_landms, image_conf, image_loc, scale in zip(landms, conf, loc, resize_scales):
        rep = post_process(landms=image_landms, conf=image_conf, loc=image_loc,
                           image_sizes=image_sizes,
                           resize_scale=scale, confidence_threshold=0.4, top_k=5000, nms_threshold=0.4,
                           keep_top_k=750, priors=priors)
        res.append(rep)
    return res
# Gather the image paths to process. Only the single sub-directory named below
# is scanned; every other entry of root_dir is skipped.
files = []
for name in listdir(root_dir):
    if name != "真野恵里菜":
        continue
    person_dir = path.join(root_dir, name)
    files.extend(path.join(person_dir, file_name) for file_name in listdir(person_dir))
# files = files[:32]

for file_chunk in chunked(files, BATCH_SIZE):
    stacks = []
    ptrs = []
    filenames = []
    stz = []
    try:
        # Decode each file of the chunk on its own stream.
        for st1, file in zip([stream.Stream() for _ in range(BATCH_SIZE)], file_chunk):
            with st1:
                start = time.time()
                _input = fromfile(file=file, dtype=uint8)
                if _input.shape[0] == 0:
                    continue
                try:
                    ptr, (_, (width, height)) = decode(_input, st1.ptr)
                except Exception as exc:
                    # The previous bare `except:` fell through and appended an
                    # undefined (or the previous file's) image. Skip instead.
                    print(f"skipping {file}: decode failed ({exc!r})")
                    continue
                # Track the pointer immediately so it is freed even if wrapping fails.
                ptrs.append(ptr)
                # NOTE(review): the buffer is registered as height * width * 3
                # bytes (uint8 item size) but viewed as float32 - confirm the
                # element type decode() actually writes.
                unownedmemory = UnownedMemory(ptr, height * width * 3 * uint8().itemsize, None)
                gpu_arr = ndarray((height * width * 3,), dtype=float32, memptr=MemoryPointer(unownedmemory, 0))
                gpu_image: ndarray = gpu_arr.reshape((3, height, width))
                # Recorded only on success so filenames stays aligned with stacks.
                filenames.append(file)
                stacks.append(gpu_image)
                print(file, gpu_image.shape, time.time() - start, sep='\t')
                stz.append(st1.record())
        for st in stz:
            st.synchronize()

        if not stacks:
            # Every file of the chunk was empty or undecodable.
            continue

        missing = BATCH_SIZE - len(stacks)
        if missing > 0:
            # Pad a short batch with zeroed float32 images; uninitialised
            # default-dtype arrays would promote the whole stacked batch.
            blank = ndarray((3, IMAGE_SIZE, IMAGE_SIZE), dtype=float32)
            blank.fill(0)
            stacks.extend([blank] * missing)

        resize_scales = []
        stz = []
        stacked_images = stack([resize_image(gpu_image, resize_scales, stz, stream.Stream()) for gpu_image in stacks])
        print(stacked_images.shape)
        contiguous_stacked = ascontiguousarray(stacked_images)
        for st in stz:
            st.synchronize()

        # TODO: inference and drawing are disabled together with the
        # commented-out InferenceSession; kept for reference.
        # io_binding = session.io_binding()
        # io_binding.bind_input(
        #     name="input",
        #     device_type='cuda',
        #     device_id=stacked_images.device,
        #     element_type=float32,
        #     shape=tuple(stacked_images.shape),
        #     buffer_ptr=contiguous_stacked.data.ptr
        # )
        # io_binding.bind_output("landmark")
        # io_binding.bind_output("confidence")
        # io_binding.bind_output("bbox")
        # session.run_with_iobinding(iobinding=io_binding)
        # landms, conf, loc = io_binding.copy_outputs_to_cpu()
        # start = time.time()
        # detected = post_loop(landms, conf, loc, resize_scales)
        # print(f"post process time: {time.time() - start}")
        # for (i, filename, gpu_image) in zip(detected, filenames, stacks):
        #     print(f'"{filename}"')
        #     host_image = (asnumpy(gpu_image).transpose((1, 2, 0)) * 255).astype(uint8).copy()
        #     print(host_image.shape)
        #     for j in i.tolist():
        #         j = [int(_j) for _j in j]
        #         cv2.rectangle(host_image, (j[0], j[1]), (j[2], j[3]), (255, 0, 0), 2, cv2.LINE_AA)
        #         cv2.putText(host_image, str(j[4]), (0, 0), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0))
        #         for k in range(5):
        #             cv2.circle(host_image, (j[5 + k * 2], j[5 + k * 2 + 1]), 2, (255, 0, 0))
        #
        #         print(j)
        #     # pyplot.imshow(host_image)
        #     # pyplot.show()
        #     # time.sleep(.5)
    finally:
        # The decoder hands back raw device pointers; release them even when
        # the chunk is skipped or an error propagates.
        for ptr in ptrs:
            cupy_backends.cuda.api.runtime.free(ptr)
# host_im = gpu_image.get()
# pyplot.imshow(host_im.transpose((1, 2, 0)))
# pyplot.show()
async def async_read(path: str, semaphore: Semaphore):
    """Read a whole file as bytes while holding ``semaphore``.

    Args:
        path: File to read. Note that this parameter shadows the module-level
            ``path`` import inside this function.
        semaphore: Limits how many reads are in flight at once.

    Returns:
        The file contents as ``bytes``.
    """
    async with semaphore, aiofiles.open(file=path, mode="rb") as fp:
        return await fp.read()


async def gather_runner(l: list, fn):
    """Run ``fn(item, semaphore)`` concurrently for every item of ``l``.

    All calls share one semaphore with 2048 slots.

    Returns:
        List of results in the same order as ``l``.
    """
    sem = Semaphore(2048)
    return await gather(*(fn(p, sem) for p in l))


def _as_float32(array):
    """Return a C-contiguous float32 copy of ``array``."""
    return numpy.ascontiguousarray(array.astype(numpy.float32))


def _call_resnet_post_process(arrays, batch_size, image_size):
    """Pass the data addresses of ``arrays`` to ``resnet_post_process``.

    ``arrays`` is held by this frame for the whole call, so the addresses stay
    valid while the extension uses them.
    """
    addresses = [array.__array_interface__["data"][0] for array in arrays]
    return resnet_post_process(addresses, batch_size, image_size)


def post_processor(outputs, batch_size, image_size):
    """Post-process network outputs that are already in this process.

    Args:
        outputs: Arrays returned by the model.
        batch_size: Forwarded to ``resnet_post_process``.
        image_size: Forwarded to ``resnet_post_process``.

    Returns:
        Whatever ``resnet_post_process`` returns.
    """
    return _call_resnet_post_process([_as_float32(output) for output in outputs], batch_size, image_size)


def post_processor_memmap(tmp_filename, sizes, batch_size, image_size):
    """Post-process network outputs stored as float16 memmap files.

    Output ``k`` is read from ``memmap/<tmp_filename><k>`` with shape
    ``sizes[k]``.

    Returns:
        Whatever ``resnet_post_process`` returns.
    """
    outputs = [
        _as_float32(numpy.memmap(filename=path.join("memmap", tmp_filename + str(order)),
                                 dtype=numpy.float16, mode="r", shape=size))
        for order, size in enumerate(sizes)
    ]
    return _call_resnet_post_process(outputs, batch_size, image_size)


def post_processor_shm(shm_name, sizes, batch_size, image_size):
    """Post-process network outputs stored as float16 shared-memory segments.

    Output ``k`` is read from the existing segment ``<shm_name>_<k>`` with
    shape ``sizes[k]``. One segment is attached per entry of ``sizes``, and
    every attached segment is closed again even when an error occurs.

    Returns:
        Whatever ``resnet_post_process`` returns.
    """
    shms = []
    try:
        for i in range(len(sizes)):
            shms.append(shared_memory.SharedMemory(name=shm_name + "_" + str(i)))
        # The float16 view over each segment is a temporary; only the float32
        # copy is kept, so no exported buffer blocks close() below.
        outputs = [
            _as_float32(numpy.ndarray(shape=size, dtype=numpy.float16, buffer=shm.buf))
            for size, shm in zip(sizes, shms)
        ]
        return _call_resnet_post_process(outputs, batch_size, image_size)
    finally:
        for shm in shms:
            shm.close()
if __name__ == '__main__':
    from kornia.augmentation import LongestMaxSize, PadTo, Normalize
    from kornia.constants import Resample

    # Pre-processing: resize so the longest side is image_size, then pad to a
    # square. Sizes follow the module constants so they match the TensorRT
    # profile shapes configured below.
    longest_max_size = LongestMaxSize(max_size=image_size, resample=Resample.NEAREST)
    pad_to = PadTo(size=(image_size, image_size), pad_value=1.)
    normalize = Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))

    session = InferenceSession(
        path_or_bytes=model_path,
        providers=[
            ('TensorrtExecutionProvider', {
                'trt_engine_cache_enable': True,
                'trt_engine_cache_path': 'trt_cache',
                'trt_fp16_enable': True,
                'trt_profile_min_shapes': f'input:1x3x{image_size}x{image_size}',
                'trt_profile_max_shapes': f'input:{chunk_size}x3x{image_size}x{image_size}',
                'trt_profile_opt_shapes': f'input:{chunk_size}x3x{image_size}x{image_size}',
            }),
            'CUDAExecutionProvider',
            'CPUExecutionProvider'
        ]
    )

    def _release_segments(segments):
        """Close and unlink the shared-memory segments owned by this process."""
        for segment in segments:
            segment.close()
            segment.unlink()

    def _on_post_process_done(future, segments):
        """Report a failed worker job and release the job's segments."""
        if not future.cancelled():
            error = future.exception()
            if error is not None:
                print(f"post-processing failed: {error!r}")
        _release_segments(segments)

    for name in listdir(root_dir):
        with (ProcessPoolExecutor(max_workers=4) as executor):
            # Debug filter, currently disabled:
            # if name != "下井谷幸穂":
            #     continue
            file_names = listdir(path.join(root_dir, name))
            name_files = [path.join(root_dir, name, file_name) for file_name in file_names]
            # Read every file of the directory concurrently, keyed by file name.
            files_data = {file_name: numpy.frombuffer(dat, dtype=numpy.uint8) for file_name, dat in
                          zip(file_names, run(gather_runner(name_files, async_read)))}

            for cnk in more_itertools.chunked(tqdm.tqdm(files_data.items(), desc=name), n=chunk_size):
                stack = []
                tmp_file_name = ""
                for file, dat in cnk:
                    # The last file name of the chunk names the shared-memory segments.
                    tmp_file_name = file
                    try:
                        decoded_image = decode_jpeg(tensor(dat), device=device)
                    except Exception:
                        # Not decodable as JPEG on the GPU: fall back to PIL.
                        # convert("RGB") guarantees three channels for
                        # grayscale, palette and RGBA inputs.
                        pil_image = Image.open(BytesIO(dat.tobytes())).convert("RGB")
                        decoded_image = tensor(numpy.array(pil_image).transpose([2, 0, 1])).to(device)
                    decoded_image = decoded_image.to(torch.float16) / 255
                    decoded_image = normalize(decoded_image)
                    decoded_image_resized = longest_max_size(decoded_image)
                    decoded_image_padded = pad_to(decoded_image_resized)
                    stack.append(decoded_image_padded.squeeze())

                # Pad a short final chunk with blank images up to chunk_size.
                for _ in range(chunk_size - len(stack)):
                    stack.append(torch.zeros(size=[3, image_size, image_size], dtype=torch.float16, device=device))
                stacked = torch.stack(stack).contiguous()

                io_binding = session.io_binding()
                io_binding.bind_input(
                    name="input",
                    device_type=stacked.device.type,
                    device_id=stacked.device.index,
                    element_type='float16',
                    shape=tuple(stacked.shape),
                    buffer_ptr=stacked.data_ptr()
                )
                io_binding.bind_output("landmark")
                io_binding.bind_output("confidence")
                io_binding.bind_output("bbox")
                session.run_with_iobinding(iobinding=io_binding)
                outputs: list[numpy.ndarray] = io_binding.copy_outputs_to_cpu()

                # Hand the outputs to a worker through named shared memory.
                # The handles stay referenced until the job finishes, so they
                # cannot be garbage-collected before the worker attaches.
                segments: list[shared_memory.SharedMemory] = []
                try:
                    for order, output in enumerate(outputs):
                        segment = shared_memory.SharedMemory(name=tmp_file_name + "_" + str(order), create=True,
                                                             size=output.nbytes)
                        segments.append(segment)
                        # Write through an unnamed temporary view so no
                        # exported buffer outlives this statement and blocks
                        # close() later.
                        numpy.ndarray(shape=output.shape, dtype=numpy.float16, buffer=segment.buf)[...] = output
                    future = executor.submit(post_processor_shm, tmp_file_name, [output.shape for output in outputs],
                                             chunk_size, [image_size, image_size])
                except Exception:
                    _release_segments(segments)
                    raise
                future.add_done_callback(lambda done, owned=segments: _on_post_process_done(done, owned))