diff options
| author | Ian Cordasco <graffatcolmingov@gmail.com> | 2016-06-25 10:12:13 -0500 |
|---|---|---|
| committer | Ian Cordasco <graffatcolmingov@gmail.com> | 2016-06-25 10:12:13 -0500 |
| commit | 1a2c68f5da8ae95b8a156ef6f6a772bf82cf0f88 (patch) | |
| tree | e125328f45274330a116d0ae659e20ad4c8367cf /src | |
| parent | 5c8d767626a31560494996cd02ec5d654734aab2 (diff) | |
| download | flake8-1a2c68f5da8ae95b8a156ef6f6a772bf82cf0f88.tar.gz | |
Move flake8 into src
This is an emerging best practice and there is little reason not to
follow it.
Diffstat (limited to 'src')
29 files changed, 4358 insertions, 0 deletions
diff --git a/src/flake8/__init__.py b/src/flake8/__init__.py new file mode 100644 index 0000000..d6c6915 --- /dev/null +++ b/src/flake8/__init__.py @@ -0,0 +1,82 @@ +"""Top-level module for Flake8. + +This module + +- initializes logging for the command-line tool +- tracks the version of the package +- provides a way to configure logging for the command-line tool + +.. autofunction:: flake8.configure_logging + +""" +import logging +try: + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + """Shim for version of Python < 2.7.""" + + def emit(self, record): + """Do nothing.""" + pass +import sys + +LOG = logging.getLogger(__name__) +LOG.addHandler(NullHandler()) + +# Clean up after LOG config +del NullHandler + +__version__ = '3.0.0b1' +__version_info__ = tuple(int(i) for i in __version__.split('.') if i.isdigit()) + + +# There is nothing lower than logging.DEBUG (10) in the logging library, +# but we want an extra level to avoid being too verbose when using -vv. +_EXTRA_VERBOSE = 5 +logging.addLevelName(_EXTRA_VERBOSE, 'VERBOSE') + +_VERBOSITY_TO_LOG_LEVEL = { + # output more than warnings but not debugging info + 1: logging.INFO, # INFO is a numerical level of 20 + # output debugging information + 2: logging.DEBUG, # DEBUG is a numerical level of 10 + # output extra verbose debugging information + 3: _EXTRA_VERBOSE, +} + +LOG_FORMAT = ('%(name)-25s %(processName)-11s %(relativeCreated)6d ' + '%(levelname)-8s %(message)s') + + +def configure_logging(verbosity, filename=None, logformat=LOG_FORMAT): + """Configure logging for flake8. + + :param int verbosity: + How verbose to be in logging information. + :param str filename: + Name of the file to append log information to. + If ``None`` this will log to ``sys.stderr``. + If the name is "stdout" or "stderr" this will log to the appropriate + stream. 
+ """ + if verbosity <= 0: + return + if verbosity > 3: + verbosity = 3 + + log_level = _VERBOSITY_TO_LOG_LEVEL[verbosity] + + if not filename or filename in ('stderr', 'stdout'): + fileobj = getattr(sys, filename or 'stderr') + handler_cls = logging.StreamHandler + else: + fileobj = filename + handler_cls = logging.FileHandler + + handler = handler_cls(fileobj) + handler.setFormatter(logging.Formatter(logformat)) + LOG.addHandler(handler) + LOG.setLevel(log_level) + LOG.debug('Added a %s logging handler to logger root at %s', + filename, __name__) diff --git a/src/flake8/__main__.py b/src/flake8/__main__.py new file mode 100644 index 0000000..42bc428 --- /dev/null +++ b/src/flake8/__main__.py @@ -0,0 +1,4 @@ +"""Module allowing for ``python -m flake8 ...``.""" +from flake8.main import cli + +cli.main() diff --git a/src/flake8/api/__init__.py b/src/flake8/api/__init__.py new file mode 100644 index 0000000..9f95557 --- /dev/null +++ b/src/flake8/api/__init__.py @@ -0,0 +1,10 @@ +"""Module containing all public entry-points for Flake8. + +This is the only submodule in Flake8 with a guaranteed stable API. All other +submodules are considered internal only and are subject to change. 
+""" + + +def get_style_guide(**kwargs): + """Stub out the only function I'm aware of people using.""" + pass diff --git a/src/flake8/checker.py b/src/flake8/checker.py new file mode 100644 index 0000000..b875f44 --- /dev/null +++ b/src/flake8/checker.py @@ -0,0 +1,610 @@ +"""Checker Manager and Checker classes.""" +import errno +import logging +import os +import sys +import tokenize + +try: + import multiprocessing +except ImportError: + multiprocessing = None + +try: + import Queue as queue +except ImportError: + import queue + +from flake8 import defaults +from flake8 import exceptions +from flake8 import processor +from flake8 import utils + +LOG = logging.getLogger(__name__) + +SERIAL_RETRY_ERRNOS = set([ + # ENOSPC: Added by sigmavirus24 + # > On some operating systems (OSX), multiprocessing may cause an + # > ENOSPC error while trying to trying to create a Semaphore. + # > In those cases, we should replace the customized Queue Report + # > class with pep8's StandardReport class to ensure users don't run + # > into this problem. + # > (See also: https://gitlab.com/pycqa/flake8/issues/74) + errno.ENOSPC, + # NOTE(sigmavirus24): When adding to this list, include the reasoning + # on the lines before the error code and always append your error + # code. Further, please always add a trailing `,` to reduce the visual + # noise in diffs. +]) + + +class Manager(object): + """Manage the parallelism and checker instances for each plugin and file. + + This class will be responsible for the following: + + - Determining the parallelism of Flake8, e.g.: + + * Do we use :mod:`multiprocessing` or is it unavailable? + + * Do we automatically decide on the number of jobs to use or did the + user provide that? + + - Falling back to a serial way of processing files if we run into an + OSError related to :mod:`multiprocessing` + + - Organizing the results of each checker so we can group the output + together and make our output deterministic. 
+ """ + + def __init__(self, style_guide, arguments, checker_plugins): + """Initialize our Manager instance. + + :param style_guide: + The instantiated style guide for this instance of Flake8. + :type style_guide: + flake8.style_guide.StyleGuide + :param list arguments: + The extra arguments parsed from the CLI (if any) + :param checker_plugins: + The plugins representing checks parsed from entry-points. + :type checker_plugins: + flake8.plugins.manager.Checkers + """ + self.arguments = arguments + self.style_guide = style_guide + self.options = style_guide.options + self.checks = checker_plugins + self.jobs = self._job_count() + self.process_queue = None + self.results_queue = None + self.statistics_queue = None + self.using_multiprocessing = self.jobs > 1 + self.processes = [] + self.checkers = [] + self.statistics = { + 'files': 0, + 'logical lines': 0, + 'physical lines': 0, + 'tokens': 0, + } + + if self.using_multiprocessing: + try: + self.process_queue = multiprocessing.Queue() + self.results_queue = multiprocessing.Queue() + self.statistics_queue = multiprocessing.Queue() + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + raise + self.using_multiprocessing = False + + @staticmethod + def _cleanup_queue(q): + while not q.empty(): + q.get_nowait() + + def _force_cleanup(self): + if self.using_multiprocessing: + for proc in self.processes: + proc.join(0.2) + self._cleanup_queue(self.process_queue) + self._cleanup_queue(self.results_queue) + self._cleanup_queue(self.statistics_queue) + + def _process_statistics(self): + all_statistics = self.statistics + if self.using_multiprocessing: + total_number_of_checkers = len(self.checkers) + statistics_gathered = 0 + while statistics_gathered < total_number_of_checkers: + try: + statistics = self.statistics_queue.get(block=False) + statistics_gathered += 1 + except queue.Empty: + break + + for statistic in defaults.STATISTIC_NAMES: + all_statistics[statistic] += statistics[statistic] + else: + 
statistics_generator = (checker.statistics + for checker in self.checkers) + for statistics in statistics_generator: + for statistic in defaults.STATISTIC_NAMES: + all_statistics[statistic] += statistics[statistic] + all_statistics['files'] += len(self.checkers) + + def _job_count(self): + # type: () -> Union[int, NoneType] + # First we walk through all of our error cases: + # - multiprocessing library is not present + # - we're running on windows in which case we know we have significant + # implemenation issues + # - the user provided stdin and that's not something we can handle + # well + # - we're processing a diff, which again does not work well with + # multiprocessing and which really shouldn't require multiprocessing + # - the user provided some awful input + if not multiprocessing: + LOG.warning('The multiprocessing module is not available. ' + 'Ignoring --jobs arguments.') + return 0 + + if (utils.is_windows() and + not utils.can_run_multiprocessing_on_windows()): + LOG.warning('The --jobs option is only available on Windows on ' + 'Python 2.7.11+ and 3.3+. We have detected that you ' + 'are running an unsupported version of Python on ' + 'Windows. Ignoring --jobs arguments.') + return 0 + + if utils.is_using_stdin(self.arguments): + LOG.warning('The --jobs option is not compatible with supplying ' + 'input using - . Ignoring --jobs arguments.') + return 0 + + if self.options.diff: + LOG.warning('The --diff option was specified with --jobs but ' + 'they are not compatible. Ignoring --jobs arguments.') + return 0 + + jobs = self.options.jobs + if jobs != 'auto' and not jobs.isdigit(): + LOG.warning('"%s" is not a valid parameter to --jobs. Must be one ' + 'of "auto" or a numerical value, e.g., 4.', jobs) + return 0 + + # If the value is "auto", we want to let the multiprocessing library + # decide the number based on the number of CPUs. 
However, if that + # function is not implemented for this particular value of Python we + # default to 1 + if jobs == 'auto': + try: + return multiprocessing.cpu_count() + except NotImplementedError: + return 0 + + # Otherwise, we know jobs should be an integer and we can just convert + # it to an integer + return int(jobs) + + def _results(self): + seen_done = 0 + LOG.info('Retrieving results') + while True: + result = self.results_queue.get() + if result == 'DONE': + seen_done += 1 + if seen_done >= self.jobs: + break + continue + + yield result + + def _handle_results(self, filename, results): + style_guide = self.style_guide + reported_results_count = 0 + for (error_code, line_number, column, text, physical_line) in results: + reported_results_count += style_guide.handle_error( + code=error_code, + filename=filename, + line_number=line_number, + column_number=column, + text=text, + physical_line=physical_line, + ) + return reported_results_count + + def _run_checks_from_queue(self): + LOG.info('Running checks in parallel') + for checker in iter(self.process_queue.get, 'DONE'): + LOG.debug('Running checker for file "%s"', checker.filename) + checker.run_checks(self.results_queue, self.statistics_queue) + self.results_queue.put('DONE') + + def is_path_excluded(self, path): + # type: (str) -> bool + """Check if a path is excluded. + + :param str path: + Path to check against the exclude patterns. + :returns: + True if there are exclude patterns and the path matches, + otherwise False. 
+ :rtype: + bool + """ + exclude = self.options.exclude + if not exclude: + return False + basename = os.path.basename(path) + if utils.fnmatch(basename, exclude): + LOG.info('"%s" has been excluded', basename) + return True + + absolute_path = os.path.abspath(path) + match = utils.fnmatch(absolute_path, exclude) + LOG.info('"%s" has %sbeen excluded', absolute_path, + '' if match else 'not ') + return match + + def make_checkers(self, paths=None): + # type: (List[str]) -> NoneType + """Create checkers for each file.""" + if paths is None: + paths = self.arguments + filename_patterns = self.options.filename + + # NOTE(sigmavirus24): Yes this is a little unsightly, but it's our + # best solution right now. + def should_create_file_checker(filename): + """Determine if we should create a file checker.""" + matches_filename_patterns = utils.fnmatch( + filename, filename_patterns + ) + is_stdin = filename == '-' + file_exists = os.path.exists(filename) + return (file_exists and matches_filename_patterns) or is_stdin + + self.checkers = [ + FileChecker(filename, self.checks, self.style_guide) + for argument in paths + for filename in utils.filenames_from(argument, + self.is_path_excluded) + if should_create_file_checker(filename) + ] + + def report(self): + # type: () -> (int, int) + """Report all of the errors found in the managed file checkers. + + This iterates over each of the checkers and reports the errors sorted + by line number. + + :returns: + A tuple of the total results found and the results reported. 
+ :rtype: + tuple(int, int) + """ + results_reported = results_found = 0 + for checker in self.checkers: + results = sorted(checker.results, key=lambda tup: (tup[2], tup[3])) + results_reported += self._handle_results(checker.filename, + results) + results_found += len(results) + return (results_found, results_reported) + + def run_parallel(self): + """Run the checkers in parallel.""" + LOG.info('Starting %d process workers', self.jobs) + for i in range(self.jobs): + proc = multiprocessing.Process( + target=self._run_checks_from_queue + ) + proc.daemon = True + proc.start() + self.processes.append(proc) + + final_results = {} + for (filename, results) in self._results(): + final_results[filename] = results + + for checker in self.checkers: + filename = checker.filename + checker.results = sorted(final_results.get(filename, []), + key=lambda tup: (tup[1], tup[2])) + + def run_serial(self): + """Run the checkers in serial.""" + for checker in self.checkers: + checker.run_checks(self.results_queue, self.statistics_queue) + + def run(self): + """Run all the checkers. + + This will intelligently decide whether to run the checks in parallel + or whether to run them in serial. + + If running the checks in parallel causes a problem (e.g., + https://gitlab.com/pycqa/flake8/issues/74) this also implements + fallback to serial processing. + """ + try: + if self.using_multiprocessing: + self.run_parallel() + else: + self.run_serial() + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + LOG.exception(oserr) + raise + LOG.warning('Running in serial after OS exception, %r', oserr) + self.run_serial() + + def start(self, paths=None): + """Start checking files. + + :param list paths: + Path names to check. This is passed directly to + :meth:`~Manager.make_checkers`. 
+ """ + LOG.info('Making checkers') + self.make_checkers(paths) + if not self.using_multiprocessing: + return + + LOG.info('Populating process queue') + for checker in self.checkers: + self.process_queue.put(checker) + + for i in range(self.jobs): + self.process_queue.put('DONE') + + def stop(self): + """Stop checking files.""" + self._process_statistics() + for proc in self.processes: + LOG.info('Joining %s to the main process', proc.name) + proc.join() + + +class FileChecker(object): + """Manage running checks for a file and aggregate the results.""" + + def __init__(self, filename, checks, style_guide): + """Initialize our file checker. + + :param str filename: + Name of the file to check. + :param checks: + The plugins registered to check the file. + :type checks: + flake8.plugins.manager.Checkers + :param style_guide: + The initialized StyleGuide for this particular run. + :type style_guide: + flake8.style_guide.StyleGuide + """ + self.filename = filename + self.checks = checks + self.style_guide = style_guide + self.results = [] + self.processor = self._make_processor() + self.statistics = { + 'tokens': 0, + 'logical lines': 0, + 'physical lines': len(self.processor.lines), + } + + def _make_processor(self): + try: + return processor.FileProcessor(self.filename, + self.style_guide.options) + except IOError: + # If we can not read the file due to an IOError (e.g., the file + # does not exist or we do not have the permissions to open it) + # then we need to format that exception for the user. + # NOTE(sigmavirus24): Historically, pep8 has always reported this + # as an E902. We probably *want* a better error code for this + # going forward. 
+ (exc_type, exception) = sys.exc_info()[:2] + message = '{0}: {1}'.format(exc_type.__name__, exception) + self.report('E902', 0, 0, message) + return None + + def report(self, error_code, line_number, column, text): + # type: (str, int, int, str) -> str + """Report an error by storing it in the results list.""" + if error_code is None: + error_code, text = text.split(' ', 1) + + physical_line = '' + # If we're recovering from a problem in _make_processor, we will not + # have this attribute. + if getattr(self, 'processor', None): + physical_line = self.processor.line_for(line_number) + + error = (error_code, line_number, column, text, physical_line) + self.results.append(error) + return error_code + + def run_check(self, plugin, **arguments): + """Run the check in a single plugin.""" + LOG.debug('Running %r with %r', plugin, arguments) + self.processor.keyword_arguments_for(plugin.parameters, arguments) + return plugin.execute(**arguments) + + def run_ast_checks(self): + """Run all checks expecting an abstract syntax tree.""" + try: + ast = self.processor.build_ast() + except (ValueError, SyntaxError, TypeError): + (exc_type, exception) = sys.exc_info()[:2] + if len(exception.args) > 1: + offset = exception.args[1] + if len(offset) > 2: + offset = offset[1:3] + else: + offset = (1, 0) + + self.report('E999', offset[0], offset[1], '%s: %s' % + (exc_type.__name__, exception.args[0])) + return + + for plugin in self.checks.ast_plugins: + checker = self.run_check(plugin, tree=ast) + # NOTE(sigmavirus24): If we want to allow for AST plugins that are + # not classes exclusively, we can do the following: + # retrieve_results = getattr(checker, 'run', lambda: checker) + # Otherwise, we just call run on the checker + for (line_number, offset, text, check) in checker.run(): + self.report( + error_code=None, + line_number=line_number, + column=offset, + text=text, + ) + + def run_logical_checks(self): + """Run all checks expecting a logical line.""" + comments, logical_line, 
mapping = self.processor.build_logical_line() + if not mapping: + return + self.processor.update_state(mapping) + + LOG.debug('Logical line: "%s"', logical_line.rstrip()) + + for plugin in self.checks.logical_line_plugins: + self.processor.update_checker_state_for(plugin) + results = self.run_check(plugin, logical_line=logical_line) or () + for offset, text in results: + offset = find_offset(offset, mapping) + line_number, column_offset = offset + self.report( + error_code=None, + line_number=line_number, + column=column_offset, + text=text, + ) + + self.processor.next_logical_line() + + def run_physical_checks(self, physical_line): + """Run all checks for a given physical line.""" + for plugin in self.checks.physical_line_plugins: + self.processor.update_checker_state_for(plugin) + result = self.run_check(plugin, physical_line=physical_line) + if result is not None: + column_offset, text = result + error_code = self.report( + error_code=None, + line_number=self.processor.line_number, + column=column_offset, + text=text, + ) + + self.processor.check_physical_error(error_code, physical_line) + + def process_tokens(self): + """Process tokens and trigger checks. + + This can raise a :class:`flake8.exceptions.InvalidSyntax` exception. + Instead of using this directly, you should use + :meth:`flake8.checker.FileChecker.run_checks`. 
+ """ + parens = 0 + statistics = self.statistics + file_processor = self.processor + for token in file_processor.generate_tokens(): + statistics['tokens'] += 1 + self.check_physical_eol(token) + token_type, text = token[0:2] + processor.log_token(LOG, token) + if token_type == tokenize.OP: + parens = processor.count_parentheses(parens, text) + elif parens == 0: + if processor.token_is_newline(token): + self.handle_newline(token_type) + elif (processor.token_is_comment(token) and + len(file_processor.tokens) == 1): + self.handle_comment(token, text) + + if file_processor.tokens: + # If any tokens are left over, process them + self.run_physical_checks(file_processor.lines[-1]) + self.run_logical_checks() + + def run_checks(self, results_queue, statistics_queue): + """Run checks against the file.""" + if self.processor.should_ignore_file(): + return + + try: + self.process_tokens() + except exceptions.InvalidSyntax as exc: + self.report(exc.error_code, exc.line_number, exc.column_number, + exc.error_message) + + self.run_ast_checks() + + if results_queue is not None: + results_queue.put((self.filename, self.results)) + + logical_lines = self.processor.statistics['logical lines'] + self.statistics['logical lines'] = logical_lines + if statistics_queue is not None: + statistics_queue.put(self.statistics) + + def handle_comment(self, token, token_text): + """Handle the logic when encountering a comment token.""" + # The comment also ends a physical line + token = list(token) + token[1] = token_text.rstrip('\r\n') + token[3] = (token[2][0], token[2][1] + len(token[1])) + self.processor.tokens = [tuple(token)] + self.run_logical_checks() + + def handle_newline(self, token_type): + """Handle the logic when encountering a newline token.""" + if token_type == tokenize.NEWLINE: + self.run_logical_checks() + self.processor.reset_blank_before() + elif len(self.processor.tokens) == 1: + # The physical line contains only this token. 
+ self.processor.visited_new_blank_line() + self.processor.delete_first_token() + else: + self.run_logical_checks() + + def check_physical_eol(self, token): + """Run physical checks if and only if it is at the end of the line.""" + if processor.is_eol_token(token): + # Obviously, a newline token ends a single physical line. + self.run_physical_checks(token[4]) + elif processor.is_multiline_string(token): + # Less obviously, a string that contains newlines is a + # multiline string, either triple-quoted or with internal + # newlines backslash-escaped. Check every physical line in the + # string *except* for the last one: its newline is outside of + # the multiline string, so we consider it a regular physical + # line, and will check it like any other physical line. + # + # Subtleties: + # - have to wind self.line_number back because initially it + # points to the last line of the string, and we want + # check_physical() to give accurate feedback + line_no = token[2][0] + with self.processor.inside_multiline(line_number=line_no): + for line in self.processor.split_line(token): + self.run_physical_checks(line + '\n') + + +def find_offset(offset, mapping): + """Find the offset tuple for a single offset.""" + if isinstance(offset, tuple): + return offset + + for token_offset, position in mapping: + if offset <= token_offset: + break + return (position[0], position[1] + offset - token_offset) diff --git a/src/flake8/defaults.py b/src/flake8/defaults.py new file mode 100644 index 0000000..d9f5a0b --- /dev/null +++ b/src/flake8/defaults.py @@ -0,0 +1,17 @@ +"""Constants that define defaults.""" + +EXCLUDE = '.svn,CVS,.bzr,.hg,.git,__pycache__,.tox' +IGNORE = 'E121,E123,E126,E226,E24,E704,W503,W504' +SELECT = 'E,F,W,C' +MAX_LINE_LENGTH = 79 + +TRUTHY_VALUES = set(['true', '1', 't']) + +# Other constants +WHITESPACE = frozenset(' \t') + +STATISTIC_NAMES = ( + 'logical lines', + 'physical lines', + 'tokens', +) diff --git a/src/flake8/exceptions.py b/src/flake8/exceptions.py 
new file mode 100644 index 0000000..5ff55a2 --- /dev/null +++ b/src/flake8/exceptions.py @@ -0,0 +1,96 @@ +"""Exception classes for all of Flake8.""" + + +class Flake8Exception(Exception): + """Plain Flake8 exception.""" + + pass + + +class FailedToLoadPlugin(Flake8Exception): + """Exception raised when a plugin fails to load.""" + + FORMAT = 'Flake8 failed to load plugin "%(name)s" due to %(exc)s.' + + def __init__(self, *args, **kwargs): + """Initialize our FailedToLoadPlugin exception.""" + self.plugin = kwargs.pop('plugin') + self.ep_name = self.plugin.name + self.original_exception = kwargs.pop('exception') + super(FailedToLoadPlugin, self).__init__(*args, **kwargs) + + def __str__(self): + """Return a nice string for our exception.""" + return self.FORMAT % {'name': self.ep_name, + 'exc': self.original_exception} + + +class InvalidSyntax(Flake8Exception): + """Exception raised when tokenizing a file fails.""" + + def __init__(self, *args, **kwargs): + """Initialize our InvalidSyntax exception.""" + self.original_exception = kwargs.pop('exception') + self.error_code = 'E902' + self.line_number = 1 + self.column_number = 0 + try: + self.error_message = self.original_exception.message + except AttributeError: + # On Python 3, the IOError is an OSError which has a + # strerror attribute instead of a message attribute + self.error_message = self.original_exception.strerror + super(InvalidSyntax, self).__init__(*args, **kwargs) + + +class HookInstallationError(Flake8Exception): + """Parent exception for all hooks errors.""" + + pass + + +class GitHookAlreadyExists(HookInstallationError): + """Exception raised when the git pre-commit hook file already exists.""" + + def __init__(self, *args, **kwargs): + """Initialize the path attribute.""" + self.path = kwargs.pop('path') + super(GitHookAlreadyExists, self).__init__(*args, **kwargs) + + def __str__(self): + """Provide a nice message regarding the exception.""" + msg = ('The Git pre-commit hook ({0}) already exists. 
To convince ' + 'Flake8 to install the hook, please remove the existing ' + 'hook.') + return msg.format(self.path) + + +class MercurialHookAlreadyExists(HookInstallationError): + """Exception raised when a mercurial hook is already configured.""" + + hook_name = None + + def __init__(self, *args, **kwargs): + """Initialize the relevant attributes.""" + self.path = kwargs.pop('path') + self.value = kwargs.pop('value') + super(MercurialHookAlreadyExists, self).__init__(*args, **kwargs) + + def __str__(self): + """Return a nicely formatted string for these errors.""" + msg = ('The Mercurial {0} hook already exists with "{1}" in {2}. ' + 'To convince Flake8 to install the hook, please remove the ' + '{0} configuration from the [hooks] section of your hgrc.') + return msg.format(self.hook_name, self.value, self.path) + + +class MercurialCommitHookAlreadyExists(MercurialHookAlreadyExists): + """Exception raised when the hg commit hook is already configured.""" + + hook_name = 'commit' + + +class MercurialQRefreshHookAlreadyExists(MercurialHookAlreadyExists): + """Exception raised when the hg commit hook is already configured.""" + + hook_name = 'qrefresh' diff --git a/src/flake8/formatting/__init__.py b/src/flake8/formatting/__init__.py new file mode 100644 index 0000000..bf44801 --- /dev/null +++ b/src/flake8/formatting/__init__.py @@ -0,0 +1 @@ +"""Submodule containing the default formatters for Flake8.""" diff --git a/src/flake8/formatting/base.py b/src/flake8/formatting/base.py new file mode 100644 index 0000000..4fda6f4 --- /dev/null +++ b/src/flake8/formatting/base.py @@ -0,0 +1,161 @@ +"""The base class and interface for all formatting plugins.""" +from __future__ import print_function + + +class BaseFormatter(object): + """Class defining the formatter interface. + + .. attribute:: options + + The options parsed from both configuration files and the command-line. + + .. attribute:: filename + + If specified by the user, the path to store the results of the run. 
+ + .. attribute:: output_fd + + Initialized when the :meth:`start` is called. This will be a file + object opened for writing. + + .. attribute:: newline + + The string to add to the end of a line. This is only used when the + output filename has been specified. + """ + + def __init__(self, options): + """Initialize with the options parsed from config and cli. + + This also calls a hook, :meth:`after_init`, so subclasses do not need + to call super to call this method. + + :param optparse.Values options: + User specified configuration parsed from both configuration files + and the command-line interface. + """ + self.options = options + self.filename = options.output_file + self.output_fd = None + self.newline = '\n' + self.after_init() + + def after_init(self): + """Initialize the formatter further.""" + pass + + def start(self): + """Prepare the formatter to receive input. + + This defaults to initializing :attr:`output_fd` if :attr:`filename` + """ + if self.filename: + self.output_fd = open(self.filename, 'w') + + def handle(self, error): + """Handle an error reported by Flake8. + + This defaults to calling :meth:`format`, :meth:`show_source`, and + then :meth:`write`. To extend how errors are handled, override this + method. + + :param error: + This will be an instance of :class:`~flake8.style_guide.Error`. + :type error: + flake8.style_guide.Error + """ + line = self.format(error) + source = self.show_source(error) + self.write(line, source) + + def format(self, error): + """Format an error reported by Flake8. + + This method **must** be implemented by subclasses. + + :param error: + This will be an instance of :class:`~flake8.style_guide.Error`. + :type error: + flake8.style_guide.Error + :returns: + The formatted error string. 
+ :rtype: + str + """ + raise NotImplementedError('Subclass of BaseFormatter did not implement' + ' format.') + + def show_benchmarks(self, benchmarks): + """Format and print the benchmarks.""" + # NOTE(sigmavirus24): The format strings are a little confusing, even + # to me, so here's a quick explanation: + # We specify the named value first followed by a ':' to indicate we're + # formatting the value. + # Next we use '<' to indicate we want the value left aligned. + # Then '10' is the width of the area. + # For floats, finally, we only want only want at most 3 digits after + # the decimal point to be displayed. This is the precision and it + # can not be specified for integers which is why we need two separate + # format strings. + float_format = '{value:<10.3} {statistic}'.format + int_format = '{value:<10} {statistic}'.format + for statistic, value in benchmarks: + if isinstance(value, int): + benchmark = int_format(statistic=statistic, value=value) + else: + benchmark = float_format(statistic=statistic, value=value) + self._write(benchmark) + + def show_source(self, error): + """Show the physical line generating the error. + + This also adds an indicator for the particular part of the line that + is reported as generating the problem. + + :param error: + This will be an instance of :class:`~flake8.style_guide.Error`. + :type error: + flake8.style_guide.Error + :returns: + The formatted error string if the user wants to show the source. + If the user does not want to show the source, this will return + ``None``. 
+ :rtype: + str + """ + if not self.options.show_source: + return None + pointer = (' ' * error.column_number) + '^' + # Physical lines have a newline at the end, no need to add an extra + # one + return error.physical_line + pointer + + def _write(self, output): + """Handle logic of whether to use an output file or print().""" + if self.output_fd is not None: + self.output_fd.write(output + self.newline) + else: + print(output) + + def write(self, line, source): + """Write the line either to the output file or stdout. + + This handles deciding whether to write to a file or print to standard + out for subclasses. Override this if you want behaviour that differs + from the default. + + :param str line: + The formatted string to print or write. + :param str source: + The source code that has been formatted and associated with the + line of output. + """ + self._write(line) + if source: + self._write(source) + + def stop(self): + """Clean up after reporting is finished.""" + if self.output_fd is not None: + self.output_fd.close() + self.output_fd = None diff --git a/src/flake8/formatting/default.py b/src/flake8/formatting/default.py new file mode 100644 index 0000000..bef8c88 --- /dev/null +++ b/src/flake8/formatting/default.py @@ -0,0 +1,56 @@ +"""Default formatting class for Flake8.""" +from flake8.formatting import base + + +class SimpleFormatter(base.BaseFormatter): + """Simple abstraction for Default and Pylint formatter commonality. + + Sub-classes of this need to define an ``error_format`` attribute in order + to succeed. The ``format`` method relies on that attribute and expects the + ``error_format`` string to use the old-style formatting strings with named + parameters: + + * code + * text + * path + * row + * col + + """ + + error_format = None + + def format(self, error): + """Format and write error out. + + If an output filename is specified, write formatted errors to that + file. Otherwise, print the formatted error to standard out. 
+ """ + return self.error_format % { + "code": error.code, + "text": error.text, + "path": error.filename, + "row": error.line_number, + "col": error.column_number, + } + + +class Default(SimpleFormatter): + """Default formatter for Flake8. + + This also handles backwards compatibility for people specifying a custom + format string. + """ + + error_format = '%(path)s:%(row)d:%(col)d: %(code)s %(text)s' + + def after_init(self): + """Check for a custom format string.""" + if self.options.format.lower() != 'default': + self.error_format = self.options.format + + +class Pylint(SimpleFormatter): + """Pylint formatter for Flake8.""" + + error_format = '%(path)s:%(row)d: [%(code)s] %(text)s' diff --git a/src/flake8/main/__init__.py b/src/flake8/main/__init__.py new file mode 100644 index 0000000..d3aa1de --- /dev/null +++ b/src/flake8/main/__init__.py @@ -0,0 +1 @@ +"""Module containing the logic for the Flake8 entry-points.""" diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py new file mode 100644 index 0000000..225c701 --- /dev/null +++ b/src/flake8/main/application.py @@ -0,0 +1,296 @@ +"""Module containing the application logic for Flake8.""" +from __future__ import print_function + +import logging +import sys +import time + +import flake8 +from flake8 import checker +from flake8 import defaults +from flake8 import style_guide +from flake8 import utils +from flake8.main import options +from flake8.options import aggregator +from flake8.options import manager +from flake8.plugins import manager as plugin_manager + +LOG = logging.getLogger(__name__) + + +class Application(object): + """Abstract our application into a class.""" + + def __init__(self, program='flake8', version=flake8.__version__): + # type: (str, str) -> NoneType + """Initialize our application. + + :param str program: + The name of the program/application that we're executing. + :param str version: + The version of the program/application we're executing. 
+ """ + #: The timestamp when the Application instance was instantiated. + self.start_time = time.time() + #: The timestamp when the Application finished reported errors. + self.end_time = None + #: The name of the program being run + self.program = program + #: The version of the program being run + self.version = version + #: The instance of :class:`flake8.options.manager.OptionManager` used + #: to parse and handle the options and arguments passed by the user + self.option_manager = manager.OptionManager( + prog='flake8', version=flake8.__version__ + ) + options.register_default_options(self.option_manager) + + # We haven't found or registered our plugins yet, so let's defer + # printing the version until we aggregate options from config files + # and the command-line. First, let's clone our arguments on the CLI, + # then we'll attempt to remove ``--version`` so that we can avoid + # triggering the "version" action in optparse. If it's not there, we + # do not need to worry and we can continue. If it is, we successfully + # defer printing the version until just a little bit later. + # Similarly we have to defer printing the help text until later. 
def exit(self):
    # type: () -> NoneType
    """Finish up and terminate the program.

    This should be the last thing called on the application instance: it
    honours ``--count`` by printing the number of reported results, then
    terminates with a status reflecting whether any results were found
    (unless ``--exit-zero`` was requested).
    """
    if self.options.count:
        print(self.result_count)
    if self.options.exit_zero:
        return
    raise SystemExit(self.result_count > 0)


def find_plugins(self):
    # type: () -> NoneType
    """Find and load the plugins for this application.

    Plugin discovery (via :mod:`pkg_resources`) is expensive, so each of
    :attr:`check_plugins`, :attr:`listening_plugins`, and
    :attr:`formatting_plugins` is only created while it is still ``None``,
    which keeps repeated calls idempotent.
    """
    if self.check_plugins is None:
        self.check_plugins = plugin_manager.Checkers()
    if self.listening_plugins is None:
        self.listening_plugins = plugin_manager.Listeners()
    if self.formatting_plugins is None:
        self.formatting_plugins = plugin_manager.ReportFormatters()

    for plugins in (self.check_plugins,
                    self.listening_plugins,
                    self.formatting_plugins):
        plugins.load_plugins()


def register_plugin_options(self):
    # type: () -> NoneType
    """Register options provided by plugins with our option manager."""
    self.check_plugins.register_options(self.option_manager)
    self.check_plugins.register_plugin_versions(self.option_manager)
    self.listening_plugins.register_options(self.option_manager)
    self.formatting_plugins.register_options(self.option_manager)
+ """ + if self.options is None and self.args is None: + self.options, self.args = aggregator.aggregate_options( + self.option_manager, argv + ) + + self.running_against_diff = self.options.diff + if self.running_against_diff: + self.parsed_diff = utils.parse_unified_diff() + + self.check_plugins.provide_options(self.option_manager, self.options, + self.args) + self.listening_plugins.provide_options(self.option_manager, + self.options, + self.args) + self.formatting_plugins.provide_options(self.option_manager, + self.options, + self.args) + + def make_formatter(self): + # type: () -> NoneType + """Initialize a formatter based on the parsed options.""" + if self.formatter is None: + self.formatter = self.formatting_plugins.get( + self.options.format, self.formatting_plugins['default'] + ).execute(self.options) + + def make_notifier(self): + # type: () -> NoneType + """Initialize our listener Notifier.""" + if self.listener_trie is None: + self.listener_trie = self.listening_plugins.build_notifier() + + def make_guide(self): + # type: () -> NoneType + """Initialize our StyleGuide.""" + if self.guide is None: + self.guide = style_guide.StyleGuide( + self.options, self.listener_trie, self.formatter + ) + + if self.running_against_diff: + self.guide.add_diff_ranges(self.parsed_diff) + + def make_file_checker_manager(self): + # type: () -> NoneType + """Initialize our FileChecker Manager.""" + if self.file_checker_manager is None: + self.file_checker_manager = checker.Manager( + style_guide=self.guide, + arguments=self.args, + checker_plugins=self.check_plugins, + ) + + def run_checks(self): + # type: () -> NoneType + """Run the actual checks with the FileChecker Manager. + + This method encapsulates the logic to make a + :class:`~flake8.checker.Manger` instance run the checks it is + managing. 
+ """ + files = None + if self.running_against_diff: + files = list(sorted(self.parsed_diff.keys())) + self.file_checker_manager.start(files) + self.file_checker_manager.run() + LOG.info('Finished running') + self.file_checker_manager.stop() + self.end_time = time.time() + + def report_benchmarks(self): + """Aggregate, calculate, and report benchmarks for this run.""" + if not self.options.benchmark: + return + + time_elapsed = self.end_time - self.start_time + statistics = [('seconds elapsed', time_elapsed)] + add_statistic = statistics.append + for statistic in (defaults.STATISTIC_NAMES + ('files',)): + value = self.file_checker_manager.statistics[statistic] + total_description = 'total ' + statistic + ' processed' + add_statistic((total_description, value)) + per_second_description = statistic + ' processed per second' + add_statistic((per_second_description, int(value / time_elapsed))) + + self.formatter.show_benchmarks(statistics) + + def report_errors(self): + # type: () -> NoneType + """Report all the errors found by flake8 3.0. + + This also updates the :attr:`result_count` attribute with the total + number of errors, warnings, and other messages found. + """ + LOG.info('Reporting errors') + results = self.file_checker_manager.report() + self.total_result_count, self.result_count = results + LOG.info('Found a total of %d results and reported %d', + self.total_result_count, self.result_count) + + def initialize(self, argv): + # type: () -> NoneType + """Initialize the application to be run. + + This finds the plugins, registers their options, and parses the + command-line arguments. 
+ """ + self.find_plugins() + self.register_plugin_options() + self.parse_configuration_and_cli(argv) + self.make_formatter() + self.make_notifier() + self.make_guide() + self.make_file_checker_manager() + + def _run(self, argv): + # type: (Union[NoneType, List[str]]) -> NoneType + self.initialize(argv) + self.run_checks() + self.report_errors() + self.report_benchmarks() + + def run(self, argv=None): + # type: (Union[NoneType, List[str]]) -> NoneType + """Run our application. + + This method will also handle KeyboardInterrupt exceptions for the + entirety of the flake8 application. If it sees a KeyboardInterrupt it + will forcibly clean up the :class:`~flake8.checker.Manager`. + """ + try: + self._run(argv) + except KeyboardInterrupt as exc: + LOG.critical('Caught keyboard interrupt from user') + LOG.exception(exc) + self.file_checker_manager._force_cleanup() diff --git a/src/flake8/main/cli.py b/src/flake8/main/cli.py new file mode 100644 index 0000000..29bd159 --- /dev/null +++ b/src/flake8/main/cli.py @@ -0,0 +1,17 @@ +"""Command-line implementation of flake8.""" +from flake8.main import application + + +def main(argv=None): + # type: (Union[NoneType, List[str]]) -> NoneType + """Main entry-point for the flake8 command-line tool. + + This handles the creation of an instance of :class:`Application`, runs it, + and then exits the application. + + :param list argv: + The arguments to be passed to the application for parsing. + """ + app = application.Application() + app.run(argv) + app.exit() diff --git a/src/flake8/main/git.py b/src/flake8/main/git.py new file mode 100644 index 0000000..bae0233 --- /dev/null +++ b/src/flake8/main/git.py @@ -0,0 +1,207 @@ +"""Module containing the main git hook interface and helpers. + +.. autofunction:: hook +.. 
autofunction:: install + +""" +import contextlib +import os +import shutil +import stat +import subprocess +import sys +import tempfile + +from flake8 import defaults +from flake8 import exceptions + +__all__ = ('hook', 'install') + + +def hook(lazy=False, strict=False): + """Execute Flake8 on the files in git's index. + + Determine which files are about to be committed and run Flake8 over them + to check for violations. + + :param bool lazy: + Find files not added to the index prior to committing. This is useful + if you frequently use ``git commit -a`` for example. This defaults to + False since it will otherwise include files not in the index. + :param bool strict: + If True, return the total number of errors/violations found by Flake8. + This will cause the hook to fail. + :returns: + Total number of errors found during the run. + :rtype: + int + """ + # NOTE(sigmavirus24): Delay import of application until we need it. + from flake8.main import application + app = application.Application() + with make_temporary_directory() as tempdir: + filepaths = list(copy_indexed_files_to(tempdir, lazy)) + app.initialize(filepaths) + app.run_checks() + + app.report_errors() + if strict: + return app.result_count + return 0 + + +def install(): + """Install the git hook script. + + This searches for the ``.git`` directory and will install an executable + pre-commit python script in the hooks sub-directory if one does not + already exist. + + :returns: + True if successful, False if the git directory doesn't exist. 
def get_executable():
    """Return the Python interpreter path to embed in the hook script.

    Falls back to ``/usr/bin/env python`` when ``sys.executable`` is not
    set (possible with some embedded interpreters).
    """
    return (sys.executable
            if sys.executable is not None
            else '/usr/bin/env python')


def find_git_directory():
    """Return the path of the ``.git`` directory, or ``None``.

    Asks ``git rev-parse --git-dir``; a non-zero exit status means we are
    not inside a git repository.
    """
    rev_parse = piped_process(['git', 'rev-parse', '--git-dir'])
    stdout, _ = rev_parse.communicate()
    if rev_parse.returncode != 0:
        return None
    return to_text(stdout).strip()


def copy_indexed_files_to(temporary_directory, lazy):
    """Copy each modified file into *temporary_directory*.

    Yields the temporary path of every copied file.
    """
    for filename in find_modified_files(lazy):
        yield copy_file_to(temporary_directory, filename,
                           get_staged_contents_from(filename))
def make_temporary_directory_from(destination, directory):
    """Map *directory* onto an equivalent path below *destination*."""
    prefix = os.path.commonprefix([directory, destination])
    common_directory_path = os.path.relpath(directory, start=prefix)
    return os.path.join(destination, common_directory_path)


def find_modified_files(lazy):
    """List the files git reports as changed.

    :param bool lazy:
        When True, do not restrict the diff to the index, so files
        modified but not yet staged (the ``git commit -a`` workflow) are
        included as well.
    :returns:
        List of file names relative to the repository root.
    :rtype:
        list
    """
    diff_index_command = [
        'git', 'diff-index', '--cached', '--name-only',
        '--diff-filter=ACMRTUXB', 'HEAD',
    ]
    if lazy:
        # BUG FIX: ``lazy`` was previously accepted but ignored.  Lazy
        # means the user relies on ``git commit -a``, so the diff must
        # not be limited to what has already been staged.
        diff_index_command.remove('--cached')
    diff_index = piped_process(diff_index_command)
    (stdout, _) = diff_index.communicate()
    return to_text(stdout).splitlines()


def get_staged_contents_from(filename):
    """Return the staged (index) contents of *filename* as bytes."""
    git_show = piped_process(['git', 'show', ':{0}'.format(filename)])
    (stdout, _) = git_show.communicate()
    return stdout


@contextlib.contextmanager
def make_temporary_directory():
    """Yield a fresh temporary directory, removing it afterwards.

    BUG FIX: the removal now happens in a ``finally`` block so the
    directory is also cleaned up when the body of the ``with`` statement
    raises; previously an exception left the directory behind.
    """
    temporary_directory = tempfile.mkdtemp()
    try:
        yield temporary_directory
    finally:
        shutil.rmtree(temporary_directory, ignore_errors=True)


def to_text(string):
    """Ensure that the string is text (decode bytes as UTF-8)."""
    if callable(getattr(string, 'decode', None)):
        return string.decode('utf-8')
    return string


def piped_process(command):
    """Spawn *command* with stdout and stderr captured via pipes."""
    return subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )


