-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBPNN.py
208 lines (177 loc) · 7.01 KB
/
BPNN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import pandas as pd
import numpy as np
import datetime
import matplotlib.pyplot as plt
from pandas.plotting import radviz
# step1: initialize the parameters
def initialize_parameters(n_x, n_h, n_y):
    """Create the initial weights and biases of a one-hidden-layer network.

    The global NumPy random generator is re-seeded with 2 on every call, so
    the returned weights are the same for identical layer sizes.

    Args:
        n_x: number of input units.
        n_h: number of hidden units.
        n_y: number of output units.

    Returns:
        dict with keys 'w1' (n_h, n_x), 'b1' (n_h, 1), 'w2' (n_y, n_h) and
        'b2' (n_y, 1). Weights are standard-normal samples scaled by 0.01;
        biases are all zeros.
    """
    np.random.seed(2)
    scale = 0.01
    # Draw order matters for reproducibility: hidden weights first, then
    # output weights.
    hidden_weights = np.random.randn(n_h, n_x) * scale
    output_weights = np.random.randn(n_y, n_h) * scale
    return {
        'w1': hidden_weights,
        'b1': np.zeros((n_h, 1)),
        'w2': output_weights,
        'b2': np.zeros((n_y, 1)),
    }
# step2: forward propagation
def forward_propagation(X, parameters):
    """Run one forward pass through the tanh hidden layer and sigmoid output.

    Args:
        X: input matrix; it is multiplied as ``np.dot(w1, X)``, so its first
            axis must match the second axis of ``w1``.
        parameters: dict holding 'w1', 'b1', 'w2' and 'b2'.

    Returns:
        Tuple ``(a2, cache)`` where ``a2`` is the sigmoid output and ``cache``
        is a dict with the intermediate values 'z1', 'a1', 'z2' and 'a2'.
    """
    hidden_pre = np.dot(parameters['w1'], X) + parameters['b1']
    hidden_act = np.tanh(hidden_pre)
    output_pre = np.dot(parameters['w2'], hidden_act) + parameters['b2']
    # Logistic sigmoid of the output pre-activation.
    output_act = 1 / (1 + np.exp(-output_pre))
    cache = {
        'z1': hidden_pre,
        'a1': hidden_act,
        'z2': output_pre,
        'a2': output_act,
    }
    return output_act, cache
# step3: compute the cost function
def compute_cost(a2, Y):
    """Return the cross-entropy cost between predictions and labels.

    The per-element binary cross-entropy is summed over every output unit and
    sample, then divided by the number of samples (columns of ``Y``).

    Args:
        a2: predicted probabilities, same shape as ``Y``.
        Y: 0/1 label matrix with one column per sample.

    Returns:
        The scalar cost. It stays finite even when a prediction has saturated
        to exactly 0 or 1.
    """
    m = Y.shape[1]  # number of samples
    # Keep probabilities strictly inside (0, 1): log(0) would give -inf, and
    # 0 * -inf would turn the whole cost into NaN once the sigmoid saturates.
    eps = 1e-15
    probs = np.clip(a2, eps, 1 - eps)
    # Cast so that ``1 - labels`` is ordinary float arithmetic even when the
    # labels arrive as an unsigned integer array.
    labels = np.asarray(Y, dtype=float)
    logprobs = labels * np.log(probs) + (1 - labels) * np.log(1 - probs)
    cost = -np.sum(logprobs) / m
    return cost
