Source code for tshirt.pipeline.utils

#import phot_pipeline
import numpy as np
from scipy.stats import binned_statistic
import matplotlib.pyplot as plt
from astropy.table import Table
from astropy.io import fits, ascii
from scipy import signal
from scipy.interpolate import interp1d
from scipy import ndimage
import matplotlib.pyplot as plt
import pdb
from copy import deepcopy
import os
from scipy.interpolate import UnivariateSpline, LSQUnivariateSpline
import warnings

def robust_statistics(data,method='robust mean',nsig=10):
    """
    Calculate a robust central value and an error estimate for an array

    The spread is measured with the median absolute deviation (MAD) about
    the median. NaN values are ignored when calculating the median and MAD.

    Parameters
    -----------
    data: numpy array
        The values to summarize
    method: str
        'median' returns the median of the data.
        'robust mean' returns the mean of the points that lie within
        nsig * MAD of the median
    nsig: float
        Rejection threshold for 'robust mean', in units of the MAD

    Returns
    --------
    oneStatistic: float
        The median or the clipped mean, depending on method
    err: float
        The MAD divided by the square root of the number of points used

    Raises
    -------
    Exception
        If method is not one of the recognized names
    """
    median_val = np.nanmedian(data)
    absDev = np.abs(data - median_val)
    mad = np.nanmedian(absDev)
    if method == 'median':
        oneStatistic = median_val
        err = mad / np.sqrt(np.sum(np.isfinite(data)))
    elif method == 'robust mean':
        if mad > 0:
            goodp = absDev < (nsig * mad)
        else:
            ## With zero spread, a strict "<" test would reject every point
            ## and give a NaN mean, so keep the points that equal the median
            goodp = absDev == 0
        oneStatistic = np.mean(data[goodp])
        err = mad / np.sqrt(np.sum(goodp))
    else:
        raise Exception("Unrecognized statistic {}".format(method))

    return oneStatistic, err

def _knots_with_data(xGood, knots):
    """
    Return a boolean mask of the knots that have data points around them

    The first knot needs a point below it, the last knot needs a point above
    it and every other knot needs a point between itself and each neighbor.
    """
    nKnots = len(knots)
    keepKnots = np.zeros(nKnots,dtype=bool)
    for ind,oneKnot in enumerate(knots):
        if ind == 0:
            keepKnots[ind] = np.any(xGood < oneKnot)
        elif ind == nKnots - 1:
            keepKnots[ind] = np.any(xGood > oneKnot)
        else:
            hasLeft = np.any((xGood > knots[ind-1]) & (xGood < oneKnot))
            hasRight = np.any((xGood > oneKnot) & (xGood < knots[ind+1]))
            keepKnots[ind] = hasLeft and hasRight
    return keepKnots


