osmarks

generate-tape-image.py v2

May 8th, 2020
272
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #!/usr/bin/env python3
  2. import subprocess
  3. import tempfile
  4. import os
  5. import os.path
  6. import collections
  7. import json
  8. import sys
  9. import multiprocessing
  10. import struct
  11.  
  12. ProcessedTrack = collections.namedtuple("ProcessedTrack", ["dfpwm_file", "metadata"])
  13.  
  14. def convert_wav_dfpwm(infile, outfile):
  15.     subprocess.run(["java", "-jar", "LionRay.jar", infile, outfile])
  16.  
  17. def convert_any_wav(infile, outfile):
  18.     subprocess.run(["ffmpeg", "-hide_banner", "-i", infile, "-ac", "1", outfile], stderr=subprocess.PIPE)
  19.  
  20. def read_meta(path):
  21.     proc = subprocess.run(["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", path], stdout=subprocess.PIPE)
  22.     data = json.loads(proc.stdout)
  23.     meta = {}
  24.     # These are the two locations I've found tags in in my not very thorough testing
  25.     try:
  26.         meta.update(data["format"]["tags"])
  27.     except KeyError: pass
  28.     try:
  29.         meta.update(data["streams"][0]["tags"])
  30.     except KeyError: pass
  31.     # lowercase all keys because in Opus files these seem to be uppercase sometimes
  32.     return { k.lower(): v for k, v in meta.items() }
  33.  
  34. def process_file(filename):
  35.     meta = read_meta(filename)
  36.     wav_dest = tempfile.mktemp(".wav")
  37.     convert_any_wav(filename, wav_dest)
  38.     dfpwm_dest = tempfile.mktemp(".dfpwm")
  39.     convert_wav_dfpwm(wav_dest, dfpwm_dest)
  40.     os.remove(wav_dest)
  41.     print(filename)
  42.     return ProcessedTrack(dfpwm_dest, {
  43.         "title": meta["title"],
  44.         "artist": meta.get("artist", None) or meta.get("artists", None),
  45.         "album": meta.get("album", None)
  46.     })
  47.  
  48. def read_binary(filename):
  49.     with open(filename, "rb") as f:
  50.         return f.read()
  51.  
  52. def process_dir(dirname):
  53.     files = list(map(lambda file: os.path.join(dirname, file), os.listdir(dirname)))
  54.     with multiprocessing.Pool(8) as p:
  55.         tracks = p.map(process_file, files)
  56.     tape_image = b""
  57.     tracks_meta = []
  58.     for track in tracks:
  59.         track.metadata["start"] = len(tape_image)
  60.         data = read_binary(track.dfpwm_file)
  61.         os.remove(track.dfpwm_file)
  62.         track.metadata["end"] = track.metadata["start"] + len(data)
  63.         tape_image += data
  64.         tracks_meta.append(track.metadata)
  65.     # dump in a compact format to save space
  66.     meta = json.dumps({ "tracks": tracks_meta }, separators=(',', ':')).encode("utf-8")
  67.     assert(len(meta) < 65536)
  68.     # new format - 0x54 marker byte, then metadata length as 2-byte big endian integer, then metadata, then concatenated DFPWM files
  69.     # start is now not an absolute position but just how far after the metadata it is
  70.     tape_image = b"\x54" + struct.pack(">H", len(meta)) + meta + tape_image
  71.     with open("tape.bin", "wb") as f:
  72.         f.write(tape_image)
  73.     # Tape lengths are measured in minutes. 6000 bytes are played per second because they use a 48000Hz sample rate and DFPWM is somehow 1 bit per sample.
  74.     length_minutes = len(tape_image) / (6000*60)
  75.     print(length_minutes, "minute tape required")
  76.  
  77. process_dir(sys.argv[1])
Add Comment
Please, Sign In to add comment