玩转redis

本文总结了:
redis的一般使用场景
常见操作,及如何实现
如何在python中实现这些操作

redis是非关系型数据库,NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
redis和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set –有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件。并且在此基础上实现了master-slave(主从)同步。

redis有以下特点:

  • 不遵循SQL标准。
  • 不支持ACID。
  • 远超于SQL的性能。

安装

下载源码

打开reids官网:点击这里,下载最新稳定版本。

上传到centos7

  1. 可以使用wget命令直接下载
  2. 也可以使用ftp,scp等命令上传
  3. 还可以借助xftp、Winscp、MobaXterm等工具上传

解决依赖

  1. 检查有无C 语言的编译环境
    输入命令:gcc --version,假如提示命令不存在的话,进行第2步:安装gcc。
    检查gcc的版本,如版本过低则建议先升级gcc。
  2. 安装gcc
    yum install -y gcc

编译安装与启动

  1. 解tar包
    把包放到/opt目录下,使用tar -zxvf redis.xxx.tar.gz (文件名根据自己的来),解包。

  2. 在cd到redis里面执行以下命令

    make
    
    make install
    

    假如提示“致命错误”,可以执行make distclean

  3. 安装完成
    redis已经被安装到/usr/local/bin目录下

  4. 启动
    方法一:直接输入redis-sever在前台使用,但是不推荐,
    方法二:

    • 将/opt/redis-xx文件中的redis.conf文件,复制一份配置文件到/etc/cp redis.conf /etc/
    • 使用vim命令,将daemonize no改成daemonize yes,保存退出。
    • 使用redis-server /etc/redis.conf启动

设置自启动

注意:要保证redis.conf中的守护进程daemonize no改成yes

  1. /opt/redis-xx/utils/redis_init_script复制到/etc/init.d/下,并改名为redisd:

    cp redis_init_script /etc/init.d/redisd
    
  2. 编辑redisd

    #!/bin/sh
    #
    # Simple Redis init.d script conceived to work on Linux systems
    # as it does use of the /proc filesystem.
    
    ### BEGIN INIT INFO
    # Provides:     redis_6379
    # Default-Start:        2 3 4 5
    # Default-Stop:         0 1 6
    # Short-Description:    Redis data structure server
    # Description:          Redis data structure server. See //redis.io
    ### END INIT INFO
    
    REDISPORT=6379
    EXEC=/usr/local/bin/redis-server
    CLIEXEC=/usr/local/bin/redis-cli
    
    PIDFILE=/var/run/redis_${REDISPORT}.pid
    CONF="/etc/redis/${REDISPORT}.conf"
    
    case "$1" in
    	start)
    		if [ -f $PIDFILE ]
    		then
    				echo "$PIDFILE exists, process is already running or crashed"
    		else
    				echo "Starting Redis server..."
    				$EXEC $CONF
    		fi
    		;;
    	stop)
    		if [ ! -f $PIDFILE ]
    		then
    				echo "$PIDFILE does not exist, process is not running"
    		else
    				PID=$(cat $PIDFILE)
    				echo "Stopping ..."
    				$CLIEXEC -p $REDISPORT shutdown
    				while [ -x /proc/${PID} ]
    				do
    					echo "Waiting for Redis to shutdown ..."
    					sleep 1
    				done
    				echo "Redis stopped"
    		fi
    		;;
    	*)
    		echo "Please use start or stop as first argument"
    		;;
    esac
    

    需要修改三个配置EXEC、CLIEXEC、CONF,它们分别代表的意思是:
    EXEC:redis-server路径
    CLIEXEC:redis-cli客户端路径
    CONF:conf配置文件路径

  3. 把redisd添加到chkconfig

chkconfig --add redisd

使用chkconfig --list命令查看自启动服务也可以使用systemctl is-enabled redisd.service命令查看。
假如没有自启动,使用chkconfig redisd on打开自启动。

命令与配置

见我之前的博客:redis数据类型、命令及配置文件

redis发布与订阅

简单实现

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。

  1. 订阅端
    通过执行命令SUBSCRIBE 频道名 [频道名] 监听一个或多个频道,当发布者往频道中发布消息时,客户端就能收到消息。
    客户端订阅频道

  2. 发布端
    打开另一台客户端,往指定频道中发送消息。publish channel1 message
    发布消息

例子

注:发布的消息没有持久化,订阅的客户端只能收到订阅后发布的消息。

使用python客户端实现

此内容需要熟悉python的redis模块
主要是连接方式,操作数据的命令几乎和redis的标准命令一模一样
连接方式分:直接redis连接和连接池连接,详见:使用python来操作redis用法详解redis.Redis与redis.StrictRedis区别

  1. 订阅端

    # subscribe.py
    
    import redis
    
    # 使用连接池,且自动decode
    pool = redis.ConnectionPool(host="192.168.43.128", port=6379, decode_responses=True)
    r = redis.StrictRedis(connection_pool=pool)
    
    # 第一步
    p = r.pubsub()
    
    # 第二步
    p.subscribe("channel1", "channel2")
    
    # 第三步
    for item in p.listen():
    	print(item)
    	# {'type': 'message', 'pattern': None, 'channel': 'channel1', 'data': '1234'}
    
    	print("正在监听频道:{}".format(item['channel']))
    	if item['type'] == 'message':
    		data = item['data']
    		print("频道 {} 发来新消息:{}".format(item['channel'], data))
    		if data == 'exit':
    			print(item['channel'], '发布结束')
    			p.unsubscribe(item['channel'])
    			print("已取消订阅")
    
    
  2. 发布端

    # publish.py
    
    import redis
    
    pool = redis.ConnectionPool(host="192.168.43.128", port=6379, password=None)
    r = redis.StrictRedis(connection_pool=pool)
    
    while True:
    	inp = input("channel and message>>>").strip()
    	if not inp: continue
    	temp = inp.split(" ")
    	if len(temp) < 2:
    		print("formate error, formate:channel message}")
    		continue
    	channel, *inp = temp
    	msg = " ".join(inp)
    
    	# 和在redis-cli中的一样
    	r.publish(channel=channel, message=msg)
    	if msg == "exit":
    		print("%s 结束发布" % channel)
    
    

redis事务

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
Redis事务的主要作用就是串联多个命令防止别的命令插队。

redis事务主要靠Multi、Exec、discard这三个命令完成
Multi:执行后,将之后输入的命令都会依次进入命令队列中
Exec:执行后,将之前的命令队列中的命令依次执行
组队的过程中可以通过discard来放弃组队。

示意图

python实现redis事务,在下文中。

事务的错误处理

事务的错误可能发生在Multi之后到Exec之前,或Exec之后。Redis对于这两种发生错误的处理,是不一样的,详见下图所示。
两种情况

由此,我们可以得知,Redis事务的三特性:

  1. 单独的隔离操作
    事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  2. 没有隔离级别的概念
    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  3. 不保证原子性
    事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

事务冲突

我们知道,redis在进行CURD操作数据时是采用单线程+IO多路复用的模式多个命令串行执行,但redis是支持多客户端连接的,所以在处理连接时是多线程的,所以高并发的情况下,客户端之间会存在资源竞争。当多个客户端并发操作同一Key值时,就会产生类似于多线程操作的现象,造成事务冲突。

举个例子:
假如余额为10000,在某个时间段,有三个请求进来了:
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000

那么,它就有可能造成以下情况:
可能的情况

为此,我们需要对请求加上或者使用lua脚本

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
示意图

简要流程:

  1. 当第一个请求到来,请求减8000
  2. 其它请求阻塞,直到第一个请求处理完
  3. 然后是第二个、第三个…

优点:对于开发者而言会十分简单
缺点:使用悲观锁后,数据库的性能有所下降,因为大量的线程都会被阻塞,而且需要有大量的恢复过程,需要进一步改变算法以提高系统的并发能力。

乐观锁

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量Redis就是利用这种check-and-set机制实现事务的
示意图

实现
使用命令:

  1. WATCH key [key …]
    multi之前执行,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断
  2. UNWATCH
    取消 WATCH 命令对所有 key 的监视。
    如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。

watch打断事务

优点:使用乐观锁有助于提高并发性能
缺点:

1.由于版本号冲突,乐观锁导致多次请求服务失败的概率大大提高,在特定的场合,可能造成库存遗留问题,可以使用”重入”来提高成功的概率,但相对复杂了,其性能也会随着版本号冲突的概率提升而提升,并不稳定。
2. 大量的 SQL 被执行,对于数据库的性能要求较高,容易引起数据库性能的瓶颈。

使用python redis模块操作redis

from redis import WatchError
import redis


# 使用连接池,且自动decode
pool = redis.ConnectionPool(host="192.168.43.128", port=6379, decode_responses=True)
r = redis.StrictRedis(connection_pool=pool)

num = 8000
# pipeline:def pipeline(self, transaction=True, shard_hint=None)
# 默认开启事务
with r.pipeline() as pipe:
    try:
        pipe.watch('money')
        current_value = pipe.get('money')

        if int(current_value) >= num:
            next_value = int(current_value) - num
            # 开启事务
            pipe.multi()
			
			# input("阻塞中....")
			# 可以去掉上面这行注释,测试一下,watch的值被修改后,是否被打断
			
            # 修改值
            pipe.set('money', next_value)
            pipe.execute()
            print("修改成功")
        else:
            print("余额不足")
    except WatchError:
        print("被打断了,操作失败")

实例

以秒杀商品案例为实例,探讨一下如何使用事务。

  1. 两个key,保存秒杀数据:

    • 商品库存
      由于保存剩余的商品个数

      set sk:10001:qt 100
      
      # 10001表示的产品的id,qt表示库存
      # 这样就可以批量管理产品了
      
    • 秒杀者清单
      保存已经成功秒杀的用户id

      sadd sk:10001:user 1001
      
      # 10001表示的产品的id,user表示用户,1001表示用户id
      
  2. 写一个flask应用,用于测试

    from flask import Flask, render_template, jsonify, request
    from redis import WatchError
    import redis
    import random
    import time
    
    app = Flask(__name__)
    
    pool = redis.ConnectionPool(host="192.168.43.128", port="6379", password=None)
    r = redis.StrictRedis(connection_pool=pool)
    
    
    @app.get("/")
    def index():
    	return render_template("index.html")
    
    
    @app.post("/seckill/")
    def sec_kill():
    	res = {
    		"status": 200,
    		"msg": "秒杀成功!"
    
    	}
    	# 拿到产品id
    	pd_id = request.form.get("pdid")
    	# 随机生成一个用户id,用于模拟
    	random.seed(time.time())
    	user_id = random.randint(1, 1000)
    
    	pd_key = "sk:%s:qt" % pd_id
    	user_key = "sk:%s:user" % pd_id
    	with r.pipeline() as pipe:
    
    		try:
    			pipe.watch(pd_key)
    			if not pipe.exists(pd_key):
    				# 不存在,则还没开始
    				res["status"] = 400
    				res["msg"] = "秒杀还未开始!"
    				return jsonify(res)
    
    			# 已经开始
    			# 判断秒杀是否结束
    
    			# 注意返回的是字节,需要decode
    
    			pd_count = pipe.get(pd_key).decode()
    			pd_count = int(pd_count)  # 懒得进行异常处理了
    
    			if pd_count < 1:
    				res["status"] = 401
    				res["msg"] = "秒杀已经结束了!"
    				return jsonify(res)
    			# 判断用户是否在集合中
    
    			if pipe.sismember(user_key, user_id):
    				print(user_key)
    				res["status"] = 403
    				res["msg"] = "你已经秒杀成功了!"
    				return jsonify(res)
    
    			# 正式开始
    
    			pipe.multi()
    			pipe.decr(pd_key)
    			pipe.sadd(user_key, user_id)
    			pipe.execute()
    
    		except WatchError:
    			res["status"] = 500
    			res["msg"] = "服务器繁忙,请稍后再试!"
    			return jsonify(res)
    
    	return jsonify(res)
    
    
    if __name__ == '__main__':
    	app.run()
    
    

    这是index.html文件:

    <!DOCTYPE html>
    <html lang="en">
    <head>
    	
    	<title>一元秒杀专场</title>
    </head>
    <body>
    <div>产品10001 亿元秒杀 <span onclick="sendSecKill('10001')" style="background-color: cadetblue">立刻抢购</span></div>
    <div id="showResponse"></div>
    
    <script>
    	function sendSecKill(pdid) {
    		// 使用原生js,发送ajax请求
    		let url = "/seckill/";
    		let request;
    
    		if (window.XMLHttpRequest) {
    			request = new XMLHttpRequest();  // Chrome, mozilla
    		} else if (window.ActiveXObject) {
    			request = new ActiveXObject("Microsoft.XMLHTTP");  // IE
    		}
    		request.onreadystatechange = function () {
    			if (request.readyState === 4) {
    				let jsonObj = JSON.parse(request.responseText);  // 解析json数据
    				document.getElementById("showResponse").innerText = jsonObj.msg;  // 显示在页面上
    
    			}
    		}
    		request.open("POST", url, true);
    		request.setRequestHeader('Content-type', 'application/x-www-form-urlencoded');
    		// 发送数据
    		request.send("pdid="+pdid);
    
    	}
    
    </script>
    </body>
    </html>
    
  3. 一个客户端测试

     set sk:10001:qt 10
    
     # 设置10个库存
    

    然后使用浏览器访问。
    秒杀

  4. 高并发测试

    • 方案一:使用工具ab模拟测试
      点击ad命令压力测试查看。

    • 方案二:自己写个小脚本

      import requests
      import json
      from concurrent.futures import ThreadPoolExecutor
      
      
      def sk_func():
      	data = {
      		"pdid": "10001"
      	}
      
      	res = requests.post(url="//127.0.0.1:5000/seckill/", data=data)
      	return res
      
      
      def sk_call_back(future):
      	res = future.result().text
      	print(json.loads(res))
      
      
      # 10个线程
      # 200个请求
      with ThreadPoolExecutor(10) as pool:
      	for _ in range(200):
      		task = pool.submit(sk_func)
      		task.add_done_callback(sk_call_back)
      
      

      示意图

  5. 存在的问题
    当10个连接秒杀库存为10的产品时,最后产品可能会剩余。这就是库存遗留的问题,如图。这是因为有些连接同时请求修改数据,但watch的key的值发生了改变,所以库存没能减少。
    库存遗留
    遇到这种问题,要么重入,要么让用户再次请求,要么不使用乐观锁使用lua脚本。

