Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # import external modules
- import h5py
- import time
- import math
- import cv2
- import os
- import os.path
- import shutil
- import pylatex
- import numpy as np
- import matplotlib as mpl
- import matplotlib.pyplot as plt
- import matplotlib.cm as cm
- import matplotlib.colors as colors
- import scipy.constants as constants
- from IPython.core.display import display, HTML
- from shutil import copyfile
- from os import path
- from datetime import datetime # for saving figures and files with current data/time in file name
- from scipy.interpolate import griddata
- from mpl_toolkits.axes_grid.inset_locator import (inset_axes, InsetPosition, mark_inset) # create tiny plot inside of another plot
- from mpl_toolkits.axes_grid1 import make_axes_locatable
- from mpl_toolkits.axes_grid1.colorbar import colorbar
- from matplotlib.colors import ListedColormap, LinearSegmentedColormap
- from matplotlib.ticker import (MultipleLocator, FormatStrFormatter, AutoMinorLocator, AutoLocator)
- from sklearn.preprocessing import MinMaxScaler, minmax_scale
- from sklearn.cluster import KMeans
- from sklearn.metrics import silhouette_score, calinski_harabasz_score
- from scipy.optimize import curve_fit
- from silx.io.dictdump import dicttoh5 # save computationally expensive dictionaries as h5 files
- def load_h5(scan, datatype, show_time=False):
- '''This function loads only a few selected h5 files, but the definition should suffice for all the files I might need throughout the work on my bachelor's thesis.'''
- start = time.time()
- try:
- if datatype == "positions":
- h5_file = h5py.File("0016_EMPA_MSH_003c/scan_00" + str(
- scan) + "/positions.h5", "r")
- else:
- h5_file = h5py.File("0016_EMPA_MSH_003c/scan_00" + str(
- scan) + "/data/" + datatype + ".h5", "r")
- return h5_file
- except:
- print("You entered '" + str(
- scan) + "' for the argument 'scan'. The function only takes an integer as first argument. Its value must lie between 178 and 192, except for 184, because this scan was aborted.\nThe second argument must be a string for the datatype, e.g. 'positions' or 'fluo_roi'.")
- end = time.time()
- if show_time:
- print("Loading file " + datatype + ".h5 from scan 00" + str(scan) + "took " + str(end - start) + "seconds.")
- def counts2amps(scan, nA=True, negative=False, solar_cell_channel_only=False):
- '''This function takes the integer scan number as an argument, looks for the counter data inside of the h5 file,
- converts all the counts into XBIC in (nA) and returns the values as an ndarray.'''
- # load counts and turn it into 2d array
- if scan in [179, 187, 188, 189, 190]: # XBIC measurements without lock-in amplification
- loaded_counts = load_h5(scan, "counter")["/data/solar_cell_0-10V"]
- counts_as_array = np.array(loaded_counts)
- else:
- loaded_counts = load_h5(scan, "counter")["/data/lock_in_R_0-10V"]
- counts_as_array = np.array(loaded_counts).astype(
- float) # this integer array must be converted to float, otherwise np.reciprocal will return only zeros
- if negative:
- if scan in [182, 186, 187,
- 190]: # these scans show a current signal at the negative branch of the V2F converters
- counts_as_array *= -1
- if solar_cell_channel_only:
- if scan in [178, 179, 180, 181, 183, 185, 188, 191, 192]:
- loaded_counts = load_h5(scan, "counter")["/data/solar_cell_0-10V"]
- counts_as_array = np.array(loaded_counts)
- if scan in [182, 186]:
- loaded_counts = load_h5(scan, "counter")["/data/solar_cell_-10-0V"]
- counts_as_array = np.array(loaded_counts) * (-1)
- if scan in [187, 189, 190]:
- counts_as_array = np.zeros(40200)
- # load photon counts
- loaded_photon_counts = load_h5(scan, "counter")["/data/qbpm_micro_sum"]
- photon_counts_as_array = np.array(loaded_photon_counts).astype(
- float) # this integer array must be converted to float, otherwise np.reciprocal will return only zeros
- # load exposure times and turn it into 2d array
- loaded_exposure_times = load_h5(scan, "counter")["/data/exposure_time"]
- exposure_times_as_array = np.array(loaded_exposure_times)
- # normalize counts and photons to one second
- c_rate = (counts_as_array / exposure_times_as_array)
- ph_rate = photon_counts_as_array / exposure_times_as_array
- # normalize counts to the average photon rate and one second
- f_DAQ = c_rate * np.mean(
- ph_rate) / ph_rate # multiply each LIA count by the ratio of the average photon count to the respective photon count (e.g. twice as much photons as the average should be normalized to half of the XBIC curent)
- # calculate the conversion factor
- if scan in [178, 179, 180, 181, 182, 183, 185, 186, 191, 192]: # these scans have a sensitivity of 1 μA/V
- A_PA = 10 ** 6 # "P_A" = 1/sensitivity: Preamplifier amplification factor in V/A, values taken from LogbuchStrahlzeitPetraIII31.10.-6.11.2018.pptx, slide 54f.
- if scan in [187, 188, 189, 190]: # these scans have a sensitivity of 100 mA/V
- A_PA = 10
- A_LIA = 1 # "A_LIA" = 1/scaling = Lock-in amplifier amplification factor in V/V, values also taken from the pptx logbook
- V2f = 10 ** 6
- Wff = math.sqrt(
- 2) # the output signal (in this case a sine signal) of the LIA is the RMS amplitude, but we are interested in the peak-to-peak value (see jove paper, p.4)
- H_conv = 2 * Wff / (V2f * A_LIA * A_PA)
- # finally multiply the conversion factor with the normalized count of each pixel
- if not nA:
- S_XBIC = f_DAQ * H_conv
- S_XBIC = f_DAQ * H_conv * 10 ** 9
- # return S_XBIC_raw_nA
- return S_XBIC
- def supergrid(yvalues, zvalues, currents, gridres):
- """This function creates a rectangular grid where both axes have the given length. The given datapoints, which
- location is characterized by a tuple of y- and z-values, are interpolated such that every point in the grid gets
- assigned a value according to the nearest neighbor method.
- ARGUMENTS:
- yvalues (1D-array): Corresponding y-values for every measurement
- zvalues (1D-array): Corresponding x-values for every measurement
- currents (1D-array): All currents measured on the respective y- and z-coordinates
- gridres (float): Resolution of the grid insofar as it is the length of both axes of the grid in pixels
- RETURNS:
- (yvalues_new, zvalues_new, currents_supergrid) (Tuple of three 2D-Arrays): three grids with corresponding
- y-values, z-values and currents at every single point in the grid.
- """
- if np.size(yvalues) != np.size(zvalues):
- raise Exception("Arrays 'yvalues' and 'zvalues' must have same size.")
- else:
- if np.size(yvalues) - np.size(currents) == -1: # This paragraph is to catch an error that is raised when
- currents = currents[:np.size(currents) - 1] # dealing with the XRF scan XBIC data. It appears that
- if np.size(yvalues) - np.size(currents) == 1: # the y- and z-arrays or the currents are too long by one
- yvalues = yvalues[:np.size(yvalues) - 1] # element where no measurement was done. This element is
- zvalues = zvalues[:np.size(zvalues) - 1] # removed from the respective array to align points and data.
- if np.size(yvalues) - np.size(currents) == 0:
- pass
- else:
- raise Exception(
- "Positions (y, z) and corresponding data points can't be aligned due to their different size.")
- yvalues_shift = yvalues - yvalues.min() # shift y-values to obtain the relative position
- zvalues_shift = zvalues - zvalues.min() # shift y-values to obtain the relative position
- # Create grid of y- and z-values with respect to the selected grid resolution (gridres)
- yvalues_new, zvalues_new = np.mgrid[0:yvalues_shift.max():gridres * 1j, 0:zvalues_shift.max():gridres * 1j]
- # Interpolate given currents such that they fit the grid of x- and y-values
- currents_supergrid = griddata((yvalues_shift, zvalues_shift), currents, (yvalues_new, zvalues_new),
- method="nearest")
- return yvalues_new, zvalues_new, currents_supergrid.T
- def plotXBIC_loop(scan, fig, fontsize, voltage_dic, loop_count, compareB=None, compareB_counter=None, compare=False):
- # fig.suptitle("Reduced LIA count normalized", fontsize=2*fontsize, x=0.7, y=0.92) # optional title for the figure (with this x and y it is centered and has a good distance)
- # load position and define y, z and f(y,z) as arrays that can be interpreted by the imshow() function
- positions = load_h5(scan, "positions")
- y_axis_h5 = positions["/data/encoder_fast/data"]
- z_axis_h5 = positions["/data/encoder_slow/data"]
- y = np.array(y_axis_h5)
- z = np.array(z_axis_h5)
- current = counts2amps(scan)
- if compare:
- counts2amps_A = current
- counts2amps_B = counts2amps(compareB[compareB_counter])
- # append NaN: scan 180 has the shape (40199,) and thus cannot be broadcast together with the other scans which have shapes of (40200,)
- if scan == 180:
- counts2amps_A = np.append(current, np.nan)
- if compareB[compareB_counter] == 180:
- counts2amps_B = np.append(counts2amps(compareB[compareB_counter]), np.nan)
- current = counts2amps_A - counts2amps_B
- y_grid, z_grid, current_array = supergrid(y, z, current, 1000)
- # axes for new subplot
- ax = fig.add_subplot(4, 4, loop_count)
- # spacing to other subplots
- plt.subplots_adjust(right=1.3, hspace=0.5, wspace=0.5)
- # y and z labels
- ax.set_xlabel("Position Y [μm]", labelpad=0, fontsize=fontsize)
- ax.set_ylabel("Position Z [μm]", labelpad=0, fontsize=fontsize)
- # title
- bias_voltage = voltage_dic[scan]
- if compare:
- plt.title("Scan #" + str(scan) + " minus Scan #" + str(compareB[compareB_counter]), pad=15,
- fontsize=round(9 / 8 * fontsize))
- else:
- plt.title("Scan #" + str(scan) + bias_voltage, pad=15, fontsize=round(9 / 8 * fontsize))
- # individual ticks: it's an extrapolated plot from 201x201 to 1000x1000 pixels, but this corresponds to 10x10 micrometers on the solar cell
- ax.xaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax.xaxis.set_ticks([0, 500, 999]) # if 1000 instead of 999 it won't display
- ax.yaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax.yaxis.set_ticks([0, 500, 999])
- ax.xaxis.set_minor_locator(MultipleLocator(50)) # Minor ticks are set to multiples of 50
- ax.yaxis.set_minor_locator(MultipleLocator(50))
- # plot the data
- im = ax.imshow(current_array, interpolation='nearest', cmap=cm.afmhot, origin='lower')
- # add a colorbar as legend
- ax_divider = make_axes_locatable(ax) # Introduce a divider next to the axis already in place
- cax = ax_divider.append_axes("right", size="15%",
- pad="5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im, cax=cax) # Introduce a legend (colorbar) where the new axis has been placed
- cax.set_ylabel(r"$\frac{XBIC}{pixel \cdot photons_{avg}}$" + " in (nA)", rotation=-90, labelpad=50,
- fontsize=5 / 4 * fontsize) # Place this label next to the colorbar; labelpad denotes the distance of the colorbar
- cax.yaxis.set_major_locator(AutoLocator())
- cax.yaxis.set_minor_locator(AutoMinorLocator())
- def plotXBIC(plot_range=None, singleplots=None, compareA=None, compareB=None, figsize=(30, 30), savepng=False,
- dpi=None):
- '''
- This function plots the XBIC, using data from the previously defined counts2amps() function. Look at the examples to see how to use it:
- plotXBIC(plot_range=14) will do the following:
- It will plot 14 datasets, starting with #178 and ending with #192 (and omitting 184 which is still counted).
- plotXBIC(singleplots=[178,185,192]) will do the following:
- It will plot #178, #185 and #192.
- plotXBIC(compareA=[178,178,185], compareB=[187,189,189]) will do the following:
- It will plot
- Scan 178 minus Scan 187
- Scan 178 minus Scan 189
- Scan 185 minus Scan 189.
- '''
- start = time.time() # measure time for each plot
- # the best looking fontsize is about half of the width of the figsize
- fontsize = figsize[0] // 2
- # individualize titles for each scan if there is a bias voltage applied (information from LogbuchStrahlzeitPetraIII31.10.-6.11.2018.pptx, slide 54f., corresponding to handwritten logbook)
- voltage_dic = {
- 178: "",
- 179: "",
- 180: "",
- 181: "",
- 182: " (+ 100 mV)",
- 183: " (- 100 mV)",
- 185: " (- 200 mV)",
- 186: " (+ 200 mV)",
- 187: " (+ 500 mV)",
- 188: "(- 500 mV)",
- 189: " (- 1 V)",
- 190: " (+ 600 mv)",
- 191: "",
- 192: ""
- }
- # create a figure for all plots
- fig = plt.figure(figsize=figsize)
- loop_count = 1 # the subplot number should be independent of omitted scans (e.g. scan #184); otherwise you would see an empty space at that position
- if type(plot_range) is int:
- if plot_range > 6:
- plot_range += 1 # scan #184 is omitted, but still counted by the for-loop
- for scan in range(178, 178 + plot_range):
- if scan == 184:
- continue # scan 184 was aborted
- plotXBIC_loop(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count)
- loop_count += 1
- plt.show()
- if type(singleplots) is list:
- for scan in singleplots:
- if scan == 184:
- continue # scan 184 was aborted
- plotXBIC_loop(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count)
- loop_count += 1
- plt.show()
- if type(compareA) is list:
- compareB_counter = 0
- for scan in compareA:
- plotXBIC_loop(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count,
- compare=True, compareB=compareB, compareB_counter=compareB_counter)
- loop_count += 1
- compareB_counter += 1
- plt.show()
- end = time.time()
- if type(compareA) is list:
- print("Scan " + str(scan) + " minus Scan " + str(compareB[compareB_counter]) + " took " + str(
- round(end - start, 2)) + " seconds to plot.")
- else:
- print("Scan " + str(scan) + " took " + str(round(end - start, 2)) + " seconds to plot.")
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/plotXBIC_" + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/plotXBIC_" + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- def ImageRegistration(scan1, scan2, cell=None, printdetails=False):
- positions = load_h5(scan1, "positions")
- y_axis_h5 = positions["/data/encoder_fast/data"]
- z_axis_h5 = positions["/data/encoder_slow/data"]
- y = np.array(y_axis_h5)
- z = np.array(z_axis_h5)
- current1 = counts2amps(scan1,
- negative=False) # negative argument needs to be false, because otherwise the arrays with negative Amps will appear inverted which will make findTransformEEC fail
- current2 = counts2amps(scan2, negative=False)
- y_grid1, z_grid1, current1_array_original = supergrid(y, z, current1, 1000)
- y_grid2, z_grid2, current2_array_original = supergrid(y, z, current2, 1000)
- # convert current1_array to a grayscale image format (values between 0 and 255)
- current1_array_8bitgrayscale = current1_array_original / current1_array_original.max()
- current1_array_8bitgrayscale *= (1 / 1 - current1_array_8bitgrayscale.min())
- current1_array_8bitgrayscale += 1 - current1_array_8bitgrayscale.max()
- current1_array_8bitgrayscale[current1_array_8bitgrayscale < 0] = 0
- current1_array_8bitgrayscale = np.array(current1_array_8bitgrayscale * 255, dtype=np.uint8)
- current2_array_8bitgrayscale = current2_array_original / current2_array_original.max()
- current2_array_8bitgrayscale *= (1 / 1 - current2_array_8bitgrayscale.min())
- current2_array_8bitgrayscale += 1 - current2_array_8bitgrayscale.max()
- current2_array_8bitgrayscale[current2_array_8bitgrayscale < 0] = 0
- current2_array_8bitgrayscale = np.array(current2_array_8bitgrayscale * 255, dtype=np.uint8)
- shape = np.shape(current1_array_8bitgrayscale)
- warp_mode = cv2.MOTION_TRANSLATION # we only need translation in direction of y and z (no rotation etc.)
- warp_matrix = np.eye(2, 3,
- dtype=np.float32) # this is the matrix that the results of the ECC algorithm are stored in
- number_of_iterations = 5000 # Specify the number of iterations
- termination_eps = 1e-10 # Specify the threshold of the increment in the correlation coefficient between two iterations
- # Specify the overall criteria
- criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, number_of_iterations, termination_eps)
- (cc, warp_matrix) = cv2.findTransformECC(current1_array_8bitgrayscale, current2_array_8bitgrayscale, warp_matrix,
- warp_mode, criteria, None, 1)
- if printdetails == True:
- scan2_aligned = cv2.warpAffine(current2_array_8bitgrayscale, warp_matrix, (shape[1], shape[0]),
- flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP, borderValue=float("NaN")).astype(
- np.float32)
- scan2_aligned[scan2_aligned == 0] = float("NaN")
- # show where the values of the registered image lie (XBIC as a function of the 40200 datapoints)
- scan2_aligned_ravel = scan2_aligned.ravel()
- x = np.arange(np.shape(scan2_aligned_ravel)[0])
- plt.figure(figsize=(50, 5))
- plt.scatter(x, scan2_aligned_ravel)
- plt.show()
- fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 15), constrained_layout=True)
- im1 = ax1.imshow(current1_array_8bitgrayscale, origin="lower", cmap=cm.afmhot)
- ax1.set_title("Scan #" + str(scan1))
- ax1_divider = make_axes_locatable(ax1) # Introduce a divider next to the axis already in place
- cax1 = ax1_divider.append_axes("right", size="15%",
- pad="5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im1, cax=cax1) # Introduce a legend (colorbar) where the new axis has been placed
- cax1.yaxis.set_major_locator(AutoLocator())
- cax1.yaxis.set_minor_locator(AutoMinorLocator())
- im2 = ax2.imshow(current2_array_8bitgrayscale, origin="lower", cmap=cm.afmhot)
- ax2.set_title("Scan #" + str(scan2))
- ax2_divider = make_axes_locatable(ax2) # Introduce a divider next to the axis already in place
- cax2 = ax2_divider.append_axes("right", size="15%",
- pad="5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im2, cax=cax2) # Introduce a legend (colorbar) where the new axis has been placed
- cax2.yaxis.set_major_locator(AutoLocator())
- cax2.yaxis.set_minor_locator(AutoMinorLocator())
- im3 = ax3.imshow(scan2_aligned, origin="lower", cmap="inferno")
- ax3.set_title("Scan #" + str(scan2) + " aligned")
- ax3.set_ylim(bottom=0)
- ax3_divider = make_axes_locatable(ax3) # Introduce a divider next to the axis already in place
- cax3 = ax3_divider.append_axes("right", size="15%",
- pad="5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im3, cax=cax3) # Introduce a legend (colorbar) where the new axis has been placed
- cax3.yaxis.set_major_locator(AutoLocator())
- cax3.yaxis.set_minor_locator(AutoMinorLocator())
- plt.tight_layout(w_pad=7)
- return warp_matrix
- # plotXBIC_loop2 differs from plotXBIC_loop in that it translates the current array according to Image Registration (axes ticks correspondingly and AutoMinorLocator instead of MultipleLocator)
- def plotXBIC_loop2(scan, fig, fontsize, voltage_dic, loop_count, compareB=None, compareB_counter=None, compare=False):
- # fig.suptitle("Reduced LIA count normalized", fontsize=2*fontsize, x=0.7, y=0.92) # optional title for the figure (with this x and y it is centered and has a good distance)
- # load position and define y, z and f(y,z) as arrays that can be interpreted by the imshow() function
- positions = load_h5(scan, "positions")
- y_axis_h5 = positions["/data/encoder_fast/data"]
- z_axis_h5 = positions["/data/encoder_slow/data"]
- y = np.array(y_axis_h5)
- z = np.array(z_axis_h5)
- current = counts2amps(scan)
- if compare:
- counts2amps_A = current
- counts2amps_B = counts2amps(compareB[compareB_counter])
- # append NaN: scan 180 has the shape (40199,) and thus cannot be broadcast together with the other scans which have shapes of (40200,)
- if scan == 180:
- counts2amps_A = np.append(current, np.nan)
- if compareB[compareB_counter] == 180:
- counts2amps_B = np.append(counts2amps(compareB[compareB_counter]), np.nan)
- current = counts2amps_A - counts2amps_B
- y_grid, z_grid, current_array = supergrid(y, z, current, 1000)
- # axes for new subplot
- ax = fig.add_subplot(4, 4, loop_count)
- # spacing to other subplots
- plt.subplots_adjust(right=1.3, hspace=0.5, wspace=0.5)
- # y and z labels
- ax.set_xlabel("Position Y [μm]", labelpad=0, fontsize=fontsize)
- ax.set_ylabel("Position Z [μm]", labelpad=0, fontsize=fontsize)
- # title
- bias_voltage = voltage_dic[scan]
- if compare:
- plt.title("Scan #" + str(scan) + " minus Scan #" + str(compareB[compareB_counter]), pad=15,
- fontsize=round(9 / 8 * fontsize))
- else:
- plt.title("Scan #" + str(scan) + bias_voltage, pad=15, fontsize=round(9 / 8 * fontsize))
- # ticks and ticklabels need to be shifted according to Image Registration
- if scan in [179, 180, 182, 183, 184, 185, 186, 191, 192]:
- IRS_y = ImageRegistrationDic[scan][0][2]
- IRS_z = ImageRegistrationDic[scan][1][2]
- else:
- IRS_y = 0
- IRS_z = 0
- # individual ticks: it's an extrapolated plot from 201x201 to 1000x1000 pixels, but this corresponds to 10x10 micrometers on the solar cell
- ax.xaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax.xaxis.set_ticks([0 - IRS_y, 500 - IRS_y, 999 - IRS_y]) # if 1000 instead of 999 it won't display
- ax.yaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax.yaxis.set_ticks([0 - IRS_z, 500 - IRS_z, 999 - IRS_z])
- ax.xaxis.set_minor_locator(AutoMinorLocator(5)) # 1 minor tick for each micrometer
- ax.yaxis.set_minor_locator(AutoMinorLocator(5))
- if scan in [179, 180, 182, 183, 184, 185, 186, 191, 192]:
- warp_matrix = ImageRegistrationDic[scan]
- if scan not in [178, 181, 187, 188, 189,
- 190]: # scan 178 and 181 are reference scans for the warp matrices, so they don't need to be transformed, and Image Registration would fail for scan #187 to #190
- current_array = cv2.warpAffine(current_array, warp_matrix, (1000, 1000),
- flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP, borderValue=float("NaN")).astype(
- np.float32)
- # plot the data
- im = ax.imshow(current_array, interpolation='nearest', cmap=cm.afmhot, origin='lower')
- # add a colorbar as legend
- ax_divider = make_axes_locatable(ax) # Introduce a divider next to the axis already in place
- cax = ax_divider.append_axes("right", size="15%",
- pad="5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im, cax=cax) # Introduce a legend (colorbar) where the new axis has been placed
- cax.set_ylabel(r"$\frac{XBIC}{pixel \cdot photons_{avg}}$" + " in (nA)", rotation=-90, labelpad=50,
- fontsize=5 / 4 * fontsize) # Place this label next to the colorbar; labelpad denotes the distance of the colorbar
- cax.yaxis.set_major_locator(AutoLocator())
- cax.yaxis.set_minor_locator(AutoMinorLocator())
- def plotXBIC2(plot_range=None, singleplots=None, compareA=None, compareB=None, figsize=(30, 30), savepng=False,
- dpi=None):
- '''
- This function plots the XBIC, using data from the previously defined counts2amps() function. Look at the examples to see how to use it:
- plotXBIC(plot_range=14) will do the following:
- It will plot 14 datasets, starting with #178 and ending with #192 (and omitting 184 which is still counted).
- plotXBIC(singleplots=[178,185,192]) will do the following:
- It will plot #178, #185 and #192.
- plotXBIC(compareA=[178,178,185], compareB=[187,189,189]) will do the following:
- It will plot
- Scan 178 minus Scan 187
- Scan 178 minus Scan 189
- Scan 185 minus Scan 189.
- '''
- start = time.time() # measure time for each plot
- # the best looking fontsize is about half of the width of the figsize
- fontsize = figsize[0] // 2
- # individualize titles for each scan if there is a bias voltage applied (information from LogbuchStrahlzeitPetraIII31.10.-6.11.2018.pptx, slide 54f., corresponding to handwritten logbook)
- voltage_dic = {
- 178: "",
- 179: "",
- 180: "",
- 181: "",
- 182: " (+ 100 mV)",
- 183: " (- 100 mV)",
- 185: " (- 200 mV)",
- 186: " (+ 200 mV)",
- 187: " (+ 500 mV)",
- 188: "(- 500 mV)",
- 189: " (- 1 V)",
- 190: " (+ 600 mv)",
- 191: "",
- 192: ""
- }
- # create a figure for all plots
- fig = plt.figure(figsize=figsize)
- loop_count = 1 # the subplot number should be independent of omitted scans (e.g. scan #184); otherwise you would see an empty space at that position
- if type(plot_range) is int:
- if plot_range > 6:
- plot_range += 1 # scan #184 is omitted, but still counted by the for-loop
- for scan in range(178, 178 + plot_range):
- if scan == 184:
- continue # scan 184 was aborted
- plotXBIC_loop2(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count)
- loop_count += 1
- plt.show()
- end = time.time()
- print(
- "Plotting scan 178 to scan " + str(178 + plot_range) + " took " + str(round(end - start, 2)) + " seconds.")
- if type(singleplots) is list:
- for scan in singleplots:
- if scan == 184:
- continue # scan 184 was aborted
- plotXBIC_loop2(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count)
- loop_count += 1
- plt.show()
- end = time.time()
- print("Scan " + str(scan) + " took " + str(round(end - start, 2)) + " seconds to plot.")
- if type(compareA) is list:
- compareB_counter = 0
- for scan in compareA:
- plotXBIC_loop2(scan=scan, fig=fig, fontsize=fontsize, voltage_dic=voltage_dic, loop_count=loop_count,
- compare=True, compareB=compareB, compareB_counter=compareB_counter)
- loop_count += 1
- compareB_counter += 1
- plt.show()
- end = time.time()
- print("Scan " + str(scan) + " minus Scan " + str(compareB[compareB_counter]) + " took " + str(
- round(end - start, 2)) + " seconds to plot.")
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/plotXBIC_" + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/plotXBIC_" + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- def plotIV(coordinate=None, sample_size=100, set_region=(0, 999, 0, 999), showPosition=False, inset=False,
- figsize=(30, 30), savepng=False, dpi=None, savepdf=False, annotations=False, data_processing=1,
- return_only=False):
- start = time.time()
- # voltages
- voltages = [-1000, -500, -200, -100, 0, 100, 200, 500, 600]
- # scans (ordered by voltages)
- scans_ordered = [189, 188, 185, 183, 181, 182, 186, 187, 190]
- # convert list to array
- voltages, scans_ordered = np.array(voltages), np.array(scans_ordered)
- current_array_list = []
- current_mean = []
- # plot either with or without negative values and select if the solar cell channel should be selected for the current
- options_dic = {
- 1: (False, False, "Lock in R , ohne Negativwerte"),
- 2: (False, True, "Lock in R, mit Negativwerten"),
- 3: (True, False, "solar cell channel, ohne Negativwerte"),
- 4: (True, True, "solar cell channel, mit Negativwerten")
- }
- if not return_only:
- print(options_dic[data_processing][2])
- A = 2 # plot only from scan_ordered[A] to ...
- B = -2 # ... scan_ordered[B]
- counter = 0
- for scan in scans_ordered[A:B]:
- positions = load_h5(scan, "positions")
- y_axis_h5 = positions["/data/encoder_fast/data"]
- z_axis_h5 = positions["/data/encoder_slow/data"]
- y_axis = np.array(y_axis_h5)
- z_axis = np.array(z_axis_h5)
- current = counts2amps(scan, solar_cell_channel_only=options_dic[data_processing][0],
- negative=options_dic[data_processing][1])
- y_grid, z_grid, current_array = supergrid(y_axis, z_axis, current, 1000)
- if scan != 181:
- warp_matrix = ImageRegistrationDic[scan]
- current_array = cv2.warpAffine(current_array, warp_matrix, (1000, 1000),
- flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
- borderValue=float("NaN")).astype(np.float32)
- current_array_list.append(current_array)
- current_mean.append(np.nanmean(current_array))
- print(
- f"Mean current for scan {scan} is {str(round(current_mean[counter], 4))} nA. Its min/max current is {str(round(np.nanmin(current_array), 4))}/{str(round(np.nanmax(current_array), 4))} nA.") # the values are only shown to a few decimal places if not using e.g. {round(current_mean[counter],4):.20f}
- counter += 1
- pixelplotting_array = np.stack(current_array_list)
- if return_only:
- return current_array_list, pixelplotting_array
- fontsize = 1.5 * figsize[0] // 2
- fig = plt.figure(figsize=figsize)
- ax1 = fig.add_subplot(221)
- # y and z labels
- ax1.set_xlabel("Bias voltage [mV]", labelpad=10, fontsize=9 / 5 * fontsize)
- ax1.set_ylabel("X-Ray beam induced current (nA)", labelpad=10, fontsize=9 / 5 * fontsize)
- plt.title("IV curve(s) for single pixel(s)", pad=15, fontsize=round(9 / 5 * fontsize))
- ax1.tick_params(labelsize=9 / 5 * fontsize, length=fontsize)
- ax1.set_xticks(voltages[A:B])
- # allows to plot only a specific location on the y-z current array (else plot each 'sample_size'-th IV curve):
- if coordinate != None:
- ax1.plot(voltages[A:B], pixelplotting_array[:, coordinate[0], coordinate[1]], "-o", markersize=8, linewidth=1.2,
- color="blue")
- if inset:
- # Create a set of inset Axes: these should fill the bounding box allocated to them.
- ax1_2 = plt.axes([0, 0, 1, 1])
- # Manually set the position and relative size of the inset axes within ax1
- ip = InsetPosition(ax1, [0.2, 0.2, 0.3, 0.3])
- ax1_2.set_axes_locator(ip)
- else:
- # Create a new subplot
- ax1_2 = fig.add_subplot(223)
- # ticks and ticklabels need to be shifted according to Image Registration
- scan = scans_ordered[B - 1] # show the scan of the last IV curve point as an example
- IRS_y = ImageRegistrationDic[scan][0][2] # IRS = Image Registration Shift (in y or z)
- IRS_z = ImageRegistrationDic[scan][1][2]
- # individual ticks: it's an extrapolated plot from 201x201 to 1000x1000 pixels, but this corresponds to 10x10 micrometers on the solar cell
- ax1_2.xaxis.set_ticklabels([0, 5, 10], fontsize=round(9 / 5 * fontsize))
- ax1_2.xaxis.set_ticks([0 - IRS_y, 500 - IRS_y, 999 - IRS_y]) # if 1000 instead of 999 it won't display
- ax1_2.yaxis.set_ticklabels([0, 5, 10], fontsize=round(9 / 5 * fontsize))
- ax1_2.yaxis.set_ticks([0 - IRS_z, 500 - IRS_z, 999 - IRS_z])
- ax1_2.xaxis.set_minor_locator(AutoMinorLocator(5)) # 1 minor tick for each micrometer
- ax1_2.yaxis.set_minor_locator(AutoMinorLocator(5))
- ax1_2.imshow(current_array, interpolation='nearest', cmap=cm.afmhot, origin='lower')
- ax1_2.plot([coordinate[0]], coordinate[1], "bo", markersize=8)
- # y and z labels
- ax1_2.set_xlabel("Position Y [μm]", labelpad=10, fontsize=fontsize)
- ax1_2.set_ylabel("Position Z [μm]", labelpad=10, fontsize=fontsize)
- print("Number of NaN values on shown IV curve:",
- str(len(np.where(np.isnan(pixelplotting_array[:, coordinate[0], coordinate[1]]))[0])))
- if coordinate == None:
- countnumberlines = 0
- countNaN = 0
- # define the colormap for the curves
- cmap = cm.gist_rainbow
- norm = colors.Normalize(vmin=1, vmax=sample_size)
- for z in np.linspace(set_region[0], set_region[1], sample_size ** 0.5, dtype=int):
- for y in np.linspace(set_region[2], set_region[3], sample_size ** 0.5, dtype=int):
- # ax1.plot(voltages[A:B], pixelplotting_array[:,y,z], "-o", markersize=8, linewidth=1.2, color=cmap(norm(countnumberlines)))
- ax1.plot(voltages[A:B], pixelplotting_array[:, y, z], "-", linewidth=0.5, color="blue")
- ax1.plot(voltages[A:B], current_mean, "-o", markersize=12, linewidth=4, color="red")
- if showPosition:
- if inset:
- # Create a set of inset Axes: these should fill the bounding box allocated to them.
- ax1_2 = plt.axes([0, 0, 1, 1])
- # Manually set the position and relative size of the inset axes within ax1
- ip = InsetPosition(ax1, [0.2, 0.05, figsize[0] / 120, figsize[0] / 120])
- ax1_2.set_axes_locator(ip)
- else:
- # Create a new subplot
- ax1_2 = fig.add_subplot(223)
- # ticks and ticklabels need to be shifted according to Image Registration
- scan = scans_ordered[B - 1] # show the scan of the last IV curve point as an example
- IRS_y = ImageRegistrationDic[scan][0][2] # IRS = Image Registration Shift (in y or z)
- IRS_z = ImageRegistrationDic[scan][1][2]
- # individual ticks: it's an extrapolated plot from 201x201 to 1000x1000 pixels, but this corresponds to 10x10 micrometers on the solar cell
- ax1_2.xaxis.set_ticklabels([0, 5, 10], fontsize=round(fontsize))
- ax1_2.xaxis.set_ticks(
- [0 - IRS_y, 500 - IRS_y, 999 - IRS_y]) # if 1000 instead of 999 it won't display
- ax1_2.yaxis.set_ticklabels([0, 5, 10], fontsize=round(fontsize))
- ax1_2.yaxis.set_ticks([0 - IRS_z, 500 - IRS_z, 999 - IRS_z])
- ax1_2.xaxis.set_minor_locator(AutoMinorLocator(5)) # 1 minor tick for each micrometer
- ax1_2.yaxis.set_minor_locator(AutoMinorLocator(5))
- ax1_2.imshow(current_array, interpolation='nearest', cmap=cm.afmhot, origin='lower')
- # ax1_2.plot(y,z, "bo", markersize=8, color=cmap(norm(countnumberlines)))
- ax1_2.plot(y, z, "bo", markersize=8, color="k")
- # y and z labels
- ax1_2.set_xlabel("Position Y [μm]", labelpad=10, fontsize=fontsize)
- ax1_2.set_ylabel("Position Z [μm]", labelpad=10, fontsize=fontsize)
- countnumberlines += 1
- countNaN += len(np.where(np.isnan(pixelplotting_array[:, y, z]))[0])
- print("Number of IV curves shown in plot:", str(countnumberlines))
- print("Number of NaN values on shown IV curves:", str(countNaN))
- ax2 = fig.add_subplot(222)
- ax2.plot(voltages[A:B], current_mean, "-o", markersize=8, linewidth=1.2, color="blue",
- label="Mean XBIC for all (y,z) positions (nA)")
- ax2.legend(fontsize=7 / 5 * fontsize)
- # fitting
- popt, pcov = curve_fit(lambda V, I_0, I_Ph: shockley_function(V, I_0, I_Ph) * (-1e9), voltages[A:B] / 1000,
- current_mean, p0=[5e-12, current_mean[2] * 1e-9],
- bounds=([1e-13, (current_mean[2] - 3) * 1e-9], [1e-11, (current_mean[2] + 3) * 1e-9]))
- ax2.plot(voltages[A:B], shockley_function(voltages[A:B] / 1000, popt[0], popt[1]) * (-1e9), "-o", markersize=6,
- linewidth=0.6, color="red", label="Fitted IV curve")
- ax2.legend(fontsize=7 / 5 * fontsize)
- print("-----\nActually measured:\nShort-circuit current:", str(round(current_mean[2], 4)),
- "nA\n-----\nResult of fitting:\nSaturation current I_0:", str(round(popt[0] * 1e9, 4)),
- "nA\nPhotocurrent I_Ph:", str(round(popt[1] * 1e9, 4)),
- "nA\nOpen Circuit Voltage V_OC:", str(round(shockley_function_VOC(popt[0], popt[1]) * 1e3, 4)), "mV\n-----")
- if ax1.get_ylim()[0] < 202 and ax1.get_ylim()[
- 1] > 218: # this condition makes sure that the min and max mean current values are still shown
- ax2.set_ylim(
- ax1.get_ylim()) # make sure that the current interval is the same so that the shape of the curves can be compared
- ax2.set_xlabel("Bias voltage [mV]", labelpad=10, fontsize=9 / 5 * fontsize)
- ax2.set_ylabel("X-Ray beam induced current (nA)", labelpad=10, fontsize=9 / 5 * fontsize)
- plt.title("Mean IV curve", pad=15, fontsize=round(9 / 5 * fontsize))
- ax2.tick_params(labelsize=9 / 5 * fontsize, length=fontsize)
- ax2.set_xticks(voltages[A:B])
- if annotations:
- for i in range(len(current_mean)):
- ax2.annotate("Scan " + str(scans_ordered[i + A]), xy=(voltages[i + A], current_mean[i]),
- xytext=(voltages[i + A], current_mean[i] + 1.3),
- arrowprops=dict(facecolor='black', shrink=0.12, width=2, headwidth=8))
- '''#distance of ticklabels from ticks
- ax1.tick_params(axis='both', which='major', pad=25)
- ax1_2.tick_params(axis='both', which='major', pad=25)
- ax2.tick_params(axis='both', which='major', pad=25)'''
- plt.subplots_adjust(wspace=0.35) # adjust the width between the subplots
- plt.show()
- end = time.time()
- print("Plotting took {} seconds.".format(str(round(end - start, 2))))
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/IV_curve_" + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/IV_curve_" + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- if savepdf:
- fig.savefig("savefig/IV_curve_" + dt_string + ".pdf", format="pdf")
- def shockley_function(V,I_0,I_Ph): # arguments and output in SI units
- return I_0*(np.exp(constants.e*V/(constants.k*300))-1)-I_Ph
- def shockley_function_VOC(I_0,I_Ph): # arguments and output in SI units
- return constants.k*300*np.log(I_Ph/I_0 + 1)/constants.e
- def plotClusterCurves(k=None, k_curves=None, NaN=False, sample_size=100, background=True, data_processing=1,
- compare=False, dpi=None, savepng=False):
- start = time.time()
- # use variables from plotIV function
- current_array = plotIV(data_processing=data_processing, return_only=True)[0][
- -1] # select scan of last IV curve point
- pixelplotting_array = plotIV(data_processing=data_processing, return_only=True)[1]
- voltages = [-1000, -500, -200, -100, 0, 100, 200, 500, 600]
- A = 2 # plot only from scan_ordered[A] to ...
- B = -2 # ... scan_ordered[B]
- figsize = (30, 15)
- fontsize = figsize[0] // 2
- fig = plt.figure(figsize=figsize)
- ax1 = fig.add_subplot(121)
- # ticks and ticklabels need to be shifted according to Image Registration
- scan = 186 # show the scan of the last IV curve point as an example <-- here it is scan 186, at B = -2 in the plotIV() function
- IRS_y = ImageRegistrationDic[scan][0][2] # IRS = Image Registration Shift (in y or z)
- IRS_z = ImageRegistrationDic[scan][1][2]
- # individual ticks: it's an extrapolated plot from 201x201 to 1000x1000 pixels, but this corresponds to 10x10 micrometers on the solar cell
- ax1.xaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax1.xaxis.set_ticks([0 - IRS_y, 500 - IRS_y, 999 - IRS_y]) # if 1000 instead of 999 it won't display
- ax1.yaxis.set_ticklabels([0, 5, 10], fontsize=round(4 / 5 * fontsize))
- ax1.yaxis.set_ticks([0 - IRS_z, 500 - IRS_z, 999 - IRS_z])
- ax1.xaxis.set_minor_locator(AutoMinorLocator(5)) # 1 minor tick for each micrometer
- ax1.yaxis.set_minor_locator(AutoMinorLocator(5))
- if background:
- ax1.imshow(current_array, interpolation='nearest', cmap=cm.afmhot, origin='lower')
- # y and z labels
- ax1.set_xlabel("Position Y [μm]", labelpad=0, fontsize=fontsize)
- ax1.set_ylabel("Position Z [μm]", labelpad=0, fontsize=fontsize)
- if NaN: # this option was only used, because there was something wrong with the plotClusterCurve "if NaN" condition itself, adding weird NaN values to the current grid
- NaN_where = np.where(np.isnan(pixelplotting_array))
- NaN_where = np.flip(NaN_where, 0) # np.flip is necessary, because np.isnan reads from bottom to top
- ax1.scatter(NaN_where[0], NaN_where[1])
- ax1.set_title("Position of NaN values (in blue)", pad=15, fontsize=round(9 / 8 * fontsize))
- if k != None:
- for z in np.linspace(0, 999, sample_size ** 0.5, dtype=int):
- for y in np.linspace(0, 999, sample_size ** 0.5, dtype=int):
- if np.array(kmeansDic[str(k)]["cluster_numbers"])[y][z] == k_curves:
- ax1.scatter(y, z, color="red")
- else:
- ax1.scatter(y, z, color="black")
- ax1.set_title("Position of the displayed curve", pad=15, fontsize=round(9 / 8 * fontsize))
- if k_curves != None: # plot curves belonging to the k_th cluster
- ax2 = fig.add_subplot(122)
- k_curves_values = pixelplotting_array[:, np.array(kmeansDic[str(k)]["cluster_numbers"]) == k_curves].T
- mean_array = np.empty(0)
- for scan_index in range(5):
- mean_array = np.append(mean_array, np.nanmean(k_curves_values[:, scan_index]))
- print(f"Mean currents for curve #{k_curves} (with k = {k}):", str(mean_array))
- countnumberlines = 0
- for z in np.linspace(0, 999, sample_size ** 0.5, dtype=int):
- for y in np.linspace(0, 999, sample_size ** 0.5, dtype=int):
- if not math.isnan(np.array(kmeansDic[str(k)]["cluster_numbers"])[y][z]):
- if int(np.array(kmeansDic[str(k)]["cluster_numbers"])[y][z]) == k_curves:
- ax2.plot(voltages[A:B], pixelplotting_array[:, y, z], linewidth=0.5, color="blue")
- countnumberlines += 1
- ax2.plot(voltages[A:B], mean_array, "-o", linewidth=1.2, color="red")
- ax2.set_title(f"IV curves for group {k_curves} (with k = {k})", pad=15, fontsize=round(9 / 8 * fontsize))
- if compare:
- for comparing_curve in range(k + 1):
- if comparing_curve == k_curves: # don't plot again the red curve for the k_curve that is already selected
- continue
- k_curves_values = pixelplotting_array[:,
- np.array(kmeansDic[str(k)]["cluster_numbers"]) == comparing_curve].T
- mean_array = np.empty(0)
- for scan_index in range(5):
- mean_array = np.append(mean_array, np.nanmean(k_curves_values[:, scan_index]))
- ax2.plot(voltages[A:B], mean_array, "-o", linewidth=1.2, color="black")
- print("For comparison, the mean curve for group", str(comparing_curve), "has been plotted:",
- str(mean_array))
- ax2.set_title(
- f"IV curves for group {k_curves} (with k = {k}), compared to the mean IV curves of the other groups",
- pad=15, fontsize=round(9 / 8 * fontsize))
- print("Number of IV curves shown in plot:", str(countnumberlines))
- plt.show()
- end = time.time()
- print("Plotting took {} seconds.".format(str(round(end - start, 2))))
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/cluster_curve_" + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/cluster_curve_" + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- def plotClusterPosition(k=2, dpi=None, savepng=False):
- start = time.time()
- figsize = (10, 10)
- fig = plt.figure(figsize=figsize)
- hsv_modified = cm.get_cmap('hsv', 256) # create new hsv colormaps in range of 0.3 (green) to 0.7 (blue)
- newcmp = ListedColormap(hsv_modified(np.linspace(0, 0.7, 256))) # show figure
- new_inferno = cm.get_cmap(newcmp, 5) # visualize with the new_inferno colormaps
- im = plt.pcolormesh(kmeansDic[str(k)]["cluster_numbers"], cmap=new_inferno)
- plt.colorbar()
- # im = ax.imshow(np.array(kmeansDic[str(k)]["cluster_numbers"]), interpolation='nearest', cmap=cm.afmhot, origin='lower')
- '''
- # add a colorbar as legend
- ax_divider = make_axes_locatable(ax) # Introduce a divider next to the axis already in place
- cax = ax_divider.append_axes("right", size = "15%", pad = "5%") # Append a new axis object with name "cax" to the divider. Width ("size") and distance to the 2D-Plot ("pad") can be given in % of the width of the x axis.
- legend = colorbar(im, cax = cax) # Introduce a legend (colorbar) where the new axis has been placed
- cax.set_ylabel("Number of cluster", rotation = -90, labelpad = 50 , style = "italic", fontsize=5/4*fontsize) # Place this label next to the colorbar; labelpad denotes the distance of the colorbar
- cax.yaxis.set_major_locator(AutoLocator())
- cax.yaxis.set_minor_locator(AutoMinorLocator())
- '''
- end = time.time()
- print("Plotting took {} seconds.".format(str(round(end - start, 2))))
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/ClusterPosition_" + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/ClusterPosition_" + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- plotClusterPosition(k=2)
- def kmeans_clustering(data_processing=1, k=3, random_state=22, scale_all=True, absolute_values=False, sample_size=50000,
- silhouette=True):
- pixelplotting_array = plotIV(data_processing=data_processing, return_only=True)[1]
- pixelvectors = pixelplotting_array.reshape(5,
- 1000000).T # each 5D vector represents 1 pixel with 5 different bias voltages from - to + 200 mV
- if not absolute_values:
- pixelvectors = np.array([
- pixelvectors.T[0] - pixelvectors.T[1],
- pixelvectors.T[1] - pixelvectors.T[2],
- pixelvectors.T[2] - pixelvectors.T[3],
- pixelvectors.T[3] - pixelvectors.T[4]
- ]).T
- data = pixelvectors[~np.isnan(pixelvectors).any(axis=1)] # delete all vectors which contain NaN values
- if scale_all: # maximum of ALL values is 1 and minimum of ALL values is 0
- scaler = MinMaxScaler()
- scaler.fit(data)
- data_transformed = scaler.transform(data)
- else: # maximum of each curve is 1 and minimum of each curve is 0
- data_transformed = data / np.linalg.norm(data, ord=2, axis=1, keepdims=True)
- km = KMeans(n_clusters=k)
- groups = km.fit_predict(
- data_transformed) + 1 # Adding 1 calls the first group "group 1" instead of "group 0". Also, it avoids confusion with the zeros_array
- number_sqdist = km.inertia_
- if silhouette: # silhouette slows down the function extremely, so it is given as optional
- silhouette = silhouette_score(data_transformed, groups, sample_size=sample_size)
- else:
- silhouette = 0
- calinski_harabasz = calinski_harabasz_score(data_transformed, groups)
- no_data = np.isnan(pixelvectors).any(
- axis=1) # returns a 1D array with True if a vector contains a NaN value, else False
- no_data_count = no_data.sum() # counts all the vectors with NaNs
- data_count = 1000000 - no_data_count
- zeros_array = np.zeros(1000000)
- zeros_array[~no_data] = groups # fills in all the group numbers where there are no NaNs in a vector
- zeros_array[no_data] = np.nan # where there are NaNs in a vector: assign NaN as "group number"
- cluster_array = zeros_array.reshape(1000, 1000)
- return cluster_array, number_sqdist, silhouette, calinski_harabasz
- def find_k(method="elbow", dpi=None, savepng=False):
- figsize = (12, 12)
- fontsize = figsize[0]
- fig = plt.figure(figsize=figsize)
- ax1 = fig.add_subplot(111)
- x = []
- y = []
- if method == "elbow": # elbow method
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["ssqd"]))
- plt.scatter(x, y)
- string1 = "Elbow method"
- string2 = "Sum of squared distances to respective centroid"
- string3 = "elbow" # only for printing PNGs
- if method == "silhouette": # silhouette score := (b-a) divided by max(a,b) ; where a is the intra-cluster distance and b is the nearest-cluster distance
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["silhouette"]))
- plt.scatter(x, y)
- string1 = "Silhouette score"
- string2 = "(b-a)/max(a,b)"
- string3 = "silhouette" # only for printing PNGs
- if method == "calinski_harabasz": # Calinski-Harabasz score := within-cluster disperson divided by between-cluster dispersion
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["calinski_harabasz"]))
- plt.scatter(x, y)
- string1 = "Calinski-Harabasz score"
- string2 = "ratio between within-cluster and between-cluster disperson"
- string3 = "calinski_harabasz" # only for printing PNGs
- if method == "all": # compare all scores next to each other
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["ssqd"]))
- y_normalized = []
- for entry in y:
- y_normalized.append(entry / np.max(y))
- print("elbow:", str(y_normalized))
- elbow_scatter = ax1.scatter(x, y_normalized, c="red")
- x.clear() # don't take over
- y.clear() # values from ssqd!
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["silhouette"]))
- y_normalized = []
- for entry in y:
- y_normalized.append(entry / np.max(y))
- print("silhouette:", str(y_normalized))
- silhouette_scatter = ax1.scatter(x, y_normalized, c="green")
- x.clear() # don't take over
- y.clear() # values from silhouette!
- for k in kmeansDic.keys():
- x.append(int(k))
- y.append(np.array(kmeansDic[str(k)]["calinski_harabasz"]))
- y_normalized = []
- for entry in y:
- y_normalized.append(entry / np.max(y))
- print("calinski_harabasz:", str(y_normalized))
- calinski_harabasz_scatter = ax1.scatter(x, y_normalized, c="blue")
- string1 = "All scores in comparison"
- string2 = "Normalized scores"
- plt.legend((elbow_scatter, silhouette_scatter, calinski_harabasz_scatter),
- ("Elbow Method", "Silhouette score", "Calinski-Harabasz score"), scatterpoints=1, loc='lower left',
- ncol=3, fontsize=fontsize)
- ax1.set_xlabel("Chosen k for clustering", labelpad=10, fontsize=9 / 5 * fontsize, fontweight="bold")
- ax1.set_ylabel(string2, labelpad=10, fontsize=9 / 5 * fontsize, fontweight="bold")
- plt.title(string1, pad=15, fontsize=round(9 / 5 * fontsize), fontweight="bold")
- ax1.tick_params(labelsize=9 / 5 * fontsize, length=fontsize)
- ax1.set_xticks(x)
- if savepng:
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- if dpi is None:
- fig.savefig("savefig/find_k_" + string3 + dt_string + ".png", dpi=fig.dpi, bbox_inches="tight")
- else:
- fig.savefig("savefig/find_k_" + string3 + dt_string + ".png", dpi=dpi, bbox_inches="tight")
- # To speed up all calculations remarkably: Create a dictionary with all the warp matrices:
- # Run each time you start working this notebook to save the values in the global namespace.
- start = time.time()
- ImageRegistrationDic = {
- 179: ImageRegistration(178,179),
- 180: ImageRegistration(178,180),
- 182: ImageRegistration(181,182),
- 183: ImageRegistration(181,183),
- 185: ImageRegistration(181,185),
- 186: ImageRegistration(181,186),
- 191: ImageRegistration(181,191),
- 192: ImageRegistration(181,192)
- }
- end = time.time()
- print("Calculating all warp matrices took", round((end-start), 2), "seconds.")
- '''
- # Create a nested dictionary for all cluster arrays, sum of squared distances, silhouette and calinski_harabasz scores
- start = time.time()
- kmeansDic = {}
- silhouette_setting = True # True should be selected for complete data. If False is selected, then silhouette will just be assigned to 0 and omitted, saving a lot of computation time.
- sample_size = 300000 # this number refers to the sample size in the silhouette_score() function which is computationally very expensive
- k_lowest = 2
- k_highest = 3
- data_processing = 1
- scale_all = True
- absolute_values = False
- compression = True
- for k in range(k_lowest, k_highest+1):
- start_k = time.time()
- cluster_numbers, ssqd, silhouette, calinski_harabasz = kmeans_clustering(data_processing=data_processing, scale_all=scale_all, absolute_values=absolute_values, k=k, sample_size=sample_size, silhouette=silhouette_setting)
- kmeansDic[str(k)] = {"cluster_numbers": cluster_numbers, "ssqd": ssqd, "silhouette": silhouette, "calinski_harabasz": calinski_harabasz}
- end = time.time()
- print("Finished entry in nested dictionary for k = " + str(k) + ". That took " + str(round((end-start_k), 2)) + " seconds.")
- end = time.time()
- print("Calculating all cluster arrays took", round((end-start), 2), "seconds.")
- ### SAVE CACHE ###
- # with these settings (copied from http://www.silx.org/doc/silx/0.2.0/modules/io/dictdump.html), the file size is much smaller, but loading takes much longer, so better not use it!
- os.chdir("/asap3/petra3/gpfs/p06/2018/data/11005475/scratch_cc/Jan") # just make sure that we are talking about the correct location in the file system
- if compression:
- create_ds_args = {'compression': "gzip",
- 'shuffle': True,
- 'fletcher32': True}
- else:
- create_ds_args = None
- saveh5 = input("Do you want to save the dictionary as a h5 file that other functions will refer to ? (y/n)\n")
- if saveh5.lower() == "y":
- if not compression:
- fname = f"kmeansDic({k_lowest},{k_highest})_{sample_size}-samplesize_{silhouette_setting!s}-silhouette_{data_processing}-data-processing_{scale_all!s}-scale-all_{absolute_values!s}-absolute-values.h5"
- else:
- fname = f"kmeansDic({k_lowest},{k_highest})_{sample_size}-samplesize_{silhouette_setting!s}-silhouette_{data_processing}-data-processing_{scale_all!s}-scale-all_{absolute_values!s}-absolute-values_COMPRESSED.h5"
- dicttoh5(kmeansDic, "Cache/"+ fname, create_dataset_args=create_ds_args)
- print("The nested dictionary was saved as \"" + fname + "\" in the Cache folder.")
- if saveh5.lower() == "n":
- now = datetime.now()
- dt_string = now.strftime("%d-%m-%Y_%H_%M_%S")
- fname = "kmeansDic_no-save_" + dt_string + ".h5"
- dicttoh5(kmeansDic, "Cache/" + fname)
- print("The nested dictionary was saved as \"" + fname + "\" in the Cache folder.")
- '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement