過(guò)濾刪選(解析比f(wàn)ilter快)
d = [-1, 3, 2, 1, 0, 10]
r = [x for x in d if d > 3] ==> [10]
dd = {'a': 1, 'b': -1}
rr = {k: v for k, v in dd.items() if v > 0} ==> {'a': 1}
集合對(duì)象彻采,減少[0][1]...的使用傀顾,提高代碼的可維護(hù)性
method-1:
index_name_1, index_name_2 ... = xrange(2)
s = ('a', 'b' ...)
s[index_name_1] ==> 'a'
s[index_name_2] ==> 'b'
method-2:
from collections import namdtuple
s = namdtuple('tuple_name', ['index_name_1', 'index_name_2'...])
obj = s('a', 'b' ... )
obj.index_name_1 ==> 'a'
obj.index_name_2 ==> 'b'
對(duì)一定結(jié)構(gòu)數(shù)據(jù)的計(jì)數(shù)統(tǒng)計(jì)
from collections import Counter
a = [1, 2, 3, 3, 4, 1]
c = Counter(a)
a[1] ==> 2
a[2] ==> 1
a[3] ==> 2
a[4] ==> 4
c.most_common(1) ==> [(4, 4)]
Counter方法詳細(xì)介紹:http://www.pythoner.com/205.html
dict中按照value值排序
method-1:
a = {'a': 2, 'b': 1, 'c': 5}
sorted(zip(a.itervalues(), a.iterkeys())) ==> [(1, 'b'), (2, 'a'), (5, ')]
method-2:
a = {'a': 2, 'b': 1, 'c': 5}
sorted(a.iteritems(), key=lambda x: x[1]) ==> [('b', 1), ('a', 2), ('c', 5)]
如何快速找到多個(gè)字典中的公共鍵(key)
method-1:(只是試用有限個(gè)數(shù))
s1 = {'a': 1, 'b'充岛;2}
s2 = {'b': 1, 'c': 1}
s3 = {'a': 2, 'b': 3, 'd': 1}
s1.viewkeys() & s2.viewkeys() & s3.viewkeys() ==> {'b'}
method-2:
reduce(lambda a, b: a & b, map(dict.viewkeys, [s1, s2, s3])) ==> {'b'}
如何讓字典保持有序(collections.OrderedDict)
from time import time
from random import randint
from collections import OrderedDict
d = OrderedDict()
players = list('abcdefgh')
start = time()
for i in xrange(7):
raw_input()
p = players.pop(randint(0, 7 - i))
end = time()
print i + 1, p, end - start
d[p] = (i + 1, end - start)
for y in d:
print y, d[y]
==> p (1, time)
p (2, time)
p (3, time)
p (4, time)
p (5, time)
p (6, time)
p (7, time)
記錄歷史記錄肪笋,collections.deque和pickle的使用
from random import randint
from collections import deque
import pickle
N = randint(1, 100)
try:
history = pickle.load(open('history.txt'))
except:
history = deque([], 5)
def guess(k):
if k == N:
print 'right'
return True
if k < N:
print '%s is less then N' % k
else:
print '%s is greater then N' % k
return False
while True:
line = raw_input('plase input a number:')
if line.isdigit():
k = int(line)
history.append(k)
pickle.dump(history, open('history.txt', 'w'))
if guess(k):isalnum
break
elif line == 'history' or line == 'h?':
print list(history)
注:deque列表對(duì)象犬庇,有兩個(gè)參數(shù)钳幅,初始隊(duì)列和最大長(zhǎng)度付鹿,并有append和appendleft方法澜汤,分別是從后插入和從前插入
pickle的作用是將python對(duì)象存入文件,用法與json的使用的方法很相似舵匾,有dump和load方法等
使用isdigit判斷字符串是否可以轉(zhuǎn)為整型俊抵,另外還有很多字符串內(nèi)置的判斷函數(shù)
構(gòu)造可迭代對(duì)象,collections.Iterable和collections.Iterator的使用
from collections import Iterable, Iterator
class Demo1(Iterator):
def __init__(self, cities):
self.cities = cities
self.index = 0
def next(self):
if self.index == len(self.cities):
raise StopIteration
city = self.cities[self.index]
self.index += 1
return city
class Demo2(Iterable):
def __init__(self, cities):
self.cities = cities
def __iter__(self):
return Demo1(self.cities)
a = Demo2(['a','b', 'c'])
for each in a:
print each
注:在構(gòu)造可迭代對(duì)象時(shí)必須使用raise StopIteration來(lái)結(jié)束迭代
使用yield生成器實(shí)現(xiàn)可迭代對(duì)象
class PrimeNumber:
def __init__(self, start, end):
self.start = start
self.end = end
def isPrimeNum(self, k):
if k < 2:
return False
for i in xrange(2, k):
if k % i == 0:
return False
return True
def __iter__(self):
for k in xrange(self.start, self.end + 1):
if self.isPrimeNum(k):
yield k
for x in PrimeNumber(1, 100):
print x
注:關(guān)注yield的使用
反向迭代(reversed)
class Demo:
def __init__(self, start, end, step=0.1):
self.start = start
self.end = end
self.step = step
def __iter__(self):
t = self.start
while t <= self.end:
yield t
t += self.step
def __reversed__(self):
t = self.end
while t >= self.start:
yield t
t -= self.step
a = Demo(1.0, 2.0, 0.1)
for x in reversed(a):
print x
注:reversed獲得的是反向迭代器坐梯,iter獲得的是正向迭代器
文件切片操作徽诲,itertools.islice常用與大文件的切片操作
from itertools import islice
a = range(10)
t = iter(a)
print 'islice start'
for x in islice(t, 2, 4):
print x
print 'islice end'
for x in t:
print x
注:使用islice后的迭代對(duì)象會(huì)被消耗掉在islice切片的數(shù)據(jù),需要重新生成迭代對(duì)象
在for中同時(shí)迭代多個(gè)對(duì)象
# 計(jì)算每人三課的總分(并行問(wèn)題吵血,使用zip):
from random import randint
chinese = [randint(60, 100) for _ in xrange(40)]
math = [randint(60, 100) for _ in xrange(40)]
english = [randint(60, 100) for _ in xrange(40)]
total = []
for a, b, c in zip(chinese, math, english):
total.append(a + b + c)
# 統(tǒng)計(jì)幾個(gè)班英語(yǔ)成績(jī)都高于90的(串行問(wèn)題谎替,使用itertools.chain):
chain([1, 2, 3], [4, 5, 6]) ==> 生成1,2,3,4,5,6的迭代對(duì)象
e1 = [randint(60, 100) for _ in xrange(40)]
e2 = [randint(60, 100) for _ in xrange(42)]
e3 = [randint(60, 100) for _ in xrange(39)]
e4 = [randint(60, 100) for _ in xrange(43)]
count = 0
for _ in chain(e1, e2, e3, e4):
if _ > 90:
count += 1
拆分含有多個(gè)分隔符的字符串
method-1:
def mySplit(s, ds):
res = [s]
for d in ds:
t = []
map(lambda x: t.extend(x.split(d)), res)
res = t
return [_ for _ in res if _]
s = 'ab;cd|efg|hi,,jkl|mn\topq;rst,uvw\txyz'
print mySplit(s, ';,|\t')
==> ['ab', 'cd', 'efg', 'hi', 'jkl', 'mn', 'opq', 'rst', 'uvw', 'xyz']
method-2(推薦使用):
import re
s = 'ab;cd|efg|hi,,jkl|mn\topq;rst,uvw\txyz''
re.split(r'[;,|\t]+', s)
==> ['ab', 'cd', 'efg', 'hi', 'jkl', 'mn', 'opq', 'rst', 'uvw', 'xyz']
注:?jiǎn)蝹€(gè)切割直接使用split比較快
判斷字符串是否以某個(gè)字符串開(kāi)頭或結(jié)尾(使用startswith和endswith)
a = 'a.py'
a.startswith('a') ==> True
a.endswith('.py') ==> True
注:startswith和endswith只能接受一個(gè)元祖不能使用列表
字符替換(使用re.sub)
import re
a = '2017-11-01'
re.sub(r'(\d{4})-(\d{2})-(\d{2})', r'\2/\3/\1', a)
==> '11/01/2017'
re.sub('(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', a)
==> '11/01/2017'
注:使用正則表達(dá)式時(shí)最好使用'r'防止字符串的轉(zhuǎn)義
拼接字符串(使用str.join)
a = ['a', 'b', 'c', 'd']
'',join(a) ==> 'abcd'
'+'.join(a) ==> 'a+b+c+d'
a = ['abc', 123, 'xyz']
''.join([str(_) for _ in a]) ==> 'abc123xyz'
''.join((str(_) for _ in a)) ==> 'abc123xyz'
注:推薦使用第二種方式,使用生成器方式減少內(nèi)存占用
對(duì)字符串左右居中對(duì)齊
method-1:
s = 'abc'
s.ljust(10) ==> 'abc '
s.rjust(10) ==> ' abc'
s.center(10) ==> ' abc '
s.ljust(10, '=') ==> 'abc======='
method-2:
s = 'abc'
format(s, '<20') ==> 'abc '
format(s, '>20') ==> ' abc'
format(s, '^20') ==> ' abc '
去掉字符串中不需要的字符
method-1(刪除首尾蹋辅,使用strip,lstrip,rstrip):
s = ' abc 123 '
s.strip() ==> 'abc 123'
s.lstrip() ==> 'abc 123 '
s.rstrip() ==> ' abc 123'
s = '--abc--123--'
s.strip() ==> 'abc--123'
s.lstrip() ==> 'abc--123--'
s.rstrip() ==> '--abc--123'
s = '--abc++'
s.strip('-+') ==> 'abc'
method-2(刪除中間的字符钱贯,使用切片):
s = 'abc+123'
s[:3] + s[4:] ==> 'abc123'
method-3(刪除或替換任何位置的字符,使用replace,re.sub):
s = '\tabc\t123\txyz'
s.replace('\t', '') ==> 'abc123xyz'
s = '\tabc\t123\txyz\ropq\r'
import re
re.sub('[\t\r]', '', s) ==> 'abc123xyzopq'
注:replace只能替換一種字符侦另,re.sub可以使用正則表達(dá)式替換多種
method-4(刪除任何位置的字符秩命,使用translate):
s = 'abc\t123\nref\r'
s.translate(None, '\r\t\n') ==> 'abc123ref'
文件讀寫
python2:
s = u'你好'
with open('test.txt', 'w') as f:
f.write(s.encode('utf-8'))
with open('test.txt', 'r') as f:
s = f.read().decode('utf-8')
python3:
s = '你好'
# 't'指的是一種文本模式
with open('demo.txt', 'wt', encoding='utf-8') as f:
f.write(s)
with open('demo.txt', 'rt', encoding='utf-8') as f:
s = f.read()
注:
Python2和Python3字符對(duì)應(yīng)關(guān)系
python2 ? python3
str ???? ==> bytes
unicode ? ==> str
一般都會(huì)將字節(jié)寫入物理文件
文件讀寫緩沖
# 全緩沖
f = open('demo.txt', 'w', buffering=2048)
# 行緩沖
f = open('demo.txt', 'w', buffering=1)
...
# 在寫入'\n'之前都是寫入緩沖,在寫入'\n'后會(huì)將緩沖中的內(nèi)容寫入磁盤
f.write('\n')
# 無(wú)緩沖
f = open('demo.txt', 'w', buffering=0)
# 執(zhí)行write即寫入磁盤褒傅,不做緩沖
注:open默認(rèn)是全緩沖弃锐,大小是4096
文件狀態(tài)訪問(wèn)
系統(tǒng)調(diào)用:os.stat,os.fstat,o.lstat獲取文件狀態(tài)
快捷函數(shù):os.path下的函數(shù),使用起來(lái)更加簡(jiǎn)潔
import os
os.path.isdir 判斷是否是文件夾
os.path.islink 判斷是否是連接文件
os.path.isfile 判斷是否是普通文件
os.path.isabs 判斷是否是絕對(duì)路徑
...
如何使用臨時(shí)文件(使用tempfile中的TemporaryFile和NamedTemporaryFile)
# 獲得臨時(shí)文件對(duì)象
f = TemporaryFile()
# 將數(shù)據(jù)寫入磁盤
f.write('abc' * 4096)
# 先將文件指針指向開(kāi)頭后才能將數(shù)據(jù)都會(huì)到內(nèi)存中
f.seek(0)
# 根據(jù)具體需求量讀取數(shù)據(jù)
f.read(100)
# delete自動(dòng)清除參數(shù)默認(rèn)是True殿托,如果需要保留可以設(shè)置FALSE
ntf = NamedTemporaryFile()
# NamedTemporaryFile創(chuàng)建的臨時(shí)文件路徑
ntf.name
注:在系統(tǒng)文件中找不到TemporaryFile創(chuàng)建的臨時(shí)文件霹菊,使用NamdTemporaryFile創(chuàng)建的臨時(shí)文件可以在系統(tǒng)中找到TemporaryFile創(chuàng)建的只能在本進(jìn)程中訪問(wèn)NamedTemporaryFile創(chuàng)建的可以多進(jìn)程進(jìn)行訪問(wèn)
csv讀寫(使用csv模塊)
import csv
rf = open('demo.csv', 'rb')
# reader 是一個(gè)可迭代對(duì)象
reader = csv.reader(rf)
# 打印每一行
for _ in reader:
# 每一行都是一個(gè)列表
print _
wf = open('demo.csv', 'wb')
writer = csv.writer(wf)
# 寫入的數(shù)據(jù)是一個(gè)列表
writer.writerow(list_obj)
# 寫入的數(shù)據(jù)是由多個(gè)元組組成的列表
writer.writerows([tuple_obj, tuple_obj...])
注:一定要使用二進(jìn)制文件
json文件格式的處理(使用json庫(kù))
import json
a = {'a': None, 'b': 1}
json.dumps(a) ==> '{"a": null, "b": 1}'
# 可以去除多余的空格,另外也可以自己定制
json.dumps(a, separators=[',', ':']) ==> '{"a":null,"b":1}'
a = '{"a": null, "b": 1}'
json.loads(a) ==> {'a': None, 'b': 1}
with open('demo.json', 'wb') as f:
json.dump(a, f)
with open('demo.json', 'rb') as f:
a = json.load(f)
注:json.dump和json.load作用于文件
excel文件處理(使用第三方庫(kù)xlrd,xlwt)
pip install xlrd xlwt
book = xlrd.open_workbook('demo.xlsx')
# 所有sheet的列表
book.sheets()
# 根據(jù)索引獲得單個(gè)sheet對(duì)象
sheet = book.sheet_by_index(0)
# 根據(jù)名字獲得單個(gè)sheet對(duì)象
sheet = book.sheet_by_name(name)
如何派生內(nèi)置不可變類型并修改實(shí)例化行為(使用真正的構(gòu)造函數(shù)new)
class IntTuple(tuple):
def __new__(cls, iterable):
g = (_ for _ in iterable if isinstance(_, int) and _ > 0)
return super(IntTuple, cls).__new__(cls, g)
def __init__(self, iterable):
print self ==> (1, 6)
super(IntTuple, self).__init__(iterable)
t = IntTuple([1, -2, 'abc', 6, ['a', 'b'], 0])
print t ==> (1, 6)
注:new是類真正的構(gòu)造函數(shù)支竹,先于init被調(diào)用旋廷,self是由new構(gòu)造獲得
如何為創(chuàng)建大量實(shí)例節(jié)省內(nèi)存(使用slots屬性來(lái)聲明實(shí)例屬性列表)
class Player1(object):
def __init__(self, uid, name, status=0, level=1):
self.uid = uid
self.name = name
self.status = status
self.level = level
class Player2(object):
__slots__ = ['uid', 'name', 'status', 'kevek']
def __init__(self, uid, name, status=0, level=1):
self.uid = uid
self.name = name
self.status = status
self.level = level
p1 = Player1('001', 'p1')
p1.x = 11
p2 = Player2('002', 'p2')
p2.x == 11 ==> 報(bào)錯(cuò)鸠按,因?yàn)闆](méi)有__dict__屬性做數(shù)據(jù)的動(dòng)態(tài)綁定
注:使用slots關(guān)閉默認(rèn)的動(dòng)態(tài)屬性綁定dict,減少內(nèi)存也可以用來(lái)阻值類的數(shù)據(jù)動(dòng)態(tài)綁定
如何讓對(duì)象支持上下文管理(使用enter和exit)
class Demo(object):
def start(self):
"""
with打開(kāi)后執(zhí)行的方法
"""
# 主要的操作內(nèi)容柳洋,例如輸入密碼登錄等等操作
def __enter__(self):
"""
with之前的一些準(zhǔn)備工作
必須要有返回值待诅,作用給with調(diào)用后as的對(duì)象
"""
# 準(zhǔn)備工作,例如打開(kāi)連接
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
必要參數(shù)熊镣,在沒(méi)有異常時(shí),以下參數(shù)的值都是None
exc_type:異常類型
exc_val:異常的值
exc_tb:錯(cuò)誤路徑
"""
# 結(jié)束工作募书,例如關(guān)閉連接
# 設(shè)置返回值為True绪囱,如果有異常也不會(huì)拋出
# return True
with Demo() as d:
d.start()
注:enter在with之前調(diào)用,exit在with之后調(diào)用遇到異常也會(huì)執(zhí)行exit內(nèi)的方法
如何創(chuàng)建可管理的對(duì)象屬性(使用property)
class Demo(object):
def __init__(self):
self.show = 0
def getShow(self):
return self.show
def setShow(self, num):
self.show = num
SHOW = property(getShow, setShow)
d = Demo()
print d.SHOW ==> 0
d.SHOW = 2
print d.show ==> 2
注:property的第一個(gè)參數(shù)是get操作莹捡,第二參數(shù)是set操作鬼吵,第三個(gè)屬性是del操作需要補(bǔ)充@property的使用方法
類對(duì)象之間的<,<=,>,>=,==,!=
# method-1(使用__lt__,__le__,__gt__,__eq__,__ne__):
注:一般有兩個(gè)參數(shù),一個(gè)是self篮赢,另一個(gè)obj(需要比較的對(duì)象)
# method-2(使用functools.total_ordering):
from functools import total_ordering
@total_ordering
class Demo(object):
def __init__(self):
pass
def __lt__(self, obj):
return
def __eq__(self, obj):
return
注:使用functools.total_ordering只需要重寫lt和eq就可以實(shí)現(xiàn)全部<,<=,>,>=,==,!=
# method-3(使用抽象基類):
from functools import total_ordering
from abc import ABCMeta, abstractmethod
@total_ordering
class DemoBase(object):
@abstractmethod
def foo(self):
"""
所有繼承該類的子類都需要實(shí)現(xiàn)這個(gè)方法
"""
pass
def __lt__(self, obj):
if not isinstance(obj, DemoBase):
raise TypeError('obj is not DemoBase')
return self.foo() < obj.foo()
def __eg__(self, obj):
if not isinstance(obj, DemoBase):
raise TypeError('obj is not DemoBase')
return self.foo() == obj.foo()
class Demo1(DemoBase):
def __init__(self):
pass
def foo(self):
"""
此方法必須實(shí)現(xiàn)
"""
pass
class Demo2(DemoBase):
def __init__(self):
pass
def foo(self):
"""
此方法必須實(shí)現(xiàn)
"""
pass
d1 = Demo1()
d2 = Demo2()
# 使用DemoBase做基類就可以實(shí)現(xiàn)<,<=,>,>=,==,!=
# 子類就不需要實(shí)現(xiàn)__lt__,__eg__等方法
d1 > d2
d2 <= d2
...
如何會(huì)用描述符對(duì)實(shí)例屬性做類型檢查
class Attr(object):
def __init__(self, name, type_):
# type_ 指python內(nèi)置類型
self.name = name
self.type_ = type_
def __get__(self, instance, cls):
return instance.__dict__[self.name]
def __set__(self, instance, value):
if not isinstance(value, self.type_):
raise TypeError('expected an %s' % self.type_)
return instance.__dict__[self.name] = value
def __delete__(self, instance):
def instance.__dict__[self.name]
class Person(object):
name = Attr('name', str)
age = Attr('age', int)
height = Attr('height', float)
p = Person()
p.name = 'cancan'
prnt p.name ==> 'cancan'
p.name = 11 ==> raise Exception
...
如何在環(huán)狀數(shù)據(jù)結(jié)構(gòu)中管理內(nèi)存
sys.getrefcount(obj) - 1 ==> 查看對(duì)象被引用的次數(shù)
gc.collect() ==> 在對(duì)象定義了析構(gòu)函數(shù)__del__時(shí)不能被清除引用
a = Demo()
a_wref = weakref.ref(a)弱引用方式齿椅,在循環(huán)引用中可以減少引用次數(shù),方便管理
使用字符串來(lái)調(diào)用類中的方法
# method-1(使用getattr方法):
# 第一個(gè)參數(shù)是類對(duì)象名,第二參數(shù)是類對(duì)象擁有的方法名字符串启泣,第三個(gè)參數(shù)是在沒(méi)有找到對(duì)應(yīng)方法時(shí)返回的值
f = getattr(class_obj_name, class_obj_func_name, None)
if f:
f(args)
# method-2(使用operator.methodcaller)
from operater import mehtodcaller
s = 'abc123abc456'
# 從第四個(gè)參數(shù)找'abc'
s.find('abc', 4) ==> 6
mehtodcaller('find', 'abc', 4)(s) ==> 6
多線程使用(使用threading.Thread)
from threading import Thread
def foo():
"""
需要線程處理的函數(shù)
"""
pass
# method-1(直接使用方法):
# 傳入args是一個(gè)元組涣脚,當(dāng)只有一個(gè)參數(shù)時(shí)需要加','
T = Thread(target=foo, args=(arg1, arg2..))
T.start()
# method-2(使用類):
class Demo(Thread):
"""
使用類可以更好的封裝數(shù)據(jù)
"""
def __init__(self, args):
# 必須條用Thread這個(gè)父類的構(gòu)造器
Thread.__init__(self)
self.args = args
def run(self):
"""
必須實(shí)現(xiàn)該方法
"""
# 指定需要運(yùn)行的函數(shù)
foo()
threads = []
for _ in xrange(1, 10):
T = Demo(args)
threads.append(T)
# 使用start()會(huì)進(jìn)入run()函數(shù)
T.start()
for _ in threads:
# join()是一個(gè)阻塞函數(shù),run()沒(méi)有運(yùn)行完不會(huì)退出
_.join()
注:python中的多線程只適合用于多I/O密集型的環(huán)境寥茫,如果是CPU密集型遣蚀,建議使用多進(jìn)程
線程間事件通知(使用Threading.Event)
from Threading import Event
def f(e):
print 'f 0'
e.wait()
print 'f 1'
e = Event()
t = Thread(target=f, args=(e, ))
t.start()
==> 'f 0' # 此時(shí)等待e調(diào)用set()方法
e.set()
==> 'f 1'
# 清理后才可以重復(fù)使用
e,clear()
注:
1.兩個(gè)線程都需要是Event
2.wait是一個(gè)阻塞函數(shù),會(huì)等待另一個(gè)線程中set的調(diào)用
如何實(shí)現(xiàn)線程本地的數(shù)據(jù)(使用threading.local)
from threading import local
l = local()
# 創(chuàng)建的x是在調(diào)用該線程時(shí)的私有數(shù)據(jù)
# 注意:目前l(fā)屬于主線程
l.x = 'a'
# 主線程中運(yùn)行會(huì)修改參數(shù)
def f();
l.x = 'b'
f()
print l.x ==> 'b'
# 子線程中運(yùn)行不會(huì)修改主線程中的數(shù)據(jù)
l.x = 'a'
threading.Thread(target=f).start()
print l.x ==> 'a'
如何使用線程池(python3中使用concurrent.futures.ThreadPoolExecutor)
# 3表示線程池的個(gè)數(shù)
executor = ThreadPoolExecutor(3)
def f(a, b):
print('f', a, b)
return a ** b
# 使用池中的一個(gè)線程
executor.submit(f, 2, 3,) ==> f 2 3
# future 是使用池中線程之后返回的對(duì)象
future = executor.submit(f, 2, 3) ==> f 2 4
# 如果f函數(shù)運(yùn)行時(shí)間很長(zhǎng)纱耻,result()就會(huì)阻塞
future.result() ==> 16
# 同時(shí)調(diào)用
executor.map(f, [2, 3, 4], [4, 5, 6])
==> f 2 4
f 3 5
f 5 6
注:如果線程數(shù)大于線程池的數(shù)芭梯,超出的線程就會(huì)等待線程池中的線程運(yùn)行完畢,有了空位才會(huì)運(yùn)行
如何使用多進(jìn)程(使用multiprocessing.Process)
from multiprocessing import Process
def f(s): print s
# 啟動(dòng)一個(gè)子進(jìn)程
p = Process(target=f, args=('hello', ))
p.start() ==> 'hello'
# 等待一個(gè)進(jìn)程結(jié)束
p.join()
# 與多線程的不同之處在于使用的虛擬地址空間是不同的
x = 1
def f():
global x
x = 5
# 在本進(jìn)程調(diào)用這個(gè)函數(shù)
f()
x ==> 5
# 啟動(dòng)子進(jìn)程來(lái)測(cè)試
x = 1
p = Process(target=f).start()
# 在主進(jìn)程中查看x弄喘,x并沒(méi)有變化玖喘,得出子進(jìn)程和主進(jìn)程看到的x不是同一個(gè)
# 說(shuō)明個(gè)個(gè)進(jìn)程之間他們的虛擬地址是獨(dú)立的
x ==> 1
進(jìn)程間進(jìn)行通信(使用multiprocessing.Queue和multiprocessing.Pipe)
# method-1(使用multiprocessing.Queue):
from multiprocessing import Process, Queue
q = Queue()
def f(q):
# 等待主進(jìn)程傳一個(gè)值
print 'start'
print q.get()
print 'end'
Process(target=f, args=(q, )).start()
==> start
# 打印start,等待主進(jìn)程傳入一個(gè)值
q.put(100)
# 當(dāng)傳入一個(gè)值后子進(jìn)程立馬執(zhí)行
==> 100
end
# method-2(使用multiprocessing.Pipe):
from multiprocessing import Process, Pipe
# Pipe會(huì)創(chuàng)建一個(gè)雙向管道
c1, c2 = Pipe()
# 從c1傳入'a'
c1.send('a')
# 只能從c2讀取'a'
c2.recv() ==> 'a'
# 從c2傳入'a'
c1.send('a')
# 只能從c1讀取'a'
c2.recv() ==> 'a'
def f(c):
c.send(c.recv() * 2)
# 啟動(dòng)子進(jìn)程蘑志,等待c1端輸入
Process(target=f, args=(c2,)).start()
c1.send(2)
c1.recv() ==> 4
如何使用裝飾器
# -*- coding: utf-8 -*-
def memo(func):
cache = {}
def wrap(*args):
if args not in cache:
cache[args] = func(*args)
return cache[args]
return wrap
# [題目1] 菲波那切數(shù)列(Fibonacci Sequence)累奈,又稱黃金分割數(shù)列
# 指的是這樣一個(gè)數(shù)列: 1,1,2,3,5,8,11.21...
# 這個(gè)數(shù)列從第三項(xiàng)開(kāi)始,每一項(xiàng)都等于前面兩項(xiàng)之和卖漫,求數(shù)列第n項(xiàng)
@memo
def fibonacci(n):
if n <= 1:
return 1
return fibonacci(n - 1) + fibonacci(n - 2)
print fibonacci(50)
# [題目2] 一個(gè)共有10個(gè)臺(tái)階的樓梯费尽,從下面走到上面,一次只能邁1~3個(gè)臺(tái)階
# 并且不能后退羊始,走完這個(gè)樓梯共有多少種方法
@ memo
def climb(n, steps):
count = 0
if n == 0:
count = 1
elif n > 0:
for step in steps:
count += climb(n - step, steps)
return count
print climb(10, (1,2,3))
為被裝飾的函數(shù)保存元數(shù)據(jù)
from functools import update_wrapper, wrap旱幼, WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES
method-1(使用update_wrapper函數(shù)):
def Demo(func):
def wrapper(*args, **kwargs):
"""is wrapper"""
print 'is wrapper'
func(*args, **kargs)
# 使用原函數(shù)的屬性替換包裹函數(shù),倒數(shù)第二個(gè)參數(shù)是替換突委,倒數(shù)第一個(gè)參數(shù)是合并
# 后兩個(gè)參數(shù)可以使用如下默認(rèn)參數(shù)
# WRAPPER_ASSIGNMENTS ==> ('__module__', '__name__', '__doc__')
# WRAPPER_UPDATES ==> ('__dict__', )
update_warapper(wrapper, func, ("__name__", "__doc__"), ("__dict__", ))
return wrapper
method-2(使用wraps裝飾器):
def Demo(func):
@wraps(func)
def wrapper(*args, **kwargs):
"""is wrapper"""
print 'is wrapper'
func(*args, **kargs)
return wrapper
注:
函數(shù)元數(shù)據(jù)
f.__name__ : 函數(shù)的名字
f.__doc__ : 函數(shù)文檔字符串
f.__module__ : 函數(shù)所屬模塊名
f.__dict__ : 屬性字典
f.__defaults__ : 默認(rèn)參數(shù)元祖
...
定義帶參數(shù)的裝飾器(使用與python3)
# python3 中的模塊
from inspect import signature
def typeAssert(*ty_args, **kwy_args):
def decorator(func):
"""
解決思路:
func -> a,b
d = {'a': int, 'b', str}
使用signature實(shí)現(xiàn)上述d的生成
wrapper通過(guò)d來(lái)做驗(yàn)證
arg in d, instache(arg, d[arg])
"""
# 獲得func的簽名
sig = signature(func)
btypes = sig.bind_partial(*ty_args, **kwy_args).arguments
def wrapper(*args, **kwargs):
for name, obj in sig.bind(*args, **kwargs).arguments.iteritems():
if name in btypes:
if not isinstance(obj, btypes[name])
raise TypeError('%s must be %s' % (name, btypes[name]))
return func(*args, **kwargs)
return wrapper
return decorator
@typeAssert(int, str, list)
def f(a, b, c):
print(a, b, c)
f(1, 'abc', [1, 2, 3])
f(1, 2, [1, 2, 3])
==> 1 abc [1, 2, 3]
TypeError: 'b' must be '<class 'str'>'
如何實(shí)現(xiàn)屬性可修改的函數(shù)裝飾器(適用于python3)
from functools import wraps
from random import randint
import time
import logging
def warn(timeout):
def decorator(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
used = time.time() - start
if used > timeout:
msg = '%s : %s > %s' % (func.__name__, used, timeout)
logging.warn(msg)
return res
def setTimeout(k):
# python3 特有的類型柏卤,用于設(shè)置閉包內(nèi)的全局變量
# 如果不聲明冬三,此函數(shù)中的timeout與warn(timeout)中的timeout不是同一個(gè)
nonlocal timeout
timeout = k
wrapper.setTimeout = setTimeout
return wrapper
return decorator
@warn(0.5)
def test():
print 'in test'
while randint(0, 1):
time.sleep(0.5)
for _ in range(30):
test()
test.setTimeout(1)
for _ in range(30):
test()