summaryrefslogtreecommitdiff
path: root/taskflow/utils
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-07-14 16:13:06 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-11-12 17:03:00 -0800
commitae9c701f9073941fbe063d2b7854ff6eed5b5fc0 (patch)
tree7bf5f7f38399226f3119b16e5718f67474e214a4 /taskflow/utils
parent05fbf1faac2370d57f3a64477846c38aae48ab62 (diff)
downloadtaskflow-ae9c701f9073941fbe063d2b7854ff6eed5b5fc0.tar.gz
Add a executor backed conductor and have existing impl. use it
This adds a executor backed job dispatching base class and has the existing blocking executor use it by running jobs and dispatching jobs into a sync executor. It also allows for dispatching jobs into a thread executor, or other executor via a new '_executor_factory' method that can generate executors (it can be overriden in the non-blocking executor to provide your own executors instances). This does alter the behavior in that now that jobs are dispatched into an executor we no longer can immediatly know if a job was dispatched and raised an exception or whether it will raise an exception in the future, so we now alter the 'local_dispatched' to just be a boolean that is used to determine if any dispatches happened (failure or not). Change-Id: I485770e8f4c85d3833892a453c9fb5168d8f0407
Diffstat (limited to 'taskflow/utils')
-rw-r--r--taskflow/utils/misc.py14
-rw-r--r--taskflow/utils/threading_utils.py12
2 files changed, 26 insertions, 0 deletions
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index aa89aa8..ca8faa5 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -22,6 +22,7 @@ import errno
import inspect
import os
import re
+import socket
import sys
import threading
import types
@@ -42,6 +43,7 @@ from taskflow.types import notifier
from taskflow.utils import deprecation
+UNKNOWN_HOSTNAME = "<unknown>"
NUMERIC_TYPES = six.integer_types + (float,)
# NOTE(imelnikov): regular expression to get scheme from URI,
@@ -68,6 +70,18 @@ class StringIO(six.StringIO):
self.write(linesep)
+def get_hostname(unknown_hostname=UNKNOWN_HOSTNAME):
+ """Gets the machines hostname; if not able to returns an invalid one."""
+ try:
+ hostname = socket.getfqdn()
+ if not hostname:
+ return unknown_hostname
+ else:
+ return hostname
+ except socket.error:
+ return unknown_hostname
+
+
def match_type(obj, matchers):
"""Matches a given object using the given matchers list/iterable.
diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py
index 7de0151..ed55468 100644
--- a/taskflow/utils/threading_utils.py
+++ b/taskflow/utils/threading_utils.py
@@ -15,6 +15,7 @@
# under the License.
import collections
+import multiprocessing
import threading
import six
@@ -35,6 +36,17 @@ def get_ident():
return _thread.get_ident()
+def get_optimal_thread_count(default=2):
+ """Try to guess optimal thread count for current system."""
+ try:
+ return multiprocessing.cpu_count() + 1
+ except NotImplementedError:
+ # NOTE(harlowja): apparently may raise so in this case we will
+ # just setup two threads since it's hard to know what else we
+ # should do in this situation.
+ return default
+
+
def daemon_thread(target, *args, **kwargs):
"""Makes a daemon thread that calls the given target when started."""
thread = threading.Thread(target=target, args=args, kwargs=kwargs)