summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
authorAngus Salkeld <asalkeld@redhat.com>2013-06-25 22:48:23 +1000
committerAngus Salkeld <asalkeld@redhat.com>2013-06-26 11:04:42 +1000
commita7482eaa1857105e2c845a5d31e5f2b2e7b733ae (patch)
tree3558008bb2ffd4984e6066df8d5c062e40e8cd82 /taskflow
parent6b17ba8ab7a20ff36026dfe90ddc0d9a80f341e0 (diff)
downloadtaskflow-a7482eaa1857105e2c845a5d31e5f2b2e7b733ae.tar.gz
Fix most of the hacking rules
Hacking rules: H302, H303, H304, H401, H404 Change-Id: I38e62696724a99c5ebe74d95d477999bd91a2c9a
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/backends/celery/celeryapp.py11
-rw-r--r--taskflow/backends/memory.py5
-rw-r--r--taskflow/db/sqlalchemy/api.py28
-rw-r--r--taskflow/db/sqlalchemy/models.py9
-rw-r--r--taskflow/db/sqlalchemy/session.py3
-rw-r--r--taskflow/logbook.py5
-rw-r--r--taskflow/patterns/distributed_flow.py12
-rw-r--r--taskflow/tests/unit/test_memory.py7
8 files changed, 37 insertions, 43 deletions
diff --git a/taskflow/backends/celery/celeryapp.py b/taskflow/backends/celery/celeryapp.py
index 3bb3595..e41d5d6 100644
--- a/taskflow/backends/celery/celeryapp.py
+++ b/taskflow/backends/celery/celeryapp.py
@@ -19,17 +19,16 @@
import logging
import traceback as tb
-from celery.signals import task_failure
-from celery.signals import task_success
+from celery import signals
LOG = logging.getLogger(__name__)
-@task_failure.connect
+@signals.task_failure.connect
def task_error_handler(signal=None, sender=None, task_id=None,
exception=None, args=None, kwargs=None,
traceback=None, einfo=None):
- """ If a task errors out, log all error info """
+ """If a task errors out, log all error info"""
LOG.error('Task %s, id: %s, called with args: %s, and kwargs: %s'
'failed with exception: %s' % (sender.name, task_id,
args, kwargs, exception))
@@ -37,7 +36,7 @@ def task_error_handler(signal=None, sender=None, task_id=None,
# TODO(jlucci): Auto-initiate rollback from failed task
-@task_success.connect
+@signals.task_success.connect
def task_success_handler(singal=None, sender=None, result=None):
- """ Save task results to WF """
+ """Save task results to WF."""
pass
diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py
index b2c83cf..f006e31 100644
--- a/taskflow/backends/memory.py
+++ b/taskflow/backends/memory.py
@@ -16,8 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from datetime import datetime
-
+import datetime
import functools
import logging
import threading
@@ -198,7 +197,7 @@ class MemoryJobBoard(jobboard.JobBoard):
@check_not_closed
def post(self, job):
with self._lock.acquire(read=False):
- self._board.append((datetime.utcnow(), job))
+ self._board.append((datetime.datetime.utcnow(), job))
# Ensure the job tracks that we posted it
job.posted_on.append(weakref.proxy(self))
# Let people know a job is here
diff --git a/taskflow/db/sqlalchemy/api.py b/taskflow/db/sqlalchemy/api.py
index a52f943..f31c952 100644
--- a/taskflow/db/sqlalchemy/api.py
+++ b/taskflow/db/sqlalchemy/api.py
@@ -24,7 +24,7 @@ import logging
from taskflow import states
from taskflow.db.sqlalchemy import models
-from taskflow.db.sqlalchemy.session import get_session
+from taskflow.db.sqlalchemy import session as sql_session
from taskflow.openstack.common import exception
@@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
def model_query(context, *args, **kwargs):
- session = kwargs.get('session') or get_session()
+ session = kwargs.get('session') or sql_session.get_session()
query = session.query(*args)
return query
@@ -80,7 +80,7 @@ def logbook_create(context, name, lb_id=None):
def logbook_get_workflows(context, lb_id):
"""Return all workflows associated with a logbook"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
lb = logbook_get(context, lb_id, session=session)
@@ -89,7 +89,7 @@ def logbook_get_workflows(context, lb_id):
def logbook_add_workflow(context, lb_id, wf_name):
"""Add Workflow to given LogBook"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
lb = logbook_get(context, lb_id, session=session)
@@ -101,7 +101,7 @@ def logbook_add_workflow(context, lb_id, wf_name):
def logbook_destroy(context, lb_id):
"""Delete a given LogBook"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
lb = logbook_get(context, lb_id, session=session)
lb.delete(session=session)
@@ -126,7 +126,7 @@ def job_get(context, job_id, session=None):
def job_update(context, job_id, values):
"""Update job with given values"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
job = job_get(context, job_id, session=session)
job.update(values)
@@ -135,7 +135,7 @@ def job_update(context, job_id, values):
def job_add_workflow(context, job_id, wf_id):
"""Add a Workflow to given job"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
job = job_get(context, job_id, session=session)
wf = workflow_get(context, wf_id, session=session)
@@ -157,7 +157,7 @@ def job_get_state(context, job_id):
def job_get_logbook(context, job_id):
"""Return the logbook associated with the given job"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
job = job_get(context, job_id, session=session)
return job.logbook
@@ -177,7 +177,7 @@ def job_create(context, name, job_id=None):
def job_destroy(context, job_id):
"""Delete a given Job"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
job = job_get(context, job_id, session=session)
job.delete(session=session)
@@ -218,7 +218,7 @@ def workflow_get_names(context):
def workflow_get_tasks(context, wf_name):
"""Return all tasks for a given Workflow"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
@@ -227,7 +227,7 @@ def workflow_get_tasks(context, wf_name):
def workflow_add_task(context, wf_id, task_id):
"""Add a task to a given workflow"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
task = task_get(context, task_id, session=session)
wf = workflow_get(context, wf_id, session=session)
@@ -246,7 +246,7 @@ def workflow_create(context, workflow_name):
def workflow_destroy(context, wf_name):
"""Delete a given Workflow"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
wf = workflow_get(context, wf_name, session=session)
wf.delete(session=session)
@@ -283,7 +283,7 @@ def task_create(context, task_name, wf_id, task_id=None):
def task_update(context, task_id, values):
"""Update Task with given values"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
task = task_get(context, task_id)
@@ -293,7 +293,7 @@ def task_update(context, task_id, values):
def task_destroy(context, task_id):
"""Delete an existing Task"""
- session = get_session()
+ session = sql_session.get_session()
with session.begin():
task = task_get(context, task_id, session=session)
task.delete(session=session)
diff --git a/taskflow/db/sqlalchemy/models.py b/taskflow/db/sqlalchemy/models.py
index 6f8a150..35572fa 100644
--- a/taskflow/db/sqlalchemy/models.py
+++ b/taskflow/db/sqlalchemy/models.py
@@ -29,8 +29,7 @@ from sqlalchemy.orm import object_mapper, relationship
from sqlalchemy import DateTime, ForeignKey
from sqlalchemy import types as types
-from taskflow.db.sqlalchemy.session import get_engine
-from taskflow.db.sqlalchemy.session import get_session
+from taskflow.db.sqlalchemy import session as sql_session
from taskflow.openstack.common import exception
from taskflow.openstack.common import timeutils
from taskflow.openstack.common import uuidutils
@@ -59,7 +58,7 @@ class TaskFlowBase(object):
def save(self, session=None):
"""Save this object."""
if not session:
- session = get_session()
+ session = sql_session.get_session()
session.add(self)
try:
session.flush()
@@ -74,7 +73,7 @@ class TaskFlowBase(object):
self.deleted = True
self.deleted_at = timeutils.utcnow()
if not session:
- session = get_session()
+ session = sql_session.get_session()
session.delete(self)
session.flush()
@@ -182,4 +181,4 @@ class Task(BASE, TaskFlowBase):
def create_tables():
- BASE.metadata.create_all(get_engine())
+ BASE.metadata.create_all(sql_session.get_engine())
diff --git a/taskflow/db/sqlalchemy/session.py b/taskflow/db/sqlalchemy/session.py
index 66af18d..bf84ecb 100644
--- a/taskflow/db/sqlalchemy/session.py
+++ b/taskflow/db/sqlalchemy/session.py
@@ -52,8 +52,7 @@ def synchronous_switch_listener(dbapi_conn, connection_rec):
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
- """
-Ensures that MySQL connections checked out of the
+ """Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
diff --git a/taskflow/logbook.py b/taskflow/logbook.py
index 943896e..037a330 100644
--- a/taskflow/logbook.py
+++ b/taskflow/logbook.py
@@ -17,16 +17,15 @@
# under the License.
import abc
+import datetime
import weakref
-from datetime import datetime
-
class TaskDetail(object):
"""Task details have the bare minimum of these fields/methods."""
def __init__(self, name, metadata=None):
- self.date_created = datetime.utcnow()
+ self.date_created = datetime.datetime.utcnow()
self.name = name
self.metadata = metadata
self.date_updated = None
diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py
index 3a12cb9..90b3f25 100644
--- a/taskflow/patterns/distributed_flow.py
+++ b/taskflow/patterns/distributed_flow.py
@@ -17,11 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import celery
import logging
from taskflow import logbook
-from celery import chord
LOG = logging.getLogger(__name__)
@@ -37,7 +37,7 @@ class Flow(object):
logbook.add_workflow(name)
def chain_listeners(self, context, initial_task, callback_task):
- """ Register one listener for a task """
+ """Register one listener for a task."""
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
@@ -50,7 +50,7 @@ class Flow(object):
initial_task.link(callback_task.s(context))
def split_listeners(self, context, initial_task, callback_tasks):
- """ Register multiple listeners for one task """
+ """Register multiple listeners for one task."""
if self.root is None:
initial_task.name = '%s.%s' % (self.name, initial_task.name)
self.root = initial_task.s(context)
@@ -62,7 +62,7 @@ class Flow(object):
initial_task.link(task.s(context))
def merge_listeners(self, context, initial_tasks, callback_task):
- """ Register one listener for multiple tasks """
+ """Register one listener for multiple tasks."""
header = []
if self.root is None:
self.root = []
@@ -79,9 +79,9 @@ class Flow(object):
# TODO(jlucci): Need to set up chord so that it's not executed
# immediately.
- chord(header, body=callback_task)
+ celery.chord(header, body=callback_task)
def run(self, context, *args, **kwargs):
- """ Start root task and kick off workflow """
+ """Start root task and kick off workflow."""
self.root(context)
LOG.info('WF %s has been started' % (self.name,))
diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py
index 3ea73bb..13b11b5 100644
--- a/taskflow/tests/unit/test_memory.py
+++ b/taskflow/tests/unit/test_memory.py
@@ -16,8 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from datetime import datetime
-
+import datetime
import functools
import threading
import unittest2
@@ -65,7 +64,7 @@ class MemoryBackendTest(unittest2.TestCase):
if not my_jobs:
# No jobs were claimed, lets not search the past again
# then, since *likely* those jobs will remain claimed...
- job_search_from = datetime.utcnow()
+ job_search_from = datetime.datetime.utcnow()
if my_jobs and poison.isSet():
# Oh crap, we need to unclaim and repost the jobs.
for j in my_jobs:
@@ -215,7 +214,7 @@ class MemoryBackendTest(unittest2.TestCase):
j.claim(owner)
def receive_job():
- start = datetime.utcnow()
+ start = datetime.datetime.utcnow()
receiver_awake.set()
new_jobs = []
while not new_jobs: