首页 技术 正文
技术 2022年11月8日
0 收藏 659 点赞 1,891 浏览 5753 个字

引入

 Web端即时通讯技术:即时通讯技术简单的说就是实现这样一种功能:服务器端可以即时地将数据的更新或变化反应到客户端,例如消息即时推送等功能都是通过这种技术实现的。但是在Web中,由于浏览器的限制,实现即时通讯需要借助一些方法。这种限制出现的主要原因是,一般的Web通信都是浏览器先发送请求到服务器,服务器再进行响应完成数据的现实更新。

  实现Web端即时通讯的方法:实现即时通讯主要有四种方式,它们分别是轮询、长轮询(comet)、长连接(SSE)、WebSocket。它们大体可以分为两类,一种是在HTTP基础上实现的,包括短轮询、comet和SSE;另一种不是在HTTP基础上实现是,即WebSocket。下面分别介绍一下这四种轮询方式,以及它们各自的优缺点。

当我们要实现一个实时投票系统,或者是实时通讯系统,我们的页面数据总需要更新

我们不能让用户一直去刷新页面。所以就有了轮询,长轮询,以及websock的出现

轮询

既然我想要实时获取后端的数据,那我就每隔2秒给后端发一次请求

这种我们就叫轮询~那它会有一些缺点就是存在延时~就算每秒发一次~也会存在一定的延迟

下面我们看下轮询的代码:

from flask import Flask, render_template, request, jsonifyapp = Flask(__name__)USERS = {
1: {"name": "悟空", "count": 0},
2: {"name": "悟能", "count": 0},
3: {"name": "悟净", "count": 0},}@app.route("/")
def index():
return render_template("index.html", users=USERS)@app.route("/vote", methods=["POST"])
def vote():
uid = request.json.get("uid")
USERS[uid]["count"] += 1
return "投票成功"@app.route("/get_vote")
def get_vote():
return jsonify(USERS)if __name__ == '__main__':
app.run()

app.py

<!DOCTYPE html>
<html lang="en"><head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Title</title>
<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.js"></script></head>
<body>
<h1>选丑大赛</h1><ul>
{% for key, value in users.items()%}
<li id="{{key}}" onclick="vote({{key}})">{{value.name}} ({{value.count}})</li>
{% endfor %}
</ul><script>
function vote(uid) {
axios.request({
url: "/vote",
method: "POST",
data: {
"uid": uid
}
}).then(function (res) {
console.log(res.data)
})
} function get_vote() {
axios.request({
url: "/get_vote",
method: "GET"
}).then(function (res) {
console.log(res)
for(let key in res.data){
let liEle = document.getElementById(key);
let username = res.data[key]["name"]
let count = res.data[key]["count"]
liEle.innerText = `${username} (${count})`
}
})
} window.onload = function () {
setInterval(get_vote, 2000)
}</script></body>
</html>

index.html

长轮询

轮询缺点就是延迟,那么如果前端发送过来请求,如果没有数据的更新

后端的请求就阻塞了,直到有数据返回或者超时再返回,这样延迟就可以得到很好的解决

python中有个queue对象,当我们从这个队列里拿不到值的时候,可以阻塞住请求的

import queue
from flask import Flaskapp = Flask(__name__)
q = queue.Queue()@app.route("/get")
def index():
try:
val = q.get(timeout=20)
except queue.Empty:
val = "超时"
return val@app.route("/vote")
def vote():
q.put("")
return "投票成功"if __name__ == '__main__':
app.run()

queueDemo.py

如果我为每个请求都建立一个q对象,然后阻塞住他们的请求,有数据更新的时候,给他们的q对象返回值就可以了。

