summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorIvan A. Melnikov <imelnikov@griddynamics.com>2014-05-06 15:26:46 +0400
committerIvan A. Melnikov <imelnikov@griddynamics.com>2014-05-07 11:17:50 +0400
commit095650653d53d4a2f431bea20fabc440263d38e7 (patch)
treee4a61c288ec7591122405cac9bea78d532ef3153 /taskflow/patterns
parentee1e96d87d385baf3f583cabea94997b50130985 (diff)
downloadtaskflow-095650653d53d4a2f431bea20fabc440263d38e7.tar.gz
Put provides and requires code to basic Flow
Code that calculates provides and requires for flow is almost identical for all patterns, so this change makes it completely identical and puts it to the base class. Other patterns are still allowed to override these properties for sake of customization or optimization. Change-Id: I6e875e863047b5287ec727fc9a491f252f144ecf
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/graph_flow.py22
-rw-r--r--taskflow/patterns/linear_flow.py19
-rw-r--r--taskflow/patterns/unordered_flow.py33
3 files changed, 12 insertions, 62 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 6869199..0ed74c7 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -104,8 +104,8 @@ class Flow(flow.Flow):
if self.retry:
update_requirements(self.retry)
- provided.update(dict((k,
- self.retry) for k in self._retry_provides))
+ provided.update(dict((k, self.retry)
+ for k in self.retry.provides))
# NOTE(harlowja): Add items and edges to a temporary copy of the
# underlying graph and only if that is successful added to do we then
@@ -123,7 +123,7 @@ class Flow(flow.Flow):
% dict(item=item.name,
flow=provided[value].name,
value=value))
- if value in self._retry_requires:
+ if self.retry and value in self.retry.requires:
raise exc.DependencyFailure(
"Flows retry controller %(retry)s requires %(value)s "
"but item %(item)s being added to the flow produces "
@@ -167,22 +167,6 @@ class Flow(flow.Flow):
for (u, v, e_data) in self._get_subgraph().edges_iter(data=True):
yield (u, v, e_data)
- @property
- def provides(self):
- provides = set()
- provides.update(self._retry_provides)
- for subflow in self:
- provides.update(subflow.provides)
- return provides
-
- @property
- def requires(self):
- requires = set()
- requires.update(self._retry_requires)
- for subflow in self:
- requires.update(subflow.requires)
- return requires - self.provides
-
class TargetedFlow(Flow):
"""Graph flow with a target.
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index d7cbb54..48b4d3c 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -78,22 +78,3 @@ class Flow(flow.Flow):
for src, dst in zip(self._children[:-1],
self._children[1:]):
yield (src, dst, _LINK_METADATA.copy())
-
- @property
- def provides(self):
- provides = set()
- provides.update(self._retry_provides)
- for subflow in self._children:
- provides.update(subflow.provides)
- return provides
-
- @property
- def requires(self):
- requires = set()
- provides = set()
- requires.update(self._retry_requires)
- provides.update(self._retry_provides)
- for subflow in self._children:
- requires.update(subflow.requires - provides)
- provides.update(subflow.provides)
- return requires
diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py
index 2890e80..a837796 100644
--- a/taskflow/patterns/unordered_flow.py
+++ b/taskflow/patterns/unordered_flow.py
@@ -41,12 +41,9 @@ class Flow(flow.Flow):
if not items:
return self
- # NOTE(harlowja): check that items to be added are actually
- # independent.
- provides = set()
- for subflow in self:
- provides.update(subflow.provides)
-
+ # check that items don't provide anything that other
+ # part of flow provides or requires
+ provides = self.provides
old_requires = self.requires
for item in items:
item_provides = item.provides
@@ -57,7 +54,7 @@ class Flow(flow.Flow):
"by other item(s) of unordered flow %(flow)s"
% dict(item=item.name, flow=self.name,
oo=sorted(bad_provs)))
- same_provides = (provides | self._retry_provides) & item.provides
+ same_provides = provides & item.provides
if same_provides:
raise exceptions.DependencyFailure(
"%(item)s provides %(value)s but is already being"
@@ -67,6 +64,11 @@ class Flow(flow.Flow):
value=sorted(same_provides)))
provides |= item.provides
+ # check that items don't require anything other children provides
+ if self.retry:
+ # NOTE(imelnikov): it is allowed to depend on value provided
+ # by retry controller of the flow
+ provides -= self.retry.provides
for item in items:
bad_reqs = provides & item.requires
if bad_reqs:
@@ -79,23 +81,6 @@ class Flow(flow.Flow):
self._children.update(items)
return self
- @property
- def provides(self):
- provides = set()
- provides.update(self._retry_provides)
- for subflow in self:
- provides.update(subflow.provides)
- return provides
-
- @property
- def requires(self):
- requires = set()
- for subflow in self:
- requires.update(subflow.requires)
- requires.update(self._retry_requires)
- requires -= self._retry_provides
- return requires
-
def __len__(self):
return len(self._children)