對于大對象的存取或油,s3提供了分段上傳/下載的接口郭脂,基于此零渐,可以進一步實現(xiàn)多線程并行傳輸或者斷點續(xù)傳等功能娄涩。
本實現(xiàn)使用了亞馬遜的boto庫
https://pypi.python.org/pypi/boto
以及filechunkio庫
https://pypi.python.org/pypi/filechunkio/
from boto.s3.connection import S3Connection,OrdinaryCallingFormat,SubdomainCallingFormat
import ssl
import boto.s3.key
import os
import json
import random
import fire
import math
from filechunkio import FileChunkIO
class S3(object):
? ? def __init__(self,host,access_key_id,secret_access_key,port=443,is_secure=True):
? ? ? ? if is_secure:
? ? ? ? ? ? ssl._create_default_https_context =ssl._create_unverified_context
? ? ? ? self._secure=is_secure
? ? ? ? self.host=host
? ? ? ? self.access_key_id=access_key_id
? ? ? ? self.secret_access_key=secret_access_key
? ? ? ? self.port=port
? ? ? ? self._connection=False
? ? ? ? self._client=None
? ? # def __del__(self):
? ? #? ? self._client.closed()
? ? def connect(self,aws=False):
? ? ? ? try:
? ? ? ? ? ? if aws:
? ? ? ? ? ? ? ? calling_format=SubdomainCallingFormat()
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? calling_format=OrdinaryCallingFormat()
? ? ? ? ? ? conn = S3Connection(
? ? ? ? ? ? ? ? host=self.host,
? ? ? ? ? ? ? ? aws_access_key_id=self.access_key_id,
? ? ? ? ? ? ? ? aws_secret_access_key=self.secret_access_key,
? ? ? ? ? ? ? ? port=self.port,
? ? ? ? ? ? ? ? debug=1,
? ? ? ? ? ? ? ? is_secure=self._secure,
? ? ? ? ? ? ? ? calling_format=calling_format
? ? ? ? ? ? )
? ? ? ? ? ? self._connection=True
? ? ? ? ? ? self._client=conn
? ? ? ? except Exception as e:
? ? ? ? ? ? self._connection=False
? ? ? ? ? ? print(e)
? ? def create_bucket(self,bucket):
? ? ? ? try:
? ? ? ? ? ? bucket=self._client.create_bucket(bucket)
? ? ? ? except Exception as e:
? ? ? ? ? ? print(e)
? ? ? ? ? ? print(bucket)
? ? def delete_bucket(self,bucket):
? ? ? ? try:
? ? ? ? ? ? bucket=self._client.get_bucket(bucket)
? ? ? ? ? ? bucket.delete()
? ? ? ? except Exception as e:
? ? ? ? ? ? print(e)
? ? ? ? ? ? print(bucket)
? ? def set_acl(self):
? ? ? ? pass
? ? def delete_bucket_force(self,bucket):
? ? ? ? try:
? ? ? ? ? ? bucket=self._client.get_bucket(bucket)
? ? ? ? ? ? for obj in bucket.list():
? ? ? ? ? ? ? ? obj.delete()
? ? ? ? ? ? bucket.delete()
? ? ? ? except Exception as e:
? ? ? ? ? ? print(e)
? ? ? ? ? ? print(bucket)
? ? def upload_file(self,bucket,file_path,path=''):
? ? ? ? try:
? ? ? ? ? ? bucket=self._client.get_bucket(bucket)
? ? ? ? ? ? key=bucket.new_key(path+os.path.basename(file_path))
? ? ? ? ? ? key.set_contents_from_filename(file_path)
? ? ? ? ? ? return True
? ? ? ? except Exception as e:
? ? ? ? ? ? print(e)
? ? ? ? ? ? return False
? ? def chunk_upload_file(self,bucket,file_path):
? ? ? ? try:
? ? ? ? ? ? source_path = file_path
? ? ? ? ? ? source_size = os.stat(file_path).st_size
? ? ? ? ? ? chunk_size = 52428800
? ? ? ? ? ? chunk_count = int(math.ceil(source_size / float(chunk_size)))
? ? ? ? ? ? bucket=self._client.get_bucket(bucket)
? ? ? ? ? ? mp=bucket.initiate_multipart_upload(os.path.basename(file_path))
? ? ? ? ? ? for i in range(chunk_count):
? ? ? ? ? ? ? ? offset = chunk_size * i
? ? ? ? ? ? ? ? bytes_ = min(chunk_size, source_size - offset)
? ? ? ? ? ? ? ? with FileChunkIO(source_path, 'r', offset=offset,bytes=bytes_) as fp:
? ? ? ? ? ? ? ? ? ? mp.upload_part_from_file(fp, part_num=i + 1)
? ? ? ? ? ? mp.complete_upload()
? ? ? ? ? ? return True
? ? ? ? except Exception as e:
? ? ? ? ? ? print(e)
? ? ? ? ? ? return False
if __name__ == "__main__":
? ? s3_obj=S3("host","id","key",443,True)
? ? s3_obj.connect()
? ? fire.Fire({
? ? ? ? "upload":s3_obj.upload_file,
? ? ? ? "mp_upload":s3_obj.chunk_upload_file,
? ? ? ? "create_bucket":s3_obj.create_bucket
? ? })