def git_config_for(parameter):
    """Read *parameter* from git's config as boolean-ish text."""
    config = piped_process(['git', 'config', '--get', '--bool', parameter])
    (stdout, _) = config.communicate()
    return to_text(stdout)


def config_for(parameter):
    """Resolve a flake8 hook setting from the environment or git config.

    The environment variable (e.g. ``FLAKE8_STRICT``) takes precedence
    over the git config value (e.g. ``flake8.strict``).
    """
    environment_variable = 'flake8_{0}'.format(parameter).upper()
    git_variable = 'flake8.{0}'.format(parameter)
    value = os.environ.get(environment_variable, git_config_for(git_variable))
    return value.lower() in defaults.TRUTHY_VALUES
"""Module containing the main mercurial hook interface and helpers.

.. autofunction:: hook
.. autofunction:: install

"""
import configparser
import os
import subprocess

# NOTE(review): ``exc`` below refers to ``flake8.exceptions`` imported at
# the top of this module as ``from flake8 import exceptions as exc``.


def hook(ui, repo, **kwargs):
    """Execute Flake8 on the repository provided by Mercurial.

    To understand the parameters read more of the Mercurial documentation
    around Hooks: https://www.mercurial-scm.org/wiki/Hook.

    We avoid using the ``ui`` attribute because it can cause issues with
    the GPL license that Mercurial is under. We don't import it, but we
    avoid using it all the same.
    """
    from flake8.main import application
    hgrc = find_hgrc(create_if_missing=False)
    if hgrc is None:
        print('Cannot locate your root mercurial repository.')
        raise SystemExit(True)

    hgconfig = configparser_for(hgrc)
    # BUG FIX: ``ConfigParser.get`` returns the raw string, so a
    # configured ``strict = false`` was previously truthy.  Use
    # ``getboolean`` so the value is interpreted as a boolean.
    strict = hgconfig.getboolean('flake8', 'strict', fallback=True)

    filenames = list(get_filenames_from(repo, kwargs))

    app = application.Application()
    app.run(filenames)

    if strict:
        return app.result_count
    return 0


