帶你認識 flask 用戶通知
- 2019 年 12 月 10 日
- 筆記
01
私有消息
我要實現的私有消息功能非常簡單。當你訪問用戶的個人主頁時,會顯示一個可以向該用戶發送私有消息鏈接。該鏈接將帶你進入一個新的頁面,在新頁面中,可以在Web表單中發送消息。要閱讀發送給你的消息,頁面頂部的導航欄將會有一個新的「消息」鏈接,它會將你帶到與主頁或發現頁面相似的頁面,但不會顯示用戶動態,它會顯示其他用戶發送給你的消息。
以下小節介紹了實現此功能所需的各個步驟
02
私有消息數據庫支持
第一項任務是擴展數據庫以支持私有消息。這是一個新的Message
模型:
app/models.py:Message模型
class Message(db.Model): id = db.Column(db.Integer, primary_key=True) sender_id = db.Column(db.Integer, db.ForeignKey('user.id')) recipient_id = db.Column(db.Integer, db.ForeignKey('user.id')) body = db.Column(db.String(140)) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) def __repr__(self): return '<Message {}>'.format(self.body)
這個模型類與Post
模型相似,唯一的區別是有兩個用戶外鍵,一個用於發信人,另一個用於收信人。 User
模型可以獲得這兩個用戶的關係,以及一個新字段,用於指示用戶最後一次閱讀他們的私有消息的時間:
app/models.py:User模型對私有消息的支持
class User(UserMixin, db.Model): # ... messages_sent = db.relationship('Message', foreign_keys='Message.sender_id', backref='author', lazy='dynamic') messages_received = db.relationship('Message', foreign_keys='Message.recipient_id', backref='recipient', lazy='dynamic') last_message_read_time = db.Column(db.DateTime) # ... def new_messages(self): last_read_time = self.last_message_read_time or datetime(1900, 1, 1) return Message.query.filter_by(recipient=self).filter( Message.timestamp > last_read_time).count()
這兩個關係將返回給定用戶發送和接收的消息,並且在關係的Message
一側將添加author
和recipient
回調引用。我之所以使用author
回調而不是更適合的sender
,是因為通過使用author
,我可以使用我用於用戶動態的相同邏輯渲染這些消息。 last_message_read_time
字段將存儲用戶最後一次訪問消息頁面的時間,並將用於確定是否有比此字段更新時間戳的未讀消息。 new_messages()
輔助方法實際上使用這個字段來返回用戶有多少條未讀消息。在本章的最後,我將把這個數字作為頁面頂部導航欄中的一個漂亮的徽章。
完成了數據庫更改後,現在是時候生成新的遷移並使用它升級數據庫了:
(venv) $ flask db migrate -m "private messages" (venv) $ flask db upgrade
03
發送私有信息
下一步設計發送消息。我需要一個簡單的Web表單來接收消息:
app/main/forms.py:私有消息表單類
class MessageForm(FlaskForm): message = TextAreaField(_l('Message'), validators=[ DataRequired(), Length(min=0, max=140)]) submit = SubmitField(_l('Submit'))
而且我還需要在網頁上呈現此表單的HTML模板:
app/templates/send_message.html:發送私有消息HTML模板
{% extends "base.html" %} {% import 'bootstrap/wtf.html' as wtf %} {% block app_content %} <h1>{{ _('Send Message to %(recipient)s', recipient=recipient) }}</h1> <div class="row"> <div class="col-md-4"> {{ wtf.quick_form(form) }} </div> </div> {% endblock %}
接下來,我將添加一個新的 /send_message/<recipient> 路由來處理實際發送的私有消息:
app/main/routes.py:發送私有消息的視圖函數
from app.main.forms import MessageForm from app.models import Message # ... @bp.route('/send_message/<recipient>', methods=['GET', 'POST']) @login_required def send_message(recipient): user = User.query.filter_by(username=recipient).first_or_404() form = MessageForm() if form.validate_on_submit(): msg = Message(author=current_user, recipient=user, body=form.message.data) db.session.add(msg) db.session.commit() flash(_('Your message has been sent.')) return redirect(url_for('main.user', username=recipient)) return render_template('send_message.html', title=_('Send Message'), form=form, recipient=recipient)
這個視圖函數中的邏輯顯而易見。發送私有消息的操作只需在數據庫中添加一個新的「消息」實例即可。
將所有內容聯繫在一起的最後一項更改是在用戶個人主頁中添加上述路由的鏈接:
app/templates/user.html:個人主頁中添加發送私有消息的鏈接
{% if user != current_user %} <p> <a href="{{ url_for('main.send_message', recipient=user.username) }}"> {{ _('Send private message') }} </a> </p> {% endif %}
04
查看私有信息
這個功能的第二大部分是查看私有信息。為此,我添加另一條路由 /messages ,該路由與主頁和發現頁面非常相似,包括分頁的完全支持:
app/main/routes.py:查看消息視圖函數
@bp.route('/messages') @login_required def messages(): current_user.last_message_read_time = datetime.utcnow() db.session.commit() page = request.args.get('page', 1, type=int) messages = current_user.messages_received.order_by( Message.timestamp.desc()).paginate( page, current_app.config['POSTS_PER_PAGE'], False) next_url = url_for('main.messages', page=messages.next_num) if messages.has_next else None prev_url = url_for('main.messages', page=messages.prev_num) if messages.has_prev else None return render_template('messages.html', messages=messages.items, next_url=next_url, prev_url=prev_url)
我在這個視圖函數中做的第一件事是用當前時間更新User.last_message_read_time
字段。這會將發送給該用戶的所有消息標記為已讀。然後,我查詢消息模型以獲得消息列表,並按照最近的時間戳進行排序。我決定在這裡復用POSTS_PER_PAGE
配置項,因為用戶動態和消息的頁面看起來非常相似,但是如果發生了分歧,為消息添加單獨的配置變量也是有意義的。分頁邏輯與我用於用戶動態的邏輯完全相同,因此這對你來說應該很熟悉。
上面的視圖函數通過渲染一個新的 /app/templates/messages.html 模板文件結束,該模板如下:
app/templates/messages.html:查看消息HTML模板
{% extends "base.html" %} {% block app_content %} <h1>{{ _('Messages') }}</h1> {% for post in messages %} {% include '_post.html' %} {% endfor %} <nav aria-label="..."> <ul class="pager"> <li class="previous{% if not prev_url %} disabled{% endif %}"> <a href="{{ prev_url or '#' }}"> <span aria-hidden="true">←</span> {{ _('Newer messages') }} </a> </li> <li class="next{% if not next_url %} disabled{% endif %}"> <a href="{{ next_url or '#' }}"> {{ _('Older messages') }} <span aria-hidden="true">→</span> </a> </li> </ul> </nav> {% endblock %}
在這裡,我採取了另一個小技巧。我注意到除了Message
具有額外的recipient
關係(我不需要在消息頁面中顯示,因為它總是當前用戶),Post
和Message
實例具有幾乎相同的結構。所以我決定復用app/templates/_post.html子模板來渲染私有消息。出於這個原因,這個模板使用了奇怪的for循環for post in messages
,以便私有消息的渲染也可以套用到子模板上。
要讓用戶訪問新的視圖函數,導航頁面需要生成一個新的「消息」鏈接:
app/templates/base.html:導航欄中的消息鏈接
{% if current_user.is_anonymous %} ... {% else %} <li> <a href="{{ url_for('main.messages') }}"> {{ _('Messages') }} </a> </li> ... {% endif %}
該功能現已完成,但作為所有更改的一部分,還有一些新的文本被添加到幾個位置,並且需要將這些文本合併到語言翻譯中。第一步是更新所有的語言目錄:
(venv) $ flask translate update
然後,app/translations中的每種語言都需要使用新翻譯更新其messages.po文件。你可以在本項目的GitHub代碼庫中找到西班牙語翻譯,或者直接下載zip文件
05
靜態消息通知薇章
現在私有消息功能已經實現,但是還沒有通過任何渠道告訴用戶有私有消息等待閱讀。導航欄上的未讀消息標誌的最簡單實現可以使用Bootstrap badge小部件渲染到基礎模板中:
app/templates/base.html:導航欄的靜態消息通知徽章
... <li> <a href="{{ url_for('main.messages') }}"> {{ _('Messages') }} {% set new_messages = current_user.new_messages() %} {% if new_messages %} <span class="badge">{{ new_messages }}</span> {% endif %} </a> </li> ...
在這裡,我直接從模板中調用上面添加到User模型中的new_messages()
方法,並將該數字存儲在new_messages
模板變量中。然後,如果該變量不為零,我只需添加帶有該數字的徽章到消息鏈接後面即可。以下是這個頁面的外觀:

