summaryrefslogtreecommitdiff
path: root/taskflow/patterns/distributed_flow.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/patterns/distributed_flow.py')
-rw-r--r--taskflow/patterns/distributed_flow.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py
index 3a12cb9..90b3f25 100644
--- a/taskflow/patterns/distributed_flow.py
+++ b/taskflow/patterns/distributed_flow.py
@@ -17,11 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import celery
import logging
from taskflow import logbook
-from celery import chord
LOG = logging.getLogger(__name__)
@@ -37,7 +37,7 @@ class Flow(object):
logbook.add_workflow(name)
def chain_listeners(self, context, initial_task, callback_task):
- """ Register one listener for a task """
+ """Register one listener for a task."""
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
@@ -50,7 +50,7 @@ class Flow(object):
initial_task.link(callback_task.s(context))
def split_listeners(self, context, initial_task, callback_tasks):
- """ Register multiple listeners for one task """
+ """Register multiple listeners for one task."""
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
@@ -62,7 +62,7 @@ class Flow(object):
initial_task.link(task.s(context))
def merge_listeners(self, context, initial_tasks, callback_task):
- """ Register one listener for multiple tasks """
+ """Register one listener for multiple tasks."""
header = []
if self.root is None:
self.root = []
@@ -79,9 +79,9 @@ class Flow(object):
# TODO(jlucci): Need to set up chord so that it's not executed
# immediately.
- chord(header, body=callback_task)
+ celery.chord(header, body=callback_task)
def run(self, context, *args, **kwargs):
- """ Start root task and kick off workflow """
+ """Start root task and kick off workflow."""
self.root(context)
LOG.info('WF %s has been started' % (self.name,))