def install():
    """Ensure that the mercurial hooks are installed.

    :returns:
        True on success, False if the mercurial repository root cannot
        be located.
    :raises:
        flake8.exceptions.MercurialCommitHookAlreadyExists
    :raises:
        flake8.exceptions.MercurialQRefreshHookAlreadyExists
    """
    hgrc = find_hgrc(create_if_missing=True)
    if hgrc is None:
        return False

    hgconfig = configparser_for(hgrc)

    if not hgconfig.has_section('hooks'):
        hgconfig.add_section('hooks')

    if hgconfig.has_option('hooks', 'commit'):
        raise exc.MercurialCommitHookAlreadyExists(
            path=hgrc,
            value=hgconfig.get('hooks', 'commit'),
        )

    if hgconfig.has_option('hooks', 'qrefresh'):
        raise exc.MercurialQRefreshHookAlreadyExists(
            path=hgrc,
            value=hgconfig.get('hooks', 'qrefresh'),
        )

    hgconfig.set('hooks', 'commit', 'python:flake8.main.mercurial.hook')
    hgconfig.set('hooks', 'qrefresh', 'python:flake8.main.mercurial.hook')

    if not hgconfig.has_section('flake8'):
        hgconfig.add_section('flake8')

    if not hgconfig.has_option('flake8', 'strict'):
        # BUG FIX: ``ConfigParser.set`` raises TypeError for non-string
        # values on Python 3; write the boolean as its string form.
        hgconfig.set('flake8', 'strict', 'false')

    with open(hgrc, 'w') as fd:
        hgconfig.write(fd)

    return True