# step4: backward propagation
def backward_propagation(parameters, cache, X, Y):
    """Compute the gradients of the cost for both layers.

    Args:
        parameters: dict of network parameters; only 'w2' is read here.
        cache: dict from the forward pass; 'a1' and 'a2' are read.
        X: input matrix used in the forward pass.
        Y: label matrix with one column per sample.

    Returns:
        dict with the gradients 'dw1', 'db1', 'dw2' and 'db2', each averaged
        over the number of columns in ``Y``.
    """
    inv_m = 1 / Y.shape[1]
    hidden_act = cache['a1']

    # Output-layer error, then the error pushed back through the tanh layer
    # (the derivative of tanh is 1 - tanh^2).
    output_err = cache['a2'] - Y
    hidden_err = np.dot(parameters['w2'].T, output_err) * (1 - np.power(hidden_act, 2))

    return {
        'dw1': inv_m * np.dot(hidden_err, X.T),
        'db1': inv_m * np.sum(hidden_err, axis=1, keepdims=True),
        'dw2': inv_m * np.dot(output_err, hidden_act.T),
        'db2': inv_m * np.sum(output_err, axis=1, keepdims=True),
    }
# step5: update the parameters
def update_parameters(parameters, grads, learning_rate=0.4):
    """Apply one gradient-descent step and return the new parameters.

    Args:
        parameters: dict holding 'w1', 'b1', 'w2' and 'b2'.
        grads: dict holding the matching gradients 'dw1', 'db1', 'dw2', 'db2'.
        learning_rate: step size multiplied with every gradient.

    Returns:
        A new dict with the updated 'w1', 'b1', 'w2' and 'b2'; the arrays in
        the input dict are not modified.
    """
    names = ('w1', 'b1', 'w2', 'b2')
    # Each gradient is stored under its parameter's name prefixed with 'd'.
    return {
        name: parameters[name] - grads['d' + name] * learning_rate
        for name in names
    }
# step6: build the neural network
def nn_model(X, Y, n_h, n_input, n_output, num_iterations=10000, print_cost=False, learning_rate=0.4):
    """Train the one-hidden-layer network with batch gradient descent.

    Args:
        X: training inputs, passed unchanged to ``forward_propagation``.
        Y: training labels with one column per sample.
        n_h: number of hidden units.
        n_input: number of input units.
        n_output: number of output units.
        num_iterations: number of gradient-descent iterations.
        print_cost: when true, print the cost every 1000 iterations.
        learning_rate: step size handed to ``update_parameters``; the default
            0.4 matches that function's own default.

    Returns:
        dict with the trained 'w1', 'b1', 'w2' and 'b2'.
    """
    # NOTE: initialize_parameters re-seeds the generator itself, so this seed
    # does not influence the initial weights; it is kept so the function's
    # effect on the global random state is unchanged.
    np.random.seed(3)
    parameters = initialize_parameters(n_input, n_h, n_output)

    for i in range(num_iterations):
        a2, cache = forward_propagation(X, parameters)
        grads = backward_propagation(parameters, cache, X, Y)
        parameters = update_parameters(parameters, grads, learning_rate=learning_rate)
        # The cost is only needed for reporting, so compute it on demand. It
        # is taken from the activations obtained before this iteration's update.
        if print_cost and i % 1000 == 0:
            cost = compute_cost(a2, Y)
            print('迭代第%i次,代价函数为:%f' % (i, cost))
    return parameters
# step7: model evaluation
def predict(parameters, x_test, y_test):
    """Classify the test samples, print a report and return the predictions.

    The forward pass (tanh hidden layer, sigmoid output) is evaluated on
    ``x_test`` and every output is thresholded at 0.5. A sample counts as
    correct only when all of its output units match ``y_test``. The
    predictions, the true labels, the 1-based index of every misclassified
    sample and the accuracy percentage are printed.

    Args:
        parameters: dict holding 'w1', 'b1', 'w2' and 'b2'.
        x_test: test inputs with one column per sample.
        y_test: 0/1 label matrix with one column per sample; any number of
            output rows is supported.

    Returns:
        Integer array of 0/1 predictions with the same shape as ``y_test``.

    Raises:
        ValueError: if the network output shape differs from ``y_test``.
    """
    w1, b1 = parameters['w1'], parameters['b1']
    w2, b2 = parameters['w2'], parameters['b2']
    a1 = np.tanh(np.dot(w1, x_test) + b1)
    a2 = 1 / (1 + np.exp(-(np.dot(w2, a1) + b2)))

    if a2.shape != y_test.shape:
        raise ValueError(
            'network output shape %s does not match label shape %s'
            % (a2.shape, y_test.shape)
        )

    # Threshold every output unit at 0.5.
    output = (a2 > 0.5).astype(int)
    print('预测结果:', output)
    print('真实结果:', y_test)

    # Compare whole columns so the check works for any number of output units.
    correct = np.all(output == y_test, axis=0)
    for k in np.flatnonzero(~correct):
        print('错误分类样本的序号:', int(k) + 1)

    n_cols = y_test.shape[1]
    count = int(np.count_nonzero(correct))
    # An empty test set is reported as 0% rather than dividing by zero.
    acc = count / n_cols * 100 if n_cols else 0.0
    print('准确率:%.2f%%' % acc)
    return output
