Spaces:
Runtime error
Runtime error
# | |
# DEPRECATED: implementation for ffi.verify() | |
# | |
import sys, os, binascii, shutil, io | |
from . import __version_verifier_modules__ | |
from . import ffiplatform | |
from .error import VerificationError | |
if sys.version_info >= (3, 3): | |
import importlib.machinery | |
def _extension_suffixes(): | |
return importlib.machinery.EXTENSION_SUFFIXES[:] | |
else: | |
import imp | |
def _extension_suffixes(): | |
return [suffix for suffix, _, type in imp.get_suffixes() | |
if type == imp.C_EXTENSION] | |
if sys.version_info >= (3,): | |
NativeIO = io.StringIO | |
else: | |
class NativeIO(io.BytesIO): | |
def write(self, s): | |
if isinstance(s, unicode): | |
s = s.encode('ascii') | |
super(NativeIO, self).write(s) | |
class Verifier(object): | |
def __init__(self, ffi, preamble, tmpdir=None, modulename=None, | |
ext_package=None, tag='', force_generic_engine=False, | |
source_extension='.c', flags=None, relative_to=None, **kwds): | |
if ffi._parser._uses_new_feature: | |
raise VerificationError( | |
"feature not supported with ffi.verify(), but only " | |
"with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,)) | |
self.ffi = ffi | |
self.preamble = preamble | |
if not modulename: | |
flattened_kwds = ffiplatform.flatten(kwds) | |
vengine_class = _locate_engine_class(ffi, force_generic_engine) | |
self._vengine = vengine_class(self) | |
self._vengine.patch_extension_kwds(kwds) | |
self.flags = flags | |
self.kwds = self.make_relative_to(kwds, relative_to) | |
# | |
if modulename: | |
if tag: | |
raise TypeError("can't specify both 'modulename' and 'tag'") | |
else: | |
key = '\x00'.join(['%d.%d' % sys.version_info[:2], | |
__version_verifier_modules__, | |
preamble, flattened_kwds] + | |
ffi._cdefsources) | |
if sys.version_info >= (3,): | |
key = key.encode('utf-8') | |
k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff) | |
k1 = k1.lstrip('0x').rstrip('L') | |
k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff) | |
k2 = k2.lstrip('0').rstrip('L') | |
modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key, | |
k1, k2) | |
suffix = _get_so_suffixes()[0] | |
self.tmpdir = tmpdir or _caller_dir_pycache() | |
self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension) | |
self.modulefilename = os.path.join(self.tmpdir, modulename + suffix) | |
self.ext_package = ext_package | |
self._has_source = False | |
self._has_module = False | |
def write_source(self, file=None): | |
"""Write the C source code. It is produced in 'self.sourcefilename', | |
which can be tweaked beforehand.""" | |
with self.ffi._lock: | |
if self._has_source and file is None: | |
raise VerificationError( | |
"source code already written") | |
self._write_source(file) | |
def compile_module(self): | |
"""Write the C source code (if not done already) and compile it. | |
This produces a dynamic link library in 'self.modulefilename'.""" | |
with self.ffi._lock: | |
if self._has_module: | |
raise VerificationError("module already compiled") | |
if not self._has_source: | |
self._write_source() | |
self._compile_module() | |
def load_library(self): | |
"""Get a C module from this Verifier instance. | |
Returns an instance of a FFILibrary class that behaves like the | |
objects returned by ffi.dlopen(), but that delegates all | |
operations to the C module. If necessary, the C code is written | |
and compiled first. | |
""" | |
with self.ffi._lock: | |
if not self._has_module: | |
self._locate_module() | |
if not self._has_module: | |
if not self._has_source: | |
self._write_source() | |
self._compile_module() | |
return self._load_library() | |
def get_module_name(self): | |
basename = os.path.basename(self.modulefilename) | |
# kill both the .so extension and the other .'s, as introduced | |
# by Python 3: 'basename.cpython-33m.so' | |
basename = basename.split('.', 1)[0] | |
# and the _d added in Python 2 debug builds --- but try to be | |
# conservative and not kill a legitimate _d | |
if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'): | |
basename = basename[:-2] | |
return basename | |
def get_extension(self): | |
ffiplatform._hack_at_distutils() # backward compatibility hack | |
if not self._has_source: | |
with self.ffi._lock: | |
if not self._has_source: | |
self._write_source() | |
sourcename = ffiplatform.maybe_relative_path(self.sourcefilename) | |
modname = self.get_module_name() | |
return ffiplatform.get_extension(sourcename, modname, **self.kwds) | |
def generates_python_module(self): | |
return self._vengine._gen_python_module | |
def make_relative_to(self, kwds, relative_to): | |
if relative_to and os.path.dirname(relative_to): | |
dirname = os.path.dirname(relative_to) | |
kwds = kwds.copy() | |
for key in ffiplatform.LIST_OF_FILE_NAMES: | |
if key in kwds: | |
lst = kwds[key] | |
if not isinstance(lst, (list, tuple)): | |
raise TypeError("keyword '%s' should be a list or tuple" | |
% (key,)) | |
lst = [os.path.join(dirname, fn) for fn in lst] | |
kwds[key] = lst | |
return kwds | |
# ---------- | |
def _locate_module(self): | |
if not os.path.isfile(self.modulefilename): | |
if self.ext_package: | |
try: | |
pkg = __import__(self.ext_package, None, None, ['__doc__']) | |
except ImportError: | |
return # cannot import the package itself, give up | |
# (e.g. it might be called differently before installation) | |
path = pkg.__path__ | |
else: | |
path = None | |
filename = self._vengine.find_module(self.get_module_name(), path, | |
_get_so_suffixes()) | |
if filename is None: | |
return | |
self.modulefilename = filename | |
self._vengine.collect_types() | |
self._has_module = True | |
def _write_source_to(self, file): | |
self._vengine._f = file | |
try: | |
self._vengine.write_source_to_f() | |
finally: | |
del self._vengine._f | |
def _write_source(self, file=None): | |
if file is not None: | |
self._write_source_to(file) | |
else: | |
# Write our source file to an in memory file. | |
f = NativeIO() | |
self._write_source_to(f) | |
source_data = f.getvalue() | |
# Determine if this matches the current file | |
if os.path.exists(self.sourcefilename): | |
with open(self.sourcefilename, "r") as fp: | |
needs_written = not (fp.read() == source_data) | |
else: | |
needs_written = True | |
# Actually write the file out if it doesn't match | |
if needs_written: | |
_ensure_dir(self.sourcefilename) | |
with open(self.sourcefilename, "w") as fp: | |
fp.write(source_data) | |
# Set this flag | |
self._has_source = True | |
def _compile_module(self): | |
# compile this C source | |
tmpdir = os.path.dirname(self.sourcefilename) | |
outputfilename = ffiplatform.compile(tmpdir, self.get_extension()) | |
try: | |
same = ffiplatform.samefile(outputfilename, self.modulefilename) | |
except OSError: | |
same = False | |
if not same: | |
_ensure_dir(self.modulefilename) | |
shutil.move(outputfilename, self.modulefilename) | |
self._has_module = True | |
def _load_library(self): | |
assert self._has_module | |
if self.flags is not None: | |
return self._vengine.load_library(self.flags) | |
else: | |
return self._vengine.load_library() | |
# ____________________________________________________________ | |
_FORCE_GENERIC_ENGINE = False # for tests | |
def _locate_engine_class(ffi, force_generic_engine): | |
if _FORCE_GENERIC_ENGINE: | |
force_generic_engine = True | |
if not force_generic_engine: | |
if '__pypy__' in sys.builtin_module_names: | |
force_generic_engine = True | |
else: | |
try: | |
import _cffi_backend | |
except ImportError: | |
_cffi_backend = '?' | |
if ffi._backend is not _cffi_backend: | |
force_generic_engine = True | |
if force_generic_engine: | |
from . import vengine_gen | |
return vengine_gen.VGenericEngine | |
else: | |
from . import vengine_cpy | |
return vengine_cpy.VCPythonEngine | |
# ____________________________________________________________ | |
_TMPDIR = None | |
def _caller_dir_pycache(): | |
if _TMPDIR: | |
return _TMPDIR | |
result = os.environ.get('CFFI_TMPDIR') | |
if result: | |
return result | |
filename = sys._getframe(2).f_code.co_filename | |
return os.path.abspath(os.path.join(os.path.dirname(filename), | |
'__pycache__')) | |
def set_tmpdir(dirname): | |
"""Set the temporary directory to use instead of __pycache__.""" | |
global _TMPDIR | |
_TMPDIR = dirname | |
def cleanup_tmpdir(tmpdir=None, keep_so=False): | |
"""Clean up the temporary directory by removing all files in it | |
called `_cffi_*.{c,so}` as well as the `build` subdirectory.""" | |
tmpdir = tmpdir or _caller_dir_pycache() | |
try: | |
filelist = os.listdir(tmpdir) | |
except OSError: | |
return | |
if keep_so: | |
suffix = '.c' # only remove .c files | |
else: | |
suffix = _get_so_suffixes()[0].lower() | |
for fn in filelist: | |
if fn.lower().startswith('_cffi_') and ( | |
fn.lower().endswith(suffix) or fn.lower().endswith('.c')): | |
try: | |
os.unlink(os.path.join(tmpdir, fn)) | |
except OSError: | |
pass | |
clean_dir = [os.path.join(tmpdir, 'build')] | |
for dir in clean_dir: | |
try: | |
for fn in os.listdir(dir): | |
fn = os.path.join(dir, fn) | |
if os.path.isdir(fn): | |
clean_dir.append(fn) | |
else: | |
os.unlink(fn) | |
except OSError: | |
pass | |
def _get_so_suffixes(): | |
suffixes = _extension_suffixes() | |
if not suffixes: | |
# bah, no C_EXTENSION available. Occurs on pypy without cpyext | |
if sys.platform == 'win32': | |
suffixes = [".pyd"] | |
else: | |
suffixes = [".so"] | |
return suffixes | |
def _ensure_dir(filename): | |
dirname = os.path.dirname(filename) | |
if dirname and not os.path.isdir(dirname): | |
os.makedirs(dirname) | |