Gradient agreement in MAML

Now, we'll define a class called GradientAgreement_MAML, where we'll implement the gradient agreement MAML algorithm. In the __init__ method, we'll initialize all of the necessary variables. Then, we'll define our sigmoid activation function. Following this, we'll define our train function.
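The code that follows relies on a sample_points helper (defined in the earlier MAML section) and on numpy being imported. If you're running this section on its own, a minimal sketch of that helper, assuming randomly generated binary classification tasks with 50-dimensional inputs (to match theta's shape of (50, 1)), might look like this:

import numpy as np

def sample_points(k):
    #k random 50-dimensional inputs and random binary labels of shape (k, 1)
    x = np.random.rand(k, 50)
    y = np.random.choice([0, 1], size=k, p=[0.5, 0.5]).reshape([-1, 1])
    return x, y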

Let's go through this step by step, and then we'll look at the code as a whole:

class GradientAgreement_MAML(object):

We define the __init__ method and initialize all variables:

    def __init__(self):

        #initialize number of tasks i.e., the number of tasks we need in each batch of tasks
        self.num_tasks = 2

        #number of samples i.e., number of shots - the number of data points (k) we need to have in each task
        self.num_samples = 10

        #number of epochs i.e., training iterations
        self.epochs = 100

        #hyperparameter for the inner loop (inner gradient update)
        self.alpha = 0.0001

        #hyperparameter for the outer loop (outer gradient update) i.e., meta optimization
        self.beta = 0.0001

        #randomly initialize our model parameter theta
        self.theta = np.random.normal(size=50).reshape(50, 1)

Now, we define our sigmoid activation function:

    def sigmoid(self,a):
        return 1.0 / (1 + np.exp(-a))
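The sigmoid squashes the linear scores a = X theta into the (0, 1) range so they can be read as class probabilities in the cross-entropy loss used later. A quick check:

import numpy as np

a = np.array([-2.0, 0.0, 2.0])
print(1.0 / (1 + np.exp(-a)))   #[0.11920292 0.5 0.88079708] -- probabilities in (0, 1)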

Now, let's define a function called train for training:

    def train(self):

For the number of epochs, we do the following:

        for e in range(self.epochs):

            #for storing theta' of each task
            self.theta_ = []

            #for storing gradient updates
            self.g = []

For task i in a batch of tasks, we do the following:

            for i in range(self.num_tasks):

We sample k data points and prepare our train set:

                XTrain, YTrain = sample_points(self.num_samples)

We predict the value of YHat:

                a = np.matmul(XTrain, self.theta)

                YHat = self.sigmoid(a)

We calculate the loss and minimize it using gradient descent:

                #since we're performing classification, we use cross entropy loss as our loss function
                loss = ((np.matmul(-YTrain.T, np.log(YHat)) - np.matmul((1 -YTrain.T), np.log(1 - YHat)))/self.num_samples)[0][0]

                #minimize the loss by calculating gradients
                gradient = np.matmul(XTrain.T, (YHat - YTrain)) / self.num_samples

                #update the gradients and find the optimal parameter theta' for each of the tasks
                self.theta_.append(self.theta - self.alpha*gradient)

We store the gradient update, g = theta - theta', for each task in g:

                self.g.append(self.theta-self.theta_[i])

Now, we calculate the weights.
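Recall that the weight of a task is the sum of the inner products of its gradient update with every task's gradient update, divided by a normalization factor, which is the sum of the absolute values of those inner products over all pairs of tasks:

w_i = sum_j (g_i . g_j) / sum_k sum_l |g_k . g_l|

So a task whose update agrees in direction with the other tasks gets a larger weight, while a task whose update disagrees gets a smaller (possibly negative) weight. The following code computes the normalization factor first and then the weights: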

            normalization_factor = 0

            for i in range(self.num_tasks):
                for j in range(self.num_tasks):
                    normalization_factor += np.abs(np.dot(self.g[i].T, self.g[j]))

            w = np.zeros(self.num_tasks)

            for i in range(self.num_tasks):

                for j in range(self.num_tasks):
                    w[i] += np.dot(self.g[i].T, self.g[j])

                w[i] = w[i] / normalization_factor
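As a side note, the same weights can be computed without the double loops. The following standalone sketch (the agreement_weights name is just for illustration and not part of the class) stacks the per-task updates into a matrix and uses a single matrix product:

import numpy as np

def agreement_weights(task_gradients):
    #task_gradients: list of (num_params, 1) arrays, one per task
    G = np.concatenate(task_gradients, axis=1).T     #shape: (num_tasks, num_params)
    inner = np.matmul(G, G.T)                        #pairwise dot products g_i . g_j
    return inner.sum(axis=1) / np.abs(inner).sum()   #same weights as the loops above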

We initialize weighted meta gradients:

            weighted_gradient = np.zeros(self.theta.shape)

For each task in the batch of tasks, we sample k data points and prepare our test set for meta training:

            for i in range(self.num_tasks):

                #sample k data points and prepare our test set for meta training
                XTest, YTest = sample_points(10)

We predict the value of y:

                a = np.matmul(XTest, self.theta_[i])

                YPred = self.sigmoid(a)

We compute meta gradients:

                meta_gradient = np.matmul(XTest.T, (YPred - YTest)) / self.num_samples

We multiply the weights by the computed meta gradients and update the value of theta using gradient descent:

                weighted_gradient += w[i]*meta_gradient

            self.theta = self.theta-self.beta*weighted_gradient/self.num_tasks

We print the loss every 10 epochs:

            if e%10==0:
                print("Epoch {}: Loss {}".format(e, loss))
                print('Updated Model Parameter Theta')
                print('Sampling Next Batch of Tasks')
                print('---------------------------------')

The following is the whole class for GradientAgreement_MAML:

class GradientAgreement_MAML(object):
    def __init__(self):

        #initialize number of tasks i.e., the number of tasks we need in each batch of tasks
        self.num_tasks = 2

        #number of samples i.e., number of shots - the number of data points (k) we need to have in each task
        self.num_samples = 10

        #number of epochs i.e., training iterations
        self.epochs = 100

        #hyperparameter for the inner loop (inner gradient update)
        self.alpha = 0.0001

        #hyperparameter for the outer loop (outer gradient update) i.e., meta optimization
        self.beta = 0.0001

        #randomly initialize our model parameter theta
        self.theta = np.random.normal(size=50).reshape(50, 1)

    #define our sigmoid activation function
    def sigmoid(self,a):
        return 1.0 / (1 + np.exp(-a))


    #now let's get to the interesting part, i.e., training :P
    def train(self):

        #for the number of epochs,
        for e in range(self.epochs):

            #for storing theta' of each task
            self.theta_ = []

            #for storing gradient updates
            self.g = []

            #for task i in batch of tasks
            for i in range(self.num_tasks):

                #sample k data points and prepare our train set
                XTrain, YTrain = sample_points(self.num_samples)

                a = np.matmul(XTrain, self.theta)

                YHat = self.sigmoid(a)

                #since we're performing classification, we use cross entropy loss as our loss function
                loss = ((np.matmul(-YTrain.T, np.log(YHat)) - np.matmul((1 -YTrain.T), np.log(1 - YHat)))/self.num_samples)[0][0]

                #minimize the loss by calculating gradients
                gradient = np.matmul(XTrain.T, (YHat - YTrain)) / self.num_samples

                #update the gradients and find the optimal parameter theta' for each of the tasks
                self.theta_.append(self.theta - self.alpha*gradient)

                #compute the gradient update
                self.g.append(self.theta-self.theta_[i])

            #now we calculate the weights
            #the weight of a task is the sum of the dot products of g_i and g_j divided by a normalization factor

            normalization_factor = 0

            for i in range(self.num_tasks):
                for j in range(self.num_tasks):
                    normalization_factor += np.abs(np.dot(self.g[i].T, self.g[j]))

            w = np.zeros(self.num_tasks)

            for i in range(self.num_tasks):

                for j in range(self.num_tasks):
                    w[i] += np.dot(self.g[i].T, self.g[j])

                w[i] = w[i] / normalization_factor

            #initialize the weighted meta gradients
            weighted_gradient = np.zeros(self.theta.shape)

            for i in range(self.num_tasks):

                #sample k data points and prepare our test set for meta training
                XTest, YTest = sample_points(10)

                #predict the value of y
                a = np.matmul(XTest, self.theta_[i])

                YPred = self.sigmoid(a)

                #compute meta gradients
                meta_gradient = np.matmul(XTest.T, (YPred - YTest)) / self.num_samples

                #weight each task's meta gradient by its gradient agreement weight
                weighted_gradient += w[i]*meta_gradient

            #update our randomly initialized model parameter theta with the meta gradients
            self.theta = self.theta-self.beta*weighted_gradient/self.num_tasks

            if e%10==0:
                print("Epoch {}: Loss {}".format(e, loss))
                print('Updated Model Parameter Theta')
                print('Sampling Next Batch of Tasks')
                print('---------------------------------')

We create an instance of our GradientAgreement_MAML class:

model = GradientAgreement_MAML()

Then, we train the model:

model.train()

You can see how the loss decreases over epochs:

Epoch 0: Loss 5.9436043239

Updated Model Parameter Theta

Sampling Next Batch of Tasks 

---------------------------------

Epoch 10: Loss 3.905350606769

Updated Model Parameter Theta

Sampling Next Batch of Tasks 

---------------------------------

Epoch 20: Loss 2.0736155578

Updated Model Parameter Theta

Sampling Next Batch of Tasks 

---------------------------------

Epoch 30: Loss 1.48478751777

Updated Model Parameter Theta

Sampling Next Batch of Tasks 

---------------------------------