lua脚本

Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。lua语言比较简单,感兴趣的可以点击lua教程学习。

Lua脚本在redis的作用:

  • LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
  • 利用lua脚本淘汰用户,解决超卖问题。
  • 通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题,所以没有库存遗留问题。

注意:redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

使用lua脚本完成秒杀实例

python redis使用lua脚本的方式:

import redis

r = redis.Redis("127.0.0.1")

lua = """
local key = KEYS[1]
local field = ARGV[1]
local timestamp_new = ARGV[2]

-- 做什么都行
return 1;
"""

cmd = r.register_script(lua)

# 传参并接受返回值
res = cmd(keys=['k1'], args=['a1', 123])
print(res)

正式开始:

  1. flask代码

    from flask import Flask, render_template, jsonify, request
    import redis
    import random
    import time
    
    app = Flask(__name__)
    
    pool = redis.ConnectionPool(host="192.168.43.128", port="6379", password=None)
    r = redis.StrictRedis(connection_pool=pool)
    
    
    @app.get("/")
    def index():
    	return render_template("index.html")
    
    
    @app.post("/seckill/")
    def sec_kill():
    	res = {
    		"status": 200,
    		"msg": "秒杀成功!"
    
    	}
    	# 拿到产品id
    	pd_id = request.form.get("pdid")
    	# 随机生成一个用户id,用于模拟
    	random.seed(time.time_ns())
    	user_id = random.randint(1, 1000)
    
    	with open("./secKill.lua", encoding="utf-8") as f:
    		cmd = r.register_script(f.read())
    		
    		# 传参
    		code = cmd(keys=[user_id, pd_id])
    
    	# code是int,所以不需要数据转换
    	if code == 3:
    		# 未开始
    		res["status"] = 400
    		res["msg"] = "秒杀还未开始!"
    	elif code == 2:
    		# 已经秒杀成功了
    		res["status"] = 403
    		res["msg"] = "你已经秒杀成功了!"
    	elif code == 1:
    		# 秒杀成功,默认即可
    		pass
    	elif code == 0:
    		# 秒杀结束
    		res["status"] = 401
    		res["msg"] = "秒杀已经结束了!"
    
    	else:
    		# 脚本返回值错误了
    		res["status"] = 500
    		res["msg"] = "未知错误!"
    	return jsonify(res)
    
    
    if __name__ == '__main__':
    	app.run()
    
    
  2. lua脚本

    -- ./secKill.lua
    local userid=KEYS[1];
    local prodid=KEYS[2];
    -- ..是字符串拼接
    local qtkey="sk:"..prodid..":qt";
    local usersKey="sk:"..prodid..":user";
    local qtExists=redis.call("exists",qtkey);
    local userExists=redis.call("sismember",usersKey,userid);
    -- 已经秒杀未开始
    if tonumber(qtExists)==0 then
      return 3;
    end
    -- 已经秒杀成功
    if tonumber(userExists)==1 then
      return 2;
    end
    local num= redis.call("get" ,qtkey);
    -- 秒杀结束
    if tonumber(num)<=0 then
      return 0;
    else -- 秒杀成功
      redis.call("decr",qtkey);
      redis.call("sadd",usersKey,userid);
    end
    return 1;
    

    一些说明:
    local 声明变量
    ..是lua拼接字符串的语法
    if ... then .. end是判断语句
    redis.call()是在调用redis的命令,第一个参数传命令,其它的参数传命令的参数
    tonumber类型转换成number

  3. 效果
    使用脚本测试,已经没有了库存遗留的问题了。
    效果图

redis持久化

Redis 提供了2个不同形式的持久化方式。

  • RDB(Redis DataBase)
  • AOF(Append Of File)

RDB,简而言之,就是在不同的时间点,将 redis 存储的数据生成快照并存储到磁盘等介质上;
AOF,则是换了一个角度来实现持久化,那就是将 redis 执行过的所有写指令记录下来,在下次 redis 重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。
其实 RDB 和 AOF 两种方式也可以同时使用,在这种情况下,如果 redis 重启的话,则会优先采用 AOF 方式来进行数据恢复,这是因为 AOF 方式的数据恢复完整度更高。
如果你没有数据持久化的需求,也完全可以关闭 RDB 和 AOF 方式,这样的话,redis 将变成一个纯内存数据库,就像 memcache 一样。

官方推荐两个都启用。
如果对数据不敏感,可以选单独用RDB。
不建议单独用 AOF,因为可能会出现Bug。
如果只是做纯内存缓存,可以都不用。
若RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,只保留save 900 1即可。

RDB

执行 rdb 持久化时, Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,可以优先考虑使用RDB方式持久化。

优点:

  • rdb文件体积比较小, 适合备份及传输
  • 性能会比 aof 好(aof 需要写入日志到文件中)
  • rdb 恢复比 aof 要更快
  • 适合大规模的数据恢复

缺点:

  • 服务器故障时会丢失最后一次备份之后的数据
  • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
  • 虽然Redis在fork时使用了写时复制技术,但是如果数据庞大时还是比较消耗性能。

Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术”。
一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

rdb持久化流程

要使用redis的RDB持久化,有两种方式。
一、使用配置文件,自动执行。
二、在redis-cli中主动执行命令,进行持久化。

自动持久化

