🩰

Tornado

Date
Oct 13, 2023
Assign
Status
In progress
 
notion image
 

简介

Tornado 是一个python web框架和异步网络库,最初开发于 FriendFeed . 通过使用非阻塞网络I/O,Tornado可以扩展到数万个开放连接,使其非常适合 long polling , WebSockets 以及其他需要与每个用户建立长期连接的应用程序。
 
龙卷风以其高性能着称。它的设计允许处理大量并发连接,Tornado 在设计之初就考虑到了性能因素。 旨在解决 C10K(同时处理一万个连接的缩写),这样的设计使得其成为一个拥有非常高性能的框架 此外,它还拥有处理安全性、用户验证、社交网络以及与外部服务 (如数据库和网站API)进行异步交互的工具。
 
基于线程的服务器,比如Apache,为了传入的连接,维护了一个操作系统的线程池。Apache会为每个HTTP连接分配线程池中的一个线程,如果所有的线程都处于被占用的状态并且尚有内存可用时,则生成一个新的线程。尽管不同的操作系统会有不同的设置,大多数Linux发布版中都是默线程堆大小为8MB。Apahe的架构在大负载下变得不可预测,为每个打开的连接维护一个大的线程池等待数据极易讯速耗光服务器的内存资源
 
而为什么会有这么大的负载呢?服务器同时要对许多客户端提供服务,而服务器端的处理流程,只要遇到了10操作,往往需要长时间的等待。
notion image
 
整个流程52ms,中间50ms是在等待耗时操作结束,这段时间服务进程的CPU是空闲的,浪费掉了。
 
换个角度看,是CPU在各个流程间跳来跳去,专门处理那些红色的片段。这种模式下,服务进程有效利用了等待时间,实际花费的只是一头一尾两段真正占用CPU的时间。这样服务进程每秒可以处理的请求可成几十倍的增长。
notion image
 
而这样的模式,就是在一个进程之间同时处理多个协程,充分利用CPU时间,这就是我们需要的异步编程. 而 Tornado 就是可以支持异步非堵塞I0模式
 

安装

pip install tornado

第一个小程序

