summaryrefslogtreecommitdiff
path: root/tools/generate_sql_functions.py
blob: 794b8448792d58dd28699762da4d30eab93754ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""Generate inline stubs for generic functions on func

"""
# mypy: ignore-errors

from __future__ import annotations

import inspect
import re
from tempfile import NamedTemporaryFile
import textwrap

from sqlalchemy.sql.functions import _registry
from sqlalchemy.types import TypeEngine
from sqlalchemy.util.tool_support import code_writer_cmd


def _fns_in_deterministic_order():
    reg = _registry["_default"]
    for key in sorted(reg):
        yield key, reg[key]


def process_functions(filename: str, cmd: code_writer_cmd) -> str:

    with NamedTemporaryFile(
        mode="w",
        delete=False,
        suffix=".py",
    ) as buf, open(filename) as orig_py:
        indent = ""
        in_block = False

        for line in orig_py:
            m = re.match(
                r"^( *)# START GENERATED FUNCTION ACCESSORS",
                line,
            )
            if m:
                in_block = True
                buf.write(line)
                indent = m.group(1)
                buf.write(
                    textwrap.indent(
                        """
# code within this block is **programmatically,
# statically generated** by tools/generate_sql_functions.py
""",
                        indent,
                    )
                )

                builtins = set(dir(__builtins__))
                for key, fn_class in _fns_in_deterministic_order():
                    is_reserved_word = key in builtins

                    guess_its_generic = bool(fn_class.__parameters__)

                    buf.write(
                        textwrap.indent(
                            f"""
@property
def {key}(self) -> Type[{fn_class.__name__}{
    '[Any]' if guess_its_generic else ''
}]:{
     '  # noqa: A001' if is_reserved_word else ''
}
    ...

""",
                            indent,
                        )
                    )

            m = re.match(
                r"^( *)# START GENERATED FUNCTION TYPING TESTS",
                line,
            )
            if m:
                in_block = True
                buf.write(line)
                indent = m.group(1)

                buf.write(
                    textwrap.indent(
                        """
# code within this block is **programmatically,
# statically generated** by tools/generate_sql_functions.py
""",
                        indent,
                    )
                )

                count = 0
                for key, fn_class in _fns_in_deterministic_order():
                    if hasattr(fn_class, "type") and isinstance(
                        fn_class.type, TypeEngine
                    ):
                        python_type = fn_class.type.python_type
                        python_expr = rf"Tuple\[.*{python_type.__name__}\]"
                        argspec = inspect.getfullargspec(fn_class)
                        args = ", ".join(
                            'column("x")' for elem in argspec.args[1:]
                        )
                        count += 1

                        buf.write(
                            textwrap.indent(
                                rf"""
stmt{count} = select(func.{key}({args}))

# EXPECTED_RE_TYPE: .*Select\[{python_expr}\]
reveal_type(stmt{count})

""",
                                indent,
                            )
                        )

            if in_block and line.startswith(
                f"{indent}# END GENERATED FUNCTION"
            ):
                in_block = False

            if not in_block:
                buf.write(line)
    return buf.name


def main(cmd: code_writer_cmd) -> None:
    for path in [functions_py, test_functions_py]:
        destination_path = path
        tempfile = process_functions(destination_path, cmd)
        cmd.run_zimports(tempfile)
        cmd.run_black(tempfile)
        cmd.write_output_file_from_tempfile(tempfile, destination_path)


functions_py = "lib/sqlalchemy/sql/functions.py"
test_functions_py = "test/ext/mypy/plain_files/functions.py"


if __name__ == "__main__":

    cmd = code_writer_cmd(__file__)

    with cmd.run_program():
        main(cmd)