forked from RoBorregos/sems-vision
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutors.py
More file actions
43 lines (32 loc) · 1.03 KB
/
executors.py
File metadata and controls
43 lines (32 loc) · 1.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import random
from typing import Callable
import cv2
from .frame_packet import FramePacketGenerator
def pipeline_executor(source: FramePacketGenerator) -> Callable:
"""
Creates a pipeline executor that accepts frames as fast as possible until end of stream
:param source: pipeline source
:return: executor function
"""
def executor():
nonlocal source
for _ in source:
continue
return executor
def imshow_pipeline_executor(source: FramePacketGenerator, unique_id: int) -> Callable:
"""
Creates a pipeline executor that accepts frames as fast as possible and shows them using
cv2's imshow
:param source: pipeline source
:return: executor function
"""
def executor():
nonlocal source
window_id = f'imshow_executor{unique_id}'
for packet in source:
cv2.imshow(window_id, packet.frame)
key = cv2.waitKey(1)
if key == ord('q'):
return
cv2.destroyWindow(window_id)
return executor