def get_filenames_from(repository, kwargs):
    """Yield the Python files touched from ``kwargs['node']`` onwards.

    Duplicate and no-longer-existing paths are skipped.
    """
    seen_filenames = set()
    node = kwargs['node']
    for revision in range(repository[node], len(repository)):
        for filename in repository[revision].files():
            full_filename = os.path.join(repository.root, filename)
            have_seen_filename = full_filename in seen_filenames
            filename_does_not_exist = not os.path.exists(full_filename)
            if have_seen_filename or filename_does_not_exist:
                continue

            seen_filenames.add(full_filename)
            if full_filename.endswith('.py'):
                yield full_filename


def find_hgrc(create_if_missing=False):
    """Find (and optionally create) the repository-local hgrc file.

    :returns:
        Absolute path to ``.hg/hgrc`` or ``None`` when not inside a
        mercurial repository (or the file is missing and
        ``create_if_missing`` is False).
    """
    root = subprocess.Popen(
        ['hg', 'root'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    (hg_directory, _) = root.communicate()
    if callable(getattr(hg_directory, 'decode', None)):
        hg_directory = hg_directory.decode('utf-8')
    # BUG FIX: ``hg root`` terminates its output with a newline; without
    # stripping it the ``os.path.isdir`` check below could never succeed.
    hg_directory = hg_directory.strip()

    if not os.path.isdir(hg_directory):
        return None

    hgrc = os.path.abspath(
        os.path.join(hg_directory, '.hg', 'hgrc')
    )
    if not os.path.exists(hgrc):
        if create_if_missing:
            open(hgrc, 'w').close()
        else:
            return None

    return hgrc


def configparser_for(path):
    """Parse *path* with interpolation disabled and return the parser."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)
    return parser
+ + The default options include: + + - ``-v``/``--verbose`` + - ``-q``/``--quiet`` + - ``--count`` + - ``--diff`` + - ``--exclude`` + - ``--filename`` + - ``--format`` + - ``--hang-closing`` + - ``--ignore`` + - ``--max-line-length`` + - ``--select`` + - ``--disable-noqa`` + - ``--show-source`` + - ``--statistics`` + - ``--enable-extensions`` + - ``--exit-zero`` + - ``-j``/``--jobs`` + - ``--output-file`` + - ``--append-config`` + - ``--config`` + - ``--isolated`` + """ + add_option = option_manager.add_option + + # pep8 options + add_option( + '-v', '--verbose', default=0, action='count', + parse_from_config=True, + help='Print more information about what is happening in flake8.' + ' This option is repeatable and will increase verbosity each ' + 'time it is repeated.', + ) + add_option( + '-q', '--quiet', default=0, action='count', + parse_from_config=True, + help='Report only file names, or nothing. This option is repeatable.', + ) + + add_option( + '--count', action='store_true', parse_from_config=True, + help='Print total number of errors and warnings to standard error and' + ' set the exit code to 1 if total is not empty.', + ) + + add_option( + '--diff', action='store_true', + help='Report changes only within line number ranges in the unified ' + 'diff provided on standard in by the user.', + ) + + add_option( + '--exclude', metavar='patterns', default=defaults.EXCLUDE, + comma_separated_list=True, parse_from_config=True, + normalize_paths=True, + help='Comma-separated list of files or directories to exclude.' + ' (Default: %default)', + ) + + add_option( + '--filename', metavar='patterns', default='*.py', + parse_from_config=True, comma_separated_list=True, + help='Only check for filenames matching the patterns in this comma-' + 'separated list. (Default: %default)', + ) + + add_option( + '--stdin-display-name', default='stdin', + help='The name used when reporting errors from code passed via stdin.' 
+ ' This is useful for editors piping the file contents to flake8.' + ' (Default: %default)', + ) + + # TODO(sigmavirus24): Figure out --first/--repeat + + # NOTE(sigmavirus24): We can't use choices for this option since users can + # freely provide a format string and that will break if we restrict their + # choices. + add_option( + '--format', metavar='format', default='default', + parse_from_config=True, + help='Format errors according to the chosen formatter.', + ) + + add_option( + '--hang-closing', action='store_true', parse_from_config=True, + help='Hang closing bracket instead of matching indentation of opening' + " bracket's line.", + ) + + add_option( + '--ignore', metavar='errors', default=defaults.IGNORE, + parse_from_config=True, comma_separated_list=True, + help='Comma-separated list of errors and warnings to ignore (or skip).' + ' For example, ``--ignore=E4,E51,W234``. (Default: %default)', + ) + + add_option( + '--max-line-length', type='int', metavar='n', + default=defaults.MAX_LINE_LENGTH, parse_from_config=True, + help='Maximum allowed line length for the entirety of this run. ' + '(Default: %default)', + ) + + add_option( + '--select', metavar='errors', default=defaults.SELECT, + parse_from_config=True, comma_separated_list=True, + help='Comma-separated list of errors and warnings to enable.' + ' For example, ``--select=E4,E51,W234``. (Default: %default)', + ) + + add_option( + '--disable-noqa', default=False, parse_from_config=True, + action='store_true', + help='Disable the effect of "# noqa". This will report errors on ' + 'lines with "# noqa" at the end.' 
+ ) + + # TODO(sigmavirus24): Decide what to do about --show-pep8 + + add_option( + '--show-source', action='store_true', parse_from_config=True, + help='Show the source generate each error or warning.', + ) + + add_option( + '--statistics', action='store_true', parse_from_config=True, + help='Count errors and warnings.', + ) + + # Flake8 options + add_option( + '--enable-extensions', default='', parse_from_config=True, + comma_separated_list=True, type='string', + help='Enable plugins and extensions that are otherwise disabled ' + 'by default', + ) + + add_option( + '--exit-zero', action='store_true', + help='Exit with status code "0" even if there are errors.', + ) + + add_option( + '--install-hook', action='callback', type='choice', + choices=vcs.choices(), callback=vcs.install, + help='Install a hook that is run prior to a commit for the supported ' + 'version control systema.' + ) + + add_option( + '-j', '--jobs', type='string', default='auto', parse_from_config=True, + help='Number of subprocesses to use to run checks in parallel. ' + 'This is ignored on Windows. The default, "auto", will ' + 'auto-detect the number of processors available to use.' + ' (Default: %default)', + ) + + add_option( + '--output-file', default=None, type='string', parse_from_config=True, + # callback=callbacks.redirect_stdout, + help='Redirect report to a file.', + ) + + # Config file options + + add_option( + '--append-config', action='append', + help='Provide extra config files to parse in addition to the files ' + 'found by Flake8 by default. These files are the last ones read ' + 'and so they take the highest precedence when multiple files ' + 'provide the same option.', + ) + + add_option( + '--config', default=None, + help='Path to the config file that will be the authoritative config ' + 'source. This will cause Flake8 to ignore all other ' + 'configuration files.' 
+ ) + + add_option( + '--isolated', default=False, action='store_true', + help='Ignore all found configuration files.', + ) + + # Benchmarking + + add_option( + '--benchmark', default=False, action='store_true', + help='Print benchmark information about this run of Flake8', + ) diff --git a/src/flake8/main/setuptools_command.py b/src/flake8/main/setuptools_command.py new file mode 100644 index 0000000..1c27bf6 --- /dev/null +++ b/src/flake8/main/setuptools_command.py @@ -0,0 +1,77 @@ +"""The logic for Flake8's integration with setuptools.""" +import os + +import setuptools + +from flake8.main import application as app + + +class Flake8(setuptools.Command): + """Run Flake8 via setuptools/distutils for registered modules.""" + + description = 'Run Flake8 on modules registered in setup.py' + # NOTE(sigmavirus24): If we populated this with a list of tuples, users + # could do something like ``python setup.py flake8 --ignore=E123,E234`` + # but we would have to redefine it and we can't define it dynamically. + # Since I refuse to copy-and-paste the options here or maintain two lists + # of options, and since this will break when users use plugins that + # provide command-line options, we are leaving this empty. If users want + # to configure this command, they can do so through config files. 
+ user_options = [] + + def initialize_options(self): + """Override this method to initialize our application.""" + pass + + def finalize_options(self): + """Override this to parse the parameters.""" + pass + + def package_files(self): + """Collect the files/dirs included in the registered modules.""" + seen_package_directories = () + directories = self.distribution.package_dir or {} + empty_directory_exists = '' in directories + packages = self.distribution.packages or [] + for package in packages: + package_directory = package + if package in directories: + package_directory = directories[package] + elif empty_directory_exists: + package_directory = os.path.join(directories[''], + package_directory) + + # NOTE(sigmavirus24): Do not collect submodules, e.g., + # if we have: + # - flake8/ + # - flake8/plugins/ + # Flake8 only needs ``flake8/`` to be provided. It will + # recurse on its own. + if package_directory.startswith(seen_package_directories): + continue + + seen_package_directories += (package_directory,) + yield package_directory + + def module_files(self): + """Collect the files listed as py_modules.""" + modules = self.distribution.py_modules or [] + filename_from = '{0}.py'.format + for module in modules: + yield filename_from(module) + + def distribution_files(self): + """Collect package and module files.""" + for package in self.package_files(): + yield package + + for module in self.module_files(): + yield module + + yield 'setup.py' + + def run(self): + """Run the Flake8 application.""" + flake8 = app.Application() + flake8.run(list(self.distribution_files())) + flake8.exit() diff --git a/src/flake8/main/vcs.py b/src/flake8/main/vcs.py new file mode 100644 index 0000000..6f7499e --- /dev/null +++ b/src/flake8/main/vcs.py @@ -0,0 +1,39 @@ +"""Module containing some of the logic for our VCS installation logic.""" +from flake8 import exceptions as exc +from flake8.main import git +from flake8.main import mercurial + + +# NOTE(sigmavirus24): In the 
# NOTE(sigmavirus24): In the future, we may allow for VCS hooks to be defined
# as plugins, e.g., adding a flake8.vcs entry-point. In that case, this
# dictionary should disappear, and this module might contain more code for
# managing those bits (in conjunction with flake8.plugins.manager).
_INSTALLERS = {
    'git': git.install,
    'mercurial': mercurial.install,
}


def install(option, option_string, value, parser):
    """Determine which version control hook to install.

    This is an optparse callback; for the callback signature see:
    https://docs.python.org/2/library/optparse.html#optparse-option-callbacks

    Always raises :class:`SystemExit`: with a false-y (zero) status when
    the hook was installed successfully, and a truthy status otherwise.
    """
    installer = _INSTALLERS.get(value)
    errored = False
    successful = False
    try:
        successful = installer()
    except exc.HookInstallationError as hook_error:
        # The hook already exists; the exception message identifies the
        # file that is in the way.
        print(str(hook_error))
        errored = True

    if not successful and not errored:
        # BUG FIX: previously this message was printed even when the real
        # failure (reported just above) was an already-existing hook.
        print('Could not find the {0} directory'.format(value))

    # BUG FIX: this used to be ``SystemExit(not successful and errored)``,
    # which exited 0 when the VCS directory was missing even though the
    # installation failed.  Exit non-zero on any failure instead.
    raise SystemExit(not successful)


def choices():
    """Return the list of VCS choices."""
    return list(_INSTALLERS.keys())
def aggregate_options(manager, arglist=None, values=None):
    """Aggregate and merge CLI and config file options.

    :param flake8.options.manager.OptionManager manager:
        The instance of the OptionManager that we're presently using.
    :param list arglist:
        The list of arguments to pass to ``manager.parse_args``. In most
        cases this will be None so ``parse_args`` uses ``sys.argv``. This is
        mostly available to make testing easier.
    :param optparse.Values values:
        Previously parsed set of parsed options.
    :returns:
        Tuple of the parsed options and extra arguments returned by
        ``manager.parse_args``.
    :rtype:
        tuple(optparse.Values, list)
    """
    # Defaults registered on the option parser, untouched by the CLI.
    default_values, _ = manager.parse_args([], values=values)
    # The raw CLI values tell us about --config, --append-config, and
    # --isolated before the final parse happens.
    cli_values, cli_args = manager.parse_args(arglist)
    extra_config_files = utils.normalize_paths(cli_values.append_config)

    # Build the merged configuration-file parser.
    config_parser = config.MergedConfigParser(
        option_manager=manager,
        extra_config_files=extra_config_files,
        args=cli_args,
    )

    # Parse the configuration files (respecting --config / --isolated).
    parsed_config = config_parser.parse(cli_values.config,
                                        cli_values.isolated)

    # Fold plugin-registered default-ignore codes into the default ignore
    # list before config/CLI values are applied on top.
    extended_default_ignore = manager.extended_default_ignore.copy()
    LOG.debug('Extended default ignore list: %s',
              list(extended_default_ignore))
    extended_default_ignore.update(default_values.ignore)
    default_values.ignore = list(extended_default_ignore)
    LOG.debug('Merged default ignore list: %s', default_values.ignore)

    # Lay the values parsed from the config files over the defaults.
    for config_name, value in parsed_config.items():
        dest_name = config_name
        # If the config name is somehow different from the destination name,
        # fetch the destination name from our Option.
        if not hasattr(default_values, config_name):
            dest_name = config_parser.config_options[config_name].dest

        LOG.debug('Overriding default value of (%s) for "%s" with (%s)',
                  getattr(default_values, dest_name, None),
                  dest_name,
                  value)
        setattr(default_values, dest_name, value)

    # Finally re-parse the command line on top of the merged defaults.
    return manager.parse_args(arglist, default_values)
