"""Deep learning. (Chapters 20)"""
import random
import statistics
import numpy as np
from keras import Sequential, optimizers
from keras.layers import Embedding, SimpleRNN, Dense
from keras.preprocessing import sequence
from utils4e import (conv1D, gaussian_kernel, element_wise_product, vector_add, random_weights,
scalar_vector_product, map_vector, mean_squared_error_loss)
[docs]
class Node:
    """
    A single unit of a layer in a neural network.

    :param weights: weights between parent nodes and current node; any sequence
        (list, tuple, numpy array, ...). ``None`` means "no weights yet" and is
        stored as a fresh empty list.
    :param value: value of current node
    """

    def __init__(self, weights=None, value=None):
        self.value = value
        # Test against None explicitly: ``weights or []`` raises ValueError for a
        # multi-element numpy array because its truth value is ambiguous.
        self.weights = [] if weights is None else weights
[docs]
class Layer:
    """
    Base class for a layer of a neural network built as a computational graph.

    :param size: number of units in the current layer
    """

    def __init__(self, size):
        # one fresh Node per unit, stored together in a numpy array
        units = [Node() for _ in range(size)]
        self.nodes = np.array(units)

    def forward(self, inputs):
        """Compute the output of this layer; concrete layers must override this."""
        raise NotImplementedError
[docs]
class Activation:
    """Interface shared by every neural-network activation function.

    A concrete activation overrides ``function`` and ``derivative``. Calling the
    instance itself is shorthand for calling ``function``.
    """

    def function(self, x):
        """Evaluate the activation at ``x`` (must be overridden)."""
        raise NotImplementedError

    def derivative(self, x):
        """Evaluate the derivative of the activation for ``x`` (must be overridden)."""
        raise NotImplementedError

    def __call__(self, x):
        # delegate so that ``activation(x)`` behaves like ``activation.function(x)``
        result = self.function(x)
        return result
[docs]
class Sigmoid(Activation):
    """Logistic sigmoid activation: ``1 / (1 + e**-x)``."""

    def function(self, x):
        """Evaluate the logistic sigmoid at ``x``."""
        denominator = 1 + np.exp(-x)
        return 1 / denominator

    def derivative(self, value):
        """Sigmoid slope expressed through the already-computed output ``value``."""
        complement = 1 - value
        return value * complement
[docs]
class ReLU(Activation):
    """Rectified Linear Unit activation: ``max(0, x)``."""

    def function(self, x):
        """Pass positive ``x`` through unchanged; anything else becomes ``0``."""
        if x > 0:
            return x
        return 0

    def derivative(self, value):
        """ReLU slope: ``1`` when ``value`` is positive, otherwise ``0``."""
        if value > 0:
            return 1
        return 0
[docs]
class ELU(Activation):
    """
    Exponential Linear Unit activation.

    :param alpha: scale applied to ``e**x - 1`` for non-positive inputs
    """

    def __init__(self, alpha=0.01):
        self.alpha = alpha

    def function(self, x):
        """Return ``x`` if positive else ``alpha * (e**x - 1)``."""
        return x if x > 0 else self.alpha * (np.exp(x) - 1)

    def derivative(self, value):
        """
        Return the ELU derivative given the unit's *output* ``value``.

        For a non-positive input ``x`` the output is ``alpha * (e**x - 1)``, whose
        slope ``alpha * e**x`` equals ``value + alpha``. The earlier expression
        ``alpha * e**value`` applied the exponential to the output rather than the
        input, which is not the derivative when ``value`` is the layer output
        (as it is when called with ``node.value``).
        """
        return 1 if value > 0 else value + self.alpha
[docs]
class LeakyReLU(Activation):
    """
    Leaky ReLU activation.

    :param alpha: slope used on the negative side
    """

    def __init__(self, alpha=0.01):
        self.alpha = alpha

    def function(self, x):
        """Pick the larger of ``x`` and ``alpha * x``."""
        scaled = self.alpha * x
        return scaled if scaled > x else x

    def derivative(self, value):
        """Leaky ReLU slope: ``1`` for positive ``value``, otherwise ``alpha``."""
        if value > 0:
            return 1
        return self.alpha
[docs]
class Tanh(Activation):
    """Hyperbolic tangent activation."""

    def function(self, x):
        """Evaluate ``tanh`` at ``x``."""
        activated = np.tanh(x)
        return activated

    def derivative(self, value):
        """Tanh slope expressed through the output ``value``: ``1 - value**2``."""
        squared = value ** 2
        return 1 - squared