from flask import Flask, render_template, request, jsonify, session
import queue
import uuidapp = Flask(__name__)
app.secret_key = "lajdgia"USERS = {
1: {"name": "悟空", "count": 0},
2: {"name": "悟能", "count": 0},
3: {"name": "悟净", "count": 0},}
# 为每个用户建立一个q对象
# 以用户的uuid为key 值为q对象
Q_DICT = {}@app.route("/")
def index():
user_uuid = str(uuid.uuid4())
session["user_uuid"] = user_uuid
Q_DICT[user_uuid] = queue.Queue()
return render_template("index2.html", users=USERS)@app.route("/vote", methods=["POST"])
def vote():
# 投票 循环q对象的dict 给每个q对象返回值
uid = request.json.get("uid")
USERS[uid]["count"] += 1
for q in Q_DICT.values():
q.put(USERS)
return "投票成功"@app.route("/get_vote", methods=["POST", "GET"])
def get_vote():
# 获取投票结果 去自己的q对象里取值 没有夯住 知道有或者超时返回
user_uuid = session.get("user_uuid")
q = Q_DICT[user_uuid]
try:
users = q.get(timeout=30)
except queue.Empty:
users = ""
return jsonify(users)if __name__ == '__main__':
app.run()

app2.py

<!DOCTYPE html>
<html lang="en"><head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Title</title>
<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.js"></script></head>
<body>
<h1>选丑大赛</h1><ul>
{% for key, value in users.items()%}
<li id="{{key}}" onclick="vote({{key}})">{{value.name}} ({{value.count}})</li>
{% endfor %}
</ul><script>
function vote(uid) {
axios.request({
url: "/vote",
method: "POST",
data: {
"uid": uid
}
}).then(function (res) {
console.log(res.data)
})
} function get_votes() {
axios.request({
url: "/get_vote",
method: "POST"
}).then(function (res) {
console.log(res);
if(res.data != ""){
for(let key in res.data){
let liEle = document.getElementById(key);
let username = res.data[key]["name"]
let count = res.data[key]["count"]
liEle.innerText = `${username} (${count})`
}
}
get_votes()
})
} window.onload = function () {
get_votes()
}</script></body>
</html>

index2.html

websocket

websocket是一个协议,协议规定

连接的时候需要握手,发送的数据需要加密~~连接之后不断开

Flask不带websocket,我们需要下载

下载:pip install gevent-websocket

from flask import Flask, request, render_template
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
import jsonapp = Flask(__name__)USERS = {
1: {"name": "悟空", "count": 0},
2: {"name": "悟能", "count": 0},
3: {"name": "悟净", "count": 0},}@app.route("/")
def index():
return render_template("index3.html", users=USERS)WEBSOCKET_LIST = []
@app.route("/vote")
def vote():
ws = request.environ.get("wsgi.websocket")
if not ws:
return "HTTP请求"
WEBSOCKET_LIST.append(ws)
while True:
uid = ws.receive()
if not uid:
WEBSOCKET_LIST.remove(ws)
ws.close()
break
uid = int(uid)
USERS[uid]["count"] += 1
name = USERS[uid]["name"]
new_count = USERS[uid]["count"]
for client in WEBSOCKET_LIST:
client.send(json.dumps({"uid": uid, "name": name, "count": new_count}))if __name__ == '__main__':
http_server = WSGIServer(('127.0.0.1', 5000), app, handler_class=WebSocketHandler)
http_server.serve_forever()

app3.py

<!DOCTYPE html>
<html lang="en"><head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Title</title>
<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.js"></script></head>
<body>
<h1>选丑大赛</h1><ul>
{% for (key, value) in users.items() %}
<li onclick="vote({{key}})" id="{{key}}">{{value.name}} ({{value.count}})</li>
{% endfor%}</ul><script>
let ws = new WebSocket('ws://127.0.0.1:5000/vote') function vote(uid) {
ws.send(uid)
}
ws.onmessage = function (event) {
let data = JSON.parse(event.data);
let liEle = document.getElementById(data.uid);
liEle.innerText = `${data.name} (${data.count})`
}</script></body>
</html>

index3.html

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,999
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,511
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,357
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,140
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,770
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,848