import time
import copy
import sys
import os
from tqdm import trange

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from ._aux import error_ee, error_ee_split, ls_ee, eloss
from import DataLoader, TensorDataset

[docs]class FCLayers(nn.Module): """Default nn structure class. :param di: Input feature size. :type di: int :param do: Output feature size. :type do: int How to define a custom nn Modules, check at: """ def __init__(self, di, do): super(FCLayers, self).__init__() = nn.Sequential( nn.Linear(di, 50), nn.BatchNorm1d(50), nn.ReLU(), nn.Linear(50, 50), nn.BatchNorm1d(50), nn.ReLU(), nn.Linear(50, do), ) for m in if isinstance(m, nn.Linear): nn.init.normal_(m.weight, mean=0, std=0.1) nn.init.constant_(m.bias, val=0)
[docs] def forward(self, y): """""" return
[docs]class NeuralEE(object): """NeuralEE class. :param dataset: GeneExpressionDataset. :type dataset: neuralee.dataset.GeneExpressionDataset :param d: low embedded dimension. :type d: int :param lam: trade-off factor of elastic embedding function. :param device: device chosen to operate. If None, set as torch.device('cpu'). :type device: torch.device """ def __init__(self, dataset, d=2, lam=1, device=None): self.dataset = dataset self.d = d self.lam = lam self.device = device if device is not None else torch.device('cpu') self.Y = torch.from_numpy(self.dataset.Y) self.Y_splits = torch.from_numpy(self.dataset.Y_splits) \ if hasattr(self.dataset, 'Y_splits') else None self.Wp = torch.from_numpy(self.dataset.Wp) \ if hasattr(self.dataset, 'Wp') else None self.Wn = torch.from_numpy(self.dataset.Wn) \ if hasattr(self.dataset, 'Wn') else None self.Lps = torch.from_numpy(self.dataset.Lps) \ if hasattr(self.dataset, 'Lps') else None self.Wns = torch.from_numpy(self.dataset.Wns) \ if hasattr(self.dataset, 'Wns') else None def __len__(self): return len(self.dataset) @property def D(self): """feature size.""" return self.Y.shape[1] @property def labels(self): """ :return: label vector. :rtype: numpy.ndarray """ return self.dataset.labels.squeeze() @staticmethod @torch.no_grad() def _loss(Y, Wp, Wn, lam, net, device, calculate_error): """ :param Y: sample-feature matrix. :type Y: torch.FloatTensor :param Wp: attractive weights. :type Wp: torch.FloatTensor. :param Wn: repulsive weights. :type Wn: torch.FloatTensor. :param lam: trade-off factor of elastic embedding function. :param net: nn instance. :param device: device chose to operate. :param calculate_error: how to calculate error. :type calculate_error: {None, 'cpu', 'cuda'} :return: elastic embedding loss. """ results = dict() net.eval() X = net( results['X'] = X.cpu() if calculate_error is not None: assert calculate_error in ['cpu', 'cuda'] if calculate_error == 'cpu': e = error_ee_split( X, Wp, Wn, lam, memory=4, device=torch.device('cpu')) results['e'] = e else: e = error_ee_split(X, Wp, Wn, lam) results['e'] = e.item() return results def _loss_entire(self, calculate_error=None): """ :param calculate_error: how to calculate error. :type calculate_error: {None, 'cpu', 'cuda'} :return: elastic embedding loss on the entire dataset. """ return self._loss( self.Y, self.Wp, self.Wn, self.lam,, self.device, calculate_error=calculate_error)
[docs] @torch.no_grad() def EE(self, size=1., maxit=200, tol=1e-5, frequence=None, aff='ea', perplexity=30.0): """Free Elastic embedding (no mapping). Fast training of nonlinear embeddings using the spectral direction for the Elastic Embedding (EE) algorithm. Reference: Partial-Hessian Strategies for Fast Learning of Nonlinear Embeddings. :param size: subsample size of the entire dataset to embed. if subsample, the affinity will be recalculated on subsamples. :type size: int or percentage :param maxit: max number of iterations for EE. :type maxit: int :param tol: minimum relative distance between consecutive X. :param frequence: frequence to display iterating results. if None, not display. :type frequence: int :param aff: if subsampled, affinity used to calculate attractive weights. :type aff: {'ea', 'x2p'} :param perplexity: if subsampled, perplexity defined in elastic embedding function. :return: embedding results. 'X': embedding coordinates; 'e': embedding loss; 'sub_samples': if subsampled, subsamples information. :rtype: dict """ since = time.time() N = len(self) results = dict() N_sub = size if isinstance(size, int) else int(N * size) assert N_sub <= 15000, \ 'Number of samples is too huge for free EE.' X0 = 1e-5 * torch.randn(N_sub, self.d) if N_sub == N: assert self.Wp is not None, \ 'affinity on entire dataset is needed.' Wp = self.Wp Wn = self.Wn else: ind_sub = torch.randperm(N)[: N_sub].tolist() Y = self.dataset.Y[ind_sub] labels = self.labels[ind_sub].squeeze() print('Compute affinity on subsample') Wp, Wn = self.dataset._affinity(Y, aff, perplexity) Wp = torch.from_numpy(Wp) Wn = torch.from_numpy(Wn) sub_samples = {'Y': torch.from_numpy(Y), 'labels': labels, 'Wp': Wp, 'Wn': Wn} results['sub_samples'] = sub_samples Wp = Wn = Dp = Wp.sum(dim=1).diagflat() Lp4 = 4 * (-Wp + Dp) R = torch.cholesky( Lp4 + 1e-6 * torch.eye(N_sub, N_sub, device=self.device), upper=True) invR = R.inverse() # S = torch.eye(N_sub, N_sub, device=self.device) # P0 = -S @ invR @ invR.t() @ S.t() # del R, S, invR, Dp P0 = -invR @ invR.t() del R, invR, Dp torch.cuda.empty_cache() Xold = e, ker = error_ee(Xold, Wp, Wn, self.lam) j = 1 a = 1 convcrit = maxit >= 1 while convcrit: WWn = self.lam * Wn * ker DDn = WWn.sum(dim=1).diagflat() # gradient G = (Lp4 - 4 * (-WWn + DDn)) @ Xold P = P0 @ G # spectral direction # line search X, e, ker, a = ls_ee(Xold, Wp, Wn, self.lam, P, e, G, a) convcrit = (j < maxit) and \ (torch.norm(X - Xold) > tol * torch.norm(Xold)) Xold = X if frequence is not None and j % frequence == 0: print('Epoch {}, EE loss is {:.6f}'.format(j, e)) j += 1 print('Elastic Embedding, lambda={}, completed in {:.2f}s, ' 'EE loss is {:.6f}'.format(self.lam, time.time() - since, e.item())) results['X'] = X.cpu() results['e'] = e.item() torch.cuda.empty_cache() return results
def _collate_fn(self, batch): y, Lp, Wn = batch[0] return,,
[docs] def fine_tune(self, optimizer=None, size=1., net=None, frequence=50, verbose=False, maxit=500, calculate_error=None, pin_memory=True, aff='ea', perplexity=30.0, save_embedding=None): """NeuralEE method. It supports incremental learning, which means nn can fine tune, if a pre-trained nn offered. :param optimizer: optimization for training neural networks. if None, set as torch.optim.Adam(lr=0.01). :type optimizer: torch.optim :param size: subsample size of the entire dataset to embed. if subsample, the affinity will be recalculated on subsamples. :type size: int or percentage :param net: the nn instance as embedding function. if None and not hasattr(self, net), then fine tune; elif not None, then fine tune net as; else set as the FCLayers instance. :type net: torch.nn.Module :param frequence: frequence to compare and save iterating results. :type frequence: int :param verbose: whether to show verbose training loss. :type verbose: bool :param maxit: max number of iterations for NeuralEE. :type maxit: int :param calculate_error: how to calculate error, if the number of samples is large, set None to avoid out of memory on 'cuda' or 'cpu'. :type calculate_error: {None, 'cpu', 'cuda'} :param pin_memory: whether to pin data on GPU memory to save time of transfer, which depends on your GPU memory. :type pin_memory: bool :param aff: if subsampled, affinity used to calculate attractive weights. :type aff: {'ea', 'x2p'} :param perplexity: if subsampled, perplexity defined in elastic embedding function. :param save_embedding: path to save iterating results according to frequence. if None, not save. :type save_embedding: str :return: embedding results. 'X': embedding coordinates; 'e': embedding loss; 'sub_samples': if subsampled, subsamples information. :rtype: dict """ since = time.time() results = dict() N = len(self) N_sub = size if isinstance(size, int) else int(N * size) if N_sub != N: print('Compute affinity on subsample') ind_sub = torch.randperm(N)[: N_sub].tolist() Y = self.dataset.Y[ind_sub] Wp, Wn = self.dataset._affinity(Y, aff, perplexity) Y = torch.from_numpy(Y) Wp = torch.from_numpy(Wp) Wn = torch.from_numpy(Wn) labels = self.labels[ind_sub].squeeze() sub_samples = {'Y': Y, 'labels': labels, 'Wp': Wp, 'Wn': Wn} results['sub_samples'] = sub_samples Y_splits = Y.unsqueeze(0).to(self.device) Wp = Lps = (Wp.sum(dim=1).diagflat() - Wp).unsqueeze(0) Wns = Wn.unsqueeze(0).to(self.device) else: assert calculate_error is None or self.Wp is not None, \ 'affinity on entire dataset is needed to calculate error, ' \ 'or let calculate_error set as None.' assert self.Lps is not None, \ 'affinity is needed.' Lps = if pin_memory else self.Lps Wns = if pin_memory else self.Wns Y_splits = \ if pin_memory else self.Y_splits dataset = TensorDataset(Y_splits, Lps, Wns) # for speed, only batch_size = 1 if N_sub != N or pin_memory: dataloader = DataLoader(dataset, collate_fn=lambda batch: batch[0]) else: dataloader = DataLoader(dataset, collate_fn=self._collate_fn) if save_embedding is not None: path = save_embedding + self.dataset.__class__.__name__ + '/' if not os.path.exists(path): os.makedirs(path) flag_pbar = calculate_error is None and verbose if net is not None: = net elif hasattr(self, 'net'): pass else: = FCLayers(self.D, self.d) if optimizer is None: optimizer = optim.Adam(, lr=0.01) if calculate_error is not None and frequence is not None: best_model_wts = copy.deepcopy( best_loss = float('inf') with trange(maxit, desc="NeuralEE", file=sys.stdout, disable=not flag_pbar) as pbar: for epoch in range(1, maxit + 1): for inputs, Lp_batch, Wn_batch in dataloader: optimizer.zero_grad() outputs = loss = eloss(outputs, Lp_batch, Wn_batch, self.lam) loss.backward() optimizer.step() if frequence is not None and epoch % frequence == 0: if calculate_error is not None: if N_sub == N: e = self._loss_entire( calculate_error=calculate_error)['e'] else: e = self._loss( Y_splits[0], Wp, Wns[0], self.lam,, self.device, calculate_error)['e'] if verbose: print( 'Epoch {}, EE loss is {:.6f}'.format(epoch, e)) if e < best_loss: best_loss = e best_model_wts = copy.deepcopy( if save_embedding is not None: X = self._loss_entire( calculate_error=None)['X'].numpy() class_name = self.dataset.__class__.__name__ + class_name + str(epoch), X) pbar.update(1) if calculate_error is not None and frequence is not None: if N_sub == N: results.update(self._loss_entire(calculate_error=calculate_error)) else: results.update(self._loss(Y_splits[0], Wp, Wns[0], self.lam,, self.device, calculate_error)) if calculate_error is not None: print('Neural Elastic Embedding, lambda={}, completed in {:.2f}s, ' 'EE loss is {:.6f}'.format(self.lam, time.time() - since, results['e'])) else: print('Neural Elastic Embedding, lambda={}, completed in {:.2f}s.' .format(self.lam, time.time() - since)) # torch.cuda.empty_cache() return results
[docs] @torch.no_grad() def map(self, samples=dict(), calculate_error=None): """Directly mapping via the learned nn. :param samples: 'Y': samples to be mapped into low-dimensional coordinate. 'labels': samples labels. None is acceptable. 'Wp': attractive weights on samples. None is acceptable if error need not be calculated. 'Wn': repulsive weights on samples. None is acceptable if error need not be calculated. if empty dict, mapping on training data. :type samples: dict :param calculate_error: how to calculate error, if the number of samples is large, set None to avoid out of memory on 'cuda' or 'cpu'. :type calculate_error: {None, 'cpu', 'cuda'} :return: embedding results. 'X': embedding coordinates; 'e': embedding loss. :rtype: dict """ if samples == dict(): assert calculate_error is None or hasattr(self.dataset, 'Wp'), \ 'affinity on entire dataset is needed.' samples = {'Y': torch.from_numpy(self.dataset.Y), 'labels': self.labels, 'Wp': None if calculate_error is None else self.dataset.Wp, 'Wn': None if calculate_error is None else self.dataset.Wn} results = self._loss( samples['Y'], samples['Wp'], samples['Wn'], self.lam,, self.device, calculate_error) if calculate_error is not None: print('EE loss is {:.6f}'.format(results['e'])) torch.cuda.empty_cache() return results