[docs]
class SoftMax(Activation):
    """Softmax activation that normalises a vector into a probability distribution."""

    def function(self, x):
        """
        Return the softmax of ``x`` (exponentials normalised over all elements).

        The maximum is subtracted before exponentiating. This leaves the result
        mathematically unchanged but stops ``e**x`` overflowing to ``inf`` (and the
        quotient becoming ``nan``) for large inputs.
        """
        x = np.asarray(x, dtype=float)
        if x.size == 0:
            # nothing to normalise; ``np.max`` would raise on an empty array
            return x
        exps = np.exp(x - np.max(x))
        return exps / np.sum(exps)

    def derivative(self, x):
        """Return a placeholder unit gradient for each element of ``x``."""
        return np.ones_like(x)
[docs]
class SoftPlus(Activation):
    """SoftPlus activation: ``log(1 + e**x)``."""

    def function(self, x):
        """Evaluate ``log(1 + e**x)``."""
        grown = 1. + np.exp(x)
        return np.log(grown)

    def derivative(self, x):
        """Evaluate ``1 / (1 + e**-x)`` for the given ``x``."""
        denominator = 1. + np.exp(-x)
        return 1. / denominator
[docs]
class Linear(Activation):
    """Identity activation: the output is the input."""

    def function(self, x):
        """Hand ``x`` back untouched."""
        return x

    def derivative(self, x):
        """Gradient of the identity: ones with the same shape and type as ``x``."""
        gradient = np.ones_like(x)
        return gradient
[docs]
class OutputLayer(Layer):
    """
    1D softmax output layer in 19.3.2.

    :param size: number of units in the layer
    """

    def __init__(self, size=3):
        super().__init__(size)

    def forward(self, inputs, activation=SoftMax):
        """
        Run ``inputs`` through ``activation`` and remember the result on the nodes.

        :param inputs: one value per node of the layer
        :param activation: activation class (not instance); it is instantiated here
        :return: the activated values
        """
        assert len(self.nodes) == len(inputs)
        outputs = activation().function(inputs)
        # keep a copy of each output on its node
        for unit, out in zip(self.nodes, outputs):
            unit.value = out
        return outputs
[docs]
class DenseLayer(Layer):
    """
    Fully connected 1D layer of a neural network.

    :param in_size: (int) input vector size
    :param out_size: (int) output vector size
    :param activation: (Activation class) activation function; instantiated by the layer
    """

    def __init__(self, in_size=3, out_size=3, activation=Sigmoid):
        super().__init__(out_size)
        self.out_size = out_size
        self.inputs = None  # last input seen by forward()
        self.activation = activation()
        # every unit gets its own weight vector of length in_size
        # (random_weights presumably samples between -0.5 and 0.5 -- confirm in utils4e)
        for unit in self.nodes:
            unit.weights = random_weights(-0.5, 0.5, in_size)

    def forward(self, inputs):
        """
        Compute the activated weighted sum of ``inputs`` for every unit.

        The inputs are remembered on ``self.inputs`` and each unit's output on
        ``unit.value``.

        :return: list with one output per unit
        """
        self.inputs = inputs

        def fire(unit):
            # weighted sum of the inputs, squashed by the activation
            unit.value = self.activation.function(np.dot(unit.weights, inputs))
            return unit.value

        return [fire(unit) for unit in self.nodes]
[docs]
class ConvLayer1D(Layer):
    """
    1D convolution layer of a neural network.

    :param size: number of nodes, one per input channel
    :param kernel_size: convolution kernel size
    """

    def __init__(self, size=3, kernel_size=3):
        super().__init__(size)
        # every node starts with the kernel produced by gaussian_kernel
        for unit in self.nodes:
            unit.weights = gaussian_kernel(kernel_size)

    def forward(self, features):
        """
        Convolve each channel of ``features`` with the kernel of its node.

        :param features: one channel per node
        :return: list of per-channel convolution outputs (also stored in ``node.value``)
        """
        # each node in the layer takes one channel of the features
        assert len(self.nodes) == len(features)

        def convolve(unit, channel):
            unit.value = conv1D(channel, unit.weights)
            return unit.value

        return [convolve(unit, channel) for unit, channel in zip(self.nodes, features)]
