1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
import pytest
import redis
from redis import exceptions
from redis.commands.core import Script
from tests.conftest import skip_if_redis_enterprise, skip_if_server_version_lt
multiply_script = """
local value = redis.call('GET', KEYS[1])
value = tonumber(value)
return value * ARGV[1]"""
msgpack_hello_script = """
local message = cmsgpack.unpack(ARGV[1])
local name = message['name']
return "hello " .. name
"""
msgpack_hello_script_broken = """
local message = cmsgpack.unpack(ARGV[1])
local names = message['name']
return "hello " .. name
"""
class TestScript:
"""
We have a few tests to directly test the Script class.
However, most of the behavioral tests are covered by `TestScripting`.
"""
@pytest.fixture()
def script_str(self):
return "fake-script"
@pytest.fixture()
def script_bytes(self):
return b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82"
def test_script_text(self, r, script_str, script_bytes):
assert Script(r, script_str).script == "fake-script"
assert Script(r, script_bytes).script == b"\xcf\x84o\xcf\x81\xce\xbdo\xcf\x82"
def test_string_script_sha(self, r, script_str):
script = Script(r, script_str)
assert script.sha == "505e4245f0866b60552741b3cce9a0c3d3b66a87"
def test_bytes_script_sha(self, r, script_bytes):
script = Script(r, script_bytes)
assert script.sha == "1329344e6bf995a35a8dc57ab1a6af8b2d54a763"
def test_encoder(self, r, script_bytes):
encoder = Script(r, script_bytes).get_encoder()
assert encoder is not None
assert encoder.encode("fake-script") == b"fake-script"
class TestScripting:
@pytest.fixture(autouse=True)
def reset_scripts(self, r):
r.script_flush()
def test_eval_multiply(self, r):
r.set("a", 2)
# 2 * 3 == 6
assert r.eval(multiply_script, 1, "a", 3) == 6
@skip_if_server_version_lt("7.0.0")
@skip_if_redis_enterprise()
def test_eval_ro(self, r):
r.set("a", "b")
assert r.eval_ro("return redis.call('GET', KEYS[1])", 1, "a") == b"b"
with pytest.raises(redis.ResponseError):
r.eval_ro("return redis.call('DEL', KEYS[1])", 1, "a")
def test_eval_msgpack(self, r):
msgpack_message_dumped = b"\x81\xa4name\xa3Joe"
# this is msgpack.dumps({"name": "joe"})
assert r.eval(msgpack_hello_script, 0, msgpack_message_dumped) == b"hello Joe"
def test_eval_same_slot(self, r):
"""
In a clustered redis, the script keys must be in the same slot.
This test isn't very interesting for standalone redis, but it doesn't
hurt anything.
"""
r.set("A{foo}", 2)
r.set("B{foo}", 4)
# 2 * 4 == 8
script = """
local value = redis.call('GET', KEYS[1])
local value2 = redis.call('GET', KEYS[2])
return value * value2
"""
result = r.eval(script, 2, "A{foo}", "B{foo}")
assert result == 8
@pytest.mark.onlycluster
def test_eval_crossslot(self, r):
"""
In a clustered redis, the script keys must be in the same slot.
This test should fail, because the two keys we send are in different
slots. This test assumes that {foo} and {bar} will not go to the same
server when used. In a setup with 3 primaries and 3 secondaries, this
assumption holds.
"""
r.set("A{foo}", 2)
r.set("B{bar}", 4)
# 2 * 4 == 8
script = """
local value = redis.call('GET', KEYS[1])
local value2 = redis.call('GET', KEYS[2])
return value * value2
"""
with pytest.raises(exceptions.RedisClusterException):
r.eval(script, 2, "A{foo}", "B{bar}")
@skip_if_server_version_lt("6.2.0")
def test_script_flush_620(self, r):
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush("ASYNC")
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush("SYNC")
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush()
with pytest.raises(exceptions.DataError):
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush("NOTREAL")
def test_script_flush(self, r):
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush(None)
with pytest.raises(exceptions.DataError):
r.set("a", 2)
r.script_load(multiply_script)
r.script_flush("NOTREAL")
def test_evalsha(self, r):
r.set("a", 2)
sha = r.script_load(multiply_script)
# 2 * 3 == 6
assert r.evalsha(sha, 1, "a", 3) == 6
@skip_if_server_version_lt("7.0.0")
@skip_if_redis_enterprise()
def test_evalsha_ro(self, r):
r.set("a", "b")
get_sha = r.script_load("return redis.call('GET', KEYS[1])")
del_sha = r.script_load("return redis.call('DEL', KEYS[1])")
assert r.evalsha_ro(get_sha, 1, "a") == b"b"
with pytest.raises(redis.ResponseError):
r.evalsha_ro(del_sha, 1, "a")
def test_evalsha_script_not_loaded(self, r):
r.set("a", 2)
sha = r.script_load(multiply_script)
# remove the script from Redis's cache
r.script_flush()
with pytest.raises(exceptions.NoScriptError):
r.evalsha(sha, 1, "a", 3)
def test_script_loading(self, r):
# get the sha, then clear the cache
sha = r.script_load(multiply_script)
r.script_flush()
assert r.script_exists(sha) == [False]
r.script_load(multiply_script)
assert r.script_exists(sha) == [True]
def test_flush_response(self, r):
r.script_load(multiply_script)
flush_response = r.script_flush()
assert flush_response is True
def test_script_object(self, r):
r.set("a", 2)
multiply = r.register_script(multiply_script)
precalculated_sha = multiply.sha
assert precalculated_sha
assert r.script_exists(multiply.sha) == [False]
# Test second evalsha block (after NoScriptError)
assert multiply(keys=["a"], args=[3]) == 6
# At this point, the script should be loaded
assert r.script_exists(multiply.sha) == [True]
# Test that the precalculated sha matches the one from redis
assert multiply.sha == precalculated_sha
# Test first evalsha block
assert multiply(keys=["a"], args=[3]) == 6
# Scripting is not supported in cluster pipelines
@pytest.mark.onlynoncluster
def test_script_object_in_pipeline(self, r):
multiply = r.register_script(multiply_script)
precalculated_sha = multiply.sha
assert precalculated_sha
pipe = r.pipeline()
pipe.set("a", 2)
pipe.get("a")
multiply(keys=["a"], args=[3], client=pipe)
assert r.script_exists(multiply.sha) == [False]
# [SET worked, GET 'a', result of multiple script]
assert pipe.execute() == [True, b"2", 6]
# The script should have been loaded by pipe.execute()
assert r.script_exists(multiply.sha) == [True]
# The precalculated sha should have been the correct one
assert multiply.sha == precalculated_sha
# purge the script from redis's cache and re-run the pipeline
# the multiply script should be reloaded by pipe.execute()
r.script_flush()
pipe = r.pipeline()
pipe.set("a", 2)
pipe.get("a")
multiply(keys=["a"], args=[3], client=pipe)
assert r.script_exists(multiply.sha) == [False]
# [SET worked, GET 'a', result of multiple script]
assert pipe.execute() == [True, b"2", 6]
assert r.script_exists(multiply.sha) == [True]
# Scripting is not supported in cluster pipelines
@pytest.mark.onlynoncluster
def test_eval_msgpack_pipeline_error_in_lua(self, r):
msgpack_hello = r.register_script(msgpack_hello_script)
assert msgpack_hello.sha
pipe = r.pipeline()
# avoiding a dependency to msgpack, this is the output of
# msgpack.dumps({"name": "joe"})
msgpack_message_1 = b"\x81\xa4name\xa3Joe"
msgpack_hello(args=[msgpack_message_1], client=pipe)
assert r.script_exists(msgpack_hello.sha) == [False]
assert pipe.execute()[0] == b"hello Joe"
assert r.script_exists(msgpack_hello.sha) == [True]
msgpack_hello_broken = r.register_script(msgpack_hello_script_broken)
msgpack_hello_broken(args=[msgpack_message_1], client=pipe)
with pytest.raises(exceptions.ResponseError) as excinfo:
pipe.execute()
assert excinfo.type == exceptions.ResponseError
|