# step8: result visualization
def _decode_species(one_hot, count):
    """Translate the first ``count`` one-hot columns into species names."""
    names = {
        (0, 0, 1): 'setosa',
        (0, 1, 0): 'versicolor',
        (1, 0, 0): 'virginica',
    }
    # Only the first three rows carry the class code.
    codes = np.asarray(one_hot)[:3, :count]
    # Any column that is not one of the three known codes becomes 'unknown',
    # so the label list always has exactly one entry per sample.
    return [names.get(tuple(column), 'unknown') for column in codes.T.tolist()]


def result_visualization(x_test, y_test, result):
    """Show RadViz plots of the true and the predicted classes.

    Two figures are created, one coloured by the true species and one by the
    predicted species, and then displayed with ``plt.show()``.

    Args:
        x_test: feature matrix with four rows (sepal length, sepal width,
            petal length, petal width) and one column per sample.
        y_test: one-hot true labels with one column per sample.
        result: one-hot predicted labels with one column per sample.
    """
    feature_names = ['Sepal Length', 'Sepal Width', 'Petal Length', 'Petal Width']
    colors = ['blue', 'green', 'red', 'yellow']
    sample_count = y_test.shape[1]
    features = np.asarray(x_test).T

    for title, one_hot in (('真实分类', y_test), ('预测分类', result)):
        # One row per sample: the four float features plus the species label.
        frame = pd.DataFrame(features, index=None, columns=feature_names).astype(float)
        frame['Species'] = _decode_species(one_hot, sample_count)
        plt.figure(title)
        radviz(frame, 'Species', color=colors)
    plt.show()
if __name__ == "__main__":
    # Training data: the first four columns are features, the remaining
    # columns are the labels. Both are transposed to one column per sample.
    data_set = pd.read_csv('C:\\Users\\zou\\Desktop\\ML\\bpnn_data\\iris_training.csv', header=None)
    X = data_set.iloc[:, 0:4].values.T
    Y = data_set.iloc[:, 4:].values.T.astype('uint8')

    # Train with 4 inputs, 10 hidden units, 3 outputs and 10000 iterations,
    # timing the run.
    start_time = datetime.datetime.now()
    parameters = nn_model(X, Y, n_h=10, n_input=4, n_output=3, num_iterations=10000, print_cost=True)
    end_time = datetime.datetime.now()
    elapsed = end_time - start_time
    whole_seconds = str(elapsed.seconds)
    milliseconds = str(round(elapsed.microseconds / 1000))
    print("用时:" + whole_seconds + 's' + milliseconds + 'ms')

    # Evaluate on the test set, which has the same column layout.
    data_test = pd.read_csv('C:\\Users\\zou\\Desktop\\ML\\bpnn_data\\iris_test.csv', header=None)
    x_test = data_test.iloc[:, 0:4].values.T
    y_test = data_test.iloc[:, 4:].values.T.astype('uint8')
    result = predict(parameters, x_test, y_test)

    # Plot the true and the predicted classes.
    result_visualization(x_test, y_test, result)