Redis要自动持久化,需要配置什么时候执行持久化操作,以及在哪保存rdb文件。关于rdb的配置,主要有以下几个:

  • save
    它有两个参数,第一个参数是秒数,第二个是写操作次数
    save 60 10000是指,在60秒内修改了10000次。
    save配置

  • dbfilename
    持久化后的rdb文件名,默认即可。
    dbfilename配置

  • dir
    rdb文件的保存路径,默认为启动Redis时的路径,也可以修改成一个固定的路径。
    dir配置

    此配置在AOF时也要用到,aof文件的目录也是这个

  • stop-writes-on-bgsave-error
    当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes。
    stop-writes-on-bgsave-error配置

  • rdbcompression
    对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes.

    rdbcompression配置

  • rdbchecksum
    在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。推荐yes.
    rdbchecksum配置

配置完文件后,
重启redis,在规定的时间内修改数据,我们就会发现:在指定dir目录下就会生成rdb文件。(dir和rdb文件名都是上面配置过的)

rdb持久化示意图

手动持久化

手动持久化,有两个命令:

  • save
    只管保存,其它不管,全部阻塞,不建议使用。
  • bgsave
    Redis会在后台异步进行快照操作, 快照同时还可以响应客户端请求。

可通过lastsave命令获取最后一次成功执行快照的时间
另外,执行flushall命令,也会产生rdb文件,但里面是空的,无意义

还原rdb数据

只要dir目录下有rdb文件,在redis启动时,会自动读取。

这里的dir和rdb文件,就是配置文件中的dir和dbfilename

RDB核心函数

RDB 功能最核心的是rdbSaverdbLoad两个函数, 前者用于生成 RDB 文件到磁盘, 而后者则用于将 RDB 文件中的数据重新载入到内存中。

rdbSave与rdbLoad

  • rdbSave
    rdbSave 函数负责将内存中的数据库数据以 RDB 格式保存到磁盘中,如果 RDB 文件已存在,那么新的 RDB 文件将替换已有的 RDB 文件。
    在保存 RDB 文件期间,主进程会被阻塞,直到保存完成为止。
    SAVE 和 BGSAVE 两个命令都会调用 rdbSave 函数,但它们调用的方式各有不同:
    SAVE 直接调用 rdbSave ,阻塞 Redis 主进程,直到保存完成为止。在主进程阻塞期间,服务器不能处理客户端的任何请求。
    BGSAVE 则 fork 出一个子进程,子进程负责调用 rdbSave ,并在保存完成之后向主进程发送信号,通知保存已完成。因为 rdbSave 在子进程被调用,所以 Redis 服务器在 BGSAVE 执行期间仍然可以继续处理客户端的请求。
  • rdbLoad
    当 Redis 服务器启动时, rdbLoad 函数就会被执行, 它读取 RDB 文件, 并将文件中的数据库数据载入到内存中。
    在载入期间, 服务器每载入 1000 个键就处理一次所有已到达的请求,
    不过只有 PUBLISH 、 SUBSCRIBE 、 PSUBSCRIBE 、UNSUBSCRIBE 、 PUNSUBSCRIBE 五个命令的请求会被正确地处理, 其他命令一律返回错误。
    等到载入完成之后, 服务器才会开始正常处理所有命令。

AOF

aof以日志的形式来记录每个操作(增量保存),注意:读操作是不会被保存的, 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

优点:

  • 备份机制更稳健,丢失数据概率更低。
  • 可读的日志文本,通过操作AOF稳健,可以处理误操作。
    缺点:
  • 比起RDB占用更多的磁盘空间。
  • 恢复备份速度要慢。
  • 每次读写都同步的话,有一定的性能压力。
  • 存在个别Bug,造成恢复不能。

AOF配置

  • appendonly
    是否开启AOF,默认no
    appendonly配置
  • appendfilename
    aof文件的文件名
    appendfilename配置
  • dir
    aof文件的保存路径,默认为启动Redis时的路径,可以修改成一个固定的路径。
    dir配置
  • appendfsync
    AOF的同步策略。,有三种策略:

    1. always
      始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好。
    2. everysec
      每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
    3. no
      不主动进行同步,把同步时机交给操作系统。
      appendfsync配置

AOF恢复数据、修复aof文件

  • 恢复数据
    1. 修改默认的appendonly no,改为yes
    2. 将有数据的aof文件复制一份保存到对应目录(配置中的dir)
    3. 恢复:重启redis,然后重新加载

假如aof文件损坏,可以尝试用以下方法修复:

  • 修复aof文件
    1. 修改默认的appendonly no,改为yes
    2. 备份被写坏的AOF文件
    3. 通过/usr/local/bin/redis-check-aof–fix appendonly.aof进行修复aof文件
    4. 恢复:重启redis,然后重新加载

AOF Rewrite

AOF持久化流程如下:

  1. 客户端的请求写命令会被append追加到AOF缓冲区内;
  2. AOF缓冲区根据AOF持久化策略,将操作同步到磁盘的AOF文件中;
  3. AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
  4. Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
    aof持久化流程

什么是重写
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集。

既然当AOF文件过大时,redis就会执行重写操作,那么aof文件多大是过大呢?以及redis是如何实现重写的呢?

何时重写

先看看一些配置:

auto-aof-rewrite-percentage:设置重写的基准值的百分数,默认100%
auto-aof-rewrite-min-size:设置重写的基准值的大小,默认64MB
默认配置
no-appendfsync-on-rewrite
yes时,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)
no时,把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)
详见:Redis的AOF配置redis的no-appendfsync-on-rewrite参数

注意:系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size

有了以上配置和变量,我们就可以得出了重写的条件:

  1. AOF文件大小 >= base_size +base_size * auto-aof-rewrite-percentage (默认100%)
  2. AOF文件大小 >= auto-aof-rewrite-min-size (默认64mb)
  3. 同时满足这两个条件时进行重写

例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写? ==> 100MB

如何重写

  1. bgrewriteaof触发重写
    判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。
  2. 主进程fork出子进程执行重写操作,保证主进程不会阻塞。
  3. 子进程遍历redis内存中数据到临时文件
    客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。
  4. 子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。
    主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
  5. 使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。
    重写流程

也可手动执行bgrewriteaof命令,后台完成Rewrite

只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。

redis主从复制

Redis虽然读取写入的速度都特别快,但是也会产生读压力特别大的情况。为了分担读压力,Redis支持主从复制。所谓的主从复制是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master),后者称为从节点(slaver);数据的复制是单向的,只能由主节点到从节点。所以从节点不能进行写操作。
示意图
默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

优点:

  • 读写分离,性能扩展
  • 故障恢复,当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复

一般使用

一主多从