[docs]
class MaxPoolingLayer1D(Layer):
    """
    1D max pooling layer of a neural network.

    :param size: number of nodes, one per input channel
    :param kernel_size: max pooling window size
    """

    def __init__(self, size=3, kernel_size=3):
        super().__init__(size)
        self.kernel_size = kernel_size
        self.inputs = None  # last features seen by forward()

    def forward(self, features):
        """
        Max-pool every channel of ``features`` with a sliding window.

        The window is ``kernel_size`` wide and moves one position at a time, so a
        channel of length ``n`` yields ``n - kernel_size + 1`` values.

        :return: list of pooled channels (also stored in ``node.value``)
        """
        assert len(self.nodes) == len(features)
        pooled = []
        self.inputs = features
        for channel_idx, unit in enumerate(self.nodes):
            channel = features[channel_idx]
            n_windows = len(channel) - self.kernel_size + 1
            # maximum of each window of kernel_size consecutive values
            out = [max(channel[start:start + self.kernel_size]) for start in range(n_windows)]
            unit.value = out
            pooled.append(out)
        return pooled
[docs]
class BatchNormalizationLayer(Layer):
    """
    Batch normalization layer.

    :param size: number of nodes in the layer
    :param eps: small constant added to the variance before taking the square root
    """

    def __init__(self, size, eps=0.001):
        super().__init__(size)
        self.eps = eps
        # forward() multiplies by weights[0] and then adds weights[1]
        # NOTE(review): with the default [0, 0] every output of forward() is 0
        self.weights = [0, 0]
        self.inputs = None

    def forward(self, inputs):
        """
        Normalise ``inputs`` by their mean and standard deviation, then scale and shift.

        :param inputs: sequence of numbers; ``statistics.stdev`` needs at least two
        :return: list with one single-element list per node
        """
        # mean value of inputs
        mu = sum(inputs) / len(inputs)
        # sample standard deviation of inputs
        stderr = statistics.stdev(inputs)
        self.inputs = inputs
        # the denominator is the same for every node
        spread = np.sqrt(self.eps + stderr ** 2)
        normalized = []
        for idx, unit in enumerate(self.nodes):
            val = [(inputs[idx] - mu) * self.weights[0] / spread + self.weights[1]]
            unit.value = val
            normalized.append(val)
        return normalized
[docs]
def init_examples(examples, idx_i, idx_t, o_units):
    """
    Split examples into input vectors and target vectors.

    :param examples: sequence of indexable examples
    :param idx_i: indices of the input attributes within an example
    :param idx_t: index of the target attribute within an example
    :param o_units: number of output units; more than one selects one-hot targets
    :return: two dicts keyed by example position: inputs and targets
    """
    inputs, targets = {}, {}
    one_hot = o_units > 1
    for position, example in enumerate(examples):
        # input values of the example
        inputs[position] = [example[column] for column in idx_i]
        label = example[idx_t]
        if one_hot:
            # one-hot representation: the target value is used as an index
            encoded = [0] * o_units
            encoded[label] = 1
            targets[position] = encoded
        else:
            # single output unit: keep the raw target value
            targets[position] = [label]
    return inputs, targets
[docs]
def stochastic_gradient_descent(dataset, net, loss, epochs=1000, l_rate=0.01, batch_size=1, verbose=False):
    """
    Mini-batch gradient descent that updates the learnable parameters of a network.

    ``dataset.examples`` is shuffled in place at the start of every epoch and the
    node weights of ``net`` are overwritten after every batch.

    :param dataset: object providing ``examples``, ``inputs`` and ``target``
    :param net: list of layers
    :param loss: loss function handed on to BackPropagation
    :param epochs: number of passes over the examples
    :param l_rate: learning rate
    :param batch_size: number of examples per batch
    :param verbose: print the accumulated loss after each epoch
    :return: the updated network
    """
    examples = dataset.examples  # init data

    for epoch in range(epochs):
        total_loss = 0
        random.shuffle(examples)
        weights = [[node.weights for node in layer.nodes] for layer in net]

        for batch in get_batch(examples, batch_size):
            inputs, targets = init_examples(batch, dataset.inputs, dataset.target, len(net[-1].nodes))
            # gradients of the weights for this batch
            gs, batch_loss = BackPropagation(inputs, targets, weights, net, loss)
            # step against the gradient; adding an ndarray turns each layer's weights into an ndarray
            steps = [np.array(layer_gradient) * -l_rate for layer_gradient in gs]
            weights = [current + step for current, step in zip(weights, steps)]
            total_loss += batch_loss

            # write the new weights back into the network after each batch
            for i, layer in enumerate(net):
                layer_weights = weights[i]
                if layer_weights.size != 0:
                    for j, row in enumerate(layer_weights):
                        layer.nodes[j].weights = row

        if verbose:
            print("epoch:{}, total_loss:{}".format(epoch + 1, total_loss))

    return net