+ """ + # The values of --append-config from the CLI + extra_config_files = extra_config_files or [] + self.extra_config_files = [ + # Ensure the paths are absolute paths for local_config_files + os.path.abspath(f) for f in extra_config_files + ] + + # Platform specific settings + self.is_windows = sys.platform == 'win32' + self.xdg_home = os.environ.get('XDG_CONFIG_HOME', + os.path.expanduser('~/.config')) + + # Look for '.<program_name>' files + self.program_config = '.' + program_name + self.program_name = program_name + + # List of filenames to find in the local/project directory + self.project_filenames = ('setup.cfg', 'tox.ini', self.program_config) + + self.local_directory = os.path.abspath(os.curdir) + + if not args: + args = ['.'] + self.parent = self.tail = os.path.abspath(os.path.commonprefix(args)) + + @staticmethod + def _read_config(files): + config = configparser.RawConfigParser() + try: + found_files = config.read(files) + except configparser.ParsingError: + LOG.exception("There was an error trying to parse a config " + "file. The files we were attempting to parse " + "were: %r", files) + found_files = [] + return (config, found_files) + + def cli_config(self, files): + """Read and parse the config file specified on the command-line.""" + config, found_files = self._read_config(files) + if found_files: + LOG.debug('Found cli configuration files: %s', found_files) + return config + + def generate_possible_local_files(self): + """Find and generate all local config files.""" + tail = self.tail + parent = self.parent + local_dir = self.local_directory + while tail: + for project_filename in self.project_filenames: + filename = os.path.abspath(os.path.join(parent, + project_filename)) + yield filename + if parent == local_dir: + break + (parent, tail) = os.path.split(parent) + + def local_config_files(self): + """Find all local config files which actually exist. 
+ + Filter results from + :meth:`~ConfigFileFinder.generate_possible_local_files` based + on whether the filename exists or not. + + :returns: + List of files that exist that are local project config files with + extra config files appended to that list (which also exist). + :rtype: + [str] + """ + exists = os.path.exists + return [ + filename + for filename in self.generate_possible_local_files() + if os.path.exists(filename) + ] + [f for f in self.extra_config_files if exists(f)] + + def local_configs(self): + """Parse all local config files into one config object.""" + config, found_files = self._read_config(self.local_config_files()) + if found_files: + LOG.debug('Found local configuration files: %s', found_files) + return config + + def user_config_file(self): + """Find the user-level config file.""" + if self.is_windows: + return os.path.expanduser('~\\' + self.program_config) + return os.path.join(self.xdg_home, self.program_name) + + def user_config(self): + """Parse the user config file into a config object.""" + config, found_files = self._read_config(self.user_config_file()) + if found_files: + LOG.debug('Found user configuration files: %s', found_files) + return config + + +class MergedConfigParser(object): + """Encapsulate merging different types of configuration files. + + This parses out the options registered that were specified in the + configuration files, handles extra configuration files, and returns + dictionaries with the parsed values. + """ + + #: Set of types that should use the + #: :meth:`~configparser.RawConfigParser.getint` method. + GETINT_TYPES = set(['int', 'count']) + #: Set of actions that should use the + #: :meth:`~configparser.RawConfigParser.getbool` method. + GETBOOL_ACTIONS = set(['store_true', 'store_false']) + + def __init__(self, option_manager, extra_config_files=None, args=None): + """Initialize the MergedConfigParser instance. + + :param flake8.option.manager.OptionManager option_manager: + Initialized OptionManager. 
class MergedConfigParser(object):
    """Encapsulate merging different types of configuration files.

    This parses out the options registered that were specified in the
    configuration files, handles extra configuration files, and returns
    dictionaries with the parsed values.
    """

    #: Set of types that should use the
    #: :meth:`~configparser.RawConfigParser.getint` method.
    GETINT_TYPES = set(['int', 'count'])
    #: Set of actions that should use the
    #: :meth:`~configparser.RawConfigParser.getboolean` method.
    GETBOOL_ACTIONS = set(['store_true', 'store_false'])

    def __init__(self, option_manager, extra_config_files=None, args=None):
        """Initialize the MergedConfigParser instance.

        :param flake8.options.manager.OptionManager option_manager:
            Initialized OptionManager.
        :param list extra_config_files:
            List of extra config files to parse.
        :param list args:
            The extra parsed arguments from the command-line.
        """
        #: Our instance of flake8.options.manager.OptionManager
        self.option_manager = option_manager
        #: The prog value for the cli parser
        self.program_name = option_manager.program_name
        #: Parsed extra arguments
        self.args = args
        #: Mapping of configuration option names to
        #: :class:`~flake8.options.manager.Option` instances
        self.config_options = option_manager.config_options_dict
        #: List of extra config files
        self.extra_config_files = extra_config_files or []
        #: Our instance of our :class:`~ConfigFileFinder`
        self.config_finder = ConfigFileFinder(self.program_name, self.args,
                                              self.extra_config_files)

    @staticmethod
    def _normalize_value(option, value):
        # Delegate to the Option so comma-lists and paths are normalized the
        # same way as CLI-provided values.
        final_value = option.normalize(value)
        LOG.debug('%r has been normalized to %r for option "%s"',
                  value, final_value, option.config_name)
        return final_value

    def _parse_config(self, config_parser):
        parsed = {}
        for option_name in config_parser.options(self.program_name):
            option = self.config_options.get(option_name)
            if option is None:
                LOG.debug('Option "%s" is not registered. Ignoring.',
                          option_name)
                continue

            # Pick the retrieval method that matches the option's
            # type/action so values come back with the right Python type.
            if option.type in self.GETINT_TYPES:
                retrieve = config_parser.getint
            elif option.action in self.GETBOOL_ACTIONS:
                retrieve = config_parser.getboolean
            else:
                retrieve = config_parser.get

            value = retrieve(self.program_name, option_name)
            LOG.debug('Option "%s" returned value: %r', option_name, value)

            parsed[option_name] = self._normalize_value(option, value)

        return parsed

    def is_configured_by(self, config):
        """Check if the specified config parser has an appropriate section."""
        return config.has_section(self.program_name)

    def parse_local_config(self):
        """Parse and return the local configuration files."""
        local_config = self.config_finder.local_configs()
        if self.is_configured_by(local_config):
            LOG.debug('Parsing local configuration files.')
            return self._parse_config(local_config)

        LOG.debug('Local configuration files have no %s section',
                  self.program_name)
        return {}

    def parse_user_config(self):
        """Parse and return the user configuration files."""
        user_config = self.config_finder.user_config()
        if self.is_configured_by(user_config):
            LOG.debug('Parsing user configuration files.')
            return self._parse_config(user_config)

        LOG.debug('User configuration files have no %s section',
                  self.program_name)
        return {}

    def parse_cli_config(self, config_path):
        """Parse and return the file specified by --config."""
        cli_config = self.config_finder.cli_config(config_path)
        if self.is_configured_by(cli_config):
            LOG.debug('Parsing CLI configuration files.')
            return self._parse_config(cli_config)

        LOG.debug('CLI configuration files have no %s section',
                  self.program_name)
        return {}

    def merge_user_and_local_config(self):
        """Merge the parsed user and local configuration files.

        :returns:
            Dictionary of the parsed and merged configuration options.
        :rtype:
            dict
        """
        user_config = self.parse_user_config()
        merged = self.parse_local_config()

        # Local (project) values win; user values only fill in the gaps.
        for option_name, value in user_config.items():
            merged.setdefault(option_name, value)

        return merged

    def parse(self, cli_config=None, isolated=False):
        """Parse and return the local and user config files.

        First this copies over the parsed local configuration and then
        iterates over the options in the user configuration and sets them if
        they were not set by the local configuration file.

        :param str cli_config:
            Value of --config when specified at the command-line. Overrides
            all other config files.
        :param bool isolated:
            Determines if we should parse configuration files at all or not.
            If running in isolated mode, we ignore all configuration files.
        :returns:
            Dictionary of parsed configuration options
        :rtype:
            dict
        """
        if isolated:
            LOG.debug('Refusing to parse configuration files due to user-'
                      'requested isolation')
            return {}

        if cli_config:
            LOG.debug('Ignoring user and locally found configuration files. '
                      'Reading only configuration from "%s" specified via '
                      '--config by the user', cli_config)
            return self.parse_cli_config(cli_config)

        return self.merge_user_and_local_config()
class Option(object):
    """Our wrapper around an optparse.Option object to add features."""

    def __init__(self, short_option_name=None, long_option_name=None,
                 # Options below here are taken from the optparse.Option class
                 action=None, default=None, type=None, dest=None,
                 nargs=None, const=None, choices=None, callback=None,
                 callback_args=None, callback_kwargs=None, help=None,
                 metavar=None,
                 # Options below here are specific to Flake8
                 parse_from_config=False, comma_separated_list=False,
                 normalize_paths=False):
        """Initialize an Option instance wrapping optparse.Option.

        The following are all passed directly through to optparse.

        :param str short_option_name:
            The short name of the option (e.g., ``-x``). This will be the
            first argument passed to :class:`~optparse.Option`.
        :param str long_option_name:
            The long name of the option (e.g., ``--xtra-long-option``). This
            will be the second argument passed to :class:`~optparse.Option`.
        :param str action:
            Any action allowed by :mod:`optparse`.
        :param default:
            Default value of the option.
        :param type:
            Any type allowed by :mod:`optparse`.
        :param dest:
            Attribute name to store parsed option value as.
        :param nargs:
            Number of arguments to parse for this option.
        :param const:
            Constant value to store on a common destination. Usually used in
            conjunction with ``action="store_const"``.
        :param iterable choices:
            Possible values for the option.
        :param callable callback:
            Callback used if the action is ``"callback"``.
        :param iterable callback_args:
            Additional positional arguments to the callback callable.
        :param dictionary callback_kwargs:
            Keyword arguments to the callback callable.
        :param str help:
            Help text displayed in the usage information.
        :param str metavar:
            Name to use instead of the long option name for help text.

        The following parameters are for Flake8's option handling alone.

        :param bool parse_from_config:
            Whether or not this option should be parsed out of config files.
        :param bool comma_separated_list:
            Whether the option is a comma separated list when parsing from a
            config file.
        :param bool normalize_paths:
            Whether the option is expecting a path or list of paths and should
            attempt to normalize the paths to absolute paths.
        """
        self.short_option_name = short_option_name
        self.long_option_name = long_option_name
        self.option_args = [
            x for x in (short_option_name, long_option_name) if x is not None
        ]
        self.option_kwargs = {
            'action': action,
            'default': default,
            'type': type,
            'dest': self._make_dest(dest),
            'nargs': nargs,
            'const': const,
            'choices': choices,
            'callback': callback,
            'callback_args': callback_args,
            'callback_kwargs': callback_kwargs,
            'help': help,
            'metavar': metavar,
        }
        # Mirror the optparse keyword arguments as attributes so code can
        # ask e.g. ``option.dest`` directly.
        for key, value in self.option_kwargs.items():
            setattr(self, key, value)

        # Set our custom attributes
        self.parse_from_config = parse_from_config
        self.comma_separated_list = comma_separated_list
        self.normalize_paths = normalize_paths

        self.config_name = None
        if parse_from_config:
            if not long_option_name:
                raise ValueError('When specifying parse_from_config=True, '
                                 'a long_option_name must also be specified.')
            # --long-option-name -> long_option_name
            self.config_name = long_option_name[2:].replace('-', '_')

        self._opt = None

    def __repr__(self):
        """Simple representation of an Option class."""
        # BUG FIX: the previous format string listed ``callback={callback}``
        # twice; each keyword now appears exactly once.
        return (
            'Option({0}, {1}, action={action}, default={default}, '
            'dest={dest}, type={type}, callback={callback}, help={help}, '
            'callback_args={callback_args}, '
            'callback_kwargs={callback_kwargs}, metavar={metavar})'
        ).format(self.short_option_name, self.long_option_name,
                 **self.option_kwargs)

    def _make_dest(self, dest):
        # Mimic optparse's destination derivation: explicit dest wins, then
        # the long option name (sans dashes), then the short option letter.
        if dest:
            return dest

        if self.long_option_name:
            return self.long_option_name[2:].replace('-', '_')
        return self.short_option_name[1]

    def normalize(self, value):
        """Normalize the value based on the option configuration."""
        if self.normalize_paths:
            # Decide whether to parse a list of paths or a single path
            normalize = utils.normalize_path
            if self.comma_separated_list:
                normalize = utils.normalize_paths
            return normalize(value)
        elif self.comma_separated_list:
            return utils.parse_comma_separated_list(value)
        return value

    def to_optparse(self):
        """Convert a Flake8 Option to an optparse Option (cached)."""
        if self._opt is None:
            self._opt = optparse.Option(*self.option_args,
                                        **self.option_kwargs)
        return self._opt