通过配置slaveof实现:slaveof ip 端口
下面使用一个虚拟机启动4个redis服务来演示(一主三从):
图示

  1. 修改配置文件
    将redis.conf中的daemonize设为yes
    注释掉port、pidfile、logfile、rdbfilename、appendfilename
    然后再在redis.conf的目录下使用vim命令,配置4个文件

    # redis-6379.conf
    
    include ./redis.conf
    
    port 6379
    pidfile /var/run/redis_6379.pid
    dbfilename dump6379.rdb
    appendfilename appendonly6379.aof
    
    
    # redis-6380.conf
    
    include ./redis.conf
    
    port 6380
    pidfile /var/run/redis_6380.pid
    dbfilename dump6380.rdb
    appendfilename appendonly6380.aof
    
    slaveof 192.168.43.128 6379
    
    
    # redis-6381.conf
    
    include ./redis.conf
    
    port 6381
    pidfile /var/run/redis_6381.pid
    dbfilename dump6381.rdb
    appendfilename appendonly6381.aof
    
    slaveof 192.168.43.128 6379
    
    
    # redis-6382.conf
    
    include ./redis.conf
    
    port 6382
    pidfile /var/run/redis_6382.pid
    dbfilename dump6382.rdb
    appendfilename appendonly6382.aof
    
    slaveof 192.168.43.128 6379
    
    

    slaveof 指定的是master的ip和端口

  2. 启动redis服务

    redis-server redis-6379.conf
    redis-server redis-6380.conf
    redis-server redis-6381.conf
    redis-server redis-6382.conf
    
  3. 查看效果
    连接上6379端口的服务器

    redis-cli -p 6379
    

    输入info replication
    示意图

    info replication 打印主从复制的相关信息
    假如不是这个效果,请检查防火墙

这种搭建方式的缺点:master挂掉后,slave不能继续工作

6380从节点

薪火相传

上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。该模式的风险是一旦某个slave宕机,后面的slave都没法备份。
当使用slaveof命令时,会清除之前的数据,重新建立拷贝最新的。

薪火相传

master宕机后,任何slave都不可以写数据
6380端口的主机可以使用slaveof no one命令,断开Master
断开后,6380端口的主机就可以写数据,6381端口的主机可以读取6380端口主机的数据
Master恢复后,6380端口的主机需要使用slaveof声明自己的主机

哨兵模式

使用上述方式实现时在主节点宕机后,整个redis都无法正常工作了。主要原因是redis在master挂掉后不知道选谁作为自己的主节点。所以只要我们在配置中配置好每个节点的优先级,再启动一个哨兵。当主节点宕机后,哨兵根据优先级,选一个优先级高的节点当master,再通知其它节点应该把谁当作master,这样就解决了主机宕机的问题了。

注:原主节点恢复后,会成为slave,而不会重新成为master, 除非也会成为当前master再次宕机,且它的优先级最高。

确定优先级

我们手动去配置哪个机器的优先级是多少。需要用到的配置是slave-priority,它在选举主机时使用,它的值是一个数,值越小,优先级越高,。默认100。
下面我们再修改配置文件:

[[email protected] myredis]# cat redis-*
# 6379
include ./redis.conf

port 6379
pidfile /var/run/redis_6379.pid
dbfilename dump6379.rdb
appendfilename appendonly6379.aof

slave-priority 100


# 6380
include ./redis.conf

port 6380
pidfile /var/run/redis_6380.pid
dbfilename dump6380.rdb
appendfilename appendonly6380.aof

slaveof 192.168.43.128 6379

slave-priority 100

# 6381

include ./redis.conf

port 6381
pidfile /var/run/redis_6381.pid
dbfilename dump6381.rdb
appendfilename appendonly6381.aof

slaveof 192.168.43.128 6379

slave-priority 100


# 6382

include ./redis.conf

port 6382
pidfile /var/run/redis_6382.pid
dbfilename dump6382.rdb
appendfilename appendonly6382.aof

slaveof 192.168.43.128 6379

slave-priority 100

配置哨兵

创建一个sentinel.conf文件(名字绝不能错)

sentinel [‘sentɪn(ə)l] 为哨兵、守卫的意思

填写以下内容:

# 守护进程设为no的话,就是前台运行,可以查看日志输出
daemonize yes
# sentinel  monitor 被监控的名称  host  port  1
monitor mymaster 192.168.43.128 6379 1

monitor 为监控的意思
mymaster 是一个自定义的名字
192.168.43.128为master的ip
6379 为master的端口
1表示要有多少个哨兵认为主服务器不可用的时候,才会进行failover操作

关于哨兵的其它配置,见此文:哨兵模式配置文件中的全部配置

启动并测试

  • 启动哨兵

    redis-sentinel sentinel.conf
    

    redis-sentinel

  • 启动redis-server

    redis-server redis-6379.conf
    redis-server redis-6380.conf
    redis-server redis-6381.conf
    redis-server redis-6382.conf
    

当master挂掉后,等10秒左右,哨兵会重新选出master并未新的master分配slave,此过程会修改原来redis和哨兵的配置文件

假如想进一步学习,可以查看此文Redis哨兵模式(sentinel)学习总结及部署记录(主从复制、读写分离、主从切换)

优缺点

  • 优点

    1. 哨兵集群,基于主从复制模式,所有的主从配置的优点,它都有。
    2. 主从可以切换,故障可以转移,系统的可用性就会更好。
    3. 哨兵模式就是主从模式的升级版,从收到到自动,更加健壮。
  • 缺点

    1. Redis不好在线扩容,集群容量一旦达到上限,在线扩容就会十分麻烦。
    2. 实现哨兵模式的配置比较麻烦,并且其中有很多选项。

主从复制原理

  1. Slave启动成功连接到master后会发送一个sync命令
  2. Master接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master将传送整个数据文件到slave,以完成一次完全同步。

示意图

复制时用到了两个概念:
全量复制与增量复制

  • 全量复制:slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步。但是只要是重新连接master,全量复制将被自动执行。

redis集群

集群,即Redis Cluster,是Redis 3.0开始引入的分布式存储方案。
集群由多个节点(Node)组成,Redis的数据分布在这些节点中。集群中的节点分为主节点和从节点:只有主节点负责读写请求和集群信息的维护;从节点只进行主节点数据和状态信息的复制。

Redis集群的作用有下面几点:

  • 数据分区:突破单机的存储限制,将数据分散到多个不同的节点存储;
  • 负载均衡:每个主节点都可以处理读写请求,提高了并发能力;
  • 高可用:集群有着和哨兵模式类似的故障转移能力,提升集群的稳定性;

例子

