diff options
Diffstat (limited to 'taskflow/patterns')
| -rw-r--r-- | taskflow/patterns/distributed_flow.py | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index 3a12cb9..90b3f25 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -17,11 +17,11 @@ # License for the specific language governing permissions and limitations # under the License. +import celery import logging from taskflow import logbook -from celery import chord LOG = logging.getLogger(__name__) @@ -37,7 +37,7 @@ class Flow(object): logbook.add_workflow(name) def chain_listeners(self, context, initial_task, callback_task): - """ Register one listener for a task """ + """Register one listener for a task.""" if self.root is None: initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) @@ -50,7 +50,7 @@ class Flow(object): initial_task.link(callback_task.s(context)) def split_listeners(self, context, initial_task, callback_tasks): - """ Register multiple listeners for one task """ + """Register multiple listeners for one task.""" if self.root is None: initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) @@ -62,7 +62,7 @@ class Flow(object): initial_task.link(task.s(context)) def merge_listeners(self, context, initial_tasks, callback_task): - """ Register one listener for multiple tasks """ + """Register one listener for multiple tasks.""" header = [] if self.root is None: self.root = [] @@ -79,9 +79,9 @@ class Flow(object): # TODO(jlucci): Need to set up chord so that it's not executed # immediately. - chord(header, body=callback_task) + celery.chord(header, body=callback_task) def run(self, context, *args, **kwargs): - """ Start root task and kick off workflow """ + """Start root task and kick off workflow.""" self.root(context) LOG.info('WF %s has been started' % (self.name,)) |
