-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatacollector.py
118 lines (95 loc) · 3.27 KB
/
datacollector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import cv2
import os
import sys
import pathlib
from typing import TypedDict
from datetime import datetime
import time
import logging
import colorlog
# Module-wide logger: every record is rendered as a colorized "[LEVEL]: message" line.
logger = colorlog.getLogger(__name__)
logger.setLevel(logging.INFO)

# Stream handler whose formatter tints the level tag; "\033[0m" resets the
# color right after the tag so the message text itself is printed uncolored.
handler = colorlog.StreamHandler()
handler.setFormatter(
    colorlog.ColoredFormatter(
        "%(log_color)s[%(levelname)s]: \033[0m%(message)s",
        log_colors=dict(
            DEBUG="cyan",
            INFO="green",
            WARNING="yellow",
            ERROR="red",
            CRITICAL="red,bg_white",
        ),
    )
)
logger.addHandler(handler)
class Config(TypedDict):
    """
    Configuration for the ImageCollector.

    path: pathlib.Path
        The directory where images will be stored.
        When collection starts, a sub-directory named after the current
        date and time is created inside `path`.
    image_polling_interval: int
        The interval in seconds between each image capture.
        A TypedDict body cannot carry default values (an assignment here
        would only create an unused class attribute), so none is declared;
        `ImageCollector.from_dir` uses 1 second when the argument is omitted.
    vcap: cv2.VideoCapture
        The VideoCapture object to capture images from the camera or video stream.
    """

    path: pathlib.Path
    image_polling_interval: int
    vcap: cv2.VideoCapture
class ImageCollector:
    """
    Collects images from a video capture and stores them in a directory.
    """

    def __init__(self, config: Config) -> None:
        """
        Validate the configuration and prepare the output directory.

        config: Config
            Must provide `path` and an already opened `vcap`.
            `image_polling_interval` falls back to 1 second when the key is absent.

        Exits the process (SystemExit with status 1) when the interval is not
        greater than 0 or when the video capture is not opened.
        """
        # Accept plain strings as well; collect() relies on the `/` operator.
        self.path = pathlib.Path(config["path"])

        interval = config.get("image_polling_interval", 1)
        if interval <= 0:
            logger.error("Error: image_polling_interval must be greater than 0.")
            # Non-zero status so shells and supervisors see the failure.
            sys.exit(1)
        self.image_polling_interval = interval

        # test video capture
        if not config["vcap"].isOpened():
            logger.error("Error: Could not open video capture.")
            sys.exit(1)
        self.vcap = config["vcap"]

        # create the directory if it does not exist (./[path]);
        # parents/exist_ok avoid the exists()-then-mkdir race and nested-path failures
        self.path.mkdir(parents=True, exist_ok=True)

    @classmethod
    def from_dir(
        cls, path: pathlib.Path, vcap: cv2.VideoCapture, image_polling_interval: int = 1
    ):
        """
        Build a collector from individual arguments instead of a Config dict.

        path: pathlib.Path
            The directory where images will be stored.
        vcap: cv2.VideoCapture
            The opened VideoCapture object to read frames from.
        image_polling_interval: int
            The interval in seconds between each image capture. Default is 1 second.
        """
        return cls(
            {
                "path": path,
                "vcap": vcap,
                "image_polling_interval": image_polling_interval,
            }
        )

    def collect(self, duration: int) -> None:
        """
        Collect images from the video capture and store them in the directory.

        duration: int
            The duration in seconds to collect images.

        The video capture is released when collection ends, including when
        the loop is interrupted by an exception.
        """
        # create a directory for the date with time
        date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        date_path = self.path / date
        date_path.mkdir(parents=True, exist_ok=True)

        # wait until user is ready to start
        if os.name == "nt":
            os.system("pause")
        else:
            # `pause` is a cmd.exe builtin; on other platforms prompt instead.
            try:
                input("Press Enter to continue . . . ")
            except EOFError:
                # No interactive stdin (e.g. started by a service manager).
                logger.info("No interactive input available; starting immediately.")

        try:
            # collect images; monotonic clock so wall-clock adjustments
            # cannot shorten or extend the run
            start_time = time.monotonic()
            while time.monotonic() - start_time < duration:
                ret, frame = self.vcap.read()
                if not ret:
                    logger.error("Error: Could not read frame.")
                    break

                # wall-clock timestamp keeps file names unique and sortable
                image_path = date_path / f"{time.time()}.jpg"
                # imwrite reports failure through its return value
                if cv2.imwrite(str(image_path), frame):
                    logger.info("Image saved to %s", image_path)
                else:
                    logger.error("Error: Could not write image to %s", image_path)

                time.sleep(self.image_polling_interval)
        finally:
            self.vcap.release()
            cv2.destroyAllWindows()

        logger.info("Image collection complete. Images saved to %s", date_path)