Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
96 changes: 90 additions & 6 deletions codeanalyzer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from codeanalyzer.utils import _set_log_level, logger
from codeanalyzer.config import OutputFormat
from codeanalyzer.schema import model_dump_json
from codeanalyzer.options import AnalysisOptions, EmitTarget
from codeanalyzer.options import AnalysisOptions, EmitTarget, ShardStrategy


def main(
Expand Down Expand Up @@ -83,9 +83,16 @@ def main(
help="Neo4j database name (default: server default). [env: NEO4J_DATABASE]",
),
] = None,
using_codeql: Annotated[
bool, typer.Option("--codeql/--no-codeql", help="Enable CodeQL-based analysis.")
] = False,
analysis_level: Annotated[
int,
typer.Option(
"-a",
"--analysis-level",
help="Analysis depth: 1=symbol table+Jedi call graph, 2=+PyCG call graph.",
min=1,
max=2,
),
] = 1,
using_ray: Annotated[
bool,
typer.Option("--ray/--no-ray", help="Enable Ray for distributed analysis."),
Expand Down Expand Up @@ -137,6 +144,78 @@ def main(
verbosity: Annotated[
int, typer.Option("-v", count=True, help="Increase verbosity: -v, -vv, -vvv")
] = 0,
pycg_shard: Annotated[
bool,
typer.Option(
"--pycg-shard/--no-pycg-shard",
help=(
"Shard PyCG call-graph analysis by Python package (level 2 only). "
"When the project exceeds the 500-file ceiling, PyCG is run "
"independently per top-level package with cross-package imports "
"treated as ghost nodes. Without this flag, projects over the "
"ceiling fall back to Jedi-only edges."
),
),
] = False,
pycg_shard_ceiling: Annotated[
int,
typer.Option(
"--pycg-shard-ceiling",
help=(
"Maximum files per shard when --pycg-shard is active (default 100). "
"Shards exceeding this limit are skipped; their call edges are "
"omitted from the call graph (Jedi edges for those packages are "
"still included). Lower values are safer for packages with deep "
"class hierarchies or heavy import graphs."
),
min=1,
),
] = 100,
pycg_shard_timeout: Annotated[
int,
typer.Option(
"--pycg-shard-timeout",
help=(
"Per-shard wall-clock timeout in seconds when --pycg-shard is "
"active (default 120). A shard that exceeds this limit is skipped "
"gracefully. PyCG's fixpoint is bimodal: it either converges "
"quickly or diverges indefinitely, so the timeout acts as a final "
"safety net after the file-count ceiling. Set to 0 to disable. "
"POSIX only (macOS / Linux); ignored on Windows."
),
min=0,
),
] = 120,
pycg_shard_strategy: Annotated[
ShardStrategy,
typer.Option(
"--pycg-shard-strategy",
help=(
"How --pycg-shard groups files (level 2 only). 'jedi' (default) "
"partitions the Jedi module-dependency graph (SCC + Louvain) so "
"tightly-coupled modules co-compute and few call edges are "
"severed between shards; import cycles are never split. "
"'package' uses the legacy one-shard-per-package-directory "
"grouping."
),
),
] = ShardStrategy.JEDI,
pycg_max_iter: Annotated[
int,
typer.Option(
"--pycg-max-iter",
help=(
"Cap on PyCG's fixpoint passes per shard/project (level 2; "
"default 50). PyCG iterates until its points-to state stops "
"changing, but its access-path domain has no convergence bound, "
"so heavy metaclass/mixin code (e.g. an ORM) can loop with each "
"pass costing seconds. The cap returns a sound-but-incomplete "
"call graph instead of looping until the timeout kills it. "
"Set to -1 for PyCG's unbounded run-to-convergence behaviour."
),
min=-1,
),
] = 50,
):
options = AnalysisOptions(
input=input,
Expand All @@ -148,7 +227,7 @@ def main(
neo4j_user=neo4j_user,
neo4j_password=neo4j_password,
neo4j_database=neo4j_database,
using_codeql=using_codeql,
analysis_level=analysis_level,
using_ray=using_ray,
rebuild_analysis=rebuild_analysis,
skip_tests=skip_tests,
Expand All @@ -157,6 +236,11 @@ def main(
cache_dir=cache_dir,
clear_cache=clear_cache,
verbosity=verbosity,
pycg_shard=pycg_shard,
pycg_shard_ceiling=pycg_shard_ceiling,
pycg_shard_timeout=pycg_shard_timeout,
pycg_shard_strategy=pycg_shard_strategy,
pycg_max_iter=pycg_max_iter,
)

_set_log_level(options.verbosity)
Expand Down Expand Up @@ -230,7 +314,7 @@ def _write_output(artifacts, output_dir: Path, format: OutputFormat):
app = typer.Typer(
callback=main,
name="canpy",
help="Static Analysis on Python source code using Jedi, CodeQL and Tree sitter.",
help="Static Analysis on Python source code using Jedi, PyCG and Tree sitter.",
invoke_without_command=True,
no_args_is_help=True,
add_completion=False,
Expand Down
Loading