下面将以6个redis-server服务,组成3个一主一仆的集群。
实现效果

  1. 创建配置文件

    include ./redis.conf
    # redis.conf是原本的redis配置文件
    
    daemonize yes
    port 6379
    pidfile "/var/run/redis_6379.pid"
    dbfilename "dump6379.rdb"
    appendfilename "appendonly6379.aof"
    protected-mode no
    save 3600 1
    
    
    # cluster
    
    # 是否打开集群模式
    cluster-enabled yes
    
    # 设定节点配置文件名,自动生成
    cluster-config-file nodes-6379.conf
    
    # 定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。
    cluster-node-timeout 15000
    

    关键在于cluster-enabledcluster-config-filecluster-node-timeout这三个配置
    使用sed命令,替换部分内容,生成其它节点的配置文件:

    cat redis-6379.conf | sed "s/6379/6380/g" > redis-6380.conf
    cat redis-6379.conf | sed "s/6379/6381/g" > redis-6381.conf
    cat redis-6379.conf | sed "s/6379/6382/g" > redis-6382.conf
    cat redis-6379.conf | sed "s/6379/6383/g" > redis-6383.conf
    cat redis-6379.conf | sed "s/6379/6384/g" > redis-6384.conf
    

    目录

  2. 分别启动6个redis-server
    效果

  3. 创建集群
    注:这是redis5之后的方法,之前的版本需要安装ruby环境,然后使用redis-trib.rb命令执行,可参考此文:使用Ruby脚本搭建集群

    使用redis-cli --cluster create xxx命令创建集群

    redis-cli --cluster create --cluster-replicas 1 192.168.43.128:6379 192.168.43.128:6380 192.168.43.128:6381 192.168.43.128:6382 192.168.43.128:6383 192.168.43.128:6384
    
    # 建议使用真实IP
    

    –cluster-replicas表示每个主节点有几个从节点
    redis-cli –cluster代替了之前的redis-trib.rb,我们无需安装ruby环境即可直接使用它附带的所有功能:创建集群、增删节点、槽迁移、完整性检查、数据重平衡等等。

    假如报错了: Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0.
    可以尝试以下步骤:

    1. 关闭所有redis服务
    2. 删除所有rdb、aof文件以及刚生成的nodes-xxxx.conf
    3. 重启redis-server
    4. 再次执行命令

    最后根据提示输入“yes”即可完成搭建。
    示意图

slots

现在我们已经搭建了一个集群,redis集群的一个特点就是去中心化,也就是说,无论你连接的是哪台redis服务器操作哪个key,它都能找到key对应的redis服务器,然后操作。这一切都是Redis利用slot(插槽)自动完成的。
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽,其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。集群中的每个节点负责处理一部分插槽。
举个例子, 如果一个集群可以有主节点, 其中:
节点 A 负责处理 0 号至 5460 号插槽。
节点 B 负责处理 5461 号至 10922 号插槽。
节点 C 负责处理 10923 号至 16383 号插槽。

在客户端使用cluster nodes命令可以查看哪个主节点对应的插槽范围。
cluster nodes

集群命令

使用redis-cli -c 表明采用集群策略连接,如redis-cli -c -h 192.168.43.128 -p 6379

集群

  • cluster info
    打印集群的信息
  • cluster nodes
    列出集群当前已知的所有节点( node),以及这些节点的相关信息。

节点

  • cluster meet <ip> <port>
    将 ip 和 port 所指定的节点添加到集群当中,让它成为集群的一份子。
  • cluster forget <node_id>
    从集群中移除 node_id 指定的节点。
  • cluster replicate <master_node_id>
    将当前从节点设置为 node_id 指定的master节点的slave节点。只能针对slave节点操作。
  • cluster saveconfig
    将节点的配置文件保存到硬盘里面。

槽(slot)

  • cluster addslots <slot> [slot ...]
    将一个或多个槽( slot)指派( assign)给当前节点。
  • cluster delslots <slot> [slot ...]
    移除一个或多个槽对当前节点的指派。
  • cluster flushslots
    移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点。
  • cluster setslot <slot> node <node_id>
    将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
  • cluster setslot <slot> migrating <node_id>
    将本节点的槽 slot 迁移到 node_id 指定的节点中。
  • cluster setslot <slot> importing <node_id>
    从 node_id 指定的节点中导入槽 slot 到本节点。
  • cluster setslot <slot> stable
    取消对槽 slot 的导入( import)或者迁移( migrate)。

  • cluster keyslot <key>
    计算键 key 应该被放置在哪个槽上。
  • cluster countkeysinslot <slot>
    返回槽 slot 目前包含的键值对数量。
  • cluster getkeysinslot <slot> <count>
    返回 count 个 slot 槽中的键 。

集群维护

这部分内容指的就是,在已经搭建好集群情况下,对集群的节点增删改。
redis5之前的版本,操作看这里:Redis Cluster日常操作命令梳理
之前说过从redis5开始,redis-cli --cluster代替了之前的redis-trib.rb。使用help命令:

详细的解释,及各个命令如何使用,见此文章:Redis 5.0 redis-cli –cluster help说明

[[email protected] myredis]# redis-cli --cluster help
Cluster Manager Commands:
  create         host1:port1 ... hostN:portN
                 --cluster-replicas <arg>
  check          host:port
                 --cluster-search-multiple-owners
  info           host:port
  fix            host:port
                 --cluster-search-multiple-owners
                 --cluster-fix-with-unreachable-masters
  reshard        host:port
                 --cluster-from <arg>
                 --cluster-to <arg>
                 --cluster-slots <arg>
                 --cluster-yes
                 --cluster-timeout <arg>
                 --cluster-pipeline <arg>
                 --cluster-replace
  rebalance      host:port
                 --cluster-weight <node1=w1...nodeN=wN>
                 --cluster-use-empty-masters
                 --cluster-timeout <arg>
                 --cluster-simulate
                 --cluster-pipeline <arg>
                 --cluster-threshold <arg>
                 --cluster-replace
  add-node       new_host:new_port existing_host:existing_port
                 --cluster-slave
                 --cluster-master-id <arg>
  del-node       host:port node_id
  call           host:port command arg arg .. arg
                 --cluster-only-masters
                 --cluster-only-replicas
  set-timeout    host:port milliseconds
  import         host:port
                 --cluster-from <arg>
                 --cluster-from-user <arg>
                 --cluster-from-pass <arg>
                 --cluster-from-askpass
                 --cluster-copy
                 --cluster-replace
  backup         host:port backup_directory
  help

For check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.

Cluster Manager Options:
  --cluster-yes  Automatic yes to cluster commands prompts

[[email protected] myredis]#

