# Source listing: test.py (245 lines, 199 loc, 10.1 KB)
# test.py
import os
import tensorflow as tf
import numpy as np
import cv2
import socket
# module-level variables ##############################################################################################
# Model artifacts and the test image directory are resolved against the
# current working directory at import time.
RETRAINED_LABELS_TXT_FILE_LOC = "/".join((os.getcwd(), "retrained_labels.txt"))
RETRAINED_GRAPH_PB_FILE_LOC = "/".join((os.getcwd(), "retrained_graph.pb"))
TEST_IMAGES_DIR = "/".join((os.getcwd(), "test_images"))

# Colour tuples; as the names show, the channel order is blue, green, red.
SCALAR_RED = (0.0, 0.0, 255.0)
SCALAR_BLUE = (255.0, 0.0, 0.0)

# Address and port the listening socket in main() is bound to.
TCP_IP = "192.168.0.2"
TCP_PORT = 5001
def recvall(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    ``sock.recv`` is allowed to return fewer bytes than requested, so it is
    called repeatedly, each time asking only for what is still missing.

    Args:
        sock: object exposing ``recv(n)`` that returns a bytes-like chunk.
        count: number of bytes to read.

    Returns:
        A ``bytes`` object of length ``count``, or ``None`` if ``recv``
        returned an empty chunk (end of stream) before ``count`` bytes had
        arrived.
    """
    # a bytearray grows in place; repeated ``bytes +=`` would re-copy
    # everything received so far on each chunk
    buf = bytearray()
    remaining = count
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        buf += chunk
        remaining -= len(chunk)
    return bytes(buf)
def read_image_socket(conn, addr):
    """Receive one length-prefixed encoded image from ``conn`` and decode it.

    The wire format read here is a 16-byte field that ``int()`` can parse as
    the payload size, followed by that many bytes of encoded image data.

    Args:
        conn: connected socket to read from.
        addr: peer address; not used here, kept so the signature stays stable.

    Returns:
        A ``(decimg, length)`` tuple: the result of ``cv2.imdecode`` on the
        payload and the payload size in bytes.
        NOTE(review): per the OpenCV docs ``imdecode`` yields no image (None in
        Python) for undecodable data, so callers should check ``decimg``.

    Raises:
        ConnectionError: if the stream ends before the header or the full
            payload has been received.
        ValueError: if the 16-byte header is not a valid integer.
    """
    header = recvall(conn, 16)
    if header is None:
        raise ConnectionError("connection closed while waiting for the image length header")

    # parse once and reuse for both the read and the return value
    length = int(header)

    stringData = recvall(conn, length)
    if stringData is None:
        raise ConnectionError("connection closed before the full image payload was received")

    data = np.frombuffer(stringData, dtype='uint8')
    # flag 1 is the value the original passes; OpenCV documents it as IMREAD_COLOR
    decimg = cv2.imdecode(data, 1)
    return decimg, length
def send_msg_socket(conn, addr, msg):
    """Send ``msg`` to the peer as text, space-padded to at least 32 characters.

    Args:
        conn: connected socket to write to.
        addr: peer address; not used here, kept so the signature stays stable.
        msg: any object; it is converted with ``str()`` before sending.

    Note:
        ``ljust(32)`` only pads. A message longer than 32 characters is sent
        at its full length, so a receiver reading a fixed 32 bytes would see
        it split. NOTE(review): confirm what the client expects.
    """
    payload = str(msg).ljust(32).encode()
    # sendall keeps writing until the whole buffer is out; plain send() may
    # stop after a partial write and the original ignored its return value
    conn.sendall(payload)
#######################################################################################################################
def main():
    """Serve image classifications to a single TCP client.

    Steps:
      1. Verify the required files exist and read the label list.
      2. Import the retrained graph into the default TensorFlow graph.
      3. Listen on (TCP_IP, TCP_PORT) and accept one client.
      4. For every image received, run the graph, print the three most
         confident classes and send the best one back to the client.
      5. When the client goes away, close both sockets and write the graph
         to the current working directory so it can be viewed in TensorBoard.
    """
    print("starting program . . .")

    if not checkIfNecessaryPathsAndFilesExist():
        return
    # end if

    # one classification per line of the labels file, trailing whitespace removed
    classifications = [currentLine.rstrip() for currentLine in tf.gfile.GFile(RETRAINED_LABELS_TXT_FILE_LOC)]
    # show the classifications to prove out that we were able to read the label file successfully
    print("classifications = " + str(classifications))

    # load the graph from file; GFile is used because FastGFile is deprecated in its favour
    with tf.gfile.GFile(RETRAINED_GRAPH_PB_FILE_LOC, 'rb') as retrainedGraphFile:
        graphDef = tf.GraphDef()
        graphDef.ParseFromString(retrainedGraphFile.read())
        # import into the current default Graph; the return value is not needed
        tf.import_graph_def(graphDef, name='')
    # end with

    # if the test image directory listed above is not valid, show an error message and bail
    if not os.path.isdir(TEST_IMAGES_DIR):
        print("the test image directory does not seem to be a valid directory, check file / directory paths")
        return
    # end if

    with tf.Session() as sess:
        # the output tensor does not change between images, so look it up once
        finalTensor = sess.graph.get_tensor_by_name('final_result:0')

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn = None
        try:
            s.bind((TCP_IP, TCP_PORT))
            s.listen(1)
            print("wait for client")
            conn, addr = s.accept()

            while True:
                print("---------------------------------------")
                print("wait for data")
                try:
                    openCVImage, length = read_image_socket(conn, addr)
                except (ConnectionError, TypeError):
                    # depending on how read_image_socket reports it, a peer that
                    # closed the connection shows up as ConnectionError or as
                    # TypeError (a None length header passed to int()); either
                    # way there is nothing more to serve
                    break
                print("recived data " + str(length) + " byte")

                if openCVImage is None:
                    # the payload could not be decoded; stop serving instead of
                    # failing on the array indexing below
                    print("unable to decode the received data as an image")
                    break
                # end if

                # keep at most the first three channels of the decoded image
                # NOTE(review): OpenCV decodes to BGR while 'DecodeJpeg:0' presumably
                # carries RGB in the training pipeline - confirm the channel order
                tfImage = np.array(openCVImage)[:, :, 0:3]
                # run the network to get the predictions
                predictions = sess.run(finalTensor, {'DecodeJpeg:0': tfImage})

                _reportPredictions(conn, addr, openCVImage, predictions, classifications)
            # end while
        finally:
            # release both sockets however the loop ended
            if conn is not None:
                conn.close()
            s.close()
            print("Connection close")
    # end with

    # write the graph to file so we can view with TensorBoard
    tfFileWriter = tf.summary.FileWriter(os.getcwd())
    tfFileWriter.add_graph(sess.graph)
    tfFileWriter.close()
# end main


def _reportPredictions(conn, addr, openCVImage, predictions, classifications):
    """Print the three most confident predictions and send the best one to the client.

    Args:
        conn: connected client socket, passed through to send_msg_socket.
        addr: client address, passed through to send_msg_socket.
        openCVImage: image the result text is drawn onto (modified in place).
        predictions: result of running the final tensor; row 0 holds one score per class.
        classifications: label list indexed by class number.
    """
    scores = predictions[0]
    # class indices ordered from most confident to least confident
    sortedPredictions = scores.argsort()[::-1]

    # show top 3
    for rank, prediction in enumerate(sortedPredictions[:3]):
        strClassification = classifications[prediction]
        # if the classification (obtained from the directory name) ends with the letter "s",
        # remove the "s" to change from plural to singular
        if strClassification.endswith("s"):
            strClassification = strClassification[:-1]
        # end if

        confidence = scores[prediction]

        if rank == 0:
            # most likely prediction: report it as a percentage, draw it on the image and reply to the client
            scoreAsAPercent = confidence * 100.0
            strMsg = strClassification + ", " + "{0:.2f}".format(scoreAsAPercent) + "% confidence"
            print("the object appears to be a " + strMsg)
            writeResultOnImage(openCVImage, strMsg)
            print("Send: " + strMsg + ";")
            send_msg_socket(conn, addr, strMsg)
        # end if

        print(strClassification + " (" + "{0:.2f}".format(confidence*100) + " %)")
    # end for
# end function
#######################################################################################################################
def checkIfNecessaryPathsAndFilesExist():
    """Check that the test image directory, labels file and graph file exist.

    The paths are checked in that order; the first missing one is reported on
    std out and ends the check.

    Returns:
        True if all three paths exist, False otherwise.
    """
    if not os.path.exists(TEST_IMAGES_DIR):
        # this message is framed by blank lines and carries an extra hint
        for line in (
            '',
            'ERROR: TEST_IMAGES_DIR "' + TEST_IMAGES_DIR + '" does not seem to exist',
            'Did you set up the test images?',
            '',
        ):
            print(line)
        return False
    # end if

    # (name of the constant shown in the message, path to check)
    requiredFiles = (
        ('RETRAINED_LABELS_TXT_FILE_LOC', RETRAINED_LABELS_TXT_FILE_LOC),
        ('RETRAINED_GRAPH_PB_FILE_LOC', RETRAINED_GRAPH_PB_FILE_LOC),
    )
    for constantName, filePath in requiredFiles:
        if os.path.exists(filePath):
            continue
        print('ERROR: ' + constantName + ' "' + filePath + '" does not seem to exist')
        return False
    # end for

    return True
# end function
#######################################################################################################################
def writeResultOnImage(openCVImage, resultText):
    """Draw ``resultText`` in blue near the top-left corner of ``openCVImage``.

    The text is drawn with ``cv2.putText`` directly onto the array that is
    passed in; nothing is returned.

    Args:
        openCVImage: image array whose ``shape`` starts with (height, width).
        resultText: text to draw.
    """
    # ToDo: this function may take some further fine-tuning to show the text well given any possible image size

    # only height and width are needed; slicing the shape also accepts 2-D
    # (single channel) arrays, which a three-name unpack would reject
    imageHeight, imageWidth = openCVImage.shape[:2]

    fontFace = cv2.FONT_HERSHEY_TRIPLEX
    fontScale = 1.0
    fontThickness = 2  # must stay an integer for the OpenCV calls below

    # upper left corner of the text area, 5% in from the image's top left corner
    upperLeftTextOriginX = int(imageWidth * 0.05)
    upperLeftTextOriginY = int(imageHeight * 0.05)

    (_, textSizeHeight), _ = cv2.getTextSize(resultText, fontFace, fontScale, fontThickness)

    # putText takes the bottom-left corner of the text, so move down by the text height
    lowerLeftTextOriginX = upperLeftTextOriginX
    lowerLeftTextOriginY = upperLeftTextOriginY + textSizeHeight

    # write the text on the image
    cv2.putText(openCVImage, resultText, (lowerLeftTextOriginX, lowerLeftTextOriginY), fontFace, fontScale, SCALAR_BLUE, fontThickness)
# end function
#######################################################################################################################
# start the server only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()