import glob
import warnings
try:
from ccdproc import Combiner, CCDData, ccd_process, gain_correct
import ccdproc
except ImportError as err1:
print("Could not import ccdproc, so some CCD-like processing may not work.")
import yaml
import os
from astropy.io import fits
import astropy.units as u
import numpy as np
import pdb
import warnings
from scipy.interpolate import interp1d
import tqdm
defaultParamFile = 'parameters/reduction_parameters/example_reduction_parameters.yaml'
[docs]
class prep():
""" Class for reducing images
Parameters are located in parameters/reduction_parameters
"""
def __init__(self,paramFile=defaultParamFile,testMode=False,rawIndex=0):
with open(paramFile) as paramFileIO:
self.pipePrefs = yaml.safe_load(paramFileIO)
defaultParams = {'biasFiles': 'zero*.fits', ## Bias files
'flatFiles': 'flat*.fits', ## Flat field files
'sciFiles': 'k1255*.fits', ## Science files
'nSkip': 2, ## number of files to skip at the beginning
'doBias': True, ## Calculate and apply a bias correction?
'doFlat': True, ## Calculate and apply a flat field correction
'doBadPxMask': False, ## Appply a bad pixel mask?
'doFlatBias': False, ## use a specific bias or dark for the flat field
'darksForFlat': None, ## dark frames for master flat frame
'gainKeyword': 'GAIN1', ## Calculate and apply a flat correction?
'gainValue': None,## manually specify the gain, if not in header
'procName': 'proc', ## directory name for processed files
'doNonLin': False, ## apply nonlinearity correction?
'nonLinFunction': None, ## non-linearity function
'sciExtension': None, ## extension for science data
'sciExcludeList': None, ## list of files to exclude for science data
'fixWindow': False, ## fix the window between bias, flat & science?
'fixPix': False, ## fix the bad pixels with interpolation?
'combinerFunc': 'average', ## how to combine calibration files?
'normFlat': False ## normalize the flat field images by the median? Useful for sky flats
}
for oneKey in defaultParams.keys():
if oneKey not in self.pipePrefs:
self.pipePrefs[oneKey] = defaultParams[oneKey]
rawDirInput = self.pipePrefs['rawDir']
if type(rawDirInput) == list:
## If the parameter file is list, choose one of those indices
self.rawDir = rawDirInput[rawIndex]
self.nRawDirs = len(rawDirInput)
elif type(rawDirInput) == str:
self.rawDir = rawDirInput
self.nRawDirs = 1
else:
print('Unrecognized input file directory/list')
self.testMode = testMode
if testMode == True:
self.procDir = os.path.join(self.rawDir,'test_proc')
else:
self.procDir = os.path.join(self.rawDir,self.pipePrefs['procName'])
[docs]
def makeMasterCals(self):
""" Makes the master calibration files """
allCals = []
if self.pipePrefs['doBias'] == True:
allCals.append('biasFiles')
if self.pipePrefs['doFlat'] == True:
allCals.append('flatFiles')
if self.pipePrefs['doFlatBias'] == True:
allCals.append('darksForFlat')
for oneCal in allCals:
fileSearchInfo = self.pipePrefs[oneCal]
fileL = self.get_fileL(fileSearchInfo)
if self.testMode == True:
fileL = fileL[0:4]
if (oneCal == 'doFlat') & (self.pipePrefs['normFlat'] == True):
normalize = True
else:
normalize = False
ccdList = []
for oneFile in fileL:
head, dataCCD = self.getData(oneFile,normalize=normalize)
ccdList.append(dataCCD)
combiner = Combiner(ccdList)
combiner.sigma_clipping(low_thresh=2, high_thresh=5, func=np.ma.median)
combinerFunc = self.pipePrefs['combinerFunc']
if combinerFunc == 'average':
combined_avg = combiner.average_combine()
elif combinerFunc == 'median':
combined_avg = combiner.median_combine()
else:
raise Exception("Unrecognized combiner function {}".format(self.combiner_func))
comb_gained = gain_correct(combined_avg,self.get_gain(head) * u.electron/u.adu)
hdu = fits.PrimaryHDU(comb_gained,head)
HDUList = fits.HDUList([hdu])
HDUList[0].header['IMGAVG'] = ('T','Image is averaged')
HDUList[0].header['GAINCOR'] = ('T','Gain correction applied (units are e)')
HDUList[0].data = combined_avg
if os.path.exists(self.procDir) == False:
os.mkdir(self.procDir)
if oneCal == 'biasFiles':
outName = 'zero'
elif oneCal == 'darksForFlat':
outName = 'dark_for_flat'
else:
outName = 'flat'
HDUList.writeto(os.path.join(self.procDir,'master_'+outName+'.fits'),overwrite=True)
def get_gain(self,head):
if self.pipePrefs['gainKeyword'] is not None:
gain = float(head[self.pipePrefs['gainKeyword']])
elif self.pipePrefs['gainValue'] is not None:
gain = float(self.pipePrefs['gainValue'])
else:
gain = 1.0
return gain
[docs]
def get_fileL(self,fileSearchInfo,searchType='generic'):
"""
Search for a list of files
Tests out if the user put in a string w/ wildcard or a list of files
"""
if type(fileSearchInfo) == list:
fileL = []
for oneFile in fileSearchInfo:
fileL.append(os.path.join(self.rawDir,oneFile))
else:
fileL = np.sort(glob.glob(os.path.join(self.rawDir,fileSearchInfo)))
if (self.pipePrefs['sciExcludeList'] is not None) & (searchType == 'science'):
outList = []
for oneFile in fileL:
if os.path.basename(oneFile) not in self.pipePrefs['sciExcludeList']:
outList.append(oneFile)
else:
outList = fileL
return outList
[docs]
def procSciFiles(self):
""" Process the science images """
fileL = self.get_fileL(self.pipePrefs['sciFiles'],searchType='science')
if self.testMode == True:
fileL = fileL[0:4]
if self.pipePrefs['doBias'] == True:
hbias, bias = self.getData(os.path.join(self.procDir,'master_zero.fits'))
else:
hbias, bias = None, None
if self.pipePrefs['doFlat'] == True:
hflat, flat = self.getData(os.path.join(self.procDir,'master_flat.fits'))
if self.pipePrefs['doFlatBias'] == True:
hFlatDark, flatDark = self.getData(os.path.join(self.procDir,'master_dark_for_flat.fits'))
flat = ccdproc.subtract_bias(flat,flatDark)
else:
hflat, flat = None, None
if self.pipePrefs['doBadPxMask'] == True:
badPx = fits.getdata(os.path.join(self.procDir,'master_badpx_mask.fits'))
else:
hbadPx, badPx = None, None
for ind in tqdm.tqdm(np.arange(len(fileL))):
oneFile = fileL[ind]
head, dataCCD = self.getData(oneFile)
sciHead = fits.getheader(oneFile,ext=self.pipePrefs['sciExtension'])
if ('CCDSEC' in sciHead) & (self.pipePrefs['fixWindow'] == True):
yFix = np.array(sciHead['CCDSEC'].split(',')[1].split(']')[0].split(':'),dtype=np.int) - 1
if flat is not None:
useFlat = flat[yFix[0]:yFix[1]+1]
if ind == 0:
flatSave = fits.PrimaryHDU(useFlat)
flatSave.writeto('diagnostics/trimmed_flat/trimmed_flat.fits',overwrite=True)
if bias is not None:
useBias = bias[yFix[0]:yFix[1]+1]#ccdproc.trim_image(bias,sciHead['CCDSEC'])
else:
useFlat = flat
useBias = bias
nccd = ccd_process(dataCCD,gain=self.get_gain(head) * u.electron/u.adu,
master_flat=useFlat,
master_bias=useBias,
bad_pixel_mask=badPx)
if nccd.mask is not None:
nccd.data[nccd.mask] = np.nan
if self.pipePrefs['fixPix'] == True:
nccd.data = self.fix_pix_line(nccd.data)
head['ZEROFILE'] = 'master_zero.fits'
head['FLATFILE'] = 'master_flat.fits'
head['BADPXFIL'] = 'master_badpx_mask.fits'
head['GAINCOR'] = ('T','Gain correction applied (units are e)')
head['BUNIT'] = ('electron','Physical unit of array values')
hdu = fits.PrimaryHDU(data=nccd,header=head)
HDUList = fits.HDUList([hdu])
newFile = os.path.basename(oneFile)
HDUList.writeto(os.path.join(self.procDir,newFile),overwrite=True)
def fix_pix(self,x,y):
goodP = np.isfinite(y)
if np.sum(goodP) > 5:
y_model = interp1d(x[goodP],y[goodP],fill_value="extrapolate")
y_out = y_model(x)
else:
y_out = y
return y_out
[docs]
def fix_pix_line(self,img,direction='row'):
"""
Fix pixels
"""
if direction == 'row':
nY = img.shape[0]
x = np.arange(img.shape[1])
for row in np.arange(nY):
img[row,:] = self.fix_pix(x,img[row,:])
else:
raise NotImplementedError
return img
[docs]
def check_if_nonlin_needed(self,head):
"""
Check if non-linearity correction should be applied
"""
## Is the nonlinearity correction specified?
if self.pipePrefs['doNonLin'] == True:
if 'LINCOR' in head:
if head['LINCOR'] == True:
return False
else:
return True ## run if lincor hasn't been applied
else:
return True ## run if LINCOR not in header
else:
return False
[docs]
def getData(self,fileName,normalize=False):
""" Gets the data and converts to CCDData type
Parameters
----------
normalize: bool
Normalize by the median? Useful for sky flats
"""
HDUList = fits.open(fileName)
if self.pipePrefs['sciExtension'] is None:
sciExtension = 0
else:
if ('master_flat' in fileName) | ('master_zero' in fileName):
sciExtension = 0
else:
sciExtension = self.pipePrefs['sciExtension']
if sciExtension >= len(HDUList):
print('No extension {} for {}. Trying 0'.format(sciExtension,fileName))
sciExtension = 1
data = HDUList[sciExtension].data
head = HDUList[0].header
HDUList.close()
if normalize == True:
data = data / np.nanmedian(data)
outUnit = u.dimensionless_unscaled
elif 'GAINCOR' in head:
if head['GAINCOR'] == 'T':
outUnit = u.electron
else:
outUnit = u.adu
else:
outUnit = u.adu
if self.check_if_nonlin_needed(head) == True:
if 'LBT LUCI' in self.pipePrefs['nonLinFunction']:
if 'NDIT' not in head:
print("No NDIT found for {} so no non-linearity correction applied".format(fileName))
data = data
else:
if self.pipePrefs['nonLinFunction'] == 'LBT LUCI2':
data = lbt_luci2_lincor(data,dataUnit=outUnit,ndit=head['NDIT'])
elif self.pipePrefs['nonLinFunction'] == 'LBT LUCI2 OLD':
data = lbt_luci2_lincor(data,dataUnit=outUnit,ndit=head['NDIT'],k2=4.155e-6)
else:
raise Exception("Unrecognized non-linearity function {}".format(self.pipePrefs['nonLinFunction']))
else:
raise Exception("Unrecognized non-linearity function {}".format(self.pipePrefs['nonLinFunction']))
head['LINCOR'] = (True, "Is a non-linearity correction applied?")
head['LINCFUNC'] = (self.pipePrefs['nonLinFunction'], "Name of non-linearity function applied")
else:
head['LINCOR'] = (False, "Is a non-linearity correction applied?")
try:
outData = CCDData(data,unit=outUnit)
except TypeError as err1:
pdb.set_trace()
return head, outData
[docs]
def lbt_luci2_lincor(img,dataUnit=u.adu,ndit=1.0,k2=2.767e-6):
"""
LUCI2 linearity correction from
https://sites.google.com/a/lbto.org/luci/observing/calibrations/calibration-details
Input image should be in ADU
Parameters
----------
img: numpy array
An input image to do linearity correction on
dataUnit: astropy unit
Unit of the image, such as :code:`astropy.units.adu`
ndit: float
The number of the detection integration time in LUCI2's readout system
This has to be updated for the science observations in question
k2: float
Quadratic coefficient in non-linearity correction
Returns
-------
ADUlin * ndit: numpy array
A new image that has been linearity-corrected
"""
if dataUnit == u.adu:
imgUse = img
else:
raise Exception("Unit {} not the right unit for this nonlin correction".format(datUnit))
imgUse = imgUse / ndit
ADUlin=imgUse + k2 * (imgUse)**2
return ADUlin * ndit
[docs]
def reduce_all(testMode=False):
""" Reduce all files listed in reduction parameters """
pipeObj = prep()
for rawIndex in range(pipeObj.nRawDirs):
pipeObj = prep(rawIndex=rawIndex,testMode=testMode)
print("Making Master Cals")
pipeObj.makeMasterCals()
print("Processing Science Files")
pipeObj.procSciFiles()
if __name__ == "__main__":
reduce_all()