from __future__ import division, absolute_import, print_function import sys import gzip import os import threading from tempfile import mkstemp, mktemp, NamedTemporaryFile import time import warnings import gc from io import BytesIO from datetime import datetime from numpy.testing.utils import WarningManager import numpy as np import numpy.ma as ma from numpy.lib._iotools import ConverterError, ConverterLockError, \ ConversionWarning from numpy.compat import asbytes, asbytes_nested, bytes, asstr from nose import SkipTest from numpy.ma.testutils import (TestCase, assert_equal, assert_array_equal, assert_raises, run_module_suite) from numpy.testing import assert_warns, assert_, build_err_msg class TextIO(BytesIO): """Helper IO class. Writes encode strings to bytes if needed, reads return bytes. This makes it easier to emulate files opened in binary mode without needing to explicitly convert strings to bytes in setting up the test data. """ def __init__(self, s=""): BytesIO.__init__(self, asbytes(s)) def write(self, s): BytesIO.write(self, asbytes(s)) def writelines(self, lines): BytesIO.writelines(self, [asbytes(s) for s in lines]) MAJVER, MINVER = sys.version_info[:2] IS_64BIT = sys.maxsize > 2**32 def strptime(s, fmt=None): """This function is available in the datetime module only from Python >= 2.5. """ if sys.version_info[0] >= 3: return datetime(*time.strptime(s.decode('latin1'), fmt)[:3]) else: return datetime(*time.strptime(s, fmt)[:3]) class RoundtripTest(object): def roundtrip(self, save_func, *args, **kwargs): """ save_func : callable Function used to save arrays to file. file_on_disk : bool If true, store the file on disk, instead of in a string buffer. save_kwds : dict Parameters passed to `save_func`. load_kwds : dict Parameters passed to `numpy.load`. args : tuple of arrays Arrays stored to file. """ save_kwds = kwargs.get('save_kwds', {}) load_kwds = kwargs.get('load_kwds', {}) file_on_disk = kwargs.get('file_on_disk', False) if file_on_disk: # Do not delete the file on windows, because we can't # reopen an already opened file on that platform, so we # need to close the file and reopen it, implying no # automatic deletion. if sys.platform == 'win32' and MAJVER >= 2 and MINVER >= 6: target_file = NamedTemporaryFile(delete=False) else: target_file = NamedTemporaryFile() load_file = target_file.name else: target_file = BytesIO() load_file = target_file arr = args save_func(target_file, *arr, **save_kwds) target_file.flush() target_file.seek(0) if sys.platform == 'win32' and not isinstance(target_file, BytesIO): target_file.close() arr_reloaded = np.load(load_file, **load_kwds) self.arr = arr self.arr_reloaded = arr_reloaded def test_array(self): a = np.array([[1, 2], [3, 4]], float) self.roundtrip(a) a = np.array([[1, 2], [3, 4]], int) self.roundtrip(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle) self.roundtrip(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble) self.roundtrip(a) def test_1D(self): a = np.array([1, 2, 3, 4], int) self.roundtrip(a) @np.testing.dec.knownfailureif(sys.platform == 'win32', "Fail on Win32") def test_mmap(self): a = np.array([[1, 2.5], [4, 7.3]]) self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) self.roundtrip(a) class TestSaveLoad(RoundtripTest, TestCase): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.save, *args, **kwargs) assert_equal(self.arr[0], self.arr_reloaded) class TestSavezLoad(RoundtripTest, TestCase): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.savez, *args, **kwargs) for n, arr in enumerate(self.arr): assert_equal(arr, self.arr_reloaded['arr_%d' % n]) @np.testing.dec.skipif(not IS_64BIT, "Works only with 64bit systems") @np.testing.dec.slow def test_big_arrays(self): L = (1 << 31) + 100000 tmp = mktemp(suffix='.npz') a = np.empty(L, dtype=np.uint8) np.savez(tmp, a=a) del a npfile = np.load(tmp) a = npfile['a'] npfile.close() os.remove(tmp) def test_multiple_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) self.roundtrip(a, b) def test_named_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) c = BytesIO() np.savez(c, file_a=a, file_b=b) c.seek(0) l = np.load(c) assert_equal(a, l['file_a']) assert_equal(b, l['file_b']) def test_savez_filename_clashes(self): # Test that issue #852 is fixed # and savez functions in multithreaded environment def writer(error_list): fd, tmp = mkstemp(suffix='.npz') os.close(fd) try: arr = np.random.randn(500, 500) try: np.savez(tmp, arr=arr) except OSError as err: error_list.append(err) finally: os.remove(tmp) errors = [] threads = [threading.Thread(target=writer, args=(errors,)) for j in range(3)] for t in threads: t.start() for t in threads: t.join() if errors: raise AssertionError(errors) def test_not_closing_opened_fid(self): # Test that issue #2178 is fixed: # verify could seek on 'loaded' file fd, tmp = mkstemp(suffix='.npz') os.close(fd) try: fp = open(tmp, 'wb') np.savez(fp, data='LOVELY LOAD') fp.close() fp = open(tmp, 'rb', 10000) fp.seek(0) assert_(not fp.closed) _ = np.load(fp)['data'] assert_(not fp.closed) # must not get closed by .load(opened fp) fp.seek(0) assert_(not fp.closed) finally: fp.close() os.remove(tmp) def test_closing_fid(self): # Test that issue #1517 (too many opened files) remains closed # It might be a "weak" test since failed to get triggered on # e.g. Debian sid of 2012 Jul 05 but was reported to # trigger the failure on Ubuntu 10.04: # http://projects.scipy.org/numpy/ticket/1517#comment:2 fd, tmp = mkstemp(suffix='.npz') os.close(fd) try: fp = open(tmp, 'wb') np.savez(fp, data='LOVELY LOAD') fp.close() # We need to check if the garbage collector can properly close # numpy npz file returned by np.load when their reference count # goes to zero. Python 3 running in debug mode raises a # ResourceWarning when file closing is left to the garbage # collector, so we catch the warnings. Because ResourceWarning # is unknown in Python < 3.x, we take the easy way out and # catch all warnings. with warnings.catch_warnings(): warnings.simplefilter("ignore") for i in range(1, 1025): try: np.load(tmp)["data"] except Exception as e: msg = "Failed to load data from a file: %s" % e raise AssertionError(msg) finally: os.remove(tmp) def test_closing_zipfile_after_load(self): # Check that zipfile owns file and can close it. # This needs to pass a file name to load for the # test. fd, tmp = mkstemp(suffix='.npz') os.close(fd) np.savez(tmp, lab='place holder') data = np.load(tmp) fp = data.zip.fp data.close() assert_(fp.closed) class TestSaveTxt(TestCase): def test_array(self): a = np.array([[1, 2], [3, 4]], float) fmt = "%.18e" c = BytesIO() np.savetxt(c, a, fmt=fmt) c.seek(0) assert_equal(c.readlines(), [asbytes((fmt + ' ' + fmt + '\n') % (1, 2)), asbytes((fmt + ' ' + fmt + '\n') % (3, 4))]) a = np.array([[1, 2], [3, 4]], int) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) def test_1D(self): a = np.array([1, 2, 3, 4], int) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) lines = c.readlines() assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n']) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) def test_delimiter(self): a = np.array([[1., 2.], [3., 4.]]) c = BytesIO() np.savetxt(c, a, delimiter=',', fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1,2\n', b'3,4\n']) def test_format(self): a = np.array([(1, 2), (3, 4)]) c = BytesIO() # Sequence of formats np.savetxt(c, a, fmt=['%02d', '%3.1f']) c.seek(0) assert_equal(c.readlines(), [b'01 2.0\n', b'03 4.0\n']) # A single multiformat string c = BytesIO() np.savetxt(c, a, fmt='%02d : %3.1f') c.seek(0) lines = c.readlines() assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) # Specify delimiter, should be overiden c = BytesIO() np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',') c.seek(0) lines = c.readlines() assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) def test_header_footer(self): """ Test the functionality of the header and footer keyword argument. """ c = BytesIO() a = np.array([(1, 2), (3, 4)], dtype=np.int) test_header_footer = 'Test header / footer' # Test the header keyword argument np.savetxt(c, a, fmt='%1d', header=test_header_footer) c.seek(0) assert_equal(c.read(), asbytes('# ' + test_header_footer + '\n1 2\n3 4\n')) # Test the footer keyword argument c = BytesIO() np.savetxt(c, a, fmt='%1d', footer=test_header_footer) c.seek(0) assert_equal(c.read(), asbytes('1 2\n3 4\n# ' + test_header_footer + '\n')) # Test the commentstr keyword argument used on the header c = BytesIO() commentstr = '% ' np.savetxt(c, a, fmt='%1d', header=test_header_footer, comments=commentstr) c.seek(0) assert_equal(c.read(), asbytes(commentstr + test_header_footer + '\n' + '1 2\n3 4\n')) # Test the commentstr keyword argument used on the footer c = BytesIO() commentstr = '% ' np.savetxt(c, a, fmt='%1d', footer=test_header_footer, comments=commentstr) c.seek(0) assert_equal(c.read(), asbytes('1 2\n3 4\n' + commentstr + test_header_footer + '\n')) def test_file_roundtrip(self): f, name = mkstemp() os.close(f) try: a = np.array([(1, 2), (3, 4)]) np.savetxt(name, a) b = np.loadtxt(name) assert_array_equal(a, b) finally: os.unlink(name) def test_complex_arrays(self): ncols = 2 nrows = 2 a = np.zeros((ncols, nrows), dtype=np.complex128) re = np.pi im = np.e a[:] = re + 1.0j * im # One format only c = BytesIO() np.savetxt(c, a, fmt=' %+.3e') c.seek(0) lines = c.readlines() _assert_floatstr_lines_equal(lines, [b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n', b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n']) # One format for each real and imaginary part c = BytesIO() np.savetxt(c, a, fmt=' %+.3e' * 2 * ncols) c.seek(0) lines = c.readlines() _assert_floatstr_lines_equal(lines, [b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n', b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n']) # One format for each complex number c = BytesIO() np.savetxt(c, a, fmt=['(%.3e%+.3ej)'] * ncols) c.seek(0) lines = c.readlines() _assert_floatstr_lines_equal(lines, [b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n', b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n']) def _assert_floatstr_lines_equal(actual_lines, expected_lines): """A string comparison function that also works on Windows + Python 2.5. This is necessary because Python 2.5 on Windows inserts an extra 0 in the exponent of the string representation of floating point numbers. Only used in TestSaveTxt.test_complex_arrays, no attempt made to make this more generic. Once Python 2.5 compatibility is dropped, simply use `assert_equal` instead of this function. """ for actual, expected in zip(actual_lines, expected_lines): if actual != expected: expected_win25 = expected.replace("e+00", "e+000") if actual != expected_win25: msg = build_err_msg([actual, expected], '', verbose=True) raise AssertionError(msg) class TestLoadTxt(TestCase): def test_record(self): c = TextIO() c.write('1 2\n3 4') c.seek(0) x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)]) a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) assert_array_equal(x, a) d = TextIO() d.write('M 64.0 75.0\nF 25.0 60.0') d.seek(0) mydescriptor = {'names': ('gender', 'age', 'weight'), 'formats': ('S1', 'i4', 'f4')} b = np.array([('M', 64.0, 75.0), ('F', 25.0, 60.0)], dtype=mydescriptor) y = np.loadtxt(d, dtype=mydescriptor) assert_array_equal(y, b) def test_array(self): c = TextIO() c.write('1 2\n3 4') c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([[1, 2], [3, 4]], int) assert_array_equal(x, a) c.seek(0) x = np.loadtxt(c, dtype=float) a = np.array([[1, 2], [3, 4]], float) assert_array_equal(x, a) def test_1D(self): c = TextIO() c.write('1\n2\n3\n4\n') c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) c = TextIO() c.write('1,2,3,4\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',') a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) def test_missing(self): c = TextIO() c.write('1,2,3,,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ converters={3:lambda s: int(s or - 999)}) a = np.array([1, 2, 3, -999, 5], int) assert_array_equal(x, a) def test_converters_with_usecols(self): c = TextIO() c.write('1,2,3,,5\n6,7,8,9,10\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ converters={3:lambda s: int(s or - 999)}, \ usecols=(1, 3,)) a = np.array([[2, -999], [7, 9]], int) assert_array_equal(x, a) def test_comments(self): c = TextIO() c.write('# comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ comments='#') a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_skiprows(self): c = TextIO() c.write('comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) c = TextIO() c.write('# comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_usecols(self): a = np.array([[1, 2], [3, 4]], float) c = BytesIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1,)) assert_array_equal(x, a[:, 1]) a = np.array([[1, 2, 3], [3, 4, 5]], float) c = BytesIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1, 2)) assert_array_equal(x, a[:, 1:]) # Testing with arrays instead of tuples. c.seek(0) x = np.loadtxt(c, dtype=float, usecols=np.array([1, 2])) assert_array_equal(x, a[:, 1:]) # Checking with dtypes defined converters. data = '''JOE 70.1 25.3 BOB 60.5 27.9 ''' c = TextIO(data) names = ['stid', 'temp'] dtypes = ['S4', 'f8'] arr = np.loadtxt(c, usecols=(0, 2), dtype=list(zip(names, dtypes))) assert_equal(arr['stid'], [b"JOE", b"BOB"]) assert_equal(arr['temp'], [25.3, 27.9]) def test_fancy_dtype(self): c = TextIO() c.write('1,2,3.0\n4,5,6.0\n') c.seek(0) dt = np.dtype([('x', int), ('y', [('t', int), ('s', float)])]) x = np.loadtxt(c, dtype=dt, delimiter=',') a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dt) assert_array_equal(x, a) def test_shaped_dtype(self): c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])], dtype=dt) assert_array_equal(x, a) def test_3d_shaped_dtype(self): c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6 7 8 9 10 11 12") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[[1, 2, 3], [4, 5, 6]],[[7, 8, 9], [10, 11, 12]]])], dtype=dt) assert_array_equal(x, a) def test_empty_file(self): warn_ctx = WarningManager() warn_ctx.__enter__() try: warnings.filterwarnings("ignore", message="loadtxt: Empty input file:") c = TextIO() x = np.loadtxt(c) assert_equal(x.shape, (0,)) x = np.loadtxt(c, dtype=np.int64) assert_equal(x.shape, (0,)) assert_(x.dtype == np.int64) finally: warn_ctx.__exit__() def test_unused_converter(self): c = TextIO() c.writelines(['1 21\n', '3 42\n']) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={0: lambda s: int(s, 16)}) assert_array_equal(data, [21, 42]) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={1: lambda s: int(s, 16)}) assert_array_equal(data, [33, 66]) def test_dtype_with_object(self): "Test using an explicit dtype with an object" from datetime import date import time data = """ 1; 2001-01-01 2; 2002-01-31 """ ndtype = [('idx', int), ('code', np.object)] func = lambda s: strptime(s.strip(), "%Y-%m-%d") converters = {1: func} test = np.loadtxt(TextIO(data), delimiter=";", dtype=ndtype, converters=converters) control = np.array([(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))], dtype=ndtype) assert_equal(test, control) def test_uint64_type(self): tgt = (9223372043271415339, 9223372043271415853) c = TextIO() c.write("%s %s" % tgt) c.seek(0) res = np.loadtxt(c, dtype=np.uint64) assert_equal(res, tgt) def test_int64_type(self): tgt = (-9223372036854775807, 9223372036854775807) c = TextIO() c.write("%s %s" % tgt) c.seek(0) res = np.loadtxt(c, dtype=np.int64) assert_equal(res, tgt) def test_universal_newline(self): f, name = mkstemp() os.write(f, b'1 21\r3 42\r') os.close(f) try: data = np.loadtxt(name) assert_array_equal(data, [[1, 21], [3, 42]]) finally: os.unlink(name) def test_empty_field_after_tab(self): c = TextIO() c.write('1 \t2 \t3\tstart \n4\t5\t6\t \n7\t8\t9.5\t') c.seek(0) dt = { 'names': ('x', 'y', 'z', 'comment'), 'formats': ('= 3: # python 3k is known to fail for '\r' linesep = ('\n', '\r\n') else: linesep = ('\n', '\r\n', '\r') for sep in linesep: data = '0 1 2' + sep + '3 4 5' f, name = mkstemp() # We can't use NamedTemporaryFile on windows, because we cannot # reopen the file. try: os.write(f, asbytes(data)) assert_array_equal(np.genfromtxt(name), wanted) finally: os.close(f) os.unlink(name) def test_gft_using_generator(self): # gft doesn't work with unicode. def count(): for i in range(10): yield asbytes("%d" % i) res = np.genfromtxt(count()) assert_array_equal(res, np.arange(10)) def test_gzip_load(): a = np.random.random((5, 5)) s = BytesIO() f = gzip.GzipFile(fileobj=s, mode="w") np.save(f, a) f.close() s.seek(0) f = gzip.GzipFile(fileobj=s, mode="r") assert_array_equal(np.load(f), a) def test_gzip_loadtxt(): # Thanks to another windows brokeness, we can't use # NamedTemporaryFile: a file created from this function cannot be # reopened by another open call. So we first put the gzipped string # of the test reference array, write it to a securely opened file, # which is then read from by the loadtxt function s = BytesIO() g = gzip.GzipFile(fileobj=s, mode='w') g.write(b'1 2 3\n') g.close() s.seek(0) f, name = mkstemp(suffix='.gz') try: os.write(f, s.read()) s.close() assert_array_equal(np.loadtxt(name), [1, 2, 3]) finally: os.close(f) os.unlink(name) def test_gzip_loadtxt_from_string(): s = BytesIO() f = gzip.GzipFile(fileobj=s, mode="w") f.write(b'1 2 3\n') f.close() s.seek(0) f = gzip.GzipFile(fileobj=s, mode="r") assert_array_equal(np.loadtxt(f), [1, 2, 3]) def test_npzfile_dict(): s = BytesIO() x = np.zeros((3, 3)) y = np.zeros((3, 3)) np.savez(s, x=x, y=y) s.seek(0) z = np.load(s) assert_('x' in z) assert_('y' in z) assert_('x' in z.keys()) assert_('y' in z.keys()) for f, a in z.items(): assert_(f in ['x', 'y']) assert_equal(a.shape, (3, 3)) assert_(len(z.items()) == 2) for f in z: assert_(f in ['x', 'y']) assert_('x' in z.keys()) def test_load_refcount(): # Check that objects returned by np.load are directly freed based on # their refcount, rather than needing the gc to collect them. f = BytesIO() np.savez(f, [1, 2, 3]) f.seek(0) gc.collect() n_before = len(gc.get_objects()) np.load(f) n_after = len(gc.get_objects()) assert_equal(n_before, n_after) if __name__ == "__main__": run_module_suite()