Coverage for cogapp/cogapp.py: 50.13%

Shortcuts on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

510 statements  

1# coding: utf8 

2""" Cog content generation tool. 

3 http://nedbatchelder.com/code/cog 

4 

5 Copyright 2004-2021, Ned Batchelder. 

6""" 

7 

8from __future__ import absolute_import, print_function 

9 

10import copy 

11import getopt 

12import glob 

13import hashlib 

14import linecache 

15import os 

16import re 

17import shlex 

18import sys 

19import traceback 

20 

21from .backward import PY3, StringIO, string_types, to_bytes 

22 

23__all__ = ['Cog', 'CogUsageError', 'main'] 

24 

25__version__ = '3.3.0' 

26 

27usage = """\ 

28cog - generate content with inlined Python code. 

29 

30cog [OPTIONS] [INFILE | @FILELIST] ... 

31 

32INFILE is the name of an input file, '-' will read from stdin. 

33FILELIST is the name of a text file containing file names or 

34 other @FILELISTs. 

35 

36OPTIONS: 

37 -c Checksum the output to protect it against accidental change. 

38 -d Delete the generator code from the output file. 

39 -D name=val Define a global string available to your generator code. 

40 -e Warn if a file has no cog code in it. 

41 -I PATH Add PATH to the list of directories for data files and modules. 

42 -n ENCODING Use ENCODING when reading and writing files. 

43 -o OUTNAME Write the output to OUTNAME. 

44 -p PROLOGUE Prepend the generator source with PROLOGUE. Useful to insert an 

45 import line. Example: -p "import math" 

46 -P Use print() instead of cog.outl() for code output. 

47 -r Replace the input file with the output. 

48 -s STRING Suffix all generated output lines with STRING. 

49 -U Write the output with Unix newlines (only LF line-endings). 

50 -w CMD Use CMD if the output file needs to be made writable. 

51 A %s in the CMD will be filled with the filename. 

52 -x Excise all the generated output without running the generators. 

53 -z The end-output marker can be omitted, and is assumed at eof. 

54 -v Print the version of cog and exit. 

55 --check Check that the files would not change if run again. 

56 --markers='START END END-OUTPUT' 

57 The patterns surrounding cog inline instructions. Should 

58 include three values separated by spaces, the start, end, 

59 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

60 --verbosity=VERBOSITY 

61 Control the amount of output. 2 (the default) lists all files, 

62 1 lists only changed files, 0 lists no files. 

63 -h Print this help. 

64""" 

65 

66# Other package modules 

67from .whiteutils import * 

68 

class CogError(Exception):
    """ Any exception raised by Cog.

    When `file` is given, the message is prefixed with "file(line): " so the
    error points at a location in the input.
    """
    def __init__(self, msg, file='', line=0):
        if file:
            Exception.__init__(self, "%s(%d): %s" % (file, line, msg))
        else:
            Exception.__init__(self, msg)

77 

class CogUsageError(CogError):
    """ An error in usage of command-line arguments in cog.
    """

82 

class CogInternalError(CogError):
    """ An error in the coding of Cog. Should never happen.
    """

87 

class CogGeneratedError(CogError):
    """ An error raised by a user's cog generator.
    """

92 

class CogUserException(CogError):
    """ An exception caught when running a user's cog generator.
        The argument is the traceback message to print.
    """

98 

class CogCheckFailed(CogError):
    """ A --check failed.
    """

103 

class Redirectable:
    """ An object with its own stdout and stderr files.

    Both start out as the process-wide sys.stdout / sys.stderr and can be
    swapped independently with setOutput().
    """
    def __init__(self):
        self.stdout = sys.stdout
        self.stderr = sys.stderr

    def setOutput(self, stdout=None, stderr=None):
        """ Assign new files for standard out and/or standard error.

        An argument left as None keeps the current file.
        """
        # Compare against None rather than relying on truthiness, so that a
        # file-like object that happens to be falsy is still accepted.
        if stdout is not None:
            self.stdout = stdout
        if stderr is not None:
            self.stderr = stderr

    def prout(self, s, end="\n"):
        """ Print `s` to this object's stdout, followed by `end`. """
        print(s, file=self.stdout, end=end)

    def prerr(self, s, end="\n"):
        """ Print `s` to this object's stderr, followed by `end`. """
        print(s, file=self.stderr, end=end)