06
動態消息通知薇章
上一節介紹的解決方案是一種簡單的常規方式來顯示通知,但它有一個缺點,即徽章僅在加載新頁面時刷新。如果用戶花費很長時間閱讀一個頁面上的內容而沒有點擊任何鏈接,那麼在該時間內出現的新消息將不會顯示,直到用戶最終點擊鏈接並加載新頁面。
為了讓這個應用程序對我的用戶更有用,我希望徽章自行更新未讀消息的數量,而用戶不必點擊鏈接並加載新頁面。上一節的解決方案的一個問題是,當加載頁面時消息計數為非零時,徽章才在頁面中渲染。更方便的是始終在導航欄中包含徽章,並在消息計數為零時將其標記為隱藏。這樣可以很容易地使用JavaScript顯示徽章:
app/templates/base.html:使用JavaScript渲染的友好未讀消息徽章
<li> <a href="{{ url_for('main.messages') }}"> {{ _('Messages') }} {% set new_messages = current_user.new_messages() %} <span id="message_count" class="badge" style="visibility: {% if new_messages %}visible {% else %}hidden {% endif %};"> {{ new_messages }} </span> </a> </li>
使用此版本的徽章時,我總是將其包含在內,但當new_messages
非零時,visibility
CSS屬性設置為visible
;否則設置為hidden
。我還為表示徽章的元素添加了一個id
屬性,以便使用$('#message_count')
jQuery選擇器來簡化這個元素的選取。
接下來,我編寫一個簡短的JavaScript函數,將該徽章更新為最新的數字:
app/templates/base.html:導航欄中的動態消息通知徽章
... {% block scripts %} <script> // ... function set_message_count(n) { $('#message_count').text(n); $('#message_count').css('visibility', n ? 'visible' : 'hidden'); } </script> {% endblock %}
這個新的set_message_count()
函數將設置徽章元素中的消息數量,並調整可見性,以便在計數為0時隱藏徽章
07
向客戶端發送消息通知
現在剩下的就是增加一種機制,通過這種機制,客戶端可以定期接收有關用戶擁有的未讀消息數量的更新。當更新發生時,客戶端將調用set_message_count()
函數來使用戶知道更新。
實際上有兩種方法可以讓服務器將這些更新告知客戶端,而且你可能會猜到,這兩種方法都有優點和缺點,因此選擇哪種方法很大程度上取決於項目。在第一種方法中,客戶端通過發送異步請求定期向服務器請求更新。來自此請求的響應是更新列表,客戶端可以使用這些更新來更新頁面的不同元素,例如未讀消息計數標記。第二種方法需要客戶端和服務器之間的特殊連接類型,以允許服務器自由地將數據推送到客戶端。請注意,無論採用哪種方法,我都希望將通知視為通用實體,以便我可以擴展此框架以支持除未讀消息徽章以外的其他類型的事件。
第一種解決方案最大的優點是易於實施。我需要做的只是嚮應用程序添加另一條路由,例如 /notifications ,它返回JSON格式的通知列表。然後客戶端應用程序遍歷通知列表並將必要的更改應用於頁面。該解決方案的缺點是實際事件和通知之間會有延遲,因為客戶端會定期請求通知列表。例如,如果客戶端每10秒鐘詢問一次通知,則可能延遲10秒接收通知。
第二個解決方案需要在協議級別進行更改,因為HTTP沒有服務器主動向客戶端發送數據的任何規定。到目前為止,實現服務器推送消息的最常見方式是擴展服務器以支持除HTTP之外的WebSocket連接。WebSocket是一種不同於HTTP的協議,在服務器和客戶端之間建立永久連接。服務器和客戶端可以隨時向對方發送數據,而無需另一方請求。這種機制的優點是,無論何時發生客戶感興趣的事件,服務器都可以發送通知,而不會有任何延遲。缺點是WebSocket需要比HTTP更複雜的設置,因為服務器需要與每個客戶端保持永久連接。想像一下,例如有四個worker進程的服務器通常可以服務幾百個HTTP客戶端,因為HTTP中的連接是短暫的並且不斷被回收。而相同的服務器只能處理四個WebSocket客戶端,在絕大多數情況下,這會導致資源緊張。正是由於這種限制,WebSocket應用程序通常圍繞異步服務器進行設計,因為這種服務器在管理大量worker和活動連接方面效率更高。
好消息是,不管你使用什麼方法,在客戶端你都會有一個回調函數,它將被更新列表調用。因此,我可以從第一個解決方案開始,該解決方案實施起來要容易得多,如果發現不足,可以遷移到WebSocket服務器,該服務器可以配置為調用相同的客戶端回調。在我看來,對於這種類型的應用,第一種解決方案實際上是可以接受的。基於WebSocket的實現對於需要以接近零延遲傳遞更新的應用程序非常有用。
這裡有一些業界的類似案例。Twitter也使用的是第一種導航欄通知的方法;Facebook使用稱為長輪詢的HTTP變體,它解決了直接輪詢的一些限制,同時仍然使用HTTP請求;Stack Overflow和Trello這兩個站點使用WebSocket來實現通知機制。你可以通過查看瀏覽器調試器的「Network」選項卡來查找任何網站上發生的後台活動請求
我們繼續實施輪詢解決方案。首先,我要添加一個新模型來跟蹤所有用戶的通知,以及用戶模型中的關係
app/models.py:通知模型
import json from time import time # ... class User(UserMixin, db.Model): # ... notifications = db.relationship('Notification', backref='user', lazy='dynamic') # ... class Notification(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(128), index=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) timestamp = db.Column(db.Float, index=True, default=time) payload_json = db.Column(db.Text) def get_data(self): return json.loads(str(self.payload_json))
通知將會有一個名稱,一個關聯的用戶,一個Unix時間戳和一個有效載荷。時間戳默認從time.time()
函數中獲取。每種類型的通知都會有所不同,所以我將它寫為JSON字符串,因為這樣可以編寫列表,字典或單個值(如數字或字符串)。為了方便,我添加了get_data()
方法,以便調用者不必操心JSON的反序列化。
這些更改需要包含在新的數據庫遷移中:
(venv) $ flask db migrate -m "notifications" (venv) $ flask db upgrade
為了方便,我將新增的Message
和Notification
模型添加到shell上下文,這樣我就可以直接在用flask shell
命令啟動的解釋器中使用這兩個模型了
microblog.py: 添加Message和Notification模型到shell上下文
# ... from app.models import User, Post, Notification, Message # ... @app.shell_context_processor def make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Message': Message, 'Notification': Notification}
我還將在用戶模型中添加一個add_notification()
輔助方法,以便更輕鬆地處理這些對象:
app/models.py:Notification模型
class User(UserMixin, db.Model): # ... def add_notification(self, name, data): self.notifications.filter_by(name=name).delete() n = Notification(name=name, payload_json=json.dumps(data), user=self) db.session.add(n) return n
此方法不僅為用戶添加通知給數據庫,還確保如果具有相同名稱的通知已存在,則會首先刪除該通知。我將要使用的通知將被稱為unread_message_count
。如果數據庫已經有一個帶有這個名稱的通知,例如值為3,則當用戶收到新消息並且消息計數變為4時,我就會替換舊的通知
在任何未讀消息數改變的地方,我需要調用add_notification()
,以便我更新用戶的通知,這樣的地方有兩處。首先,在send_message()
視圖函數中,當用戶收到一個新的私有消息時:
app/main/routes.py:更新用戶通知
@bp.route('/send_message/<recipient>', methods=['GET', 'POST']) @login_required def send_message(recipient): # ... if form.validate_on_submit(): # ... user.add_notification('unread_message_count', user.new_messages()) db.session.commit() # ... # ...
第二個地方是用戶轉到消息頁面時,未讀計數需要歸零:
app/main/routes.py:查看消息視圖函數
@bp.route('/messages') @login_required def messages(): current_user.last_message_read_time = datetime.utcnow() current_user.add_notification('unread_message_count', 0) db.session.commit() # ...
既然用戶的所有通知都保存在數據庫中,那麼我可以添加一條新路由,客戶端可以使用該路由為登錄用戶檢索通知:
app/main/routes.py:通知視圖函數
from app.models import Notification # ... @bp.route('/notifications') @login_required def notifications(): since = request.args.get('since', 0.0, type=float) notifications = current_user.notifications.filter( Notification.timestamp > since).order_by(Notification.timestamp.asc()) return jsonify([{ 'name': n.name, 'data': n.get_data(), 'timestamp': n.timestamp } for n in notifications])
這是一個相當簡單的函數,它返回一個包含用戶通知列表的JSON負載。每個通知都以包含三個元素的字典的形式給出,即通知名稱,與通知有關的附加數據(如消息數量)和時間戳。通知按照從創建時間順序進行排序。
我不希望客戶重複發送通知,所以我給他們提供了一個選項,只請求給定時間戳之後產生的通知。 since
選項可以作為浮點數包含在請求URL的查詢字符串中,其中包含開始時間的unix時間戳。如果包含此參數,則只有在此時間之後發生的通知才會被返回。
完成此功能的最後一部分是在客戶端實現實際輪詢。最好的做法是在基礎模板中實現,以便所有頁面自動繼承該行為:
app/templates/base.html:輪詢通知
... {% block scripts %} <script> // ... {% if current_user.is_authenticated %} $(function() { var since = 0; setInterval(function() { $.ajax('{{ url_for('main.notifications') }}?since=' + since).done( function(notifications) { for (var i = 0; i < notifications.length; i++) { if (notifications[i].name == 'unread_message_count') set_message_count(notifications[i].data); since = notifications[i].timestamp; } } ); }, 10000); }); {% endif %} </script>
該函數包含在一個模板條件中,因為我只想在用戶登錄時輪詢新消息。對於沒有登錄的用戶,這個函數將不會被渲染。
你已經在第二十章中看到了jQuery的$(function() { ...})
模式。 這是註冊一個函數在頁面加載後執行的方式。 對於這個功能,我需要在頁面加載時做的是設置一個定時器來獲取用戶的通知。 你還看到了setTimeout()
JavaScript函數,它在等待特定時間之後運行作為參數給出的函數。 setInterval()
函數使用與setTimeout()
相同的參數,但不是一次性觸發定時器,而是定期調用回調函數。 本處,我的間隔設置為10秒(以毫秒為單位),所以我將以每分鐘大約六次的頻率查看通知是否有更新。
利用定期計時器和Ajax,該函數輪詢新通知路由,並在其完成回調中迭代通知列表。 當收到名為unread_message_count
的通知時,通過調用上面定義的函數和通知中給出的計數來調整消息計數徽章。
我處理since
參數的方式可能會令人困惑。 我首先將這個參數初始化為0。 參數總是包含在請求URL中,但是我不能像以前那樣使用Flask的url_for()
來生成查詢字符串,因為一次請求中url_for()
只在服務器上運行一次,而我需要since
參數動態更新多次。 第一次,這個請求將被發送到 /notifications?since=0 ,但是一旦我收到通知,我就會將since
更新為它的時間戳。 這可以確保我不會收到重複的內容,因為我總是要求收到自我上次看到的通知以來發生的新通知。 同樣重要的是要注意,我在interval函數外聲明since
變量,因為我不希望它是局部變量,我想要在所有調用中使用相同的變量。
最簡單的測試方法是使用兩種不同的瀏覽器A和B。 在兩個瀏覽器上使用不同的用戶登錄Microblog。 然後從A瀏覽器向B瀏覽器上的用戶發送一個或多個消息。 B瀏覽器的導航欄應更新為顯示你在10秒鐘內發送的消息數量。 而當你點擊消息鏈接時,未讀消息數重置為零。