Source code for neuralee.embedding

import time
import copy
import sys
import os
from tqdm import trange

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from ._aux import error_ee, error_ee_split, ls_ee, eloss
from torch.utils.data import DataLoader, TensorDataset


class FCLayers(nn.Module):
    """Default nn structure class.

    :param di: Input feature size.
    :type di: int
    :param do: Output feature size.
    :type do: int

    For how to define custom nn Modules, see:
    https://pytorch.org/tutorials/beginner/pytorch_with_examples.html#pytorch-custom-nn-modules
    """

    def __init__(self, di, do):
        super(FCLayers, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(di, 50),
            nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(50, 50),
            nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(50, do),
        )
        for m in self.net.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, mean=0, std=0.1)
                nn.init.constant_(m.bias, val=0)
    def forward(self, y):
        """"""
        return self.net(y)
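
# Hedged usage sketch, not part of the original neuralee module: it only
# illustrates the tensor shapes FCLayers expects and produces. The sizes
# below (100 input features, 2 output dimensions, batch of 8) are
# illustrative assumptions.
def _fclayers_shape_example(D=100, d=2):
    """Embed a random batch of 8 samples with D features into d dimensions."""
    net = FCLayers(D, d)
    net.eval()                 # use BatchNorm1d running statistics
    y = torch.randn(8, D)      # batch of 8 samples, D features each
    x = net(y)                 # embedded coordinates, shape (8, d)
    return x.shape
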
class NeuralEE(object):
    """NeuralEE class.

    :param dataset: GeneExpressionDataset.
    :type dataset: neuralee.dataset.GeneExpressionDataset
    :param d: low embedded dimension.
    :type d: int
    :param lam: trade-off factor of elastic embedding function.
    :param device: device chosen to operate.
     If None, set as torch.device('cpu').
    :type device: torch.device
    """

    def __init__(self, dataset, d=2, lam=1, device=None):
        self.dataset = dataset
        self.d = d
        self.lam = lam
        self.device = device if device is not None else torch.device('cpu')
        self.Y = torch.from_numpy(self.dataset.Y)
        self.Y_splits = torch.from_numpy(self.dataset.Y_splits) \
            if hasattr(self.dataset, 'Y_splits') else None
        self.Wp = torch.from_numpy(self.dataset.Wp) \
            if hasattr(self.dataset, 'Wp') else None
        self.Wn = torch.from_numpy(self.dataset.Wn) \
            if hasattr(self.dataset, 'Wn') else None
        self.Lps = torch.from_numpy(self.dataset.Lps) \
            if hasattr(self.dataset, 'Lps') else None
        self.Wns = torch.from_numpy(self.dataset.Wns) \
            if hasattr(self.dataset, 'Wns') else None

    def __len__(self):
        return len(self.dataset)

    @property
    def D(self):
        """Feature size."""
        return self.Y.shape[1]

    @property
    def labels(self):
        """
        :return: label vector.
        :rtype: numpy.ndarray
        """
        return self.dataset.labels.squeeze()

    @staticmethod
    @torch.no_grad()
    def _loss(Y, Wp, Wn, lam, net, device, calculate_error):
        """
        :param Y: sample-feature matrix.
        :type Y: torch.FloatTensor
        :param Wp: attractive weights.
        :type Wp: torch.FloatTensor
        :param Wn: repulsive weights.
        :type Wn: torch.FloatTensor
        :param lam: trade-off factor of elastic embedding function.
        :param net: nn instance.
        :param device: device chosen to operate.
        :param calculate_error: how to calculate error.
        :type calculate_error: {None, 'cpu', 'cuda'}
        :return: elastic embedding loss.
        """
        results = dict()
        net.eval()
        net.to(device)
        X = net(Y.to(device))
        results['X'] = X.cpu()
        if calculate_error is not None:
            assert calculate_error in ['cpu', 'cuda']
            if calculate_error == 'cpu':
                e = error_ee_split(
                    X, Wp, Wn, lam, memory=4, device=torch.device('cpu'))
                results['e'] = e
            else:
                e = error_ee_split(X, Wp, Wn, lam)
                results['e'] = e.item()
        return results

    def _loss_entire(self, calculate_error=None):
        """
        :param calculate_error: how to calculate error.
        :type calculate_error: {None, 'cpu', 'cuda'}
        :return: elastic embedding loss on the entire dataset.
        """
        return self._loss(
            self.Y, self.Wp, self.Wn, self.lam, self.net, self.device,
            calculate_error=calculate_error)
    @torch.no_grad()
    def EE(self, size=1., maxit=200, tol=1e-5, frequence=None,
           aff='ea', perplexity=30.0):
        """Free elastic embedding (no mapping).

        Fast training of nonlinear embeddings using the spectral direction
        for the Elastic Embedding (EE) algorithm.

        Reference:
        Partial-Hessian Strategies for Fast Learning of Nonlinear Embeddings.
        http://faculty.ucmerced.edu/mcarreira-perpinan/papers/icml12.pdf

        :param size: subsample size of the entire dataset to embed.
         If subsampled, the affinity will be recalculated on the subsamples.
        :type size: int or percentage
        :param maxit: max number of iterations for EE.
        :type maxit: int
        :param tol: minimum relative distance between consecutive X.
        :param frequence: frequency at which to display iterating results.
         If None, do not display.
        :type frequence: int
        :param aff: if subsampled, affinity used to calculate
         attractive weights.
        :type aff: {'ea', 'x2p'}
        :param perplexity: if subsampled, perplexity defined in elastic
         embedding function.
        :return: embedding results.
         'X': embedding coordinates;
         'e': embedding loss;
         'sub_samples': if subsampled, subsample information.
        :rtype: dict
        """
        since = time.time()
        N = len(self)
        results = dict()
        N_sub = size if isinstance(size, int) else int(N * size)
        assert N_sub <= 15000, \
            'Number of samples is too large for free EE.'
        X0 = 1e-5 * torch.randn(N_sub, self.d)
        if N_sub == N:
            assert self.Wp is not None, \
                'affinity on entire dataset is needed.'
            Wp = self.Wp
            Wn = self.Wn
        else:
            ind_sub = torch.randperm(N)[: N_sub].tolist()
            Y = self.dataset.Y[ind_sub]
            labels = self.labels[ind_sub].squeeze()
            print('Compute affinity on subsample')
            Wp, Wn = self.dataset._affinity(Y, aff, perplexity)
            Wp = torch.from_numpy(Wp)
            Wn = torch.from_numpy(Wn)
            sub_samples = {'Y': torch.from_numpy(Y), 'labels': labels,
                           'Wp': Wp, 'Wn': Wn}
            results['sub_samples'] = sub_samples
        Wp = Wp.to(self.device)
        Wn = Wn.to(self.device)
        Dp = Wp.sum(dim=1).diagflat()
        Lp4 = 4 * (-Wp + Dp)
        R = torch.cholesky(
            Lp4 + 1e-6 * torch.eye(N_sub, N_sub, device=self.device),
            upper=True)
        invR = R.inverse()
        # S = torch.eye(N_sub, N_sub, device=self.device)
        # P0 = -S @ invR @ invR.t() @ S.t()
        # del R, S, invR, Dp
        P0 = -invR @ invR.t()
        del R, invR, Dp
        torch.cuda.empty_cache()
        Xold = X0.to(self.device)
        e, ker = error_ee(Xold, Wp, Wn, self.lam)
        j = 1
        a = 1
        convcrit = maxit >= 1
        while convcrit:
            WWn = self.lam * Wn * ker
            DDn = WWn.sum(dim=1).diagflat()
            # gradient
            G = (Lp4 - 4 * (-WWn + DDn)) @ Xold
            # spectral direction
            P = P0 @ G
            # line search
            X, e, ker, a = ls_ee(Xold, Wp, Wn, self.lam, P, e, G, a)
            convcrit = (j < maxit) and \
                (torch.norm(X - Xold) > tol * torch.norm(Xold))
            Xold = X
            if frequence is not None and j % frequence == 0:
                print('Epoch {}, EE loss is {:.6f}'.format(j, e))
            j += 1
        print('Elastic Embedding, lambda={}, completed in {:.2f}s, '
              'EE loss is {:.6f}'.format(self.lam, time.time() - since,
                                         e.item()))
        results['X'] = X.cpu()
        results['e'] = e.item()
        torch.cuda.empty_cache()
        return results
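
    # Hedged usage sketch (not part of the original module): free EE on a
    # random 50% subsample of a prepared neuralee GeneExpressionDataset.
    # `dataset` and the parameter values below are illustrative assumptions.
    #
    #     ne = NeuralEE(dataset, d=2, lam=1, device=torch.device('cpu'))
    #     out = ne.EE(size=0.5, maxit=100, frequence=20)
    #     X, e = out['X'], out['e']        # 2-D coordinates and final EE loss
    #     sub = out['sub_samples']         # subsample Y, labels, Wp, Wn
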
    def _collate_fn(self, batch):
        y, Lp, Wn = batch[0]
        return y.to(self.device), Lp.to(self.device), Wn.to(self.device)
    def fine_tune(self, optimizer=None, size=1., net=None, frequence=50,
                  verbose=False, maxit=500, calculate_error=None,
                  pin_memory=True, aff='ea', perplexity=30.0,
                  save_embedding=None):
        """NeuralEE method.

        It supports incremental learning, which means the nn can be
        fine-tuned if a pre-trained nn is offered.

        :param optimizer: optimization for training neural networks.
         If None, set as torch.optim.Adam(lr=0.01).
        :type optimizer: torch.optim
        :param size: subsample size of the entire dataset to embed.
         If subsampled, the affinity will be recalculated on the subsamples.
        :type size: int or percentage
        :param net: the nn instance as embedding function.
         If not None, fine tune the given net as self.net;
         if None and self.net already exists, continue fine tuning self.net;
         otherwise set self.net as a new FCLayers instance.
        :type net: torch.nn.Module
        :param frequence: frequency at which to compare and save
         iterating results.
        :type frequence: int
        :param verbose: whether to show verbose training loss.
        :type verbose: bool
        :param maxit: max number of iterations for NeuralEE.
        :type maxit: int
        :param calculate_error: how to calculate error. If the number of
         samples is large, set None to avoid running out of memory on
         'cuda' or 'cpu'.
        :type calculate_error: {None, 'cpu', 'cuda'}
        :param pin_memory: whether to pin data in GPU memory to save
         transfer time, which depends on your GPU memory.
        :type pin_memory: bool
        :param aff: if subsampled, affinity used to calculate
         attractive weights.
        :type aff: {'ea', 'x2p'}
        :param perplexity: if subsampled, perplexity defined in elastic
         embedding function.
        :param save_embedding: path to save iterating results according to
         frequence. If None, do not save.
        :type save_embedding: str
        :return: embedding results.
         'X': embedding coordinates;
         'e': embedding loss;
         'sub_samples': if subsampled, subsample information.
        :rtype: dict
        """
        since = time.time()
        results = dict()
        N = len(self)
        N_sub = size if isinstance(size, int) else int(N * size)
        if N_sub != N:
            print('Compute affinity on subsample')
            ind_sub = torch.randperm(N)[: N_sub].tolist()
            Y = self.dataset.Y[ind_sub]
            Wp, Wn = self.dataset._affinity(Y, aff, perplexity)
            Y = torch.from_numpy(Y)
            Wp = torch.from_numpy(Wp)
            Wn = torch.from_numpy(Wn)
            labels = self.labels[ind_sub].squeeze()
            sub_samples = {'Y': Y, 'labels': labels, 'Wp': Wp, 'Wn': Wn}
            results['sub_samples'] = sub_samples
            Y_splits = Y.unsqueeze(0).to(self.device)
            Wp = Wp.to(self.device)
            Lps = (Wp.sum(dim=1).diagflat() - Wp).unsqueeze(0)
            Wns = Wn.unsqueeze(0).to(self.device)
        else:
            assert calculate_error is None or self.Wp is not None, \
                'affinity on entire dataset is needed to calculate error, ' \
                'or set calculate_error to None.'
            assert self.Lps is not None, \
                'affinity is needed.'
            Lps = self.Lps.to(self.device) if pin_memory else self.Lps
            Wns = self.Wns.to(self.device) if pin_memory else self.Wns
            Y_splits = self.Y_splits.to(self.device) \
                if pin_memory else self.Y_splits
        dataset = TensorDataset(Y_splits, Lps, Wns)
        # for speed, only batch_size = 1
        if N_sub != N or pin_memory:
            dataloader = DataLoader(dataset,
                                    collate_fn=lambda batch: batch[0])
        else:
            dataloader = DataLoader(dataset, collate_fn=self._collate_fn)
        if save_embedding is not None:
            path = save_embedding + self.dataset.__class__.__name__ + '/'
            if not os.path.exists(path):
                os.makedirs(path)
        flag_pbar = calculate_error is None and verbose
        if net is not None:
            self.net = net
        elif hasattr(self, 'net'):
            pass
        else:
            self.net = FCLayers(self.D, self.d)
        if optimizer is None:
            optimizer = optim.Adam(self.net.parameters(), lr=0.01)
        if calculate_error is not None and frequence is not None:
            best_model_wts = copy.deepcopy(self.net.state_dict())
            best_loss = float('inf')
        self.net.train()
        self.net.to(self.device)
        with trange(maxit, desc="NeuralEE", file=sys.stdout,
                    disable=not flag_pbar) as pbar:
            for epoch in range(1, maxit + 1):
                for inputs, Lp_batch, Wn_batch in dataloader:
                    optimizer.zero_grad()
                    outputs = self.net(inputs)
                    loss = eloss(outputs, Lp_batch, Wn_batch, self.lam)
                    loss.backward()
                    optimizer.step()
                if frequence is not None and epoch % frequence == 0:
                    if calculate_error is not None:
                        if N_sub == N:
                            e = self._loss_entire(
                                calculate_error=calculate_error)['e']
                        else:
                            e = self._loss(
                                Y_splits[0], Wp, Wns[0], self.lam, self.net,
                                self.device, calculate_error)['e']
                        if verbose:
                            print(
                                'Epoch {}, EE loss is {:.6f}'.format(epoch, e))
                        if e < best_loss:
                            best_loss = e
                            best_model_wts = copy.deepcopy(
                                self.net.state_dict())
                    if save_embedding is not None:
                        X = self._loss_entire(
                            calculate_error=None)['X'].numpy()
                        class_name = self.dataset.__class__.__name__
                        np.save(path + class_name + str(epoch), X)
                pbar.update(1)
        if calculate_error is not None and frequence is not None:
            self.net.load_state_dict(best_model_wts)
        if N_sub == N:
            results.update(self._loss_entire(calculate_error=calculate_error))
        else:
            results.update(self._loss(Y_splits[0], Wp, Wns[0], self.lam,
                                      self.net, self.device,
                                      calculate_error))
        if calculate_error is not None:
            print('Neural Elastic Embedding, lambda={}, completed in {:.2f}s, '
                  'EE loss is {:.6f}'.format(self.lam, time.time() - since,
                                             results['e']))
        else:
            print('Neural Elastic Embedding, lambda={}, completed in {:.2f}s.'
                  .format(self.lam, time.time() - since))
        # torch.cuda.empty_cache()
        return results
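
    # Hedged usage sketch (not part of the original module): fine tune the
    # default FCLayers network on the entire dataset, assuming the dataset's
    # affinity splits (Y_splits, Lps, Wns) were prepared by neuralee.dataset.
    # `dataset` and the parameter values below are illustrative assumptions.
    #
    #     ne = NeuralEE(dataset, d=2, lam=1, device=torch.device('cuda'))
    #     out = ne.fine_tune(maxit=500, frequence=50,
    #                        calculate_error='cpu', verbose=True)
    #     X = out['X']                     # embedding of the whole dataset
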
    @torch.no_grad()
    def map(self, samples=dict(), calculate_error=None):
        """Directly map via the learned nn.

        :param samples: dict with keys:
         'Y': samples to be mapped into low-dimensional coordinates;
         'labels': sample labels, None is acceptable;
         'Wp': attractive weights on samples,
         None is acceptable if error need not be calculated;
         'Wn': repulsive weights on samples,
         None is acceptable if error need not be calculated.
         If an empty dict, map the training data.
        :type samples: dict
        :param calculate_error: how to calculate error. If the number of
         samples is large, set None to avoid running out of memory on
         'cuda' or 'cpu'.
        :type calculate_error: {None, 'cpu', 'cuda'}
        :return: embedding results.
         'X': embedding coordinates;
         'e': embedding loss.
        :rtype: dict
        """
        if samples == dict():
            assert calculate_error is None or hasattr(self.dataset, 'Wp'), \
                'affinity on entire dataset is needed.'
            samples = {'Y': torch.from_numpy(self.dataset.Y),
                       'labels': self.labels,
                       'Wp': None if calculate_error is None
                       else self.dataset.Wp,
                       'Wn': None if calculate_error is None
                       else self.dataset.Wn}
        results = self._loss(
            samples['Y'], samples['Wp'], samples['Wn'], self.lam,
            self.net, self.device, calculate_error)
        if calculate_error is not None:
            print('EE loss is {:.6f}'.format(results['e']))
        torch.cuda.empty_cache()
        return results
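
# Hedged reference sketch, not part of the original neuralee module. It spells
# out the elastic embedding objective from the paper referenced in NeuralEE.EE,
# which the error_ee / eloss helpers are assumed to evaluate in an equivalent
# (possibly chunked) form, and shows how a trained NeuralEE instance might be
# reused on held-out samples. `ne`, `Y_new` and the helper names are
# illustrative assumptions.
def _ee_objective_sketch(X, Wp, Wn, lam):
    """E(X) = sum_ij Wp_ij * ||x_i - x_j||^2
              + lam * sum_ij Wn_ij * exp(-||x_i - x_j||^2)."""
    sqd = torch.cdist(X, X, p=2) ** 2           # pairwise squared distances
    attractive = (Wp * sqd).sum()               # pulls similar samples together
    repulsive = (Wn * torch.exp(-sqd)).sum()    # pushes dissimilar samples apart
    return attractive + lam * repulsive


def _map_new_samples_example(ne, Y_new):
    """Embed held-out samples Y_new (numpy array) with a trained NeuralEE ne."""
    samples = {'Y': torch.from_numpy(Y_new).float(),
               'labels': None, 'Wp': None, 'Wn': None}
    return ne.map(samples=samples, calculate_error=None)['X']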