1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
""" Tablib - DBF Support.
"""
import io
import os
import tempfile
from tablib.packages.dbfpy import dbf, dbfnew
from tablib.packages.dbfpy import record as dbfrecord
class DBFFormat:
title = 'dbf'
extensions = ('csv',)
DEFAULT_ENCODING = 'utf-8'
@classmethod
def export_set(cls, dataset):
"""Returns DBF representation of a Dataset"""
new_dbf = dbfnew.dbf_new()
temp_file, temp_uri = tempfile.mkstemp()
# create the appropriate fields based on the contents of the first row
first_row = dataset[0]
for fieldname, field_value in zip(dataset.headers, first_row):
if type(field_value) in [int, float]:
new_dbf.add_field(fieldname, 'N', 10, 8)
else:
new_dbf.add_field(fieldname, 'C', 80)
new_dbf.write(temp_uri)
dbf_file = dbf.Dbf(temp_uri, readOnly=0)
for row in dataset:
record = dbfrecord.DbfRecord(dbf_file)
for fieldname, field_value in zip(dataset.headers, row):
record[fieldname] = field_value
record.store()
dbf_file.close()
dbf_stream = open(temp_uri, 'rb')
stream = io.BytesIO(dbf_stream.read())
dbf_stream.close()
os.close(temp_file)
os.remove(temp_uri)
return stream.getvalue()
@classmethod
def import_set(cls, dset, in_stream, headers=True):
"""Returns a dataset from a DBF stream."""
dset.wipe()
_dbf = dbf.Dbf(in_stream)
dset.headers = _dbf.fieldNames
for record in range(_dbf.recordCount):
row = [_dbf[record][f] for f in _dbf.fieldNames]
dset.append(row)
@classmethod
def detect(cls, stream):
"""Returns True if the given stream is valid DBF"""
try:
_dbf = dbf.Dbf(stream, readOnly=True)
return True
except Exception:
return False
|