Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- Program to copy a file from source to destination.
- During the copy operation the hash value is calculated.
- A progress bar is also supported.
- If tqdm is not installed, it will install via
- pip as a user site-package.
- """
- import hashlib
- import sys
- import mmap
- from argparse import ArgumentParser
- from argparse import ArgumentDefaultsHelpFormatter as HelpFormatter
- from pathlib import Path
- from subprocess import call
def chunker(file_size, chunk_size):
    """Yield consecutive slices that together cover ``0 .. file_size``.

    Every slice spans ``chunk_size`` positions except possibly the last
    one, which is clipped so that it ends exactly at ``file_size``.
    Nothing is yielded when ``file_size`` is zero or negative.

    Args:
        file_size: Total length to cover.
        chunk_size: Maximum length of a single slice; must be positive.

    Yields:
        ``slice(start, stop)`` objects in ascending order.

    Raises:
        ValueError: If ``chunk_size`` is not positive.  Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A step of zero or less would never move ``pos`` past ``file_size``
    # and the loop below would spin forever.
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be positive, got {chunk_size!r}')
    pos = 0
    while pos < file_size:
        yield slice(pos, min(pos + chunk_size, file_size))
        pos += chunk_size
def _open_for_update(path):
    """Open *path* for binary read/write, creating it when it is missing."""
    try:
        # 'r+b' keeps existing content in place, so copying a file onto
        # itself does not wipe the data before it has been read.
        return path.open('r+b')
    except FileNotFoundError:
        return path.open('w+b')


def _progress_bar(enabled, total, description):
    """Return a byte-scaled tqdm bar, or ``None`` when *enabled* is false."""
    if not enabled:
        return None
    # Imported here so the function works without relying on a global
    # that only the script section binds.
    from tqdm import tqdm
    return tqdm(
        total=total,
        desc=description,
        unit="B", unit_scale=True,
        unit_divisor=1024,
    )


def copy(src, dst, buffer_size, hash_algo, progress=False, check=False):
    """Copy *src* to *dst* and hash the bytes while they are copied.

    Args:
        src: ``pathlib.Path`` of the file to read.
        dst: ``pathlib.Path`` of the file to write.  It is created when
            missing; existing content is overwritten and cut to the size
            of the source.
        buffer_size: Number of bytes handled per step; must be positive.
        hash_algo: Algorithm name accepted by ``hashlib.new``.
        progress: Show tqdm progress bars (requires the tqdm package).
        check: Read the destination back, hash it again and compare the
            result with the hash computed during the copy.

    Returns:
        A tuple ``(hex_digest, check_result)``.  ``hex_digest`` is the hash
        of the copied data.  ``check_result`` is ``False`` only when
        *check* was requested and the re-read hash differs; otherwise it
        is ``True``.

    Raises:
        ValueError: If ``buffer_size`` is not positive, or ``hash_algo`` is
            unknown to ``hashlib``.
        OSError: If the source or destination cannot be opened, read or
            written.
    """
    # Validate before the destination is touched.
    if buffer_size <= 0:
        raise ValueError(f'buffer_size must be positive, got {buffer_size!r}')

    file_size = src.stat().st_size
    hasher = hashlib.new(hash_algo)
    check_result = True

    with src.open('rb') as src_fd, _open_for_update(dst) as dst_fd:
        progress_bar = _progress_bar(
            progress, file_size, f'Copy {src.name} > {dst.name}')
        try:
            # mmap refuses to map an empty file, so skip it for size 0.
            if file_size:
                with mmap.mmap(src_fd.fileno(), 0,
                               access=mmap.ACCESS_READ) as mm_src:
                    for start in range(0, file_size, buffer_size):
                        stop = min(start + buffer_size, file_size)
                        # Slice once and reuse the bytes for both the
                        # write and the hash update.
                        chunk = mm_src[start:stop]
                        dst_fd.write(chunk)
                        hasher.update(chunk)
                        if progress_bar is not None:
                            progress_bar.update(len(chunk))
        finally:
            if progress_bar is not None:
                progress_bar.close()

        # Drop leftover bytes of a previously longer destination file.
        dst_fd.truncate()

        if check:
            dst_fd.seek(0)
            dst_hasher = hashlib.new(hash_algo)
            progress_bar = _progress_bar(
                progress, file_size, f'Hashing {dst.name}')
            try:
                for chunk in iter(lambda: dst_fd.read(buffer_size), b''):
                    dst_hasher.update(chunk)
                    if progress_bar is not None:
                        progress_bar.update(len(chunk))
            finally:
                if progress_bar is not None:
                    progress_bar.close()

            if dst_hasher.digest() == hasher.digest():
                print('Written file is ok', file=sys.stderr)
            else:
                print('Written file is different', file=sys.stderr)
                check_result = False

    return hasher.hexdigest(), check_result
if __name__ == '__main__':
    # Command line entry point: parse the arguments, copy the file, write
    # the hash next to the destination and report the result through the
    # exit status (2: source missing, 3: destination exists, 10: the
    # re-read check failed).

    # The SHAKE algorithms need an explicit digest length for hexdigest(),
    # which this program never supplies, so they are not offered.
    hashes = sorted(
        name for name in hashlib.algorithms_available
        if not name.lower().startswith('shake')
    )
    parser = ArgumentParser(description=__doc__, formatter_class=HelpFormatter)
    parser.add_argument('src', type=Path, help='Source file to copy')
    parser.add_argument('dst', type=Path, help='Destination file')
    parser.add_argument('hash', choices=hashes, help='Hash algorithm')
    parser.add_argument('--buffer', type=int, default=64 * 1024, help='Buffer size in bytes')
    parser.add_argument('--overwrite', action='store_true', help='Allow overwriting of destination file')
    parser.add_argument('--progress', action='store_true', help='Show a progress bar')
    parser.add_argument('--check', action='store_true', help='Read the destination file again and calculate the hash value')
    args = parser.parse_args()

    # A buffer of zero or fewer bytes can never make progress.
    if args.buffer <= 0:
        parser.error('--buffer must be a positive number of bytes')

    if not args.src.exists():
        print(f'Source file {args.src.name} does not exist.', file=sys.stderr)
        sys.exit(2)
    if not args.overwrite and args.dst.exists():
        print(f'Destination file {args.dst.name} exists.', file=sys.stderr)
        sys.exit(3)

    if args.progress:
        # tqdm is bound at module level on purpose: copy() may look it up
        # as a global.
        try:
            from tqdm import tqdm
        except ImportError:
            print('Python-Module tqdm is not installed, installing it now.', file=sys.stderr)
            call([sys.executable, '-m', 'pip', 'install', 'tqdm', '--user'])

            # Make the fresh installation visible to this process: the
            # import system caches directory listings, and a user site
            # directory that did not exist at start-up is not on sys.path.
            import importlib
            import site
            importlib.invalidate_caches()
            user_site = site.getusersitepackages()
            if user_site not in sys.path:
                sys.path.append(user_site)

            try:
                from tqdm import tqdm
            except ImportError:
                print('tqdm could not be imported, continuing without progress bar.', file=sys.stderr)
                args.progress = False

    hex_digest, check_result = copy(args.src, args.dst, args.buffer, args.hash, args.progress, args.check)
    # The hash file replaces the destination's suffix with the algorithm name.
    args.dst.with_suffix('.' + args.hash).write_text(f'{hex_digest} {args.dst.name}\n')
    if not check_result:
        sys.exit(10)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement