并发编程概念大总结–干货
- 2019 年 10 月 3 日
- 笔记
??
? ????????????????????????????????
????????????????????????????????,???????????,????????,???????????? ?Lock??
???????? ?? ?? ??
??-->????? -->??-??io??->??-??????->?? -?????->??
??????????????????????CPU?????????
- ??????????????????
- ???????
- ??????????????
- ????????????
- ???????????????????
- ????????????????????
- ???????????????????????
- ????????????
???????
??? ??????
??? ??? ???????????????
from multiprocessing import Process import os def func(): print(os.getpid(), os.getppid()) # pid process ???id ppid ???id if __name__ == '__main__': # ???????????????,??__main__?? print('main:', os.getpid(), os.getppid()) p = Process(target=func) # target????????????????? p = Process(target=func,args=('??',)) # p = Process(target=func,kwargs={'name':'??'}) ?????? p.start() #????? ??????????????p.run() ????windows?Process()????# if __name__ == '__main__':? ??Windows??fork????????????Python?????????? ????????Process????????????????????????????? ?????Process???????????if __name__ == “__main __”???if?????????????????
???????????
- ???????????????run?????????
from multiprocessing import Process # class ??(Process): # def __init__(self,??): # self.???=?? # super().__init__() # def run(self): # print("?????????") # p=??() # p.start()
Process????????????name pid ident daemon terminate() isalive() p.name ??????? is_alive()???????????????????????true??????????false terminate() #??????? ?????
?????
# ????????????????????????? # ??????????????????????????????????? # ?????????????????????????????????????? ?????????? - ??????????????????? - ???????????????????? ????????????????????????????????
?start????????daemon=True,???????????????????? p = Process(target=son2) p.daemon = True # ????p.start()???,????p??????? p.start()
??????????????????????????????? # ??????????? # ?????????????????
??????????????????
- ???????–>??????–>?????–>?????
- ?????–>??????? –>??????–>?????
??????(IPC):
-
????????????????????
Queue?????socket?????????????????? ????? ??????
-
??????????????????????????
??????(?????):memcache/redis/rabbitmq/kafka
????????Manager???????
??????????????????????
????? ??????
- ???????????????????????????
???:???????????????? ????????None
??????????????????????????
# ????????????????????? # ?????????????????????????? # ?????????????????????????? # ??????????????????????? # ?????????????????????????? # ?????????????????? # ???????????????? ???? # ????????? ?? ?????????????
from multiprocessing import Queue,Pipe # ??:ipc??????????????????????????????????? # ??socket????pickle??????? # pipe?????????????????????????????? # ??socket?pickle ??????? ?????
?—-multiprocessing.Lock:
???????????????????????????????????????????????????????????
?:??????????????????????
?????????????????
# from multiprocessing import Lock #????????????????? # lock=Lock() # lock.acquire() # '''?????????''' # lock.release() # with lock? # ...
???
???????cpu????cpu???????
???
??cpu?????????10?????????cpu
?????????????CPU,?????????CPU???
????????????
?????????????CPU??????????
?????????
??????????????
??A???????B??????B??????????A??
??????????????????
??A???????B???????B??????????A??
??:??CPU??? input accept recv sleep connect
??????CPU???
# ??????????????????????????????????????CPU??? num=input('>>>') # ???????????????????????????????????????CPU?? ret=eval(1+2+3+4) # ?????:start() ??????????????????????????????????CPU?? # ????:??????????????????????????????????CPU??? # 10????????????????????????????????????????
# ???? # ???????????cpu????input sleep recv accept content get # ????? ******* # ???????????cpu??????????????? ?strip,eval,max,sorted????? # ???? # ???????????????????????????????????????????????? # ????? ******* # ??????????????????? start terminate
??
?????????CPU???????????????????????????????????????cpu????????????????????????????????CPU????????????????????????????????????????????????????????????
????? GIL (global interpreter lock)?
- ?cpython??????GIL?????????;????????????????GC??????????????????????????? ??????????????????CPU??,??????????????????????????
- ????IO?????????CPU????????CPU??????????????????????????????io??????
GC:?????????????
pypy??? gc?????
jpython??? gc?????
????:
# import time # from threading import Thread # def func(i): # print("start%s"%i) # time.sleep(1) # print("end%s"%i) # for i in range(10): # Thread(target=func,args=(i,)).start() # ????????? # from threading import Thread # class MyThread(Thread): # def __init__(self,a,b): # self.a=a # self.b=b # super().__init__() # def run(self): # print(self.ident) # t=MyThread(1,2) # t.start() #???? ???????run?? # print(t.ident)
# ??????????? ??terminate # ????????????????????? # current_thread() ???????,current_thread().itent()???id # enumerate() ?? ????????????,????????? # active_count() ?? ????????????
??????????????????????????????
;?????????????
???? : ???????????????;
????????????????????????? # ???? ??????????????? # ????????????????????????????? # ???? ???????????? # ????????????????????????????? # ???????????????? # ???????????????????????????????????????????????? # ?????-->?????-->?????-->??????????????-->?????????
?????????
# += -= *= /= while if ????? +??????????? # append pop strip???? # ???????????????????????? ????
????
?????? ??????
import time class A: from threading import Lock __instance=None lock=Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(0.000001) cls.__instance=super().__new__(cls) return cls.__instance def func(): a=A() print(a) from threading import Thread for i in range(10): Thread(target=func).start()
???????????
- ??? ?(RLock)??????????????? ???????????????
- ?????????????acquire??????acquire??????release
- ??? ????????????????????????????????????????
- ????????????????acquire?????acquire????release
???????????
??????/????????????????????????????????????????????
???????
- ? ????????????????????????????????????????
???queue?
??????????????????,????
?
?????
- ???????????????????????????????????
???????
- ???????????????????????????????????????
- ???????????????????????????????????????????????/????????
- ?????/??????????????????????????????????????????????????
# multiprocessing ?? ??threading??pool # concurrent.futures???????????????????????? #ThreadPoolExecutor??????????? #ProcessPoolExecutor: ??????????
?????????io??????
import time import random from threading import current_thread from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print(current_thread().ident,'start',a,b) #???? time.sleep(random.randint(1,4)) print(current_thread().ident,'end') tp=ThreadPoolExecutor(4) for i in range(20): tp.submit(func,i,i+1) #?????? tp.submit(func, a=i, b=i + 1) #??????
????
????????????io?????????????????????? ???input??
import os import time import random from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print(os.getpid(),'start',a,b) #???? time.sleep(random.randint(1,4)) print(os.getpid(),'end') if __name__ == '__main__': pp=ProcessPoolExecutor(4) for i in range(20): pp.submit(func,i,i+1) #?????? pp.submit(func, a=i, b=i + 1) #??????
????:????
import time import random from threading import current_thread from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print(current_thread().ident,'start',a,b) #???? time.sleep(random.randint(1,4)) print(current_thread().ident,'end',a) return ?a,a*b) def print_func(ret): #???? print(ret.result()) if __name__ == '__main__': tp=ThreadPoolExecutor(4) for i in range(20): #?????????? ret=tp.submit(func,i,i+1) #?????? ret.add_done_callback(print_func) #???? # ???? ?ret?????????????ret???????????????print_func???? # ??????????????????????????
??
????????????
- ??????????????????????????
- ????????????????io????????????io?????????
# ?????io?????? # gevent:???greenlet?????????+????io??? # asyncio:???yield?????????+????io??? # tornado???web?? # yield from:????????? # send????????? # asyncio ?? ??Python????????????? # ????Python????????????aysnc await
??????????????
- ?????????
- ?????????????????????????????
- ??????????????cpu????????????
???
import gevent def func(): print('start func') gevent.sleep(1) print('end func') g=gevent.spawn(func) g1=gevent.spawn(func) gevent.joinall([g,g1])
asyncio?
import asyncio async def func(name): #async???? print("start",name) #await ???????async??? await asyncio.sleep(1) #await???????????? print('end') loop=asyncio.get_event_loop() #???? loop.run_until_complete(asyncio.wait([func('wudi'),func('anwen')]))
????????
# ????????????????????????????????????????? # ?????????????????????????????????,?????? # ???????????????? ???????????????????????????????????????? # ??????io???????????????(socket,?????) asyncio async def func(name): #async???? print("start",name) #await ???????async??? await asyncio.sleep(1) #await???????????? print('end') loop=asyncio.get_event_loop() #???? loop.run_until_complete(asyncio.wait([func('wudi'),func('anwen')]))
????????
# ????????????????????????????????????????? # ?????????????????????????????????,?????? # ???????????????? ???????????????????????????????????????? # ??????io???????????????(socket,?????)