Coding the Recurrent Neural Network
Look at a write-up that explains how an RNN works, from the initialization of weights through the forward path, backpropagation, and the update of the weight network.
While I was referring to the following articles to learn the basics, I realized that I could add a few more things so that the community and developers like me can benefit:
https://www.analyticsvidhya.com/blog/2017/12/introduction-to-recurrent-neural-networks/
https://towardsdatascience.com/recurrent-neural-networks-rnns-3f06d7653a85
The write-up below explains in detail, with code snippets, how an RNN works from the initialization of weights through the forward path, backpropagation, and the update of the weight network.
Let us take this example of a single hidden layer with 3 neurons processing the word ‘Hello’ (example taken from the first link above).
For this example, I am trying to keep the computation simple by calculating the values up to H2 only. It can be extended to H4 and further, and to many hidden layers.
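To make the encoding concrete, here is a minimal sketch (my own, not from the referenced articles) of how the one-hot input rows used below could be built, assuming the vocabulary order h, e, l, o:
vocab = ['h', 'e', 'l', 'o']  # assumed ordering, matching 1,0,0,0 for 'h' and 0,1,0,0 for 'e'
def one_hot(letter):
    vec = [0] * len(vocab)        # one slot per character in the vocabulary
    vec[vocab.index(letter)] = 1  # switch on the slot for this character
    return vec
inputs = [one_hot('h'), one_hot('e')]  # [[1, 0, 0, 0], [0, 1, 0, 0]] - the first two steps of 'hello'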
Step 1 – Weight Initialization
- Considering the input layer to be [[1,0,0,0],[0,1,0,0]] - for letters 'h' and 'e'
- The same weight networks will be repeated for all the states in the same hidden layer. When we add a new hidden layer, a new weight matrix will be added
- Input weight network (Wxh) will be a 3*5 matrix: 3 rows for the 3 neurons, 4 columns for the 4 input values (one hot encoded: 1,0,0,0 for ‘h’, 0,1,0,0 for ‘e’, …), and a 5th column for the bias weight
weight_arrayTest = [[0.287027,0.84606,0.572392,0.486813,0.56700],
[0.902874,0.871522,0.691079,0.18998,0.56700],
[0.537524,0.09224,0.558159,0.491528,0.56700]]  # 3 neurons x (4 one-hot inputs + 1 bias column)
weight_hidden_dictTest = dict()
weight_hidden_dictTest.update({'Weight':weight_arrayTest})
weight_network.append(weight_hidden_dictTest)  # weight_network holds one weight dict per hidden layer
- Recurrent weight network (Whh): [0.427043]. This is a 1*1 matrix for the 1 hidden layer.
- Output weight network (Wyh) will be a 4*3 matrix: 4 rows as the size of each input array is 4 (for each input element, e.g. x1 = [1,0,0,0], there has to be an output), and 3 columns as there are 3 neurons and each neuron will emit a state
output_weight_arrayTest=[[0.37168,0.974829459,0.830034886],
[0.39141,0.282585823,0.659835709],
[0.64985,0.09821557,0.334287084],
[0.91266,0.32581642,0.144630018]]
output_weight_hidden_dictTest=dict()
output_weight_hidden_dictTest.update({'Weight':output_weight_arrayTest})
output_weight_network.append(output_weight_hidden_dictTest)
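For reference, the same three weight networks can also be written as NumPy arrays, which makes their dimensions explicit. This is only a sketch; the names Wxh, Whh, and Wyh are mine, following the notation above:
import numpy as np
Wxh = np.array([[0.287027, 0.84606, 0.572392, 0.486813, 0.56700],
                [0.902874, 0.871522, 0.691079, 0.18998, 0.56700],
                [0.537524, 0.09224, 0.558159, 0.491528, 0.56700]])  # 3 neurons x (4 one-hot inputs + 1 bias)
Whh = np.array([[0.427043]])  # 1*1 recurrent weight for the single hidden layer
Wyh = np.array([[0.37168, 0.974829459, 0.830034886],
                [0.39141, 0.282585823, 0.659835709],
                [0.64985, 0.09821557, 0.334287084],
                [0.91266, 0.32581642, 0.144630018]])  # 4 outputs x 3 neurons
print(Wxh.shape, Whh.shape, Wyh.shape)  # (3, 5) (1, 1) (4, 3)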
Step 2 – Forward Path
The forward path is explained very well in the first link above through an Excel-based calculation. The code snippet below explains the logic.
I have broken it down into multiple small parts so that it is easier to understand. The code can be simplified through dot products and array arithmetic (a vectorized sketch follows the states output further below).
- Loop through the input weight network and multiply the weights with the input values (dot product)
('input weight Network', [{'Weight': [[0.287027, 0.84606, 0.572392, 0.486813, 0.567], [0.902874, 0.871522, 0.691079, 0.18998, 0.567], [0.537524, 0.09224, 0.558159, 0.491528, 0.567]]}])
inputs=[[1,0,0,0],
[0,1,0,0]]
for k in weight_network:
    new_inputs = list()
    indices = 0  # reset per hidden layer; indexes the previous-state aggregates built below
    for j in list(k.values())[0]:  # one weight row per neuron (wrap in list() for Python 3)
        aggregate = 0.0
        for n in range(len(j)-1):  # all columns except the last (bias) one
            aggregate += inputs[n]*j[n]  # dot product of the current input row x(t) and the neuron's weights
        aggregate += j[-1]*1  # for the bias
        if len(previous_states) != 0:  # adding the previous states
            aggregate_sum = aggregate + aggregate1_list[indices]
            output = 1/(1+np.exp(-aggregate_sum))  # sigmoid activation
            new_inputs.append(output)
        else:  # sigmoid activation
            output = 1/(1+np.exp(-aggregate))
            new_inputs.append(output)
        indices += 1
    # at each step, the recurrent network will also produce an output
    hidden_layer_output.append(new_inputs)
    inputs = new_inputs  # with more hidden layers, this output would feed the next layer
return [inputs, hidden_layer_output]
- If the previous state is not zero, we will need to add the previous states to the aggregate that we are calculating from the dot product. aggregate1_list is calculated through
if len(previous_states) != 0:
    for i in range(len(previous_states[sample_row_index-1])):
        print("Values of i", previous_states[sample_row_index-1][i])
        print("Values of recurrent neuron weight", recurrrent_neuron_weight[0])
        aggregate1 = previous_states[sample_row_index-1][i]*recurrrent_neuron_weight[0]  # h(t-1) * Whh
        aggregate1_list.append(aggregate1)
    print("Aggregate 1 list", aggregate1_list)
- After the dot product and the previous states are added, we use the activation function (I used the sigmoid) to calculate the corresponding output (the hidden state value for each neuron). Since I have restricted the input layer/loop to ‘h’ and ‘e’ of the word ‘hello’, the hidden state output after 2 iterations of the input array will look like the 2*3 matrix below (2 rows for the 2 input rows and 3 columns for the 3 neurons in each state for the 1 hidden layer)
('States output are', [[0.70141121474134871, 0.81303823389731422, 0.75110680687063036], [0.84717227340513512, 0.85640226112907092, 0.72710720520726202]])
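As mentioned above, the loop can be collapsed into dot products. Here is a minimal vectorized sketch (assuming the Wxh and Whh arrays from the Step 1 sketch) that reproduces the states output shown above:
def hidden_state(x, h_prev, Wxh, Whh):
    # x is the one-hot input with the bias value 1 appended; h_prev is the previous hidden state
    aggregate = np.dot(Wxh, x) + Whh[0, 0] * h_prev  # input contribution + recurrent contribution
    return 1.0 / (1.0 + np.exp(-aggregate))          # sigmoid activation
h0 = np.zeros(3)                                            # no previous state before the first letter
h1 = hidden_state(np.array([1, 0, 0, 0, 1]), h0, Wxh, Whh)  # 'h' -> [0.7014, 0.8130, 0.7511]
h2 = hidden_state(np.array([0, 1, 0, 0, 1]), h1, Wxh, Whh)  # 'e' -> [0.8472, 0.8564, 0.7271]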
- Calculate the output value (y) at each step with the states output and the output weight network
def outputCalc(self, new_inputs):
    output_arr = list()
    for k in output_weight_network:
        for j in list(k.values())[0]:  # one weight row per output element (wrap in list() for Python 3)
            aggregate = 0.0
            for n in range(len(j)):
                aggregate += new_inputs[n]*j[n]  # dot product of the hidden state and the output weights
            output_arr.append(aggregate)
    return output_arr
The output data structure for 2 iterations of the input array loop will look like the 2*4 matrix below (2 rows for the 2 input rows and 4 columns for the 4 one-hot encoded input elements for each X(t))
('Output Arr List', [[1.6767189948061862, 0.99989953446445878, 0.7867503957150177, 1.0136837569350066], [1.7532474896660379, 1.0533701355806655, 0.87770948548253291, 1.1573716940239656]])
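The same calculation collapses to a single dot product per time step. A minimal sketch, assuming the Wyh array and the h1/h2 states from the earlier sketches:
y1 = np.dot(Wyh, h1)  # [1.6767, 0.9999, 0.7868, 1.0137] - matches the first row above
y2 = np.dot(Wyh, h2)  # [1.7532, 1.0534, 0.8777, 1.1574] - matches the second row above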
- Predict through SoftMax
def predictSoftMax(self, output_values):  # softmax function
    print("Output values in predict method is", output_values)
    out = np.exp(output_values)
    print("Sigma of softmax is", out/np.sum(out))
    return out/np.sum(out)  # the output of the array will add up to 1
('Predicted Softmax values', array([ 0.40578425, 0.20153121, 0.16906506, 0.22361947]))
The array size is 4 for the 4 outputs (one output value for each of the 4 one-hot encoded input elements)
- Calculate the Performance (Cost Function): -(1/2)(d-z)**2
def performance(self, outputs, sample_row_index):
    performance_errors = 0.0
    for i in range(len(outputs[sample_row_index])):
        print("Actual", input_arr[sample_row_index][i])
        print("Predicted", outputs[sample_row_index][i])
        performance_errors += -(0.5)*(input_arr[sample_row_index][i]-outputs[sample_row_index][i])**2  # -(1/2)(d-z)**2
    return performance_errors
We will see the Error value decreasing with each loop/iteration
Step 3 – Back Propagation
The math behind the backpropagation is shown in the picture below
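In summary, the quantities computed per time step in the steps below are (U, V, W as above; d is the desired output, z the actual output, h(t) the hidden state, x(t) the one-hot input with the bias appended; delta(t) denotes the hidden-layer delta built in Steps 2-4, dList2 in the code, and delta_carry the carry forward of Step 6, delta_error in the code):
\begin{aligned}
dV &\mathrel{+}= (d - z)\,h(t)^{T} \\
\delta(t) &= \bigl(V^{T}(d - z) + \delta_{\mathrm{carry}}\bigr)\,h(t)\bigl(1 - h(t)\bigr) \\
dU &\mathrel{+}= \delta(t)\,x(t)^{T} \\
dW &\mathrel{+}= \delta(t)\cdot h(t-1) \\
\delta_{\mathrm{carry}} &= \delta(t)\cdot W
\end{aligned}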
Initialize the delta arrays for the input weight matrix, the output weight matrix, and the recurrent weight matrix, using the standard notation U = input weight, V = output weight, W = recurrent weight.
dV = np.zeros(shape=(4,3), dtype=float)
dU = np.zeros(shape=(no_of_neurons_hidden_layer, input_features+1), dtype=float)  # +1 for the bias column
dW = np.zeros(shape=(no_of_hidden_layers), dtype=float)
Traverse in the reverse order of the time steps for the single hidden layer (in this example, the reverse traversal starts from 2, as I have stopped after H2 to keep it simple).
- Step 1: calculate the delta weight matrix for the output weight network
The (d-z) is first calculated and then fed into the second code snippet (dot product of (d-z).h(t).T)
def performance_derivative_output(self, outputs, sample_row_index):
    error_list = list()
    for i in range(len(outputs[sample_row_index])):
        print("Actual derivative function", input_arr[sample_row_index][i])
        print("Predicted derivative function", outputs[sample_row_index][i])
        print("Actual-Predicted Value derivative function", input_arr[sample_row_index][i]-outputs[sample_row_index][i])
        error_list.append(input_arr[sample_row_index][i]-outputs[sample_row_index][i])  # (d-z)
    return error_list
dV += np.dot(np.array(output_array_list[i])[:,None],np.array(states_list[i])[None,:])  # (d-z).h(t).T
('Output Array List', [[-0.67671899480618625, -0.99989953446445878, -0.7867503957150177, -1.0136837569350066], [-1.7532474896660379, -0.053370135580665501, -0.87770948548253291, -1.1573716940239656]])
('States List', [[0.70141121474134871, 0.81303823389731422, 0.75110680687063036], [0.84717227340513512, 0.85640226112907092, 0.72710720520726202]])
After 2 loops, the final dV weight matrix will look like below (4*3 matrix size)
('dV', array([[-1.95996095, -2.05168353, -1.78308713],
              [-0.74655445, -0.85866286, -0.78983716],
              [-1.29540669, -1.39133054, -1.22912247],
              [-1.69150236, -1.81533939, -1.60291807]]))
- Step 2: calculate [y(t) desired – y(t) actual].w(yh). You can see the math in the picture above
dList1 = np.dot(output_weight_array_Transpose,np.array(output_array_list[i])[:,None])
('Output weight array Transpose', array([[ 0.37168 , 0.39141 , 0.64985 , 0.91266 ],
[ 0.97482946, 0.28258582, 0.09821557, 0.32581642],
[ 0.83003489, 0.65983571, 0.33428708, 0.14463002]]))
The output_array_list is the output of performance_derivative_output() function in the previous step
After 2 loops, the output of the above statement will generate the following array
('dList 1 current', array([[-2.07931196],
[-1.349789 ],
[-1.63107939]]))
- Step 3: Add the carry forward to dList1. The carry forward equation is given in the picture above. The code is given in Step 6
dList1 = dList1 + delta_error[:,None]
The updated array after the second run of the loop (which is state 1)
('Updated Delta 1 list', array([[-2.52200365],
[-1.77097217],
[-2.00677954]]))
- Step 4: calculate ([y(t) desired – y(t) actual].w(yh) + delta_error)*[h(t)(1-h(t))]
updated_states_list = self.calculateSigmoidBackProp(states_list[i])  # h(t)(1-h(t))
def calculateSigmoidBackProp(self, states_list):  # h(t)(1-h(t))
    updated_states_list = list()
    for i in range(len(states_list)):
        print('states_list[i]', states_list[i])
        updated_states_list.append(states_list[i]*(1-states_list[i]))
    return updated_states_list
dList2 = list()  # ([y(t) desired – y(t) actual].w(yh) + delta_error)*h(t)(1-h(t))
for i1 in range(len(dList1)):
    total = 0
    for j in range(len(updated_states_list)):
        print("dList 1 elements", dList1[i1])
        print("updated_states_list[j] elements", updated_states_list[j])
        total += dList1[i1][0]*updated_states_list[j]
        print("Individual sum", total)
    dList2.append(total)
- Step 5: Take the output of Step 4 and do a dot product with the Input
input_arr[i].append(1)  # append 1 for the input bias
dUMain_list = list()
for m in np.array(dList2):
    sub_list = list()
    for m1 in input_arr[i]:
        sub_list.append(m*m1)
    dUMain_list.append(sub_list)
print("Input dUMain_list", dUMain_list)
dU += np.array(dUMain_list)
dU after 2 loops (matrix of 3*5)
('Du Main List', array([[-1.38303139, -1.03664432, 0. , 0. , -2.41967571],
[-0.97117627, -0.98627813, 0. , 0. , -1.9574544 ],
[-1.10048972, -0.87977125, 0. , 0. , -1.98026097]]))
- Step 6: Calculate the carry forward that was added in Step 3:
delta_error = np.dot(np.array(dList2)[:,None], np.array(recurrrent_neuron_weight))  # carry forward for the previous time step: dList2 . w(hh)
- Step 7: calculate dW: Take the output of Step 4 and do a dot product with the h(t-1)
if i != 0:  # stop if we are at h(1) and h(0) is zero
    print("dList2", np.array(dList2)[None,:])
    print("states list[i-1]", np.array(states_list[i-1])[:,None])
    dW_Sum = 0
    for w in range(len(dList2)):
        print("Multiplying {0} * {1}".format(dList2[w], states_list[i-1][w]))
        dW_Sum += dList2[w]*states_list[i-1][w]
    print("dW_Sum", dW_Sum)
    dW += dW_Sum
    print("DW Main List", dW)
dW after 2 loops (matrix of 1*1)
('DW Main List', array([-2.18979795]))
Step 4 – Update Weights
We now have the delta weight matrices for U, V, and W. While calculating, we have to make sure that the dimensions of dV, dU, and dW are the same as the initialized matrices.
('dV before weight update', array([[-1.95996095, -2.05168353, -1.78308713],
[-0.74655445, -0.85866286, -0.78983716],
[-1.29540669, -1.39133054, -1.22912247],
[-1.69150236, -1.81533939, -1.60291807]]))
('dU before weight update', array([[-1.38303139, -1.03664432, 0. , 0. , -2.41967571],
[-0.97117627, -0.98627813, 0. , 0. , -1.9574544 ],
[-1.10048972, -0.87977125, 0. , 0. , -1.98026097]]))
('dW before weight update', array([-2.18979795]))
The dimensions match the initialized weight networks. The updated weight matrix values can be derived by simply adding the delta values to the existing weights.
For output_weight_network[0] and weight_network[0], index 0 is hardcoded as we have only 1 hidden layer in this example. With multiple hidden layers, the index will be fetched via a loop.
new_output_weights = list(output_weight_network[0].get('Weight')) + dV  # V + dV
new_input_weights = list(weight_network[0].get('Weight')) + dU  # U + dU
new_recurrent_weights = recurrrent_neuron_weight + dW  # W + dW
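As a minimal NumPy sketch of the same update (assuming the Wxh, Whh, and Wyh arrays from the earlier sketches; the learning_rate is my addition, where a value of 1.0 corresponds to the direct addition above and smaller values are commonly used to damp the step size):
learning_rate = 1.0  # assumption for illustration; 1.0 matches the direct addition above
Wyh = Wyh + learning_rate * dV  # output weight network (V)
Wxh = Wxh + learning_rate * dU  # input weight network (U), bias column included
Whh = Whh + learning_rate * dW  # recurrent weight network (W)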