from __future__ import division, absolute_import, print_function import pytest import numpy as np import numpy.ma as ma from numpy.ma.mrecords import MaskedRecords from numpy.ma.testutils import assert_equal from numpy.testing import assert_, assert_raises from numpy.lib.recfunctions import ( drop_fields, rename_fields, get_fieldstructure, recursive_fill_fields, find_duplicates, merge_arrays, append_fields, stack_arrays, join_by, repack_fields, unstructured_to_structured, structured_to_unstructured, apply_along_fields, require_fields, assign_fields_by_name) get_names = np.lib.recfunctions.get_names get_names_flat = np.lib.recfunctions.get_names_flat zip_descr = np.lib.recfunctions.zip_descr class TestRecFunctions(object): # Misc tests def setup(self): x = np.array([1, 2, ]) y = np.array([10, 20, 30]) z = np.array([('A', 1.), ('B', 2.)], dtype=[('A', '|S3'), ('B', float)]) w = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dtype=[('a', int), ('b', [('ba', float), ('bb', int)])]) self.data = (w, x, y, z) def test_zip_descr(self): # Test zip_descr (w, x, y, z) = self.data # Std array test = zip_descr((x, x), flatten=True) assert_equal(test, np.dtype([('', int), ('', int)])) test = zip_descr((x, x), flatten=False) assert_equal(test, np.dtype([('', int), ('', int)])) # Std & flexible-dtype test = zip_descr((x, z), flatten=True) assert_equal(test, np.dtype([('', int), ('A', '|S3'), ('B', float)])) test = zip_descr((x, z), flatten=False) assert_equal(test, np.dtype([('', int), ('', [('A', '|S3'), ('B', float)])])) # Standard & nested dtype test = zip_descr((x, w), flatten=True) assert_equal(test, np.dtype([('', int), ('a', int), ('ba', float), ('bb', int)])) test = zip_descr((x, w), flatten=False) assert_equal(test, np.dtype([('', int), ('', [('a', int), ('b', [('ba', float), ('bb', int)])])])) def test_drop_fields(self): # Test drop_fields a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dtype=[('a', int), ('b', [('ba', float), ('bb', int)])]) # A basic field test = drop_fields(a, 'a') control = np.array([((2, 3.0),), ((5, 6.0),)], dtype=[('b', [('ba', float), ('bb', int)])]) assert_equal(test, control) # Another basic field (but nesting two fields) test = drop_fields(a, 'b') control = np.array([(1,), (4,)], dtype=[('a', int)]) assert_equal(test, control) # A nested sub-field test = drop_fields(a, ['ba', ]) control = np.array([(1, (3.0,)), (4, (6.0,))], dtype=[('a', int), ('b', [('bb', int)])]) assert_equal(test, control) # All the nested sub-field from a field: zap that field test = drop_fields(a, ['ba', 'bb']) control = np.array([(1,), (4,)], dtype=[('a', int)]) assert_equal(test, control) test = drop_fields(a, ['a', 'b']) assert_(test is None) def test_rename_fields(self): # Test rename fields a = np.array([(1, (2, [3.0, 30.])), (4, (5, [6.0, 60.]))], dtype=[('a', int), ('b', [('ba', float), ('bb', (float, 2))])]) test = rename_fields(a, {'a': 'A', 'bb': 'BB'}) newdtype = [('A', int), ('b', [('ba', float), ('BB', (float, 2))])] control = a.view(newdtype) assert_equal(test.dtype, newdtype) assert_equal(test, control) def test_get_names(self): # Test get_names ndtype = np.dtype([('A', '|S3'), ('B', float)]) test = get_names(ndtype) assert_equal(test, ('A', 'B')) ndtype = np.dtype([('a', int), ('b', [('ba', float), ('bb', int)])]) test = get_names(ndtype) assert_equal(test, ('a', ('b', ('ba', 'bb')))) def test_get_names_flat(self): # Test get_names_flat ndtype = np.dtype([('A', '|S3'), ('B', float)]) test = get_names_flat(ndtype) assert_equal(test, ('A', 'B')) ndtype = np.dtype([('a', int), ('b', [('ba', float), ('bb', int)])]) test = get_names_flat(ndtype) assert_equal(test, ('a', 'b', 'ba', 'bb')) def test_get_fieldstructure(self): # Test get_fieldstructure # No nested fields ndtype = np.dtype([('A', '|S3'), ('B', float)]) test = get_fieldstructure(ndtype) assert_equal(test, {'A': [], 'B': []}) # One 1-nested field ndtype = np.dtype([('A', int), ('B', [('BA', float), ('BB', '|S1')])]) test = get_fieldstructure(ndtype) assert_equal(test, {'A': [], 'B': [], 'BA': ['B', ], 'BB': ['B']}) # One 2-nested fields ndtype = np.dtype([('A', int), ('B', [('BA', int), ('BB', [('BBA', int), ('BBB', int)])])]) test = get_fieldstructure(ndtype) control = {'A': [], 'B': [], 'BA': ['B'], 'BB': ['B'], 'BBA': ['B', 'BB'], 'BBB': ['B', 'BB']} assert_equal(test, control) def test_find_duplicates(self): # Test find_duplicates a = ma.array([(2, (2., 'B')), (1, (2., 'B')), (2, (2., 'B')), (1, (1., 'B')), (2, (2., 'B')), (2, (2., 'C'))], mask=[(0, (0, 0)), (0, (0, 0)), (0, (0, 0)), (0, (0, 0)), (1, (0, 0)), (0, (1, 0))], dtype=[('A', int), ('B', [('BA', float), ('BB', '|S1')])]) test = find_duplicates(a, ignoremask=False, return_index=True) control = [0, 2] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) test = find_duplicates(a, key='A', return_index=True) control = [0, 1, 2, 3, 5] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) test = find_duplicates(a, key='B', return_index=True) control = [0, 1, 2, 4] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) test = find_duplicates(a, key='BA', return_index=True) control = [0, 1, 2, 4] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) test = find_duplicates(a, key='BB', return_index=True) control = [0, 1, 2, 3, 4] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) def test_find_duplicates_ignoremask(self): # Test the ignoremask option of find_duplicates ndtype = [('a', int)] a = ma.array([1, 1, 1, 2, 2, 3, 3], mask=[0, 0, 1, 0, 0, 0, 1]).view(ndtype) test = find_duplicates(a, ignoremask=True, return_index=True) control = [0, 1, 3, 4] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) test = find_duplicates(a, ignoremask=False, return_index=True) control = [0, 1, 2, 3, 4, 6] assert_equal(sorted(test[-1]), control) assert_equal(test[0], a[test[-1]]) def test_repack_fields(self): dt = np.dtype('u1,f4,i8', align=True) a = np.zeros(2, dtype=dt) assert_equal(repack_fields(dt), np.dtype('u1,f4,i8')) assert_equal(repack_fields(a).itemsize, 13) assert_equal(repack_fields(repack_fields(dt), align=True), dt) # make sure type is preserved dt = np.dtype((np.record, dt)) assert_(repack_fields(dt).type is np.record) def test_structured_to_unstructured(self): a = np.zeros(4, dtype=[('a', 'i4'), ('b', 'f4,u2'), ('c', 'f4', 2)]) out = structured_to_unstructured(a) assert_equal(out, np.zeros((4,5), dtype='f8')) b = np.array([(1, 2, 5), (4, 5, 7), (7, 8 ,11), (10, 11, 12)], dtype=[('x', 'i4'), ('y', 'f4'), ('z', 'f8')]) out = np.mean(structured_to_unstructured(b[['x', 'z']]), axis=-1) assert_equal(out, np.array([ 3. , 5.5, 9. , 11. ])) c = np.arange(20).reshape((4,5)) out = unstructured_to_structured(c, a.dtype) want = np.array([( 0, ( 1., 2), [ 3., 4.]), ( 5, ( 6., 7), [ 8., 9.]), (10, (11., 12), [13., 14.]), (15, (16., 17), [18., 19.])], dtype=[('a', '