添加节点

  1. 你得先有节点
    修改配置文件:

    cat redis-6384.conf | sed "s/6384/6385/" > redis-6385.conf
    cat redis-6384.conf | sed "s/6384/6386/" > redis-6386.conf
    

    启动服务:

    redis-server redis-6385.conf
    redis-server redis-6386.conf
    
    
  2. 添加主节点

    redis-cli --cluster add-node 192.168.43.128:6385  192.168.43.128:6379
    
    # 192.168.43.128:6385是新的节点
    # 192.168.43.128:6379是旧的节点,用于找你加入的是哪个集群
    

    也可同时添加多个主节点

  3. 添加从节点

    redis-cli --cluster add-node  --cluster-slave --cluster-master-id 4f6782603f0d524361b68d3fb4945b38d6fa1dd3 192.168.43.128:6386 192.168.43.128:6382
    
    # --cluster-master-id是主节点id,通过cluster nodes可以查看
    # 不指定时,随机分配
    # 192.168.43.128:6386是新节点
    # 192.168.43.128:6382是旧节点,用于找你加入的是哪个集群
    
    

    同样可以添加多个从节点

  4. 重新分配slot
    新增加的主节点,是没有slots的,所以需要我们分配。请仔细阅读“#”后面的备注。

    # ip是用来确认是哪个集群的
    redis-cli --cluster reshard  192.168.43.128:6386   
    
    How many slots do you want to move (from 1 to 16384)? 1000  # 设置slot数1000  
    What is the receiving node ID? 03ccad2ba5dd1e062464bc7590400441fafb63f2  # 输入主节点id
    Please enter all the source node IDs.  
     Type 'all' to use all the nodes as source nodes for the hash slots.  
     Type 'done' once you entered all the source nodes IDs.  
    Source node #1:all   # 表示全部节点重新洗牌  
    Do you want to proceed with the proposed reshard plan (yes/no)? yes   # 确认重新分  
    

    可以把分配的过程理解成打扑克牌,all表示大家重新洗牌;输入某个主节点的node id。而输入done的话,会把slot抽出来给receiving node ID,相当于抽牌过程,不过在执行这个选项前会多一步:确认是抽谁的slot(见删除节点那里)。

修改节点

  • 改变从节点的master

     cluster replicate 9495c02bb1be1da198b529c9ff5e5a1728e545f6
    

    效果图

    你操作的是当前从节点,所以要注意你当前连上的是哪个节点。

删除节点

  1. 删除主节点
    如果主节点有从节点,将从节点转移到其他主节点
    在客户端中,使用cluster replicate <node-id>命令,改变从节点的主节点。
    如果主节点有slot,去掉分配的slot,然后再删除主节点

    redis-cli --cluster reshard  192.168.43.128:6386  # 取消分配的slot也是这个命令
    
    How many slots do you want to move (from 1 to 16384)? 1000 # 被删除master的所有slot数量  
    What is the receiving node ID? bf8c3e87f457e9c60575e03333323dfff778bd95 # 接收slot的master  
    Please enter all the source node IDs.  
     Type 'all' to use all the nodes as source nodes for the hash slots.  
     Type 'done' once you entered all the source nodes IDs.  
    Source node #1:4f6782603f0d524361b68d3fb4945b38d6fa1dd3  # 被删除master的node-id  
    Source node #2:done   
    
    Do you want to proceed with the proposed reshard plan (yes/no)? yes  # 取消slot后,reshard  
    
    

    删除主节点

    redis-cli --cluster del-node 192.168.43.128:6385 4f6782603f0d524361b68d3fb4945b38d6fa1dd3
    
    # 192.168.43.128:6385表示哪个集群
    # 4f6782603f0d524361b68d3fb4945b38d6fa1dd3是节点的id
    
  2. 删除从节点、
    同样使用redis-cli --cluster del-node命令。

    redis-cli --cluster del-node 192.168.163.132:6384 f6a6957421b80409106cb36be3c7ba41f3b603ff
    
    # 192.168.163.132:6384表示哪个集群
    # f6a6957421b80409106cb36be3c7ba41f3b603ff是要删的节点的id
    

故障修复

假如主节点宕机了,那么从节点将会在一段时间内(根据cluster-node-timeout配置决定,默认15秒)自动升为主节点。和哨兵模式一样,原主节点恢复后会变成从机。

如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?答案是根据cluster-require-full-coverage的配置:

  1. 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉。
  2. 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。

常见问题及解决

缓存穿透

key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。
示意图

原因
缓存穿透的问题,肯定是再大并发情况下。依此为前提,我们分析缓存穿透的原因如下:

  1. 恶意攻击,猜测你的key命名方式,然后估计使用一个你缓存中不会有的key进行访问。
  2. 第一次数据访问,这时缓存中还没有数据,则并发场景下,所有的请求都会压到数据库。
  3. 数据库的数据也是空,这样即使访问了数据库,也是获取不到数据,那么缓存中肯定也没有对应的数据。这样也会导致穿透。

解决方法

  1. 对空值缓存
    如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
  2. 设置可访问的名单(白名单)
    使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
  3. 采用布隆过滤器(Bloom Filter)
    布隆过滤器是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
    布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
    将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
  4. 进行实时监控
    当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

缓存击穿

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。当key可能会在某些时间点被超高并发地访问时,就要考虑缓存被“击穿”的问题了。

示意图

解决方法

  1. 预先设置热门数据
    在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
  2. 实时调整
    现场监控哪些数据热门,实时调整key的过期时长
  3. 使用锁:
    分布式锁,示意图

缓存穿透与缓存击穿的区别

击穿就穿一层,只是穿了redis,数据库有数据
穿透穿了两层,redis和数据库都没数据
因此它们的应当方案有所区别

缓存雪崩

缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。即多个key同时过期,使得后端DB忙不过来。

缓存雪崩

解决方法

  1. 构建多级缓存架构
    如:nginx缓存 + redis缓存 +其他缓存(ehcache等)
  2. 使用锁或队列
    用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
  3. 设置过期标志更新缓存
    记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
  4. 将缓存失效时间分散开
    比如使用随机数作为过期时间,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

分布式锁

由于我们把单机部署成了分布式的集群,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,所以我们需要解决数据安全的问题,这就是分布式锁。

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis等)
  3. 基于Zookeeper
    每一种分布式锁解决方案都有各自的优缺点:
  4. 性能:redis最高
  5. 可靠性:zookeeper最高
    这里,我们就基于redis实现分布式锁。

方案一

基于redis实现分布式锁,一般的方式是通过设置一个key作为标识,当key存在时,则其它操作返回false,不存在时设置key,然后进行自己的操作。
key要满足两个条件:

  1. key要有过期时间,不然可造成全部都阻塞了
  2. key的值不能一样,用于给在操作时判断是否为自己的key,防止误删

直接说可能抽象,画个图就简单了。

示意图

我这是用python实现的,其它语言也大同小异,只要根据上面图的逻辑就行了。

代码实现:

import uuid
import math
import time

from redis import WatchError


