秒殺系統後台實現詳解
- 2020 年 9 月 8 日
- 筆記
秒殺後台實現
本文主要講解項目實戰中秒殺如何解決下面問題:
1)實現秒殺非同步下單,掌握如何保證生產者&消費者消息不丟失
2)實現防止惡意刷單
3)實現防止相同商品重複秒殺
4)實現秒殺下單介面隱藏
5)實現下單介面限流
1 秒殺非同步下單
用戶在下單的時候,需要基於JWT令牌資訊進行登陸人資訊認證,確定當前訂單是屬於誰的。
針對秒殺的特殊業務場景,僅僅依靠對象快取或者頁面靜態化等技術去解決服務端壓力還是遠遠不夠。
對於資料庫壓力還是很大,所以需要非同步下單,非同步是最好的解決辦法,但會帶來一些額外的程式上的
複雜性。
1.1 秒殺服務-下單實現
1)將tokenDecode工具類config放入秒殺服務並聲明Bean
public static void main(String[] args){
SpringApplication.run(SeckillApplication,class,args);
}
@Bean
public TokenDecode tokenDecode(){
return new TokenDecode();
}
2)更新秒殺服務啟動類,添加redis配置
/**
* 設置 redisTemplate 的序列化設置
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
// 1.創建 redisTemplate 模版
RedisTemplate<Object, Object> template = new RedisTemplate<>();
// 2.關聯 redisConnectionFactory
template.setConnectionFactory(redisConnectionFactory);
// 3.創建 序列化類
GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class);
// 4.序列化類,對象映射設置
// 5.設置 value 的轉化格式和 key 的轉化格式
template.setValueSerializer(genericToStringSerializer);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
2)新建下單controller並聲明方法
@RestController
@CrossOrigin
@RequestMapping("/seckillorder")
public class SecKillOrderController {
@Autowired
private TokenDecode tokenDecode;
@Autowired
private SecKillOrderService secKillOrderService;
/**
* 秒殺下單
* @param time 當前時間段
* @param id 秒殺商品id
* @return
*/
@RequestMapping("/add")
//獲取當前登陸人
String username = tokenDecode.getUserInfo().get("username");
boolean result = secKillOrderService.add(id,time,username);
if (result){
return new Result(true, StatusCode.OK,"下單成功");
}else{
return new Result(false,StatusCode.ERROR,"下單失敗");
}
}
}
3) 新建service介面
public interface SecKillOrderService {
/**
* 秒殺下單
* @param id 商品id
* @param time 時間段
* @param username 登陸人姓名
* @return
*/
boolean add(Long id, String time, String username);
}
4)更改預載入秒殺商品
當預載入秒殺商品的時候,提前載入每一個商品的庫存資訊,後續減庫存操作也會先預扣減快取中的庫存再非同步扣減mysql數據。
預扣減庫存會基於redis原子性操作實現
for (SeckillGoods seckillGoods : seckillGoodsList) {
redisTemplate.boundHashOps(SECKILL_GOODS_KEY + redisExtName).put(seckillGoods.getId(),seckillGoods);
//預載入庫存資訊
redisTemplate.OpsForValue(SECKILL_GOODS_STOCK_COUNT_KEY+seckillGoods.getId(),se
ckillGoods.getStockCount());
}
6)秒殺下單業務層實現
業務邏輯:
獲取秒殺商品數據與庫存量數據,如果沒有庫存則拋出異常執行redis預扣減庫存,並獲取扣減之後的庫存值如果扣減完的庫存值<=0, 則刪除redis中對應的商品資訊與庫存資訊基於mq非同步方式完成與mysql數據同步(最終一致性)
注意:庫存數據從redis中取出,轉換成String
@Service
public class SecKillOrderServiceImpl implements SecKillOrderService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private IdWorker idWorker;
@Autowired
private CustomMessageSender customMessageSender;
/**
* 秒殺下單
* @param id 商品id
* @param time 時間段
* @param username 登陸人姓名
* @return
*/
@Override
public boolean add(Long id, String time, String username) {
//獲取商品數據
SeckillGoods goods = (SeckillGoods)
redisTemplate.boundHashOps("SeckillGoods_" + time).get(id);
String redisStock = (String) redisTemplate.boundValueOps("StockCount_" +
goods.getId()).get();
if(StringUtils.isEmpty(redisStock)){
return false;
}
int value=Integer.parseInt(redisStock);
//如果沒有庫存,則直接拋出異常
if(goods==null || value<=0){
return false;
}
//redis預扣庫存
Long stockCount = redisTemplate.boundValueOps("StockCount_" +
id).decrement();
if (stockCount<=0){
//庫存沒了
//刪除商品資訊
redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id);
//刪除對應的庫存資訊
redisTemplate.delete("StockCount_" + goods.getId());
}
//有庫存
//如果有庫存,則創建秒殺商品訂單
SeckillOrder seckillOrder = new SeckillOrder();
seckillOrder.setId(idWorker.nextId());
seckillOrder.setUserId(username);
seckillOrder.setSellerId(goods.getSellerId());
seckillOrder.setCreateTime(new Date());
seckillOrder.setStatus("0");
//發送消息
return true;
}
}
1.2 生產者保證消息不丟失
按照現有rabbitMQ的相關知識,生產者會發送消息到達消息伺服器。但是在實際生產環境下,消息生產者發送的消息很有可能當到達了消息伺服器之後,由於消息伺服器的問題導致消息丟失,如宕機。因為消息伺服器默認會將消息存儲在記憶體中。一旦消息伺服器宕機,則消息會產生丟失。因此要保證生產者的消息不丟失,要開始持久化策略。
rabbitMQ持久化: 交換機持久化 隊列持久化 消息持久化
但是如果僅僅只是開啟這兩部分的持久化,也很有可能造成消息丟失。因為消息伺服器很有可能在持久化的過程中出現宕機。因此需要通過數據保護機制來保證消息一定會成功進行持久化,否則將一直進行消息發送。
事務機制
事務機制採用類資料庫的事務機制進行數據保護,當消息到達消息伺服器,首先會開啟一個事務,接著進 行數據磁碟持久化,只有持久化成功才會進行事務提交,向消息生產者返回成功通知,消息生產者一旦接收成 功通知則不會再發送此條消息。當出現異常,則返回失敗通知.消息生產者一旦接收失敗通知,則繼續發送該 條消息。
事務機制雖然能夠保證數據安全,但是此機制採用的是同步機制,會產生系統間消息阻塞,影響整個系統 的消息吞吐量。從而導致整個系統的性能下降,因此不建議使用。
confirm機制
confirm模式需要基於channel進行設置, 一旦某條消息被投遞到隊列之後,消息隊列就會發送一個確 認資訊給生產者,如果隊列與消息是可持久化的, 那麼確認消息會等到消息成功寫入到磁碟之後發出. confirm的性能高,主要得益於它是非同步的.生產者在將第一條消息發出之後等待確認消息的同時也可以 繼續發送後續的消息.當確認消息到達之後,就可以通過回調方法處理這條確認消息. 如果MQ服務宕機了,則會 返回nack消息. 生產者同樣在回調方法中進行後續處理。
1.2.1 開啟confifirm機制
1)更改秒殺服務配置文件
rabbitmq:
host: 192.168.200.128
publisher-confirms: true #開啟confirm機制
2)開啟隊列持久化
@Configuration
public class RabbitMQConfig {
//秒殺商品訂單消息
public static final String SECKILL_ORDER_KEY="seckill_order";
@Bean
public Queue queue(){
//開啟隊列持久化
return new Queue(SECKILL_ORDER_KEY,true);
}
}
3)消息持久化源碼查看
4)增強rabbitTemplate
@Component
public class CustomMessageSender implements RabbitTemplate.ConfirmCallback {
static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
private static final String MESSAGE_CONFIRM="message_confirm";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
public CustomMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String
cause) {
if (ack){
//返回成功通知
//刪除redis中的相關數據
redisTemplate.delete(correlationData.getId());
redisTemplate.delete(MESSAGE_CONFIRM_+correlationData.getId());
}else{
//返回失敗通知
Map<String,String> map =
(Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_+correlationData.getId());
String exchange = map.get("exchange");
String routingKey = map.get("routingKey");
String sendMessage = map.get("sendMessage");
//重新發送
rabbitTemplate.convertAndSend(exchange,routingKey,
JSON.toJSONString(sendMessage));
}
}
//自定義發送方法
public void sendMessage(String exchange,String routingKey,String message){
//設置消息唯一標識並存入快取
CorrelationData correlationData = new
CorrelationData(UUID.randomUUID().toString());
redisTemplate.opsForValue().set(correlationData.getId(),message);
Map<String, String> map = new HashMap<>();
map.put("exchange", exchange);
map.put("routingKey", routingKey);
map.put("sendMessage", message);
redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_+correlationData.getId(),map)
;
//攜帶唯一標識發送消息
rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
}
}
5)發送消息
更改下單業務層實現
@Autowired
private CustomMessageSender customMessageSender;
1.3 秒殺下單服務更新庫存
1.3.1 非同步下單服務service_consume
1)添加依賴
<dependencies>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common_db</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_order_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_seckill_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_goods_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
</dependencies>
2)新建application.yml
server:
port: 9022
spring:
jackson:
time-zone: GMT+8
application:
name: sec-consume
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.200.128:3306/changgou_seckill?
useUnicode=true&characterEncoding=utf-
8&useSSL=false&allowMultiQueries=true&serverTimezone=GMT%2b8
username: root
password: root
main:
allow-bean-definition-overriding: true #當遇到同樣名字的時候,是否允許覆蓋註冊
redis:
host: 192.168.200.128
rabbitmq:
host: 192.168.200.128
eureka:
client:
service-url:
defaultZone: //127.0.0.1:6868/eureka
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
client:
config:
default: #配置全局的feign的調用超時時間 如果 有指定的服務配置 默認的配置不會生效
connectTimeout: 60000 # 指定的是 消費者 連接服務提供者的連接超時時間 是否能連接
單位是毫秒
readTimeout: 20000 # 指定的是調用服務提供者的 服務 的超時時間() 單位是毫秒
#hystrix 配置
hystrix:
command:
default:
execution:
timeout:
#如果enabled設置為false,則請求超時交給ribbon控制
enabled: true
isolation:
strategy: SEMAPHORE
thread:
# 熔斷器超時時間,默認:1000/毫秒
timeoutInMilliseconds: 20000
3)新建啟動類
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan(basePackages = {"com.changgou.consume.dao"})
public class OrderConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(OrderConsumerApplication.class,args);
}
}
1.3.2 消費者手動ACK下單實現
按照現有RabbitMQ知識,可以得知當消息消費者成功接收到消息後,會進行消費並自動通知消息伺服器將該條消息刪除。此種方式的實現使用的是消費者自動應答機制。但是此種方式非常的不安全。在生產環境下,當消息消費者接收到消息,很有可能在處理消息的過程中出現意外情況從而導致消息丟失,因為如果使用自動應答機制是非常不安全。我們需要確保消費者當把消息成功處理完成之後,消息伺服器才會將該條消息刪除。此時要實現這種效果的話,就需要將自動應答轉換為手動應答,只有在消息消費者將消息處理完,才會通知消息伺服器將該條消息刪除。
1)更改配置文件
rabbitmq:
host: 192.168.200.128
listener:
simple:
acknowledge-mode: manual #手動
2)定義監聽類
@Component
public class ConsumeListener {
@Autowired
private SecKillOrderService secKillOrderService;
@RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY)
public void receiveSecKillOrderMessage(Channel channel, Message message){
//轉換消息
SeckillOrder seckillOrder = JSON.parseObject(message.getBody(),
SeckillOrder.class);
//同步mysql訂單
int rows = secKillOrderService.createOrder(seckillOrder);
if (rows>0){
//返回成功通知
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}else{
//返回失敗通知
try {
//第一個boolean true所有消費者都會拒絕這個消息,false代表只有當前消費者拒
絕
//第二個boolean true當前消息會進入到死信隊列,false重新回到原有隊列中,默
認回到頭部
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3)定義業務層介面與實現類
public interface ConsumeService {
int handleCreateOrder(SeckillOrder order);
}
@Service
public class SecKillOrderServiceImpl implements SecKillOrderService { @Autowired
private SeckillGoodsMapper seckillGoodsMapper;
@Autowired
private SeckillOrderMapper seckillOrderMapper;
/**
* 添加訂單
* @param seckillOrder
* @return
*/
@Override
@Transactional
public int createOrder(SeckillOrder seckillOrder) {
int result =seckillGoodsMapper.updateStockCount(seckillOrder.getSeckillId());
if (result<=0){
return result;
}
result =seckillOrderMapper.insertSelective(seckillOrder);
if (result<=0){
return result;
}return 1;
資料庫欄位unsigned介紹
unsigned-----無符號,修飾int 、char
ALTER TABLE tb_seckill_goods MODIFY COLUMN stock_count int(11) UNSIGNED DEFAULT NULL COMMENT '剩餘庫存數';
1.5 流量削峰
在秒殺這種高並發的場景下,每秒都有可能產生幾萬甚至十幾萬條消息,如果沒有對消息處理量進行任何限制的話,很有可能因為過多的消息堆積從而導致消費者宕機的情況。因此官網建議對每一個消息消費者都設置處理消息總數(消息抓取總數)。
消息抓取總數的值,設置過大或者過小都不好,過小的話,會導致整個系統消息吞吐能力下降,造成性能浪費。過大的話,則很有可能導致消息過多,導致整個系統OOM。因此官網建議每一個消費者將該值設置在100-300之間。
1)更新消費者。
//設置預抓取總數
channel.basicQos(300);
1.6 秒殺渲染服務-下單實現
1)定義feign介面
@FeignClient(name="seckill")
public interface SecKillOrderFeign {
/**
* 秒殺下單
* @param time 當前時間段
* @param id 秒殺商品id
* @return
*/
@RequestMapping("/seckillorder/add")
public Result add(@RequestParam("time") String time, @RequestParam("id") Long id);
}
2)定義controller
@Controller
@CrossOrigin
@RequestMapping("/wseckillorder")
public class SecKillOrderController {
@Autowired
private SecKillOrderFeign secKillOrderFeign;
/**
* 秒殺下單
* @param time 當前時間段
* @param id 秒殺商品id
* @return
*/
@RequestMapping("/add")
@ResponseBody
public Result add(String time,Long id){
Result result = secKillOrderFeign.add(time, id);
return result;
}
}
2 防止惡意刷單解決
在生產場景下,很有可能會存在某些用戶惡意刷單的情況出現。這樣的操作對於系統而言,會導致業務出錯、臟數據、後端訪問壓力大等問題的出現。
一般要解決這個問題的話,需要前端進行控制,同時後端也需要進行控制。後端實現可以通過Redisincrde 原子性遞增來進行解決。
2.1 更新秒殺服務下單
2.2 防重方法實現
//防止重複提交
private String preventRepeatCommit(String username,Long id) {
String redisKey = "seckill_user_" + username+"_id_"+id;
long count = redisTemplate.opsForValue().increment(redisKey, 1);
if (count == 1){
//設置有效期五分鐘
redisTemplate.expire(redisKey, 5, TimeUnit.MINUTES);
return "success";
}
if (count>1){
return "fail";
}
return "fail";
}
3 防止相同商品重複秒殺
3.1 修改下單業務層實現
3.2 dao層新增查詢方法
public interface SeckillOrderMapper extends Mapper<SeckillOrder> {
/**
* 查詢秒殺訂單資訊
* @param username
* @param id
* @return
*/
@Select("select * from tb_seckill_order where user_id=#{username} and seckill_id=#{id}")
SeckillOrder getSecKillOrderByUserNameAndGoodsId(String username, Long id); }
4 秒殺下單介面隱藏
當前雖然可以確保用戶只有在登錄的情況下才可以進行秒殺下單,但是無法方法有一些惡意的用戶在登錄了之後,猜測秒殺下單的介面地址進行惡意刷單。所以需要對秒殺介面地址進行隱藏。
在用戶每一次點擊搶購的時候,都首先去生成一個隨機數並存入redis,接著用戶攜帶著這個隨機數去訪問秒殺下單,下單介面首先會從redis中獲取該隨機數進行匹配,如果匹配成功,則進行後續下單操作,如果匹配不成功,則認定為非法訪問。
4.1 將隨機數工具類放入common工程中
public class RandomUtil {
public static String getRandomString() {
int length = 15;
String base = "abcdefghijklmnopqrstuvwxyz0123456789";
Random random = new Random();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < length; i++) {
int number = random.nextInt(base.length());
sb.append(base.charAt(number));
}
return sb.toString();
}
public static void main(String[] args) {
String randomString = RandomUtil.getRandomString();
}
4.2秒殺渲染服務定義隨機數介面
/**
* 介面加密
* 生成隨機數存入redis,10秒有效期
*/
@GetMapping("/getToken")
@ResponseBody
public String getToken(){
String randomString = RandomUtil.getRandomString();
String cookieValue = this.readCookie();
redisTemplate.boundValueOps("randomcode_"+cookieValue).set(randomString,10, TimeUnit.SECONDS);
return randomString;
}
//讀取cookie private String readCookie(){
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String cookieValue = CookieUtil.readCookie(request, "uid").get("uid");
return cookieValue;
}
4.3 js修改
修改js下單方法
//秒殺下單
add:function(id){
app.msg ='正在下單';
//獲取隨機數
axios.get("/api/wseckillorder/getToken").then(function (response) {
var random=response.data;
axios.get("/api/wseckillorder/add? time="+moment(app.dateMenus[0]).format("YYYYMMDDHH")+"&id="+id+"&random="+random ).then(function (response) {
if (response.data.flag){
app.msg='搶單成功,即將進入支付!';
}else{app.msg='搶單失敗';
}
})
})
}
4.4 秒殺渲染服務更改
修改秒殺渲染服務下單介面
/**
* 秒殺下單
* @param time 當前時間段
* @param id 秒殺商品id
* @return
*/
@RequestMapping("/add")
@ResponseBody
public Result add(String time,Long id,String random){
//校驗密文有效
String randomcode = (String) redisTemplate.boundValueOps("randomcode").get(); if (StringUtils.isEmpty(randomcode) || !random.equals(randomcode)){
return new Result(false, StatusCode.ERROR,"無效訪問");
}
Result result = secKillOrderFeign.add(time, id);
return result;
}
5 秒殺下單介面限流
因為秒殺的特殊業務場景,生產場景下,還有可能要對秒殺下單介面進行訪問流量控制,防止過多的請求進入到後端伺服器。對於限流的實現方式,我們之前已經接觸過通過nginx限流,網關限流。但是他們都是對一個大的服務進行訪問限流,如果現在只是要對某一個服務中的介面方法進行限流呢?這裡推薦使用google提供的guava工具包中的RateLimiter進行實現,其內部是基於令牌桶演算法進行限流計算
1)添加依賴
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
2)自定義限流註解
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {}
3)自定義切面類
@Component
@Scope
@Aspect
public class AccessLimitAop {
@Autowired
private HttpServletResponse httpServletResponse;
private RateLimiter rateLimiter = RateLimiter.create(20.0); @Pointcut("@annotation(com.changgou.webSecKill.aspect.AccessLimit)")
public void limit(){} @Around("limit()")
public Object around(ProceedingJoinPoint proceedingJoinPoint){
boolean flag = rateLimiter.tryAcquire();
Object obj = null;
try{
if (flag){
obj=proceedingJoinPoint.proceed();
}else{
String errorMessage = JSON.toJSONString(new Result(false,StatusCode.ERROR,"fail"));
outMessage(httpServletResponse,errorMessage);
}
}catch (Throwable throwable) { throwable.printStackTrace();
}return obj;
}
private void outMessage(HttpServletResponse response, String errorMessage) { ServletOutputStream outputStream = null;
try {
response.setContentType("application/json;charset=UTF-8");
outputStream = response.getOutputStream();
outputStream.write(errorMessage.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}finally {
try {outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
4)使用自定義限流註解
歡迎觀看並寫出自己的見解!共同探討!
公眾號:良許Linux