[docs]
def adam(dataset, net, loss, epochs=1000, rho=(0.9, 0.999), delta=1 / 10 ** 8,
         l_rate=0.001, batch_size=1, verbose=False):
    """
    [Figure 19.6]
    Adam optimizer to update the learnable parameters of a network.
    Required parameters are similar to gradient descent.

    :param rho: pair of decay rates for the two running moment estimates
    :param delta: small constant added to the square root in the denominator
    :return: the updated network
    """
    examples = dataset.examples

    def zero_moments():
        # same nesting as the network weights: layer -> node -> weight
        return [[[0] * len(node.weights) for node in layer.nodes] for layer in net]

    # running estimates of the gradient (s) and squared gradient (r), and the step counter t
    s = zero_moments()
    r = zero_moments()
    t = 0

    # repeat until converged (here: for a fixed number of epochs)
    for epoch in range(epochs):
        # total loss of each epoch
        total_loss = 0
        random.shuffle(examples)
        weights = [[node.weights for node in layer.nodes] for layer in net]

        for batch in get_batch(examples, batch_size):
            t += 1
            inputs, targets = init_examples(batch, dataset.inputs, dataset.target, len(net[-1].nodes))

            # gradients of the weights for this batch
            gs, batch_loss = BackPropagation(inputs, targets, weights, net, loss)

            # decay the old estimates and blend in the new gradient information
            squared_gs = element_wise_product(gs, gs)
            s = vector_add(scalar_vector_product(rho[0], s),
                           scalar_vector_product((1 - rho[0]), gs))
            r = vector_add(scalar_vector_product(rho[1], r),
                           scalar_vector_product((1 - rho[1]), squared_gs))

            # bias-corrected estimates
            s_hat = scalar_vector_product(1 / (1 - rho[0] ** t), s)
            r_hat = scalar_vector_product(1 / (1 - rho[1] ** t), r)

            # turn r_hat into the per-weight scaling factor 1 / (sqrt(r_hat) + delta)
            scale = map_vector(lambda x: 1 / (np.sqrt(x) + delta), r_hat)

            # weight change for this step
            delta_theta = scalar_vector_product(-l_rate, element_wise_product(s_hat, scale))
            weights = vector_add(weights, delta_theta)
            total_loss += batch_loss

            # write the new weights back into the network after each batch
            for i, layer in enumerate(net):
                layer_weights = weights[i]
                if layer_weights:
                    for j in range(len(layer_weights)):
                        layer.nodes[j].weights = layer_weights[j]

        if verbose:
            print("epoch:{}, total_loss:{}".format(epoch + 1, total_loss))

    return net
[docs]
def BackPropagation(inputs, targets, theta, net, loss):
    """
    Back-propagation for a multilayer network over one batch, computing the gradients of theta.

    :param inputs: a batch of inputs, indexable by 0..len-1. Each input is an iterable object
    :param targets: a batch of targets, indexable by 0..len-1. Each target is an iterable object
    :param theta: parameters to be updated, indexed by layer
    :param net: a list of predefined layer objects representing their linear sequence
    :param loss: a predefined loss function called as ``loss(target, output)``
    :return: gradients of theta, loss of the input batch
    """
    assert len(inputs) == len(targets)

    o_units = len(net[-1].nodes)
    n_layers = len(net)
    batch_size = len(inputs)

    # per-example gradients (overwritten for each example) and their running sum
    gradients = [[[] for _ in layer.nodes] for layer in net]
    total_gradients = [[[0] * len(node.weights) for node in layer.nodes] for layer in net]

    batch_loss = 0

    # iterate over each example in the batch
    for sample in range(batch_size):
        signal = inputs[sample]
        target = targets[sample]

        # forward pass; layer 0 is skipped, so the raw input feeds layer 1
        for depth in range(1, n_layers):
            layer_out = net[depth].forward(signal)
            signal = layer_out
        batch_loss += loss(target, layer_out)

        # initialize delta
        delta = [[] for _ in range(n_layers)]

        # error at the output units
        previous = np.array([layer_out[unit] - target[unit] for unit in range(o_units)])

        # backward pass from the last layer down to layer 1
        for depth in range(n_layers - 1, 0, -1):
            layer = net[depth]
            slopes = np.array([layer.activation.derivative(node.value) for node in layer.nodes])
            delta[depth] = previous * slopes
            # error handed to layer depth-1 in the next iteration
            previous = np.matmul([delta[depth]], theta[depth])[0]
            # gradient of this layer: each unit's delta times the layer's stored inputs
            gradients[depth] = [scalar_vector_product(d, layer.inputs) for d in delta[depth]]

        # add gradient of current example to batch gradient
        total_gradients = vector_add(total_gradients, gradients)

    return total_gradients, batch_loss
