Skip to content

refactor: Use GitPython instead of git in command line #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ readme = {file = "README.md", content-type = "text/markdown" }
requires-python = ">= 3.8"
dependencies = [
"click>=8.0.0",
"gitpython>=3.1.0",
"httpx",
"loguru>=0.7.0",
"pathspec>=0.12.1",
Expand Down
82 changes: 52 additions & 30 deletions src/gitingest/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
from typing import TYPE_CHECKING

from gitingest.config import DEFAULT_TIMEOUT
import git
from gitingest.utils.git_utils import (
_add_token_to_url,
check_repo_exists,
checkout_partial_clone,
create_git_auth_header,
create_git_command,
create_git_repo,
ensure_git_installed,
is_github_host,
resolve_commit,
run_command,
)
from gitingest.utils.logging_config import get_logger
from gitingest.utils.os_utils import ensure_directory_exists_or_create
Expand Down Expand Up @@ -83,41 +84,62 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
commit = await resolve_commit(config, token=token)
logger.debug("Resolved commit", extra={"commit": commit})

clone_cmd = ["git"]
# Prepare URL with authentication if needed
clone_url = url
if token and is_github_host(url):
clone_cmd += ["-c", create_git_auth_header(token, url=url)]

clone_cmd += ["clone", "--single-branch", "--no-checkout", "--depth=1"]
if partial_clone:
clone_cmd += ["--filter=blob:none", "--sparse"]

clone_cmd += [url, local_path]

# Clone the repository
logger.info("Executing git clone command", extra={"command": " ".join([*clone_cmd[:-1], "<url>", local_path])})
await run_command(*clone_cmd)
logger.info("Git clone completed successfully")
clone_url = _add_token_to_url(url, token)

# Clone the repository using GitPython
logger.info("Executing git clone operation", extra={"url": "<redacted>", "local_path": local_path})
try:
clone_kwargs = {
"single_branch": True,
"no_checkout": True,
"depth": 1,
}

if partial_clone:
# GitPython doesn't directly support --filter and --sparse in clone
# We'll need to use git.Git() for the initial clone with these options
git_cmd = git.Git()
cmd_args = ["clone", "--single-branch", "--no-checkout", "--depth=1"]
if partial_clone:
cmd_args.extend(["--filter=blob:none", "--sparse"])
cmd_args.extend([clone_url, local_path])
git_cmd.execute(cmd_args)
else:
git.Repo.clone_from(clone_url, local_path, **clone_kwargs)

logger.info("Git clone completed successfully")
except git.GitCommandError as exc:
msg = f"Git clone failed: {exc}"
raise RuntimeError(msg) from exc

# Checkout the subpath if it is a partial clone
if partial_clone:
logger.info("Setting up partial clone for subpath", extra={"subpath": config.subpath})
await checkout_partial_clone(config, token=token)
logger.debug("Partial clone setup completed")

git = create_git_command(["git"], local_path, url, token)

# Ensure the commit is locally available
logger.debug("Fetching specific commit", extra={"commit": commit})
await run_command(*git, "fetch", "--depth=1", "origin", commit)

# Write the work-tree at that commit
logger.info("Checking out commit", extra={"commit": commit})
await run_command(*git, "checkout", commit)

# Update submodules
if config.include_submodules:
logger.info("Updating submodules")
await run_command(*git, "submodule", "update", "--init", "--recursive", "--depth=1")
logger.debug("Submodules updated successfully")
# Create repo object and perform operations
try:
repo = create_git_repo(local_path, url, token)

# Ensure the commit is locally available
logger.debug("Fetching specific commit", extra={"commit": commit})
repo.git.fetch("--depth=1", "origin", commit)

# Write the work-tree at that commit
logger.info("Checking out commit", extra={"commit": commit})
repo.git.checkout(commit)

# Update submodules
if config.include_submodules:
logger.info("Updating submodules")
repo.git.submodule("update", "--init", "--recursive", "--depth=1")
logger.debug("Submodules updated successfully")
except git.GitCommandError as exc:
msg = f"Git operation failed: {exc}"
raise RuntimeError(msg) from exc

logger.info("Git clone operation completed successfully", extra={"local_path": local_path})
187 changes: 127 additions & 60 deletions src/gitingest/utils/git_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import TYPE_CHECKING, Final, Iterable
from urllib.parse import urlparse

import git
import httpx
from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND

Expand Down Expand Up @@ -50,6 +51,9 @@ def is_github_host(url: str) -> bool:
async def run_command(*args: str) -> tuple[bytes, bytes]:
"""Execute a shell command asynchronously and return (stdout, stderr) bytes.

This function is kept for backward compatibility with non-git commands.
Git operations should use GitPython directly.

Parameters
----------
*args : str
Expand Down Expand Up @@ -92,21 +96,26 @@ async def ensure_git_installed() -> None:

"""
try:
await run_command("git", "--version")
except RuntimeError as exc:
# Use GitPython to check git availability
git.Git().version()
except git.GitCommandError as exc:
msg = "Git is not installed or not accessible. Please install Git first."
raise RuntimeError(msg) from exc
except Exception as exc:
msg = "Git is not installed or not accessible. Please install Git first."
raise RuntimeError(msg) from exc

if sys.platform == "win32":
try:
stdout, _ = await run_command("git", "config", "core.longpaths")
if stdout.decode().strip().lower() != "true":
longpaths_value = git.Git().config("core.longpaths")
if longpaths_value.lower() != "true":
logger.warning(
"Git clone may fail on Windows due to long file paths. "
"Consider enabling long path support with: 'git config --global core.longpaths true'. "
"Note: This command may require administrator privileges.",
extra={"platform": "windows", "longpaths_enabled": False},
)
except RuntimeError:
except git.GitCommandError:
# Ignore if checking 'core.longpaths' fails.
pass

Expand Down Expand Up @@ -222,61 +231,73 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str |
msg = f"Invalid fetch type: {ref_type}"
raise ValueError(msg)

cmd = ["git"]

# Add authentication if needed
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]

cmd += ["ls-remote"]

fetch_tags = ref_type == "tags"
to_fetch = "tags" if fetch_tags else "heads"

cmd += [f"--{to_fetch}"]

# `--refs` filters out the peeled tag objects (those ending with "^{}") (for tags)
if fetch_tags:
cmd += ["--refs"]

cmd += [url]

await ensure_git_installed()
stdout, _ = await run_command(*cmd)
# For each line in the output:
# - Skip empty lines and lines that don't contain "refs/{to_fetch}/"
# - Extract the branch or tag name after "refs/{to_fetch}/"
return [
line.split(f"refs/{to_fetch}/", 1)[1]
for line in stdout.decode().splitlines()
if line.strip() and f"refs/{to_fetch}/" in line
]

# Use GitPython to get remote references
try:
git_cmd = git.Git()

# Prepare environment with authentication if needed
env = None
if token and is_github_host(url):
auth_url = _add_token_to_url(url, token)
url = auth_url

fetch_tags = ref_type == "tags"
to_fetch = "tags" if fetch_tags else "heads"

# Build ls-remote command
cmd_args = ["ls-remote", f"--{to_fetch}"]
if fetch_tags:
cmd_args.append("--refs") # Filter out peeled tag objects
cmd_args.append(url)

# Run the command
output = git_cmd.execute(cmd_args, env=env)

# Parse output
return [
line.split(f"refs/{to_fetch}/", 1)[1]
for line in output.splitlines()
if line.strip() and f"refs/{to_fetch}/" in line
]
except git.GitCommandError as exc:
msg = f"Failed to fetch {ref_type} from {url}: {exc}"
raise RuntimeError(msg) from exc


