summaryrefslogtreecommitdiff
path: root/test/aaa_profiling/test_memusage.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/aaa_profiling/test_memusage.py')
-rw-r--r--test/aaa_profiling/test_memusage.py391
1 files changed, 391 insertions, 0 deletions
diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py
index efeeff9e0..301724509 100644
--- a/test/aaa_profiling/test_memusage.py
+++ b/test/aaa_profiling/test_memusage.py
@@ -6,6 +6,7 @@ import weakref
import sqlalchemy as sa
from sqlalchemy import ForeignKey
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
@@ -15,9 +16,14 @@ from sqlalchemy import Unicode
from sqlalchemy import util
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
+from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import create_session
+from sqlalchemy.orm import join as orm_join
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Load
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import subqueryload
@@ -26,12 +32,16 @@ from sqlalchemy.orm.session import _sessions
from sqlalchemy.processors import to_decimal_processor_factory
from sqlalchemy.processors import to_unicode_processor_factory
from sqlalchemy.sql import column
+from sqlalchemy.sql import util as sql_util
+from sqlalchemy.sql.visitors import cloned_traverse
+from sqlalchemy.sql.visitors import replacement_traverse
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
+from ..orm import _fixtures
class A(fixtures.ComparableEntity):
@@ -46,6 +56,28 @@ class ASub(A):
pass
+def assert_cycles(expected=0):
+ def decorate(fn):
+ def go():
+ fn() # warmup, configure mappers, caches, etc.
+
+ gc_collect()
+ gc_collect()
+ gc_collect() # multiple calls seem to matter
+
+ # gc.set_debug(gc.DEBUG_COLLECTABLE)
+ try:
+ return fn() # run for real
+ finally:
+ unreachable = gc_collect()
+ assert unreachable <= expected
+ gc_collect()
+
+ return go
+
+ return decorate
+
+
def profile_memory(
maxtimes=250, assert_no_sessions=True, get_num_objects=None
):
@@ -1066,3 +1098,362 @@ class MemUsageWBackendTest(EnsureZeroed):
go()
finally:
metadata.drop_all()
+
+
+class CycleTest(_fixtures.FixtureTest):
+ __tags__ = ("memory_intensive",)
+ __requires__ = ("cpython", "no_windows")
+
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_query(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ return s.query(User).all()
+
+ go()
+
+ def test_query_alias(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ u1 = aliased(User)
+
+ @assert_cycles()
+ def go():
+ s.query(u1).all()
+
+ go()
+
+ def test_entity_path_w_aliased(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ @assert_cycles()
+ def go():
+ u1 = aliased(User)
+ inspect(u1)._path_registry[User.addresses.property]
+
+ go()
+
+ def test_orm_objects_from_query(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).filter(User.id == 7).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_orm_objects_from_query_w_selectinload(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).options(selectinload(User.addresses)).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_selectinload_option_unbound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ selectinload(User.addresses)
+
+ go()
+
+ def test_selectinload_option_bound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ Load(User).selectinload(User.addresses)
+
+ go()
+
+ def test_orm_path(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ inspect(User)._path_registry[User.addresses.property][
+ inspect(Address)
+ ]
+
+ go()
+
+ def test_joinedload_option_unbound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ joinedload(User.addresses)
+
+ go()
+
+ def test_joinedload_option_bound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ l1 = Load(User).joinedload(User.addresses)
+ l1._generate_cache_key()
+
+ go()
+
+ def test_orm_objects_from_query_w_joinedload(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).options(joinedload(User.addresses)).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_query_filtered(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ return s.query(User).filter(User.id == 7).all()
+
+ go()
+
+ def test_query_joins(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ # cycles here are due to ClauseElement._cloned_set
+ @assert_cycles(3)
+ def go():
+ s.query(User).join(User.addresses).all()
+
+ go()
+
+ def test_plain_join(self):
+ users, addresses = self.tables("users", "addresses")
+
+ @assert_cycles()
+ def go():
+ str(users.join(addresses))
+
+ go()
+
+ def test_plain_join_select(self):
+ users, addresses = self.tables("users", "addresses")
+
+ # cycles here are due to ClauseElement._cloned_set
+ @assert_cycles(6)
+ def go():
+ s = select([users]).select_from(users.join(addresses))
+ s._froms
+
+ go()
+
+ def test_orm_join(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ str(orm_join(User, Address, User.addresses))
+
+ go()
+
+ def test_join_via_query_relationship(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ s.query(User).join(User.addresses)
+
+ go()
+
+ def test_join_via_query_to_entity(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ s.query(User).join(Address)
+
+ go()
+
+ def test_core_select(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ stmt = s.query(User).join(User.addresses).statement
+
+ @assert_cycles()
+ def go():
+ s.execute(stmt)
+
+ go()
+
+ def test_adapt_statement_replacement_traversal(self):
+ User, Address = self.classes("User", "Address")
+
+ statement = select([User]).select_from(
+ orm_join(User, Address, User.addresses)
+ )
+
+ @assert_cycles()
+ def go():
+ replacement_traverse(statement, {}, lambda x: None)
+
+ go()
+
+ def test_adapt_statement_cloned_traversal(self):
+ User, Address = self.classes("User", "Address")
+
+ statement = select([User]).select_from(
+ orm_join(User, Address, User.addresses)
+ )
+
+ @assert_cycles()
+ def go():
+ cloned_traverse(statement, {}, {})
+
+ go()
+
+ def test_column_adapter_lookup(self):
+ User, Address = self.classes("User", "Address")
+
+ u1 = aliased(User)
+
+ @assert_cycles()
+ def go():
+ adapter = sql_util.ColumnAdapter(inspect(u1).selectable)
+ adapter.columns[User.id]
+
+ go()
+
+ def test_orm_aliased(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ u1 = aliased(User)
+ inspect(u1)
+
+ go()
+
+ @testing.fails
+ def test_the_counter(self):
+ @assert_cycles()
+ def go():
+ x = []
+ x.append(x)
+
+ go()
+
+ def test_weak_sequence(self):
+ class Foo(object):
+ pass
+
+ f = Foo()
+
+ @assert_cycles()
+ def go():
+ util.WeakSequence([f])
+
+ go()
+
+ @testing.provide_metadata
+ def test_optimized_get(self):
+
+ from sqlalchemy.ext.declarative import declarative_base
+
+ Base = declarative_base(metadata=self.metadata)
+
+ class Employee(Base):
+ __tablename__ = "employee"
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
+ type = Column(String(10))
+ __mapper_args__ = {"polymorphic_on": type}
+
+ class Engineer(Employee):
+ __tablename__ = " engineer"
+ id = Column(ForeignKey("employee.id"), primary_key=True)
+
+ engineer_name = Column(String(50))
+ __mapper_args__ = {"polymorphic_identity": "engineer"}
+
+ Base.metadata.create_all(testing.db)
+
+ s = Session(testing.db)
+ s.add(Engineer(engineer_name="wally"))
+ s.commit()
+ s.close()
+
+ @assert_cycles()
+ def go():
+ e1 = s.query(Employee).first()
+ e1.engineer_name
+
+ go()
+
+ def test_visit_binary_product(self):
+ a, b, q, e, f, j, r = [column(chr_) for chr_ in "abqefjr"]
+
+ from sqlalchemy import and_, func
+ from sqlalchemy.sql.util import visit_binary_product
+
+ expr = and_((a + b) == q + func.sum(e + f), j == r)
+
+ def visit(expr, left, right):
+ pass
+
+ @assert_cycles()
+ def go():
+ visit_binary_product(visit, expr)
+
+ go()