summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/sql')
-rw-r--r--lib/sqlalchemy/sql/selectable.py97
1 files changed, 54 insertions, 43 deletions
diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py
index 248048662..b4df87e54 100644
--- a/lib/sqlalchemy/sql/selectable.py
+++ b/lib/sqlalchemy/sql/selectable.py
@@ -746,6 +746,33 @@ class Join(FromClause):
providing a "natural join".
"""
+ constraints = cls._joincond_scan_left_right(
+ a, a_subset, b, consider_as_foreign_keys)
+
+ if len(constraints) > 1:
+ cls._joincond_trim_constraints(
+ a, b, constraints, consider_as_foreign_keys)
+
+ if len(constraints) == 0:
+ if isinstance(b, FromGrouping):
+ hint = " Perhaps you meant to convert the right side to a "\
+ "subquery using alias()?"
+ else:
+ hint = ""
+ raise exc.NoForeignKeysError(
+ "Can't find any foreign key relationships "
+ "between '%s' and '%s'.%s" %
+ (a.description, b.description, hint))
+
+ crit = [(x == y) for x, y in list(constraints.values())[0]]
+ if len(crit) == 1:
+ return (crit[0])
+ else:
+ return and_(*crit)
+
+ @classmethod
+ def _joincond_scan_left_right(
+ cls, a, a_subset, b, consider_as_foreign_keys):
constraints = collections.defaultdict(list)
for left in (a_subset, a):
@@ -780,57 +807,41 @@ class Join(FromClause):
if nrte.table_name == b.name:
raise
else:
- # this is totally covered. can't get
- # coverage to mark it.
continue
if col is not None:
constraints[fk.constraint].append((col, fk.parent))
if constraints:
break
+ return constraints
+ @classmethod
+ def _joincond_trim_constraints(
+ cls, a, b, constraints, consider_as_foreign_keys):
+ # more than one constraint matched. narrow down the list
+ # to include just those FKCs that match exactly to
+ # "consider_as_foreign_keys".
+ if consider_as_foreign_keys:
+ for const in list(constraints):
+ if set(f.parent for f in const.elements) != set(
+ consider_as_foreign_keys):
+ del constraints[const]
+
+ # if still multiple constraints, but
+ # they all refer to the exact same end result, use it.
if len(constraints) > 1:
- # more than one constraint matched. narrow down the list
- # to include just those FKCs that match exactly to
- # "consider_as_foreign_keys".
- if consider_as_foreign_keys:
- for const in list(constraints):
- if set(f.parent for f in const.elements) != set(
- consider_as_foreign_keys):
- del constraints[const]
-
- # if still multiple constraints, but
- # they all refer to the exact same end result, use it.
- if len(constraints) > 1:
- dedupe = set(tuple(crit) for crit in constraints.values())
- if len(dedupe) == 1:
- key = list(constraints)[0]
- constraints = {key: constraints[key]}
-
- if len(constraints) != 1:
- raise exc.AmbiguousForeignKeysError(
- "Can't determine join between '%s' and '%s'; "
- "tables have more than one foreign key "
- "constraint relationship between them. "
- "Please specify the 'onclause' of this "
- "join explicitly." % (a.description, b.description))
-
- if len(constraints) == 0:
- if isinstance(b, FromGrouping):
- hint = " Perhaps you meant to convert the right side to a "\
- "subquery using alias()?"
- else:
- hint = ""
- raise exc.NoForeignKeysError(
- "Can't find any foreign key relationships "
- "between '%s' and '%s'.%s" %
- (a.description, b.description, hint))
-
- crit = [(x == y) for x, y in list(constraints.values())[0]]
- if len(crit) == 1:
- return (crit[0])
- else:
- return and_(*crit)
+ dedupe = set(tuple(crit) for crit in constraints.values())
+ if len(dedupe) == 1:
+ key = list(constraints)[0]
+ constraints = {key: constraints[key]}
+
+ if len(constraints) != 1:
+ raise exc.AmbiguousForeignKeysError(
+ "Can't determine join between '%s' and '%s'; "
+ "tables have more than one foreign key "
+ "constraint relationship between them. "
+ "Please specify the 'onclause' of this "
+ "join explicitly." % (a.description, b.description))
def select(self, whereclause=None, **kwargs):
"""Create a :class:`.Select` from this :class:`.Join`.