#! /usr/bin/env python
#
# Copyright 2012-2013 Ralf Kotulla
# kotulla@uwm.edu
#
# This file is part of the ODI QuickReduce pipeline package.
#
# If you find this program or parts thereof please make sure to
# cite it appropriately (please contact the author for the most
# up-to-date reference to use). Also if you find any problems
# or have suggestiosn on how to improve the code or its
# functionality please let me know. Comments and questions are
# always welcome.
#
# The code is made publicly available. Feel free to share the link
# with whoever might be interested. However, I do ask you to not
# publish additional copies on your own website or other sources.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
"""
This module contains all functionality for combining a number of input frames
into one output frame by performing a user-sdefined function on the range of
input values of each pixel.
To improve performance, the actual image-combination step is executed in
parallel, split by line. Furthermore, each extension is computed in a new
process, enabling better memory management by the operating system. This enables
memory-limited operation with very little memory loss/leakage between
extensions.
Example:
--------
For unbinned pODI (13 OTAs) frames, memory demand is roughly
1 GB + 65MB x numer of frames. Assuming 6 GB of available memory, this would
practically (avoiding memory swapping) to 75 input frames.
Standalone option:
------------------
Combine a set of input images and write output to disk
``podi_imcombine.py (-op=xxx) output.fits file1.fits file2.fits ...``
"""
import sys
import os
import pyfits
import numpy
numpy.seterr(divide='ignore', invalid='ignore')
import scipy
import scipy.stats
import Queue
import threading
import multiprocessing
import ctypes
fix_cpu_count = False
number_cpus = 2
max_cpu_count = -1
try:
number_cpus = multiprocessing.cpu_count()
print "Yippie, found %d CPUs to use in parallel!" % (number_cpus)
if (number_cpus > max_cpu_count and max_cpu_count > 1):
number_cpus = max_cpu_count
print "... but using only %d of them!" % (number_cpus)
except:
pass
#number_cpus = 1
from podi_definitions import *
import bottleneck
verbose = cmdline_arg_isset("-verbose")
[docs]def parallel_compute(queue, shmem_buffer, shmem_results, size_x, size_y, len_filelist, operation):
#queue, shmem_buffer, shmem_results, size_x, size_y, len_filelist = worker_args
buffer = shmem_as_ndarray(shmem_buffer).reshape((size_x, size_y, len_filelist))
result_buffer = shmem_as_ndarray(shmem_results).reshape((size_x, size_y))
while (True):
cmd_quit, line = queue.get()
if (cmd_quit):
queue.task_done()
break
if (operation == "median"):
result_buffer[line,:] = numpy.median(buffer[line,:,:], axis=1)
elif (operation == "medsigclip"):
# Do not use (yet), is slow as hell
# (maskedarrays are pure python, not C as all the rest)
#print buffer[line,:,:].shape
_sigma_plus = numpy.ones(shape=(buffer.shape[1],buffer.shape[2])) * 1e9
_sigma_minus = numpy.ones(shape=(buffer.shape[1],buffer.shape[2])) * 1e9
_median = numpy.median(buffer[line,:,:], axis=1)
nrep = 3
valid_pixels = numpy.ma.MaskedArray(buffer[line,:,:])
for rep in range(nrep):
_median_2d = _median.reshape(_median.shape[0],1).repeat(buffer.shape[2], axis=1)
_min = _median_2d - 3 * _sigma_minus
_max = _median_2d + 3 * _sigma_plus
#valid_pixels = numpy.ma.masked_inside(buffer[line,:,:], _min, _max)
valid = (buffer[line,:,:] > _min) & (buffer[line,:,:] < _max)
valid_pixels = numpy.ma.array(buffer[line,:,:], mask=valid)
#valid_pixels = numpy.ma.MaskedArray(buffer[line,:,:], valid)
#print _min.shape, valid.shape, valid_pixels.shape
#if (numpy.sum(valid, axis=1).any() <= 0):
# break
#_median = numpy.median(buffer[line,:,:][valid], axis=1)
_median = numpy.median(valid_pixels, axis=1)
if (rep < nrep-1):
#_sigma_plus = scipy.stats.scoreatpercentile(buffer[line,:,:][valid], 84) - _median
#_sigma_minus = _median - scipy.stats.scoreatpercentile(buffer[line,:,:][valid], 16)
_sigma_plus = scipy.stats.scoreatpercentile(valid_pixels, 84) - _median
_sigma_minus = _median - scipy.stats.scoreatpercentile(valid_pixels, 16)
result_buffer[line,:] = _median
elif (operation == "medclip"):
intermediate = numpy.sort(buffer[line,:,:], axis=1)
result_buffer[line,:] = numpy.median(intermediate[:,1:-2], axis=1)
elif (operation == "min"):
result_buffer[line,:] = numpy.min(buffer[line,:,:], axis=1)
elif (operation == "max"):
result_buffer[line,:] = numpy.max(buffer[line,:,:], axis=1)
elif (operation == "nanmean"):
result_buffer[line,:] = scipy.stats.nanmean(buffer[line,:,:], axis=1)
elif (operation == "nanmedian"):
result_buffer[line,:] = scipy.stats.nanmedian(buffer[line,:,:], axis=1)
elif (operation == "nanmedian.bn"):
x = numpy.array(buffer[line,:,:], dtype=numpy.float32)
result_buffer[line,:] = bottleneck.nanmedian(x, axis=1)
x = None
del x
elif (operation == "nanmean.bn"):
x = numpy.array(buffer[line,:,:], dtype=numpy.float32)
result_buffer[line,:] = bottleneck.nanmean(x, axis=1)
x = None
del x
else:
result_buffer[line,:] = numpy.mean(buffer[line,:,:], axis=1)
queue.task_done()
buffer = None
shmem_buffer = None
del shmem_buffer
del buffer
sys.exit(0)
return
[docs]def imcombine_data(datas, operation="nanmean"):
# Allocate enough shared memory to load a single OTA from all files. The shared part is
# important to make communication between the main and the slave processes possible.
size_x, size_y = datas[0].shape[0], datas[0].shape[1]
total_pixels = size_x*size_y*len(datas)
# print "total pixel count",total_pixels
shmem_buffer = multiprocessing.RawArray(ctypes.c_float, total_pixels) #size_x*size_y*len(datas))
# Extract the shared memory buffer as numpy array to make things easier
buffer = shmem_as_ndarray(shmem_buffer).reshape((size_x, size_y, len(datas)))
# Set the full buffer to NaN
buffer[:,:,:] = numpy.NaN
# Now open all the other files, look for the right extension, and copy their image data to buffer
for data_id in range(len(datas)):
# stdout_write("copying %d" % (data_id))
buffer[:,:,data_id] = datas[data_id][:,:]
sizes = (size_x, size_y, len(datas))
combined = imcombine_sharedmem_data(shmem_buffer, operation, sizes)
del shmem_buffer
return combined
[docs]def imcombine_sharedmem_data(shmem_buffer, operation, sizes):
size_x, size_y, n_frames = sizes
shmem_results = multiprocessing.RawArray(ctypes.c_float, size_x*size_y)
#
# Set up the parallel processing environment
#
queue = multiprocessing.JoinableQueue()
#result_buffer = numpy.zeros(shape=(buffer.shape[0], buffer.shape[1]), dtype=numpy.float32)
processes = []
for i in range(number_cpus):
worker_args = (queue, shmem_buffer, shmem_results,
size_x, size_y, n_frames, operation)
p = multiprocessing.Process(target=parallel_compute, args=worker_args)
p.start()
processes.append(p)
# Now compute median/average/sum/etc
buffer = shmem_as_ndarray(shmem_buffer).reshape((size_x, size_y, n_frames))
for line in range(buffer.shape[0]):
#print "Adding line",line,"to queue"
queue.put((False,line))
# Tell all workers to shut down when no more data is left to work on
for i in range(number_cpus):
queue.put((True,None))
# Once all command are sent out to the workers, join them to speed things up
try:
queue.join()
except KeyboardInterrupt:
for p in processes:
p.terminate()
sys.exit(-1)
for p in processes:
p.terminate()
results = numpy.copy(shmem_as_ndarray(shmem_results).reshape((size_x, size_y)))
del shmem_results
del queue
del buffer
return results
[docs]def imcombine_subprocess(extension, filelist, shape, operation, queue, verbose):
#
# Allocate enough shared momory to hold all frames
#
size_x, size_y = shape[0], shape[1]
shmem_buffer = multiprocessing.RawArray(ctypes.c_float, size_x*size_y*len(filelist))
# Extract the shared memory buffer as numpy array to make things easier
buffer = shmem_as_ndarray(shmem_buffer).reshape((size_x, size_y, len(filelist)))
# Set the full buffer to NaN
buffer[:,:,:] = numpy.NaN
# Now open all files, look for the right extension, and copy their image data to buffer
for file_number in range(len(filelist)):
filename = filelist[file_number]
hdulist = pyfits.open(filename)
for i in range(1, len(hdulist)):
if (not is_image_extension(hdulist[i])):
continue
fppos = hdulist[i].header['EXTNAME']
if (fppos == extension):
buffer[:,:,file_number] = hdulist[i].data[:,:]
break
hdulist[i].data = None
hdulist.close()
del hdulist
if (verbose): stdout_write("\n Added file %s ..." % (filename))
# stdout_write("\n Starting imcombine for real ...")
combined = imcombine_sharedmem_data(shmem_buffer, operation=operation, sizes=(size_x, size_y, len(filelist)))
# put the imcombine'd data into the queue to return them to the main process
queue.put(combined)
# and kill this process, returning all its memory
sys.exit(0)
[docs]def imcombine(input_filelist, outputfile, operation, return_hdu=False):
# First loop over all filenames and make sure all files exist
filelist = []
for file in input_filelist:
if (os.path.isfile(file)):
filelist.append(file)
if (len(filelist) <= 0):
stdout_write("No existing files found in input list, hence nothing to do!\n")
return
elif (len(filelist) == 1):
# stdout_write("Only 1 file to combine, save the hassle and copy the file!\n")
hdulist = pyfits.open(filelist[0])
if (return_hdu):
return hdulist
hdulist.writeto(outputfile)
return
# Read the input parameters
# Note that file headers are copied from the first file
reference_filename = filelist[0]
ref_hdulist = pyfits.open(reference_filename)
# Create the primary extension of the output file
# Copy all headers from the reference HDU
primhdu = pyfits.PrimaryHDU(header=ref_hdulist[0].header)
# Add PrimaryHDU to list of OTAs that go into the output file
out_hdulist = [primhdu]
#
# Now loop over all extensions and compute the mean
#
for cur_ext in range(1, len(ref_hdulist)):
data_blocks = []
# Check what OTA we are dealing with
if (not is_image_extension(ref_hdulist[cur_ext])):
continue
ref_fppos = ref_hdulist[cur_ext].header['EXTNAME']
stdout_write("\rCombining frames for OTA %s (#% 2d/% 2d) ..." % (ref_fppos, cur_ext, len(ref_hdulist)-1))
#
# Add some weird-looking construct to move the memory allocation and actual
# imcombine into a separate process. This has to do with if/how/when python releases
# memory (or not), causing a massive short-term memory leak.
# With the additional process, the memory is onwed by the other process and the memory
# is freed once we destroy this helper process.
#
return_queue = multiprocessing.JoinableQueue()
worker_args=(ref_fppos, filelist, ref_hdulist[cur_ext].data.shape, operation, return_queue, verbose)
p = multiprocessing.Process(target=imcombine_subprocess, args=worker_args)
p.start()
combined = return_queue.get()
p.terminate()
del p
if (verbose): stdout_write(" done, creating fits extension ...")
# Create new ImageHDU, insert the imcombined's data and copy the
# header from the reference frame
hdu = pyfits.ImageHDU(data=combined, header=ref_hdulist[cur_ext].header)
# Append the new HDU to the list of result HDUs
out_hdulist.append(hdu)
if (verbose): stdout_write(" done\n")
#
# At this point, add a fits table listing all filenames that went into this combination
#
filenames_only = []
for file in filelist:
dirname, filename = os.path.split(file)
filenames_only.append(filename)
out_hdulist[0].header["NCOMBINE"] = (len(filenames_only), "number of combined files")
columns = [
pyfits.Column(name='filenumber', format="I4", array=range(1,len(filenames_only)+1)),
pyfits.Column(name='filename', format="A100", array=filenames_only),
]
coldefs = pyfits.ColDefs(columns)
tablehdu = pyfits.new_table(coldefs, tbtype="BinTableHDU")
tablehdu.header["EXTNAME"] = "FILELIST"
out_hdulist.append(tablehdu)
#
# All work done now, prepare to return the data or write it to disk
#
out_hdu = pyfits.HDUList(out_hdulist)
if (not return_hdu and outputfile != None):
stdout_write(" writing results to file %s ..." % (outputfile))
clobberfile(outputfile)
out_hdu.writeto(outputfile, clobber=True)
out_hdu.close()
del out_hdu
del out_hdulist
stdout_write(" done!\n")
elif (return_hdu):
stdout_write(" returning HDU for further processing ...\n")
return out_hdu
else:
stdout_write(" couldn't write output file, no filename given!\n")
return
if __name__ == "__main__":
outputfile = get_clean_cmdline()[1]
filelist = get_clean_cmdline()[2:]
operation = cmdline_arg_set_or_default("-op", "mean")
imcombine(filelist, outputfile, operation)