[docs]
def get_batch(examples, batch_size=1):
    """
    Lazily yield consecutive slices of ``examples``, each at most ``batch_size`` long.

    The last batch is shorter when the number of examples is not a multiple of
    ``batch_size``.
    """
    starts = range(0, len(examples), batch_size)
    yield from (examples[start:start + batch_size] for start in starts)
[docs]
class NeuralNetworkLearner:
    """
    Simple dense multilayer neural network.

    The network is an input layer followed by one dense layer per entry of
    ``hidden_layer_sizes`` and a final dense output layer.

    :param dataset: object providing ``inputs``, ``values`` and ``target``
    :param hidden_layer_sizes: size of hidden layers in the form of a list
    :param l_rate: learning rate passed to the optimizer
    :param epochs: number of epochs passed to the optimizer
    :param batch_size: batch size passed to the optimizer
    :param optimizer: training function, called like stochastic_gradient_descent
    :param loss: loss function passed to the optimizer
    :param verbose: verbosity flag passed to the optimizer
    :param plot: stored on the instance; not used by this class
    """

    def __init__(self, dataset, hidden_layer_sizes, l_rate=0.01, epochs=1000, batch_size=10,
                 optimizer=stochastic_gradient_descent, loss=mean_squared_error_loss, verbose=False, plot=False):
        self.dataset = dataset
        self.l_rate = l_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.loss = loss
        self.verbose = verbose
        self.plot = plot

        input_size = len(dataset.inputs)
        output_size = len(dataset.values[dataset.target])

        # widths of consecutive layers: input, hidden..., output
        widths = [input_size, *hidden_layer_sizes, output_size]

        # the input layer comes first, then one dense layer per pair of adjacent widths
        layers = [InputLayer(input_size)]
        layers.extend(DenseLayer(fan_in, fan_out) for fan_in, fan_out in zip(widths, widths[1:]))
        self.raw_net = layers

    def fit(self, X, y):
        """
        Train the network on ``self.dataset`` and return ``self``.

        ``X`` and ``y`` are accepted but not used; training data comes from the dataset.
        """
        options = dict(loss=self.loss, epochs=self.epochs, l_rate=self.l_rate,
                       batch_size=self.batch_size, verbose=self.verbose)
        self.learned_net = self.optimizer(self.dataset, self.raw_net, **options)
        return self

    def predict(self, example):
        """Forward-pass ``example`` through the trained net and return the index of the largest output."""
        signal = example
        # layer 0 is skipped; each later layer receives the previous output as a column vector
        for depth in range(1, len(self.learned_net)):
            column = np.array(signal).reshape((-1, 1))
            signal = self.learned_net[depth].forward(column)
        return signal.index(max(signal))