124 

125 

class CogGenerator(Redirectable):
    """ A generator pulled from a source file.

    Accumulates the marker lines and the code lines of one cog block, and
    can then run that code and return the text it produced.
    """
    def __init__(self, options=None):
        Redirectable.__init__(self)
        self.markers = []
        self.lines = []
        self.options = options or CogOptions()

    def parseMarker(self, l):
        """ Record a marker line belonging to this generator. """
        self.markers.append(l)

    def parseLine(self, l):
        """ Record one line of generator code, minus leading/trailing newlines. """
        self.lines.append(l.strip('\n'))

    def getCode(self):
        """ Extract the executable Python code from the generator.
        """
        # If the markers and lines all have the same prefix
        # (end-of-line comment chars, for example),
        # then remove it from all the lines.
        prefIn = commonPrefix(self.markers + self.lines)
        if prefIn:
            self.markers = [ l.replace(prefIn, '', 1) for l in self.markers ]
            self.lines = [ l.replace(prefIn, '', 1) for l in self.lines ]

        return reindentBlock(self.lines, '')

    def evaluate(self, cog, globals, fname):
        """ Run the generator code and return its output, reindented.

        cog is the Cog engine whose `cogmodule` receives this generator's
        msg/out/outl/error functions; globals is the namespace the code runs
        in; fname is the file name given to compile() for tracebacks.

        Returns '' when the block has no code.  A CogError raised by the
        code propagates unchanged; anything else is re-raised as a
        CogUserException carrying a formatted traceback.
        """
        # figure out the right whitespace prefix for the output
        prefOut = whitePrefix(self.markers)

        intext = self.getCode()
        if not intext:
            return ''

        prologue = "import " + cog.cogmodulename + " as cog\n"
        if self.options.sPrologue:
            prologue += self.options.sPrologue + '\n'
        code = compile(prologue + intext, str(fname), 'exec')

        # Make sure the "cog" module has our state.
        cog.cogmodule.msg = self.msg
        cog.cogmodule.out = self.out
        cog.cogmodule.outl = self.outl
        cog.cogmodule.error = self.error

        real_stdout = sys.stdout
        if self.options.bPrintOutput:
            # With -P, whatever the code print()s becomes the output.
            sys.stdout = captured_stdout = StringIO()

        self.outstring = ''
        try:
            eval(code, globals)
        except CogError:
            raise
        except:
            # Deliberately bare: every other failure in user code is turned
            # into a CogUserException with a cog-aware traceback.
            typ, err, tb = sys.exc_info()
            # tb.tb_next skips the frame of the eval() call above.
            frames = (tuple(fr) for fr in traceback.extract_tb(tb.tb_next))
            frames = find_cog_source(frames, prologue)
            msg = "".join(traceback.format_list(frames))
            msg += "{}: {}".format(typ.__name__, err)
            raise CogUserException(msg)
        finally:
            sys.stdout = real_stdout

        if self.options.bPrintOutput:
            self.outstring = captured_stdout.getvalue()

        # We need to make sure that the last line in the output
        # ends with a newline, or it will be joined to the
        # end-output line, ruining cog's idempotency.
        if self.outstring and self.outstring[-1] != '\n':
            self.outstring += '\n'

        return reindentBlock(self.outstring, prefOut)

    def msg(self, s):
        """ The cog.msg function: print a message on this generator's stdout. """
        self.prout("Message: "+s)

    def out(self, sOut='', dedent=False, trimblanklines=False):
        """ The cog.out function.

        Appends sOut to the generator's output.  With trimblanklines, a
        blank first and/or last line of a multi-line sOut is removed first;
        with dedent, sOut is passed through reindentBlock.
        """
        if trimblanklines and ('\n' in sOut):
            lines = sOut.split('\n')
            if lines[0].strip() == '':
                del lines[0]
            if lines and lines[-1].strip() == '':
                del lines[-1]
            sOut = '\n'.join(lines)+'\n'
        if dedent:
            sOut = reindentBlock(sOut)
        self.outstring += sOut

    def outl(self, sOut='', **kw):
        """ The cog.outl function: like out(), followed by a newline.
        """
        self.out(sOut, **kw)
        self.out('\n')

    def error(self, msg='Error raised by cog generator.'):
        """ The cog.error function.
            Instead of raising standard python errors, cog generators can use
            this function.  It will display the error without a scary Python
            traceback.
        """
        raise CogGeneratedError(msg)

