Source code for cookiecutter.zipfile

"""Utility functions for handling and fetching repo archives in zip format."""

from __future__ import annotations

import os
import tempfile
from pathlib import Path
from zipfile import BadZipFile, ZipFile

import requests

from cookiecutter.exceptions import InvalidZipRepository
from cookiecutter.prompt import prompt_and_delete, read_repo_password
from cookiecutter.utils import make_sure_path_exists


[docs] def unzip( zip_uri: str, is_url: bool, clone_to_dir: Path | str = ".", no_input: bool = False, password: str | None = None, ) -> str: """Download and unpack a zipfile at a given URI. This will download the zipfile to the cookiecutter repository, and unpack into a temporary directory. :param zip_uri: The URI for the zipfile. :param is_url: Is the zip URI a URL or a file? :param clone_to_dir: The cookiecutter repository directory to put the archive into. :param no_input: Do not prompt for user input and eventually force a refresh of cached resources. :param password: The password to use when unpacking the repository. """ # Ensure that clone_to_dir exists clone_to_dir = Path(clone_to_dir).expanduser() make_sure_path_exists(clone_to_dir) if is_url: # Build the name of the cached zipfile, # and prompt to delete if it already exists. identifier = zip_uri.rsplit('/', 1)[1] zip_path = os.path.join(clone_to_dir, identifier) if os.path.exists(zip_path): download = prompt_and_delete(zip_path, no_input=no_input) else: download = True if download: # (Re) download the zipfile r = requests.get(zip_uri, stream=True, timeout=100) with open(zip_path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks f.write(chunk) else: # Just use the local zipfile as-is. zip_path = os.path.abspath(zip_uri) # Now unpack the repository. The zipfile will be unpacked # into a temporary directory try: zip_file = ZipFile(zip_path) if len(zip_file.namelist()) == 0: raise InvalidZipRepository(f'Zip repository {zip_uri} is empty') # The first record in the zipfile should be the directory entry for # the archive. If it isn't a directory, there's a problem. first_filename = zip_file.namelist()[0] if not first_filename.endswith('/'): raise InvalidZipRepository( f"Zip repository {zip_uri} does not include a top-level directory" ) # Construct the final target directory project_name = first_filename[:-1] unzip_base = tempfile.mkdtemp() unzip_path = os.path.join(unzip_base, project_name) # Extract the zip file into the temporary directory try: zip_file.extractall(path=unzip_base) except RuntimeError as runtime_err: # File is password protected; try to get a password from the # environment; if that doesn't work, ask the user. if password is not None: try: zip_file.extractall(path=unzip_base, pwd=password.encode('utf-8')) except RuntimeError as e: raise InvalidZipRepository( 'Invalid password provided for protected repository' ) from e elif no_input: raise InvalidZipRepository( 'Unable to unlock password protected repository' ) from runtime_err else: retry: int | None = 0 while retry is not None: try: password = read_repo_password('Repo password') zip_file.extractall( path=unzip_base, pwd=password.encode('utf-8') ) retry = None except RuntimeError as e: # noqa: PERF203 retry += 1 # type: ignore[operator] if retry == 3: raise InvalidZipRepository( 'Invalid password provided for protected repository' ) from e except BadZipFile as e: raise InvalidZipRepository( f'Zip repository {zip_uri} is not a valid zip archive:' ) from e return unzip_path