Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ $ canpy --help
│ [default: lazy] │
│ --skip-tests --include-tests Skip test files in analysis. │
│ [default: skip-tests] │
│ --no-venv --venv Skip virtualenv creation and │
│ dependency installation; resolve │
│ imports against the ambient Python │
│ environment instead. │
│ [default: venv] │
│ --file-name PATH Analyze only the specified file │
│ (relative to input directory). │
│ --cache-dir -c PATH Directory to store analysis cache. │
Expand Down
9 changes: 9 additions & 0 deletions codeanalyzer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ def main(
help="Skip test files in analysis.",
),
] = True,
no_venv: Annotated[
bool,
typer.Option(
"--no-venv/--venv",
help="Skip virtualenv creation and dependency installation; resolve "
"imports against the ambient Python environment instead.",
),
] = False,
file_name: Annotated[
Optional[Path],
typer.Option(
Expand Down Expand Up @@ -144,6 +152,7 @@ def main(
using_ray=using_ray,
rebuild_analysis=rebuild_analysis,
skip_tests=skip_tests,
no_venv=no_venv,
file_name=file_name,
cache_dir=cache_dir,
clear_cache=clear_cache,
Expand Down
118 changes: 100 additions & 18 deletions codeanalyzer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

import ray
from codeanalyzer.utils import logger
from codeanalyzer.schema import PyApplication, PyModule, model_dump_json, model_validate_json
from codeanalyzer.schema import (
PyApplication,
PyExternalSymbol,
PyModule,
model_dump_json,
model_validate_json,
)
from codeanalyzer.schema.py_schema import PyCallEdge
from codeanalyzer.semantic_analysis.call_graph import (
jedi_call_graph_edges,
Expand Down Expand Up @@ -60,6 +66,7 @@ def __init__(self, options: AnalysisOptions) -> None:
self.skip_tests = options.skip_tests
self.using_codeql = options.using_codeql
self.rebuild_analysis = options.rebuild_analysis
self.no_venv = options.no_venv
self.cache_dir = (
options.cache_dir.resolve() if options.cache_dir is not None else self.project_dir
) / ".codeanalyzer"
Expand Down Expand Up @@ -226,13 +233,41 @@ def _get_base_interpreter() -> Path:
f"a working Python interpreter that can create virtual environments."
)

@staticmethod
def _uv_bin() -> Optional[str]:
"""Path to a uv binary: the one bundled with the ``uv`` PyPI package (a
dependency, so normally always present -- including inside a Docker image),
else a uv on PATH, else ``None`` (callers fall back to pip)."""
try:
from uv import find_uv_bin

return str(find_uv_bin())
except Exception:
return shutil.which("uv")

def _install_into_venv(self, venv_python: Path, args: List[str]) -> None:
"""Install packages into the target venv, preferring uv for speed (parallel
downloads + a shared global cache) and falling back to the venv's own pip
when uv is unavailable."""
uv = self._uv_bin()
if uv:
cmd = [uv, "pip", "install", "--python", str(venv_python), *args]
else:
cmd = [str(venv_python), "-m", "pip", "install", *args]
self._cmd_exec_helper(cmd, cwd=self.project_dir, check=True)

def __enter__(self) -> "Codeanalyzer":
# If no virtualenv is provided, try to create one using requirements.txt or pyproject.toml
venv_path = self.cache_dir / self.project_dir.name / "virtualenv"
# Ensure the cache directory exists for this project
venv_path.parent.mkdir(parents=True, exist_ok=True)
if self.no_venv:
logger.info(
"--no-venv: using the ambient Python environment "
"(skipping virtualenv creation and dependency installation)"
)
# Create the virtual environment if it does not exist
if not venv_path.exists() or self.rebuild_analysis:
if not self.no_venv and (not venv_path.exists() or self.rebuild_analysis):
logger.info(f"(Re-)creating virtual environment at {venv_path}")
self._cmd_exec_helper(
[str(self._get_base_interpreter()), "-m", "venv", str(venv_path)],
Expand All @@ -249,24 +284,19 @@ def __enter__(self) -> "Codeanalyzer":
("test-requirements.txt", ["-r"]),
]

for dep_file, pip_args in dependency_files:
for dep_file, _ in dependency_files:
if (self.project_dir / dep_file).exists():
logger.info(f"Installing dependencies from {dep_file}")
self._cmd_exec_helper(
[str(venv_python), "-m", "pip", "install", "-U"] + pip_args + [str(self.project_dir / dep_file)],
cwd=self.project_dir,
check=True,
self._install_into_venv(
venv_python,
["--upgrade", "-r", str(self.project_dir / dep_file)],
)

# Handle Pipenv files
if (self.project_dir / "Pipfile").exists():
logger.info("Installing dependencies from Pipfile")
# Note: This would require pipenv to be installed
self._cmd_exec_helper(
[str(venv_python), "-m", "pip", "install", "pipenv"],
cwd=self.project_dir,
check=True,
)
self._install_into_venv(venv_python, ["pipenv"])
self._cmd_exec_helper(
["pipenv", "install", "--dev"],
cwd=self.project_dir,
Expand All @@ -289,14 +319,18 @@ def __enter__(self) -> "Codeanalyzer":

if any((self.project_dir / file).exists() for file in package_definition_files):
logger.info("Installing project in editable mode")
self._cmd_exec_helper(
[str(venv_python), "-m", "pip", "install", "-e", str(self.project_dir)],
cwd=self.project_dir,
check=True,
)
self._install_into_venv(venv_python, ["-e", str(self.project_dir)])
else:
logger.warning("No package definition files found, skipping editable installation")

# Point Jedi at the analysis venv so it resolves the project's third-party
# imports. This runs on both a fresh build and a lazy reuse of an existing
# venv -- previously self.virtualenv stayed None, so the install above was
# never actually used by the symbol-table builder. With --no-venv we leave
# it None so Jedi resolves against the ambient interpreter instead.
if not self.no_venv and venv_path.exists():
self.virtualenv = venv_path

if self.using_codeql:
logger.info(f"(Re-)initializing CodeQL analysis for {self.project_dir}")

Expand Down Expand Up @@ -358,6 +392,43 @@ def __exit__(self, *args, **kwargs) -> None:
logger.info(f"Clearing cache directory: {self.cache_dir}")
shutil.rmtree(self.cache_dir)

@staticmethod
def _compute_external_symbols(symbol_table, call_graph):
"""Build the external-symbol map: every call-graph endpoint whose signature
is not a declared class/callable in the symbol table is an external (an
imported library or builtin member). ``name``/``module`` are derived from
the signature (best effort: split on the last dot)."""
declared = set()

def walk_callable(c):
declared.add(c.signature)
for ic in (c.inner_callables or {}).values():
walk_callable(ic)
for cl in (c.inner_classes or {}).values():
walk_class(cl)

def walk_class(cl):
declared.add(cl.signature)
for m in (cl.methods or {}).values():
walk_callable(m)
for ic in (cl.inner_classes or {}).values():
walk_class(ic)

for mod in symbol_table.values():
for c in (mod.functions or {}).values():
walk_callable(c)
for cl in (mod.classes or {}).values():
walk_class(cl)

externals: Dict[str, PyExternalSymbol] = {}
for edge in call_graph:
for sig in (edge.source, edge.target):
if sig in declared or sig in externals:
continue
module, name = sig.rsplit(".", 1) if "." in sig else (sig, sig)
externals[sig] = PyExternalSymbol(name=name, module=module)
return externals

def analyze(self) -> PyApplication:
"""Analyze the project and return a PyApplication with symbol table.

Expand Down Expand Up @@ -397,8 +468,19 @@ def analyze(self) -> PyApplication:
jedi_edges = jedi_call_graph_edges(symbol_table)
call_graph = merge_edges(jedi_edges, codeql_edges)

# Classify call-graph endpoints that are not declared in the symbol table
# (imported library / builtin members) once, so the JSON and Neo4j backends
# share one authoritative external-symbol set.
external_symbols = self._compute_external_symbols(symbol_table, call_graph)

# Recreate pyapplication
app = PyApplication.builder().symbol_table(symbol_table).call_graph(call_graph).build()
app = (
PyApplication.builder()
.symbol_table(symbol_table)
.call_graph(call_graph)
.external_symbols(external_symbols)
.build()
)

# Save to cache
self._save_analysis_cache(app, cache_file)
Expand Down
15 changes: 13 additions & 2 deletions codeanalyzer/neo4j/bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ def session():
for stmt in [*CONSTRAINTS, *INDEXES]:
s.run(stmt)

# The application anchor (a shared node) — used to scope the orphan prune
# so it never touches modules belonging to a different :PyApplication.
app_name = next(
(n.value for n in rows.nodes if n.labels and n.labels[0] == "PyApplication"),
None,
)

# Partition nodes by owning module; shared nodes have no _module.
by_module: Dict[str, List[NodeRow]] = {}
shared: List[NodeRow] = []
Expand Down Expand Up @@ -135,13 +142,17 @@ def _purge(tx, module=m, node_keys=keys):
_upsert_edges(session, neo4j, edges)

# 6. orphan prune — only safe on a full run (a targeted run can't tell deleted from untargeted).
if full_run:
# Scope to THIS application's anchor so a full run for application B never
# deletes application A's modules from a shared database.
if full_run and app_name is not None:
present = list(by_module.keys())
with session() as s:
res = s.run(
"MATCH (m:PyModule) WHERE NOT m.file_key IN $present "
"MATCH (:PyApplication {name: $app})-[:PY_HAS_MODULE]->(m:PyModule) "
"WHERE NOT m.file_key IN $present "
f"OPTIONAL MATCH (m)-{DESCENDANTS}->(x) DETACH DELETE x, m "
"RETURN count(m) AS pruned",
app=app_name,
present=present,
)
pruned = res.single()
Expand Down
4 changes: 2 additions & 2 deletions codeanalyzer/neo4j/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from codeanalyzer.neo4j.schema import CONSTRAINTS, INDEXES

SCHEMA_VERSION = "1.0.0"
SCHEMA_VERSION = "1.1.0"

# PropType ∈ {"string", "integer", "float", "boolean", "string[]", "integer[]"}.

Expand Down Expand Up @@ -119,7 +119,7 @@ class RelType:
"PyExternal",
"PySymbol",
"signature",
{"signature": "string", "name": "string"},
{"signature": "string", "name": "string", "module": "string"},
),
NodeLabel("PyPackage", "PyPackage", "name", {"name": "string"}),
NodeLabel(
Expand Down
28 changes: 18 additions & 10 deletions codeanalyzer/neo4j/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ def project(app: PyApplication, app_name: str) -> GraphRows:
b.edge("PY_HAS_MODULE", app_ref, mod_ref)
_project_module_body(b, file_key, mod_ref, mod)

# The aggregated :PY_CALLS twin. Endpoints not present in the symbol table become
# :PyExternal ghost nodes (the analyzer already preserves them as ghost nodes).
# The aggregated :PY_CALLS twin. Endpoints listed in app.external_symbols become
# :PyExternal ghost nodes; the rest are declared :PySymbol nodes already emitted.
externals = app.external_symbols or {}
for e in app.call_graph:
src = _call_endpoint(b, e.source)
tgt = _call_endpoint(b, e.target)
src = _call_endpoint(b, e.source, externals)
tgt = _call_endpoint(b, e.target, externals)
b.edge("PY_CALLS", src, tgt, _call_edge_props(e.weight, list(e.provenance or [])))

return b.finish()
Expand All @@ -74,13 +75,20 @@ def _sym(signature: str) -> NodeRef:
return NodeRef("PySymbol", "signature", signature)


def _call_endpoint(b: RowBuilder, signature: str) -> NodeRef:
"""A call-graph endpoint: a known callable already emitted, or a phantom
:PyExternal symbol materialized on demand for a ghost target."""
if b.has_key(signature):
def _call_endpoint(b: RowBuilder, signature: str, externals: dict) -> NodeRef:
"""A call-graph endpoint: a declared callable already emitted, or an external
symbol (imported library / builtin member) materialized as a :PyExternal ghost.

Classification is authoritative -- it comes from ``app.external_symbols``, not a
"present in the graph" heuristic -- so an imported module name (which exists only
as a :PyPackage) can never shadow the call target. A small fallback still
materializes an external for any endpoint that is neither declared nor listed."""
ext = externals.get(signature)
if ext is None and b.has_key("PySymbol", signature):
return _sym(signature)
name = signature.rsplit(".", 1)[-1] if "." in signature else signature
return b.node(["PySymbol", "PyExternal"], "signature", signature, {"name": name})
name = ext.name if ext is not None else (signature.rsplit(".", 1)[-1] if "." in signature else signature)
module = ext.module if ext is not None else None
return b.node(["PySymbol", "PyExternal"], "signature", signature, prune({"name": name, "module": module}))


# ----------------------------------------------------------------------------------------------
Expand Down
15 changes: 10 additions & 5 deletions codeanalyzer/neo4j/rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ def __init__(self) -> None:
self._nodes: Dict[str, NodeRow] = {} # key: f"{labels[0]} {value}"
self._edges: List[EdgeRow] = []
self._deferred: List[EdgeRow] = [] # edges gated against node existence at finish()
self._keys: set = set() # every node value seen, for resolved-gating
# (merge_label, value) of every node seen, for resolved-gating. Keyed by
# label too so a :PyPackage name can't shadow a :PySymbol signature (and
# vice versa) — otherwise a call to an imported module name like ``os``
# resolves to a :PySymbol node that was never created and the edge is lost.
self._keys: set = set()

def node(self, labels: List[str], key_prop: str, value: str, props: Props) -> NodeRef:
"""Upsert a node. Re-seeing the same ``(labels[0], value)`` merges props
Expand All @@ -98,7 +102,7 @@ def node(self, labels: List[str], key_prop: str, value: str, props: Props) -> No
existing.labels.append(label)
else:
self._nodes[node_id] = NodeRow(list(labels), key_prop, value, dict(props))
self._keys.add(value)
self._keys.add((labels[0], value))
return NodeRef(labels[0], key_prop, value)

def edge(self, type_: str, from_ref: NodeRef, to_ref: NodeRef, props: Optional[Props] = None) -> None:
Expand All @@ -121,12 +125,13 @@ def edge_to_symbol(
)
)

def has_key(self, value: str) -> bool:
return value in self._keys
def has_key(self, label: str, value: str) -> bool:
    """Report whether the ``(label, value)`` pair is recorded in ``self._keys``.

    Args:
        label: The label half of the identity pair to look up.
        value: The value half of the identity pair to look up.

    Returns:
        ``True`` when ``(label, value)`` is present in ``self._keys``.
    """
    identity = (label, value)
    return identity in self._keys

def finish(self) -> GraphRows:
for e in self._deferred:
if e.to_ref.value in self._keys:
if (e.to_ref.label, e.to_ref.value) in self._keys:
self._edges.append(e)
nodes = sorted(self._nodes.values(), key=lambda n: f"{n.labels[0]} {n.value}")
edges = sorted(self._edges, key=lambda e: f"{e.type} {e.from_ref.value} {e.to_ref.value}")
Expand Down
1 change: 1 addition & 0 deletions codeanalyzer/options/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AnalysisOptions:
using_ray: bool = False
rebuild_analysis: bool = False
skip_tests: bool = True
no_venv: bool = False
file_name: Optional[Path] = None
cache_dir: Optional[Path] = None
clear_cache: bool = False
Expand Down
2 changes: 2 additions & 0 deletions codeanalyzer/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
PyClass,
PyClassAttribute,
PyComment,
PyExternalSymbol,
PyImport,
PyModule,
PyVariableDeclaration,
)

__all__ = [
"PyApplication",
"PyExternalSymbol",
"PyImport",
"PyComment",
"PyModule",
Expand Down
Loading