233 

234 

class NumberedFileReader:
    """ A decorator for files that counts the readline()'s called.

    Only reads that return a non-empty line are counted, so after the end
    of the file the count stays at the number of lines read.
    """
    def __init__(self, f):
        self.f = f
        self.n = 0

    def readline(self):
        """ Read and return the next line from the wrapped file. """
        l = self.f.readline()
        if l:
            self.n += 1
        return l

    def linenumber(self):
        """ Return the number of lines read so far. """
        return self.n

250 

251 

class CogOptions:
    """ Options for a run of cog.
    """
    def __init__(self):
        # Defaults for argument values.
        self.args = []
        self.includePath = []
        self.defines = {}
        self.bShowVersion = False
        self.sMakeWritableCmd = None
        self.bReplace = False
        self.bNoGenerate = False
        self.sOutputName = None
        self.bWarnEmpty = False
        self.bHashOutput = False
        self.bDeleteCode = False
        self.bEofCanBeEnd = False
        self.sSuffix = None
        self.bNewlines = False
        self.sBeginSpec = '[[[cog'
        self.sEndSpec = ']]]'
        self.sEndOutput = '[[[end]]]'
        self.sEncoding = "utf-8"
        self.verbosity = 2
        self.sPrologue = ''
        self.bPrintOutput = False
        self.bCheck = False

    def __eq__(self, other):
        """ Comparison operator for tests to use.
        """
        # Anything that is not a CogOptions is simply "not equal", instead
        # of failing on a missing __dict__.
        if not isinstance(other, CogOptions):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def clone(self):
        """ Make a clone of these options, for further refinement.
        """
        return copy.deepcopy(self)

    def addToIncludePath(self, dirs):
        """ Add directories to the include path.

        dirs is a single string; several directories are separated by
        os.pathsep.
        """
        dirs = dirs.split(os.pathsep)
        self.includePath.extend(dirs)

    def parseArgs(self, argv):
        """ Parse command-line arguments (without the program name).

        Option values are stored on this object; the remaining positional
        arguments end up in self.args.  Raises CogUsageError for arguments
        that cannot be parsed.
        """
        # Parse the command line arguments.
        try:
            opts, self.args = getopt.getopt(
                argv,
                'cdD:eI:n:o:rs:p:PUvw:xz',
                [
                    'check',
                    'markers=',
                    'verbosity=',
                ]
            )
        except getopt.error as msg:
            raise CogUsageError(msg)

        # Handle the command line arguments.
        for o, a in opts:
            if o == '-c':
                self.bHashOutput = True
            elif o == '-d':
                self.bDeleteCode = True
            elif o == '-D':
                if a.count('=') < 1:
                    raise CogUsageError("-D takes a name=value argument")
                # Split on the first '=' only, so values may contain '='.
                name, value = a.split('=', 1)
                self.defines[name] = value
            elif o == '-e':
                self.bWarnEmpty = True
            elif o == '-I':
                self.addToIncludePath(os.path.abspath(a))
            elif o == '-n':
                self.sEncoding = a
            elif o == '-o':
                self.sOutputName = a
            elif o == '-r':
                self.bReplace = True
            elif o == '-s':
                self.sSuffix = a
            elif o == '-p':
                self.sPrologue = a
            elif o == '-P':
                self.bPrintOutput = True
            elif o == '-U':
                self.bNewlines = True
            elif o == '-v':
                self.bShowVersion = True
            elif o == '-w':
                self.sMakeWritableCmd = a
            elif o == '-x':
                self.bNoGenerate = True
            elif o == '-z':
                self.bEofCanBeEnd = True
            elif o == '--check':
                self.bCheck = True
            elif o == '--markers':
                self._parse_markers(a)
            elif o == '--verbosity':
                # A non-numeric value is a usage mistake, not a crash.
                try:
                    self.verbosity = int(a)
                except ValueError:
                    raise CogUsageError(
                        '--verbosity requires an integer, could not parse %r' % a
                    )
            else:
                # Since getopt.getopt is given a list of possible flags,
                # this is an internal error.
                raise CogInternalError("Don't understand argument %s" % o)

    def _parse_markers(self, val):
        """ Set the begin, end, and end-output markers from a --markers value.

        Raises CogUsageError unless val is exactly three space-separated
        values.
        """
        try:
            self.sBeginSpec, self.sEndSpec, self.sEndOutput = val.split(' ')
        except ValueError:
            raise CogUsageError(
                '--markers requires 3 values separated by spaces, could not parse %r' % val
            )

    def validate(self):
        """ Does nothing if everything is OK, raises CogError's if it's not.
        """
        if self.bReplace and self.bDeleteCode:
            raise CogUsageError("Can't use -d with -r (or you would delete all your source!)")

        if self.bReplace and self.sOutputName:
            raise CogUsageError("Can't use -o with -r (they are opposites)")