def robust_poly(x,y,polyord,sigreject=3.0,iteration=3,useSpline=False,knots=None,
                preScreen=False,plotEachStep=False,sSpline=None):
    """
    Fit a function (with sigma rejection) to a curve

    Parameters
    -----------
    x: numpy array
        Independent variable
    y: numpy array
        Dependent variable
    polyord: int
        Order of the fit, passed as the degree to np.polyfit (or as the
        spline degree k when useSpline is True).
        polyord=1 is a linear fit, 2 is a quadratic, etc.
    sigreject: float
        The 'sigma' rejection level in terms of median absolute deviations
    iteration: int
        Number of fit-and-reject passes
    useSpline: bool
        Do a spline fit?
    knots: numpy array or None
        Interior knot positions to use if doing a spline fit.
        If None, a smoothing spline (UnivariateSpline) is used instead
    preScreen: bool
        Pre-screen by removing outliers from the median
        (which might fail for large slopes)
    plotEachStep: bool
        Plot each step of the fitting?
    sSpline: float or None
        Smoothing factor passed to UnivariateSpline as `s` when
        useSpline is True and knots is None

    Returns
    --------
    coeff: numpy array
        Polynomial coefficients in np.polyfit order (highest power first),
        returned if useSpline is False
    spl: scipy spline object
        A callable spline, returned if useSpline is True

    Example
    --------------
    .. code-block:: python

        import numpy as np
        from tshirt.pipeline import utils
        import matplotlib.pyplot as plt

        x = np.arange(30)
        y = np.random.randn(30) + x
        y[2] = 80 ## an outlier
        polyfit = utils.robust_poly(x,y,1)
        ymodel = np.polyval(polyfit,x)
        plt.plot(x,y,'o',label='input')
        plt.plot(x,ymodel,label='fit')
        plt.show()

    """
    finitep = np.isfinite(y) & np.isfinite(x)

    if preScreen:
        resid = np.abs(y - np.nanmedian(y))
        madev = np.nanmedian(resid)
        goodp = np.zeros_like(resid,dtype=bool)
        goodp[finitep] = (np.abs(resid[finitep]) < (sigreject * madev))
    else:
        goodp = finitep ## Start with the finite points

    for _ in range(iteration):
        if useSpline and (knots is not None):
            pointsThreshold = len(knots) + polyord
        else:
            pointsThreshold = polyord

        nGood = np.sum(goodp)
        if nGood <= pointsThreshold:
            warntext = ("Only {} points accepted (more than {} are needed), "
                        "returning flat line".format(nGood,pointsThreshold))
            warnings.warn(warntext)

            if useSpline:
                spl = UnivariateSpline([0,1,2],[0,0,0],k=1)
            else:
                ## np.polyval expects the highest power first, so the
                ## constant term of a flat line is the LAST coefficient
                coeff = np.zeros(polyord + 1)
                coeff[-1] = 1.0
            ## goodp can no longer change, so more passes would only repeat this
            break

        if useSpline:
            if knots is None:
                spl = UnivariateSpline(x[goodp], y[goodp], k=polyord, s=sSpline)
            else:
                try:
                    spl = LSQUnivariateSpline(x[goodp], y[goodp], knots, k=polyord)
                except ValueError as inst:
                    knownFailures = ((str(inst) == 'Interior knots t must satisfy Schoenberg-Whitney conditions') |
                                     ("The input parameters have been rejected by fpchec." in str(inst)))
                    if not knownFailures:
                        raise

                    warnings.warn("Spline fitting failed because of Schoenberg-Whitney conditions. Trying to eliminate knots without sufficient data")

                    if plotEachStep:
                        plt.plot(x[goodp],y[goodp],'o',label='data')
                        plt.plot(knots,np.ones_like(knots) * np.median(y[goodp]),'o',label='knots',markersize=10)

                    knots = np.asarray(knots)
                    keepKnots = _knots_with_data(x[goodp],knots)

                    if plotEachStep:
                        plt.plot(knots[keepKnots],np.ones_like(knots[keepKnots]) * np.median(y[goodp]),'o',label='knots to keep')
                        plt.show()

                    ## The reduced set of knots is also used for later passes
                    knots = knots[keepKnots]
                    spl = LSQUnivariateSpline(x[goodp], y[goodp], knots, k=polyord)
            ymod = spl(x)
        else:
            coeff = np.polyfit(x[goodp],y[goodp],polyord)
            ymod = np.poly1d(coeff)(x)

        resid = np.abs(ymod - y)
        madev = np.nanmedian(resid)
        if madev > 0:
            ## Only compare the finite points to avoid runtime warnings
            goodp = np.zeros_like(resid,dtype=bool)
            goodp[finitep] = (np.abs(resid[finitep]) < (sigreject * madev))

        if plotEachStep:
            plt.plot(x,y,'o')
            plt.plot(x[goodp],y[goodp],'o')
            plt.plot(x,ymod)
            plt.show()

    if useSpline:
        return spl
    else:
        return coeff
def flatten(x,y,flatteningMethod='filter',polyOrd=2,
            highPassFreq=0.01,normalize=True,
            lowPassFreq=None):
    """
    Flatten a time series/array

    Parameters
    -----------
    x: numpy array
        Independent variable (only used by the 'polynomial' method)
    y: numpy array
        The series to flatten
    flatteningMethod: str
        'polynomial' divides y by a robust polynomial fit.
        'filter' applies a 5th order Butterworth filter forward and backward
    polyOrd: int
        Polynomial order for the 'polynomial' method
    highPassFreq: float
        Critical frequency of the high-pass filter, passed to
        scipy.signal.butter
    normalize: bool
        If False, the median of y is added back to the flattened series
    lowPassFreq: float or None
        If given, a band-pass filter between highPassFreq and lowPassFreq
        is used instead of a high-pass filter

    Returns
    --------
    yFlat: numpy array
        The flattened series

    Raises
    -------
    ValueError
        If flatteningMethod is not recognized
    """
    if flatteningMethod == 'polynomial':
        ## robust_poly names its order argument "polyord", so pass it by position
        polyFit = robust_poly(x,y,polyOrd)
        yFlat = y / np.polyval(polyFit,x)
    elif flatteningMethod == 'filter':
        if lowPassFreq is None:
            sos = signal.butter(5,highPassFreq,'highpass',analog=False,output='sos')
        else:
            sos = signal.butter(5,[highPassFreq,lowPassFreq],'bandpass',analog=False,output='sos')
        yFlat = signal.sosfiltfilt(sos, y)
    else:
        raise ValueError("Unrecognized flattening method {}".format(flatteningMethod))

    ## NOTE(review): for the 'polynomial' method the divided series is near 1,
    ## so adding the median may not restore the original level - confirm intent
    if normalize == False:
        yFlat = yFlat + np.median(y)

    return yFlat
