#!/usr/bin/env python
# encoding: UTF-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy
# paths in sync
#
# Example:
#   svnwcsub.py svnwcsub.conf
#
# On startup svnwcsub checks the working copy's path, runs a single svn
# update and then watches for changes to that path.
#
# See svnwcsub.conf for more information on its contents.
#

# TODO:
# - bulk update at startup time to avoid backlog warnings
# - fold BDEC into Daemon
# - fold WorkingCopy._get_match() into __init__
# - remove wc_ready(). assume all WorkingCopy instances are usable.
#   place the instances into .watch at creation. the .update_applies()
#   just returns if the wc is disabled (eg. could not find wc dir)
# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER
#   (a base path exclusion list should work for the ASF)
# - add support for SIGHUP to reread the config and reinitialize working copies
# - joes will write documentation for svnpubsub as these items become fulfilled
# - make LOGLEVEL configurable

import errno
import subprocess
import threading
import sys
import os
import re
import posixpath
try:
    import ConfigParser
except ImportError:
    import configparser as ConfigParser
import time
import logging.handlers
try:
    import Queue
except ImportError:
    import queue as Queue
import optparse
import functools
try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

import daemonize
import svnpubsub.client
import svnpubsub.util

assert hasattr(subprocess, 'check_call')

def check_call(*args, **kwds):
    """Wrapper around subprocess.check_call() that logs stderr upon failure,
    with an optional list of exit codes to consider non-failure."""
    assert 'stderr' not in kwds
    if '__okayexits' in kwds:
        __okayexits = kwds['__okayexits']
        del kwds['__okayexits']
    else:
        __okayexits = set([0])  # EXIT_SUCCESS
    kwds.update(stderr=subprocess.PIPE)
    pipe = subprocess.Popen(*args, **kwds)
    output, errput = pipe.communicate()
    if pipe.returncode not in __okayexits:
        cmd = args[0] if len(args) else kwds.get('args', '(no command)')
        # TODO: log stdout too?
        logging.error('Command failed: returncode=%d command=%r stderr=%r',
                      pipe.returncode, cmd, errput)
        raise subprocess.CalledProcessError(pipe.returncode, args)
    return pipe.returncode  # is EXIT_OK
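
# A hypothetical usage sketch for check_call() (not part of the program
# flow): accept exit codes 0 and 1 from a command, mirroring the hook
# convention used by BackgroundWorker._update() below, where an exit code
# of 1 means "hook denied the update". The command and path are made up:
#
#   rc = check_call(['/usr/local/bin/wc-hook.sh', 'pre-update', '/var/wc'],
#                   __okayexits=set([0, 1]))
#   if rc == 1:
#       pass  # denied; any other nonzero code would have raised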

### note: this runs synchronously. within the current Twisted environment,
### it is called from ._get_match() which is run on a thread so it won't
### block the Twisted main loop.
def svn_info(svnbin, env, path):
    "Run 'svn info' on the target path, returning a dict of info data."
    args = [svnbin, "info", "--non-interactive", "--", path]
    output = svnpubsub.util.check_output(args, env=env).strip()
    info = {}
    for line in output.split('\n'):
        idx = line.index(':')
        info[line[:idx]] = line[idx+1:].strip()
    return info


try:
    import glob
    glob.iglob  # raises AttributeError on Pythons without iglob (< 2.5)

    def is_emptydir(path):
        # ### If the directory contains only dotfile children, this will
        # ### readdir() the entire directory. But os.readdir() is not
        # ### exposed to us...
        for x in glob.iglob('%s/*' % path):
            return False
        for x in glob.iglob('%s/.*' % path):
            return False
        return True
except (ImportError, AttributeError):
    # Python ≤ 2.4: fall back to a full directory listing.
    def is_emptydir(path):
        # This will read the entire directory list to memory.
        return not os.listdir(path)


class WorkingCopy(object):
    def __init__(self, bdec, path, url):
        self.path = path
        self.url = url

        try:
            self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
            bdec.wc_ready(self)
        except:
            logging.exception('problem with working copy: %s', path)

    def update_applies(self, uuid, path):
        if self.uuid != uuid:
            return False

        path = str(path)
        if path == self.match:
            # easy case. woo.
            return True
        if len(path) < len(self.match):
            # path is potentially a parent directory of match?
            if self.match[0:len(path)] == path:
                return True
        if len(path) > len(self.match):
            # path is potentially a sub directory of match
            if path[0:len(self.match)] == self.match:
                return True
        return False

    def _get_match(self, svnbin, env):
        ### quick little hack to auto-checkout missing working copies
        dotsvn = os.path.join(self.path, ".svn")
        if not os.path.isdir(dotsvn) or is_emptydir(dotsvn):
            logging.info("autopopulate %s from %s" % (self.path, self.url))
            check_call([svnbin, 'co', '-q', '--force', '--non-interactive',
                        '--config-option',
                        'config:miscellany:use-commit-times=on',
                        '--', self.url, self.path],
                       env=env)

        # Fetch the info for matching dirs_changed against this WC
        info = svn_info(svnbin, env, self.path)
        root = info['Repository Root']
        url = info['URL']
        relpath = url[len(root):]  # also has leading '/'
        uuid = info['Repository UUID']
        return str(relpath), uuid
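
# Illustrative matching behavior for WorkingCopy.update_applies(), using
# hypothetical values. Assume _get_match() computed
# match = '/infrastructure/site/trunk', and `uuid` is this repository's
# UUID while `other_uuid` is any other:
#
#   update_applies(uuid, '/infrastructure/site/trunk')       -> True  (exact)
#   update_applies(uuid, '/infrastructure')                  -> True  (parent)
#   update_applies(uuid, '/infrastructure/site/trunk/docs')  -> True  (child)
#   update_applies(uuid, '/some/other/path')                 -> False
#   update_applies(other_uuid, '/infrastructure/site/trunk') -> False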

PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")


class BigDoEverythingClasss(object):
    def __init__(self, config):
        self.svnbin = config.get_value('svnbin')
        self.env = config.get_env()
        self.tracking = config.get_track()
        self.hook = config.get_optional_value('hook')
        self.streams = config.get_value('streams').split()
        self.worker = BackgroundWorker(self.svnbin, self.env, self.hook)
        self.watch = []

    def start(self):
        for path, url in self.tracking.items():
            # Working copies auto-register with the BDEC when they are ready.
            WorkingCopy(self, path, url)

    def wc_ready(self, wc):
        # Called when a working copy object has its basic info/url.
        # Add it to our watchers, and trigger an svn update.
        logging.info("Watching WC at %s <-> %s" % (wc.path, wc.url))
        self.watch.append(wc)
        self.worker.add_work(OP_BOOT, wc)

    def _normalize_path(self, path):
        if path[0] != '/':
            return "/" + path
        return posixpath.abspath(path)

    def commit(self, url, commit):
        if commit.type != 'svn' or commit.format != 1:
            logging.info("SKIP unknown commit format (%s.%d)",
                         commit.type, commit.format)
            return
        logging.info("COMMIT r%d (%d paths) from %s"
                     % (commit.id, len(commit.changed), url))

        # Materialize the map() so len() and repeated iteration also work
        # on Python 3, where map() returns a one-shot iterator.
        paths = list(map(self._normalize_path, commit.changed))
        if len(paths):
            pre = posixpath.commonprefix(paths)
            if pre == "/websites/":
                # Special case for svnmucc "dynamic content" buildbot commits:
                # just take the first production path to avoid updating all
                # cms working copies.
                for p in paths:
                    m = PRODUCTION_RE_FILTER.match(p)
                    if m:
                        pre = m.group(0)
                        break

            wcs = [wc for wc in self.watch
                   if wc.update_applies(commit.repository, pre)]
            logging.info("Updating %d WC for r%d" % (len(wcs), commit.id))
            for wc in wcs:
                self.worker.add_work(OP_UPDATE, wc)
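
# A worked example of commit() routing, with hypothetical values: a commit
# whose changed paths are
#     ['/infrastructure/site/trunk/content/a', 'infrastructure/site/trunk/b']
# normalizes to the character-wise common prefix
# '/infrastructure/site/trunk/', so only working copies for which
# update_applies(repo_uuid, that_prefix) holds are queued for OP_UPDATE.
# The '/websites/' special case narrows an overly broad prefix to the first
# path matching PRODUCTION_RE_FILTER, e.g. '/websites/production/foo/'.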

# Start logging warnings if the work backlog reaches this many items
BACKLOG_TOO_HIGH = 20
OP_BOOT = 'boot'
OP_UPDATE = 'update'
OP_CLEANUP = 'cleanup'


class BackgroundWorker(threading.Thread):
    def __init__(self, svnbin, env, hook):
        threading.Thread.__init__(self)

        # The main thread/process should not wait for this thread to exit.
        ### compat with Python 2.5
        self.setDaemon(True)

        self.svnbin = svnbin
        self.env = env
        self.hook = hook
        self.q = Queue.Queue()

        self.has_started = False

    def run(self):
        while True:
            # This will block until something arrives
            operation, wc = self.q.get()

            # Warn if the queue is too long.
            # (Note: the other thread might have added entries to self.q
            # after the .get() and before the .qsize().)
            qsize = self.q.qsize() + 1
            if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH:
                logging.warn('worker backlog is at %d', qsize)

            try:
                if operation == OP_UPDATE:
                    self._update(wc)
                elif operation == OP_BOOT:
                    self._update(wc, boot=True)
                elif operation == OP_CLEANUP:
                    self._cleanup(wc)
                else:
                    logging.critical('unknown operation: %s', operation)
            except:
                logging.exception('exception in worker')

            # In case we ever want to .join() against the work queue
            self.q.task_done()

    def add_work(self, operation, wc):
        # Start the thread when work first arrives. Thread-start needs to
        # be delayed in case the process forks itself to become a daemon.
        if not self.has_started:
            self.start()
            self.has_started = True

        self.q.put((operation, wc))

    def _update(self, wc, boot=False):
        "Update the specified working copy."

        # For giggles, let's clean up the working copy in case something
        # happened earlier.
        self._cleanup(wc)

        logging.info("updating: %s", wc.path)

        ## Run the hook
        HEAD = svn_info(self.svnbin, self.env, wc.url)['Revision']
        if self.hook:
            hook_mode = ['pre-update', 'pre-boot'][boot]
            logging.info('running hook: %s at %s', wc.path, hook_mode)
            args = [self.hook, hook_mode, wc.path, HEAD, wc.url]
            rc = check_call(args, env=self.env, __okayexits=[0, 1])
            if rc == 1:
                # TODO: log stderr
                logging.warn('hook denied update of %s at %s',
                             wc.path, hook_mode)
                return
            del rc

        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'switch',
                '--quiet',
                '--non-interactive',
                '--trust-server-cert',
                '--ignore-externals',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                '--',
                wc.url + '@' + HEAD,
                wc.path]
        check_call(args, env=self.env)

        ### check the loglevel before running 'svn info'?
        info = svn_info(self.svnbin, self.env, wc.path)
        assert info['Revision'] == HEAD
        logging.info("updated: %s now at r%s", wc.path, info['Revision'])

        ## Run the hook
        if self.hook:
            hook_mode = ['post-update', 'boot'][boot]
            logging.info('running hook: %s at revision %s due to %s',
                         wc.path, info['Revision'], hook_mode)
            args = [self.hook, hook_mode,
                    wc.path, info['Revision'], wc.url]
            check_call(args, env=self.env)

    def _cleanup(self, wc):
        "Run a cleanup on the specified working copy."

        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'cleanup',
                '--non-interactive',
                '--trust-server-cert',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                wc.path]
        check_call(args, env=self.env)


class ReloadableConfig(ConfigParser.SafeConfigParser):
    def __init__(self, fname):
        ConfigParser.SafeConfigParser.__init__(self)

        self.fname = fname
        self.read(fname)

        ### install a signal handler to set SHOULD_RELOAD. BDEC should
        ### poll this flag, and then adjust its internal structures after
        ### the reload.
        self.should_reload = False

    def reload(self):
        # Delete everything. Just re-reading would overlay, and would not
        # remove sections/options. Note that [DEFAULT] will not be removed.
        for section in self.sections():
            self.remove_section(section)

        # Now re-read the configuration file. (Use the filename remembered
        # in __init__; a bare `fname` would be an unbound name here.)
        self.read(self.fname)

    def get_value(self, which):
        return self.get(ConfigParser.DEFAULTSECT, which)

    def get_optional_value(self, which, default=None):
        if self.has_option(ConfigParser.DEFAULTSECT, which):
            return self.get(ConfigParser.DEFAULTSECT, which)
        else:
            return default

    def get_env(self):
        env = os.environ.copy()
        default_options = self.defaults().keys()
        for name, value in self.items('env'):
            if name not in default_options:
                env[name] = value
        return env

    def get_track(self):
        "Return the {PATH: URL} dictionary of working copies to track."
        track = dict(self.items('track'))
        for name in self.defaults().keys():
            del track[name]
        return track

    def optionxform(self, option):
        # Do not lowercase the option name.
        return str(option)
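
# For reference, a minimal configuration consistent with the accessors
# above might look like the following. This is only a sketch with made-up
# paths and URLs; see the real svnwcsub.conf for the authoritative format:
#
#   [DEFAULT]
#   svnbin: /usr/bin/svn
#   streams: http://svn.example.org:2069/commits
#   #hook: /usr/local/bin/svnwcsub-hook
#
#   [env]
#   HOME: /var/empty
#   LANG: en_US.UTF-8
#
#   [track]
#   /var/www/site: http://svn.example.org/repos/site/trunk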

class Daemon(daemonize.Daemon):
    def __init__(self, logfile, pidfile, umask, bdec):
        daemonize.Daemon.__init__(self, logfile, pidfile)

        self.umask = umask
        self.bdec = bdec

    def setup(self):
        # There is no setup which the parent needs to wait for.
        pass

    def run(self):
        logging.info('svnwcsub started, pid=%d', os.getpid())

        # Set the umask in the daemon process. Defaults to 000 for
        # daemonized processes. Foreground processes simply inherit
        # the value from the parent process.
        if self.umask is not None:
            umask = int(self.umask, 8)
            os.umask(umask)
            logging.info('umask set to %03o', umask)

        # Start the BDEC (on the main thread), then start the client
        self.bdec.start()

        mc = svnpubsub.client.MultiClient(self.bdec.streams,
                                          self.bdec.commit,
                                          self._event)
        mc.run_forever()

    def _event(self, url, event_name, event_arg):
        if event_name == 'error':
            logging.exception('from %s', url)
        elif event_name == 'ping':
            logging.debug('ping from %s', url)
        else:
            logging.info('"%s" from %s', event_name, url)


def prepare_logging(logfile):
    "Log to the specified file, or to stdout if None."

    if logfile:
        # Rotate logs daily, keeping 7 days' worth.
        handler = logging.handlers.TimedRotatingFileHandler(
            logfile, when='midnight', backupCount=7,
        )
    else:
        handler = logging.StreamHandler(sys.stdout)

    # Add a timestamp to the log records
    formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
                                  '%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)

    # Apply the handler to the root logger
    root = logging.getLogger()
    root.addHandler(handler)

    ### use logging.INFO for now. switch to cmdline option or a config?
    root.setLevel(logging.INFO)


def handle_options(options):
    # Set up the logging, then process the rest of the options.
    prepare_logging(options.logfile)

    # In daemon mode, we let the daemonize module handle the pidfile.
    # Otherwise, we should write this (foreground) PID into the file.
    if options.pidfile and not options.daemon:
        pid = os.getpid()
        # Be wary of symlink attacks
        try:
            os.remove(options.pidfile)
        except OSError:
            pass
        fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                     0o444)
        os.write(fd, ('%d\n' % pid).encode('ascii'))
        os.close(fd)
        logging.info('pid %d written to %s', pid, options.pidfile)

    if options.gid:
        try:
            gid = int(options.gid)
        except ValueError:
            import grp
            gid = grp.getgrnam(options.gid)[2]
        logging.info('setting gid %d', gid)
        os.setgid(gid)

    if options.uid:
        try:
            uid = int(options.uid)
        except ValueError:
            import pwd
            uid = pwd.getpwnam(options.uid)[2]
        logging.info('setting uid %d', uid)
        os.setuid(uid)


def main(args):
    parser = optparse.OptionParser(
        description='An SvnPubSub client to keep working copies synchronized '
                    'with a repository.',
        usage='Usage: %prog [options] CONFIG_FILE',
    )
    parser.add_option('--logfile',
                      help='filename for logging')
    parser.add_option('--pidfile',
                      help="the process' PID will be written to this file")
    parser.add_option('--uid',
                      help='switch to this UID before running')
    parser.add_option('--gid',
                      help='switch to this GID before running')
    parser.add_option('--umask',
                      help='set this (octal) umask before running')
    parser.add_option('--daemon', action='store_true',
                      help='run as a background daemon')

    options, extra = parser.parse_args(args)

    if len(extra) != 1:
        parser.error('CONFIG_FILE is required')
    config_file = extra[0]

    if options.daemon and not options.logfile:
        parser.error('LOGFILE is required when running as a daemon')

    if options.daemon and not options.pidfile:
        parser.error('PIDFILE is required when running as a daemon')

    # Process any provided options.
    handle_options(options)

    c = ReloadableConfig(config_file)
    bdec = BigDoEverythingClasss(c)

    # We manage the logfile ourselves (along with possible rotation). The
    # daemon process can just drop stdout/stderr into /dev/null.
    # (Only resolve the pidfile path when one was given; --pidfile is
    # optional in foreground mode and os.path.abspath(None) raises.)
    pidfile = options.pidfile and os.path.abspath(options.pidfile)
    d = Daemon('/dev/null', pidfile, options.umask, bdec)
    if options.daemon:
        # Daemonize the process and call sys.exit() with appropriate code
        d.daemonize_exit()
    else:
        # Just run in the foreground (the default)
        d.foreground()


if __name__ == "__main__":
    main(sys.argv[1:])
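
# Example invocations (a sketch; file paths are illustrative):
#
#   Run in the foreground, logging to stdout:
#       svnwcsub.py svnwcsub.conf
#
#   Run as a daemon (--logfile and --pidfile are then required):
#       svnwcsub.py --daemon \
#           --logfile=/var/log/svnwcsub.log \
#           --pidfile=/var/run/svnwcsub.pid \
#           svnwcsub.conf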