# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 et:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Internal low-level interface to Git repositories
"""
from __future__ import annotations
import logging
import os
import os.path as op
import posixpath
import re
import subprocess
import warnings
from collections.abc import (
Callable,
Iterable,
Iterator,
Mapping,
Sequence,
)
from functools import wraps
from itertools import chain
from os import (
PathLike,
linesep,
)
from os.path import (
commonprefix,
curdir,
dirname,
exists,
isabs,
)
from os.path import join as opj
from os.path import (
pardir,
relpath,
sep,
)
from re import Pattern
from typing import (
TYPE_CHECKING,
Any,
List,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
import datalad.utils as ut
from datalad import ssh_manager
from datalad.cmd import (
BatchedCommand,
GitWitlessRunner,
NoCapture,
StdOutErrCapture,
WitlessProtocol,
)
from datalad.config import (
parse_gitconfig_dump,
write_config_section,
)
from datalad.consts import (
ILLEGAL_CHARS_WIN,
RESERVED_NAMES_WIN,
)
from datalad.core.local.repo import repo_from_path
from datalad.dataset.gitrepo import GitRepo as CoreGitRepo
from datalad.dataset.gitrepo import (
_get_dot_git,
path_based_str_repr,
)
from datalad.log import log_progress
from datalad.support.due import (
Doi,
due,
)
from datalad.typing import (
Concatenate,
Literal,
P,
Protocol,
Self,
T,
TypedDict,
)
from datalad.utils import (
Path,
PurePosixPath,
ensure_dir,
ensure_list,
ensure_unicode,
generate_file_chunks,
getpwd,
is_interactive,
on_windows,
optional_args,
path_is_subpath,
posix_relpath,
)
from .exceptions import (
CapturedException,
CommandError,
FileNotInRepositoryError,
InvalidGitReferenceError,
InvalidGitRepositoryError,
NoSuchPathError,
)
# imports from same module:
from .external_versions import external_versions
from .network import (
RI,
PathRI,
is_ssh,
)
from .path import (
get_filtered_paths_,
get_parent_paths,
)
if TYPE_CHECKING:
from datalad.distribution.dataset import Dataset
# shortcuts
_curdirsep = curdir + sep
_pardirsep = pardir + sep
lgr = logging.getLogger('datalad.gitrepo')
Option = Union[str, bool, None, List[Union[str, bool, None]], Tuple[Union[str, bool, None], ...]]
# outside the repo base classes only used in ConfigManager
def to_options(split_single_char_options: bool = True, **kwargs: Option) -> list[str]:
"""Transform keyword arguments into a list of cmdline options
Imported from GitPython.
Original copyright:
Copyright (C) 2008, 2009 Michael Trier and contributors
Original license:
BSD 3-Clause "New" or "Revised" License
Parameters
----------
    split_single_char_options: bool
        if True, a single-character option with a value is rendered as two
        command line arguments (e.g. ``-m value``); if False, as a single
        fused argument (``-mvalue``)
    kwargs:
        keyword arguments to transform into command line options
Returns
-------
list
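
    Examples
    --------
    A minimal sketch of the transformation; the option names are made up:

    >>> to_options(m='bla', verbose=True)
    ['-m', 'bla', '--verbose']
    >>> to_options(split_single_char_options=False, m='bla')
    ['-mbla']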
"""
def dashify(string: str) -> str:
return string.replace('_', '-')
def transform_kwarg(name: str, value: str | bool | None, split_single_char_options: bool) -> list[str]:
if len(name) == 1:
if value is True:
return ["-%s" % name]
elif value not in (False, None):
if split_single_char_options:
return ["-%s" % name, "%s" % value]
else:
return ["-%s%s" % (name, value)]
else:
if value is True:
return ["--%s" % dashify(name)]
elif value is not False and value is not None:
return ["--%s=%s" % (dashify(name), value)]
return []
args = []
for k, v in sorted(kwargs.items()):
if isinstance(v, (list, tuple)):
for value in v:
args += transform_kwarg(k, value, split_single_char_options)
else:
args += transform_kwarg(k, v, split_single_char_options)
return args
def _normalize_path(base_dir: str, path: str) -> str:
"""Helper to check paths passed to methods of this class.
    Checks whether `path` is beneath `base_dir` and normalizes it.
    Additionally, paths are converted into relative paths with respect to
    `base_dir`, considering the PWD in case of relative input paths. This
    is intended to be used in repository classes, which means that
    `base_dir` usually will be the repository's base directory.
Parameters
----------
base_dir: str
directory to serve as base to normalized, relative paths
path: str
path to be normalized
Returns
-------
    str:
        path, as a relative path with respect to `base_dir`
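
    Examples
    --------
    A sketch of the expected behavior on POSIX, assuming no symlinks are
    involved; the paths are hypothetical::

        _normalize_path('/data/repo', '/data/repo/sub/file')  # -> 'sub/file'
        _normalize_path('/data/repo', 'sub/file')             # -> 'sub/file'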
"""
if not path:
return path
pathobj = Path(path)
# do absolute() in addition to always get an absolute path
# even with non-existing base_dirs on windows
base_dir = str(Path(base_dir).resolve().absolute()) # realpath OK
# path = normpath(path)
# Note: disabled normpath, because it may break paths containing symlinks;
# But we don't want to realpath relative paths, in case cwd isn't the
# correct base.
if pathobj.is_absolute():
# path might already be a symlink pointing to annex etc,
# so realpath only its directory, to get "inline" with
# realpath(base_dir) above
path = str(pathobj.parent.resolve() / pathobj.name) # realpath OK
# Executive decision was made to not do this kind of magic!
#
# elif commonprefix([realpath(getpwd()), base_dir]) == base_dir:
# # If we are inside repository, rebuilt relative paths.
# path = opj(realpath(getpwd()), path)
#
# BUT with relative curdir/pardir start it would assume relative to curdir
#
elif path.startswith(_curdirsep) or path.startswith(_pardirsep):
path = str(Path(getpwd()).resolve() / pathobj) # realpath OK
else:
# We were called from outside the repo. Therefore relative paths
# are interpreted as being relative to self.path already.
return path
if commonprefix([path, base_dir]) != base_dir:
raise FileNotInRepositoryError(msg="Path outside repository: %s"
% base_dir, filename=path)
return relpath(path, start=base_dir)
class _WithPath(Protocol):
path: str
@optional_args
def normalize_path(func: Callable[Concatenate[_WithPath, str, P], T]) -> Callable[Concatenate[_WithPath, str, P], T]:
"""Decorator to provide unified path conversion for a single file
Unlike normalize_paths, intended to be used for functions dealing with a
single filename at a time
Note
----
    This is intended to be used within the repository classes and
    therefore returns a method of the repository class!
    The decorated function is expected to take a path as its first
    positional argument (after 'self'). Additionally, the class that
    `func` is a member of is expected to have an attribute 'path'.
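
    Examples
    --------
    A sketch of typical usage on a repository class; `file_size` is a
    hypothetical method::

        class SomeRepo:
            def __init__(self, path):
                self.path = path

            @normalize_path
            def file_size(self, path):
                # `path` arrives normalized, relative to self.path
                return os.stat(os.path.join(self.path, path)).st_size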
"""
@wraps(func)
def _wrap_normalize_path(self: _WithPath, file_: str, *args: P.args, **kwargs: P.kwargs) -> T:
file_new = _normalize_path(self.path, file_)
return func(self, file_new, *args, **kwargs)
return _wrap_normalize_path
@optional_args
def normalize_paths(func, match_return_type=True, map_filenames_back=False,
serialize=False):
"""Decorator to provide unified path conversions.
Note
----
    This is intended to be used within the repository classes and
    therefore returns a method of the repository class!
    The decorated function is expected to take a path or a list of paths
    as its first positional argument (after 'self'). Additionally, the
    class that `func` is a member of is expected to have an attribute
    'path'.
    Accepts either a list of paths or a single path as a str. Passes a
    list to the decorated function either way, but the return value
    depends on `match_return_type` and on the input argument.
    If a call to the wrapped function includes the keyword argument
    `normalize_paths` set to False, no normalization happens for that
    function call (used for calls to wrapped functions from within other
    wrapped functions, while the CWD possibly is within a repository).
    Parameters
    ----------
    match_return_type : bool, optional
        If True, and a single string was passed in, return the first
        element of the output (after verifying that it is a list of
        length 1). This makes it easier to work with single-file input.
    map_filenames_back : bool, optional
        If True and the returned value is a dictionary, it is assumed to
        carry one entry per file, and the filenames (keys) are mapped
        back from the normalized paths (relative to the root of the
        repo) to the paths as originally provided.
    serialize : bool, optional
        Loop through the files, passing only a single one to the function
        at a time. This simplifies the implementation of, and the
        interface to, annex commands which do not take multiple arguments
        in the same call (e.g. checkpresentkey).
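
    Examples
    --------
    A sketch of typical usage on a repository class; `file_sizes` is a
    hypothetical method::

        class SomeRepo:
            def __init__(self, path):
                self.path = path

            @normalize_paths
            def file_sizes(self, files):
                # `files` always arrives as a list of normalized paths
                return [os.stat(os.path.join(self.path, f)).st_size
                        for f in files]

        # with match_return_type=True (default), a single str input
        # yields a single result, while a list input yields a list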
"""
@wraps(func)
def _wrap_normalize_paths(self, files, *args, **kwargs):
normalize = _normalize_path if kwargs.pop('normalize_paths', True) \
else lambda rpath, filepath: filepath
if files:
            if isinstance(files, str):
files_new = [normalize(self.path, files)]
single_file = True
elif isinstance(files, list):
files_new = [normalize(self.path, path) for path in files]
single_file = False
else:
raise ValueError("_files_decorator: Don't know how to handle "
"instance of %s." % type(files))
else:
single_file = None
files_new = []
if map_filenames_back:
def remap_filenames(out):
"""Helper to map files back to non-normalized paths"""
if isinstance(out, dict):
                    assert len(out) == len(files_new)
files_ = [files] if single_file else files
mapped = out.__class__()
for fin, fout in zip(files_, files_new):
mapped[fin] = out[fout]
return mapped
else:
return out
else:
remap_filenames = lambda x: x
if serialize: # and not single_file:
result = [
func(self, f, *args, **kwargs)
for f in files_new
]
else:
result = func(self, files_new, *args, **kwargs)
if single_file is None:
# no files were provided, nothing we can do really
return result
elif (result is None) or not match_return_type or not single_file:
# If function doesn't return anything or no denormalization
# was requested or it was not a single file
return remap_filenames(result)
elif single_file:
if len(result) != 1:
# Magic doesn't apply
return remap_filenames(result)
elif isinstance(result, (list, tuple)):
return result[0]
elif isinstance(result, dict) and tuple(result)[0] == files_new[0]:
# assume that returned dictionary has files as keys.
return tuple(result.values())[0]
else:
# no magic can apply
return remap_filenames(result)
else:
            raise RuntimeError("should not have gotten here... check logic")
return _wrap_normalize_paths
def _prune_deeper_repos(repos: list[Path]) -> list[Path]:
return repos
class GitProgress(WitlessProtocol):
"""Reduced variant of GitPython's RemoteProgress class
Original copyright:
Copyright (C) 2008, 2009 Michael Trier and contributors
Original license:
BSD 3-Clause "New" or "Revised" License
"""
# inform super-class to capture stderr
proc_err = True
_num_op_codes = 10
BEGIN, END, COUNTING, COMPRESSING, WRITING, RECEIVING, RESOLVING, FINDING_SOURCES, CHECKING_OUT, ENUMERATING = \
[1 << x for x in range(_num_op_codes)]
STAGE_MASK = BEGIN | END
OP_MASK = ~STAGE_MASK
DONE_TOKEN = 'done.'
TOKEN_SEPARATOR = ', '
_known_ops = {
COUNTING: ("Counting", "Objects"),
ENUMERATING: ("Enumerating", "Objects"),
COMPRESSING: ("Compressing", "Objects"),
WRITING: ("Writing", "Objects"),
RECEIVING: ("Receiving", "Objects"),
RESOLVING: ("Resolving", "Deltas"),
FINDING_SOURCES: ("Finding", "Sources"),
CHECKING_OUT: ("Check out", "Things"),
}
__slots__ = ('_unprocessed', '_seen_ops', '_pbars')
re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
def __init__(self, done_future: Any = None, encoding: Optional[str] = None) -> None:
super().__init__(done_future=done_future, encoding=encoding)
self._unprocessed: Optional[bytes] = None
self._seen_ops: list[int] = []
self._pbars: set[str] = set()
def connection_made(self, transport: subprocess.Popen) -> None:
super().connection_made(transport)
self._seen_ops = []
self._pbars = set()
def process_exited(self) -> None:
# take down any progress bars that were not closed orderly
for pbar_id in self._pbars:
log_progress(
lgr.info,
pbar_id,
'Finished',
)
super().process_exited()
def pipe_data_received(self, fd: int, byts: bytes) -> None:
# progress reports only come from stderr
if fd != 2:
# let the base class decide what to do with it
super().pipe_data_received(fd, byts)
return
for line in byts.splitlines(keepends=True):
# put any unprocessed content back in front
line = self._unprocessed + line if self._unprocessed else line
self._unprocessed = None
if not self._parse_progress_line(line):
# anything that doesn't look like a progress report
# is retained and returned
# in case of partial progress lines, this can lead to
# leakage of progress info into the output, but
# it is better to enable better (maybe more expensive)
# subsequent filtering than hiding lines with
# unknown, potentially important info
lgr.debug('Non-progress stderr: %s', line)
if line.endswith((b'\r', b'\n')):
# complete non-progress line, pass on
super().pipe_data_received(fd, line)
else:
# an incomplete line, maybe the next batch completes
# it to become a recognizable progress report
self._unprocessed = line
def _parse_progress_line(self, bytes_line: bytes) -> bool:
"""Process a single line
Parameters
----------
bytes_line : bytes
Returns
-------
bool
Flag whether the line was recognized as a Git progress report.
"""
# handle
# Counting objects: 4, done.
# Compressing objects: 50% (1/2)
# Compressing objects: 100% (2/2)
# Compressing objects: 100% (2/2), done.
line = bytes_line.decode(self.encoding)
if line.startswith(('warning:', 'error:', 'fatal:')):
return False
# find escape characters and cut them away - regex will not work with
# them as they are non-ascii. As git might expect a tty, it will send them
last_valid_index = None
for i, c in enumerate(reversed(line)):
if ord(c) < 32:
                # it's a slice index
last_valid_index = -i - 1
# END character was non-ascii
# END for each character in line
if last_valid_index is not None:
line = line[:last_valid_index]
# END cut away invalid part
line = line.rstrip()
cur_count, max_count = None, None
match = self.re_op_relative.match(line)
if match is None:
match = self.re_op_absolute.match(line)
if not match:
return False
# END could not get match
op_code = 0
_remote, op_name, _percent, cur_count, max_count, message = match.groups()
# get operation id
if op_name == "Counting objects":
op_code |= self.COUNTING
elif op_name == "Compressing objects":
op_code |= self.COMPRESSING
elif op_name == "Writing objects":
op_code |= self.WRITING
elif op_name == 'Receiving objects':
op_code |= self.RECEIVING
elif op_name == 'Resolving deltas':
op_code |= self.RESOLVING
elif op_name == 'Finding sources':
op_code |= self.FINDING_SOURCES
elif op_name == 'Checking out files':
op_code |= self.CHECKING_OUT
elif op_name == 'Enumerating objects':
op_code |= self.ENUMERATING
else:
# Note: On windows it can happen that partial lines are sent
# Hence we get something like "CompreReceiving objects", which is
# a blend of "Compressing objects" and "Receiving objects".
# This can't really be prevented.
lgr.debug(
'Output line matched a progress report of an unknown type: %s',
line)
# TODO investigate if there is any chance that we might swallow
            # important info -- until then do not flag this line
# as progress
return False
# END handle op code
pbar_id = 'gitprogress-{}-{}'.format(id(self), op_code)
op_props = self._known_ops[op_code]
# figure out stage
if op_code not in self._seen_ops:
self._seen_ops.append(op_code)
op_code |= self.BEGIN
log_progress(
lgr.info,
pbar_id,
'Start {} {}'.format(
op_props[0].lower(),
op_props[1].lower(),
),
label=op_props[0],
unit=' {}'.format(op_props[1]),
total=float(max_count) if max_count else None,
)
self._pbars.add(pbar_id)
# END begin opcode
if message is None:
message = ''
# END message handling
done_progress = False
message = message.strip()
if message.endswith(self.DONE_TOKEN):
op_code |= self.END
message = message[:-len(self.DONE_TOKEN)]
done_progress = True
# END end message handling
message = message.strip(self.TOKEN_SEPARATOR)
if cur_count and max_count:
log_progress(
lgr.info,
pbar_id,
line,
update=float(cur_count),
noninteractive_level=logging.DEBUG,
)
if done_progress:
log_progress(
lgr.info,
pbar_id,
'Finished {} {}'.format(
op_props[0].lower(),
op_props[1].lower(),
),
noninteractive_level=logging.DEBUG,
)
self._pbars.discard(pbar_id)
return True
class StdOutCaptureWithGitProgress(GitProgress):
proc_out = True
class FetchInfo(dict):
"""
dict that carries results of a fetch operation of a single head
    Reduced variant of GitPython's FetchInfo class
Original copyright:
Copyright (C) 2008, 2009 Michael Trier and contributors
Original license:
BSD 3-Clause "New" or "Revised" License
"""
NEW_TAG, NEW_HEAD, HEAD_UPTODATE, TAG_UPDATE, REJECTED, FORCED_UPDATE, \
FAST_FORWARD, ERROR = [1 << x for x in range(8)]
_re_fetch_result = re.compile(r'^\s*(.) (\[?[\w\s\.$@]+\]?)\s+(.+) [-> ]+ ([^\s]+)( \(.*\)?$)?')
_flag_map = {
'!': ERROR,
'+': FORCED_UPDATE,
'*': 0,
'=': HEAD_UPTODATE,
' ': FAST_FORWARD,
'-': TAG_UPDATE,
}
_operation_map = {
NEW_TAG: 'new-tag',
NEW_HEAD: 'new-branch',
HEAD_UPTODATE: 'uptodate',
TAG_UPDATE: 'tag-update',
REJECTED: 'rejected',
FORCED_UPDATE: 'forced-update',
FAST_FORWARD: 'fast-forward',
ERROR: 'error',
}
@classmethod
def _from_line(cls, line: str) -> FetchInfo:
"""Parse information from the given line as returned by git-fetch -v
and return a new FetchInfo object representing this information.
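
        A sketch of an input line as git may produce it; the branch
        names are illustrative::

             * [new branch]      main       -> origin/main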
"""
match = cls._re_fetch_result.match(line)
if match is None:
raise ValueError("Failed to parse line: %r" % line)
# parse lines
control_character, operation, local_remote_ref, remote_local_ref, note = \
match.groups()
# parse flags from control_character
flags = 0
try:
flags |= cls._flag_map[control_character]
except KeyError:
raise ValueError(
"Control character %r unknown as parsed from line %r"
% (control_character, line))
# END control char exception handling
# parse operation string for more info - makes no sense for symbolic refs,
# but we parse it anyway
old_commit = None
if 'rejected' in operation:
flags |= cls.REJECTED
if 'new tag' in operation:
flags |= cls.NEW_TAG
if 'tag update' in operation:
flags |= cls.TAG_UPDATE
if 'new branch' in operation:
flags |= cls.NEW_HEAD
if '...' in operation or '..' in operation:
split_token = '...'
if control_character == ' ':
split_token = split_token[:-1]
old_commit = operation.split(split_token)[0]
# END handle refspec
return cls(
ref=remote_local_ref.strip(),
local_ref=local_remote_ref.strip(),
# convert flag int into a list of operation labels
operations=[
cls._operation_map[o]
for o in cls._operation_map.keys()
if flags & o
],
note=note,
old_commit=old_commit,
)
class PushInfo(dict):
"""dict that carries results of a push operation of a single head
    Reduced variant of GitPython's PushInfo class
Original copyright:
Copyright (C) 2008, 2009 Michael Trier and contributors
Original license:
BSD 3-Clause "New" or "Revised" License
"""
NEW_TAG, NEW_HEAD, NO_MATCH, REJECTED, REMOTE_REJECTED, REMOTE_FAILURE, DELETED, \
FORCED_UPDATE, FAST_FORWARD, UP_TO_DATE, ERROR = [1 << x for x in range(11)]
_flag_map = {'X': NO_MATCH,
'-': DELETED,
'*': 0,
'+': FORCED_UPDATE,
' ': FAST_FORWARD,
'=': UP_TO_DATE,
'!': ERROR}
_operation_map = {
NEW_TAG: 'new-tag',
NEW_HEAD: 'new-branch',
NO_MATCH: 'no-match',
REJECTED: 'rejected',
REMOTE_REJECTED: 'remote-rejected',
REMOTE_FAILURE: 'remote-failure',
DELETED: 'deleted',
FORCED_UPDATE: 'forced-update',
FAST_FORWARD: 'fast-forward',
UP_TO_DATE: 'uptodate',
ERROR: 'error',
}
@classmethod
def _from_line(cls, line: str) -> PushInfo:
"""Create a new PushInfo instance as parsed from line which is expected to be like
        refs/heads/master:refs/heads/master 05d2687..1d0568e"""
control_character, from_to, summary = line.split('\t', 3)
flags = 0
# control character handling
try:
flags |= cls._flag_map[control_character]
except KeyError:
raise ValueError("Control character %r unknown as parsed from line %r" % (control_character, line))
# END handle control character
# from_to handling
from_ref_string, to_ref_string = from_to.split(':')
# commit handling, could be message or commit info
old_commit = None
if summary.startswith('['):
if "[rejected]" in summary:
flags |= cls.REJECTED
elif "[remote rejected]" in summary:
flags |= cls.REMOTE_REJECTED
elif "[remote failure]" in summary:
flags |= cls.REMOTE_FAILURE
elif "[no match]" in summary:
flags |= cls.ERROR
elif "[new tag]" in summary:
flags |= cls.NEW_TAG
elif "[new branch]" in summary:
flags |= cls.NEW_HEAD
# up-to-date encoded in control character
else:
# fast-forward or forced update - was encoded in control character,
# but we parse the old and new commit
split_token = "..."
if control_character == " ":
split_token = ".."
old_sha, _new_sha = summary.split(' ')[0].split(split_token)
# have to use constructor here as the sha usually is abbreviated
old_commit = old_sha
# END message handling
return cls(
from_ref=from_ref_string.strip(),
to_ref=to_ref_string.strip(),
# convert flag int into a list of operation labels
operations=[
cls._operation_map[o]
for o in cls._operation_map.keys()
if flags & o
],
note=summary.strip(),
old_commit=old_commit,
)
InfoT = TypeVar("InfoT", FetchInfo, PushInfo)
class GitAddOutput(TypedDict):
file: str
success: bool
@path_based_str_repr
class GitRepo(CoreGitRepo):
"""Representation of a git repository
"""
# We must check git config to have name and email set, but
# should do it once
_config_checked = False
GIT_MIN_VERSION = "2.25.0"
git_version = None
@classmethod
def _check_git_version(cls) -> None:
external_versions.check("cmd:git", min_version=cls.GIT_MIN_VERSION)
cls.git_version = external_versions['cmd:git']
# This is the least common denominator to claim that a user
# used DataLad.
# Citing JOSS publication https://joss.theoj.org/papers/10.21105/joss.03262
@due.dcite(Doi("10.21105/joss.03262"),
# override path since there is no need ATM for such details
path="datalad",
description="DataLad - Data management and distribution platform")
def __init__(self, path: str, runner: Any = None, create: bool = True,
git_opts: Optional[dict[str, Any]] = None, repo: Any = None,
fake_dates: bool = False, create_sanity_checks: bool = True,
**kwargs: Any) -> None:
"""Creates representation of git repository at `path`.
Can also be used to create a git repository at `path`.
Parameters
----------
path: str
path to the git repository; In case it's not an absolute path,
it's relative to PWD
create: bool, optional
if true, creates a git repository at `path` if there is none. Also
creates `path`, if it doesn't exist.
If set to false, an exception is raised in case `path` doesn't exist
or doesn't contain a git repository.
repo: git.Repo, optional
This argument is ignored.
create_sanity_checks: bool, optional
Whether to perform sanity checks during initialization (when
`create=True` and target path is not a valid repo already), such as
that new repository is not created in the directory where git already
tracks some files.
kwargs:
keyword arguments serving as additional options to the git-init
command. Therefore, it makes sense only if called with `create`.
Generally, this way of passing options to the git executable is
(or will be) used a lot in this class. It's a transformation of
python-style keyword arguments (or a `dict`) to command line arguments,
provided by GitPython.
A single character keyword will be prefixed by '-', multiple characters
by '--'. An underscore in the keyword becomes a dash. The value of the
keyword argument is used as the value for the corresponding command
line argument. Assigning a boolean creates a flag.
Examples:
no_commit=True => --no-commit
C='/my/path' => -C /my/path
"""
# this will set up .pathobj and .dot_git
super().__init__(path)
if self.git_version is None:
self._check_git_version()
# BEGIN Repo validity test
# We want to fail early for tests, that would be performed a lot. In
# particular this is about GitRepo.is_valid_repo. We would use the
# latter to decide whether or not to call GitRepo() only for __init__ to
# then test the same things again. If we fail early we can save the
# additional test from outer scope.
self.path = path
# Note, that the following three path objects are used often and
# therefore are stored for performance. Path object creation comes with
# a cost. Most notably, this is used for validity checking of the
# repository.
_valid_repo = self.is_valid_git()
do_create = False
if create and not _valid_repo:
if repo is not None:
# `repo` passed with `create`, which doesn't make sense
raise TypeError("argument 'repo' must not be used with 'create'")
do_create = True
else:
# Note: We used to call gitpy.Repo(path) here, which potentially
# raised NoSuchPathError or InvalidGitRepositoryError. This is
# used by callers of GitRepo.__init__() to detect whether we have a
# valid repo at `path`. Now, with switching to lazy loading property
# `repo`, we detect those cases without instantiating a
# gitpy.Repo().
if not exists(path):
raise NoSuchPathError(path)
if not _valid_repo:
raise InvalidGitRepositoryError(path)
# END Repo validity test
# So that we "share" control paths with git/git-annex
if ssh_manager:
ssh_manager.ensure_initialized()
# note: we may also want to distinguish between a path to the worktree
# and the actual repository
if git_opts is None:
git_opts = {}
if kwargs:
git_opts.update(kwargs)
self._cfg = None
if do_create: # we figured it out earlier
from_cmdline = git_opts.pop('_from_cmdline_', [])
self.init(
sanity_checks=create_sanity_checks,
init_options=from_cmdline + to_options(True, **git_opts),
)
# with DryRunProtocol path might still not exist
self.inode: Optional[int]
if exists(self.path):
self.inode = os.stat(self.path).st_ino
else:
self.inode = None
if fake_dates:
self.configure_fake_dates()
@property
def bare(self) -> bool:
"""Returns a bool indicating whether the repository is bare
Importantly, this is not reporting the configuration value
of 'core.bare', in order to be usable at a stage where a
Repo instance is not yet equipped with a ConfigManager.
        Instead, it is testing whether the repository path and its
        "dot_git" are identical. The value of 'core.bare' can be queried
        from the ConfigManager in a fully initialized instance.
"""
return self.pathobj == self.dot_git
@classmethod
def clone(cls, url: str, path: str, *args: Any, clone_options: Optional[list[str] | dict[str, Option]] = None, **kwargs: Any) -> Self:
"""Clone url into path
Provides workarounds for known issues (e.g.
https://github.com/datalad/datalad/issues/785)
Parameters
----------
url : str
path : str
clone_options : dict or list
Arbitrary options that will be passed on to the underlying call to
`git-clone`. This may be a list of plain options or key-value pairs
that will be converted to a list of plain options with `to_options`.
        expect_fail : bool
            Whether to expect that the command might fail; if so, errors
            are logged at DEBUG level instead of ERROR
kwargs:
Passed to the Repo class constructor.
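
        Examples
        --------
        A sketch; the URL and destination path are hypothetical::

            repo = GitRepo.clone(
                'https://example.com/some/repo.git',
                '/tmp/some_clone',
                clone_options={'depth': 1},
            )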
"""
if 'repo' in kwargs:
raise TypeError("argument 'repo' conflicts with cloning")
# TODO: what about 'create'?
expect_fail = kwargs.pop('expect_fail', False)
# fail early on non-empty target:
from os import listdir
if exists(path) and listdir(path):
raise ValueError(
"destination path '%s' already exists and is not an "
"empty directory." % path)
else:
# protect against cloning into existing and obviously dangling
# instance for that location
try:
del cls._unique_instances[path]
except KeyError:
# didn't exist - all fine
pass
# Massage URL
url_ri = RI(url) if not isinstance(url, RI) else url
if on_windows:
# When we're cloning from a local path on Windows, the URL at
# this point is platform-specific (e.g., "..\\origin"). According
# to Git clone's manpage, clone urls can't have backslashes.
# While Git does manage to clone a URL with backslashes,
            # in the case of subdatasets cloned from relative paths it nevertheless
            # messes up the resulting remote url, producing a mix of
            # forward and backward slashes (see also gh-7180):
# 'C:/Users/adina/AppData/Local/Temp/datalad_temp_frvczceh/ds/..\\origin'
# Therefore, we're turning it to Posix now.
if isinstance(url_ri, PathRI):
url = Path(url).as_posix()
url_ri = PathRI(url)
else:
# if we are on windows, the local path of a URL
# would not end up being a proper local path and cloning
# would fail. Don't try to be smart and just pass the
# URL along unmodified
# try to get a local path from `url`:
try:
url = url_ri.localpath
url_ri = RI(url)
except ValueError:
pass
if is_ssh(url_ri):
ssh_manager.get_connection(url).open()
else:
if isinstance(url_ri, PathRI):
# expand user, because execution not going through a shell
# doesn't work well otherwise
new_url = os.path.expanduser(url)
if url != new_url:
lgr.info("Expanded source path to %s from %s", new_url, url)
url = new_url
cmd = cls._git_cmd_prefix + ['clone', '--progress']
if clone_options:
if isinstance(clone_options, Mapping):
clone_options = to_options(True, **clone_options)
cmd.extend(clone_options)
cmd.extend([url, path])
fix_annex = None
        ntries = 5  # 3 is not enough for a robust workaround
for trial in range(ntries):
try:
lgr.debug("Git clone from %s to %s", url, path)
res = GitWitlessRunner().run(cmd, protocol=GitProgress)
# fish out non-critical warnings by git-clone
# (empty repo clone, etc.), all other content is logged
# by the progress helper to 'debug'
for errline in res['stderr'].splitlines():
if errline.startswith('warning:'):
lgr.warning(errline[8:].strip())
lgr.debug("Git clone completed")
break
except CommandError as e:
# log here but let caller decide what to do
ce = CapturedException(e)
str_e = str(e)
# see https://github.com/datalad/datalad/issues/785
if re.search("Request for .*aborted.*Unable to find", str_e,
re.DOTALL) \
and trial < ntries - 1:
lgr.info(
"Hit a known issue with Git (see GH#785). Trial #%d, "
"retrying",
trial)
continue
                #(lgr.debug if expect_fail else lgr.error)(str_e)
if "Clone succeeded, but checkout failed." in str_e:
fix_annex = ce
break
raise
# get ourselves a repository instance
gr = cls(path, *args, **kwargs)
if fix_annex:
# cheap check whether we deal with an AnnexRepo - we can't check the class of `gr` itself, since we then
# would need to import our own subclass
if hasattr(gr, 'is_valid_annex'):
lgr.warning("Experienced issues while cloning. "
"Trying to fix it, using git-annex-fsck.")
if not gr.is_initialized():
gr._init()
gr.fsck()
else:
lgr.warning("Experienced issues while cloning: %s", fix_annex)
# ensure that Git doesn't mangle relative paths into obscure absolute
# paths: https://github.com/datalad/datalad/issues/3538
if isinstance(url_ri, PathRI):
url_path = Path(url)
if not url_path.is_absolute():
# get git-created path
remote_url = 'remote.' + gr.get_remotes()[0] + '.url'
git_url = gr.config.get(remote_url)
if Path(git_url).is_absolute():
# Git created an absolute path from a relative URL.
git_url = op.relpath(git_url, gr.path)
# always in POSIX even on Windows
path = Path(git_url).as_posix()
gr.config.set(remote_url, path,
scope='local', force=True)
return gr
# Note: __del__ shouldn't be needed anymore as we switched to
# `weakref.finalize`.
# https://docs.python.org/3/library/weakref.html#comparing-finalizers-with-del-methods
#
# Keeping both methods and this comment around as a reminder to not
# use __del__, if we figure there's a need for cleanup in the future.
#
# def __del__(self):
# # unbind possibly bound ConfigManager, to prevent all kinds of weird
# # stalls etc
# self._cfg = None
def is_valid_git(self) -> bool:
"""Returns whether the underlying repository appears to be still valid
        Note that this is almost identical to the classmethod is_valid_repo().
However, if we are testing an existing instance, we can save Path object
creations. Since this testing is done a lot, this is relevant. Creation
of the Path objects in is_valid_repo() takes nearly half the time of the
entire function.
        Also note that this method is bound to an instance but still
class-dependent, meaning that a subclass cannot simply overwrite it.
This is particularly important for the call from within __init__(),
which in turn is called by the subclasses' __init__. Using an overwrite
would lead to the wrong thing being called.
"""
return self.is_valid()
@classmethod
def is_valid_repo(cls, path: str) -> bool:
"""Returns if a given path points to a git repository"""
return cls.is_valid(path)
@staticmethod
def get_git_dir(repo: str | GitRepo) -> str:
"""figure out a repo's gitdir
'.git' might be a directory, a symlink or a file
Note
----
        This method is likely to get deprecated, please use GitRepo.dot_git instead!
        That one is not static, but it is cheaper, and you should have an
        instance of the repo you are working on anyway.
        Note that the property, in contrast to this method, returns an
        absolute path.
Parameters
----------
repo: path or Repo instance
currently expected to be the repos base dir
Returns
-------
str
            relative path to the repo's git dir; so the default would be ".git"
"""
if isinstance(repo, GitRepo):
return str(repo.dot_git)
pathobj = Path(repo)
dot_git = _get_dot_git(pathobj, ok_missing=False)
try:
dot_git = dot_git.relative_to(pathobj)
except ValueError:
# is not a subpath, return as is
lgr.debug("Path %r is not subpath of %r", dot_git, pathobj)
return str(dot_git)
@property
def config(self):
# just proxy the core repo APIs property for backward-compatibility
return self.cfg
def is_with_annex(self) -> bool:
"""Report if GitRepo (assumed) has (remotes with) a git-annex branch
"""
return any(
b['refname:strip=2'] == 'git-annex' or b['refname:strip=2'].endswith('/git-annex')
for b in self.for_each_ref_(fields='refname:strip=2', pattern=['refs/heads', 'refs/remotes'])
)
@classmethod
def get_toppath(cls, path: str, follow_up: bool = True, git_options: Optional[list[str]] = None) -> Optional[str]:
"""Return top-level of a repository given the path.
        Parameters
        ----------
        follow_up : bool
            If path has symlinks -- they get resolved by git. If follow_up is
            True, we will follow the original path up until we hit the same
            resolved path. If no such path is found, the resolved one is
            returned.
        git_options: list of str
            options to be passed to the git rev-parse call
        Returns
        -------
        str or None
            Path of the repository's top-level directory, or None if no
            parent directory contains a git repository.
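
        Examples
        --------
        A sketch; the path is hypothetical::

            GitRepo.get_toppath('/data/repo/some/subdir')  # -> '/data/repo'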
"""
cmd = ['git']
if git_options:
cmd.extend(git_options)
cmd += ["rev-parse", "--show-toplevel"]
try:
out = GitWitlessRunner(cwd=path).run(
cmd, protocol=StdOutErrCapture)
assert isinstance(out, dict)
toppath = out['stdout'].rstrip('\n\r')
except CommandError:
return None
        except OSError:
            toppath = GitRepo.get_toppath(dirname(path), follow_up=follow_up,
                                          git_options=git_options)
            if toppath is None:  # recursion hit the filesystem root
                return None
# normalize the report, because, e.g. on windows it can come out
# with improper directory separators (C:/Users/datalad)
toppath = str(Path(toppath))
if follow_up:
path_ = path
path_prev = ""
while path_ and path_ != path_prev: # on top /.. = /
if str(Path(path_).resolve()) == toppath:
toppath = path_
break
path_prev = path_
path_ = dirname(path_)
return toppath
@normalize_paths
def add(self, files: list[str], git: bool = True, git_options: Optional[list[str]] = None, update: bool = False) -> list[GitAddOutput]:
"""Adds file(s) to the repository.
Parameters
----------
files: list
list of paths to add
        git: bool
            somewhat ugly construction to be compatible with AnnexRepo.add();
            must always be True.
update: bool
--update option for git-add. From git's manpage:
Update the index just where it already has an entry matching
<pathspec>. This removes as well as modifies index entries to match
the working tree, but adds no new files.
If no <pathspec> is given when --update option is used, all tracked
files in the entire working tree are updated (old versions of Git
used to limit the update to the current directory and its
subdirectories).
Returns
-------
list
Of status dicts.
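
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; the file names are
        hypothetical::

            repo.add(['file1.txt', 'file2.txt'])
            # -> [{'file': 'file1.txt', 'success': True},
            #     {'file': 'file2.txt', 'success': True}]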
"""
        # under all circumstances call this class' add_ (otherwise
        # AnnexRepo.add would go into a loop)
return list(GitRepo.add_(self, files, git=git, git_options=git_options,
update=update))
def add_(self, files: list[str], git: bool = True, git_options: Optional[list[str]] = None, update: bool = False) -> Iterator[GitAddOutput]:
"""Like `add`, but returns a generator"""
# TODO: git_options is used as options for the git-add here,
# instead of options to the git executable => rename for consistency
if not git:
lgr.warning(
'GitRepo.add() called with git=%s, this should not happen',
git)
git = True
        # there is no other way than to collect all files into a list
# at this point, because we need to pass them at once to a single
# `git add` call
files = [_normalize_path(self.path, f) for f in ensure_list(files) if f]
if not (files or git_options or update):
# wondering why just a warning? in cmdline this is also not an error
lgr.warning("add was called with empty file list and no options.")
return
try:
# without --verbose git 2.9.3 add does not return anything
add_out = self._call_git(
# Set annex.gitaddtoannex to prevent storing files in
# annex with a v6+ annex repo.
['-c', 'annex.gitaddtoannex=false', 'add'] +
ensure_list(git_options) +
to_options(update=update) + ['--verbose'],
files=files,
pathspec_from_file=True,
read_only=False,
)
# get all the entries
for o in self._process_git_get_output(*add_out):
yield o
# Note: as opposed to git cmdline, force is True by default in
# gitpython, which would lead to add things, that are
# ignored or excluded otherwise
# 2. Note: There is an issue with globbing (like adding '.'),
# which apparently doesn't care for 'force' and therefore
# adds '.git/...'. May be it's expanded at the wrong
# point in time or sth. like that.
# For now, use direct call to git add.
#self.cmd_call_wrapper(self.repo.index.add, files, write=True,
# force=False)
# TODO: May be make use of 'fprogress'-option to indicate
# progress
# But then, we don't have it for git-annex add, anyway.
#
# TODO: Is write=True a reasonable way to do it?
# May be should not write until success of operation is
# confirmed?
# What's best in case of a list of files?
except OSError as e:
lgr.error("add: %s", e)
raise
# Make sure return value from GitRepo is consistent with AnnexRepo
# currently simulating similar return value, assuming success
# for all files:
# TODO: Make return values consistent across both *Repo classes!
return
@staticmethod
def _process_git_get_output(stdout: str | bytes, stderr: Any = None) -> list[GitAddOutput]:
"""Given both outputs (stderr is ignored atm) of git add - process it
Primarily to centralize handling in both indirect annex and direct
        modes when run through a proxy
"""
return [{'file': f, 'success': True}
for f in re.findall("'(.*)'[\n$]", ensure_unicode(stdout))]
@normalize_paths(match_return_type=False)
def remove(self, files: list[str], recursive: bool = False, **kwargs: Option) -> list[str]:
"""Remove files.
Calls git-rm.
Parameters
----------
files: list of str
list of paths to remove
        recursive: bool, optional
            whether to allow recursive removal from subdirectories
kwargs:
see `__init__`
Returns
-------
[str]
list of successfully removed files.
"""
if recursive:
kwargs['r'] = True
# the name is chosen badly, but the purpose is to make sure that
# any pending operations actually manifest themselves in the Git repo
# on disk (in case of an AnnexRepo, it could be pending batch
# processes that need closing)
self.precommit()
# output per removed file is expected to be "rm 'PATH'":
return [
line.strip()[4:-1]
for line in self.call_git_items_(
['rm'] + to_options(True, **kwargs), files=files, pathspec_from_file=True)
]
def precommit(self) -> None:
"""Perform pre-commit maintenance tasks
"""
# we used to clean up GitPython here
pass
@staticmethod
def _get_prefixed_commit_msg(msg: Optional[str]) -> str:
DATALAD_PREFIX = "[DATALAD]"
return DATALAD_PREFIX if not msg else "%s %s" % (DATALAD_PREFIX, msg)
@property
def fake_dates_enabled(self) -> bool:
"""Is the repository configured to use fake dates?
"""
# this turned into a private property of the CoreGitRepo
return self._fake_dates_enabled
def add_fake_dates(self, env):
# was renamed in CoreGitRepo
return self.add_fake_dates_to_env(env)
def commit(self, msg: Optional[str] = None,
options: Optional[list[str]] = None, _datalad_msg: bool = False,
careless: bool = True, files: Optional[list[str]] = None,
date: Optional[str] = None, index_file: Optional[str] = None) -> None:
"""Commit changes to git.
Parameters
----------
msg: str, optional
commit-message
options: list of str, optional
cmdline options for git-commit
_datalad_msg: bool, optional
To signal that commit is automated commit by datalad, so
it would carry the [DATALAD] prefix
careless: bool, optional
if False, raise when there's nothing actually committed;
if True, don't care
files: list of str, optional
path(s) to commit
date: str, optional
Date in one of the formats git understands
index_file: str, optional
An alternative index to use
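
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; the message and path
        are hypothetical::

            repo.commit(msg="Add input data", files=['data/input.csv'])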
"""
self.precommit()
# assemble commandline
cmd = ['commit']
options = ensure_list(options)
if date:
options += ["--date", date]
orig_msg = msg
if not msg:
if '--amend' in options:
if '--no-edit' not in options:
# don't overwrite old commit message with our default
# message by default, but reuse old one. In other words:
# Make --no-edit the default:
options += ["--no-edit"]
else:
msg = 'Recorded changes'
_datalad_msg = True
if _datalad_msg:
msg = self._get_prefixed_commit_msg(msg)
if msg:
options += ["-m", msg]
cmd.extend(options)
# set up env for commit
env = self.add_fake_dates(None) \
if self.fake_dates_enabled else os.environ.copy()
if index_file:
env['GIT_INDEX_FILE'] = index_file
lgr.debug("Committing via direct call of git: %s", cmd)
prev_sha = self.get_hexsha()
# Old code was doing clever --amend'ing of chunked series of commits manually
# here, but with pathspec_from_file it is no longer needed.
# store pre-commit state to be able to check if anything was committed
try:
# Note: call_git operates via joining call_git_items_ and that one wipes out
# .stdout from exception and collects/repopulates stderr only. Let's use
# _call_git which returns both outputs and collects/re-populates both stdout
# **and** stderr
_ = self._call_git(
cmd,
files=files,
env=env,
pathspec_from_file=True,
)
except CommandError as e:
# real errors first
if "did not match any file(s) known to git" in e.stderr:
raise FileNotInRepositoryError(
cmd=e.cmd,
msg="File(s) unknown to git",
code=e.code,
filename=linesep.join([
l for l in e.stderr.splitlines()
if l.startswith("error: pathspec")
])
)
# behavior choices now
elif not careless:
# not willing to compromise at all
raise
elif 'nothing to commit' in e.stdout:
lgr.debug("nothing to commit in %s. Ignored.", self)
elif 'no changes added to commit' in e.stdout or \
'nothing added to commit' in e.stdout:
lgr.debug("no changes added to commit in %s. Ignored.", self)
else:
raise
if orig_msg \
or '--dry-run' in cmd \
or prev_sha == self.get_hexsha() \
or ('--amend' in cmd and '--no-edit' in cmd) \
or (not is_interactive()) \
or self.config.obtain('datalad.save.no-message') != 'interactive':
# we had a message given, or nothing was committed, or prev. commit
# was amended, or we are not connected to a terminal, or no
# interactive message input is desired:
# we can go home
return
# handle interactive message entry by running another `git-commit`
self._git_runner.run(
self._git_cmd_prefix + cmd + ['--amend', '--edit'],
protocol=NoCapture,
stdin=None,
env=env,
)
# TODO usage is only in the tests, consider making a test helper and
# remove from GitRepo API
def get_indexed_files(self) -> list[str]:
"""Get a list of files in git's index
Returns
-------
list
list of paths rooting in git's base dir
"""
return [
str(r.relative_to(self.pathobj))
for r in self.get_content_info(
paths=None, ref=None, untracked='no')
]
def get_hexsha(self, commitish: Optional[str] = None, short: bool = False) -> Optional[str]:
"""Return a hexsha for a given commitish.
Parameters
----------
commitish : str, optional
Any identifier that refers to a commit (defaults to "HEAD").
short : bool, optional
Return the abbreviated form of the hexsha.
Returns
-------
str or, if no commitish was given and there are no commits yet, None.
Raises
------
ValueError
If a commitish was given, but no corresponding commit could be
determined.
"""
# use --quiet because the 'Needed a single revision' error message
# that is the result of running this in a repo with no commits
# isn't useful to report
cmd = ['rev-parse', '--quiet', '--verify', '{}^{{commit}}'.format(
commitish if commitish else 'HEAD')
]
if short:
cmd.append('--short')
try:
return self.call_git_oneline(cmd, read_only=True)
except CommandError as e:
if commitish is None:
return None
raise ValueError("Unknown commit identifier: %s" % commitish)
@normalize_paths(match_return_type=False)
    def get_last_commit_hexsha(self, files: list[str]) -> Optional[str]:
        """Return the hash of the last commit that modified any of the
        given paths"""
try:
commit = self.call_git(
['rev-list', '-n1', 'HEAD'],
files=files,
expect_fail=True,
read_only=True,
)
commit = commit.strip()
return commit if commit else None
except CommandError:
if self.get_hexsha() is None:
# unborn branch, don't freak out
return None
raise
def get_revisions(self, revrange: str | list[str] | None = None, fmt: str = "%H", options: Optional[list[str]] = None) -> list[str]:
"""Return list of revisions in `revrange`.
Parameters
----------
revrange : str or list of str or None, optional
Revisions or revision ranges to walk. If None, revision defaults to
HEAD unless a revision-modifying option like `--all` or
`--branches` is included in `options`.
fmt : string, optional
Format accepted by `--format` option of `git log`. This should not
contain new lines because the output is split on new lines.
options : list of str, optional
Options to pass to `git log`. This should not include `--format`.
Returns
-------
List of revisions (str), formatted according to `fmt`.
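
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; the range, format, and
        output shown are illustrative::

            repo.get_revisions("v1.0..HEAD", fmt="%h %s")
            # -> ['1d0568e Fix parser', '05d2687 Add parser']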
"""
if revrange is None:
revrange = []
elif isinstance(revrange, str):
revrange = [revrange]
cmd = ["log", "--format={}".format(fmt)]
cmd.extend((options or []) + revrange + ["--"])
try:
stdout = self.call_git(cmd, expect_fail=True, read_only=True)
except CommandError as e:
if "does not have any commits" in e.stderr:
return []
raise
return stdout.splitlines()
def commit_exists(self, commitish: str) -> bool:
"""Does `commitish` exist in the repo?
Parameters
----------
commitish : str
A commit or an object that can be dereferenced to one.
Returns
-------
bool
"""
# Note: The peeling operator "^{commit}" is required so that rev-parse
# doesn't succeed if passed a full hexsha that is valid but doesn't
# exist.
return self.call_git_success(
["rev-parse", "--verify", commitish + "^{commit}"],
read_only=True,
)
def get_merge_base(self, commitishes: str | list[str]) -> Optional[str]:
"""Get a merge base hexsha
Parameters
----------
commitishes: str or list of str
List of commitishes (branches, hexshas, etc) to determine the merge
base of. If a single value provided, returns merge_base with the
current branch.
Returns
-------
str or None
            If there is no merge-base for the given commits, or a specified
            treeish doesn't exist, None is returned
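
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; the branch names are
        hypothetical::

            repo.get_merge_base(['main', 'feature'])  # -> hexsha or None
            repo.get_merge_base('feature')  # against the active branch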
"""
if isinstance(commitishes, str):
commitishes = [commitishes]
if not commitishes:
raise ValueError("Provide at least a single value")
elif len(commitishes) == 1:
branch = self.get_active_branch()
if branch is None:
raise ValueError("Single commitish provided and no active branch")
commitishes = commitishes + [branch]
try:
base = self.call_git_oneline(['merge-base'] + commitishes,
read_only=True)
except CommandError as exc:
if exc.code == 1 and not (exc.stdout or exc.stderr):
# No merge base was found (unrelated commits).
return None
if "fatal: Not a valid object name" in exc.stderr:
return None
raise
return base
def is_ancestor(self, reva: str, revb: str) -> bool:
"""Is `reva` an ancestor of `revb`?
Parameters
----------
reva, revb : str
Revisions.
Returns
-------
bool
"""
return self.call_git_success(
["merge-base", "--is-ancestor", reva, revb],
read_only=True)
def get_commit_date(self, branch: Optional[str] = None, date: str = 'authored') -> Optional[int]:
"""Get the date stamp of the last commit (in a branch or head otherwise)
Parameters
----------
date: {'authored', 'committed'}
Which date to return. "authored" will be the date shown by "git show"
and the one possibly specified via --date to `git commit`
Returns
-------
int or None
None if no commit
"""
if date == 'committed':
format = '%ct'
elif date == 'authored':
format = '%at'
else:
            raise ValueError('unknown date type: {}'.format(date))
d = self.format_commit(format, commitish=branch)
return int(d) if d else None
def get_active_branch(self) -> Optional[str]:
"""Get the name of the active branch
Returns
-------
str or None
Returns None if there is no active branch, i.e. detached HEAD,
and the branch name otherwise.
"""
try:
out = self.call_git(["symbolic-ref", "HEAD"], expect_fail=True,
read_only=True)
except CommandError as e:
if 'HEAD is not a symbolic ref' in e.stderr:
lgr.debug("detached HEAD in %s", self)
return None
else:
raise e
return out.strip()[11:] # strip refs/heads/
def get_corresponding_branch(self, branch: Any = None) -> Optional[str]:
"""Always returns None, a plain GitRepo has no managed branches"""
return None
def get_branches(self) -> list[str]:
"""Get all branches of the repo.
Returns
-------
[str]
Names of all branches of this repository.
"""
return [
b['refname:strip=2']
for b in self.for_each_ref_(fields='refname:strip=2', pattern='refs/heads')
]
def get_remote_branches(self) -> list[str]:
"""Get all branches of all remotes of the repo.
Returns
        -------
[str]
Names of all remote branches.
"""
# TODO: Reconsider melting with get_branches()
        # TODO: treat entries like this: 'origin/HEAD -> origin/master'
# currently this is done in collection
return [
b['refname:strip=2']
for b in self.for_each_ref_(fields='refname:strip=2', pattern='refs/remotes')
]
def get_remotes(self, with_urls_only: bool = False) -> list[str]:
"""Get known remotes of the repository
Parameters
----------
with_urls_only : bool, optional
return only remotes which have urls
Returns
-------
remotes : list of str
List of names of the remotes
"""
from datalad.utils import unique
self.config.reload()
remotes = unique([x[7:] for x in self.config.sections()
if x.startswith("remote.")])
if with_urls_only:
remotes = [
r for r in remotes
if self.config.get('remote.%s.url' % r)
]
return remotes
# TODO this is practically unused outside the tests, consider turning
# into a test helper and trim from the API
def get_files(self, branch: Optional[str] = None) -> list[str]:
"""Get a list of files in git.
Lists the files in the (remote) branch.
Parameters
----------
branch: str
Name of the branch to query. Default: active branch.
Returns
-------
[str]
list of files.
"""
return [
str(p.relative_to(self.pathobj))
for p in self.get_content_info(
paths=None, ref=branch, untracked='no')
]
def add_remote(self, name: str, url: str, options: Optional[list[str]] = None) -> tuple[str, str]:
"""Register remote pointing to a url
"""
cmd = ['remote', 'add']
if options:
cmd += options
cmd += [name, url]
# for historical reasons this method returns stdout and
# stderr, keeping that for now
result = self._call_git(cmd)
self.config.reload()
return result
def remove_remote(self, name: str) -> None:
"""Remove existing remote
"""
# TODO: testing and error handling!
from .exceptions import RemoteNotAvailableError
try:
self.call_git(['remote', 'remove', name])
except CommandError as e:
if 'No such remote' in e.stderr:
raise RemoteNotAvailableError(name,
cmd="git remote remove",
msg="No such remote",
stdout=e.stdout,
stderr=e.stderr)
else:
raise e
# config.reload necessary, because the associated remote config
# will vanish
self.config.reload()
return
def _maybe_open_ssh_connection(self, remote: Optional[str], prefer_push: bool = True) -> None:
"""Open connection if `remote` has an SSH URL.
Doing so enables SSH caching, preventing datalad-sshrun subprocesses
from opening (and then closing) their own.
Parameters
----------
remote : str
prefer_push : bool, optional
Use `remote.<remote>.pushurl` if there is one, falling back to
`remote.<remote>.url`.
"""
if remote:
url = None
if prefer_push:
url = self.get_remote_url(remote, push=True)
url = url or self.get_remote_url(remote)
if url and is_ssh(url):
ssh_manager.get_connection(url).open()
    def update_remote(self, name: Optional[str] = None, verbose: bool = False) -> None:
        """Run `git remote update`, for a given remote or for all remotes
        """
options = ["-v"] if verbose else []
self._maybe_open_ssh_connection(name)
namelst = [name] if name else []
self.call_git(
['remote'] + namelst + ['update'] + options,
expect_stderr=True
)
def fetch(self, remote: Optional[str] = None, refspec: str | list[str] | None = None, all_: bool = False, git_options: Optional[list[str]] = None,
**kwargs: Option) -> list[FetchInfo]:
"""Fetches changes from a remote (or all remotes).
Parameters
----------
remote : str, optional
name of the remote to fetch from. If no remote is given and
`all_` is not set, the tracking branch is fetched.
refspec : str or list, optional
refspec(s) to fetch.
all_ : bool, optional
fetch all remotes (and all of their branches).
Fails if `remote` was given.
git_options : list, optional
Additional command line options for git-fetch.
kwargs :
Deprecated. GitPython-style keyword argument for git-fetch.
Will be appended to any git_options.
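
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; remote and refspec are
        hypothetical::

            repo.fetch(remote='origin', refspec='main')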
"""
git_options = ensure_list(git_options)
if kwargs:
git_options.extend(to_options(True, **kwargs))
return list(
self.fetch_(
remote=remote,
refspec=refspec,
all_=all_,
git_options=git_options,
)
)
def fetch_(self, remote: Optional[str] = None, refspec: str | list[str] | None = None, all_: bool = False, git_options: Optional[list[str]] = None) -> Iterator[FetchInfo]:
"""Like `fetch`, but returns a generator"""
yield from self._fetch_push_helper(
base_cmd=self._git_cmd_prefix + ['fetch', '--verbose', '--progress'],
action='fetch',
urlvars=('remote.{}.url', 'remote.{}.url'),
protocol=GitProgress,
info_cls=FetchInfo,
info_from='stderr',
add_remote=False,
remote=remote,
refspec=refspec,
all_=all_,
git_options=git_options)
def push(self, remote: Optional[str] = None, refspec: str | list[str] | None = None, all_remotes: bool = False,
all_: bool = False, git_options: Optional[list[str]] = None, **kwargs: Option) -> list[PushInfo]:
"""Push changes to a remote (or all remotes).
If remote and refspec are specified, and remote has
`remote.{remote}.datalad-push-default-first` configuration variable
set (e.g. by `create-sibling-github`), we will first push the first
refspec separately to possibly ensure that the first refspec is chosen
by remote as the "default branch".
See https://github.com/datalad/datalad/issues/4997
Upon successful push if this variable was set in the local git config,
we unset it, so subsequent pushes would proceed normally.
Parameters
----------
remote : str, optional
name of the remote to push to. If no remote is given and
`all_` is not set, the tracking branch is pushed.
refspec : str or list, optional
refspec(s) to push.
all_ : bool, optional
push to all remotes. Fails if `remote` was given.
git_options : list, optional
Additional command line options for git-push.
kwargs :
Deprecated. GitPython-style keyword argument for git-push.
Will be appended to any git_options.
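
        Examples
        --------
        A sketch, given a GitRepo instance `repo`; remote and refspec are
        hypothetical::

            repo.push(remote='origin', refspec='main:main')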
"""
git_options = ensure_list(git_options)
if kwargs:
git_options.extend(to_options(True, **kwargs))
if all_remotes:
# be nice to the elderly
all_ = True
push_refspecs = [refspec]
cfg = self.config # shortcut
cfg_push_var = "remote.{}.datalad-push-default-first".format(remote)
if remote and refspec and cfg.obtain(cfg_push_var, default=False, valtype=bool):
refspec = ensure_list(refspec)
lgr.debug("As indicated by %s pushing first refspec %s separately first",
cfg_push_var, refspec[0])
push_refspecs = [[refspec[0]], refspec[1:]]
push_res: list[PushInfo] = []
for refspecs in push_refspecs:
push_res.extend(
self.push_(
remote=remote,
refspec=refspecs,
all_=all_,
git_options=git_options,
)
)
# note: above push_ should raise exception if errors out
if '--dry-run' not in git_options \
and cfg.get_from_source('local', cfg_push_var) is not None:
lgr.debug("Removing %s variable from local git config after successful push", cfg_push_var)
cfg.unset(cfg_push_var, 'local')
return push_res
    def push_(self, remote: Optional[str] = None, refspec: str | list[str] | None = None, all_: bool = False, git_options: Optional[list[str]] = None) -> Iterator[PushInfo]:
"""Like `push`, but returns a generator"""
yield from self._fetch_push_helper(
base_cmd=self._git_cmd_prefix + ['push', '--progress', '--porcelain'],
action='push',
urlvars=('remote.{}.pushurl', 'remote.{}.url'),
protocol=StdOutCaptureWithGitProgress,
info_cls=PushInfo,
info_from='stdout',
add_remote=True,
remote=remote,
refspec=refspec,
all_=all_,
git_options=git_options)
def _fetch_push_helper(
self,
base_cmd: list[str], # arg list
action: str, # label fetch|push
urlvars: tuple[str, ...], # variables to query for URLs
protocol: type[WitlessProtocol], # processor for output
info_cls: type[InfoT], # Push|FetchInfo
info_from: str, # stdout, stderr
add_remote: bool, # whether to add a 'remote' field to the info dict
remote: Optional[str] = None, refspec: str | list[str] | None = None, all_: bool =False, git_options: Optional[list[str]] = None) -> Iterator[InfoT]:
git_options = ensure_list(git_options)
cmd = base_cmd + git_options
if remote is None:
if refspec:
# conflicts with using tracking branch or push all remotes
# For now: Just fail.
# TODO: Maybe check whether it matches the tracking branch
raise ValueError(
"refspec specified without a remote. ({})".format(refspec))
if all_:
remotes_to_process = self.get_remotes(with_urls_only=True)
else:
# No explicit remote to fetch.
# => get tracking branch:
tb_remote, refspec = self.get_tracking_branch()
if tb_remote is not None:
remotes_to_process = [tb_remote]
else:
# No remote, no tracking branch
# => fail
raise ValueError(
"Neither a remote is specified to {} "
"from nor a tracking branch is set up.".format(action))
else:
if all_:
raise ValueError(
"Option 'all_' conflicts with specified remote "
"'{}'.".format(remote))
remotes_to_process = [remote]
if refspec:
# prep for appending to cmd
refspec = ensure_list(refspec)
# no need for progress report, when there is just one remote
log_remote_progress = len(remotes_to_process) > 1
if log_remote_progress:
pbar_id = '{}remotes-{}'.format(action, id(self))
log_progress(
lgr.info,
pbar_id,
'Start %sing remotes for %s', action, self,
total=len(remotes_to_process),
label=action.capitalize(),
unit=' Remotes',
)
try:
for remote in remotes_to_process:
r_cmd = cmd + [remote]
if refspec:
r_cmd += refspec
if log_remote_progress:
log_progress(
lgr.info,
pbar_id,
'{}ing remote %s'.format(action.capitalize()),
remote,
update=1,
increment=True,
)
# best effort to enable SSH connection caching
url = self.config.get(
# make two attempts to get a URL
urlvars[0].format(remote),
self.config.get(
urlvars[1].format(remote),
None)
)
if url and is_ssh(url):
ssh_manager.get_connection(url).open()
# initialize `hints`; it is only populated when a CommandError is
# intercepted below, but the reporting loop further down relies on it
hints = ''
try:
out = self._git_runner.run(
r_cmd,
protocol=protocol,
)
output = out[info_from] or ''
except CommandError as e:
output = None
# intercept some errors that we express as an error report
# in the info dicts
if re.match(
'.*^error: failed to (push|fetch) some refs',
e.stderr,
re.DOTALL | re.MULTILINE):
output = getattr(e, info_from)
hints = ' '.join([l[6:] for l in e.stderr.splitlines()
if l.startswith('hint: ')])
if output is None:
output = ''
if not output:
raise
assert isinstance(output, str)
for line in output.splitlines():
try:
# push info doesn't identify a remote, add it here
pi = info_cls._from_line(line)
if add_remote:
pi['remote'] = remote
# There were errors, but Git provided hints
if 'error' in pi['operations']:
pi['hints'] = hints or None
yield pi
except Exception:
# it is not progress and no push info
# don't hide it completely
lgr.debug('git-%s reported: %s', action, line)
finally:
if log_remote_progress:
log_progress(
lgr.info,
pbar_id,
'Finished %sing remotes for %s', action, self,
)
def get_remote_url(self, name: str, push: bool = False) -> Optional[str]:
"""Get the url of a remote.
Reads the configuration of remote `name` and returns its url, or None
if there is no url configured.
Parameters
----------
name: str
name of the remote
push: bool
if True, get the pushurl instead of the fetch url.
"""
var = 'remote.{0}.{1}'.format(name, 'pushurl' if push else 'url')
return self.config.get(var, None)
def set_remote_url(self, name: str, url: str, push: bool = False) -> None:
"""Set the URL a remote is pointing to
Sets the URL of the remote `name`. Requires the remote to already exist.
Parameters
----------
name: str
name of the remote
url: str
push: bool
if True, set the push URL, otherwise the fetch URL
"""
var = 'remote.{0}.{1}'.format(name, 'pushurl' if push else 'url')
self.config.set(var, url, scope='local', reload=True)
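# Usage sketch (illustrative; assumes a pre-existing remote 'origin',
# since set_remote_url() does not create remotes):
#
#   repo.set_remote_url('origin', 'https://example.com/ds.git')
#   repo.set_remote_url('origin', 'git@example.com:ds.git', push=True)
#   repo.get_remote_url('origin')             # fetch URL
#   repo.get_remote_url('origin', push=True)  # push URL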
def get_branch_commits_(self, branch: Optional[str] = None, limit: Optional[str] = None, stop: Optional[str] = None) -> Iterator[str]:
"""Return commit hexshas for a branch
Parameters
----------
branch: str, optional
If not provided, assumes current branch
limit: None | 'left-only', optional
Limit which commits to report. If None -- all commits (merged or not),
if 'left-only' -- only the commits from the left side of the tree upon
merges
stop: str, optional
hexsha of the commit at which to stop reporting (the matched one is
not reported either)
Yields
------
str
"""
cmd = ['rev-list']
if limit == 'left-only':
cmd.append('--left-only')
if not branch:
branch = self.get_active_branch()
if branch is None:
raise ValueError("Branch not provided and no active branch")
cmd.append(branch)
# add a trailing -- marker to make sure that Git never confuses the branch
# with a potentially existing directory of the same name
cmd.append('--')
for r in self.call_git_items_(cmd):
if stop and stop == r:
return
yield r
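# Usage sketch (illustrative; branch name and stop commit are
# hypothetical): walk the left-only history of 'main' until a known
# ancestor is reached
#
#   for hexsha in repo.get_branch_commits_(
#           'main', limit='left-only', stop='abc123'):
#       print(hexsha)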
def checkout(self, name: str, options: Optional[list[str]] = None) -> None:
"""
"""
# TODO: Maybe check for the need of the -b option here?
cmd = ['checkout']
if options:
cmd += options
cmd += [str(name)]
self.call_git(cmd, expect_stderr=True)
# checkout can change committed config, or create branch config
self.config.reload()
# TODO: Before implementing annex merge, find usages and check for a needed
# change to call super().merge
def merge(self, name: str, options: Optional[list[str]] = None, msg: Optional[str] = None, allow_unrelated: bool = False, **kwargs: Any) -> None:
if options is None:
options = []
if msg:
options = options + ["-m", msg]
if allow_unrelated:
options += ['--allow-unrelated-histories']
self.call_git(
['merge'] + options + [name],
**kwargs
)
def remove_branch(self, branch: str) -> None:
self.call_git(['branch', '-D', branch])
def cherry_pick(self, commit: str) -> None:
"""Cherry pick `commit` to the current branch.
Parameters
----------
commit : str
A single commit.
"""
self.call_git(["cherry-pick", commit])
@property
def dirty(self) -> bool:
"""Is the repository dirty?
Note: This provides a quick answer when you simply want to know whether
there are any untracked files or modifications in this repository or
its submodules. For finer-grained control and more detailed reporting,
use status() instead.
"""
stdout = self.call_git(
["status", "--porcelain",
# Ensure the result isn't influenced by status.showUntrackedFiles.
"--untracked-files=normal",
# Ensure the result isn't influenced by diff.ignoreSubmodules.
"--ignore-submodules=none"])
if bool(stdout.strip()):
# The quick `git status`-based check can give a different answer
# than `datalad status` for submodules on an adjusted branch.
st = self.diffstatus(fr="HEAD" if self.get_hexsha() else None,
to=None, untracked="normal")
return any(r.get("state") != "clean" for r in st.values())
return False
@property
def untracked_files(self) -> list[str]:
"""Legacy interface, do not use! Use the status() method instead.
Despite its name, it also reports on untracked datasets, and
yields their names with trailing path separators.
"""
return [
'{}{}'.format(
str(p.relative_to(self.pathobj)),
os.sep if props['type'] != 'file' else ''
)
for p, props in self.status(
untracked='all', eval_submodule_state='no').items()
if props.get('state', None) == 'untracked'
]
def gc(self, allow_background: bool = False, auto: bool = False) -> None:
"""Perform house keeping (garbage collection, repacking)"""
cmd_options = []
if not allow_background:
cmd_options += ['-c', 'gc.autodetach=0']
cmd_options += ['gc', '--aggressive']
if auto:
cmd_options += ['--auto']
self.call_git(cmd_options)
def _parse_gitmodules(self) -> dict[PurePosixPath, dict[str, str]]:
# TODO read .gitmodules from a Git blob?
gitmodules = self.pathobj / '.gitmodules'
if not gitmodules.exists():
return {}
# pull out file content
out = self.call_git(
['config', '-z', '-l', '--file', '.gitmodules'],
read_only=True)
# abuse our config parser
# disable multi-value reporting, because we cannot deal with multiple
# values anyway, and they should not appear in a normal .gitmodules file,
# but could easily appear when duplicates are included. In that case,
# we better not crash
db, _ = parse_gitconfig_dump(out, cwd=self.path, multi_value=False)
mods: dict[str, dict[str, str]] = {}
for k, v in db.items():
if not k.startswith('submodule.'):
# we don't know what this is
lgr.warning("Skip unrecognized .gitmodule specification: %s=%s", k, v)
continue
k_l = k.split('.')
# module name is everything after 'submodule.' that is not the variable
# name
mod_name = '.'.join(k_l[1:-1])
# variable name is the last 'dot-free' segment in the key
mods.setdefault(mod_name, {})[k_l[-1]] = v
out = {}
# bring into traditional shape
for name, props in mods.items():
if 'path' not in props:
lgr.warning("Failed to get '%s', skipping this submodule", name)
continue
modprops = {'gitmodule_{}'.format(k): v
for k, v in props.items()
if not k.startswith('__')}
# Keep as PurePosixPath for possible normalization of / in the path etc
modpath = PurePosixPath(props['path'])
modprops['gitmodule_name'] = name
out[modpath] = modprops
return out
def get_submodules_(self, paths: Optional[list[str | PathLike[str]]] = None) -> Iterator[dict]:
"""Yield submodules in this repository.
Parameters
----------
paths : list(pathlib.PurePath), optional
Restrict submodules to those under `paths`. Paths must be relative
to the resolved repository root, and must be normed to match the
reporting done by Git, i.e. no parent dir components
(a la "some/../this").
Returns
-------
A generator that yields a dictionary with information for each
submodule.
"""
if not (self.pathobj / ".gitmodules").exists():
return
modinfo = self._parse_gitmodules()
if not modinfo:
# we exit early, if there is nothing on record (even though
# a .gitmodules file exists).
# without this conditional exit, we would be able to discover
# subprojects even when they are not recorded in .gitmodules,
# but at the cost of running an unconstrained ls-files call
# below
return
posix_mod_paths = [m.as_posix() for m in modinfo]
if paths:
# harmonize them into relative to the repository
posix_paths = []
for path in paths:
path = ut.PurePath(path)
if path.is_absolute():
try:
path = path.relative_to(self.pathobj)
except ValueError as exc:
lgr.debug(
"Path %s it not underneath %s, skipping since nothing should match it: %s",
path, self.pathobj, CapturedException(exc)
)
continue
posix_paths.append(path.as_posix())
# constrain the report by the given paths, make sure all paths are POSIX
posix_mod_paths = list(get_filtered_paths_(
posix_mod_paths,
posix_paths,
include_within_path=True,
))
for r in self.call_git_items_(
['ls-files', '--stage', '-z'],
sep='\0',
files=posix_mod_paths,
read_only=True,
keep_ends=True,
):
if not r.startswith('160000'):
# make sure this method never talks about non-dataset
# content
continue
props, rpath = r.split('\t')
mode, gitsha, stage = props.split(' ')
if stage not in ('0', '2'):
# we either have a non-merge situation, or a simple merge
# situation (i.e. stage=0). the reported gitsha always
# matches what we have locally.
# or we are in a three-way merge, in which case stage=2
# is what we want to report, because it matches the
# current HEAD (see the git-read-tree manpage).
# there is either a stage 2 or a stage 0, never both
continue
# remove the expected line separator from the path
rpath = rpath[:-1]
path = PurePosixPath(rpath)
yield dict(
path=self.pathobj / rpath, # full path returned here
type='dataset',
gitshasum=gitsha,
**modinfo.get(path, {})
)
def get_submodules(self, sorted_: bool = True, paths: Optional[list[str | PathLike[str]]] = None) -> list[dict]:
"""Return list of submodules.
Parameters
----------
sorted_ : bool, optional
Sort submodules by path name.
paths : list(pathlib.PurePath), optional
Restrict submodules to those under `paths`.
Returns
-------
List of dictionaries as returned by `get_submodules_`.
"""
xs = self.get_submodules_(paths=paths)
if sorted_:
return sorted(xs, key=lambda x: x["path"])
else:
return list(xs)
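# Usage sketch (illustrative): each record carries the absolute 'path',
# the recorded 'gitshasum', and any 'gitmodule_*' properties found in
# .gitmodules (e.g. 'gitmodule_url', if configured)
#
#   for sm in repo.get_submodules():
#       print(sm['path'], sm['gitshasum'], sm.get('gitmodule_url'))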
def update_ref(self, ref: str, value: str, oldvalue: Optional[str] = None, symbolic: bool = False) -> None:
"""Update the object name stored in a ref "safely".
Just a shim for `git update-ref` call if not symbolic, and
`git symbolic-ref` if symbolic
Parameters
----------
ref : str
Reference, such as `refs/heads/BRANCHNAME` or HEAD.
value : str
Value to update to, e.g. hexsha of a commit when updating for a
branch ref, or branch ref if updating HEAD
oldvalue: str
Value to update from. Safeguard to be verified by git. This is only
valid if `symbolic` is not True.
symbolic : bool
Whether `ref` is symbolic; should be set to True e.g. in case of
ref=HEAD
"""
if symbolic:
if oldvalue:
raise ValueError("oldvalue and symbolic must not be given both")
cmd = ['symbolic-ref', ref, value]
else:
cmd = ['update-ref', ref, value] + ([oldvalue] if oldvalue else [])
self.call_git(cmd)
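# Usage sketch (illustrative; refs and shas are hypothetical):
#
#   # guarded update: fails if the branch no longer points at `old`
#   repo.update_ref('refs/heads/main', new, oldvalue=old)
#   # switch HEAD to another branch without touching the worktree
#   repo.update_ref('HEAD', 'refs/heads/devel', symbolic=True)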
def tag(self, tag: str, message: Optional[str] = None, commit: Optional[str] = None, options: Optional[list[str]] = None) -> None:
"""Tag a commit
Parameters
----------
tag : str
Custom tag label. Must be a valid tag name.
message : str, optional
If provided, adds ['-m', <message>] to the list of `git tag`
arguments.
commit : str, optional
If provided, will be appended as last argument to the `git tag` call,
and can be used to identify the commit that shall be tagged, if
not HEAD.
options : list, optional
Additional command options, inserted prior to a potential `commit`
argument.
"""
# TODO: call in save.py complains about extensive logging. When does it
# happen in what way? Figure out, whether to just silence it or raise or
# whatever else.
args = ['tag']
if message:
args += ['-m', message]
if options is not None:
args.extend(options)
args.append(tag)
if commit:
args.append(commit)
self.call_git(args)
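# Usage sketch (illustrative; tag name and commit are hypothetical):
#
#   # annotated tag on a specific commit
#   repo.tag('v1.0.0', message='first release', commit='abc123')
#   # plain lightweight tag on HEAD
#   repo.tag('snapshot')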
@overload
def get_tags(self, output: None = None) -> list[dict[str, str]]:
...
@overload
def get_tags(self, output: str) -> list[str]:
...
def describe(self, commitish: Optional[str] = None, **kwargs: Option) -> Optional[str]:
""" Quick and dirty implementation to call git-describe
Parameters
----------
kwargs:
transformed to cmdline options for git-describe;
see `to_options()` for a description of the transformation
"""
# TODO: be more precise what failure to expect when and raise actual
# errors
cmd = ['describe'] + to_options(True, **kwargs)
if commitish is not None:
cmd.append(commitish)
try:
describe = self.call_git(cmd, expect_fail=True)
return describe.strip()
except Exception:
return None
def get_tracking_branch(self, branch: Optional[str] = None, remote_only: bool = False) -> tuple[Optional[str], Optional[str]]:
"""Get the tracking branch for `branch` if there is any.
Parameters
----------
branch: str
local branch to look up. If none is given, the active branch is used.
remote_only : bool
Don't return a value if the upstream remote is set to "." (meaning
this repository).
Returns
-------
tuple
(remote or None, refspec or None) of the tracking branch
"""
if branch is None:
branch = self.get_corresponding_branch() or self.get_active_branch()
if branch is None:
return None, None
track_remote = self.config.get('branch.{0}.remote'.format(branch), None)
if remote_only and track_remote == ".":
return None, None
track_branch = self.config.get('branch.{0}.merge'.format(branch), None)
return track_remote, track_branch
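# Usage sketch (illustrative): determine where a parameter-less
# fetch/push would go
#
#   remote, refspec = repo.get_tracking_branch()
#   if remote is None:
#       pass  # no tracking branch is configured for the active branch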
@property
def count_objects(self) -> dict[str, int]:
"""return dictionary with count, size(in KiB) information of git objects
"""
count_cmd = ['count-objects', '-v']
count_str = self.call_git(count_cmd)
count = {key: int(value)
for key, value in [item.split(': ')
for item in count_str.split('\n')
if len(item.split(': ')) == 2]}
return count
def get_git_attributes(self) -> dict[str, str | bool]:
"""Query gitattributes which apply to top level directory
It is a thin compatibility/shortcut wrapper around the more versatile
get_gitattributes(), which operates on a list of paths and returns
a dictionary per path.
Returns
-------
dict:
a dictionary with attribute name and value items relevant for the
top ('.') directory of the repository, and thus most likely the
default ones (if not overridden by more specific rules) for all files
within the repo.
"""
return self.get_gitattributes('.')['.']
def get_gitattributes(self, path: str | list[str], index_only: bool = False) -> dict[str, dict[str, str | bool]]:
"""Query gitattributes for one or more paths
Parameters
----------
path: path or list
Path(s) to query. Paths may be relative or absolute.
index_only: bool
Flag whether to consider only gitattribute settings that are reflected
in the repository index, rather than those in the work tree content.
Returns
-------
dict:
Each key is a queried path (always relative to the repository root),
each value is a dictionary with attribute
name and value items. Attribute values are either True or False,
for set and unset attributes, or are the literal attribute value.
"""
path = ensure_list(path)
cmd = ["check-attr", "-z", "--all"]
if index_only:
cmd.append('--cached')
# make sure we have one entry for each query path to
# simplify work with the result
attributes: dict[str, dict[str, str | bool]] = {p: {} for p in path}
attr = []
for item in self.call_git_items_(cmd, files=path, sep='\0',
read_only=True):
attr.append(item)
if len(attr) < 3:
continue
# we have a full record
p, name, value = attr
attrs = attributes[p]
attrs[name] = \
True if value == 'set' else False if value == 'unset' else value
# done, reset item
attr = []
return {relpath(k, self.path) if isabs(k) else k: v
for k, v in attributes.items()}
def set_gitattributes(self, attrs: list[tuple[str, dict[str, str | bool]]], attrfile: str = '.gitattributes', mode: str = 'a') -> None:
"""Set gitattributes
By default appends additional lines to `attrfile`. Note that later
lines in `attrfile` overrule earlier ones, which may or may not be
what you want. Set `mode` to 'w' to replace the entire file by
what you provided in `attrs`.
Parameters
----------
attrs : list
Each item is a 2-tuple, where the first element is a path pattern,
and the second element is a dictionary with attribute key/value
pairs. The attribute dictionary must use the same semantics as those
returned by `get_gitattributes()`. Path patterns can use absolute paths,
in which case they will be normalized relative to the directory
that contains the target .gitattributes file (see `attrfile`).
attrfile: path
Path relative to the repository root of the .gitattributes file the
attributes shall be set in.
mode: str
'a' to append to .gitattributes, 'w' to replace it
"""
git_attributes_file = op.join(self.path, attrfile)
attrdir = op.dirname(git_attributes_file)
if not op.exists(attrdir):
os.makedirs(attrdir)
with open(git_attributes_file, mode + '+') as f:
# for append, fix existing files that do not end with \n
if mode == 'a' and f.tell():
f.seek(max(0, f.tell() - len(os.linesep)))
if not f.read().endswith('\n'):
f.write('\n')
for pattern, attr in sorted(attrs, key=lambda x: x[0]):
# normalize the pattern relative to the target .gitattributes file
npath = _normalize_path(
op.join(self.path, op.dirname(attrfile)), pattern)
# paths in gitattributes always have to be POSIX
npath = Path(npath).as_posix()
attrline = u''
if npath.count(' '):
# quote patterns with spaces
attrline += u'"{}"'.format(npath.replace('"', '\\"'))
else:
attrline += npath
for a in sorted(attr):
val = attr[a]
if val is True:
attrline += ' {}'.format(a)
elif val is False:
attrline += ' -{}'.format(a)
else:
attrline += ' {}={}'.format(a, val)
f.write('{}\n'.format(attrline))
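# Usage sketch (illustrative; patterns and attributes are hypothetical):
# append two rules, then inspect the effective attributes of one path
#
#   repo.set_gitattributes([
#       ('*.dat', {'annex.largefiles': 'anything'}),
#       ('README', {'export-ignore': True}),
#   ])
#   repo.get_gitattributes('README')
#   # e.g. -> {'README': {'export-ignore': True, ...}}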
def get_content_info(self, paths: Optional[Sequence[str | PathLike[str]]] = None, ref: Optional[str] = None, untracked: str = 'all') -> dict[Path, dict[str, str | int | None]]:
"""Get identifier and type information from repository content.
This is a simplified front-end for `git ls-files`/`git ls-tree`.
The two commands differ in their behavior when queried about subdataset
paths. ls-files will not report anything, ls-tree will report on the
subdataset record. This function uniformly follows the behavior of
ls-tree (report on the respective subdataset mount).
Parameters
----------
paths : list(pathlib.PurePath) or None
Specific paths, relative to the resolved repository root, to query
info for. Paths must be normed to match the reporting done by Git,
i.e. no parent dir components (a la "some/../this").
If `None`, info is reported for all content.
ref : gitref or None
If given, content information is retrieved for this Git reference
(via ls-tree), otherwise content information is produced for the
present work tree (via ls-files). With a given reference, the
reported content properties also contain a 'bytesize' record,
stating the size of a file in bytes.
untracked : {'no', 'normal', 'all'}
If and how untracked content is reported when no `ref` was given:
'no': no untracked files are reported; 'normal': untracked files
and entire untracked directories are reported as such; 'all': report
individual files even in fully untracked directories.
Returns
-------
dict
Each content item has an entry under a pathlib `Path` object instance
pointing to its absolute path inside the repository (this path is
guaranteed to be underneath `Repo.path`).
Each value is a dictionary with properties:
`type`
Can be 'file', 'symlink', 'dataset', 'directory'
`gitshasum`
SHASUM of the item as tracked by Git, or None, if not
tracked. This could be different from the SHASUM of the file
in the worktree, if it was modified.
Raises
------
ValueError
In case of an invalid Git reference (e.g. 'HEAD' in an empty
repository)
"""
lgr.debug('%s.get_content_info(...)', self)
# TODO limit by file type to replace code in subdatasets command
info: dict[Path, dict[str, str | int | None]] = dict()
if paths: # truthy paths; the empty-but-not-None case is handled below
# path matching will happen against what Git reports
# and Git always reports POSIX paths
# any incoming path has to be relative already, so we can simply
# convert unconditionally
# note: will be list-ified below
posix_paths = [ut.PurePath(p).as_posix() for p in paths]
elif paths is not None:
return info
else:
posix_paths = None
if posix_paths and (not ref or external_versions["cmd:git"] >= "2.29.0"):
# If a path points within a submodule, we need to map it to the
# containing submodule before feeding it to ls-files or ls-tree.
#
# Before Git 2.29.0, ls-tree and ls-files differed in how they
# reported paths within submodules: ls-files provided no output,
# and ls-tree listed the submodule. Now they both return no output.
submodules = [s["path"].relative_to(self.pathobj).as_posix()
for s in self.get_submodules_()]
# `paths` get normalized into PurePosixPath above, submodules are POSIX as well
posix_paths = get_parent_paths(posix_paths, submodules)
# this will not work in direct mode, but everything else should be
# just fine
if not ref:
# make sure no operations are pending before we figure things
# out in the worktree
self.precommit()
# --exclude-standard will make sure to honor any standard way
# git can be instructed to ignore content, and will prevent
# crap from contaminating untracked file reports
cmd = ['ls-files', '--stage', '-z']
# untracked report mode, using labels from `git diff` option style
if untracked == 'all':
cmd += ['--exclude-standard', '-o']
elif untracked == 'normal':
cmd += ['--exclude-standard', '-o', '--directory', '--no-empty-directory']
elif untracked == 'no':
pass
else:
raise ValueError(
'unknown value for `untracked`: {}'.format(untracked))
props_re = re.compile(
r'(?P<type>[0-9]+) (?P<sha>.*) (.*)\t(?P<fname>.*)$')
else:
cmd = ['ls-tree', ref, '-z', '-r', '--full-tree', '-l']
props_re = re.compile(
r'(?P<type>[0-9]+) ([a-z]*) (?P<sha>[^ ]*) [\s]*(?P<size>[0-9-]+)\t(?P<fname>.*)$')
lgr.debug('Query repo: %s', cmd)
try:
stdout = self.call_git(
cmd,
files=posix_paths,
expect_fail=True,
read_only=True)
except CommandError as exc:
if "fatal: Not a valid object name" in exc.stderr:
raise InvalidGitReferenceError(ref)
raise
lgr.debug('Done query repo: %s', cmd)
self._get_content_info_line_helper(
ref,
info,
stdout.split('\0'),
props_re)
lgr.debug('Done %s.get_content_info(...)', self)
return info
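# Usage sketch (illustrative; the ref is hypothetical):
#
#   # worktree content, including untracked files in full detail
#   worktree = repo.get_content_info(ref=None, untracked='all')
#   # content of a recorded state; file records gain a 'bytesize'
#   recorded = repo.get_content_info(ref='HEAD~1')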
def _get_content_info_line_helper(self, ref: Optional[str], info: dict[Path, dict[str, str | int | None]], lines: list[str], props_re: Pattern[str]) -> None:
"""Internal helper of get_content_info() to parse Git output"""
mode_type_map = {
'100644': 'file',
'100755': 'file',
'120000': 'symlink',
'160000': 'dataset',
}
for line in lines:
if not line:
continue
inf: dict[str, str | int | None] = {}
props = props_re.match(line)
if not props:
# not known to Git, but Git always reports POSIX
path = ut.PurePosixPath(line)
inf['gitshasum'] = None
else:
# again Git reports always in POSIX
path = ut.PurePosixPath(props.group('fname'))
# revisit the file props now that this path has not been rejected
if props:
inf['gitshasum'] = props.group('sha')
inf['type'] = mode_type_map.get(
props.group('type'), props.group('type'))
if ref and inf['type'] == 'file':
inf['bytesize'] = int(props.group('size'))
# join item path with repo path to get a universally useful
# path representation with auto-conversion and tons of other
# stuff
joinedpath = self.pathobj.joinpath(path)
if 'type' not in inf:
# be nice and assign types for untracked content
inf['type'] = 'symlink' if joinedpath.is_symlink() \
else 'directory' if joinedpath.is_dir() else 'file'
info[joinedpath] = inf
def status(self, paths: Optional[Sequence[str | PathLike[str]]] = None, untracked: str= 'all', eval_submodule_state: Literal["commit", "full", "no"] = 'full') -> dict[Path, dict[str, str]]:
"""Simplified `git status` equivalent.
Parameters
----------
paths : list or None
If given, limits the query to the specified paths. To query all
paths specify `None`, not an empty list. If a query path points
into a subdataset, a report is made on the subdataset record
within the queried dataset only (no recursion).
untracked : {'no', 'normal', 'all'}
If and how untracked content is reported:
'no': no untracked files are reported; 'normal': untracked files
and entire untracked directories are reported as such; 'all': report
individual files even in fully untracked directories.
eval_submodule_state : {'full', 'commit', 'no'}
If 'full' (the default), the state of a submodule is evaluated by
considering all modifications, with the treatment of untracked files
determined by `untracked`. If 'commit', the modification check is
restricted to comparing the submodule's HEAD commit to the one
recorded in the superdataset. If 'no', the state of the subdataset is
not evaluated.
Returns
-------
dict
Each content item has an entry under a pathlib `Path` object instance
pointing to its absolute path inside the repository (this path is
guaranteed to be underneath `Repo.path`).
Each value is a dictionary with properties:
`type`
Can be 'file', 'symlink', 'dataset', 'directory'
`state`
Can be 'added', 'untracked', 'clean', 'deleted', 'modified'.
"""
lgr.debug('Query status of %r for %s paths',
self, len(paths) if paths is not None else 'all')
return self.diffstatus(
fr='HEAD' if self.get_hexsha() else None,
to=None,
paths=paths,
untracked=untracked,
eval_submodule_state=eval_submodule_state)
def diff(self, fr: Optional[str], to: Optional[str], paths: Optional[Sequence[str | PathLike[str]]] = None, untracked: str = 'all',
eval_submodule_state: Literal["commit", "full", "no"] = 'full') -> dict[Path, dict[str, str]]:
"""Like status(), but reports changes between to arbitrary revisions
Parameters
----------
fr : str or None
Revision specification (anything that Git understands). Passing
`None` considers anything in the target state as new.
to : str or None
Revision specification (anything that Git understands), or None
to compare to the state of the work tree.
paths : list or None
If given, limits the query to the specified paths. To query all
paths specify `None`, not an empty list.
untracked : {'no', 'normal', 'all'}
If and how untracked content is reported when `to` is None:
'no': no untracked files are reported; 'normal': untracked files
and entire untracked directories are reported as such; 'all': report
individual files even in fully untracked directories.
eval_submodule_state : {'full', 'commit', 'no'}
If 'full' (the default), the state of a submodule is evaluated by
considering all modifications, with the treatment of untracked files
determined by `untracked`. If 'commit', the modification check is
restricted to comparing the submodule's HEAD commit to the one
recorded in the superdataset. If 'no', the state of the subdataset is
not evaluated.
Returns
-------
dict
Each content item has an entry under a pathlib `Path` object instance
pointing to its absolute path inside the repository (this path is
guaranteed to be underneath `Repo.path`).
Each value is a dictionary with properties:
`type`
Can be 'file', 'symlink', 'dataset', 'directory'
`state`
Can be 'added', 'untracked', 'clean', 'deleted', 'modified'.
"""
return {k: v for k, v in self.diffstatus(
fr=fr, to=to, paths=paths,
untracked=untracked,
eval_submodule_state=eval_submodule_state).items()
if v.get('state', None) != 'clean'}
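# Usage sketch (illustrative; revisions are hypothetical):
#
#   # everything that changed between two recorded states
#   changed = repo.diff(fr='HEAD~1', to='HEAD')
#   # everything not clean in the worktree relative to HEAD
#   pending = repo.diff(fr='HEAD', to=None, untracked='normal')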
@overload
def diffstatus(self, fr: Optional[str], to: Optional[str], paths: Optional[Sequence[str | PathLike[str]]] = None, untracked: str = 'all', *, eval_submodule_state: Literal["global"], _cache: Optional[dict] = None) -> str:
...
@overload
def diffstatus(self, fr: Optional[str], to: Optional[str], paths: Optional[Sequence[str | PathLike[str]]] = None, untracked: str = 'all', eval_submodule_state: Literal["commit", "full", "no"] = "full", _cache: Optional[dict] = None) -> dict[Path, dict[str, str]]:
...
def diffstatus(self, fr: Optional[str], to: Optional[str], paths: Optional[Sequence[str | PathLike[str]]] = None, untracked: str = 'all',
eval_submodule_state: str = 'full', _cache: Optional[dict] = None) -> dict[Path, dict[str, str]] | str:
"""Like diff(), but reports the status of 'clean' content too.
It supports an additional submodule evaluation state 'global'.
If given, a single 'modified' (vs. 'clean') state label is returned
for the entire repository, as soon as it can be determined.
"""
def _get_cache_key(label: str, paths: Optional[list[Path]], ref: Optional[str], untracked: Optional[str] = None) -> tuple[str, str, Optional[tuple[Path, ...]], Optional[str], Optional[str]]:
return self.path, label, tuple(paths) if paths else None, \
ref, untracked
if _cache is None:
_cache = {}
ppaths: Optional[list[Path]]
if paths is not None:
# at this point we must normalize paths to the form that
# Git would report them, to ease matching later on
ppaths = [
p.relative_to(self.pathobj) if p.is_absolute() else p
for p in map(ut.Path, paths)
]
else:
ppaths = None
# TODO report more info from get_content_info() calls in return
# value, those are cheap and possibly useful to a consumer
# we need (at most) three calls to git
if to is None:
# everything we know about the worktree, including os.stat
# for each file
key = _get_cache_key('ci', ppaths, None, untracked)
if key in _cache:
to_state = _cache[key]
else:
to_state = self.get_content_info(
paths=ppaths, ref=None, untracked=untracked)
_cache[key] = to_state
# we want Git to tell us what it considers modified and avoid
# reimplementing logic ourselves
key = _get_cache_key('mod', ppaths, None)
if key in _cache:
modified = _cache[key]
else:
# from Git 2.31.0 onwards ls-files has --deduplicate
# but for backward compatibility keep doing deduplication here
modified = set(
self.pathobj.joinpath(ut.PurePosixPath(p))
for p in self.call_git_items_(
# we must also look for deleted files, for the logic
# below to work. Only from Git 2.31.0 would they be
# included with `-m` alone
['ls-files', '-z', '-m', '-d'],
# low-level code cannot handle pathobjs
files=[str(p) for p in ppaths] if ppaths is not None else None,
sep='\0',
read_only=True)
if p)
_cache[key] = modified
else:
key = _get_cache_key('ci', ppaths, to)
if key in _cache:
to_state = _cache[key]
else:
to_state = self.get_content_info(paths=ppaths, ref=to)
_cache[key] = to_state
# we do not need worktree modification detection in this case
modified = None
# origin state
key = _get_cache_key('ci', ppaths, fr)
if key in _cache:
from_state = _cache[key]
else:
if fr:
from_state = self.get_content_info(paths=ppaths, ref=fr)
else:
# no ref means from nothing
from_state = {}
_cache[key] = from_state
status = dict()
for f, to_state_r in to_state.items():
props = self._diffstatus_get_state_props(
f,
from_state.get(f, None),
to_state_r,
# are we comparing against a recorded commit or the worktree
to is not None,
# if we have worktree modification info, report if
# path is reported as modified in it
modified and f in modified,
eval_submodule_state)
# potential early exit in "global" eval mode
if eval_submodule_state == 'global' and \
props.get('state', None) not in ('clean', None):
# any modification means globally 'modified'
return 'modified'
status[f] = props
for f, from_state_r in from_state.items():
if f not in to_state:
# we knew this, but now it is gone and Git is not complaining
# about it being missing -> properly deleted and the deletion
# is staged
status[f] = dict(
state='deleted',
type=from_state_r['type'],
# report the shasum to distinguish from a plainly vanished
# file
gitshasum=from_state_r['gitshasum'],
)
if eval_submodule_state == 'global':
return 'modified'
if to is not None or eval_submodule_state == 'no':
# if we have `to` we are specifically comparing against
# a recorded state, and this function only attempts
# to label the state of a subdataset, not investigate
# specifically what the changes in subdatasets are
# this is done by a high-level command like rev-diff
# so the comparison within this repo and the present
# `state` label are all we need, and they are done already
if eval_submodule_state == 'global':
return 'clean'
else:
return status
# loop over all subdatasets and look for additional modifications
for f, st in status.items():
f = str(f)
if 'state' in st or not st['type'] == 'dataset':
# no business here
continue
if not GitRepo.is_valid_repo(f):
# submodule is not present, no chance for a conflict
st['state'] = 'clean'
continue
# we have to recurse into the dataset and get its status
subrepo = repo_from_path(f)
# get the HEAD commit, or the one of the corresponding branch
# only that one counts re super-sub relationship
# save() syncs the corresponding branch each time
subrepo_commit = subrepo.get_hexsha(subrepo.get_corresponding_branch())
st['gitshasum'] = subrepo_commit
# subdataset records must be labeled clean up to this point
# test if current commit in subdataset deviates from what is
# recorded in the dataset
st['state'] = 'modified' \
if st['prev_gitshasum'] != subrepo_commit \
else 'clean'
if eval_submodule_state == 'global' and st['state'] == 'modified':
return 'modified'
if eval_submodule_state == 'commit':
continue
# the recorded commit did not change, so we need to make
# a more expensive traversal
st['state'] = subrepo.diffstatus(
# we can use 'HEAD' because we know that the commit
# did not change. using 'HEAD' will facilitate
# caching the result
fr='HEAD',
to=None,
paths=None,
untracked=untracked,
eval_submodule_state='global',
_cache=_cache) if st['state'] == 'clean' else 'modified'
if eval_submodule_state == 'global' and st['state'] == 'modified':
return 'modified'
if eval_submodule_state == 'global':
return 'clean'
else:
return status
def _diffstatus_get_state_props(self, f: Path,
from_state: Optional[dict[str, str]],
to_state: dict[str, str],
against_commit: bool,
modified_in_worktree: bool,
eval_submodule_state: str) -> dict[str, str]:
"""Helper to determine diff properties for a single path
Parameters
----------
f : Path
from_state : dict
to_state : dict
against_commit : bool
Flag whether `to_state` reflects a commit or the worktree.
modified_in_worktree : bool
Flag whether a worktree modification is reported. This is ignored
when `against_commit` is True.
eval_submodule_state : {'commit', 'no', ...}
"""
if against_commit:
# we can ignore any worktree modification reported when
# comparing against a commit
modified_in_worktree = False
props = {}
if 'type' in to_state:
props['type'] = to_state['type']
to_sha = to_state['gitshasum']
from_sha = from_state['gitshasum'] if from_state else None
# determine the state of `f` from from_state and to_state records, if
# it can be determined conclusively from it. If not, it will
# stay None for now
state = None
if not from_state:
# this is new, or rather not known to the previous state
state = 'added' if to_sha else 'untracked'
elif to_sha == from_sha and not modified_in_worktree:
# something that is seemingly unmodified, based on the info
# gathered so far
if to_state['type'] == 'dataset':
if against_commit or eval_submodule_state == 'commit':
# we compare against a recorded state, just based on
# the shas we can be confident, otherwise the state
# of a subdataset isn't fully known yet, because
# `modified_in_worktree` will only reflect changes
# in the commit of a subdataset without looking into
# it for uncommitted changes. Such tests are done
# later and based on further conditionals for
# performance reasons
state = 'clean'
else:
# no change in git record, and no change on disk
# at this point we know that the reported object ids
# for this file are identical in the to and from
# records. If to is None, we're comparing to the
# working tree and a deleted file will still have an
# identical id, so we need to check whether the file is
# gone before declaring it clean. This working tree
# check is irrelevant and wrong if to is a ref.
state = 'clean' \
if against_commit or (f.exists() or f.is_symlink()) \
else 'deleted'
else:
# change in git record, or on disk
# for subdatasets leave the 'modified' judgement to the caller
# for supporting corner cases, such as adjusted branch
# which require inspection of a subdataset
# TODO we could have a new file that is already staged
# but had subsequent modifications done to it that are
# unstaged. Such a file would presently show up as 'added'
# ATM I think this is OK, but worth stating...
state = ('modified'
if against_commit or to_state['type'] != 'dataset'
else None
) if f.exists() or f.is_symlink() else 'deleted'
# TODO record before and after state for diff-like use
# cases
if state in ('clean', 'added', 'modified', None):
# assign present gitsha to any record
# state==None can only happen for subdatasets that
# already existed, so also assign a sha for them
props['gitshasum'] = to_sha
if 'bytesize' in to_state:
# if we got this cheap, report it
props['bytesize'] = to_state['bytesize']
elif state == 'clean':
assert from_state is not None
if 'bytesize' in from_state:
# no change, we can take this old size info
props['bytesize'] = from_state['bytesize']
if state in ('clean', 'modified', 'deleted', None):
# assign previous gitsha to any record
# state==None can only happen for subdatasets that
# already existed, so also assign a sha for them
assert from_sha is not None
props['prev_gitshasum'] = from_sha
if state:
# only report a state if we could determine any
# outside code tests for existence of the property
# and not (always) for the value
props['state'] = state
return props
def _save_pre(self, paths: Optional[Sequence[str | PathLike[str]]], _status: Optional[dict[Path, dict[str, str]]], **kwargs: Any) -> Optional[dict[Path, dict[str, str]]]:
# helper to get an actionable status report
if paths is not None and not paths and not _status:
return None
if _status is None:
if 'untracked' not in kwargs:
kwargs['untracked'] = 'normal'
status = self.status(
paths=paths,
**{k: kwargs[k] for k in kwargs
if k in ('untracked', 'eval_submodule_state')})
else:
# we want to be able to add items down the line
# make sure to detach from prev. owner
status = _status.copy()
return status
def get_staged_paths(self) -> list[str]:
"""Returns a list of any stage repository path(s)
This is a rather fast call, as it will not depend on what is going on
in the worktree.
"""
try:
return list(self.call_git_items_(
['diff', '--name-only', '--staged'],
expect_stderr=True))
except CommandError as e:
lgr.debug(CapturedException(e))
return []
def _save_post(self, message: Optional[str], files: Iterable[Path], partial_commit: bool, amend: bool = False,
allow_empty: bool = False) -> None:
# helper to commit changes reported in status
# TODO remove pathobj stringification when commit() can
# handle it
to_commit = [str(f.relative_to(self.pathobj))
for f in files] \
if partial_commit else None
if not partial_commit or to_commit or allow_empty or \
(amend and message):
# we directly call GitRepo.commit() to avoid a whole slew
# of direct-mode safeguards and workarounds in the AnnexRepo
# implementation (which also runs an additional dry-run commit)
GitRepo.commit(
self,
files=to_commit,
msg=message,
options=to_options(amend=amend, allow_empty=allow_empty),
# do not raise on empty commit
# it could be that the `add` in this save-cycle has already
# brought back a 'modified' file into a clean state
careless=True,
)
def save(self, message: Optional[str] = None, paths: Optional[list[Path]] = None, _status: Optional[dict[Path, dict[str, str]]] = None, **kwargs: Any) -> list[dict]:
"""Save dataset content.
Parameters
----------
message : str or None
A message to accompany the changeset in the log. If None,
a default message is used.
paths : list or None
Any content with path matching any of the paths given in this
list will be saved. Matching will be performed against the
dataset status (GitRepo.status()), or a custom status provided
via `_status`. If no paths are provided, ALL non-clean paths
present in the repo status or `_status` will be saved.
_status : dict or None
If None, Repo.status() will be queried for this repository. If
a dict is given, its content will be used as a constraint.
For example, to save only modified content, but no untracked
content, set `paths` to None and provide a `_status` that has
no entries for untracked content.
**kwargs :
Additional arguments that are passed to underlying Repo methods.
Supported:
- git : bool (passed to Repo.add())
- eval_submodule_state : {'full', 'commit', 'no'}
passed to Repo.status()
- untracked : {'no', 'normal', 'all'} - passed to Repo.status()
- amend : bool (passed to GitRepo.commit)
"""
return list(
self.save_(
message=message,
paths=paths,
_status=_status,
**kwargs
)
)
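# Usage sketch (illustrative; message and paths are hypothetical):
# commit modifications under one directory, ignoring untracked content
#
#   res = repo.save(
#       message='update docs',
#       paths=[repo.pathobj / 'docs'],
#       untracked='no',
#   )
#   # `res` is a list of result records (e.g. 'add'/'delete' actions)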
def save_(self, message: Optional[str] = None, paths: Optional[list[Path]] = None, _status: Optional[dict[Path, dict[str, str]]] = None, **kwargs: Any) -> Iterator[dict]:
"""Like `save()` but working as a generator."""
from datalad.interface.results import get_status_dict
status_state = _get_save_status_state(
self._save_pre(paths, _status, **kwargs) or {}
)
amend = kwargs.get('amend', False)
# TODO: check on those None's -- maybe those are also "nothing to worry about"
# and we could just return?
if not any(status_state.values()) and not (message and amend):
# all clean, nothing todo
lgr.debug('Nothing to save in %r, exiting early', self)
return
# three things are to be done:
# - remove (deleted if not already staged)
# - add (modified/untracked)
# - commit (with all paths that have been touched, to bypass
# potential pre-staged bits)
staged_paths = self.get_staged_paths()
need_partial_commit = bool(staged_paths)
if need_partial_commit and hasattr(self, "call_annex"):
# so we have some staged content. let's check which ones
# are symlinks -- those could be annex key links that
# are broken after a `git-mv` operation
# https://github.com/datalad/datalad/issues/4967
# call `git-annex pre-commit` on them to rectify this before
# saving the wrong symlinks
added = status_state['added']
tofix = [
sp for sp in staged_paths
if added.get(self.pathobj / sp, {}).get("type") == "symlink"
]
if tofix:
self.call_annex(['pre-commit'], files=tofix)
submodule_change = False
if status_state['deleted']:
vanished_subds = [
str(f.relative_to(self.pathobj))
for f, props in status_state['deleted'].items()
if props.get('type') == 'dataset'
]
if vanished_subds:
# for submodule removal we use `git-rm`, because the clean-up
# is more complex than just an index update -- it makes no
# sense to have a duplicate implementation.
# we do not yield here, but only all together below -- we are just
# processing gone components, which should always be quick.
self._call_git(['rm', '-q'], files=vanished_subds)
submodule_change = True
# remove anything from the index that was found to be gone
self._call_git(
['update-index', '--remove'],
files=[
str(f.relative_to(self.pathobj))
for f, props in status_state['deleted'].items()
# do not update the index, if there is already
# something staged for this path (e.g.,
# a directory was removed and a file staged
# in its place)
if not props.get('gitshasum')
# we already did the submodules
and props.get('type') != 'dataset'
]
)
# now yield all deletions
for p, props in status_state['deleted'].items():
yield get_status_dict(
action='delete',
refds=self.pathobj,
type=props.get('type'),
path=str(p),
status='ok',
logger=lgr)
# TODO this additional query should be avoided, and the given status
# be used instead, if anyhow possible. However, when paths are given,
# status may not contain all required information. In case of path=None
# AND _status=None, we should be able to avoid this, because
# status should have the full info already
# look for contained repositories
untracked_dirs = [
f.relative_to(self.pathobj)
for f, props in status_state['untracked'].items()
if props.get('type', None) == 'directory']
to_add_submodules = []
if untracked_dirs:
to_add_submodules = [
sm for sm, sm_props in
self.get_content_info(
untracked_dirs,
ref=None,
# request exhaustive list, so that everything that is
# still reported as a directory must be its own repository
untracked='all').items()
if sm_props.get('type', None) == 'directory']
to_add_submodules = _prune_deeper_repos(to_add_submodules)
to_stage_submodules = {
f: props
for f, props in status_state['modified_or_untracked'].items()
if props.get('type', None) == 'dataset'}
if to_stage_submodules:
lgr.debug(
'%i submodule path(s) to stage in %r %s',
len(to_stage_submodules), self,
to_stage_submodules
if len(to_stage_submodules) < 10 else '')
to_add_submodules += list(to_stage_submodules)
if to_add_submodules:
for r in self._save_add_submodules(to_add_submodules):
if r.get('status', None) == 'ok':
submodule_change = True
yield r
if submodule_change:
# this will alter the config, reload
self.config.reload()
# need to include .gitmodules in what needs committing
f = self.pathobj.joinpath('.gitmodules')
status_state['modified_or_untracked'][f] = \
status_state['modified'][f] = \
dict(type='file', state='modified')
# now stage .gitmodules
self._call_git(['update-index', '--add'], files=['.gitmodules'])
# and report on it
yield get_status_dict(
action='add',
refds=self.pathobj,
type='file',
path=f,
status='ok',
logger=lgr)
to_add = {
# TODO remove pathobj stringification when add() can
# handle it
str(f.relative_to(self.pathobj)): props
for f, props in status_state['modified_or_untracked'].items()
if not (f in to_add_submodules or f in to_stage_submodules)}
if to_add:
compat_config = \
self.config.obtain("datalad.save.windows-compat-warning")
to_add, problems = self._check_for_win_compat(to_add, compat_config)
lgr.debug(
'%i path(s) to add to %s %s',
len(to_add), self, to_add if len(to_add) < 10 else '')
if to_add:
yield from self._save_add(
to_add,
git_opts=None,
**{k: kwargs[k] for k in kwargs
if k in (('git',) if hasattr(self, 'uuid')
else tuple())})
if problems:
msg = \
'Incompatible name for Windows systems; disable with ' \
'datalad.save.windows-compat-warning.'
for path in problems:
yield get_status_dict(
action='save',
refds=self.pathobj,
type='file',
path=(self.pathobj / ut.PurePosixPath(path)),
status='impossible',
message=msg,
logger=lgr)
# https://github.com/datalad/datalad/issues/6558
# file could have become a directory. Unfortunately git
# would then mistakenly refuse to commit if that old path is also
# given to commit, so we better filter it out
if status_state['deleted'] and status_state['added']:
# check if any "deleted" is a directory now. Then for those
# there should be some other path under that directory in 'added'
for f in [_ for _ in status_state['deleted'] if _.is_dir()]:
# this could potentially be expensive if lots of files become
# directories, but it is unlikely to happen often
# Note: PurePath.is_relative_to was added in Python 3.9 and seems slowish;
# path_is_subpath is faster, also when compared to "in f.parents"
f_str = str(f)
if any(path_is_subpath(str(f2), f_str) for f2 in status_state['added']):
status_state['deleted'].pop(f) # do not bother giving it to commit below in _save_post
# Note, that allow_empty is always ok when we amend. Required when we
# amend an empty commit while the amendment is empty, too (though
# possibly different message). If an empty commit was okay before, it's
# okay now.
status_state.pop('modified_or_untracked') # pop the hybrid state
self._save_post(message, chain(*status_state.values()), need_partial_commit, amend=amend,
allow_empty=amend)
# TODO yield result for commit, prev helper checked hexsha pre
# and post...
def _check_for_win_compat(self, files: dict[str, Any], config: str) -> tuple[dict[str, Any], Optional[list[str]]]:
"""Check file names for illegal characters or reserved names on Windows
In the case that a non-Windows-compatible file is detected, warn users
about potential interoperability issues.
Parameters
----------
files
list of files to add
config
value of self.config.obtain("datalad.save.windows-compat-warning"),
used to choose appropriate behavior. "none" performs no check,
"warning" warns in case of incompatibilities, and "error" results in
an error result in case of incompatibilities
"""
# don't perform any check when the configuration is set to 'none'
if config == 'none':
return files, None
from collections import defaultdict
problems: dict[str, list[str]] = defaultdict(list)
for file in files:
for part in Path(file).parts:
# check every component of the path for incompatibilities
if Path(part).stem.upper() in RESERVED_NAMES_WIN:
problems['Elements using a reserved filename:'].append(part)
problems['paths'].append(file)
if re.search(ILLEGAL_CHARS_WIN, part):
problems['Elements with illegal characters:'].append(part)
problems['paths'].append(file)
if part.endswith('.'):
problems['Elements ending with a dot:'].append(part)
problems['paths'].append(file)
if part.endswith(' '):
problems['Elements ending with a space:'].append(part)
problems['paths'].append(file)
if not problems:
return files, None
msg = \
"Some elements of your dataset are not compatible with " \
"Windows systems. Disable this check by changing " \
"datalad.save.windows-compat-warning or consider renaming " \
"the following elements: "
for k, v in problems.items():
# use the key as an explanation, and report filenames only once
msg += f"\n{k} {[*{*v}]}" if k != 'paths' else ''
if config == 'warning':
lgr.warning(msg)
return files, None
elif config == 'error':
# take the problematic files out of to_add
for path in [*{*problems['paths']}]:
files.pop(path)
return files, [*{*problems['paths']}]
else:
raise ValueError(f"Invalid 'config' value {config!r}")
def _save_add(self, files: dict[str, Any], git_opts: Optional[list[str]] = None) -> Iterator[dict]:
"""Simple helper to add files in save()"""
from datalad.interface.results import get_status_dict
try:
# without --verbose git 2.9.3 add does not return anything
add_out = self._call_git(
# Set annex.largefiles to prevent storing files in
# annex with a v6+ annex repo.
['-c', 'annex.largefiles=nothing', 'add'] +
ensure_list(git_opts) + ['--verbose'],
files=list(files.keys()),
pathspec_from_file=True,
)
# get all the entries
for r in self._process_git_get_output(*add_out):
yield get_status_dict(
action=str(r.get('command', 'add')),
refds=self.pathobj,
type='file',
path=(self.pathobj / ut.PurePosixPath(r['file']))
if 'file' in r else None,
status='ok' if r.get('success', None) else 'error',
key=r.get('key', None),
# while there is no git-annex underneath here, we
# tend to fake its behavior, so we can also support
# this type of messaging
#message='\n'.join(r['error-messages'])
#if 'error-messages' in r else None,
message=None,
logger=lgr)
except OSError as e:
lgr.error("add: %s", e)
raise
def _save_add_submodules(self, paths: list[Path] | dict[Path, dict]) -> Iterator[dict]:
"""Add new submodules, or updates records of existing ones
This method does not use `git submodule add`, but aims to be more
efficient by limiting the scope to mere in-place registration of
multiple already present repositories.
Parameters
----------
paths : list(Path)
Yields
------
dict
Result records
"""
from datalad.interface.results import get_status_dict
# first gather info from all datasets in read-only fashion, and then
# update index, .gitmodules and .git/config at once
info = []
# To avoid adding already known submodules: https://github.com/datalad/datalad/issues/6843
# We must not add already known submodules explicitly, since "untracked"
# can be assigned even for known ones (TODO: file an issue; it might have
# been closed already).
# Not sure if operating on relative paths would provide any speed-up, so
# use full paths
known_sub_paths = {s['path'] for s in self.get_submodules_()}
for path in paths:
already_known = path in known_sub_paths
rpath = str(path.relative_to(self.pathobj).as_posix())
subm = repo_from_path(path)
# if there is a corresponding branch, we want to record its state.
# we rely on the corresponding branch being synced already.
# `save` should do that each time it runs.
subm_commit = subm.get_hexsha(subm.get_corresponding_branch())
if not subm_commit:
yield get_status_dict(
action='add_submodule',
ds=self,
path=str(path),
status='error',
message=('cannot add subdataset %s with no commits', subm),
logger=lgr)
continue
# make an attempt to configure a submodule source URL based on the
# discovered remote configuration
remote, branch = subm.get_tracking_branch()
url = subm.get_remote_url(remote) if remote else None
if url is None:
url = './{}'.format(rpath)
subm_id = subm.config.get('datalad.dataset.id', None)
info.append(
dict(
# if we have additional information on this path, pass it on.
# if not, treat it as an untracked directory
paths[path] if isinstance(paths, dict)
else dict(type='directory', state='untracked'),
path=path, rpath=rpath, commit=subm_commit, id=subm_id,
url=url, known=already_known))
# bypass any convenience or safe-manipulator for speed reasons
# use case: saving many new subdatasets in a single run
with (self.pathobj / '.gitmodules').open('a') as gmf, \
(self.pathobj / '.git' / 'config').open('a') as gcf:
for i in info:
# we update the subproject commit unconditionally
self.call_git([
'update-index', '--add', '--replace', '--cacheinfo', '160000',
i['commit'], i['rpath']
])
# only write the .gitmodules/.config changes when this is not yet
# a subdataset and not yet already known
# TODO: we could update the URL, and branch info at this point,
# even for previously registered subdatasets
if not i['known'] and (i['type'] != 'dataset' or (
i['type'] == 'dataset' and i['state'] == 'untracked')):
gmprops = dict(path=i['rpath'], url=i['url'])
if i['id']:
gmprops['datalad-id'] = i['id']
write_config_section(
gmf, 'submodule', i['rpath'], gmprops)
write_config_section(
gcf, 'submodule', i['rpath'], dict(active='true', url=i['url']))
# This mirrors the result structure yielded for
# to_stage_submodules below.
yield get_status_dict(
action='add',
refds=self.pathobj,
type='dataset',
key=None,
path=i['path'],
status='ok',
logger=lgr)
def _get_save_status_state(status: dict[Path, dict[str, str]]) -> dict[Optional[str], dict[Path, dict[str, str]]]:
"""
Returns
-------
dict
By status category by file path, mapped to status properties.
"""
# Sort status into status by state with explicit list of states
# (excluding clean we do not care about) we expect to be present
# and which we know of (unless None), and modified_or_untracked hybrid
# since it is used below
status_state: dict[Optional[str], dict[Path, dict[str, str]]] = {
k: {}
for k in (None, # not cared of explicitly here
'added', # not cared of explicitly here
# 'clean' # not even wanted since nothing to do about those
'deleted',
'modified',
'untracked',
'modified_or_untracked', # hybrid group created here
)}
for f, props in status.items():
state = props.get('state', None)
if state == 'clean':
# we don't care about clean
continue
if state == 'modified' and props.get('gitshasum') \
and props.get('gitshasum') == props.get('prev_gitshasum'):
# reported as modified, but with identical shasums -> typechange
# a subdataset maybe? do increasingly expensive tests for
# speed reasons
if props.get('type') != 'dataset' and f.is_dir() \
and GitRepo.is_valid_repo(f):
# it was not a dataset, but now there is one.
# we declare it untracked to engage the discovery tooling.
state = 'untracked'
props = dict(type='dataset', state='untracked')
status_state[state][f] = props
# The hybrid one to retain the same order as in original status
if state in ('modified', 'untracked'):
status_state['modified_or_untracked'][f] = props
return status_state
# used in the get command and GitRepo.add_submodule(), the
# latter is not used outside the tests
def _fixup_submodule_dotgit_setup(ds: Dataset, relativepath: str | Path) -> None:
"""Implementation of our current of .git in a subdataset
Each subdataset/module has its own .git directory where a standalone
repository would have it. No gitdir files, no symlinks.
"""
# move .git to superrepo's .git/modules, remove .git, create
# .git-file
path = opj(ds.path, relativepath)
subds_dotgit = opj(path, ".git")
repo = GitRepo(path, create=False)
if repo.dot_git.parent == repo.pathobj:
# this is what we want
return
# first we want to remove any conflicting worktree setup
# done by git to find the checkout at the mountpoint of the
# submodule, if we keep that, any git command will fail
# after we move .git
# Ben: Shouldn't we re-setup a possible worktree afterwards?
repo.config.unset('core.worktree', scope='local')
# what we have here is some kind of reference, remove and
# replace by the target
os.remove(subds_dotgit)
# make absolute
src_dotgit = str(repo.dot_git)
# move .git
from os import (
listdir,
rename,
rmdir,
)
ensure_dir(subds_dotgit)
for dot_git_entry in listdir(src_dotgit):
rename(opj(src_dotgit, dot_git_entry),
opj(subds_dotgit, dot_git_entry))
assert not listdir(src_dotgit)
rmdir(src_dotgit)
# try retro-fitting GitRepo with deprecated functionality
# must be done last in this file
try:
from datalad_deprecated.gitrepo import DeprecatedGitRepoMethods
for symbol in dir(DeprecatedGitRepoMethods):
if symbol.startswith('__'):
# ignore Python internals
continue
if hasattr(GitRepo, symbol):
lgr.debug(
'Not retro-fitted GitRepo with deprecated %s, '
'name-space conflict', symbol)
# do not override existing symbols
continue
# assign deprecated symbol to GitRepo
setattr(GitRepo, symbol, getattr(DeprecatedGitRepoMethods, symbol))
lgr.debug('Retro-fitted GitRepo with deprecated %s', symbol)
except ImportError as e:
ce = CapturedException(e)
lgr.debug(
'Not retro-fitting GitRepo with deprecated symbols, '
'datalad-deprecated package not found: %s', ce)