375 

376 

class Cog(Redirectable):
    """ The Cog engine.
    """
    def __init__(self):
        Redirectable.__init__(self)
        self.options = CogOptions()
        self._fixEndOutputPatterns()
        self.cogmodulename = "cog"
        self.createCogModule()
        self.bCheckFailed = False

    def _fixEndOutputPatterns(self):
        """ Rebuild the end-output regex and format from the current options.
        """
        end_output = re.escape(self.options.sEndOutput)
        self.reEndOutput = re.compile(end_output + r'(?P<hashsect> *\(checksum: (?P<hash>[a-f0-9]+)\))')
        self.sEndFormat = self.options.sEndOutput + ' (checksum: %s)'

    def showWarning(self, msg):
        """ Print a warning message on stdout. """
        self.prout("Warning: "+msg)

    def isBeginSpecLine(self, s):
        """ Does line s contain the begin-spec marker? """
        return self.options.sBeginSpec in s

    def isEndSpecLine(self, s):
        """ Does line s contain the end-spec marker, but not the end-output one? """
        return self.options.sEndSpec in s and not self.isEndOutputLine(s)

    def isEndOutputLine(self, s):
        """ Does line s contain the end-output marker? """
        return self.options.sEndOutput in s

    def createCogModule(self):
        """ Make a cog "module" object so that imported Python modules
            can say "import cog" and get our state.
        """
        class DummyModule(object):
            """Modules don't have to be anything special, just an object will do."""
            pass
        self.cogmodule = DummyModule()
        self.cogmodule.path = []

    def openOutputFile(self, fname):
        """ Open an output file, taking all the details into account.

        Honors the encoding and Unix-newline options, and creates the
        file's directory if it does not exist yet.
        """
        opts = {}
        mode = "w"
        if PY3:
            opts['encoding'] = self.options.sEncoding
        if self.options.bNewlines:
            if PY3:
                opts['newline'] = "\n"
            else:
                mode = "wb"
        fdir = os.path.dirname(fname)
        # Test fdir itself: testing os.path.dirname(fdir) skipped creation
        # of a single-level directory such as "out" in "out/file.txt".
        if fdir and not os.path.exists(fdir):
            os.makedirs(fdir)
        return open(fname, mode, **opts)

    def openInputFile(self, fname):
        """ Open an input file. "-" means standard input. """
        if fname == "-":
            return sys.stdin
        else:
            opts = {}
            if PY3:
                opts['encoding'] = self.options.sEncoding
            return open(fname, "r", **opts)

    def processFile(self, fIn, fOut, fname=None, globals=None):
        """ Process an input file object to an output file object.
            fIn and fOut can be file objects, or file names.

            Files opened here from names are closed again before returning.
            Raises CogError when the markers in the input are malformed or
            a checksum does not match.
        """

        sFileIn = fname or ''
        sFileOut = fname or ''
        fInToClose = fOutToClose = None
        # Convert filenames to files.
        if isinstance(fIn, string_types):
            # Open the input file.
            sFileIn = fIn
            fIn = fInToClose = self.openInputFile(fIn)
        if isinstance(fOut, string_types):
            # Open the output file.
            sFileOut = fOut
            fOut = fOutToClose = self.openOutputFile(fOut)

        try:
            fIn = NumberedFileReader(fIn)

            bSawCog = False

            self.cogmodule.inFile = sFileIn
            self.cogmodule.outFile = sFileOut
            self.cogmodulename = 'cog_' + hashlib.md5(sFileOut.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules['cog'] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # loop over generator chunks
            l = fIn.readline()
            while l:
                # Find the next spec begin
                while l and not self.isBeginSpecLine(l):
                    if self.isEndSpecLine(l):
                        raise CogError("Unexpected '%s'" % self.options.sEndSpec,
                            file=sFileIn, line=fIn.linenumber())
                    if self.isEndOutputLine(l):
                        raise CogError("Unexpected '%s'" % self.options.sEndOutput,
                            file=sFileIn, line=fIn.linenumber())
                    fOut.write(l)
                    l = fIn.readline()
                if not l:
                    break
                if not self.options.bDeleteCode:
                    fOut.write(l)

                # l is the begin spec
                gen = CogGenerator(options=self.options)
                gen.setOutput(stdout=self.stdout)
                gen.parseMarker(l)
                firstLineNum = fIn.linenumber()
                self.cogmodule.firstLineNum = firstLineNum

                # If the spec begin is also a spec end, then process the single
                # line of code inside.
                if self.isEndSpecLine(l):
                    beg = l.find(self.options.sBeginSpec)
                    end = l.find(self.options.sEndSpec)
                    if beg > end:
                        raise CogError("Cog code markers inverted",
                            file=sFileIn, line=firstLineNum)
                    else:
                        sCode = l[beg+len(self.options.sBeginSpec):end].strip()
                        gen.parseLine(sCode)
                else:
                    # Deal with an ordinary code block.
                    l = fIn.readline()

                    # Get all the lines in the spec
                    while l and not self.isEndSpecLine(l):
                        if self.isBeginSpecLine(l):
                            raise CogError("Unexpected '%s'" % self.options.sBeginSpec,
                                file=sFileIn, line=fIn.linenumber())
                        if self.isEndOutputLine(l):
                            raise CogError("Unexpected '%s'" % self.options.sEndOutput,
                                file=sFileIn, line=fIn.linenumber())
                        if not self.options.bDeleteCode:
                            fOut.write(l)
                        gen.parseLine(l)
                        l = fIn.readline()
                    if not l:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=sFileIn, line=firstLineNum)

                    if not self.options.bDeleteCode:
                        fOut.write(l)
                    gen.parseMarker(l)

                l = fIn.readline()

                # Eat all the lines in the output section.  While reading past
                # them, compute the md5 hash of the old output.
                previous = ""
                hasher = hashlib.md5()
                while l and not self.isEndOutputLine(l):
                    if self.isBeginSpecLine(l):
                        raise CogError("Unexpected '%s'" % self.options.sBeginSpec,
                            file=sFileIn, line=fIn.linenumber())
                    if self.isEndSpecLine(l):
                        raise CogError("Unexpected '%s'" % self.options.sEndSpec,
                            file=sFileIn, line=fIn.linenumber())
                    previous += l
                    hasher.update(to_bytes(l))
                    l = fIn.readline()
                curHash = hasher.hexdigest()

                if not l and not self.options.bEofCanBeEnd:
                    # We reached end of file before we found the end output line.
                    raise CogError("Missing '%s' before end of file." % self.options.sEndOutput,
                        file=sFileIn, line=fIn.linenumber())

                # Make the previous output available to the current code
                self.cogmodule.previous = previous

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                hasher = hashlib.md5()
                if not self.options.bNoGenerate:
                    sFile = "<cog %s:%d>" % (sFileIn, firstLineNum)
                    sGen = gen.evaluate(cog=self, globals=globals, fname=sFile)
                    sGen = self.suffixLines(sGen)
                    hasher.update(to_bytes(sGen))
                    fOut.write(sGen)
                newHash = hasher.hexdigest()

                bSawCog = True

                # Write the ending output line
                hashMatch = self.reEndOutput.search(l)
                if self.options.bHashOutput:
                    if hashMatch:
                        oldHash = hashMatch.groupdict()['hash']
                        if oldHash != curHash:
                            raise CogError("Output has been edited! Delete old checksum to unprotect.",
                                file=sFileIn, line=fIn.linenumber())
                        # Create a new end line with the correct hash.
                        endpieces = l.split(hashMatch.group(0), 1)
                    else:
                        # There was no old hash, but we want a new hash.
                        endpieces = l.split(self.options.sEndOutput, 1)
                    l = (self.sEndFormat % newHash).join(endpieces)
                else:
                    # We don't want hashes output, so if there was one, get rid of
                    # it.
                    if hashMatch:
                        l = l.replace(hashMatch.groupdict()['hashsect'], '', 1)

                if not self.options.bDeleteCode:
                    fOut.write(l)
                l = fIn.readline()

            if not bSawCog and self.options.bWarnEmpty:
                self.showWarning("no cog code found in %s" % sFileIn)
        finally:
            if fInToClose:
                fInToClose.close()
            if fOutToClose:
                fOutToClose.close()


    # A regex for non-empty lines, used by suffixLines.
    reNonEmptyLines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def suffixLines(self, text):
        """ Add suffixes to the lines in text, if our options desire it.
            text is many lines, as a single string.
        """
        if self.options.sSuffix:
            # Find all non-blank lines, and add the suffix to the end.
            # Backslashes are doubled so the suffix is taken literally in
            # the replacement template.
            repl = r"\g<0>" + self.options.sSuffix.replace('\\', '\\\\')
            text = self.reNonEmptyLines.sub(repl, text)
        return text

    def processString(self, sInput, fname=None):
        """ Process sInput as the text to cog.
            Return the cogged output as a string.
        """
        fOld = StringIO(sInput)
        fNew = StringIO()
        self.processFile(fOld, fNew, fname=fname)
        return fNew.getvalue()

    def replaceFile(self, sOldPath, sNewText):
        """ Replace file sOldPath with the contents sNewText

        Raises CogError if the file is not writable and cannot be made so.
        """
        if not os.access(sOldPath, os.W_OK):
            # Need to ensure we can write.
            if self.options.sMakeWritableCmd:
                # Use an external command to make the file writable.
                # NOTE(review): the file name is substituted into the command
                # unquoted and the result is run via os.popen; names with
                # spaces or shell metacharacters are not handled.
                cmd = self.options.sMakeWritableCmd.replace('%s', sOldPath)
                pipe = os.popen(cmd)
                try:
                    self.stdout.write(pipe.read())
                finally:
                    pipe.close()
                if not os.access(sOldPath, os.W_OK):
                    raise CogError("Couldn't make %s writable" % sOldPath)
            else:
                # Can't write!
                raise CogError("Can't overwrite %s" % sOldPath)
        f = self.openOutputFile(sOldPath)
        try:
            f.write(sNewText)
        finally:
            # Close the file even when the write fails.
            f.close()

    def saveIncludePath(self):
        """ Remember the current include path and sys.path. """
        self.savedInclude = self.options.includePath[:]
        self.savedSysPath = sys.path[:]

    def restoreIncludePath(self):
        """ Put back the include path and sys.path saved by saveIncludePath. """
        self.options.includePath = self.savedInclude
        self.cogmodule.path = self.options.includePath
        sys.path = self.savedSysPath

    def addToIncludePath(self, includePath):
        """ Append the directories in includePath to cog.path and sys.path. """
        self.cogmodule.path.extend(includePath)
        sys.path.extend(includePath)

    def processOneFile(self, sFile):
        """ Process one filename through cog.
        """

        self.saveIncludePath()
        bNeedNewline = False

        try:
            self.addToIncludePath(self.options.includePath)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.addToIncludePath([os.path.dirname(sFile)])

            # How we process the file depends on where the output is going.
            if self.options.sOutputName:
                self.processFile(sFile, self.options.sOutputName, sFile)
            elif self.options.bReplace or self.options.bCheck:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.bReplace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout("%s %s" % (verb, sFile), end="")
                    bNeedNewline = True

                try:
                    fOldFile = self.openInputFile(sFile)
                    try:
                        sOldText = fOldFile.read()
                    finally:
                        # Close the input even when reading fails.
                        fOldFile.close()
                    sNewText = self.processString(sOldText, fname=sFile)
                    if sOldText != sNewText:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout("%s %s" % (verb, sFile), end="")
                            self.prout(" (changed)")
                            bNeedNewline = False
                        if self.options.bReplace:
                            self.replaceFile(sFile, sNewText)
                        else:
                            assert self.options.bCheck
                            self.bCheckFailed = True
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if bNeedNewline:
                        self.prout("")
            else:
                self.processFile(sFile, self.stdout, sFile)
        finally:
            self.restoreIncludePath()

    def processWildcards(self, sFile):
        """ Process every file matching the glob pattern sFile.

        When nothing matches, sFile itself is processed as a plain name.
        """
        files = glob.glob(sFile)
        if files:
            for sMatchingFile in files:
                self.processOneFile(sMatchingFile)
        else:
            self.processOneFile(sFile)

    def processFileList(self, sFileList):
        """ Process the files in a file list.
        """
        flist = self.openInputFile(sFileList)
        try:
            lines = flist.readlines()
        finally:
            # Close the list file even when reading fails.
            flist.close()
        for l in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(l, posix=True)
            lex.whitespace_split = True
            lex.commenters = '#'
            # No escapes, so that backslash can be part of the path
            lex.escape = ''
            args = list(lex)
            if args:
                self.processArguments(args)

    def processArguments(self, args):
        """ Process one command-line.

        args[0] is a file name, wildcard, or @filelist; the rest are options
        that apply to it only.
        """
        saved_options = self.options
        self.options = self.options.clone()

        try:
            self.options.parseArgs(args[1:])
            self.options.validate()

            if args[0][0] == '@':
                if self.options.sOutputName:
                    raise CogUsageError("Can't use -o with @file")
                self.processFileList(args[0][1:])
            else:
                self.processWildcards(args[0])
        finally:
            # Per-line options must not leak out, even when processing fails.
            self.options = saved_options

    def callableMain(self, argv):
        """ All of command-line cog, but in a callable form.
            This is used by main.
            argv is the equivalent of sys.argv.
        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if '-?' in argv or '-h' in argv:
            self.prerr(usage, end="")
            return

        self.options.parseArgs(argv)
        self.options.validate()
        self._fixEndOutputPatterns()

        if self.options.bShowVersion:
            self.prout("Cog version %s" % __version__)
            return

        if self.options.args:
            for a in self.options.args:
                self.processArguments([a])
        else:
            raise CogUsageError("No files to process")

        if self.bCheckFailed:
            raise CogCheckFailed("Check failed")

    def main(self, argv):
        """ Handle the command-line execution for cog.

        Returns the process exit status: 0 on success, 2 for usage errors,
        3 for errors raised through cog.error, 4 for exceptions in user
        code, 5 for a failed --check, and 1 for any other CogError.
        """

        try:
            self.callableMain(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use -h)")
            return 2
        except CogGeneratedError as err:
            self.prerr("Error: %s" % err)
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1

813 

814 

def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Arguments:
        frame_summary: an iterable of 4-item tuples (filename, lineno,
            funcname, source), as returned by traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, updated to correct the cog entries.  Frames that
        already have source text, or whose filename is not of the form
        "<cog FILE:LINE>", are passed through unchanged.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # Greedy match up to the last ":<digits>", so file names that
            # themselves contain a colon (e.g. "C:\dir\file") still match.
            m = re.search(r"^<cog (.+):(\d+)>$", filename)
            if m:
                if lineno <= len(prolines):
                    filename = '<prologue>'
                    source = prolines[lineno-1]
                    lineno -= 1     # Because "import cog" is the first line in the prologue
                else:
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    # Translate the line number within the compiled code
                    # into a line number in the original file.
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source

841 

842 

def main():
    """Main function for entry_points to use.

    Runs a fresh Cog engine on sys.argv and returns its exit status.
    """
    return Cog().main(sys.argv)