from tornado import web from tornado import ioloop class IndexHandler(web.RequestHandler): def get(self): self.write("hello,world") if __name__ == '__main__': app = web.Application( handlers=[("/", IndexHandler)],# 设置路由和handler方法 debug=True,# 设置debug 热启动 ) # 设置app对象 app.listen(8888) # 设置监听端口 ioloop.IOLoop.current().start() # 开始事件循环 # netstat -anto # 查看windows端口占用情况
 

同步IO 与异步IO

 
from tornado import web,ioloop from time import sleep class IndexHandler(web.RequestHandler): def get(self): sleep(3) self.write("hello,world") if __name__ == '__main__': app = web.Application( handlers=[("/", IndexHandler)],# 设置路由和handler方法 debug=True,# 设置debug 热启动 ) # 设置app对象 app.listen(8888) # 设置监听端口 ioloop.IOLoop.current().start() # 开始事件循环 # netstat -anto # 查看windows端口占用情况
加了sleep(3) 真的就硬生生等待了三秒
 
每次访问都会重复三秒。导致第二个访问就花了6秒。而且如果有了别的handler也会阻塞。
notion image
 

tornado 配置文件的使用

 

tornado.option

options 可以让服务运行前提前设置参数,而常见的2种设置参数方式为
  1. 命令行设置
  1. 文件设置

命令行解析

使用 tornado.options.define 前定义,通常在模块的顶层 然后,可以将这些选项作为以下厘性的属性进行访问 tornado.options.options
但要解析命令行参数时,需要使用 tornado.options.parse_command_line 来解析参数 具体代码如下:
from tornado import web, ioloop from tornado.options import define,options,parse_command_line # 定义key来接受传递来的参数 define('port',default=8000,help="port listen to",type=int) define('debug',default=True,help="set debug mode",type=bool) # 解析命令传进来的参数 parse_command_line() class IndexHandler(web.RequestHandler): async def get(self): self.write("hello,world tornado") if __name__ == '__main__': app = web.Application( handlers=[ ("/", IndexHandler) ], # 设置路由和handler方法 debug=options.debug, # 设置debug 热启动 ) # 设置app对象 app.listen(options.port) # 设置监听端口 ioloop.IOLoop.current().start() # 开始事件循环 # netstat -anto # 查看windows端口占用情况
命令行中
python main.py --port=7000 - -debug=True
 
 

文件创建设置

创建文件,将必要的参数直接写入即可
server.conf
port = 7000 debug = True
 
设置好文件后,通过 tornado.options.parse_config_file 来解析
from tornado import web, ioloop from tornado.options import define,options,parse_config_file # 定义key来接受传递来的参数 define('port',default=8000,help="port listen to",type=int) define('debug',default=True,help="set debug mode",type=bool) # 解析命令传进来的参数 parse_config_file("server.conf") class IndexHandler(web.RequestHandler): async def get(self): self.write("hello,world tornado") if __name__ == '__main__': app = web.Application( handlers=[ ("/", IndexHandler) ], # 设置路由和handler方法 debug=options.debug, # 设置debug 热启动 ) # 设置app对象 app.listen(options.port) # 设置监听端口 ioloop.IOLoop.current().start() # 开始事件循环 # netstat -anto # 查看windows端口占用情况
 
tips: 有了以上方法,就可更新的设置服务器的各种通用参数了,如数据库参数、文件目录参数、服务器参数等
 

tornado URL的设置

 

URL编写

1.完整匹配 2.通过re 3.通ur1传递参数 跳转 4.1 ur1 反转 name属性,reverse ur1 4.2 web .URLSpec
 
from tornado import web, ioloop # 最简单的页面 class IndexHandler(web.RequestHandler): async def get(self): self.write("hello,tornado") # 最简单的页面2 class HomeHandler(web.RequestHandler): async def get(self): self.write("hello,homePage") # URL 参数传参 class UserHandler(web.RequestHandler): async def get(self, userID): self.write(f"hello,User,您的登录编号是{userID}") # URL 参数传参 class UserNameHandler(web.RequestHandler): async def get(self, username): self.write(f"hello,您的登录用户名是{username}") # 双参数传递 class UserPasswordHandler(web.RequestHandler): async def get(self, username, password): self.write(f"hello,您的登录用户名是{username},您的密码是{password}") # 跳转函数 class RedirectHandler(web.RequestHandler): async def get(self): # self.redirect("home") self.redirect(self.reverse_url("home")) # 跳转函数 class RedirectTipHandler(web.RequestHandler): def initialize(self,username,password) -> None: self.username = username self.password = password print(username, password) async def get(self): # self.redirect("home") self.redirect(self.reverse_url("home")) if __name__ == '__main__': args = { "username": "lzc", "password": 'admin*123' } app = web.Application( handlers=[ ("/", IndexHandler), # ("/home/?", HomeHandler, "home"), web.URLSpec("/home/?", HomeHandler, name="home"), ("/user/(\d+)/?", UserHandler), ("/user/(\w+)/?", UserNameHandler), # ("/user/(\w+)/(\w+)/?", UserPasswordHandler), ("/user/(?P<username>\w+)/(?P<password>\w+)/?", UserPasswordHandler), web.URLSpec("/redirect/?", RedirectHandler), web.URLSpec("/redirect_tip/?", RedirectTipHandler, args), ], # 设置路由和handler方法 debug=True, # 设置debug 热启动 ) # 设置app对象 app.listen(8000) # 设置监听端口 ioloop.IOLoop.current().start() # 开始事件循环 # netstat -anto # 查看windows端口占用情况
 

tornado RequestHandler的使用

RequestHandler

tornado.web.RequestHandler
 
Http请求处理程序的基类
 

initialize

钩子类初始化,要求每个请求
def initialize(self, db=None): # 不要使用异步 这是初始化操作 self.db = db

prepare

相当于生命周期开始函数
在get,post,etc之前的请求开始时调用
执行通用初始化。
# 生命周期函数, 相当于onBefore def prepare(self) -> Optional[Awaitable[None]]: print(1111) return
 

on_finish

请求结束后调用
 
def on_finish(self): print(3333)
 
此方法以执行清理。日志记录等
  • get
  • head
  • post
  • delete
  • patch
  • put
  • options
async def get(self): print(222) self.write("成功访问get请求") async def post(self): self.write("成功访问post请求")

get_argument

⇒get_query_argument
在get中一样的。但是在post中 get_argument 也可以获取到
notion image
notion image
get:
 
async def get(self): name = self.get_argument('q') names = self.get_arguments('q') self.write(f"name>>>>{name},names>>>>>{names}") self.write("\n成功访问get请求")
post
async def post(self): name = self.get_argument('q') names = self.get_arguments('q') self.write(f"name>>>>{name},names>>>>>{names}") self.write("\n成功访问post请求")
返回具有给定名称的参数的值
 

get_arguments

⇒ get_query_arguments
返回具有定名称的参数列表
 

get_query_argument

从请求查询字符串中返回具有给定名称的参数的值
async def get(self): name = self.get_query_argument('q') names = self.get_query_arguments('q') self.write(f"name>>>>{name},names>>>>>{names}") self.write("成功访问get请求")
notion image

get_query_arguments

从请求查询字符串中返回具有给定名称的参数列表
 
notion image
 
 

get_body_argument

从请求主题中返回具有给定名称的参数的值
 
 

get_body_arguments

从请求主题中返回具有给定名称的参数列表
 
tips: URL字符串参数
可以使用 Content-Type为application-www-form-urlencoded 或者 multipart/form-data
如果是json无法获取数据
 
此时需要解析body里的请求体数据
我们使用get_body_argument以及get_argument都不行
notion image
找不到q这个属性
所以就要选择使用request
 
 

request

 
async def post(self): args = self.request.body.decode("utf-8") self.write(f'name>>>>{json.loads(args).get("q")}') self.write("\n成功访问post请求") # 执行完就结束了 await self.finish({ "msg": "成功访问" })
 
notion image
 

静态文件与跳转的使用

重定向Redirect

登录失败跳转首页
301 和 302 都是重定向 301 是永久重定向, 302 是暂时重定向
from tornado import web, ioloop class IndexHandler(web.RequestHandler): def get(self): self.finish("hello,tornado") class LoginHandler(web.RequestHandler): def get(self): # self.redirect(self.reverse_url()) # print(self.reverse_url("index")) self.redirect("/") if __name__ == '__main__': app = web.Application([ web.URLSpec("/",IndexHandler,name="index"), web.URLSpec("/login",LoginHandler,name="login"), ], debug=True) app.listen(8000) ioloop.IOLoop.current().start()
还有一种方法 可以完成redirect 跳转
from tornado.web import RedirectHandler
具体使用方法如下
from tornado import web, ioloop from tornado.web import RedirectHandler class IndexHandler(web.RequestHandler): def get(self): self.finish("hello,tornado") if __name__ == '__main__': app = web.Application([ web.URLSpec("/",IndexHandler,name="index"), web.URLSpec("/index",RedirectHandler,{"url":"/"}), ], debug=True) app.listen(8000) ioloop.IOLoop.current().start()
根据具体需求使用
 
 
 

静态文件访问

首先在目录新建文件 static 并且在文件中放一张图片
 
如下图配置
from tornado import web, ioloop from tornado.web import RedirectHandler class IndexHandler(web.RequestHandler): def get(self): self.finish("hello,tornado") if __name__ == '__main__': app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), web.URLSpec("/index", RedirectHandler, {"url": "/"}), ], debug=True, static_path="./static/" ) app.listen(8000) ioloop.IOLoop.current().start()
 
