Advertisement
dan-masek

Fixed LZ77 implementation

Apr 8th, 2021
694
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.58 KB | None | 0 0
  1. from abc import ABCMeta, abstractmethod
  2. import time
  3. import numpy as np
  4. from cv2_41 import cv2
  5.  
  6. # ============================================================================
  7.  
  8. def save_image_size(filename, rows, cols, channels):
  9.     with open(filename, "w") as imgSize:
  10.         imgSize.write(str(rows) + '\n')  # write row dimension
  11.         imgSize.write(str(cols) + '\n')  # write col dimension
  12.         imgSize.write(str(channels) + '\n')  # write col dimension
  13.  
  14. # ----------------------------------------------------------------------------
  15.  
  16. def load_image_size(filename):
  17.     with open(filename, "r") as imgSize:
  18.         row = int(imgSize.readline())
  19.         col = int(imgSize.readline())
  20.         ch = int(imgSize.readline())
  21.  
  22.     return (row, col, ch)
  23.  
  24. # ============================================================================
  25.  
  26. class LZ77Output(object):
  27.     __metaclass__ = ABCMeta
  28.  
  29.     @abstractmethod
  30.     def write_symbol(self, match_info, char):
  31.         pass
  32.  
  33. # ----------------------------------------------------------------------------
  34.  
  35. class LZ77OutputNPArray(LZ77Output):
  36.     def __init__(self):
  37.         self._tuples = np.array([], dtype=np.uint16)
  38.         self._chars = np.array([], dtype=np.uint8)
  39.  
  40.     def write_symbol(self, match_info, char):
  41.         self._tuples = np.append(self._tuples, np.uint16(match_info))
  42.         self._chars = np.append(self._chars, char)
  43.  
  44.     def save(self, filename_tuples, filename_chars):
  45.         np.save(filename_tuples, self._tuples)
  46.         np.save(filename_chars, self._chars)
  47.  
  48. # ----------------------------------------------------------------------------
  49.  
  50. class LZ77OutputList(LZ77Output):
  51.     def __init__(self):
  52.         self._tuples = []
  53.         self._chars = []
  54.  
  55.     def write_symbol(self, match_info, char):
  56.         self._tuples.append(match_info[0])
  57.         self._tuples.append(match_info[1])
  58.         self._chars.append(char)
  59.  
  60.     def save(self, filename_tuples, filename_chars):
  61.         np.save(filename_tuples, np.array(self._tuples, dtype=np.uint16))
  62.         np.save(filename_chars, np.array(self._chars, dtype=np.uint8))
  63.  
  64. # ============================================================================
  65.  
  66. class LZ77Input(object):
  67.     __metaclass__ = ABCMeta
  68.  
  69.     @abstractmethod
  70.     def read_symbol(self):
  71.         pass
  72.  
  73. # ----------------------------------------------------------------------------
  74.  
  75. class LZ77InputNPArray(LZ77Input):
  76.     def __init__(self, filename_tuples, filename_chars):
  77.         self._tuples = np.load(filename_tuples)
  78.         self._chars = np.load(filename_chars)
  79.         self._pos = 0
  80.  
  81.     def read_symbol(self):
  82.         if self._pos >= len(self._chars):
  83.             return (None, None)
  84.         match_info = (self._tuples[self._pos * 2], self._tuples[self._pos * 2 + 1])
  85.         char = self._chars[self._pos]
  86.         self._pos += 1
  87.         return (match_info, char)
  88.  
  89. # ============================================================================
  90.  
