summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sphinx/util/inventory.py47
1 files changed, 18 insertions, 29 deletions
diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py
index 0fdc3e833..43a868e95 100644
--- a/sphinx/util/inventory.py
+++ b/sphinx/util/inventory.py
@@ -10,20 +10,20 @@
import os
import re
import zlib
+from typing import Callable, IO, Iterator
from sphinx.util import logging
-
-if False:
- # For type annotation
- from typing import Callable, IO, Iterator # NOQA
- from sphinx.builders import Builder # NOQA
- from sphinx.environment import BuildEnvironment # NOQA
- from sphinx.util.typing import Inventory # NOQA
+from sphinx.util.typing import Inventory
BUFSIZE = 16 * 1024
logger = logging.getLogger(__name__)
+if False:
+ # For type annotation
+ from sphinx.builders import Builder
+ from sphinx.environment import BuildEnvironment
+
class InventoryFileReader:
"""A file reader for inventory file.
@@ -31,21 +31,18 @@ class InventoryFileReader:
This reader supports mixture of texts and compressed texts.
"""
- def __init__(self, stream):
- # type: (IO) -> None
+ def __init__(self, stream: IO) -> None:
self.stream = stream
self.buffer = b''
self.eof = False
- def read_buffer(self):
- # type: () -> None
+ def read_buffer(self) -> None:
chunk = self.stream.read(BUFSIZE)
if chunk == b'':
self.eof = True
self.buffer += chunk
- def readline(self):
- # type: () -> str
+ def readline(self) -> str:
pos = self.buffer.find(b'\n')
if pos != -1:
line = self.buffer[:pos].decode()
@@ -59,15 +56,13 @@ class InventoryFileReader:
return line
- def readlines(self):
- # type: () -> Iterator[str]
+ def readlines(self) -> Iterator[str]:
while not self.eof:
line = self.readline()
if line:
yield line
- def read_compressed_chunks(self):
- # type: () -> Iterator[bytes]
+ def read_compressed_chunks(self) -> Iterator[bytes]:
decompressor = zlib.decompressobj()
while not self.eof:
self.read_buffer()
@@ -75,8 +70,7 @@ class InventoryFileReader:
self.buffer = b''
yield decompressor.flush()
- def read_compressed_lines(self):
- # type: () -> Iterator[str]
+ def read_compressed_lines(self) -> Iterator[str]:
buf = b''
for chunk in self.read_compressed_chunks():
buf += chunk
@@ -89,8 +83,7 @@ class InventoryFileReader:
class InventoryFile:
@classmethod
- def load(cls, stream, uri, joinfunc):
- # type: (IO, str, Callable) -> Inventory
+ def load(cls, stream: IO, uri: str, joinfunc: Callable) -> Inventory:
reader = InventoryFileReader(stream)
line = reader.readline().rstrip()
if line == '# Sphinx inventory version 1':
@@ -101,8 +94,7 @@ class InventoryFile:
raise ValueError('invalid inventory header: %s' % line)
@classmethod
- def load_v1(cls, stream, uri, join):
- # type: (InventoryFileReader, str, Callable) -> Inventory
+ def load_v1(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:]
@@ -120,8 +112,7 @@ class InventoryFile:
return invdata
@classmethod
- def load_v2(cls, stream, uri, join):
- # type: (InventoryFileReader, str, Callable) -> Inventory
+ def load_v2(cls, stream: InventoryFileReader, uri: str, join: Callable) -> Inventory:
invdata = {} # type: Inventory
projname = stream.readline().rstrip()[11:]
version = stream.readline().rstrip()[11:]
@@ -150,10 +141,8 @@ class InventoryFile:
return invdata
@classmethod
- def dump(cls, filename, env, builder):
- # type: (str, BuildEnvironment, Builder) -> None
- def escape(string):
- # type: (str) -> str
+ def dump(cls, filename: str, env: "BuildEnvironment", builder: "Builder") -> None:
+ def escape(string: str) -> str:
return re.sub("\\s+", " ", string)
with open(os.path.join(filename), 'wb') as f: