# Source code for h0rton.tests.test_trainval_data.test_xy_data
import os
import shutil
import unittest
import numpy as np
import pandas as pd
from addict import Dict
from torch.utils.data import DataLoader
from h0rton.trainval_data import XYData
from baobab.configs import BaobabConfig
class TestXYData(unittest.TestCase):
    """A suite of tests on data preprocessing.

    ``setUpClass`` writes a toy training set (2 examples) and a toy validation
    set (3 examples) into the ``out_dir`` of the corresponding Baobab config;
    ``tearDownClass`` removes those directories again.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the toy metadata and images shared by every test."""
        cls.Y_cols = ["lens_mass_center_x", "src_light_center_x", "lens_mass_center_y", "src_light_center_y", "external_shear_gamma_ext", "external_shear_psi_ext"]
        # Arbitrary whitening statistics; adding 1.0 keeps every std >= 1,
        # so dividing by it is always well-conditioned.
        cls.train_Y_mean = np.random.randn(len(cls.Y_cols))
        cls.train_Y_std = np.abs(np.random.randn(len(cls.Y_cols))) + 1.0
        cls.train_baobab_cfg_path = 'h0rton/tests/test_trainval_data/baobab_train.json'
        cls.val_baobab_cfg_path = 'h0rton/tests/test_trainval_data/baobab_val.json'
        cls.train_baobab_cfg = BaobabConfig.from_file(cls.train_baobab_cfg_path)
        cls.val_baobab_cfg = BaobabConfig.from_file(cls.val_baobab_cfg_path)
        cls.original_exptime = 5400.0  # value in baobab_[train/val].json

        #####################
        # Generate toy data #
        #####################
        # Training (n_data = 2)
        os.makedirs(cls.train_baobab_cfg.out_dir, exist_ok=True)
        cls.train_metadata = pd.DataFrame.from_dict({
            "lens_mass_center_x": [1.5, 2.0],
            "lens_mass_center_y": [1.8, 9.0],
            "src_light_center_x": [10.1, 12.5],
            "src_light_center_y": [29.2, 18.0],
            "external_shear_gamma_ext": [-0.02, 0.02],
            "external_shear_psi_ext": [-0.5, 0.5],
            "img_filename": ['X_{0:07d}.npy'.format(i) for i in range(2)],
        })
        cls.train_metadata.to_csv(os.path.join(cls.train_baobab_cfg.out_dir, 'metadata.csv'), index=False)
        # Non-negative 1x3x3 images (abs of Gaussian draws), so log1p is defined
        cls.img_0 = np.abs(np.random.randn(9)*2.0).reshape([1, 3, 3])
        cls.img_1 = np.abs(np.random.randn(9)*2.0).reshape([1, 3, 3])
        np.save(os.path.join(cls.train_baobab_cfg.out_dir, 'X_{0:07d}.npy'.format(0)), cls.img_0)
        np.save(os.path.join(cls.train_baobab_cfg.out_dir, 'X_{0:07d}.npy'.format(1)), cls.img_1)

        # Validation (n_data = 3)
        os.makedirs(cls.val_baobab_cfg.out_dir, exist_ok=True)
        cls.val_metadata = pd.DataFrame.from_dict({
            "lens_mass_center_x": np.random.randn(3),
            "lens_mass_center_y": np.random.randn(3),
            "src_light_center_x": np.random.randn(3),
            "src_light_center_y": np.random.randn(3),
            "external_shear_gamma_ext": np.random.randn(3),
            "external_shear_psi_ext": np.random.randn(3),
            "img_filename": ['X_{0:07d}.npy'.format(i) for i in range(3)],
        })
        cls.img_0_val = np.abs(np.random.randn(9)*2.0).reshape([1, 3, 3])
        cls.img_1_val = np.abs(np.random.randn(9)*2.0).reshape([1, 3, 3])
        cls.img_2_val = np.abs(np.random.randn(9)*2.0).reshape([1, 3, 3])
        np.save(os.path.join(cls.val_baobab_cfg.out_dir, 'X_{0:07d}.npy'.format(0)), cls.img_0_val)
        np.save(os.path.join(cls.val_baobab_cfg.out_dir, 'X_{0:07d}.npy'.format(1)), cls.img_1_val)
        np.save(os.path.join(cls.val_baobab_cfg.out_dir, 'X_{0:07d}.npy'.format(2)), cls.img_2_val)
        cls.val_metadata.to_csv(os.path.join(cls.val_baobab_cfg.out_dir, 'metadata.csv'), index=False)

    @classmethod
    def tearDownClass(cls):
        """Remove the toy data
        """
        shutil.rmtree(cls.train_baobab_cfg.out_dir)
        shutil.rmtree(cls.val_baobab_cfg.out_dir)

    def _make_dataset(self, is_train=True, tensor_type='FloatTensor', rescale_pixels=False, log_pixels=False, exptime_factor=1.0, use_train_stats=True):
        """Build an ``XYData`` with the settings common to every test.

        Parameters
        ----------
        is_train : bool
            First positional argument of ``XYData``; the tests use it to
            select the training (True) or validation (False) toy data.
        tensor_type : str
            Tensor type name passed to ``XYData``, e.g. 'FloatTensor'.
        rescale_pixels : bool
            Forwarded as ``rescale_pixels``.
        log_pixels : bool
            Forwarded as ``log_pixels``.
        exptime_factor : float
            Multiplier applied to the original exposure time to obtain the
            requested effective exposure time.
        use_train_stats : bool
            If True, pass the class-level ``train_Y_mean``/``train_Y_std``;
            if False, pass None for both.

        Returns
        -------
        XYData
            The dataset under test.

        """
        return XYData(
            is_train,
            self.Y_cols,
            tensor_type,
            define_src_pos_wrt_lens=True,
            rescale_pixels=rescale_pixels,
            log_pixels=log_pixels,
            add_pixel_noise=False,
            eff_exposure_time={'TDLMC_F160W': self.original_exptime*exptime_factor},
            train_Y_mean=self.train_Y_mean if use_train_stats else None,
            train_Y_std=self.train_Y_std if use_train_stats else None,
            train_baobab_cfg_path=self.train_baobab_cfg_path,
            val_baobab_cfg_path=self.val_baobab_cfg_path,
            for_cosmology=False,
        )

    @staticmethod
    def _whiten(img):
        """Return ``img`` shifted to zero mean and scaled to unit sample std.

        ``ddof=1`` applies the Bessel correction, which torch's std also takes
        into account.
        """
        return (img - np.mean(img))/np.std(img, ddof=1)

    def test_X_identity(self):
        """Test if the input image equals the dataset image, when nothing is done to the image at all
        """
        train_data = self._make_dataset()
        actual_img, _ = train_data[0]
        expected_img = self.img_0
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_identity')

    def test_X_transformation_log(self):
        """Test if the images transform as expected, with log(1+X)
        """
        train_data = self._make_dataset(log_pixels=True)
        actual_img, _ = train_data[0]
        expected_img = np.log1p(self.img_0)
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_transformation_log')

    def test_X_transformation_rescale(self):
        """Test if the images transform as expected, with whitening
        """
        train_data = self._make_dataset(rescale_pixels=True)
        actual_img, _ = train_data[0]
        expected_img = self._whiten(self.img_0)
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_transformation_rescale')

    def test_X_transformation_log_rescale(self):
        """Test if the images transform as expected, with log(1+X) and whitening
        """
        # Without exposure time factor
        train_data = self._make_dataset(rescale_pixels=True, log_pixels=True)
        actual_img, _ = train_data[0]
        expected_img = self._whiten(np.log1p(self.img_0))
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_transformation_log_rescale, without exposure time factor')
        # With exposure time factor: pixels are expected to double before the log
        train_data = self._make_dataset(rescale_pixels=True, log_pixels=True, exptime_factor=2.0)
        actual_img, _ = train_data[0]
        expected_img = self._whiten(np.log1p(self.img_0*2.0))
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_transformation_log_rescale, with exposure time factor')

    def test_X_exposure_time_factor(self):
        """Test if the images scale by the new effective exposure time correctly
        """
        train_data = self._make_dataset(exptime_factor=2.0)
        actual_img, _ = train_data[0]
        expected_img = self.img_0*2.0
        np.testing.assert_array_almost_equal(actual_img, expected_img, err_msg='test_X_exposure_time_factor')

    def test_Y_transformation_(self):
        """Test if the target Y whitens correctly
        """
        # Training: no precomputed mean/std is supplied. The expectation puts
        # each column's minimum at -1 and its maximum at +1.
        # NOTE(review): presumably XYData derives its own scaling from the
        # training metadata in this case -- confirm against XYData.
        train_data = self._make_dataset(exptime_factor=2.0, use_train_stats=False)
        _, actual_Y_0 = train_data[0]
        _, actual_Y_1 = train_data[1]
        actual_Y = np.stack([actual_Y_0, actual_Y_1], axis=0)
        Y_df = self.train_metadata[self.Y_cols].copy()
        # Source position is expressed relative to the lens center
        Y_df['src_light_center_x'] -= Y_df['lens_mass_center_x']
        Y_df['src_light_center_y'] -= Y_df['lens_mass_center_y']
        # Explicit copies: ``.values`` may hand back the same (possibly
        # read-only) buffer on every access, so writing into one array could
        # otherwise corrupt the reference used for argmin/argmax below.
        before_whitening_Y = Y_df.values.copy()
        expected_Y = before_whitening_Y.copy()
        col_idx = np.arange(len(self.Y_cols))
        expected_Y[np.argmin(before_whitening_Y, axis=0), col_idx] = -1
        expected_Y[np.argmax(before_whitening_Y, axis=0), col_idx] = 1
        np.testing.assert_array_equal(actual_Y_0.shape, [len(self.Y_cols),], err_msg='shape of single example Y for training')
        np.testing.assert_array_almost_equal(actual_Y, expected_Y, err_msg='transformed Y for training')
        # Validation: whitened with the (arbitrary) training mean and std
        val_data = self._make_dataset(is_train=False, exptime_factor=2.0)
        _, actual_Y_0 = val_data[0]
        _, actual_Y_1 = val_data[1]
        _, actual_Y_2 = val_data[2]
        actual_Y = np.stack([actual_Y_0, actual_Y_1, actual_Y_2], axis=0)
        expected_Y = self.val_metadata[self.Y_cols].copy()
        expected_Y['src_light_center_x'] -= expected_Y['lens_mass_center_x']
        expected_Y['src_light_center_y'] -= expected_Y['lens_mass_center_y']
        expected_Y = expected_Y.values
        expected_Y = (expected_Y - self.train_Y_mean.reshape([1, -1]))/self.train_Y_std.reshape([1, -1])
        np.testing.assert_array_equal(actual_Y_0.shape, [len(self.Y_cols),], err_msg='shape of single example Y for validation for arbitrary train mean and std')
        np.testing.assert_array_almost_equal(actual_Y, expected_Y, err_msg='transformed Y for validation for arbitrary train mean and std')

    def test_train_vs_val(self):
        """Test if the images and metadata are loaded from the correct folder (train/val)
        """
        train_data = self._make_dataset(is_train=True, exptime_factor=2.0)
        val_data = self._make_dataset(is_train=False, exptime_factor=2.0)
        # The toy training set has 2 examples and the validation set has 3
        np.testing.assert_equal(len(train_data), 2, err_msg='reading from correct folder (train/val)')
        np.testing.assert_equal(len(val_data), 3, err_msg='reading from correct folder (train/val)')

    def test_tensor_type(self):
        """Test if X, Y are of the configured data type
        """
        # DoubleTensor
        train_data = self._make_dataset(tensor_type='DoubleTensor', exptime_factor=2.0)
        actual_X_0, actual_Y_0 = train_data[0]
        self.assertEqual(actual_X_0.type(), 'torch.DoubleTensor')
        self.assertEqual(actual_Y_0.type(), 'torch.DoubleTensor')
        # FloatTensor
        train_data = self._make_dataset(tensor_type='FloatTensor', exptime_factor=2.0)
        actual_X_0, actual_Y_0 = train_data[0]
        self.assertEqual(actual_X_0.type(), 'torch.FloatTensor')
        self.assertEqual(actual_Y_0.type(), 'torch.FloatTensor')
# Run the test suite only when this file is executed as a script,
# not when it is imported by a test runner.
if __name__ == '__main__':
    unittest.main()