1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
import json
import hashlib
import os
import socket
import time
from importlib import import_module
from django.core.cache import caches
from django.core.files.base import ContentFile
from django.utils.encoding import force_str, smart_bytes
from django.utils.functional import SimpleLazyObject
from compressor.conf import settings
from compressor.storage import default_offline_manifest_storage
from compressor.utils import get_mod_func
# Memoized cache key function; stays None until get_cachekey() resolves it
# from settings.COMPRESS_CACHE_KEY_FUNCTION on its first call.
_cachekey_func = None
def get_hexdigest(plaintext, length=None):
    """Return the SHA-256 hex digest of ``plaintext``.

    ``plaintext`` is converted with ``smart_bytes`` before hashing. When
    ``length`` is truthy only the first ``length`` characters of the digest
    are returned; otherwise the full digest is returned.
    """
    hexdigest = hashlib.sha256(smart_bytes(plaintext)).hexdigest()
    return hexdigest[:length] if length else hexdigest
def simple_cachekey(key):
    """Return ``key`` (coerced with ``force_str``) under the ``django_compressor.`` prefix."""
    key_text = force_str(key)
    return "django_compressor.%s" % key_text
def socket_cachekey(key):
    """Return a cache key that also embeds this machine's hostname.

    The result has the form ``django_compressor.<hostname>.<key>``, where the
    hostname comes from ``socket.gethostname()`` and ``key`` is coerced with
    ``force_str``.
    """
    hostname = socket.gethostname()
    key_text = force_str(key)
    return "django_compressor.%s.%s" % (hostname, key_text)
def get_cachekey(*args, **kwargs):
    """Build a cache key with the configured cache key function.

    The function named by ``settings.COMPRESS_CACHE_KEY_FUNCTION`` is imported
    on the first call and memoized in the module-level ``_cachekey_func``, so
    later calls skip the import. All positional and keyword arguments are
    forwarded to that function unchanged, and its result is returned.

    Raises:
        ImportError: if the configured function cannot be resolved. The
            underlying AttributeError/ImportError/TypeError is attached as
            the explicit cause of the raised error.
    """
    global _cachekey_func
    if _cachekey_func is None:
        try:
            mod_name, func_name = get_mod_func(settings.COMPRESS_CACHE_KEY_FUNCTION)
            _cachekey_func = getattr(import_module(mod_name), func_name)
        except (AttributeError, ImportError, TypeError) as e:
            # Chain explicitly so the traceback shows the original failure as
            # the direct cause rather than as an incidental secondary error.
            raise ImportError(
                "Couldn't import cache key function %s: %s"
                % (settings.COMPRESS_CACHE_KEY_FUNCTION, e)
            ) from e
    return _cachekey_func(*args, **kwargs)
def get_mtime_cachekey(filename):
    """Return the cache key under which the mtime of ``filename`` is stored.

    The filename is hashed with ``get_hexdigest`` and the result is passed to
    ``get_cachekey`` with an ``mtime.`` prefix.
    """
    filename_digest = get_hexdigest(filename)
    return get_cachekey("mtime.%s" % filename_digest)
def get_offline_hexdigest(render_template_string):
    """Return the hex digest of a rendered template string, ignoring STATIC_URL.

    Every occurrence of ``settings.STATIC_URL`` is removed from
    ``render_template_string`` before it is hashed with ``get_hexdigest``.
    """
    # ``settings.STATIC_URL`` may be a string-alike object rather than a plain
    # string (e.g. to add the ``SCRIPT_NAME`` WSGI param as a *path prefix* to
    # the output URL), so cast it to a real string first.
    # See https://code.djangoproject.com/ticket/25598.
    static_url = str(settings.STATIC_URL)
    # Stripping STATIC_URL makes the digest independent of its value.
    url_independent = render_template_string.replace(static_url, "")
    return get_hexdigest(url_independent)
def get_offline_cachekey(source):
    """Return the cache key for an offline-compressed ``source`` string.

    The key is ``get_cachekey`` applied to ``offline.<digest>``, where the
    digest comes from ``get_offline_hexdigest(source)``.
    """
    source_digest = get_offline_hexdigest(source)
    return get_cachekey("offline.%s" % source_digest)
# In-process copy of the offline manifest. None means "not loaded yet":
# get_offline_manifest() fills it in and flush_offline_manifest() resets it.
_offline_manifest = None
def get_offline_manifest():
    """Return the offline manifest, loading it from storage on first use.

    The manifest named by ``settings.COMPRESS_OFFLINE_MANIFEST`` is read from
    ``default_offline_manifest_storage`` and parsed as UTF-8 encoded JSON. If
    no such file exists, an empty dict is used. The result is kept in the
    module-level ``_offline_manifest`` and reused by later calls.
    """
    global _offline_manifest
    if _offline_manifest is not None:
        # Already loaded during this process; reuse it.
        return _offline_manifest

    manifest_name = settings.COMPRESS_OFFLINE_MANIFEST
    if not default_offline_manifest_storage.exists(manifest_name):
        _offline_manifest = {}
    else:
        with default_offline_manifest_storage.open(manifest_name) as manifest_file:
            _offline_manifest = json.loads(manifest_file.read().decode("utf8"))
    return _offline_manifest
def flush_offline_manifest():
    """Forget the in-process offline manifest.

    Resets the module-level ``_offline_manifest`` to None, which is the value
    ``get_offline_manifest`` treats as "not loaded yet".
    """
    global _offline_manifest
    _offline_manifest = None