def roll_pad(y,pixShift,pad_value=np.nan,order=1):
    """
    Similar as numpy roll (shifting) but make sure the wrap-arounds are NaN
    Converts to floating point since the Nans are weird for integer arrays

    If pixShift is an integer (Python or numpy), it does whole-pixel shifts
    Otherwise, it will do interpolation to do the subpixel shift

    Parameters
    -----------
    y: array-like
        The values to shift
    pixShift: int or float
        How far to shift. Positive values move the data to higher indices
    pad_value: float
        Value to put where the data would otherwise wrap around
    order: int
        Interpolation order passed to scipy.ndimage.shift for
        non-integer shifts (1 is linear)

    Returns
    --------
    rolled: numpy array
        Floating point array of the shifted values
    """
    if isinstance(pixShift,(int,np.integer)):
        ## np.float was removed from numpy, so use the builtin float
        rolled = np.array(np.roll(y,pixShift),dtype=float)
        if pixShift > 0:
            rolled[0:pixShift] = pad_value
        elif pixShift < 0:
            rolled[pixShift:] = pad_value
        return rolled

    return ndimage.shift(np.array(y,dtype=float),pixShift,
                         mode='constant',cval=pad_value,
                         order=order)
# numPix = len(y)
# indArr = np.arange(numPix)
# intpixShift = np.int(np.floor(pixShift))
# rolled = np.zeros_like(y,dtype=np.float) * np.nan
# fInterp = interp1d(indArr,y)
#
# newFracIndArr = indArr - pixShift
# pdb.set_trace()
# if pixShift == 0.0:
#     rolled = y
# elif (intpixShift < 0) & (intpixShift >= -(numPix)):
#     rolled[abs(intpixShift)-1:numPix] = fInterp(newFracIndArr[abs(intpixShift)-1:numPix])
# elif (intpixShift >= 0) & (intpixShift < numPix):
#     rolled[0:numPix-intpixShift-1] = fInterp(newFracIndArr[0:numPix-intpixShift-1])
#
# return rolled
def test_roll(length):
    """
    Plot a short example array before and after shifting it with roll_pad

    Parameters
    -----------
    length: int or float
        The shift handed to roll_pad
    """
    example = [1,2,1,7,1]
    for curve in (example, roll_pad(example,length)):
        plt.plot(curve)
    plt.show()
def subpixel_peak(x,y):
    """
    Find the vertex of a parabola fit to a curve with robust_poly

    Parameters
    -----------
    x: numpy array
        Independent variable
    y: numpy array
        Dependent variable

    Returns
    --------
    xPeak: float
        x position of the vertex of the fitted parabola
    yPeak: float
        Value of the fitted parabola at the vertex
    yModel: numpy array
        The fitted parabola evaluated at x.
        All three are NaN if the fit did not return 3 coefficients
    """
    coeffs = robust_poly(x,y,polyord=2)
    if len(coeffs) != 3:
        ## Peak fitting failed
        return np.nan, np.nan, np.nan

    yModel = np.polyval(coeffs,x)
    ## For a*x**2 + b*x + c the vertex is at x=-b/(2a) with value c - b**2/(4a)
    quadTerm, linTerm, constTerm = coeffs[0], coeffs[1], coeffs[2]
    xPeak = -linTerm/(2. * quadTerm)
    yPeak = constTerm - linTerm**2 / (4. * quadTerm)
    return xPeak, yPeak, yModel