[docs]
class PerceptronLearner:
    """
    Simple perceptron neural network: an input layer followed by one dense layer.

    :param dataset: object providing ``inputs``, ``values`` and ``target``
    :param l_rate: learning rate passed to the optimizer
    :param epochs: number of epochs passed to the optimizer
    :param batch_size: batch size passed to the optimizer
    :param optimizer: training function, called like stochastic_gradient_descent
    :param loss: loss function passed to the optimizer
    :param verbose: verbosity flag passed to the optimizer
    :param plot: stored on the instance; not used by this class
    """

    def __init__(self, dataset, l_rate=0.01, epochs=1000, batch_size=10, optimizer=stochastic_gradient_descent,
                 loss=mean_squared_error_loss, verbose=False, plot=False):
        self.dataset = dataset
        self.l_rate = l_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.loss = loss
        self.verbose = verbose
        self.plot = plot

        input_size = len(dataset.inputs)
        output_size = len(dataset.values[dataset.target])

        # the whole network: input layer plus a single dense layer
        entry_layer = InputLayer(input_size)
        dense_layer = DenseLayer(input_size, output_size)
        self.raw_net = [entry_layer, dense_layer]

    def fit(self, X, y):
        """
        Train the perceptron on ``self.dataset`` and return ``self``.

        ``X`` and ``y`` are accepted but not used; training data comes from the dataset.
        """
        options = dict(loss=self.loss, epochs=self.epochs, l_rate=self.l_rate,
                       batch_size=self.batch_size, verbose=self.verbose)
        self.learned_net = self.optimizer(self.dataset, self.raw_net, **options)
        return self

    def predict(self, example):
        """Forward-pass ``example`` through the dense layer and return the index of the largest output."""
        column = np.array(example).reshape((-1, 1))
        outputs = self.learned_net[1].forward(column)
        return outputs.index(max(outputs))
[docs]
def keras_dataset_loader(dataset, max_length=500):
    """
    Helper function to load keras datasets.

    :param dataset: pair ``((X_train, y_train), (X_val, y_val))``
    :param max_length: max length of each input sequence; when not positive the
        inputs are left unpadded
    :return: three ``(X, y)`` pairs: the training data without its first 10
        entries, the validation data, and the first 10 training entries
    """
    train_part, val_part = dataset
    X_train, y_train = train_part
    X_val, y_val = val_part

    if max_length > 0:
        # pad the training inputs first, then the validation inputs
        X_train, X_val = (sequence.pad_sequences(data, maxlen=max_length) for data in (X_train, X_val))

    held_out = (X_train[:10], y_train[:10])
    remaining = (X_train[10:], y_train[10:])
    return remaining, (X_val, y_val), held_out
[docs]
def SimpleRNNLearner(train_data, val_data, epochs=2, verbose=False):
    """
    RNN example for text sentiment analysis.

    The model is an embedding layer, a simple RNN layer and a single sigmoid
    output unit, trained with binary cross-entropy and the adam optimizer.

    :param train_data:
        a tuple of (training data, targets)
        Training data: ndarray taking training examples, while each example is coded by embedding
        Targets: ndarray taking targets of each example. Each target is mapped to an integer
    :param val_data: a tuple of (validation data, targets)
    :param epochs: number of epochs
    :param verbose: verbosity mode
    :return: a keras model
    """
    # sizes handed to the embedding layer
    total_inputs = 5000
    input_length = 500

    # unpack the data
    X_train, y_train = train_data
    X_val, y_val = val_data

    # assemble the sequential network (embedding layer, rnn layer, dense layer)
    model = Sequential()
    stack = (
        Embedding(total_inputs, 32, input_length=input_length),
        SimpleRNN(units=128),
        Dense(1, activation='sigmoid'),
    )
    for layer in stack:
        model.add(layer)

    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # train the model
    model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=epochs, batch_size=128, verbose=verbose)

    return model
[docs]
def AutoencoderLearner(inputs, encoding_size, epochs=200, verbose=False):
    """
    Simple auto encoder example that learns to reproduce its own input.

    The model has two dense relu layers (encoder of ``encoding_size`` units,
    decoder of the input size) and is trained with SGD on mean squared error,
    using ``inputs`` as both data and target.

    :param inputs: a batch of input data in np.ndarray type
    :param encoding_size: int, the size of encoding layer
    :param epochs: number of epochs
    :param verbose: verbosity mode
    :return: a keras model
    """
    # width of one example
    input_size = len(inputs[0])

    # settings shared by the encoder and the decoder layer
    shared = dict(activation='relu', kernel_initializer='random_uniform', bias_initializer='ones')

    # assemble the model
    model = Sequential()
    encoder = Dense(encoding_size, input_dim=input_size, **shared)
    model.add(encoder)
    decoder = Dense(input_size, **shared)
    model.add(decoder)

    # plain SGD as the optimizer
    sgd = optimizers.SGD(learning_rate=0.01)
    model.compile(loss='mean_squared_error', optimizer=sgd, metrics=['accuracy'])

    # train the model to map the inputs onto themselves
    model.fit(inputs, inputs, epochs=epochs, batch_size=10, verbose=verbose)

    return model