def write_offline_manifest(manifest):
    """Serialize ``manifest`` to storage and drop the in-process copy.

    The manifest is dumped as JSON with a two-space indent, encoded as UTF-8
    and saved to ``default_offline_manifest_storage`` under the name given by
    ``settings.COMPRESS_OFFLINE_MANIFEST``. ``flush_offline_manifest`` is then
    called to reset the module-level copy.
    """
    serialized = json.dumps(manifest, indent=2)
    manifest_file = ContentFile(serialized.encode("utf8"))
    default_offline_manifest_storage.save(
        settings.COMPRESS_OFFLINE_MANIFEST, manifest_file
    )
    flush_offline_manifest()
def get_templatetag_cachekey(compressor, mode, kind):
    """Return the cache key for a template tag rendering.

    The key is ``get_cachekey`` applied to
    ``templatetag.<compressor.cachekey>.<mode>.<kind>``.
    """
    key_parts = (compressor.cachekey, mode, kind)
    return get_cachekey("templatetag.%s.%s.%s" % key_parts)
def get_mtime(filename):
    """Return the modification time of ``filename``, optionally via the cache.

    When ``settings.COMPRESS_MTIME_DELAY`` is falsy the filesystem is queried
    directly. Otherwise the value is looked up in ``cache`` under
    ``get_mtime_cachekey(filename)``; on a miss it is read from the filesystem
    and stored with ``COMPRESS_MTIME_DELAY`` as the cache timeout.
    """
    if not settings.COMPRESS_MTIME_DELAY:
        return os.path.getmtime(filename)

    cache_key = get_mtime_cachekey(filename)
    cached_mtime = cache.get(cache_key)
    if cached_mtime is not None:
        return cached_mtime

    fresh_mtime = os.path.getmtime(filename)
    cache.set(cache_key, fresh_mtime, settings.COMPRESS_MTIME_DELAY)
    return fresh_mtime
def get_hashed_mtime(filename, length=12):
    """Return a truncated hex digest of the file's mtime, or None on OSError.

    The path is resolved with ``os.path.realpath``, its mtime is fetched via
    ``get_mtime`` and truncated to whole seconds, and the decimal string of
    that integer is hashed with ``get_hexdigest(..., length)``.
    """
    try:
        resolved_path = os.path.realpath(filename)
        whole_seconds = int(get_mtime(resolved_path))
    except OSError:
        return None
    else:
        return get_hexdigest(str(whole_seconds), length)
def get_hashed_content(filename, length=12):
    """Return a truncated hex digest of a file's contents, or None if unreadable.

    Args:
        filename: Path of the file to hash; resolved with ``os.path.realpath``.
        length: Number of leading digest characters to return (passed through
            to ``get_hexdigest``).

    Returns:
        The digest string, or None when resolving, opening or reading the
        file raises OSError (for example when the file does not exist).
    """
    try:
        # Opening and reading are inside the ``try`` because that is where a
        # missing or unreadable file actually fails. This makes the function
        # return None in that case, as get_hashed_mtime does, instead of
        # letting the OSError escape.
        with open(os.path.realpath(filename), "rb") as source_file:
            # The file is hashed as raw bytes; its text encoding is not checked.
            content = source_file.read()
    except OSError:
        return None
    return get_hexdigest(content, length)
def get_precompiler_cachekey(command, contents):
    """Return the SHA-1 hex digest identifying a precompiler invocation.

    The digest is computed over ``precompiler.<command>.<contents>`` after
    conversion with ``smart_bytes``.
    """
    payload = smart_bytes("precompiler.%s.%s" % (command, contents))
    return hashlib.sha1(payload).hexdigest()
def cache_get(key):
    """Return the value stored by ``cache_set`` under ``key``, or None.

    None is returned when nothing is cached. It is also returned once when the
    entry's refresh time has passed and the entry has not yet been flagged as
    refreshed. In that case the stale value is written back, flagged as
    refreshed, so later lookups return it until the new entry expires.
    """
    entry = cache.get(key)
    if entry is None:
        return None

    value, refresh_at, already_refreshed = entry
    past_refresh_time = time.time() > refresh_at
    if past_refresh_time and not already_refreshed:
        # Store the stale value while the cache
        # revalidates for another MINT_DELAY seconds.
        cache_set(key, value, refreshed=True, timeout=settings.COMPRESS_MINT_DELAY)
        return None
    return value
def cache_set(key, val, refreshed=False, timeout=None):
    """Store ``val`` under ``key`` together with its refresh bookkeeping.

    The cached entry is the tuple ``(val, refresh_time, refreshed)``, where
    ``refresh_time`` is ``timeout`` seconds from now. ``timeout`` defaults to
    ``settings.COMPRESS_REBUILD_TIMEOUT`` when None. The entry itself is
    cached for ``timeout + settings.COMPRESS_MINT_DELAY``, so it outlives its
    refresh time by that delay. Returns whatever ``cache.set`` returns.
    """
    if timeout is None:
        timeout = settings.COMPRESS_REBUILD_TIMEOUT
    entry = (val, timeout + time.time(), refreshed)
    backend_timeout = timeout + settings.COMPRESS_MINT_DELAY
    return cache.set(key, entry, backend_timeout)
# Cache backend named by settings.COMPRESS_CACHE_BACKEND. Wrapping the lookup
# in SimpleLazyObject defers the ``caches[...]`` access until first use.
cache = SimpleLazyObject(lambda: caches[settings.COMPRESS_CACHE_BACKEND])
|