+ """ + self.parser = optparse.OptionParser(prog=prog, version=version, + usage=usage) + self.config_options_dict = {} + self.options = [] + self.program_name = prog + self.version = version + self.registered_plugins = set() + self.extended_default_ignore = set() + + @staticmethod + def format_plugin(plugin_tuple): + """Convert a plugin tuple into a dictionary mapping name to value.""" + return dict(zip(["name", "version"], plugin_tuple)) + + def add_option(self, *args, **kwargs): + """Create and register a new option. + + See parameters for :class:`~flake8.options.manager.Option` for + acceptable arguments to this method. + + .. note:: + + ``short_option_name`` and ``long_option_name`` may be specified + positionally as they are with optparse normally. + """ + if len(args) == 1 and args[0].startswith('--'): + args = (None, args[0]) + option = Option(*args, **kwargs) + self.parser.add_option(option.to_optparse()) + self.options.append(option) + if option.parse_from_config: + self.config_options_dict[option.config_name] = option + LOG.debug('Registered option "%s".', option) + + def remove_from_default_ignore(self, error_codes): + """Remove specified error codes from the default ignore list. + + :param list error_codes: + List of strings that are the error/warning codes to attempt to + remove from the extended default ignore list. + """ + LOG.debug('Removing %r from the default ignore list', error_codes) + for error_code in error_codes: + try: + self.extend_default_ignore.remove(error_code) + except ValueError: + LOG.debug('Attempted to remove %s from default ignore' + ' but it was not a member of the list.', error_code) + + def extend_default_ignore(self, error_codes): + """Extend the default ignore list with the error codes provided. + + :param list error_codes: + List of strings that are the error/warning codes with which to + extend the default ignore list. 
+ """ + LOG.debug('Extending default ignore list with %r', error_codes) + self.extended_default_ignore.update(error_codes) + + def generate_versions(self, format_str='%(name)s: %(version)s'): + """Generate a comma-separated list of versions of plugins.""" + return ', '.join( + format_str % self.format_plugin(plugin) + for plugin in self.registered_plugins + ) + + def update_version_string(self): + """Update the flake8 version string.""" + self.parser.version = (self.version + ' (' + + self.generate_versions() + ')') + + def generate_epilog(self): + """Create an epilog with the version and name of each of plugin.""" + plugin_version_format = '%(name)s: %(version)s' + self.parser.epilog = 'Installed plugins: ' + self.generate_versions( + plugin_version_format + ) + + def parse_args(self, args=None, values=None): + """Simple proxy to calling the OptionParser's parse_args method.""" + self.generate_epilog() + self.update_version_string() + options, xargs = self.parser.parse_args(args, values) + for option in self.options: + old_value = getattr(options, option.dest) + setattr(options, option.dest, option.normalize(old_value)) + + return options, xargs + + def register_plugin(self, name, version): + """Register a plugin relying on the OptionManager. + + :param str name: + The name of the checker itself. This will be the ``name`` + attribute of the class or function loaded from the entry-point. + :param str version: + The version of the checker that we're using. 
+ """ + self.registered_plugins.add((name, version)) diff --git a/src/flake8/plugins/__init__.py b/src/flake8/plugins/__init__.py new file mode 100644 index 0000000..fda6a44 --- /dev/null +++ b/src/flake8/plugins/__init__.py @@ -0,0 +1 @@ +"""Submodule of built-in plugins and plugin managers.""" diff --git a/src/flake8/plugins/_trie.py b/src/flake8/plugins/_trie.py new file mode 100644 index 0000000..4871abb --- /dev/null +++ b/src/flake8/plugins/_trie.py @@ -0,0 +1,97 @@ +"""Independent implementation of a Trie tree.""" + +__all__ = ('Trie', 'TrieNode') + + +def _iterate_stringlike_objects(string): + for i in range(len(string)): + yield string[i:i + 1] + + +class Trie(object): + """The object that manages the trie nodes.""" + + def __init__(self): + """Initialize an empty trie.""" + self.root = TrieNode(None, None) + + def add(self, path, node_data): + """Add the node data to the path described.""" + node = self.root + for prefix in _iterate_stringlike_objects(path): + child = node.find_prefix(prefix) + if child is None: + child = node.add_child(prefix, []) + node = child + node.data.append(node_data) + + def find(self, path): + """Find a node based on the path provided.""" + node = self.root + for prefix in _iterate_stringlike_objects(path): + child = node.find_prefix(prefix) + if child is None: + return None + node = child + return node + + def traverse(self): + """Traverse this tree. + + This performs a depth-first pre-order traversal of children in this + tree. It returns the results consistently by first sorting the + children based on their prefix and then traversing them in + alphabetical order. 
+ """ + return self.root.traverse() + + +class TrieNode(object): + """The majority of the implementation details of a Trie.""" + + def __init__(self, prefix, data, children=None): + """Initialize a TrieNode with data and children.""" + self.children = children or {} + self.data = data + self.prefix = prefix + + def __repr__(self): + """Generate an easy to read representation of the node.""" + return 'TrieNode(prefix={0}, data={1})'.format( + self.prefix, self.data + ) + + def find_prefix(self, prefix): + """Find the prefix in the children of this node. + + :returns: A child matching the prefix or None. + :rtype: :class:`~TrieNode` or None + """ + return self.children.get(prefix, None) + + def add_child(self, prefix, data, children=None): + """Create and add a new child node. + + :returns: The newly created node + :rtype: :class:`~TrieNode` + """ + new_node = TrieNode(prefix, data, children) + self.children[prefix] = new_node + return new_node + + def traverse(self): + """Traverse children of this node. + + This performs a depth-first pre-order traversal of the remaining + children in this sub-tree. It returns the results consistently by + first sorting the children based on their prefix and then traversing + them in alphabetical order. 
+ """ + if not self.children: + return + + for prefix in sorted(self.children.keys()): + child = self.children[prefix] + yield child + for child in child.traverse(): + yield child diff --git a/src/flake8/plugins/manager.py b/src/flake8/plugins/manager.py new file mode 100644 index 0000000..d08d542 --- /dev/null +++ b/src/flake8/plugins/manager.py @@ -0,0 +1,458 @@ +"""Plugin loading and management logic and classes.""" +import collections +import logging + +import pkg_resources + +from flake8 import exceptions +from flake8 import utils +from flake8.plugins import notifier + +LOG = logging.getLogger(__name__) + +__all__ = ( + 'Checkers', + 'Listeners', + 'Plugin', + 'PluginManager', + 'ReportFormatters', +) + +NO_GROUP_FOUND = object() + + +class Plugin(object): + """Wrap an EntryPoint from setuptools and other logic.""" + + def __init__(self, name, entry_point): + """"Initialize our Plugin. + + :param str name: + Name of the entry-point as it was registered with setuptools. + :param entry_point: + EntryPoint returned by setuptools. + :type entry_point: + setuptools.EntryPoint + """ + self.name = name + self.entry_point = entry_point + self._plugin = None + self._parameters = None + self._group = None + self._plugin_name = None + self._version = None + + def __repr__(self): + """Provide an easy to read description of the current plugin.""" + return 'Plugin(name="{0}", entry_point="{1}")'.format( + self.name, self.entry_point + ) + + def is_in_a_group(self): + """Determine if this plugin is in a group. + + :returns: + True if the plugin is in a group, otherwise False. 
+ :rtype: + bool + """ + return self.group() is not None + + def group(self): + """Find and parse the group the plugin is in.""" + if self._group is None: + name = self.name.split('.', 1) + if len(name) > 1: + self._group = name[0] + else: + self._group = NO_GROUP_FOUND + if self._group is NO_GROUP_FOUND: + return None + return self._group + + @property + def parameters(self): + """List of arguments that need to be passed to the plugin.""" + if self._parameters is None: + self._parameters = utils.parameters_for(self) + return self._parameters + + @property + def plugin(self): + """The loaded (and cached) plugin associated with the entry-point. + + This property implicitly loads the plugin and then caches it. + """ + self.load_plugin() + return self._plugin + + @property + def version(self): + """Return the version of the plugin.""" + if self._version is None: + if self.is_in_a_group(): + self._version = version_for(self) + else: + self._version = self.plugin.version + + return self._version + + @property + def plugin_name(self): + """Return the name of the plugin.""" + if self._plugin_name is None: + if self.is_in_a_group(): + self._plugin_name = self.group() + else: + self._plugin_name = self.plugin.name + + return self._plugin_name + + @property + def off_by_default(self): + """Return whether the plugin is ignored by default.""" + return getattr(self.plugin, 'off_by_default', False) + + def execute(self, *args, **kwargs): + r"""Call the plugin with \*args and \*\*kwargs.""" + return self.plugin(*args, **kwargs) # pylint: disable=not-callable + + def _load(self, verify_requirements): + # Avoid relying on hasattr() here. 
+ resolve = getattr(self.entry_point, 'resolve', None) + require = getattr(self.entry_point, 'require', None) + if resolve and require: + if verify_requirements: + LOG.debug('Verifying plugin "%s"\'s requirements.', + self.name) + require() + self._plugin = resolve() + else: + self._plugin = self.entry_point.load( + require=verify_requirements + ) + + def load_plugin(self, verify_requirements=False): + """Retrieve the plugin for this entry-point. + + This loads the plugin, stores it on the instance and then returns it. + It does not reload it after the first time, it merely returns the + cached plugin. + + :param bool verify_requirements: + Whether or not to make setuptools verify that the requirements for + the plugin are satisfied. + :returns: + Nothing + """ + if self._plugin is None: + LOG.info('Loading plugin "%s" from entry-point.', self.name) + try: + self._load(verify_requirements) + except Exception as load_exception: + LOG.exception(load_exception, exc_info=True) + failed_to_load = exceptions.FailedToLoadPlugin( + plugin=self, + exception=load_exception, + ) + LOG.critical(str(failed_to_load)) + raise failed_to_load + + def enable(self, optmanager): + """Remove plugin name from the default ignore list.""" + optmanager.remove_from_default_ignore([self.name]) + + def disable(self, optmanager): + """Add the plugin name to the default ignore list.""" + optmanager.extend_default_ignore([self.name]) + + def provide_options(self, optmanager, options, extra_args): + """Pass the parsed options and extra arguments to the plugin.""" + parse_options = getattr(self.plugin, 'parse_options', None) + if parse_options is not None: + LOG.debug('Providing options to plugin "%s".', self.name) + try: + parse_options(optmanager, options, extra_args) + except TypeError: + parse_options(options) + + if self.name in options.enable_extensions: + self.enable(optmanager) + + def register_options(self, optmanager): + """Register the plugin's command-line options on the OptionManager. 
class PluginManager(object):  # pylint: disable=too-few-public-methods
    """Find and manage plugins consistently."""

    def __init__(self, namespace, verify_requirements=False):
        """Initialize the manager.

        :param str namespace:
            Namespace of the plugins to manage, e.g., 'flake8.extension'.
        :param bool verify_requirements:
            Whether or not to make setuptools verify that the requirements
            for the plugin are satisfied.
        """
        self.namespace = namespace
        self.verify_requirements = verify_requirements
        self.plugins = {}
        self.names = []
        self._load_all_plugins()

    def _load_all_plugins(self):
        # Wrap each entry-point registered under our namespace in a Plugin.
        LOG.info('Loading entry-points for "%s".', self.namespace)
        for entry_point in pkg_resources.iter_entry_points(self.namespace):
            name = entry_point.name
            self.plugins[name] = Plugin(name, entry_point)
            self.names.append(name)
            LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)

    def map(self, func, *args, **kwargs):
        r"""Call ``func`` with the plugin and \*args and \**kwargs after.

        This yields the return value from ``func`` for each plugin.

        :param collections.Callable func:
            Function to call with each plugin. Signature should at least be:

            .. code-block:: python

                def myfunc(plugin):
                    pass

            Any extra positional or keyword arguments specified with map
            will be passed along to this function after the plugin. The
            plugin passed is a :class:`~flake8.plugins.manager.Plugin`.
        :param args:
            Positional arguments to pass to ``func`` after each plugin.
        :param kwargs:
            Keyword arguments to pass to ``func`` after each plugin.
        """
        for name in self.names:
            yield func(self.plugins[name], *args, **kwargs)

    def versions(self):
        # () -> (str, str)
        """Generate the versions of plugins.

        :returns:
            Tuples of the plugin_name and version
        :rtype:
            tuple
        """
        seen_plugins = set()
        for entry_point_name in self.names:
            plugin = self.plugins[entry_point_name]
            # Several entry-points may belong to one plugin (e.g. a group);
            # report each plugin only once.
            if plugin.plugin_name in seen_plugins:
                continue
            seen_plugins.add(plugin.plugin_name)
            yield (plugin.plugin_name, plugin.version)


def version_for(plugin):
    # (Plugin) -> Union[str, NoneType]
    """Determine the version of a plugin by its module.

    :param plugin:
        The loaded plugin
    :type plugin:
        Plugin
    :returns:
        version string for the module
    :rtype:
        str
    """
    try:
        module = __import__(plugin.plugin.__module__)
    except ImportError:
        return None

    return getattr(module, '__version__', None)
+ :rtype: + :class:`Plugin` + """ + if name in self: + return self[name] + return default + + @property + def names(self): + """Proxy attribute to underlying manager.""" + return self.manager.names + + @property + def plugins(self): + """Proxy attribute to underlying manager.""" + return self.manager.plugins + + @staticmethod + def _generate_call_function(method_name, optmanager, *args, **kwargs): + def generated_function(plugin): + """Function that attempts to call a specific method on a plugin.""" + method = getattr(plugin, method_name, None) + if (method is not None and + isinstance(method, collections.Callable)): + return method(optmanager, *args, **kwargs) + return generated_function + + def load_plugins(self): + """Load all plugins of this type that are managed by this manager.""" + if self.plugins_loaded: + return + + def load_plugin(plugin): + """Call each plugin's load_plugin method.""" + return plugin.load_plugin() + + plugins = list(self.manager.map(load_plugin)) + # Do not set plugins_loaded if we run into an exception + self.plugins_loaded = True + return plugins + + def register_plugin_versions(self, optmanager): + """Register the plugins and their versions with the OptionManager.""" + self.load_plugins() + for (plugin_name, version) in self.manager.versions(): + optmanager.register_plugin(name=plugin_name, version=version) + + def register_options(self, optmanager): + """Register all of the checkers' options to the OptionManager.""" + self.load_plugins() + call_register_options = self._generate_call_function( + 'register_options', optmanager, + ) + + list(self.manager.map(call_register_options)) + + def provide_options(self, optmanager, options, extra_args): + """Provide parsed options and extra arguments to the plugins.""" + call_provide_options = self._generate_call_function( + 'provide_options', optmanager, options, extra_args, + ) + + list(self.manager.map(call_provide_options)) + + +class NotifierBuilderMixin(object): # pylint: 
disable=too-few-public-methods + """Mixin class that builds a Notifier from a PluginManager.""" + + def build_notifier(self): + """Build a Notifier for our Listeners. + + :returns: + Object to notify our listeners of certain error codes and + warnings. + :rtype: + :class:`~flake8.notifier.Notifier` + """ + notifier_trie = notifier.Notifier() + for name in self.names: + notifier_trie.register_listener(name, self.manager[name]) + return notifier_trie + + +class Checkers(PluginTypeManager): + """All of the checkers registered through entry-ponits.""" + + namespace = 'flake8.extension' + + def checks_expecting(self, argument_name): + """Retrieve checks that expect an argument with the specified name. + + Find all checker plugins that are expecting a specific argument. + """ + for plugin in self.plugins.values(): + if argument_name == plugin.parameters[0]: + yield plugin + + @property + def ast_plugins(self): + """List of plugins that expect the AST tree.""" + plugins = getattr(self, '_ast_plugins', []) + if not plugins: + plugins = list(self.checks_expecting('tree')) + self._ast_plugins = plugins + return plugins + + @property + def logical_line_plugins(self): + """List of plugins that expect the logical lines.""" + plugins = getattr(self, '_logical_line_plugins', []) + if not plugins: + plugins = list(self.checks_expecting('logical_line')) + self._logical_line_plugins = plugins + return plugins + + @property + def physical_line_plugins(self): + """List of plugins that expect the physical lines.""" + plugins = getattr(self, '_physical_line_plugins', []) + if not plugins: + plugins = list(self.checks_expecting('physical_line')) + self._physical_line_plugins = plugins + return plugins + + +class Listeners(PluginTypeManager, NotifierBuilderMixin): + """All of the listeners registered through entry-points.""" + + namespace = 'flake8.listen' + + +class ReportFormatters(PluginTypeManager): + """All of the report formatters registered through entry-points.""" + + namespace = 
'flake8.report' diff --git a/src/flake8/plugins/notifier.py b/src/flake8/plugins/notifier.py new file mode 100644 index 0000000..dc255c4 --- /dev/null +++ b/src/flake8/plugins/notifier.py @@ -0,0 +1,46 @@ +"""Implementation of the class that registers and notifies listeners.""" +from flake8.plugins import _trie + + +class Notifier(object): + """Object that tracks and notifies listener objects.""" + + def __init__(self): + """Initialize an empty notifier object.""" + self.listeners = _trie.Trie() + + def listeners_for(self, error_code): + """Retrieve listeners for an error_code. + + There may be listeners registered for E1, E100, E101, E110, E112, and + E126. To get all the listeners for one of E100, E101, E110, E112, or + E126 you would also need to incorporate the listeners for E1 (since + they're all in the same class). + + Example usage: + + .. code-block:: python + + from flake8 import notifier + + n = notifier.Notifier() + # register listeners + for listener in n.listeners_for('W102'): + listener.notify(...) 
+ """ + path = error_code + while path: + node = self.listeners.find(path) + listeners = getattr(node, 'data', []) + for listener in listeners: + yield listener + path = path[:-1] + + def notify(self, error_code, *args, **kwargs): + """Notify all listeners for the specified error code.""" + for listener in self.listeners_for(error_code): + listener.notify(error_code, *args, **kwargs) + + def register_listener(self, error_code, listener): + """Register a listener for a specific error_code.""" + self.listeners.add(error_code, listener) diff --git a/src/flake8/plugins/pyflakes.py b/src/flake8/plugins/pyflakes.py new file mode 100644 index 0000000..72d45fa --- /dev/null +++ b/src/flake8/plugins/pyflakes.py @@ -0,0 +1,140 @@ +"""Plugin built-in to Flake8 to treat pyflakes as a plugin.""" +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +try: + # The 'demandimport' breaks pyflakes and flake8.plugins.pyflakes + from mercurial import demandimport +except ImportError: + pass +else: + demandimport.disable() +import os + +import pyflakes +import pyflakes.checker + +from flake8 import utils + + +def patch_pyflakes(): + """Add error codes to Pyflakes messages.""" + codes = dict([line.split()[::-1] for line in ( + 'F401 UnusedImport', + 'F402 ImportShadowedByLoopVar', + 'F403 ImportStarUsed', + 'F404 LateFutureImport', + 'F810 Redefined', + 'F811 RedefinedWhileUnused', + 'F812 RedefinedInListComp', + 'F821 UndefinedName', + 'F822 UndefinedExport', + 'F823 UndefinedLocal', + 'F831 DuplicateArgument', + 'F841 UnusedVariable', + )]) + + for name, obj in vars(pyflakes.messages).items(): + if name[0].isupper() and obj.message: + obj.flake8_msg = '%s %s' % (codes.get(name, 'F999'), obj.message) +patch_pyflakes() + + +class FlakesChecker(pyflakes.checker.Checker): + """Subclass the Pyflakes checker to conform with the flake8 API.""" + + name = 'pyflakes' + version = pyflakes.__version__ + + def __init__(self, tree, filename): + """Initialize the PyFlakes plugin with 
an AST tree and filename.""" + filename = utils.normalize_paths(filename)[0] + with_doctest = self.with_doctest + included_by = [include for include in self.include_in_doctest + if include != '' and filename.startswith(include)] + if included_by: + with_doctest = True + + for exclude in self.exclude_from_doctest: + if exclude != '' and filename.startswith(exclude): + with_doctest = False + overlaped_by = [include for include in included_by + if include.startswith(exclude)] + + if overlaped_by: + with_doctest = True + + super(FlakesChecker, self).__init__(tree, filename, + withDoctest=with_doctest) + + @classmethod + def add_options(cls, parser): + """Register options for PyFlakes on the Flake8 OptionManager.""" + parser.add_option( + '--builtins', parse_from_config=True, comma_separated_list=True, + help="define more built-ins, comma separated", + ) + parser.add_option( + '--doctests', default=False, action='store_true', + parse_from_config=True, + help="check syntax of the doctests", + ) + parser.add_option( + '--include-in-doctest', default='', + dest='include_in_doctest', parse_from_config=True, + comma_separated_list=True, normalize_paths=True, + help='Run doctests only on these files', + type='string', + ) + parser.add_option( + '--exclude-from-doctest', default='', + dest='exclude_from_doctest', parse_from_config=True, + comma_separated_list=True, normalize_paths=True, + help='Skip these files when running doctests', + type='string', + ) + + @classmethod + def parse_options(cls, options): + """Parse option values from Flake8's OptionManager.""" + if options.builtins: + cls.builtIns = cls.builtIns.union(options.builtins) + cls.with_doctest = options.doctests + + included_files = [] + for included_file in options.include_in_doctest: + if included_file == '': + continue + if not included_file.startswith((os.sep, './', '~/')): + included_files.append('./' + included_file) + else: + included_files.append(included_file) + cls.include_in_doctest = 
utils.normalize_paths(included_files) + + excluded_files = [] + for excluded_file in options.exclude_from_doctest: + if excluded_file == '': + continue + if not excluded_file.startswith((os.sep, './', '~/')): + excluded_files.append('./' + excluded_file) + else: + excluded_files.append(excluded_file) + cls.exclude_from_doctest = utils.normalize_paths(excluded_files) + + inc_exc = set(cls.include_in_doctest).intersection( + cls.exclude_from_doctest + ) + if inc_exc: + raise ValueError('"%s" was specified in both the ' + 'include-in-doctest and exclude-from-doctest ' + 'options. You are not allowed to specify it in ' + 'both for doctesting.' % inc_exc) + + def run(self): + """Run the plugin.""" + for message in self.messages: + col = getattr(message, 'col', 0) + yield (message.lineno, + col, + (message.flake8_msg % message.message_args), + message.__class__) diff --git a/src/flake8/processor.py b/src/flake8/processor.py new file mode 100644 index 0000000..0c33cc2 --- /dev/null +++ b/src/flake8/processor.py @@ -0,0 +1,430 @@ +"""Module containing our file processor that tokenizes a file for checks.""" +import contextlib +import io +import logging +import re +import sys +import tokenize + +import flake8 +from flake8 import defaults +from flake8 import exceptions +from flake8 import utils + +LOG = logging.getLogger(__name__) +PyCF_ONLY_AST = 1024 +NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE]) +# Work around Python < 2.6 behaviour, which does not generate NL after +# a comment which is on a line by itself. +COMMENT_WITH_NL = tokenize.generate_tokens(['#\n'].pop).send(None)[1] == '#\n' + +SKIP_TOKENS = frozenset([tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, + tokenize.DEDENT]) + + +class FileProcessor(object): + """Processes a file and holdes state. + + This processes a file by generating tokens, logical and physical lines, + and AST trees. This also provides a way of passing state about the file + to checks expecting that state. 
Any public attribute on this object can + be requested by a plugin. The known public attributes are: + + - :attr:`blank_before` + - :attr:`blank_lines` + - :attr:`checker_state` + - :attr:`indect_char` + - :attr:`indent_level` + - :attr:`line_number` + - :attr:`logical_line` + - :attr:`max_line_length` + - :attr:`multiline` + - :attr:`noqa` + - :attr:`previous_indent_level` + - :attr:`previous_logical` + - :attr:`tokens` + - :attr:`total_lines` + - :attr:`verbose` + """ + + NOQA_FILE = re.compile(r'\s*# flake8[:=]\s*noqa', re.I) + + def __init__(self, filename, options, lines=None): + """Initialice our file processor. + + :param str filename: + Name of the file to process + """ + self.filename = filename + self.lines = lines + if lines is None: + self.lines = self.read_lines() + self.strip_utf_bom() + self.options = options + + # Defaults for public attributes + #: Number of preceding blank lines + self.blank_before = 0 + #: Number of blank lines + self.blank_lines = 0 + #: Checker states for each plugin? 
+ self._checker_states = {} + #: Current checker state + self.checker_state = None + #: User provided option for hang closing + self.hang_closing = options.hang_closing + #: Character used for indentation + self.indent_char = None + #: Current level of indentation + self.indent_level = 0 + #: Line number in the file + self.line_number = 0 + #: Current logical line + self.logical_line = '' + #: Maximum line length as configured by the user + self.max_line_length = options.max_line_length + #: Whether the current physical line is multiline + self.multiline = False + #: Whether or not we're observing NoQA + self.noqa = False + #: Previous level of indentation + self.previous_indent_level = 0 + #: Previous logical line + self.previous_logical = '' + #: Current set of tokens + self.tokens = [] + #: Total number of lines in the file + self.total_lines = len(self.lines) + #: Verbosity level of Flake8 + self.verbose = options.verbose + #: Statistics dictionary + self.statistics = { + 'logical lines': 0, + } + + @contextlib.contextmanager + def inside_multiline(self, line_number): + """Context-manager to toggle the multiline attribute.""" + self.line_number = line_number + self.multiline = True + yield + self.multiline = False + + def reset_blank_before(self): + """Reset the blank_before attribute to zero.""" + self.blank_before = 0 + + def delete_first_token(self): + """Delete the first token in the list of tokens.""" + del self.tokens[0] + + def visited_new_blank_line(self): + """Note that we visited a new blank line.""" + self.blank_lines += 1 + + def update_state(self, mapping): + """Update the indent level based on the logical line mapping.""" + (start_row, start_col) = mapping[0][1] + start_line = self.lines[start_row - 1] + self.indent_level = expand_indent(start_line[:start_col]) + if self.blank_before < self.blank_lines: + self.blank_before = self.blank_lines + + def update_checker_state_for(self, plugin): + """Update the checker_state attribute for the plugin.""" 
+ if 'checker_state' in plugin.parameters: + self.checker_state = self._checker_states.setdefault( + plugin.name, {} + ) + + def next_logical_line(self): + """Record the previous logical line. + + This also resets the tokens list and the blank_lines count. + """ + if self.logical_line: + self.previous_indent_level = self.indent_level + self.previous_logical = self.logical_line + self.blank_lines = 0 + self.tokens = [] + + def build_logical_line_tokens(self): + """Build the mapping, comments, and logical line lists.""" + logical = [] + comments = [] + length = 0 + previous_row = previous_column = mapping = None + for token_type, text, start, end, line in self.tokens: + if token_type in SKIP_TOKENS: + continue + if not mapping: + mapping = [(0, start)] + if token_type == tokenize.COMMENT: + comments.append(text) + continue + if token_type == tokenize.STRING: + text = mutate_string(text) + if previous_row: + (start_row, start_column) = start + if previous_row != start_row: + row_index = previous_row - 1 + column_index = previous_column - 1 + previous_text = self.lines[row_index][column_index] + if (previous_text == ',' or + (previous_text not in '{[(' and + text not in '}])')): + text = ' ' + text + elif previous_column != start_column: + text = line[previous_column:start_column] + text + logical.append(text) + length += len(text) + mapping.append((length, end)) + (previous_row, previous_column) = end + return comments, logical, mapping + + def build_ast(self): + """Build an abstract syntax tree from the list of lines.""" + return compile(''.join(self.lines), '', 'exec', PyCF_ONLY_AST) + + def build_logical_line(self): + """Build a logical line from the current tokens list.""" + comments, logical, mapping_list = self.build_logical_line_tokens() + self.logical_line = ''.join(logical) + self.statistics['logical lines'] += 1 + return ''.join(comments), self.logical_line, mapping_list + + def split_line(self, token): + """Split a physical line's line based on new-lines. 
+ + This also auto-increments the line number for the caller. + """ + for line in token[1].split('\n')[:-1]: + yield line + self.line_number += 1 + + def keyword_arguments_for(self, parameters, arguments=None): + """Generate the keyword arguments for a list of parameters.""" + if arguments is None: + arguments = {} + for param in parameters: + if param in arguments: + continue + try: + arguments[param] = getattr(self, param) + except AttributeError as exc: + LOG.exception(exc) + raise + return arguments + + def check_physical_error(self, error_code, line): + """Update attributes based on error code and line.""" + if error_code == 'E101': + self.indent_char = line[0] + + def generate_tokens(self): + """Tokenize the file and yield the tokens. + + :raises flake8.exceptions.InvalidSyntax: + If a :class:`tokenize.TokenError` is raised while generating + tokens. + """ + try: + for token in tokenize.generate_tokens(self.next_line): + if token[2][0] > self.total_lines: + break + self.tokens.append(token) + yield token + # NOTE(sigmavirus24): pycodestyle was catching both a SyntaxError + # and a tokenize.TokenError. In looking a the source on Python 2 and + # Python 3, the SyntaxError should never arise from generate_tokens. + # If we were using tokenize.tokenize, we would have to catch that. Of + # course, I'm going to be unsurprised to be proven wrong at a later + # date. 
+ except tokenize.TokenError as exc: + raise exceptions.InvalidSyntax(exc.message, exception=exc) + + def line_for(self, line_number): + """Retrieve the physical line at the specified line number.""" + return self.lines[line_number - 1] + + def next_line(self): + """Get the next line from the list.""" + if self.line_number >= self.total_lines: + return '' + line = self.lines[self.line_number] + self.line_number += 1 + if self.indent_char is None and line[:1] in defaults.WHITESPACE: + self.indent_char = line[0] + return line + + def read_lines(self): + # type: () -> List[str] + """Read the lines for this file checker.""" + if self.filename is None or self.filename == '-': + self.filename = 'stdin' + return self.read_lines_from_stdin() + return self.read_lines_from_filename() + + def _readlines_py2(self): + # type: () -> List[str] + with open(self.filename, 'rU') as fd: + return fd.readlines() + + def _readlines_py3(self): + # type: () -> List[str] + try: + with open(self.filename, 'rb') as fd: + (coding, lines) = tokenize.detect_encoding(fd.readline) + textfd = io.TextIOWrapper(fd, coding, line_buffering=True) + return ([l.decode(coding) for l in lines] + + textfd.readlines()) + except (LookupError, SyntaxError, UnicodeError): + # If we can't detect the codec with tokenize.detect_encoding, or + # the detected encoding is incorrect, just fallback to latin-1. 
+ with open(self.filename, encoding='latin-1') as fd: + return fd.readlines() + + def read_lines_from_filename(self): + # type: () -> List[str] + """Read the lines for a file.""" + if (2, 6) <= sys.version_info < (3, 0): + readlines = self._readlines_py2 + elif (3, 0) <= sys.version_info < (4, 0): + readlines = self._readlines_py3 + return readlines() + + def read_lines_from_stdin(self): + # type: () -> List[str] + """Read the lines from standard in.""" + return utils.stdin_get_value().splitlines(True) + + def should_ignore_file(self): + # type: () -> bool + """Check if ``# flake8: noqa`` is in the file to be ignored. + + :returns: + True if a line matches :attr:`FileProcessor.NOQA_FILE`, + otherwise False + :rtype: + bool + """ + ignore_file = self.NOQA_FILE.search + return any(ignore_file(line) for line in self.lines) + + def strip_utf_bom(self): + # type: () -> NoneType + """Strip the UTF bom from the lines of the file.""" + if not self.lines: + # If we have nothing to analyze quit early + return + + first_byte = ord(self.lines[0][0]) + if first_byte not in (0xEF, 0xFEFF): + return + + # If the first byte of the file is a UTF-8 BOM, strip it + if first_byte == 0xFEFF: + self.lines[0] = self.lines[0][1:] + elif self.lines[0][:3] == '\xEF\xBB\xBF': + self.lines[0] = self.lines[0][3:] + + +def is_eol_token(token): + """Check if the token is an end-of-line token.""" + return token[0] in NEWLINE or token[4][token[3][1]:].lstrip() == '\\\n' + +if COMMENT_WITH_NL: # If on Python 2.6 + def is_eol_token(token, _is_eol_token=is_eol_token): + """Check if the token is an end-of-line token.""" + return (_is_eol_token(token) or + (token[0] == tokenize.COMMENT and token[1] == token[4])) + + +def is_multiline_string(token): + """Check if this is a multiline string.""" + return token[0] == tokenize.STRING and '\n' in token[1] + + +def token_is_newline(token): + """Check if the token type is a newline token type.""" + return token[0] in NEWLINE + + +def token_is_comment(token): + 
"""Check if the token type is a comment.""" + return COMMENT_WITH_NL and token[0] == tokenize.COMMENT + + +def count_parentheses(current_parentheses_count, token_text): + """Count the number of parentheses.""" + current_parentheses_count = current_parentheses_count or 0 + if token_text in '([{': + return current_parentheses_count + 1 + elif token_text in '}])': + return current_parentheses_count - 1 + return current_parentheses_count + + +def log_token(log, token): + """Log a token to a provided logging object.""" + if token[2][0] == token[3][0]: + pos = '[%s:%s]' % (token[2][1] or '', token[3][1]) + else: + pos = 'l.%s' % token[3][0] + log.log(flake8._EXTRA_VERBOSE, 'l.%s\t%s\t%s\t%r' % + (token[2][0], pos, tokenize.tok_name[token[0]], + token[1])) + + +# NOTE(sigmavirus24): This was taken wholesale from +# https://github.com/PyCQA/pycodestyle +def expand_indent(line): + r"""Return the amount of indentation. + + Tabs are expanded to the next multiple of 8. + + >>> expand_indent(' ') + 4 + >>> expand_indent('\t') + 8 + >>> expand_indent(' \t') + 8 + >>> expand_indent(' \t') + 16 + """ + if '\t' not in line: + return len(line) - len(line.lstrip()) + result = 0 + for char in line: + if char == '\t': + result = result // 8 * 8 + 8 + elif char == ' ': + result += 1 + else: + break + return result + + +# NOTE(sigmavirus24): This was taken wholesale from +# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be +# more descriptive. +def mutate_string(text): + """Replace contents with 'xxx' to prevent syntax matching. 
>>> mutate_string('"abc"') + '"xxx"' + >>> mutate_string("'''abc'''") + "'''xxx'''" + >>> mutate_string("r'abc'") + "r'xxx'"
follows ``noqa`` + # We do not care about the casing of ``noqa`` + # We want a comma-separated list of errors + '# noqa(?:: )?(?P<codes>[A-Z0-9,]+)?$', + re.IGNORECASE + ) + + def __init__(self, options, listener_trie, formatter): + """Initialize our StyleGuide. + + .. todo:: Add parameter documentation. + """ + self.options = options + self.listener = listener_trie + self.formatter = formatter + self._selected = tuple(options.select) + self._ignored = tuple(options.ignore) + self._decision_cache = {} + self._parsed_diff = {} + + def is_user_selected(self, code): + # type: (str) -> Union[Selected, Ignored] + """Determine if the code has been selected by the user. + + :param str code: + The code for the check that has been run. + :returns: + Selected.Implicitly if the selected list is empty, + Selected.Explicitly if the selected list is not empty and a match + was found, + Ignored.Implicitly if the selected list is not empty but no match + was found. + """ + if not self._selected: + return Selected.Implicitly + + if code.startswith(self._selected): + return Selected.Explicitly + + return Ignored.Implicitly + + def is_user_ignored(self, code): + # type: (str) -> Union[Selected, Ignored] + """Determine if the code has been ignored by the user. + + :param str code: + The code for the check that has been run. + :returns: + Selected.Implicitly if the ignored list is empty, + Ignored.Explicitly if the ignored list is not empty and a match was + found, + Selected.Implicitly if the ignored list is not empty but no match + was found. 
+ """ + if self._ignored and code.startswith(self._ignored): + return Ignored.Explicitly + + return Selected.Implicitly + + def _decision_for(self, code): + # type: (Error) -> Decision + startswith = code.startswith + selected = sorted([s for s in self._selected if startswith(s)])[0] + ignored = sorted([i for i in self._ignored if startswith(i)])[0] + + if selected.startswith(ignored): + return Decision.Selected + return Decision.Ignored + + def should_report_error(self, code): + # type: (str) -> Decision + """Determine if the error code should be reported or ignored. + + This method only cares about the select and ignore rules as specified + by the user in their configuration files and command-line flags. + + This method does not look at whether the specific line is being + ignored in the file itself. + + :param str code: + The code for the check that has been run. + """ + decision = self._decision_cache.get(code) + if decision is None: + LOG.debug('Deciding if "%s" should be reported', code) + selected = self.is_user_selected(code) + ignored = self.is_user_ignored(code) + LOG.debug('The user configured "%s" to be "%s", "%s"', + code, selected, ignored) + + if ((selected is Selected.Explicitly or + selected is Selected.Implicitly) and + ignored is Selected.Implicitly): + decision = Decision.Selected + elif (selected is Selected.Explicitly and + ignored is Ignored.Explicitly): + decision = self._decision_for(code) + elif (selected is Ignored.Implicitly or + ignored is Ignored.Explicitly): + decision = Decision.Ignored # pylint: disable=R0204 + + self._decision_cache[code] = decision + LOG.debug('"%s" will be "%s"', code, decision) + return decision + + def is_inline_ignored(self, error): + # type: (Error) -> bool + """Determine if an comment has been added to ignore this line.""" + physical_line = error.physical_line + # TODO(sigmavirus24): Determine how to handle stdin with linecache + if self.options.disable_noqa: + return False + + if physical_line is None: + 
physical_line = linecache.getline(error.filename, + error.line_number) + noqa_match = self.NOQA_INLINE_REGEXP.search(physical_line) + if noqa_match is None: + LOG.debug('%r is not inline ignored', error) + return False + + codes_str = noqa_match.groupdict()['codes'] + if codes_str is None: + LOG.debug('%r is ignored by a blanket ``# noqa``', error) + return True + + codes = set(utils.parse_comma_separated_list(codes_str)) + if error.code in codes or error.code.startswith(tuple(codes)): + LOG.debug('%r is ignored specifically inline with ``# noqa: %s``', + error, codes_str) + return True + + LOG.debug('%r is not ignored inline with ``# noqa: %s``', + error, codes_str) + return False + + def is_in_diff(self, error): + # type: (Error) -> bool + """Determine if an error is included in a diff's line ranges. + + This function relies on the parsed data added via + :meth:`~StyleGuide.add_diff_ranges`. If that has not been called and + we are not evaluating files in a diff, then this will always return + True. If there are diff ranges, then this will return True if the + line number in the error falls inside one of the ranges for the file + (and assuming the file is part of the diff data). If there are diff + ranges, this will return False if the file is not part of the diff + data or the line number of the error is not in any of the ranges of + the diff. + + :returns: + True if there is no diff or if the error is in the diff's line + number ranges. False if the error's line number falls outside + the diff's line number ranges. + :rtype: + bool + """ + if not self._parsed_diff: + return True + + # NOTE(sigmavirus24): The parsed diff will be a defaultdict with + # a set as the default value (if we have received it from + # flake8.utils.parse_unified_diff). In that case ranges below + # could be an empty set (which is False-y) or if someone else + # is using this API, it could be None. If we could guarantee one + # or the other, we would check for it more explicitly. 
+ line_numbers = self._parsed_diff.get(error.filename) + if not line_numbers: + return False + + return error.line_number in line_numbers + + def handle_error(self, code, filename, line_number, column_number, text, + physical_line=None): + # type: (str, str, int, int, str) -> int + """Handle an error reported by a check. + + :param str code: + The error code found, e.g., E123. + :param str filename: + The file in which the error was found. + :param int line_number: + The line number (where counting starts at 1) at which the error + occurs. + :param int column_number: + The column number (where counting starts at 1) at which the error + occurs. + :param str text: + The text of the error message. + :param str physical_line: + The actual physical line causing the error. + :returns: + 1 if the error was reported. 0 if it was ignored. This is to allow + for counting of the number of errors found that were not ignored. + :rtype: + int + """ + error = Error(code, filename, line_number, column_number, text, + physical_line) + if error.filename is None or error.filename == '-': + error = error._replace(filename=self.options.stdin_display_name) + error_is_selected = (self.should_report_error(error.code) is + Decision.Selected) + is_not_inline_ignored = self.is_inline_ignored(error) is False + is_included_in_diff = self.is_in_diff(error) + if (error_is_selected and is_not_inline_ignored and + is_included_in_diff): + self.formatter.handle(error) + self.listener.notify(error.code, error) + return 1 + return 0 + + def add_diff_ranges(self, diffinfo): + """Update the StyleGuide to filter out information not in the diff. + + This provides information to the StyleGuide so that only the errors + in the line number ranges are reported. + + :param dict diffinfo: + Dictionary mapping filenames to sets of line number ranges. 
+ """ + self._parsed_diff = diffinfo diff --git a/src/flake8/utils.py b/src/flake8/utils.py new file mode 100644 index 0000000..597dea6 --- /dev/null +++ b/src/flake8/utils.py @@ -0,0 +1,279 @@ +"""Utility methods for flake8.""" +import collections +import fnmatch as _fnmatch +import inspect +import io +import os +import re +import sys + +DIFF_HUNK_REGEXP = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$') + + +def parse_comma_separated_list(value): + # type: (Union[Sequence[str], str]) -> List[str] + """Parse a comma-separated list. + + :param value: + String or list of strings to be parsed and normalized. + :returns: + List of values with whitespace stripped. + :rtype: + list + """ + if not value: + return [] + + if not isinstance(value, (list, tuple)): + value = value.split(',') + + return [item.strip() for item in value] + + +def normalize_paths(paths, parent=os.curdir): + # type: (Union[Sequence[str], str], str) -> List[str] + """Parse a comma-separated list of paths. + + :returns: + The normalized paths. + :rtype: + [str] + """ + return [normalize_path(p, parent) + for p in parse_comma_separated_list(paths)] + + +def normalize_path(path, parent=os.curdir): + # type: (str, str) -> str + """Normalize a single-path. + + :returns: + The normalized path. + :rtype: + str + """ + # NOTE(sigmavirus24): Using os.path.sep allows for Windows paths to + # be specified and work appropriately. 
# Unified-diff hunk header pattern, e.g. "@@ -10,4 +12,6 @@": captures the
# new-file starting row and (optionally) the hunk's row count.
DIFF_HUNK_REGEXP = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$')


def stdin_get_value():
    # type: () -> str
    """Get and cache it so plugins can use it."""
    cached_value = getattr(stdin_get_value, 'cached_stdin', None)
    if cached_value is None:
        stdin_value = sys.stdin.read()
        # Cache in the io type matching the str type of this Python version
        # so repeated calls do not re-read (and exhaust) stdin.
        if sys.version_info < (3, 0):
            cached_type = io.BytesIO
        else:
            cached_type = io.StringIO
        stdin_get_value.cached_stdin = cached_type(stdin_value)
        cached_value = stdin_get_value.cached_stdin
    return cached_value.getvalue()


def parse_unified_diff(diff=None):
    # type: (Optional[str]) -> Dict[str, Set[int]]
    # NOTE: the original type comment said ``-> List[str]`` but the function
    # returns a dict mapping file names to sets of line numbers.
    """Parse the unified diff passed on stdin.

    :param str diff:
        The unified diff text. If ``None``, the diff is read from stdin.
    :returns:
        dictionary mapping file names to sets of line numbers
    :rtype:
        dict
    """
    # Allow us to not have to patch out stdin_get_value
    if diff is None:
        diff = stdin_get_value()

    number_of_rows = None
    current_path = None
    parsed_paths = collections.defaultdict(set)
    for line in diff.splitlines():
        if number_of_rows:
            # NOTE(sigmavirus24): Below we use a slice because stdin may be
            # bytes instead of text on Python 3.
            if line[:1] != '-':
                number_of_rows -= 1
            # We're in the part of the diff that has lines starting with +, -,
            # and ' ' to show context and the changes made. We skip these
            # because the information we care about is the filename and the
            # range within it.
            # When number_of_rows reaches 0, we will once again start
            # searching for filenames and ranges.
            continue

        # NOTE(sigmavirus24): Diffs that we support look roughly like:
        #    diff a/file.py b/file.py
        #    ...
        #    --- a/file.py
        #    +++ b/file.py
        # Below we're looking for that last line. Every diff tool that
        # gives us this output may have additional information after
        # ``b/file.py`` which it will separate with a \t, e.g.,
        #    +++ b/file.py\t100644
        # Which is an example that has the new file permissions/mode.
        # In this case we only care about the file name.
        if line[:3] == '+++':
            current_path = line[4:].split('\t', 1)[0]
            # NOTE(sigmavirus24): This check is for diff output from git.
            if current_path[:2] == 'b/':
                current_path = current_path[2:]
            # We don't need to do anything else. We have set up our local
            # ``current_path`` variable. We can skip the rest of this loop.
            # The next line we will see will give us the hunk information
            # which is in the next section of logic.
            continue

        hunk_match = DIFF_HUNK_REGEXP.match(line)
        # NOTE(sigmavirus24): pep8/pycodestyle check for:
        #    line[:3] == '@@ '
        # But the DIFF_HUNK_REGEXP enforces that the line start with that
        # So we can more simply check for a match instead of slicing and
        # comparing.
        if hunk_match:
            # An absent row/count group defaults to 1 per the diff format.
            (row, number_of_rows) = [
                1 if not group else int(group)
                for group in hunk_match.groups()
            ]
            parsed_paths[current_path].update(
                range(row, row + number_of_rows)
            )

    # We have now parsed our diff into a dictionary that looks like:
    #    {'file.py': set(range(10, 16), range(18, 20)), ...}
    return parsed_paths
def is_windows():
    # type: () -> bool
    """Determine if we're running on Windows.

    :returns:
        True if running on Windows, otherwise False
    :rtype:
        bool
    """
    return os.name == 'nt'


def can_run_multiprocessing_on_windows():
    # type: () -> bool
    """Determine if we can use multiprocessing on Windows.

    :returns:
        True if the version of Python is modern enough, otherwise False
    :rtype:
        bool
    """
    # Either a sufficiently recent 2.7 point release or a 3.x newer than
    # 3.2 will do. (Note that any 3.x compares >= (2, 7, 11) as well.)
    return sys.version_info >= (2, 7, 11) or sys.version_info > (3, 2)


def is_using_stdin(paths):
    # type: (List[str]) -> bool
    """Determine if we're going to read from stdin.

    :param list paths:
        The paths that we're going to check.
    :returns:
        True if stdin (-) is in the path, otherwise False
    :rtype:
        bool
    """
    return any(path == '-' for path in paths)
def _default_predicate(*args):
    # Default filter: exclude nothing.
    return False


def filenames_from(arg, predicate=None):
    # type: (str, Optional[Callable]) -> Generator
    """Generate filenames from an argument.

    :param str arg:
        Parameter from the command-line.
    :param callable predicate:
        Predicate to use to filter out filenames. If the predicate
        returns ``True`` we will exclude the filename, otherwise we
        will yield it. By default, we include every filename
        generated.
    :returns:
        Generator of paths
    """
    if predicate is None:
        predicate = _default_predicate
    if os.path.isdir(arg):
        for root, sub_directories, files in os.walk(arg):
            for filename in files:
                joined = os.path.join(root, filename)
                if predicate(joined):
                    continue
                yield joined
            # NOTE(sigmavirus24): os.walk() will skip a directory if you
            # remove it from the list of sub-directories.
            # Iterate over a snapshot while pruning the *live* list:
            # removing from a list while iterating it skips the element
            # that follows each removed entry, so adjacent excluded
            # directories would otherwise not all be pruned.
            for directory in tuple(sub_directories):
                if predicate(directory):
                    sub_directories.remove(directory)
    else:
        yield arg


def fnmatch(filename, patterns, default=True):
    # type: (str, List[str], bool) -> bool
    """Wrap :func:`fnmatch.fnmatch` to add some functionality.

    :param str filename:
        Name of the file we're trying to match.
    :param list patterns:
        Patterns we're using to try to match the filename.
    :param bool default:
        The default value if patterns is empty
    :returns:
        True if a pattern matches the filename, False if it doesn't.
        ``default`` if patterns is empty.
    """
    if not patterns:
        return default
    return any(_fnmatch.fnmatch(filename, pattern) for pattern in patterns)
def parameters_for(plugin):
    # type: (flake8.plugins.manager.Plugin) -> List[str]
    """Return the parameters for the plugin.

    This will inspect the plugin and return either the function parameters
    if the plugin is a function or the parameters for ``__init__`` after
    ``self`` if the plugin is a class.

    :param plugin:
        The internal plugin object.
    :type plugin:
        flake8.plugins.manager.Plugin
    :returns:
        Parameters to the plugin.
    :rtype:
        list(str)
    """
    target = plugin.plugin
    wraps_class = not inspect.isfunction(target)
    if wraps_class:
        # For class-based plugins the constructor carries the parameters.
        target = target.__init__

    if sys.version_info >= (3, 3):
        names = [
            param.name
            for param in inspect.signature(target).parameters.values()
            if param.kind == param.POSITIONAL_OR_KEYWORD
        ]
    else:
        # ``inspect.signature`` does not exist before 3.3.
        names = inspect.getargspec(target)[0]

    if wraps_class:
        # Callers never supply ``self``; drop it from the reported list.
        names.remove('self')

    return names