# Text file that lz77Compress writes the image shape to and that
# lz77Decompressor reads it back from.
FILENAME_IMAGE_SIZE = "imgSize.txt"
# Not referenced by any active code visible in this file.
FILENAME_COMPRESSED = "Compressed.txt"
# File receiving the (offset, length) pairs of the compressed stream.
FILENAME_ENCODED_TUPLE = "encodedTuple.npy"
# File receiving the literal values of the compressed stream.
FILENAME_ENCODED_CHARS = "encodedChar.npy"
  95.  
  96. # ============================================================================
  97.  
  98. def get_match_length(input_buffer, search_pos, pos, look_ahead_length):
  99.     for i in range(look_ahead_length):
  100.         if input_buffer[search_pos + i] != input_buffer[pos + i]:
  101.             return i
  102.     return look_ahead_length
  103.  
  104. # ----------------------------------------------------------------------------
  105.  
  106. def find_longest_match(input_buffer, sliding_window_start, pos, look_ahead_end):
  107.     look_ahead_length = look_ahead_end - pos
  108.     max_match_length = 0
  109.     max_match_pos = 0
  110.  
  111.     for search_pos in range(pos - 1, sliding_window_start - 1, -1):
  112.         match_length = get_match_length(input_buffer, search_pos, pos, look_ahead_length)
  113.         if match_length > max_match_length:
  114.             max_match_length = match_length
  115.             max_match_pos = search_pos
  116.             if match_length >= look_ahead_length:
  117.                 break # Short circuit, can't get any longer (when sliding window is bigger than look-ahead)
  118.  
  119.     if max_match_length == 0:
  120.         return 0, 0
  121.     return (pos - max_match_pos, max_match_length)
  122.  
  123. # ----------------------------------------------------------------------------
  124.  
  125. def lz77_compress_buffer(input_buffer, sliding_window_size, look_ahead_size, lz77_out):
  126.     pos = 0 # Current position -- beginning of look-ahead, one past the end of sliding window
  127.     end_pos = input_buffer.size
  128.     while pos < end_pos:
  129.         sliding_window_start = max(0, pos - sliding_window_size)
  130.         look_ahead_end = min(end_pos, pos + look_ahead_size)
  131.         offset, length = find_longest_match(input_buffer, sliding_window_start, pos, look_ahead_end)
  132.         if length < 2:
  133.             # No match, just emit literal
  134.             symbol = input_buffer[pos]
  135.             #print('%04d: Literal %s' % (pos, symbol))
  136.  
  137.             lz77_out.write_symbol((0, 0), symbol)
  138.             pos += 1
  139.         else:
  140.             if pos + length == end_pos:
  141.                 length -= 1 # Make sure there is one literal left to code
  142.             match_info = (offset, length)
  143.             symbol = input_buffer[pos + length]
  144.             #print('%04d: Match %s + Literal %s' % (pos, match_info, symbol))
  145.             lz77_out.write_symbol(match_info, symbol)
  146.             pos += length + 1
  147.  
  148.     return lz77_out
  149.  
  150. # ----------------------------------------------------------------------------
  151.  
  152. def lz77Compress(image, sliding_window_size, look_ahead_size, lz77_output_class=LZ77OutputList):
  153.     img = cv2.imread(image)
  154.     input_buffer = img.flatten()
  155.  
  156.     lz77_out = lz77_output_class()
  157.  
  158.     t_start = time.time()
  159.     lz77_compress_buffer(input_buffer, sliding_window_size, look_ahead_size, lz77_out)
  160.     t_end = time.time()
  161.  
  162.     save_image_size(FILENAME_IMAGE_SIZE, *img.shape)
  163.     lz77_out.save(FILENAME_ENCODED_TUPLE, FILENAME_ENCODED_CHARS)
  164.     # lz77_out.save_debug(FILENAME_COMPRESSED)
  165.  
  166.     print("Compress: %s" % (t_end - t_start))
  167.  
  168. # ============================================================================
  169.  
  170. def copy_match(output_buffer, offset, length, pos):
  171.     for i in range(length):
  172.         output_buffer[pos + i] = output_buffer[pos - offset + i]
  173.  
  174. # ----------------------------------------------------------------------------
  175.  
  176. def lz77_decompress_buffer(lz77_in, expected_size):
  177.     output_buffer = np.empty((expected_size), np.uint8)
  178.  
  179.     pos = 0
  180.     end_pos = output_buffer.size
  181.     while pos < end_pos:
  182.         match_info, symbol = lz77_in.read_symbol()
  183.         if not match_info:
  184.             break
  185.  
  186.         offset, length = match_info
  187.         if length == 0:
  188.             output_buffer[pos] = symbol
  189.             pos += 1
  190.         else:
  191.             copy_match(output_buffer, offset, length, pos)
  192.             output_buffer[pos + length] = symbol
  193.             pos += length + 1
  194.  
  195.     assert pos == end_pos
  196.  
  197.     return output_buffer
  198.  
  199. # ----------------------------------------------------------------------------
  200.  
  201. def lz77Decompressor(lz77_input_class=LZ77Input):
  202.     rows, cols, channels = load_image_size(FILENAME_IMAGE_SIZE)
  203.  
  204.     lz77_in = LZ77InputNPArray(FILENAME_ENCODED_TUPLE, FILENAME_ENCODED_CHARS)
  205.  
  206.     t_start = time.time()
  207.     output_buffer = lz77_decompress_buffer(lz77_in, rows * cols * channels)
  208.     t_end = time.time()
  209.  
  210.     image = np.reshape(output_buffer, (rows, cols, channels))
  211.  
  212.     cv2.imwrite("output.png", image)
  213.  
  214.     print("Decompress: %s" % (t_end - t_start))
  215.  
  216. # ============================================================================
  217.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement