import numpy as np import numpy.ma as ma from numpy.ma.testutils import (TestCase, assert_equal, assert_array_equal, assert_raises, run_module_suite) from numpy.testing import assert_warns, assert_ import sys import gzip import os import threading from tempfile import mkstemp, NamedTemporaryFile import time from datetime import datetime from numpy.lib._iotools import ConverterError, ConverterLockError, \ ConversionWarning from numpy.compat import asbytes, asbytes_nested, bytes if sys.version_info[0] >= 3: from io import BytesIO def StringIO(s=""): return BytesIO(asbytes(s)) else: from StringIO import StringIO BytesIO = StringIO MAJVER, MINVER = sys.version_info[:2] def strptime(s, fmt=None): """This function is available in the datetime module only from Python >= 2.5. """ if sys.version_info[0] >= 3: return datetime(*time.strptime(s.decode('latin1'), fmt)[:3]) else: return datetime(*time.strptime(s, fmt)[:3]) class RoundtripTest(object): def roundtrip(self, save_func, *args, **kwargs): """ save_func : callable Function used to save arrays to file. file_on_disk : bool If true, store the file on disk, instead of in a string buffer. save_kwds : dict Parameters passed to `save_func`. load_kwds : dict Parameters passed to `numpy.load`. args : tuple of arrays Arrays stored to file. """ save_kwds = kwargs.get('save_kwds', {}) load_kwds = kwargs.get('load_kwds', {}) file_on_disk = kwargs.get('file_on_disk', False) if file_on_disk: # Do not delete the file on windows, because we can't # reopen an already opened file on that platform, so we # need to close the file and reopen it, implying no # automatic deletion. if sys.platform == 'win32' and MAJVER >= 2 and MINVER >= 6: target_file = NamedTemporaryFile(delete=False) else: target_file = NamedTemporaryFile() load_file = target_file.name else: target_file = StringIO() load_file = target_file arr = args save_func(target_file, *arr, **save_kwds) target_file.flush() target_file.seek(0) if sys.platform == 'win32' and not isinstance(target_file, BytesIO): target_file.close() arr_reloaded = np.load(load_file, **load_kwds) self.arr = arr self.arr_reloaded = arr_reloaded def test_array(self): a = np.array([[1, 2], [3, 4]], float) self.roundtrip(a) a = np.array([[1, 2], [3, 4]], int) self.roundtrip(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle) self.roundtrip(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble) self.roundtrip(a) def test_1D(self): a = np.array([1, 2, 3, 4], int) self.roundtrip(a) @np.testing.dec.knownfailureif(sys.platform == 'win32', "Fail on Win32") def test_mmap(self): a = np.array([[1, 2.5], [4, 7.3]]) self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) self.roundtrip(a) class TestSaveLoad(RoundtripTest, TestCase): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.save, *args, **kwargs) assert_equal(self.arr[0], self.arr_reloaded) class TestSavezLoad(RoundtripTest, TestCase): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.savez, *args, **kwargs) for n, arr in enumerate(self.arr): assert_equal(arr, self.arr_reloaded['arr_%d' % n]) def test_multiple_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) self.roundtrip(a, b) def test_named_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) c = StringIO() np.savez(c, file_a=a, file_b=b) c.seek(0) l = np.load(c) assert_equal(a, l['file_a']) assert_equal(b, l['file_b']) def test_savez_filename_clashes(self): # Test that issue #852 is fixed # and savez functions in multithreaded environment def writer(error_list): fd, tmp = mkstemp(suffix='.npz') os.close(fd) try: arr = np.random.randn(500, 500) try: np.savez(tmp, arr=arr) except OSError, err: error_list.append(err) finally: os.remove(tmp) errors = [] threads = [threading.Thread(target=writer, args=(errors,)) for j in xrange(3)] for t in threads: t.start() for t in threads: t.join() if errors: raise AssertionError(errors) class TestSaveTxt(TestCase): def test_array(self): a = np.array([[1, 2], [3, 4]], float) fmt = "%.18e" c = StringIO() np.savetxt(c, a, fmt=fmt) c.seek(0) assert_equal(c.readlines(), asbytes_nested( [(fmt + ' ' + fmt + '\n') % (1, 2), (fmt + ' ' + fmt + '\n') % (3, 4)])) a = np.array([[1, 2], [3, 4]], int) c = StringIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), asbytes_nested(['1 2\n', '3 4\n'])) def test_1D(self): a = np.array([1, 2, 3, 4], int) c = StringIO() np.savetxt(c, a, fmt='%d') c.seek(0) lines = c.readlines() assert_equal(lines, asbytes_nested(['1\n', '2\n', '3\n', '4\n'])) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) c = StringIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), asbytes_nested(['1 2\n', '3 4\n'])) def test_delimiter(self): a = np.array([[1., 2.], [3., 4.]]) c = StringIO() np.savetxt(c, a, delimiter=asbytes(','), fmt='%d') c.seek(0) assert_equal(c.readlines(), asbytes_nested(['1,2\n', '3,4\n'])) def test_format(self): a = np.array([(1, 2), (3, 4)]) c = StringIO() # Sequence of formats np.savetxt(c, a, fmt=['%02d', '%3.1f']) c.seek(0) assert_equal(c.readlines(), asbytes_nested(['01 2.0\n', '03 4.0\n'])) # A single multiformat string c = StringIO() np.savetxt(c, a, fmt='%02d : %3.1f') c.seek(0) lines = c.readlines() assert_equal(lines, asbytes_nested(['01 : 2.0\n', '03 : 4.0\n'])) # Specify delimiter, should be overiden c = StringIO() np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',') c.seek(0) lines = c.readlines() assert_equal(lines, asbytes_nested(['01 : 2.0\n', '03 : 4.0\n'])) def test_file_roundtrip(self): f, name = mkstemp() os.close(f) try: a = np.array([(1, 2), (3, 4)]) np.savetxt(name, a) b = np.loadtxt(name) assert_array_equal(a, b) finally: os.unlink(name) class TestLoadTxt(TestCase): def test_record(self): c = StringIO() c.write(asbytes('1 2\n3 4')) c.seek(0) x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)]) a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) assert_array_equal(x, a) d = StringIO() d.write(asbytes('M 64.0 75.0\nF 25.0 60.0')) d.seek(0) mydescriptor = {'names': ('gender', 'age', 'weight'), 'formats': ('S1', 'i4', 'f4')} b = np.array([('M', 64.0, 75.0), ('F', 25.0, 60.0)], dtype=mydescriptor) y = np.loadtxt(d, dtype=mydescriptor) assert_array_equal(y, b) def test_array(self): c = StringIO() c.write(asbytes('1 2\n3 4')) c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([[1, 2], [3, 4]], int) assert_array_equal(x, a) c.seek(0) x = np.loadtxt(c, dtype=float) a = np.array([[1, 2], [3, 4]], float) assert_array_equal(x, a) def test_1D(self): c = StringIO() c.write(asbytes('1\n2\n3\n4\n')) c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) c = StringIO() c.write(asbytes('1,2,3,4\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',') a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) def test_missing(self): c = StringIO() c.write(asbytes('1,2,3,,5\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ converters={3:lambda s: int(s or - 999)}) a = np.array([1, 2, 3, -999, 5], int) assert_array_equal(x, a) def test_converters_with_usecols(self): c = StringIO() c.write(asbytes('1,2,3,,5\n6,7,8,9,10\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ converters={3:lambda s: int(s or - 999)}, \ usecols=(1, 3,)) a = np.array([[2, -999], [7, 9]], int) assert_array_equal(x, a) def test_comments(self): c = StringIO() c.write(asbytes('# comment\n1,2,3,5\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ comments='#') a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_skiprows(self): c = StringIO() c.write(asbytes('comment\n1,2,3,5\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) c = StringIO() c.write(asbytes('# comment\n1,2,3,5\n')) c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', \ skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_usecols(self): a = np.array([[1, 2], [3, 4]], float) c = StringIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1,)) assert_array_equal(x, a[:, 1]) a = np.array([[1, 2, 3], [3, 4, 5]], float) c = StringIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1, 2)) assert_array_equal(x, a[:, 1:]) # Testing with arrays instead of tuples. c.seek(0) x = np.loadtxt(c, dtype=float, usecols=np.array([1, 2])) assert_array_equal(x, a[:, 1:]) # Checking with dtypes defined converters. data = '''JOE 70.1 25.3 BOB 60.5 27.9 ''' c = StringIO(data) names = ['stid', 'temp'] dtypes = ['S4', 'f8'] arr = np.loadtxt(c, usecols=(0, 2), dtype=zip(names, dtypes)) assert_equal(arr['stid'], asbytes_nested(["JOE", "BOB"])) assert_equal(arr['temp'], [25.3, 27.9]) def test_fancy_dtype(self): c = StringIO() c.write(asbytes('1,2,3.0\n4,5,6.0\n')) c.seek(0) dt = np.dtype([('x', int), ('y', [('t', int), ('s', float)])]) x = np.loadtxt(c, dtype=dt, delimiter=',') a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dt) assert_array_equal(x, a) def test_shaped_dtype(self): c = StringIO("aaaa 1.0 8.0 1 2 3 4 5 6") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])], dtype=dt) assert_array_equal(x, a) def test_3d_shaped_dtype(self): c = StringIO("aaaa 1.0 8.0 1 2 3 4 5 6 7 8 9 10 11 12") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[[1, 2, 3], [4, 5, 6]],[[7, 8, 9], [10, 11, 12]]])], dtype=dt) assert_array_equal(x, a) def test_empty_file(self): c = StringIO() x = np.loadtxt(c) assert_equal(x.shape, (0,)) x = np.loadtxt(c, dtype=np.int64) assert_equal(x.shape, (0,)) assert_(x.dtype == np.int64) def test_unused_converter(self): c = StringIO() c.writelines([asbytes('1 21\n'), asbytes('3 42\n')]) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={0: lambda s: int(s, 16)}) assert_array_equal(data, [21, 42]) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={1: lambda s: int(s, 16)}) assert_array_equal(data, [33, 66]) def test_dtype_with_object(self): "Test using an explicit dtype with an object" from datetime import date import time data = """ 1; 2001-01-01 2; 2002-01-31 """ ndtype = [('idx', int), ('code', np.object)] func = lambda s: strptime(s.strip(), "%Y-%m-%d") converters = {1: func} test = np.loadtxt(StringIO(data), delimiter=";", dtype=ndtype, converters=converters) control = np.array([(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))], dtype=ndtype) assert_equal(test, control) def test_uint64_type(self): tgt = (9223372043271415339, 9223372043271415853) c = StringIO() c.write(asbytes("%s %s" % tgt)) c.seek(0) res = np.loadtxt(c, dtype=np.uint64) assert_equal(res, tgt) def test_int64_type(self): tgt = (-9223372036854775807, 9223372036854775807) c = StringIO() c.write(asbytes("%s %s" % tgt)) c.seek(0) res = np.loadtxt(c, dtype=np.int64) assert_equal(res, tgt) def test_universal_newline(self): f, name = mkstemp() os.write(f, asbytes('1 21\r3 42\r')) os.close(f) try: data = np.loadtxt(name) assert_array_equal(data, [[1, 21], [3, 42]]) finally: os.unlink(name) def test_structure_unpack(self): txt = StringIO(asbytes("M 21 72\nF 35 58")) dt = { 'names': ('a', 'b', 'c'), 'formats': ('|S1', '