|
| 1 | +""" |
| 2 | + One of several implementations of the Lempel–Ziv–Welch decompression algorithm |
| 3 | + https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch |
| 4 | +""" |
| 5 | + |
| 6 | +import math |
| 7 | +import sys |
| 8 | + |
| 9 | + |
def read_file_binary(file_path: str) -> str:
    """
    Read the file at file_path as bytes and return them as one long bit string.

    Every byte is rendered as eight characters ('0' or '1'), most significant
    bit first, so the returned string is eight times as long as the file.

    If the file cannot be opened or read (OSError), "File not accessible" is
    printed and the program exits through sys.exit().
    """
    # Keep the try body limited to the I/O that can actually raise OSError.
    try:
        with open(file_path, "rb") as binary_file:
            data = binary_file.read()
    except OSError:
        print("File not accessible")
        sys.exit()

    # Build the string in one join instead of growing it byte by byte.
    return "".join(f"{byte:08b}" for byte in data)
| 25 | + |
| 26 | + |
def decompress_data(data_bits: str) -> str:
    """
    Decompress data_bits using the Lempel–Ziv–Welch algorithm and return the
    result as a string of '0' and '1' characters.

    The lexicon maps binary keys to decoded bit strings. Input bits are
    accumulated until they form a key present in the lexicon; the mapped
    value is emitted, that key is re-pointed to the value extended by "0",
    and a new key (the binary form of the running index) is added for the
    value extended by "1". Whenever the running index is a power of two,
    every existing key is widened by one leading "0".

    Trailing bits that never complete a known key are ignored.
    """
    lexicon = {"0": "0", "1": "1"}
    decoded_parts = []
    curr_string = ""
    index = len(lexicon)

    for bit in data_bits:
        curr_string += bit
        if curr_string not in lexicon:
            continue

        last_match_id = lexicon[curr_string]
        decoded_parts.append(last_match_id)
        lexicon[curr_string] = last_match_id + "0"

        # index is a power of two (integer test, index is always >= 2 here):
        # the new key bin(index) is one bit longer than the existing keys, so
        # pad every existing key with a leading "0" to keep widths equal.
        if (index & (index - 1)) == 0:
            lexicon = {"0" + key: value for key, value in lexicon.items()}

        lexicon[bin(index)[2:]] = last_match_id + "1"
        index += 1
        curr_string = ""

    # Join once at the end instead of repeatedly concatenating strings.
    return "".join(decoded_parts)
| 55 | + |
| 56 | + |
def write_file_binary(file_path: str, to_write: str) -> None:
    """
    Write the bit string to_write (should only consist of 0's and 1's) to the
    file at file_path as bytes.

    Only complete groups of eight bits are written, most significant bit
    first; a trailing group shorter than eight bits is dropped. An empty
    to_write produces an empty file.

    If the file cannot be opened or written (OSError), "File not accessible"
    is printed and the program exits through sys.exit().
    """
    byte_length = 8

    # Number of bits that belong to complete bytes; the remainder is discarded.
    whole_bits = len(to_write) - len(to_write) % byte_length
    payload = bytes(
        int(to_write[start : start + byte_length], 2)
        for start in range(0, whole_bits, byte_length)
    )

    try:
        with open(file_path, "wb") as opened_file:
            # One buffered write instead of one call per byte.
            opened_file.write(payload)
    except OSError:
        print("File not accessible")
        sys.exit()
| 82 | + |
| 83 | + |
def remove_prefix(data_bits: str) -> str:
    """
    Remove the size prefix that a compressed file should have and return the
    remaining bits.

    The prefix is taken to be n leading '0' characters followed by n + 1
    further characters, so the first 2 * n + 1 characters are dropped. If the
    input is shorter than that, an empty string is returned.
    """
    # Count the run of leading zeros without a manual loop.
    leading_zeros = len(data_bits) - len(data_bits.lstrip("0"))
    return data_bits[2 * leading_zeros + 1 :]
| 98 | + |
| 99 | + |
def compress(source_path: str, destination_path: str) -> None:
    """
    Read the source file, decompress it and write the result to the
    destination file.

    Note: despite its name, this function performs decompression. It reads
    source_path as a bit string, strips the size prefix, decompresses the
    remaining bits and writes the result to destination_path.
    """
    compressed_bits = remove_prefix(read_file_binary(source_path))
    write_file_binary(destination_path, decompress_data(compressed_bits))
| 108 | + |
| 109 | + |
if __name__ == "__main__":
    # Two positional arguments are required: the source file to decompress
    # and the destination file to write. Fail with a usage message instead of
    # an IndexError when they are missing; extra arguments are ignored.
    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <source_path> <destination_path>")
    compress(sys.argv[1], sys.argv[2])
0 commit comments