For reading and writing large objects, S3 provides multipart upload/download interfaces; on top of these, features such as multi-threaded parallel transfer and resumable transfer can be built.
This implementation uses Amazon's boto library
https://pypi.python.org/pypi/boto
and the filechunkio library
https://pypi.python.org/pypi/filechunkio/
1. Multipart Upload
To upload a large file in parts, first split it into chunks, then upload each chunk through the Multipart interface the object store provides; finally, the store merges all chunks into a single Object on the backend.
The example below uses FileChunkIO to read the file chunk by chunk:
import math
import os
from filechunkio import FileChunkIO

filesize = os.path.getsize("/path/to/file")
chunksize = 4096 * 1024
chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
mp = bucket.initiate_multipart_upload("object-1")    # create the Multipart object
for i in range(0, chunkcnt):
    offset = chunksize * i
    length = min(chunksize, filesize - offset)
    fp = FileChunkIO("/path/to/file", 'r', offset=offset, bytes=length)    # open one chunk of the file
    mp.upload_part_from_file(fp, part_num=i + 1)    # upload this part
    fp.close()
mp.complete_upload()
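FileChunkIO wraps a byte range of the underlying file in a file-like object, which is exactly what upload_part_from_file() expects. Note that AWS S3 requires every part except the last to be at least 5 MB, so the 4 MB chunksize above only works against stores that do not enforce that limit; the complete implementation in section 3 uses 8 MB parts.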
After all parts are uploaded, you must end the multipart upload with complete_upload() or cancel_upload(); this releases the resources the Multipart upload holds.
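As a minimal sketch (reusing mp, chunkcnt, chunksize, and filesize from the snippet above), the upload loop can be wrapped so that any failure aborts the multipart upload instead of leaking the parts already stored:

try:
    for i in range(0, chunkcnt):
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        fp = FileChunkIO("/path/to/file", 'r', offset=offset, bytes=length)
        mp.upload_part_from_file(fp, part_num=i + 1)
        fp.close()
    mp.complete_upload()    # merge the parts into one Object
except Exception:
    mp.cancel_upload()      # abort the upload and free the parts stored so far
    raise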
2. Multipart Download
For a multipart download, compute each chunk's start and end offsets within the file, then issue an HTTP GET request with a Range header to fetch the corresponding chunk.
An example:
chunksize = 4096 * 1024
chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
fp = open("/path/to/localfile", "wb")
for i in range(0, chunkcnt):
    offset = chunksize * i
    length = min(chunksize, filesize - offset)
    # the Range header is inclusive, so the last byte of this chunk is offset+length-1
    resp = conn.make_request("GET", bucket.name, filename,
            headers={"Range": "bytes=%d-%d" % (offset, offset + length - 1)})
    data = resp.read(length)
    if data == "":
        break
    fp.write(data)
fp.close()
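The byte range in a Range header is inclusive at both ends, so a chunk starting at offset with length bytes is requested as "bytes=%d-%d" % (offset, offset + length - 1); requesting up to offset + length instead would fetch one extra, overlapping byte per chunk.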
3. A complete multi-threaded implementation
import math
import os
from os import path
import threading
import Queue
import time
import boto
import boto.s3.connection
from filechunkio import FileChunkIO
class Chunk:
    """One part of the file: part number, byte offset, and length."""
    def __init__(self, n, o, l):
        self.num = n
        self.offset = o
        self.len = l

chunksize = 8 << 20    # 8 MB per part

def init_queue(filesize):
    """Split the file into chunks and put a descriptor for each on a queue."""
    chunkcnt = int(math.ceil(filesize * 1.0 / chunksize))
    q = Queue.Queue(maxsize=chunkcnt)
    for i in range(0, chunkcnt):
        offset = chunksize * i
        length = min(chunksize, filesize - offset)
        q.put(Chunk(i + 1, offset, length))
    return q
def upload_chunk(filepath, mp, q, id):
    """Worker: pull chunk descriptors off the queue and upload them as parts."""
    while True:
        try:
            # non-blocking get: checking q.empty() and then calling q.get()
            # would race against the other worker threads
            chunk = q.get(block=False)
        except Queue.Empty:
            break
        fp = FileChunkIO(filepath, 'r', offset=chunk.offset, bytes=chunk.len)
        mp.upload_part_from_file(fp, part_num=chunk.num)
        fp.close()
        q.task_done()
def upload_file_multipart(filepath, keyname, bucket, threadcnt=8):
    filesize = os.stat(filepath).st_size
    mp = bucket.initiate_multipart_upload(keyname)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=upload_chunk, args=(filepath, mp, q, i))
        t.setDaemon(True)
        t.start()
    q.join()    # block until task_done() has been called for every chunk
    mp.complete_upload()
def download_chunk(filepath, bucket, key, q, id):
    """Worker: fetch byte ranges and write them into the target file."""
    while True:
        try:
            chunk = q.get(block=False)
        except Queue.Empty:
            break
        # the Range header is inclusive, so the last byte is offset+len-1
        resp = bucket.connection.make_request("GET", bucket.name, key.name,
                headers={"Range": "bytes=%d-%d" % (chunk.offset, chunk.offset + chunk.len - 1)})
        data = resp.read(chunk.len)
        fp = FileChunkIO(filepath, 'r+', offset=chunk.offset, bytes=chunk.len)
        fp.write(data)
        fp.close()
        q.task_done()
def download_file_multipart(key, bucket, filepath, threadcnt=8):
    if type(key) == str:
        key = bucket.get_key(key)
    filesize = key.size
    if os.path.exists(filepath):
        os.remove(filepath)
    open(filepath, 'w').close()    # pre-create the empty target file (portable, unlike os.mknod)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=download_chunk, args=(filepath, bucket, key, q, i))
        t.setDaemon(True)
        t.start()
    q.join()
access_key = "test"
secret_key = "123456"
host = "*****"
filepath = "/search/2G.file"
keyname = "2G.file"
threadcnt = 8
conn = boto.connect_s3(
aws_access_key_id = access_key,
aws_secret_access_key = secret_key,
host = host,
is_secure=False,
calling_format = boto.s3.connection.OrdinaryCallingFormat(),
)
bucket = conn.get_bucket("test")

time1 = time.time()
upload_file_multipart(filepath, keyname, bucket, threadcnt)
time2 = time.time()
print "upload %s with %d threads took %d seconds" % (keyname, threadcnt, time2 - time1)

key = bucket.get_key(keyname)
download_filepath = path.join(".", keyname)
time1 = time.time()
download_file_multipart(key, bucket, download_filepath, threadcnt)
time2 = time.time()
print "download %s with %d threads took %d seconds" % (keyname, threadcnt, time2 - time1)