"""Module for git related operations."""
from __future__ import annotations
from contextlib import nullcontext
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from git import GitCommandError, Repo
from semantic_release.cli.masking_filter import MaskingFilter
from semantic_release.cli.util import indented, noop_report
from semantic_release.errors import (
DetachedHeadGitError,
GitAddError,
GitCommitEmptyIndexError,
GitCommitError,
GitFetchError,
GitPushError,
GitTagError,
LocalGitError,
UnknownUpstreamBranchError,
UpstreamBranchChangedError,
)
from semantic_release.globals import logger
if TYPE_CHECKING: # pragma: no cover
from contextlib import _GeneratorContextManager
from logging import Logger
from typing import Sequence
from git import Actor
[docs]
class GitProject:
def __init__(
self,
directory: Path | str = ".",
commit_author: Actor | None = None,
credential_masker: MaskingFilter | None = None,
) -> None:
self._project_root = Path(directory).resolve()
self._logger = logger
self._cred_masker = credential_masker or MaskingFilter()
self._commit_author = commit_author
@property
def project_root(self) -> Path:
return self._project_root
@property
def logger(self) -> Logger:
return self._logger
def _get_custom_environment(
self,
repo: Repo,
custom_vars: dict[str, str] | None = None,
) -> nullcontext[None] | _GeneratorContextManager[None]:
"""
git.custom_environment is a context manager but
is not reentrant, so once we have "used" it
we need to throw it away and re-create it in
order to use it again
"""
author_vars = (
{
"GIT_AUTHOR_NAME": self._commit_author.name,
"GIT_AUTHOR_EMAIL": self._commit_author.email,
"GIT_COMMITTER_NAME": self._commit_author.name,
"GIT_COMMITTER_EMAIL": self._commit_author.email,
}
if self._commit_author
else {}
)
custom_env_vars = {
**author_vars,
**(custom_vars or {}),
}
return (
nullcontext()
if not custom_env_vars
else repo.git.custom_environment(**custom_env_vars)
)
[docs]
def is_dirty(self) -> bool:
with Repo(str(self.project_root)) as repo:
return repo.is_dirty()
[docs]
def is_shallow_clone(self) -> bool:
"""
Check if the repository is a shallow clone.
:return: True if the repository is a shallow clone, False otherwise
"""
with Repo(str(self.project_root)) as repo:
shallow_file = Path(repo.git_dir, "shallow")
return shallow_file.exists()
[docs]
def git_unshallow(self, noop: bool = False) -> None:
"""
Convert a shallow clone to a full clone by fetching the full history.
:param noop: Whether or not to actually run the unshallow command
"""
if noop:
noop_report("would have run:\n" " git fetch --unshallow")
return
with Repo(str(self.project_root)) as repo:
try:
self.logger.info("Converting shallow clone to full clone...")
repo.git.fetch("--unshallow")
self.logger.info("Repository unshallowed successfully")
except GitCommandError as err:
# If the repository is already a full clone, git fetch --unshallow will fail
# with "fatal: --unshallow on a complete repository does not make sense"
# We can safely ignore this error by checking the stderr message
stderr = str(err.stderr) if err.stderr else ""
if "does not make sense" in stderr or "complete repository" in stderr:
self.logger.debug("Repository is already a full clone")
else:
self.logger.exception(str(err))
raise
[docs]
def git_add(
self,
paths: Sequence[Path | str],
force: bool = False,
strict: bool = False,
noop: bool = False,
) -> None:
if noop:
noop_report(
indented(
f"""\
would have run:
git add {str.join(" ", [str(Path(p)) for p in paths])}
"""
)
)
return
git_args = dict(
filter(
lambda k_v: k_v[1], # if truthy
{
"force": force,
}.items(),
)
)
with Repo(str(self.project_root)) as repo:
# TODO: in future this loop should be 1 line:
# repo.index.add(all_paths_to_add, force=False) # noqa: ERA001
# but since 'force' is deliberately ineffective (as in docstring) in gitpython 3.1.18
# we have to do manually add each filepath, and catch the exception if it is an ignored file
for updated_path in paths:
try:
repo.git.add(str(Path(updated_path)), **git_args)
except GitCommandError as err: # noqa: PERF203, acceptable performance loss
err_msg = f"Failed to add path ({updated_path}) to index"
if strict:
self.logger.exception(str(err))
raise GitAddError(err_msg) from err
self.logger.warning(err_msg)
[docs]
def git_commit(
self,
message: str,
date: int | None = None,
commit_all: bool = False,
no_verify: bool = False,
noop: bool = False,
) -> None:
git_args = dict(
filter(
lambda k_v: k_v[1], # if truthy
{
"a": commit_all,
"m": message,
"date": date,
"no_verify": no_verify,
}.items(),
)
)
if noop:
command = (
f"""\
GIT_AUTHOR_NAME={self._commit_author.name} \\
GIT_AUTHOR_EMAIL={self._commit_author.email} \\
GIT_COMMITTER_NAME={self._commit_author.name} \\
GIT_COMMITTER_EMAIL={self._commit_author.email} \\
"""
if self._commit_author
else ""
)
# Indents the newlines so that terminal formatting is happy - note the
# git commit line of the output is 24 spaces indented too
# Only this message needs such special handling because of the newlines
# that might be in a commit message between the subject and body
indented_commit_message = message.replace("\n\n", "\n\n" + " " * 24)
command += f"git commit -m '{indented_commit_message}'"
command += "--all" if commit_all else ""
command += "--no-verify" if no_verify else ""
noop_report(
indented(
f"""\
would have run:
{command}
"""
)
)
return
with Repo(str(self.project_root)) as repo:
has_index_changes = bool(repo.index.diff("HEAD"))
has_working_changes = self.is_dirty()
will_commit_files = has_index_changes or (
has_working_changes and commit_all
)
if not will_commit_files:
raise GitCommitEmptyIndexError("No changes to commit!")
with self._get_custom_environment(repo):
try:
repo.git.commit(**git_args)
except GitCommandError as err:
self.logger.exception(str(err))
raise GitCommitError("Failed to commit changes") from err
[docs]
def git_tag(
self,
tag_name: str,
message: str,
isotimestamp: str,
force: bool = False,
noop: bool = False,
) -> None:
try:
datetime.fromisoformat(isotimestamp)
except ValueError as err:
raise ValueError("Invalid timestamp format") from err
if noop:
command = str.join(
" ",
filter(
None,
[
f"GIT_COMMITTER_DATE={isotimestamp}",
*(
[
f"GIT_AUTHOR_NAME={self._commit_author.name}",
f"GIT_AUTHOR_EMAIL={self._commit_author.email}",
f"GIT_COMMITTER_NAME={self._commit_author.name}",
f"GIT_COMMITTER_EMAIL={self._commit_author.email}",
]
if self._commit_author
else [""]
),
f"git tag -a {tag_name} -m '{message}'",
"--force" if force else "",
],
),
).strip()
noop_report(
indented(
f"""\
would have run:
{command}
"""
)
)
return
with Repo(str(self.project_root)) as repo, self._get_custom_environment(
repo,
{"GIT_COMMITTER_DATE": isotimestamp},
):
try:
repo.git.tag(tag_name, a=True, m=message, force=force)
except GitCommandError as err:
self.logger.exception(str(err))
raise GitTagError(f"Failed to create tag ({tag_name})") from err
[docs]
def git_push_branch(self, remote_url: str, branch: str, noop: bool = False) -> None:
if noop:
noop_report(
indented(
f"""\
would have run:
git push {self._cred_masker.mask(remote_url)} {branch}
"""
)
)
return
with Repo(str(self.project_root)) as repo:
try:
repo.git.push(remote_url, branch)
except GitCommandError as err:
self.logger.exception(str(err))
raise GitPushError(
f"Failed to push branch ({branch}) to remote"
) from err
[docs]
def git_push_tag(
self, remote_url: str, tag: str, noop: bool = False, force: bool = False
) -> None:
if noop:
noop_report(
indented(
f"""\
would have run:
git push {self._cred_masker.mask(remote_url)} tag {tag} {"--force" if force else ""}
""" # noqa: E501
)
)
return
with Repo(str(self.project_root)) as repo:
try:
repo.git.push(remote_url, "tag", tag, force=force)
except GitCommandError as err:
self.logger.exception(str(err))
raise GitPushError(f"Failed to push tag ({tag}) to remote") from err
[docs]
def verify_upstream_unchanged( # noqa: C901
self,
local_ref: str = "HEAD",
upstream_ref: str = "origin",
remote_url: str | None = None,
noop: bool = False,
) -> None:
"""
Verify that the upstream branch has not changed since the given local reference.
:param local_ref: The local reference to compare against upstream (default: HEAD)
:param upstream_ref: The name of the upstream remote or specific remote branch (default: origin)
:param remote_url: Optional authenticated remote URL to use for fetching (default: None, uses configured remote)
:param noop: Whether to skip the actual verification (for dry-run mode)
:raises UpstreamBranchChangedError: If the upstream branch has changed
"""
if not local_ref.strip():
raise ValueError("Local reference cannot be empty")
if not upstream_ref.strip():
raise ValueError("Upstream reference cannot be empty")
if noop:
noop_report(
indented(
"""\
would have verified that upstream branch has not changed
"""
)
)
return
with Repo(str(self.project_root)) as repo:
# Get the current active branch
try:
active_branch = repo.active_branch
except TypeError:
# When in detached HEAD state, active_branch raises TypeError
err_msg = (
"Repository is in detached HEAD state, cannot verify upstream state"
)
raise DetachedHeadGitError(err_msg) from None
# Get the tracking branch (upstream branch)
if (tracking_branch := active_branch.tracking_branch()) is not None:
upstream_full_ref_name = tracking_branch.name
self.logger.info("Upstream branch name: %s", upstream_full_ref_name)
else:
# If no tracking branch is set, derive it
upstream_name = (
upstream_ref.strip()
if upstream_ref.find("/") == -1
else upstream_ref.strip().split("/", maxsplit=1)[0]
)
if not repo.remotes or upstream_name not in repo.remotes:
err_msg = "No remote found; cannot verify upstream state!"
raise UnknownUpstreamBranchError(err_msg)
upstream_full_ref_name = (
f"{upstream_name}/{active_branch.name}"
if upstream_ref.find("/") == -1
else upstream_ref.strip()
)
if upstream_full_ref_name not in repo.refs:
err_msg = f"No upstream branch found for '{active_branch.name}'; cannot verify upstream state!"
raise UnknownUpstreamBranchError(err_msg)
# Extract the remote name from the tracking branch
# tracking_branch.name is in the format "remote/branch"
remote_name, remote_branch_name = upstream_full_ref_name.split(
"/", maxsplit=1
)
remote_ref_obj = repo.remotes[remote_name]
# Fetch the latest changes from the remote
self.logger.info("Fetching latest changes from remote '%s'", remote_name)
try:
# Check if we should use authenticated URL for fetch
# Only use remote_url if:
# 1. It's provided and different from the configured remote URL
# 2. It contains authentication credentials (@ symbol)
# 3. The configured remote is NOT a local path, file:// URL, or test URL (example.com)
# This ensures we don't break tests or local development
configured_url = remote_ref_obj.url
is_local_or_test_remote = (
configured_url.startswith(("file://", "/", "C:/", "H:/"))
or "example.com" in configured_url
or not configured_url.startswith(
(
"https://",
"http://",
"git://",
"git@",
"ssh://",
"git+ssh://",
)
)
)
use_authenticated_fetch = (
remote_url
and "@" in remote_url
and remote_url != configured_url
and not is_local_or_test_remote
)
if use_authenticated_fetch:
# Use authenticated remote URL for fetch
# Fetch the remote branch and update the local tracking ref
repo.git.fetch(
remote_url,
f"refs/heads/{remote_branch_name}:refs/remotes/{upstream_full_ref_name}",
)
else:
# Use the default remote configuration for local paths,
# file:// URLs, test URLs, or when no authentication is needed
remote_ref_obj.fetch()
except GitCommandError as err:
self.logger.exception(str(err))
err_msg = f"Failed to fetch from remote '{remote_name}'"
raise GitFetchError(err_msg) from err
# Get the SHA of the upstream branch
try:
upstream_commit_ref = remote_ref_obj.refs[remote_branch_name].commit
upstream_sha = upstream_commit_ref.hexsha
except AttributeError as err:
self.logger.exception(str(err))
err_msg = f"Unable to determine upstream branch SHA for '{upstream_full_ref_name}'"
raise GitFetchError(err_msg) from err
# Get the SHA of the specified ref (default: HEAD)
try:
local_commit = repo.commit(repo.git.rev_parse(local_ref))
except GitCommandError as err:
self.logger.exception(str(err))
err_msg = f"Unable to determine the SHA for local ref '{local_ref}'"
raise LocalGitError(err_msg) from err
# Compare the two SHAs
if local_commit.hexsha != upstream_sha and not any(
commit.hexsha == upstream_sha for commit in local_commit.iter_parents()
):
err_msg = str.join(
"\n",
(
f"[LOCAL SHA] {local_commit.hexsha} != {upstream_sha} [UPSTREAM SHA].",
f"Upstream branch '{upstream_full_ref_name}' has changed!",
),
)
raise UpstreamBranchChangedError(err_msg)
self.logger.info(
"Verified upstream branch '%s' has not changed",
upstream_full_ref_name,
)