项目地址

如何做一个聊天室

FastAPI的Websocket程序结构

协程模型

代码解析

先启动一个全局的dispatch_message协程,用于广播消息

用户加入聊天室

用户处理消息

如何让协程优雅的退出

总结

交流

100行Python代码实现FastAPI Websocket 聊天室(纯协程方案)

本文发表于入职啦(公众号: ruzhila) 大家可以访问入职啦学习更多的编程实战。

项目地址

代码已经开源, fastapi_chatroom 👏 欢迎Star

代码运行效果: screenshot

所有的项目都在github上开源:100-line-code 欢迎Star 👏

用100行代码的不同语言(Java、Python、Go、Javascript、Rust)实现项目,通过讲解项目的实现,帮助大家学习编程

我们会定期在群里分享最新的项目实战代码,包括不同语言的实现

老师还会详细讲解代码优化的思路,扫码加入实战群:

入群学习

如何做一个聊天室

网页的聊天室最基本的功能就是能输入消息,并且广播给聊天室里面的所有成员,要支持消息的发生和接送,我们可以使用Websocket协议来实现这个功能

如果没有Websocket, 网页如果要获取服务器上的消息,就只能通过Pull的方式去获取最新的消息,也就是我们常说的轮询,这样会导致服务器的压力增大,而且消息的实时性也不高

这是一个简单的聊天室的工作流程:

workflow

  1. Bob 连接到聊天室,服务器启动了2个协程,一个用于接收消息,一个用于发送消息
  2. Bob 发送消息到服务端, 然后通过 dispatch_messsage 协程广播给所有的用户
  3. 聊天室所有的用户 收到消息之后,通过websocket发送到网页上,显示消息

这是最基本的聊天室的工作流程,通过这样的流程,我们可以大概知晓我们要怎么设计程序结构

下一步就是我们要用FastAPI实现简单的Websocket代码

FastAPI的Websocket程序结构

FastAPI是完全基于asyncio的HTTP Server框架,不同于其他的多线程框架,FastAPI是基于协程的框架,可以支持异步编程,也就是要求所有的函数都是异步函数

可以看一下如何用FastAPI实现一个简单的Websocket程序

from fastapi import FastAPI, WebSocket
app = FastAPI()

@app.websocket("/chat")
async def chatroom(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")

通过这个代码,我们可以很简单就可以启用/chat的Websocket服务,通过receive_text接收消息,通过send_text发送消息,这个相当于做了一个简单的echo功能。

先上代码:

code

协程模型

每个用户加入之后,都需要启动2个协程,协程之间通过asyncio.Queue来通信,这种消息通讯的方式是现代主流语言,比如go和rust都提倡的通信方式

  1. 系统启动时候,会启动一个全局的room_queue,当用户所有发送的消息都会放到这个队列里面,然后启动一个dispatch_message协程,用于广播消息
  2. 每个用户加入聊天室之后,都会启动2个协程,一个用于接收消息(task_recv_from_client),一个用于发送消息(task_send_to_client)

也就是说,用户发送的消息,收到后放入到room_queue,然后dispatch_message协程广播给所有的用户,将消息放入到用户的txbuf里面 然后用户的task_send_to_client 这个协程会从各自的txbuf中获取消息,然后发送到websocket上

可能会有人问为什么不能收到消息之后,遍历所有的用户的websocket直接发送消息?

其实这样做也不是不可以,如果用户数量比较多,那么每次发送消息都需要做一次O(n)的遍历,从算法上就不是一个好的模型

通过这种Pub/Sub的模式,可以将消息广播的复杂度降低到O(1),这样可以支持更多的用户,并且充分利用好协程的并发优势

代码解析

先启动一个全局的dispatch_message协程,用于广播消息

dispatch 我们看到代码其实很简单,就是

  • 98行等待消息进来
  • 102-105行 广播给所有的用户

用户加入聊天室

chat 用户加入一个聊天室之后,会做一些基础的初始化工作,比如检查用户是否已经加入,名字是否规范

  • 71和79行 分别处理用户的加入和退出,并且广播系统消息给所有的用户,这样可以确保用户的加入和退出都是准确的
  • 74行 去执行用户的消息处理, 这时候是一个阻塞的操作,如果用户退出,那么这个协程也会退出

用户处理消息

我们定义了一个Client的类,用来处理用户的封装,这样可以更好的管理用户的状态 serve

  • 51行 通过asyncio.gather启动多个协程,这个功能是同步协程最好的方案,相当于并行启动多个协程,任意一个协程退出,其他的协程也会退出,这样我们就可以保证,只要有一个任务退出,不管是接送还是发送,都会终止,不会出现僵尸任务
  • 35-40行 收到用户输入的消息之后,放入到room_queue里面,其他就不用管了,前面的dispatch_message协程会广播给所有的用户
  • 42-47行tx里面获取消息,这个队列的消息是来自于dispatch_message协程广播的消息

通过这样的设计,可以提升效率,降低系统的复杂度

如何让协程优雅的退出

要知道,我们大部分的协程都是一个死循环,如果没有处理好退出的逻辑,就会导致程序的内存泄漏,所以我们要确保协程的退出是优雅的 协程都是通过asyncio.Queue来通信,我们可以通过None来通知协程退出,只要收到None就退出即可

这种做法就是完全基于消息的协同,通过消息来控制协程的生命周期,这样可以确保协程的退出是优雅的

  • 57行 当一个任意一个协程退出的时候,给tx发送一个None
  • 45行 就可以让task_send_to_client结束

总结

通过这个项目,我们学会了如何使用Python的asyncio + FastAPI+ Websocket实现一个简单的聊天室,学会处理Websocket请求,了解协程的使用

  • FastAPI是最常用的Python Web框架之一,它的性能非常好,支持异步编程,可以用于高并发的场景
  • asyncio 是python最核心的异步协程库,可以用于处理IO密集型的任务, 通过async/await关键字,可以实现异步编程, 提高程序的性能 也简化了异步任务之间的同步操作
  • Websocket是一种基于TCP协议的全双工通信协议,可以实现客户端和服务端之间的实时通信,默认所有的浏览器都支持Websocket协议

交流

我们构建了一个100行代码项目的实战群,大家可以扫码加入,一起学习编程

入群学习

也可以访问入职啦学习更多的编程实战

所有的代码都在github上开源:100-line-code 欢迎Star 👏

友情链接:

Copyright© 2024 杭州园中葵科技有限公司 版权所有