def crosscor_offset(x,y1,y2,Noffset=150,diagnostics=False,
                    flatteningMethod='filter',
                    highPassFreq=0.01,lowPassFreq=None,
                    subPixel=False):
    """
    Cross correlate two arrays to find the offset

    First, each array is divided by its median and flattened
    (by default with a filter that removes low frequency stuff)

    Parameters
    -----------
    x: numpy array
        Array for the x values
    y1: numpy array
        The first signal assumed to be the reference
    y2: numpy array
        The second signal where the shift is desired to the reference
    Noffset: int
        number of offset points to explore on each side of zero.
        The arrays must be longer than 2 * Noffset
    diagnostics: bool
        Show diagnostic plots?
    flatteningMethod: str
        What kind of flattening method should be used on the arrays?
        'filter' will apply a filter
        'polynomial' will divide by a polynomial
    highPassFreq: float
        The high-pass critical frequency handed to the flattening filter.
        For scipy's digital Butterworth filter this is a fraction of the
        Nyquist frequency (0 to 1)
    lowPassFreq: float or None
        If given, the flattening uses a band-pass filter with this as the
        upper critical frequency
    subPixel: bool
        Fit the cross-correlation at the subpixel level?

    Returns
    --------
    xOffset: float
        The offset in the units of x, limited to the explored range
    indOffset: int or float
        The offset in array indices. This is the shift that the
        diagnostics apply to y2 with roll_pad to line it up with y1.
        It is not limited to the explored range when subPixel is True

    Raises
    -------
    ValueError
        If the arrays are not longer than 2 * Noffset
    """
    Npts = len(x)
    if Npts <= 2 * Noffset:
        raise ValueError("Need more than 2 * Noffset = {} points, but got {}".format(2 * Noffset,Npts))

    offsetIndices = (np.arange(2 * Noffset + 1) - Noffset)
    offsets = np.median(np.diff(x)) * offsetIndices

    y1Norm = y1 / np.median(y1)
    y2Norm = y2 / np.median(y2)

    if diagnostics:
        fig, ax = plt.subplots()
        ax.plot(y1Norm)
        tmp = y2Norm[Noffset:Npts-Noffset]
        xTmp = np.arange(len(tmp))
        ax.plot(xTmp + Noffset,tmp)
        plt.show()

    y1Flat = flatten(x,y1Norm,flatteningMethod=flatteningMethod,highPassFreq=highPassFreq,
                     lowPassFreq=lowPassFreq)
    y2Flat = flatten(x,y2Norm,flatteningMethod=flatteningMethod,highPassFreq=highPassFreq,
                     lowPassFreq=lowPassFreq)

    if diagnostics:
        plt.plot(y1Flat,label='reference')
        plt.plot(y2Flat,label='input')
        plt.legend()
        plt.show()

    ## Trimming Noffset points from each end of y2 makes the 'valid'
    ## correlation have 2 * Noffset + 1 points, one for each offset index
    corr = signal.correlate(y1Flat, y2Flat[Noffset:Npts-Noffset], mode='valid')

    if subPixel:
        indOffset, yPeak, yModel = subpixel_peak(offsetIndices,corr)
        ## offsets[0] and offsets[-1] go with the smallest and largest index,
        ## which stays true even when x is decreasing
        if indOffset < offsetIndices[0]:
            xOffset = offsets[0]
        elif indOffset > offsetIndices[-1]:
            xOffset = offsets[-1]
        else:
            fInterp = interp1d(offsetIndices,offsets)
            ## np.float was removed from numpy, so use the builtin float
            xOffset = float(fInterp(indOffset))
    else:
        peakArg = np.argmax(corr)
        yPeak = corr[peakArg]
        xOffset = offsets[peakArg]
        indOffset = offsetIndices[peakArg]

    if diagnostics:
        plt.plot(offsetIndices,corr,label='Cross-cor')
        if subPixel:
            plt.plot(offsetIndices,yModel,label='Parabola Fit')
        ## The horizontal axis is in offset indices, so mark the peak there
        plt.plot(indOffset,yPeak,'o',color='red',label='Peak')
        plt.legend()
        plt.show()

        print("Shift = {}, or index {}".format(xOffset,indOffset))
        plt.plot(y1Flat,label='reference')
        plt.plot(roll_pad(y2Flat,indOffset),label='shifted input')
        plt.legend()
        plt.show()

    return xOffset, indOffset
def test_crosscor_offsets():
    """
    Run crosscor_offset with diagnostic plots on two random arrays

    The second array is the first one plus a smaller amount of noise.
    """
    x = np.arange(1000)
    y1 = np.random.randn(1000) + 1.
    y2 = y1 + np.random.randn(1000) * 0.1
    plt.plot(x,y1)
    plt.plot(x,y2)
    plt.show()
    crosscor_offset(x,y1,y2,diagnostics=True)


def get_baseDir():
    """
    Get the base data directory, creating it if it does not exist

    Returns
    --------
    baseDir: str
        The TSHIRT_DATA environment variable if it is set (and not empty),
        otherwise a "tshirt_data" directory in the user's home directory
    """
    baseDir = os.environ.get('TSHIRT_DATA')
    if not baseDir:
        ## expanduser still works when the HOME variable is not defined
        baseDir = os.path.join(os.path.expanduser('~'),'tshirt_data')

    if not os.path.exists(baseDir):
        ## makedirs also creates missing parent directories and exist_ok
        ## avoids an error if the directory appears between check and creation
        os.makedirs(baseDir,exist_ok=True)

    return baseDir