全异步IO框架asyncio.docx
- 文档编号:18494336
- 上传时间:2023-08-18
- 格式:DOCX
- 页数:24
- 大小:130.90KB
全异步IO框架asyncio.docx
《全异步IO框架asyncio.docx》由会员分享,可在线阅读,更多相关《全异步IO框架asyncio.docx(24页珍藏版)》请在冰点文库上搜索。
全异步IO框架asyncio
异步IO框架:
asyncio
1.如何定义/创建协程
还记得在前两章节的时候,我们创建了生成器,是如何去检验我们创建的是不是生成器对象吗?
我们是借助了isinstance()函数,来判断是否是collections.abc 里的Generator类的子类实现的。
同样的方法,我们也可以用在这里。
只要在一个函数前面加上 async 关键字,这个函数对象是一个协程,通过isinstance函数,它确实是Coroutine类型。
fromcollections.abcimportCoroutine
asyncdefhello(name):
print('Hello,',name)
if__name__=='__main__':
#生成协程对象,并不会运行函数内的代码
coroutine=hello("World")
#检查是否是协程Coroutine类型
print(isinstance(coroutine,Coroutine))#True
前两节,我们说,生成器是协程的基础,那我们是不是有办法,将一个生成器,直接变成协程使用呢。
答案是有的。
importasynciofromcollections.abcimportGenerator,Coroutine
'''
只要在一个生成器函数头部用上@asyncio.coroutine装饰器
就能将这个函数对象,【标记】为协程对象。
注意这里是【标记】,划重点。
实际上,它的本质还是一个生成器。
标记后,它实际上已经可以当成协程使用。
后面会介绍。
'''
@asyncio.coroutinedefhello():
#异步调用asyncio.sleep
(1):
yieldfromasyncio.sleep
(1)
if__name__=='__main__':
coroutine=hello()
print(isinstance(coroutine,Generator))#True
print(isinstance(coroutine,Coroutine))#False
2.asyncio的几个概念
在了解asyncio的使用方法前,首先有必要先介绍一下,这几个贯穿始终的概念。
∙event_loop事件循环:
程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。
当满足事件发生的时候,调用相应的协程函数。
∙coroutine协程:
协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。
协程对象需要注册到事件循环,由事件循环调用。
∙future对象:
代表将来执行或没有执行的任务的结果。
它和task上没有本质的区别
∙task任务:
一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
Task对象是Future的子类,它将coroutine和Future联系在一起,将coroutine封装成一个Future对象。
∙async/await关键字:
python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
其作用在一定程度上类似于yield。
这几个概念,干看可能很难以理解,没事,往下看实例,然后再回来,我相信你一定能够理解。
3.学习协程是如何工作的
协程完整的工作流程是这样的
∙定义/创建协程对象
∙将协程转为task任务
∙定义事件循环对象容器
∙将task任务扔进事件循环对象中触发
光说不练假把戏,一起来看下
importasyncio
asyncdefhello(name):
print('Hello,',name)
#定义协程对象
coroutine=hello("World")
#定义事件循环对象容器
loop=asyncio.get_event_loop()#task=asyncio.ensure_future(coroutine)
#将协程转为task任务
task=loop.create_task(coroutine)
#将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)
输出结果,当然显而易见
Hello,World
4.await与yield对比
前面我们说,await用于挂起阻塞的异步调用接口。
其作用在一定程度上类似于yield。
注意这里是,一定程度上,意思是效果上一样(都能实现暂停的效果),但是功能上却不兼容。
就是你不能在生成器中使用await,也不能在async定义的协程中使用yieldfrom。
小明不是胡说八道的。
有实锤。
再来一锤。
除此之外呢,还有一点很重要的。
∙yieldfrom 后面可接 可迭代对象,也可接future对象/协程对象;
∙await 后面必须要接 future对象/协程对象
如何验证呢?
yieldfrom 后面可接 可迭代对象,这个前两章已经说过了,这里不再赘述。
接下来,就只要验证,yieldfrom和await都可以接future对象/协程对象就可以了。
验证之前呢,要先介绍一下这个函数:
asyncio.sleep(n),这货是asyncio自带的工具函数,他可以模拟IO阻塞,他返回的是一个协程对象。
func=asyncio.sleep
(2)print(isinstance(func,Future))#Falseprint(isinstance(func,Coroutine))#True
还有,要学习如何创建Future对象,不然怎么验证。
前面概念里说过,Task是Future的子类,这么说,我们只要创建一个task对象即可。
importasynciofromasyncio.futuresimportFuture
asyncdefhello(name):
awaitasyncio.sleep
(2)
print('Hello,',name)
coroutine=hello("World")
#将协程转为task对象
task=asyncio.ensure_future(coroutine)
print(isinstance(task,Future))#True
好了,接下来,开始验证。
5.绑定回调函数
异步IO的实现原理,就是在IO高的地方挂起,等IO结束后,再继续执行。
在绝大部分时候,我们后续的代码的执行是需要依赖IO的返回值的,这就要用到回调了。
回调的实现,有两种,一种是绝大部分程序员喜欢的,利用的同步编程实现的回调。
这就要求我们要能够有办法取得协程的await的返回值。
importasyncioimporttime
asyncdef_sleep(x):
time.sleep
(2)
return'暂停了{}秒!
'.format(x)
coroutine=_sleep
(2)
loop=asyncio.get_event_loop()
task=asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
#task.result()可以取得返回结果
print('返回结果:
{}'.format(task.result()))
输出
返回结果:
暂停了2秒!
还有一种是通过asyncio自带的添加回调函数功能来实现。
importtimeimportasyncio
asyncdef_sleep(x):
time.sleep
(2)
return'暂停了{}秒!
'.format(x)
defcallback(future):
print('这里是回调函数,获取返回结果是:
',future.result())
coroutine=_sleep
(2)
loop=asyncio.get_event_loop()
task=asyncio.ensure_future(coroutine)
#添加回调函数
task.add_done_callback(callback)
loop.run_until_complete(task)
输出
这里是回调函数,获取返回结果是:
暂停了2秒!
emmm,和上面的结果是一样的。
协程中的多任务。
1.协程中的并发
协程的并发,和线程一样。
举个例子来说,就好像一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。
就这样交替换着吃。
asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。
第一步,当然是创建多个协程的列表。
#协程函数asyncdefdo_some_work(x):
print('Waiting:
',x)
awaitasyncio.sleep(x)
return'Doneafter{}s'.format(x)
#协程对象
coroutine1=do_some_work
(1)
coroutine2=do_some_work
(2)
coroutine3=do_some_work(4)
#将协程转成task,并组成list
tasks=[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
第二步,如何将这些协程注册到事件循环中呢。
有两种方法,至于这两种方法什么区别,稍后会介绍。
∙使用asyncio.wait()
loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))
∙使用asyncio.gather()
#千万注意,这里的「*」不能省略loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))
最后,return的结果,可以用task.result()查看。
fortaskintasks:
print('Taskret:
',task.result())
完整代码如下
importasyncio
#协程函数asyncdefdo_some_work(x):
print('Waiting:
',x)
awaitasyncio.sleep(x)
return'Doneafter{}s'.format(x)
#协程对象
coroutine1=do_some_work
(1)
coroutine2=do_some_work
(2)
coroutine3=do_some_work(4)
#将协程转成task,并组成list
tasks=[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
fortaskintasks:
print('Taskret:
',task.result())
输出结果
Waiting:
1Waiting:
2Waiting:
4Taskret:
Doneafter1sTaskret:
Doneafter2sTaskret:
Doneafter4s
2.协程中的嵌套
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
来看个例子。
importasyncio
#用于内部的协程函数asyncdefdo_some_work(x):
print('Waiting:
',x)
awaitasyncio.sleep(x)
return'Doneafter{}s'.format(x)
#外部的协程函数asyncdefmain():
#创建三个协程对象
coroutine1=do_some_work
(1)
coroutine2=do_some_work
(2)
coroutine3=do_some_work(4)
#将协程转为task,并组成list
tasks=[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
#【重点】:
await一个task列表(协程)
#dones:
表示已经完成的任务
#pendings:
表示未完成的任务
dones,pendings=awaitasyncio.wait(tasks)
fortaskindones:
print('Taskret:
',task.result())
loop=asyncio.get_event_loop()
loop.run_until_complete(main())
如果这边,使用的是asyncio.gather(),是这么用的
#注意这边返回结果,与await不一样
results=awaitasyncio.gather(*tasks)forresultinresults:
print('Taskret:
',result)
输出还是一样的。
Waiting:
1Waiting:
2Waiting:
4Taskret:
Doneafter1sTaskret:
Doneafter2sTaskret:
Doneafter4s
仔细查看,可以发现这个例子完全是由上面「协程中的并发」例子改编而来。
结果完全一样。
只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。
外部的协程,嵌套了一个内部的协程。
其实你如果去看下asyncio.await()的源码的话,你会发现下面这种写法
loop.run_until_complete(asyncio.wait(tasks))
看似没有嵌套,实际上内部也是嵌套的。
这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。
#内部协程函数asyncdef_wait(fs,timeout,return_when,loop):
assertfs,'SetofFuturesisempty.'
waiter=loop.create_future()
timeout_handle=None
iftimeoutisnotNone:
timeout_handle=loop.call_later(timeout,_release_waiter,waiter)
counter=len(fs)
def_on_completion(f):
nonlocalcounter
counter-=1
if(counter<=0or
return_when==FIRST_COMPLETEDor
return_when==FIRST_EXCEPTIONand(notf.cancelled()and
f.exception()isnotNone)):
iftimeout_handleisnotNone:
timeout_handle.cancel()
ifnotwaiter.done():
waiter.set_result(None)
forfinfs:
f.add_done_callback(_on_completion)
try:
awaitwaiter
finally:
iftimeout_handleisnotNone:
timeout_handle.cancel()
done,pending=set(),set()
forfinfs:
f.remove_done_callback(_on_completion)
iff.done():
done.add(f)
else:
pending.add(f)
returndone,pending
#外部协程函数asyncdefwait(fs,*,loop=None,timeout=None,return_when=ALL_COMPLETED):
iffutures.isfuture(fs)orcoroutines.iscoroutine(fs):
raiseTypeError(f"expectalistoffutures,not{type(fs).__name__}")
ifnotfs:
raiseValueError('Setofcoroutines/Futuresisempty.')
ifreturn_whennotin(FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED):
raiseValueError(f'Invalidreturn_whenvalue:
{return_when}')
ifloopisNone:
loop=events.get_event_loop()
fs={ensure_future(f,loop=loop)forfinset(fs)}
#【重点】:
await一个内部协程
returnawait_wait(fs,timeout,return_when,loop)
3.协程中的状态
还记得我们在讲生成器的时候,有提及过生成器的状态。
同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。
Pending:
创建future,还未执行Running:
事件循环正在调用执行任务Done:
任务执行完毕Cancelled:
Task被取消后的状态
可手工 python3xx.py 执行这段代码,
importasyncioimportthreadingimporttime
asyncdefhello():
print("Runningintheloop...")
flag=0
whileflag<1000:
withopen("F:
\\test.txt","a")asf:
f.write("------")
flag+=1
print("Stoptheloop")
if__name__=='__main__':
coroutine=hello()
loop=asyncio.get_event_loop()
task=loop.create_task(coroutine)
#Pending:
未执行状态
print(task)
try:
t1=threading.Thread(target=loop.run_until_complete,args=(task,))
#t1.daemon=True
t1.start()
#Running:
运行中状态
time.sleep
(1)
print(task)
t1.join()
exceptKeyboardInterruptase:
#取消任务
task.cancel()
#Cacelled:
取消任务
print(task)
finally:
print(task)
顺利执行的话,将会打印 Pending -> Pending:
Runing -> Finished 的状态变化
假如,执行后立马按下Ctrl+C,则会触发task取消,就会打印 Pending -> Cancelling -> Cancelling 的状态变化。
4.gather与wait
还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?
∙使用asyncio.wait()
loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))
∙使用asyncio.gather()
#千万注意,这里的「*」不能省略loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))
asyncio.gather 和 asyncio.wait 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。
还是照例用例子来说明,先定义一个协程函数
importasyncio
asyncdeffactorial(name,number):
f=1
foriinrange(2,number+1):
print("Task%s:
Computefactorial(%s)..."%(name,i))
awaitasyncio.sleep
(1)
f*=i
print("Task%s:
factorial(%s)=%s"%(name,number,f))
5.接收参数方式
asyncio.wait
接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。
它可以这样,用asyncio.ensure_future转为task对象
tasks=[
asyncio.ensure_future(factorial("A",2)),
asyncio.ensure_future(factorial("B",3)),
asyncio.ensure_future(factorial("C",4))
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
也可以这样,不转为task对象。
loop=asyncio.get_event_loop()
tasks=[
factorial("A",2),
factorial("B",3),
factorial("C",4)
]
loop.run_until_complet
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- 异步 IO 框架 asyncio