nginx+uwsgi+djangorestframework+flower+celery+redis

nginx+uwsgi+djangorestframework+flower+celery+redis配置如下:

nginx server配置, 沒有https,注釋掉ssl開頭配置即可.

server {      listen 80;      listen 443 ssl;      server_name localhost;  # 域名        ssl_certificate   your crt;  # 證書crt      ssl_certificate_key  your key;  # 證書key      ssl_protocols TLSv1 TLSv1.1 TLSv1.2;      ssl_ciphers ECDHE-RSA-AES256-SHA384:AES256-SHA256:RC4:HIGH:!MD5:!aNULL:!eNULL:!NULL:!DH:!EDH:!AESGCM;      ssl_prefer_server_ciphers on;      ssl_session_cache shared:SSL:10m;      ssl_session_timeout 10m;      # 指定項目路徑uwsgi      location / {          include weixin_uwsgi_params; # 導入一個Nginx模組他是用來和uWSGI進行通訊的          uwsgi_connect_timeout 180; # 設置連接uWSGI超時時間          uwsgi_pass unix:/data/www/weixin_api/weixin.sock; # 指定uwsgi的sock文件所有動態請求就會直接丟給他      }        # 指定靜態文件路徑      location /static/ {          alias /data/www/weixin_api/static/;      }  }

安裝uwsgi

pip3 install uwsgi

創建一個uwsgi.ini配置文件,配置如下:

# uwsgi使用配置文件啟動  [uwsgi]  # 項目目錄  chdir=/data/www/weixin_api/  # 指定項目的application  wsgi-file=weixin_api/weixin/wsgi.py  # 指定sock的文件路徑  socket=/data/www/weixin_api/weixin.sock  # 進程個數  workers=8  pidfile=/data/www/weixin_api/script/uwsgi.pid  # 指定IP埠  http=0.0.0.0:8006  # 指定靜態文件  static-map=/static=/data/www/weixin_api/static  # 啟動uwsgi的用戶名和用戶組  uid=root  gid=root  # 啟用主進程  master=true  # 自動移除unix Socket和pid文件當服務停止的時候  vacuum=true  # 序列化接受的內容,如果可能的話  thunder-lock=true  # 啟用執行緒  enable-threads=true  # 設置自中斷時間  harakiri=30  # 設置緩衝  post-buffering=8192  # 設置日誌目錄  daemonize=/data/www/weixin_api/script/uwsgi.log  wsgi-file = /data/www/weixin_api/weixin/wsgi.py

uwsgi的關閉與啟動,可以寫一個shell腳本來控制

創建一個uwsgi_restart.sh,如下:

#!/bin/bash  API_INI="/data/www/API_rest_framework/script/uwsgi.ini"  WEIXIN_INI="/data/www/weixin_api/script/uwsgi.ini"  UWSGI="/usr/local/python36/bin/uwsgi"  PSID="ps aux | grep "uwsgi"| grep -v "grep" | wc -l"    if [ ! -n "$1" ]  then      content="Usages: sh uwsgiserver.sh [start|stop|restart]"      echo -e "33[31m $content 33[0m"      exit 0  fi     if [ $1 = start ]  then      if [ `eval $PSID` -gt 4 ]      then          content="uwsgi is running!"          echo -e "33[32m $content 33[0m"          exit 0      else          $UWSGI $API_INI          $UWSGI $WEIXIN_INI          content="Start uwsgi service [OK]"          echo -e "33[32m $content 33[0m"      fi     elif [ $1 = stop ];then      if [ `eval $PSID` -gt 4 ];then          killall -9 uwsgi      fi      content="Stop uwsgi service [OK]"      echo -e "33[32m $content 33[0m"  elif [ $1 = restart ];then      if [ `eval $PSID` -gt 4 ];then          killall -9 uwsgi      fi      $UWSGI --ini $API_INI      $UWSGI --ini $WEIXIN_INI      content="Restart uwsgi service [OK]"      echo -e "33[32m $content 33[0m"    else      content="Usages: sh uwsgiserver.sh [start|stop|restart]"      echo -e "33[31m $content 33[0m"  fi

djangorestframework安裝

pip3 install django

pip3 install djangorestframework

在settings配置文件的 installed_apps添加rest_framework

INSTALLED_APPS = [      'django.contrib.admin',      'django.contrib.auth',      'django.contrib.contenttypes',      'django.contrib.sessions',      'django.contrib.messages',      'django.contrib.staticfiles',      'application.apps.ApplicationConfig',      'rest_framework',  ]    REST_FRAMEWORK = {      'DEFAULT_VERSIONING_CLASS': "rest_framework.versioning.URLPathVersioning",      'DEFAULT_VERSION': 'v1',      'ALLOWED_VERSIONS': ['v1', 'v2'],      'VERSION_PARAM': 'version',      'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',      'PAGE_SIZE': 100,  # 默認分頁大小      'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer', ),  }

介面程式碼示例:

from rest_framework.views import APIView  from django.http import JsonResponse      class UploadFile(APIView):      def dispatch(self, request, *args, **kwargs):          """          請求到來之後,都要執行dispatch方法,dispatch方法根據請求方式不同觸發 get/post/方法          """          return super().dispatch(request, *args, **kwargs)        def get(self, request, *args, **kwargs):          pass        def post(self, request, *args, **kwargs):          pass

celery介紹

實時處理和任務調度的分散式任務隊列。

用戶使用 Celery 產生任務,借用中間人來傳遞任務,任務執行單元從中間人那裡消費任務。任務執行單元可以單機部署,也可以分散式部署,因此 Celery 是一個高可用的生產者消費者模型的非同步任務隊列。你可以將你的任務交給 Celery 處理,也可以讓 Celery 自動按 crontab 那樣去自動調度任務,然後去做其他事情,你可以隨時查看任務執行的狀態,也可以讓 Celery 執行完成後自動把執行結果告訴你。

使用Celery的常見場景如下:

1.高並發的請求任務。互聯網已經普及,人們的衣食住行中產生的交易都可以線上進行,這就避免不了某些時間極高的並發任務請求,如公司中常見的購買理財、學生繳費,在理財產品投放市場後、開學前的一段時間,交易量猛增,確認交易時間較長,此時可以把交易請求任務交給 Celery 去非同步執行,執行完再將結果返回給用戶。用戶提交後不需要等待,任務完成後會通知到用戶(購買成功或繳費成功),提高了網站的整體吞吐量和響應時間,幾乎不需要增加硬體成本即可滿足高並發。

2.定時任務。在雲計算,大數據,集群等技術越來越普及,生產環境的機器也越來越多,定時任務是避免不了的,如果每台機器上運行著自己的 crontab 任務,管理起來相當麻煩,例如當進行災備切換時,某些 crontab 任務可能需要單獨手工調起,給運維人員造成極大的麻煩,有了 Celery ,你可以集中管理所有機器的定時任務,而且災備無論何時切換,crontab 任務總能正確的執行。

3.非同步任務。 一些耗時較長的操作,比如 I/O 操作,網路請求,可以交給 Celery 去非同步執行,用戶提交後可以做其他事情,當任務完成後將結果返回用戶即可,可提高用戶體驗。比如發送簡訊/郵件、推送消息、清理/設置快取等

Celery 的優點

1. 純 Python 編寫,開源。這已經是站在巨人的肩膀上了,雖然 Celery 是由純 Python 編寫的,但協議可以用任何語言實現。迄今,已有 Ruby 實現的 RCelery 、node.js 實現的 node-celery 以及一個 PHP 客戶端 ,語言互通也可以通過 using webhooks 實現。

2. 靈活的配置。默認的配置已經滿足絕大多數需求,因此你不需要編寫配置文件基本就可以使用,當然如果有個性化地訂製,你可以選擇使用配置文件,也可以將配置寫在源程式碼文件里。

3. 方便監控。任務的所有狀態,均在你的掌握之下。

4. 完善的錯誤處理。

5. 靈活的任務隊列和任務路由。你可以非常方便地將一個任務運行在你指定的隊列上,這叫任務路由。

Celery 的架構:

Celery支援不同的方式存儲任務的結果,包括RabbitMQ,AMQP,Redis,memcached,MongoDb,SQLAlchemy等

celery使用-安裝:

環境是:centos-7.6 + python-3.6.8 + redis-5.0.4

pip install celery

pip install eventlet

pip install redis

目錄結構,在django settings目錄下,創建一個celery.py文件

├── weixin

│    ├── celery.py

│    ├── __init__.py

│    ├── __pycache__

│    ├── settings.py

│    ├── urls.py

│    └── wsgi.py

celery.py內容如下:

#!/usr/bin/env python  # -*- coding: utf-8 -*-  from __future__ import absolute_import, unicode_literals  import os  from celery import Celery    # set the default Django settings module for the 'API' program.  os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'weixin.settings')    app = Celery('celery')  # 設置時區  app.conf.timezone = 'Asia/Shanghai'  app.conf.enable_utc = False  #app.conf.enable_utc = True  # Using a string here means the worker don't have to serialize  # the configuration object to child processes.  # - namespace='CELERY' means all celery-related configuration keys  #   should have a `CELERY_` prefix.  # 使用django的settings設置  app.config_from_object('django.conf:settings')      # Load task modules from all registered Django app configs.  # Celery 會自動發現模組  app.autodiscover_tasks()      @app.task(bind=True)  def debug_task(self):      print('Request: {0!r}'.format(self.request))

同目錄下面更改__init__.py的內容如下:

from __future__ import absolute_import, unicode_literals    # This will make sure the app is always imported when  # Django starts so that shared_task will use this app.  from .celery import app as celery_app    __all__ = ['celery_app']

在django settings最後添加內容如下:

# django celery settings  # 如redis中設置有密碼,則需加上password,後面的/5 指的是使用redis的哪個庫  BROKER_URL = 'redis://:[email protected]:9999/5'  CELERY_RESULT_BACKEND = 'redis://:[email protected]:9999/5'  CELERY_ACCEPT_CONTENT = ['json']  CELERY_TASK_SERIALIZER = 'json'  CELERY_RESULT_SERIALIZER = 'json'  #CELERY_ENABLE_UTC = False  #CELERY_TIMEZONE = 'Asia/Shanghai'    # celery worker並發數  CELERYD_CONCURRENCY = 20    # 非常重要,有些情況下可以防止死鎖  CELERYD_FORCE_EXECV = True    # 每個worker最大執行任務數  CELERYD_MAX_TASKS_PER_CHILD = 100

創建celery job,目錄結構如下:

├── celery_job

│   │   ├── __init__.py

│   │   ├── send_mail.py

創建一個send_mail.py內容如下:

#!/usr/bin/env python  # encoding: utf-8  import smtplib  from email.mime.text import MIMEText  from email.mime.multipart import MIMEMultipart  from email.mime.application import MIMEApplication  from celery import task      @task(bind=True)  def send_email(self, html_path, file_name_path, send_to_user, send_mail_subject, send_mail_body):      """      發送郵件      :param file_name_path: html存放路徑      :param send_to_user: 收件人      :param send_mail_subject: 郵件主題      :param send_mail_body: excel是否轉html      :return:      """      send_to_user = send_to_user.split(',')      if send_mail_body != 'True':          html_content = ""      else:          with open(html_path, encoding='utf-8', mode='r') as fp:              html_content = fp.read()              fp.close()      mail_info = {          "from": "發件人郵箱",          "to": "收件人郵箱",          "hostname": "smtp hostname",          "username": "郵箱帳號",          "password": '郵箱密碼',          "mail_subject": send_mail_subject,  # 郵件主題名字          "mail_text": html_content,          "mail_encoding": "utf-8",          "mail_port": '587',      }      try:          msg = MIMEMultipart()          msg["Subject"] = mail_info.get('mail_subject')          msg["From"] = mail_info.get('from')          msg["To"] = ','.join(mail_info.get('to'))          part = MIMEText(mail_info.get('mail_text'), _subtype='html', _charset='utf-8')          msg.attach(part)            # xlsx類型附件          part = MIMEApplication(open(file_name_path, 'rb').read())          part.add_header('Content-Disposition', 'attachment', filename='{}.xlsx'.format(send_mail_subject))          msg.attach(part)          s = smtplib.SMTP(mail_info.get('hostname'), mail_info.get('mail_port'), timeout=20)          s.ehlo()          s.starttls()          s.login(mail_info.get('username'), mail_info.get('password'))  # 登陸伺服器          s.sendmail(mail_info.get('username'), mail_info.get('to'), msg.as_string())  # 發送郵件          # return {'status': 'success', 'msg': 'Send mail success'}          return True      except Exception as e:          """             郵件發送失敗,使用retry進行重試             retry的參數可以有:             exc:指定拋出的異常             throw:重試時是否通知worker是重試任務             eta:指定重試的時間/日期             countdown:在多久之後重試(每多少秒重試一次)             max_retries:最大重試次數           """          raise self.retry(exc=e, countdown=10, max_retries=6)

django views配置視圖函數

#!/usr/bin/env python  # encoding: utf-8  from rest_framework.views import APIView  from django.http import JsonResponse  import hashlib  import os  from application.celery_job.send_mail import send_email        class DataReport(APIView):      def __init__(self):          super().__init__()          self.key = 'your key'          # 允許訪問介面的郵箱          self.auth_user = ['xxxxx', 'xxxxx']        def dispatch(self, request, *args, **kwargs):          return super().dispatch(request, *args, **kwargs)        def get(self, request):          return JsonResponse({'status': 'error', 'msg': 'Is not get API'})        def post(self, request):          print(request.data)          domain_name = request.META['HTTP_HOST']  # 獲取域名          token = request.data.get('token')  # token          excel_file = request.FILES.get("excel_file")  # 獲取文件內容          send_mail = request.data.get("send_mail")  # 是否發郵件(True or False)          send_to_user = request.data.get('send_to_user')  # 收件人郵箱(一個或多個)          send_mail_subject = request.data.get('send_mail_subject')  # 郵件主題          send_mail_body = request.data.get('send_mail_body')  # 郵件正文_html(True or False)                   if not token:              return JsonResponse({'status': 'error', 'msg': 'Token cannot be empty'})          ret = self.auth_token(token)          if not ret:              return JsonResponse({'status': 'error', 'msg': 'Auth token error'})          if send_mail:              if not send_to_user:                  return JsonResponse({'status': 'error', 'msg': 'The recipient cannot be empty'})              if not send_mail_subject:                  return JsonResponse({'status': 'error', 'msg': 'Mail subject cannot be empty'})              # 發送郵件          if send_mail:              result3 = send_email.delay(html_path, file_name_path, send_to_user, send_mail_subject, send_mail_body)              task_id = result3.task_id          return JsonResponse({'status': 'success', 'task_id': task_id})        def auth_token(self, token):          """          驗證token是否有效          :param token:          :return: True or False          """          # 加密後的token_list          token_list = []          for i in self.auth_user:              m = hashlib.md5(i[::-1][::2].encode('utf-8'))              m.update(self.key.encode('utf-8'))              token_list.append(m.hexdigest())          for i in token_list:              if i == token:                  return True          else:              return False

django urls配置訪問路徑

from django.contrib import admin  from django.urls import path  from application.views.weixin_api.data_report import DataReport  from django.conf.urls import url  from django.views import static  from django.conf import settings    urlpatterns = [      path('admin/', admin.site.urls),      url(r'^static/(?P<path>.*)$', static.serve, {'document_root': settings.STATIC_ROOT}, name='static'),      url(r'^data_report$', DataReport().as_view(), name='data_report'),  ]

啟動程式:

nginx : /usr/local/nginx/sbin/nginx

uwsgi: /usr/local/python36/bin/uwsgi –ini /data/www/project/script/uwsgi.ini

redis: /usr/local/redis/redis-server /usr/local/redis/redis.conf

## -c 20 指得是並發數為20  

celery: celery -A /data/www/project/weixin worker  -l info -c 20 -P eventlet

安裝flower: 實時監控celery任務狀態

pip install flower

啟動flower

# –max_tasks 為頁面允許存儲的最大數

flower –port=7788 –broker=redis://:password@[email protected]:9999/5 –broker_api=redis://:[email protected]:9999/5  –max_tasks=1000000

執行任務,查看flower監控的狀態:

訪問頁面: http://ip:7788

查看任務狀態