huffman編碼簡介
參考 這篇文章
python實現(xiàn)
- 開頭
# -*- coding: utf-8 -*-
import heapq
import collections
import json
import io
from BitVector import BitVector # 網(wǎng)上的bitbector實現(xiàn)
import hashlib
import sys
import time
import base64
- 二叉樹類
class TreeNode(object):
def __init__(self, key=None, val=None, code=None):
self.key, self.val, self.code = key, val, code
self.left = None
self.right = None
def __cmp__(self, other):
return cmp(self.val, other.val)
def __repr__(self):
return 'key=%s\nval=%s\ncode=%s' % (self.key, self.val, self.code)
- 定義裝飾器用于計時
def caltime(func):
def wrapper(*args, **kwargs):
st = time.clock()
res = func(*args, **kwargs)
print '%s take %.3f s...' % (func.__name__, time.clock()-st)
return res
return wrapper
- 定義huffman類
class Huffman(object):
def __init__(self, raw_name, action='zip'):
self.raw_name = raw_name
if action=='zip':
with open(self.raw_name, 'rb') as f:
self.data = f.read()
self.save_name = raw_name + '.hfm'
elif action=='unzip':
if not raw_name.endswith('.hfm'):
raise IOError('file name must be ends with .hfm')
self.save_name = raw_name.rsplit('.', 1)[0]
self.root_node = None
self.encode_dict = None
self.tree_str = None
@caltime
def _build_heap(self):
count = collections.Counter(self.data)
count_list = count.items()
return [TreeNode(key=ord(x[0]), val=x[1]) for x in count_list]
@caltime
def _build_huffman_tree(self):
heap = self._build_heap()
while heap:
left_node = heapq.heappop(heap)
right_node = heapq.heappop(heap)
parent_node = TreeNode(val=left_node.val+right_node.val)
parent_node.left = left_node
parent_node.right = right_node
if heap:
heapq.heappush(heap, parent_node)
else:
return parent_node
# =========================================================== #
# 進行huffman編碼耳奕,返回編碼字典
# =========================================================== #
@caltime
def _huffman_encode(self):
def huffman_encode_helper(nd, cur_code=''):
encode_dict = {}
if nd.left is None and nd.right is None:
nd.code = cur_code
encode_dict.update({nd.key: nd.code})
if nd.left:
encode_dict.update(huffman_encode_helper(nd.left, cur_code+'0'))
if nd.right:
encode_dict.update(huffman_encode_helper(nd.right, cur_code+'1'))
return encode_dict
self.root_node = self._build_huffman_tree()
self.encode_dict = huffman_encode_helper(self.root_node)
self.tree_str = self._serialize_huffman_tree()
# =========================================================== #
# huffman樹序列化
# =========================================================== #
@caltime
def _serialize_huffman_tree(self):
def pre_order(nd):
if nd is None:
return ['#']
output = [[nd.key, nd.code]] if nd.key is not None else [None]
output += pre_order(nd.left)
output += pre_order(nd.right)
return output
return json.dumps(pre_order(self.root_node))
# =========================================================== #
# 按照碼本進行字符壓縮
# =========================================================== #
@caltime
def huffman_zip(self):
print 'start huffman encoding...'
self._huffman_encode()
new_data = ''.join([self.encode_dict[ord(x)] for x in self.data])
bit_sz = len(new_data)
pad_sz = 8 - bit_sz%8
new_data += '0'*pad_sz
# bit_data = BitVector(bitstring=new_data) # 使用BitVector很慢
byte_data = []
for ii in range(0, len(new_data), 8):
bit = 0
for j in range(8):
bit = (bit<<1) + int(new_data[ii+j])
byte_data.append(chr(bit))
print 'save to %s...' % self.save_name
with open(self.save_name, 'wb') as fp:
# 把序列化的huffman樹寫在前面
fp.write('%s-%s-%s-' % (len(self.tree_str), self.tree_str, bit_sz))
# bit_data.write_to_file(fp)
fp.writelines(byte_data)
# =========================================================== #
# huffman樹反序列化蔗崎,重建樹
# =========================================================== #
@caltime
def _deserialize_huffman_tree(self):
def pre_order(node_list):
nd_data = node_list.pop(0)
if nd_data=='#':
return None
nd = TreeNode(key=chr(nd_data[0]), code=nd_data[1]) if isinstance(nd_data, list) else TreeNode()
nd.left = pre_order(node_list)
nd.right = pre_order(node_list)
return nd
return pre_order(json.loads(self.tree_str))
# =========================================================== #
# huffman解壓縮
# =========================================================== #
@caltime
def huffman_unzip(self):
with open(self.raw_name, 'rb') as fp:
n_tree = ''
while 1:
c = fp.read(1)
if c!='-':
n_tree += c
else:
break
## 重建huffman樹
print 'start huffman decoding...'
self.tree_str = fp.read(int(n_tree))
self.root_node = self._deserialize_huffman_tree()
fp.read(1)
n_data = ''
while 1:
c = fp.read(1)
if c!='-':
n_data += c
else:
break
n_data = int(n_data)
byte_data = fp.read()
## 保存到文件
print 'save to %s...' % self.save_name
nd = self.root_node
with open(self.save_name, 'wb') as fp:
ii = 0
data = []
while ii<=n_data:
if nd.left is None and nd.right is None:
data.append(nd.key)
nd = self.root_node
else:
bit = ord(byte_data[ii/8]) & (128>>ii%8)
nd = nd.left if bit==0 else nd.right
ii += 1
fp.writelines(data)
總結(jié)
- huffman壓縮
- 詞頻統(tǒng)計
統(tǒng)計每個字符(8位)出現(xiàn)的頻率,針對中文編碼涕滋,每個字節(jié)可用
0x??
表示
- 構(gòu)建二叉樹
每次取出詞頻最低的2個字符進行構(gòu)建(利用堆),并將其父節(jié)點加入詞庫
- 進行編碼
自上而下左0右1給每個葉子節(jié)點賦值贿讹,得到編碼字典
字符->二進制序列
- 壓縮
將源文件所有字符按編碼字典轉(zhuǎn)換為二進制序列后(需補齊位數(shù)為8的倍數(shù))
寫入新文件
我在文件頭寫入了二叉樹(編碼字典)信息长窄,供解壓縮使用
- 運行結(jié)果
start huffman encoding...
_build_heap take 0.079 s...
_build_huffman_tree take 0.116 s...
_serialize_huffman_tree take 0.001 s...
_huffman_encode take 0.123 s...
save to xxx.pdf.hfm...
huffman_zip take 2.759 s...
-
看一下生成的壓縮文件
.hfm文件
- huffman解碼
- 重建二叉樹
從壓縮文件頭讀入二叉樹信息,反序列化后構(gòu)建二叉樹
- 解碼
從文件讀入字節(jié)流医吊,按位操作,從根節(jié)點開始逮京,0左1右卿堂,直到葉子節(jié)點,
復(fù)原出原字符懒棉,寫入新文件
猜想使用2個線程一個讀一個寫會不會更快
- 運行結(jié)果
start huffman decoding...
_deserialize_huffman_tree take 0.003 s...
save to xxx.pdf...
huffman_unzip take 1.030 s...