def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
    """
    基于 Redis 实现的分布式锁1

    :param conn: Redis 连接
    :param lock_name: 锁的名称
    :param acquire_timeout: 获取锁的超时时间,默认 3 秒
    :param lock_timeout: 锁的超时时间,默认 2 秒
    :return:
    """

    # uuid做唯一标识
    identifier = str(uuid.uuid4())
    lock_name = f'lock:{lock_name}'
    lock_timeout = int(math.ceil(lock_timeout))  # 向上取整

    end = time.time() + acquire_timeout
    while time.time() < end:
        # 如果不存在这个锁则加锁并设置过期时间,避免死锁
        if conn.set(lock_name, identifier, ex=lock_timeout, nx=True):
            # 需要返回唯一标识,删除key时要用到
            return identifier

        time.sleep(0.001)
    # 获取锁超时了,返回False
    return False


def release_lock(conn, lock_name, identifier):
    """
    释放锁

    :param conn: Redis 连接
    :param lock_name: 锁的名称
    :param identifier: 锁的标识
    :return:
    """
    # python中redis事务是通过pipeline的封装实现的
    with conn.pipeline() as pipe:
        lock_name = 'lock:' + lock_name

        while True:
            try:
                # watch 锁, multi 后如果该 key 被其他客户端改变, 事务操作会抛出 WatchError 异常
                pipe.watch(lock_name)
                current_id = pipe.get(lock_name)

                # 假如是字节的话,就decode,这与创建Redis的参数有关
                current_id = current_id.decode('utf-8') if isinstance(current_id, bytes) else current_id
                if current_id and current_id == identifier:
                    # 事务开始
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    return True

                pipe.unwatch()
                break
            except WatchError:
                pass
        return False


redis-py-cluster不支持MULTI/EXEC + WATCH的形式(见:Transactions and WATCH),所以上面的代码的conn为redis-py-cluster创建的话,会报错。
想要不报错的话,去掉事务pipe.multi()即可。
个人感觉还是下面这种方案好

方案二

该方案利用的是,lua脚本在redis中的原子性,达到事务+watch的效果。

import uuid
import math
import time


def acquire_lock_with_timeout(conn, lock_name, acquire_timeout=3, lock_timeout=2):
    """
    基于 Redis 实现的分布式锁2
    
    :param conn: Redis 连接
    :param lock_name: 锁的名称
    :param acquire_timeout: 获取锁的超时时间,默认 3 秒
    :param lock_timeout: 锁的超时时间,默认 2 秒
    :return:
    """

    identifier = str(uuid.uuid4())
    lockname = f'lock:{lock_name}'
    lock_timeout = int(math.ceil(lock_timeout))

    end = time.time() + acquire_timeout

    while time.time() < end:
        # 如果不存在这个锁则加锁并设置过期时间,避免死锁
        if conn.set(lockname, identifier, ex=lock_timeout, nx=True):
            return identifier

        time.sleep(0.001)

    return False


def release_lock(conn, lock_name, identifier):
    """
    释放锁
    
    :param conn: Redis 连接
    :param lockname: 锁的名称
    :param identifier: 锁的标识
    :return:
    """
    unlock_script = """
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    """
    lock_name= f'lock:{lock_name}'
    unlock = conn.register_script(unlock_script)
    result = unlock(keys=[lock_name], args=[identifier])
    if result:
        return True
    else:
        return False

将以上代码写成上下文协议,用起来更方便:

import uuid
import math
import time


class RedisLock:
    def __init__(self, conn, lock_name, acquire_timeout=3, lock_timeout=2):
        """
        基于 Redis 实现的分布式锁2

        :param conn: Redis 连接
        :param lock_name: 锁的名称
        :param acquire_timeout: 获取锁的超时时间,默认 3 秒
        :param lock_timeout: 锁的超时时间,默认 2 秒
        :return:
        """
        self.conn = conn
        self.lock_name = f'lock:{lock_name}'
        self.acquire_timeout = acquire_timeout
        self.lock_timeout = int(math.ceil(lock_timeout))  # 向上取整
        super().__init__()

    def __enter__(self):
        self.identifier = str(uuid.uuid4())

        end = time.time() + self.acquire_timeout
        while time.time() < end:
            # 如果不存在这个锁则加锁并设置过期时间,避免死锁
            if self.conn.set(self.lock_name, self.identifier, ex=self.lock_timeout, nx=True):
                # 获取锁成功了
                return True

            time.sleep(0.001)
        # 获取锁超时了,失败,返回False
        return False

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 释放锁
        # 异常不处理,让其自动上浮
        unlock_script = """
        if redis.call("get",KEYS[1]) == ARGV[1] then
            return redis.call("del",KEYS[1])
        else
            return 0
        end
        """
        unlock = self.conn.register_script(unlock_script)
        unlock(keys=[self.lock_name], args=[self.identifier])


使用:

我将之前秒杀的例子稍作修改了一下

from flask import Flask, render_template, jsonify, request
from rediscluster import RedisCluster
import random
import time

# 上面的分布锁
import Lock

app = Flask(__name__)

startup_nodes = [
    {"host": "192.168.43.128", "port": "6379"},
    {"host": "192.168.43.128", "port": "6381"},
    {"host": "192.168.43.128", "port": "6380"},
]

r = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)


@app.get("/")
def index():
    return render_template("index.html")


@app.post("/seckill/")
def sec_kill():
    res = {
        "status": 200,
        "msg": "秒杀成功!"

    }
    # 拿到产品id
    pd_id = request.form.get("pdid")
    # 随机生成一个用户id,用于模拟
    random.seed(time.time_ns())
    user_id = random.randint(1, 1000)

    pd_key = "sk:%s:qt" % pd_id
    user_key = "sk:%s:user" % pd_id

    with Lock.RedisLock(conn=r, lock_name="secKill", acquire_timeout=1, lock_timeout=1) as is_get_lock:
        if is_get_lock:
            # 取得锁了
            if not r.exists(pd_key):
                # 不存在,则还没开始
                res["status"] = 400
                res["msg"] = "秒杀还未开始!"
                return jsonify(res)

            pd_count = int(r.get(pd_key))   # 懒得做异常处理了
            if pd_count < 1:
                res["status"] = 401
                res["msg"] = "秒杀已经结束了!"
                return jsonify(res)

            # 判断用户是否在集合中
            if r.sismember(user_key, user_id):
                res["status"] = 403
                res["msg"] = "你已经秒杀成功了!"
                return jsonify(res)
            # 产品数量-1
            # 并把用户id添加进集合中
            r.decr(pd_key)
            r.sadd(user_key, user_id)
        else:
            res["status"] = 500
            res["msg"] = "服务器繁忙,请稍后再试!"

    return jsonify(res)


if __name__ == '__main__':
    app.run()

参考:

redis的持久化(RDB&AOF的区别)
redis缓存穿透怎么解决
深入学习Redis(3):主从复制
Redis Cluster日常操作命令梳理
还有些忘记记录下来了

Tags: