-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathobjectdetection_fd_rknn_adapter.py
More file actions
154 lines (109 loc) · 5.59 KB
/
objectdetection_fd_rknn_adapter.py
File metadata and controls
154 lines (109 loc) · 5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Import our general libraries
import os
import sys
from time import time
# Import CodeProject.AI SDK
from codeproject_ai_sdk import RequestData, ModuleRunner, LogMethod, JSON
# Import this module's settings
from options import Options
from PIL import Image
# Import the method of the module we're wrapping
from objectdetection_fd_rknn import init_detect, do_detect
class FastDeploy_adapter(ModuleRunner):
    """
    CodeProject.AI adapter that wraps the FastDeploy RKNN object detector.

    Handles three commands: 'detect' (standard detection), 'custom'
    (detection with a named custom model) and 'list-custom' (enumerate
    the available custom models).
    """

    def __init__(self):
        super().__init__()
        self.opts = Options()
        # Timestamp (time.time()) of the last scan of the custom-models
        # directory; None means "never scanned".
        self.models_last_checked = None
        self.model_names = []  # We'll use this to cache the available model names

    def initialise(self) -> None:
        """One-time setup: queue name, NPU detection, detector init, stats."""
        # if the module was launched outside of the server then the queue name
        # wasn't set. This is normally fine, but here we want the queue to be
        # the same as the other object detection queues
        if not self.launched_by_server:
            self.queue_name = "objectdetection_queue"

        if self.enable_GPU:
            self.enable_GPU = self.system_info.hasFastDeployRockNPU

        if self.enable_GPU:
            print("Rockchip NPU detected")
            self.inference_device  = "NPU"
            self.inference_library = "PaddlePaddle"

        init_detect(self.opts)

        # Running statistics reported by status() / update_statistics()
        self._num_items_found = 0
        self._histogram       = {}

    def process(self, data: RequestData) -> JSON:
        """
        Dispatch an incoming request to the appropriate handler and return
        the handler's JSON response.
        """
        response = None

        if data.command == "list-custom":
            # The route to here is /v1/vision/custom/list - list all models available
            response = self._list_models(self.opts.custom_models_dir)

        elif data.command == "detect":  # Perform 'standard' object detection
            # The route to here is /v1/vision/detection
            threshold: float = float(data.get_value("min_confidence", self.opts.min_confidence))
            # NOTE: annotate with Image.Image (the class); bare `Image` is the PIL module.
            img: Image.Image = data.get_image(0)

            response = do_detect(self, self.opts.models_dir, self.opts.std_model_name,
                                 img, threshold)

        elif data.command == "custom":  # Perform custom object detection
            threshold: float = float(data.get_value("min_confidence", self.opts.min_confidence))
            img: Image.Image = data.get_image(0)

            # The route to here is /v1/vision/custom/<model-name>. If model-name
            # is 'general', or no model was provided, a built-in general purpose
            # model will be used.
            models_dir: str = self.opts.custom_models_dir
            model_name: str = "general"
            if data.segments and data.segments[0]:
                model_name = data.segments[0]

            # Map the "general" model to our current "general" model
            if model_name == "general":  # Use the custom IP Cam general model
                models_dir = self.opts.custom_models_dir
                model_name = "ipcam-general-small"

            self.log(LogMethod.Info | LogMethod.Server,
            {
                "filename": __file__,
                "loglevel": "information",
                "method": sys._getframe().f_code.co_name,
                "message": f"Detecting using {model_name}"
            })

            response = do_detect(self, models_dir, model_name, img, threshold)

        else:
            self.report_error(None, __file__, f"Unknown command {data.command}")
            response = { "success": False, "error": "unsupported command" }

        return response

    def status(self) -> JSON:
        """Return the base status augmented with detection statistics."""
        statusData = super().status()
        statusData["numItemsFound"] = self._num_items_found
        statusData["histogram"]     = self._histogram
        return statusData

    def update_statistics(self, response):
        """Accumulate the item count and per-label histogram from a response."""
        super().update_statistics(response)
        if "success" in response and response["success"] and "predictions" in response:
            predictions = response["predictions"]
            self._num_items_found += len(predictions)
            for prediction in predictions:
                label = prediction["label"]
                # dict.get avoids the separate membership test
                self._histogram[label] = self._histogram.get(label, 0) + 1

    def selftest(self) -> JSON:
        """Run a single 'detect' request against a bundled test image."""
        file_name = os.path.join("test", "home-office.jpg")

        request_data = RequestData()
        request_data.queue   = self.queue_name
        request_data.command = "detect"
        request_data.add_file(file_name)
        request_data.add_value("min_confidence", 0.4)

        result = self.process(request_data)
        # Use .get so a malformed response reports failure instead of raising
        success = result.get("success", False)
        print(f"Info: Self-test for {self.module_id}. Success: {success}")
        # print(f"Info: Self-test output for {self.module_id}: {result}")

        return { "success": success, "message": "Object detection test successful" }

    def _list_models(self, models_path: str):
        """
        Lists the custom models we have in the assets folder. This ignores the
        yolov5* files.
        """
        # We'll only refresh the list of models at most once a minute
        if self.models_last_checked is None or (time() - self.models_last_checked) >= 60:
            suffix = ".rknn"
            self.model_names = [entry.name[:-len(suffix)]  # strip the extension
                                for entry in os.scandir(models_path)
                                if (entry.is_file()
                                    and entry.name.endswith(suffix)
                                    and not entry.name.startswith("yolov5"))]
            self.models_last_checked = time()

        return { "success": True, "models": self.model_names }
# Run the adapter's request-processing loop when executed as a script.
if __name__ == "__main__":
    FastDeploy_adapter().start_loop()