notion image
 
我们进阶进行一下优化。
把 https://127.0.0.1:8000/static/VCG.jpg 修改成 https://127.0.0.1:8000/img/VCG.jpg
app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), web.URLSpec("/index", RedirectHandler, {"url": "/"}), ], debug=True, static_path="./static/", static_url_prefix="/img/" )
 
然后就会发现application 里东西太多了 进而再优化
settings = { "debug": True, "static_path": "./static/", "static_url_prefix": "/img/", } app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), web.URLSpec("/index", RedirectHandler, {"url": "/"}), ], **settings )
 
当然还可以通过静态控制器去实现
from tornado import web, ioloop from tornado.web import StaticFileHandler class IndexHandler(web.RequestHandler): def get(self): self.finish("hello,tornado") if __name__ == '__main__': app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), ("/img/(.*)", StaticFileHandler, {"path": "./static/"}) ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start()
 

万能路径方法

所有都是通过相对路径
可以通过万能方法 去获取绝对路径
下面提供一种万能方法
import os path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), ("/img/(.*)", StaticFileHandler, {"path": path}) ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start()
 
 

templates 使用

简单创建个页面
 
from tornado import web, ioloop class IndexHandler(web.RequestHandler): def get(self): self.write("<h1>hello,tornado</h1>") if __name__ == '__main__': app = web.Application([ web.URLSpec("/", IndexHandler, name="index"), ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start()
 
notion image
可以看到self.write支持html写法
 
但是我们的初衷其实是写动态模板 所以。。。
class TemplateHandler(web.RequestHandler): def get(self): args = 'template1' t = template.Template(f"<h1>hello,{args}</h1>") self.write(t.generate())
 
但是这样好像和原本的没区别啊。所以。。。
class TemplateHandler(web.RequestHandler): def get(self): args = 'template2' t = template.Template("<h1>hello,{{arg}}</h1>") self.write(t.generate(arg=args))
 
notion image
 
但是我们不可能每次都访问一个html标签。所以我们肯定是会走文件的
所以。。。
 
新建html模板文件 名: templates
notion image
 
 
class TemplateHTMLHandler(web.RequestHandler): def get(self): args = 'template3' loader = template.Loader("./templates/") self.finish(loader.load("index.html").generate(arg=args))
html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>hello,{{arg}}</h1> </body> </html>
notion image
但是这样代码太多太烦了
所以。。。
 
from tornado import web, ioloop, template class TemplateBestHTMLHandler(web.RequestHandler): def get(self): args = 'template4' self.render("index.html", arg=args) if __name__ == '__main__': app = web.Application([ web.URLSpec("/4/", TemplateBestHTMLHandler), ], debug=True, template_path="templates" ) app.listen(8000) ioloop.IOLoop.current().start()
 
 
 
notion image
 
5种代码
from tornado import web, ioloop, template class IndexHandler(web.RequestHandler): def get(self): args = 'template' self.write(f"<h1>hello,{args}</h1>") class TemplateStrHandler(web.RequestHandler): def get(self): args = 'template1' t = template.Template(f"<h1>hello,{args}</h1>") self.write(t.generate()) class TemplateHandler(web.RequestHandler): def get(self): args = 'template2' t = template.Template("<h1>hello,{{arg}}</h1>") self.write(t.generate(arg=args)) class TemplateHTMLHandler(web.RequestHandler): def get(self): args = 'template3' loader = template.Loader("./templates/") self.finish(loader.load("index.html").generate(arg=args)) class TemplateBestHTMLHandler(web.RequestHandler): def get(self): args = 'template4' self.render("index.html", arg=args) if __name__ == '__main__': app = web.Application([ web.URLSpec("/", IndexHandler), web.URLSpec("/1/", TemplateStrHandler), web.URLSpec("/2/", TemplateHandler), web.URLSpec("/3/", TemplateHTMLHandler), web.URLSpec("/4/", TemplateBestHTMLHandler), ], debug=True, template_path="templates" ) app.listen(8000) ioloop.IOLoop.current().start()
 

案例

from tornado import web, ioloop, template class TemplateBestHTMLHandler(web.RequestHandler): def count_price(self, price, num): return price * num def get(self): orders = [ { "id": 1, "name": "MAC Pro 2060", "type": "32G", "price": 9999, "num": 1, "options": "<a href='delete?id=1'>删除</a>" }, { "id": 2, "name": "iPhone 15 Pro", "type": "6G", "price": 8999, "num": 1, "options": "<a href='delete?id=2'>删除</a>" }, { "id": 3, "name": "HUAWEI Mate60 Pro", "type": "12G", "price": 7999, "num": 1, "options": "<a href='delete?id=3'>删除</a>" }, { "id": 4, "name": "Sony 耳机", "type": "1G", "price": 999, "num": 1, "options":"<a href='delete?id=4'>删除</a>" }, ] self.render("index.html", orders=orders, count_price=self.count_price) if __name__ == '__main__': app = web.Application([ web.URLSpec("/", TemplateBestHTMLHandler), ], debug=True, template_path="templates", static_path="static" ) app.listen(8000) ioloop.IOLoop.current().start()
 
 
html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <table> {% set total = 0 %} {% for order in orders %} <tr> <td>名称:{{order['name']}}</td> <td>类型:{{order['type']}}</td> <td>价格:{{order['price']}}</td> <td>数量:{{order['num']}}</td> <td>总计:{{count_price(order['price'],order['num'])}}</td> <td> 其余:{% raw order['options'] %}</td> <div style="display: none"> {{ total = total + count_price(order['price'],order['num']) }} </div> </tr> {% end %} <p>总计 {{total}}</p> </table> </body> </html>
 

设置随机变量

{% set total = 0 %} {{ total = total + count_price(order['price'],order['num']) }} 总计 {{total}}
 
 

设置静态文件动态化

<link rel="stylesheet" href="{{ static_url('js/min.js') }}">
 

iF判断语句

{% if order['price'] < 1000 %} <td><strong>优惠力度很大</strong></td> {% else %} <td>价格:{{order['price']}}</td> {% end %}
notion image
 

Template继承使用

{% block content %} {% end %}
另一个页面
{% extends 'base.html' %} {% block content %} 其他内容 {% end %}
 
 

aiomysql的使用

 

安装

pip install aiomysql
 

使用

# -*- coding: utf-8 -*- # @Time : 2023/10/18 15:57 # @Author : lzc # @Email : hybpjx@163.com # @File : test_aio.py # @Software: PyCharm # @blog : https://www.cnblogs.com/zichliang import asyncio import aiomysql from tornado import ioloop # async def select_db(loop): # conn = await aiomysql.connect(host='127.0.0.1', port=3306, # user='root', password='admin*123', db='test', # loop=loop) # # cur = await conn.cursor() # await cur.execute("SELECT * FROM users") # print(cur.description) # r = await cur.fetchall() # print(r) # await cur.close() # conn.close() # # loop = asyncio.get_event_loop() # # loop.run_until_complete(select_db(loop)) async def select_db(): async with aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='admin*123', db='test') as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select * from users;") tem = await cur.fetchall() print(tem) if __name__ == '__main__': ioloop.IOLoop.current().run_sync(select_db) # loop = asyncio.get_event_loop() # # loop.run_until_complete(select_db())
 
 

tornado 与aiomysql 整合

# -*- coding: utf-8 -*- # @Time : 2023/10/18 15:57 # @Author : lzc # @Email : hybpjx@163.com # @File : test_aio.py # @Software: PyCharm # @blog : https://www.cnblogs.com/zichliang import asyncio import aiomysql from tornado import ioloop, web class IndexHandler(web.RequestHandler): def initialize(self, mysql): self.mysql = mysql async def get(self): print(self.mysql) async with aiomysql.create_pool(host=self.mysql.get("host"), port=self.mysql.get("port"), user=self.mysql.get("user"), password=self.mysql.get("password"), db=self.mysql.get("db")) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select * from users;") tem = await cur.fetchall() print(tem) return self.finish({ "data":tem }) if __name__ == '__main__': settings = { "mysql": { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "admin*123", "db": "test" } } app = web.Application([ web.URLSpec("/", IndexHandler, {"mysql": settings.get("mysql")}, name="index"), ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start() # ioloop.IOLoop.current().run_sync(select_db) # loop = asyncio.get_event_loop() # # loop.run_until_complete(select_db())
 
 

增加数据

 
# -*- coding: utf-8 -*- # @Time : 2023/10/18 17:17 # @Author : lzc # @Email : hybpjx@163.com # @File : add.py # @Software: PyCharm # @blog : https://www.cnblogs.com/zichliang import aiomysql from tornado import ioloop, web class IndexHandler(web.RequestHandler): def initialize(self, mysql): self.mysql = mysql async def get(self): print(self.mysql) async with aiomysql.create_pool(host=self.mysql.get("host"), port=self.mysql.get("port"), user=self.mysql.get("user"), password=self.mysql.get("password"), db=self.mysql.get("db")) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select * from users;") tem = await cur.fetchall() print(tem) if not tem: tem = "暂无数据" return self.finish({ "data": tem }) async def post(self): username = self.get_argument("username") password = self.get_body_argument("password") nick_name = self.get_body_argument("nick_name") # await self.finish({ # "username": username, # "password": password, # "nick_name": nick_name, # }) args = [username, password, nick_name] async with aiomysql.create_pool(host=self.mysql.get("host"), port=self.mysql.get("port"), user=self.mysql.get("user"), password=self.mysql.get("password"), db=self.mysql.get("db")) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("insert into users values(0,%s,%s,%s);", args) await conn.commit() _id = cur.lastrowid print(_id) args.insert(0, _id) await self.finish({ "data": args }) if __name__ == '__main__': settings = { "mysql": { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "admin*123", "db": "test" } } app = web.Application([ web.URLSpec("/", IndexHandler, {"mysql": settings.get("mysql")}, name="index"), ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start() # ioloop.IOLoop.current().run_sync(select_db) # loop = asyncio.get_event_loop() # # loop.run_until_complete(select_db())
 

修改数据

# -*- coding: utf-8 -*- # @Time : 2023/10/19 14:31 # @Author : lzc # @Email : hybpjx@163.com # @File : edit.py # @Software: PyCharm # @blog : https://www.cnblogs.com/zichliang import aiomysql from tornado import ioloop, web class IndexHandler(web.RequestHandler): def initialize(self, mysql): self.mysql = mysql async def get(self): print(self.mysql) async with aiomysql.create_pool(host=self.mysql.get("host"), port=self.mysql.get("port"), user=self.mysql.get("user"), password=self.mysql.get("password"), db=self.mysql.get("db")) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select * from users;") tem = await cur.fetchall() print(tem) if not tem: tem = "暂无数据" return self.finish({ "data": tem }) async def post(self, ): _id = self.get_body_argument("id","") username = self.get_body_argument("username","") password = self.get_body_argument("password","") nick_name = self.get_body_argument("nick_name","") async with aiomysql.create_pool(host=self.mysql.get("host"), port=self.mysql.get("port"), user=self.mysql.get("user"), password=self.mysql.get("password"), db=self.mysql.get("db"),charset="utf8") as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: if not _id: await cur.execute("insert into users values(0,'{}','{}','{}');".format(username, password, nick_name)) else: sql = "update users set username = '{}', password = '{}', nick_name = '{}' where id = '{}';" await cur.execute(sql.format(username, password, nick_name, _id)) await conn.commit() await self.finish({ "data": "ok" }) if __name__ == '__main__': settings = { "mysql": { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "admin*123", "db": "test" } } app = web.Application([ web.URLSpec("/", IndexHandler, {"mysql": settings.get("mysql")}, name="index"), ], debug=True, ) app.listen(8000) ioloop.IOLoop.current().start() # ioloop.IOLoop.current().run_sync(select_db) # loop = asyncio.get_event_loop() # # loop.run_until_complete(select_db())