Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import click
- from sklearn.cluster import KMeans
- from sklearn import metrics
- from scipy.linalg import svd
- import numpy as np
- import pandas as pd
@click.command()
# click arguments do not accept ``help``; the argument is described in the
# command docstring instead.
@click.argument('input_file')
@click.option('--output-file', default=None, help='Output results to a file instead of printing results')
@click.option('--n-clusters', default=3, type=int, help='The number of KMeans clusters')
# The flag is bound to ``use_pca`` so that it does not shadow the module-level
# ``pca`` function, which the command body needs to call.
@click.option('--pca', 'use_pca', is_flag=True, help='Should we use PCA precprocessing of the data?')
@click.option('--pca-min-variance', default=1.0, type=float,
              help='The amount of variance to perserve if using PCA. Ignored if not used with --pca flag')
@click.option('--evaluate-clusters', is_flag=True, help='Should we evaluate cluster quality after clustering?')
@click.option('--verbose', is_flag=True, help='Print progress')
def run_clustering(input_file, output_file, n_clusters, use_pca, pca_min_variance, evaluate_clusters, verbose):
    """Run KMeans Clustering on an input TSV File with Optional PCA

    INPUT_FILE is the name of the input file. Should be stored as a Tab
    Separated File.
    """
    # Every column of the table is used as a feature.
    input_data = pd.read_table(input_file)
    X = np.asarray(input_data)

    if use_pca:
        if verbose:
            click.echo('Fitting PCA')
        X = pca(X, pca_min_variance, verbose)

    if verbose:
        click.echo('Fitting K-Means with %s clusters' % n_clusters)
    clusters = KMeans(n_clusters=n_clusters).fit(X).labels_

    if evaluate_clusters:
        if verbose:
            click.echo('Calculating Calinski-Harabaz Index')
        # scikit-learn renamed this metric (harabaz -> harabasz) and later
        # removed the old spelling; use whichever one is available.
        score_fn = getattr(metrics, 'calinski_harabasz_score', None)
        if score_fn is None:
            score_fn = metrics.calinski_harabaz_score
        click.echo('Calinski-Harabaz Index: %s' % score_fn(X, clusters))

    if output_file is not None:
        # Write a single 'clusters' column, one label per input row, without
        # mutating the loaded table.
        pd.DataFrame({'clusters': clusters}).to_csv(output_file, index=False, sep='\t')
    else:
        click.echo('clusters')
        for label in clusters:
            click.echo(label)
def pca(X, variance_explained=1., verbose=True):
    """Compute PCA on data matrix X.

    The data is mean-centred, the covariance matrix is decomposed with SVD,
    and the centred data is projected onto the leading components.

    Args:
        X: 2-D array-like, one row per sample and one column per feature.
        variance_explained: Fraction of the total variance the kept
            components should explain. The smallest number of leading
            components whose cumulative explained-variance ratio reaches
            this value is kept (all components if it is never reached).
        verbose: If true, print progress messages.

    Returns:
        Array of shape (n_samples, n_components) holding the centred data
        projected onto the selected components.
    """
    X = np.asarray(X)
    m, n = X.shape
    Xn = X - X.mean(axis=0)

    if verbose:
        print('Fitting covariance matrix')
    XtX = Xn.T.dot(Xn) / m

    if verbose:
        print('Computing SVD')
    U, s, _ = svd(XtX)

    total = s.sum()
    if total > 0:
        cumulative = np.cumsum(s) / total
        # searchsorted gives the index of the first component at which the
        # cumulative ratio reaches the target; +1 turns that index into a
        # count. Clamp to [1, n] because rounding can leave the final
        # cumulative value marginally below 1.0.
        n_comp = int(np.searchsorted(cumulative, variance_explained)) + 1
        n_comp = max(1, min(n_comp, n))
        achieved = cumulative[n_comp - 1]
    else:
        # All features are constant: the ratios are undefined, so keep a
        # single (all-zero) component rather than propagating NaNs.
        n_comp, achieved = 1, 0.0

    if verbose:
        print('Selecting %s components with an explained variance of %s' % (n_comp, achieved))
    return Xn.dot(U[:, :n_comp])
# Script entry point: invoke the click command only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_clustering()
Add Comment
Please, Sign In to add comment