def create_git_command(base_cmd: list[str], local_path: str, url: str, token: str | None = None) -> list[str]:
"""Create a git command with authentication if needed.
def create_git_repo(local_path: str, url: str, token: str | None = None) -> git.Repo:
"""Create a GitPython Repo object with authentication if needed.

Parameters
----------
base_cmd : list[str]
The base git command to start with.
local_path : str
The local path where the git command should be executed.
The local path where the git repository is located.
url : str
The repository URL to check if it's a GitHub repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.

Returns
-------
list[str]
The git command with authentication if needed.
git.Repo
A GitPython Repo object configured with authentication.

"""
cmd = [*base_cmd, "-C", local_path]
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]
return cmd
try:
repo = git.Repo(local_path)

# Configure authentication if needed
if token and is_github_host(url):
auth_header = create_git_auth_header(token, url=url)
# Set the auth header in git config for this repo
key, value = auth_header.split('=', 1)
repo.git.config(key, value)

return repo
except git.InvalidGitRepositoryError as exc:
msg = f"Invalid git repository at {local_path}"
raise ValueError(msg) from exc


def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
Expand Down Expand Up @@ -343,8 +364,13 @@ async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None
if config.blob:
# Remove the file name from the subpath when ingesting from a file url (e.g. blob/branch/path/file.txt)
subpath = str(Path(subpath).parent.as_posix())
checkout_cmd = create_git_command(["git"], config.local_path, config.url, token)
await run_command(*checkout_cmd, "sparse-checkout", "set", subpath)

try:
repo = create_git_repo(config.local_path, config.url, token)
repo.git.execute(["sparse-checkout", "set", subpath])
except git.GitCommandError as exc:
msg = f"Failed to configure sparse-checkout: {exc}"
raise RuntimeError(msg) from exc


async def resolve_commit(config: CloneConfig, token: str | None) -> str:
Expand Down Expand Up @@ -400,20 +426,27 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None)
If the ref does not exist in the remote repository.

"""
# Build: git [-c http.<host>/.extraheader=Auth...] ls-remote <url> <pattern>
cmd: list[str] = ["git"]
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]

cmd += ["ls-remote", url, pattern]
stdout, _ = await run_command(*cmd)
lines = stdout.decode().splitlines()
sha = _pick_commit_sha(lines)
if not sha:
msg = f"{pattern!r} not found in {url}"
raise ValueError(msg)

return sha
try:
git_cmd = git.Git()

# Prepare authentication if needed
auth_url = url
if token and is_github_host(url):
auth_url = _add_token_to_url(url, token)

# Execute ls-remote command
output = git_cmd.execute(["ls-remote", auth_url, pattern])
lines = output.splitlines()

sha = _pick_commit_sha(lines)
if not sha:
msg = f"{pattern!r} not found in {url}"
raise ValueError(msg)

return sha
except git.GitCommandError as exc:
msg = f"Failed to resolve {pattern} in {url}: {exc}"
raise ValueError(msg) from exc


def _pick_commit_sha(lines: Iterable[str]) -> str | None:
Expand Down Expand Up @@ -449,3 +482,37 @@ def _pick_commit_sha(lines: Iterable[str]) -> str | None:
first_non_peeled = sha

return first_non_peeled # branch or lightweight tag (or None)


def _add_token_to_url(url: str, token: str) -> str:
"""Add authentication token to GitHub URL.

Parameters
----------
url : str
The original GitHub URL.
token : str
The GitHub token to add.

Returns
-------
str
The URL with embedded authentication.

"""
from urllib.parse import urlparse, urlunparse

parsed = urlparse(url)
# Add token as username in URL (GitHub supports this)
netloc = f"x-oauth-basic:{token}@{parsed.hostname}"
if parsed.port:
netloc += f":{parsed.port}"

return urlunparse((
parsed.scheme,
netloc,
parsed.path,
parsed.params,
parsed.query,
parsed.fragment
))
Loading
Loading