Skip to content

Commit

Permalink
feature: 通过令牌解决缓存扣减极端场景
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed Apr 9, 2024
1 parent 9e3b8fd commit 4e9d673
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public final class RedisKeyConstant {
*/
public static final String TRAIN_STATION_PRICE = "index12306-ticket-service:train_station_price:%s_%s_%s";

/**
* 列车站点座位价格查询分布式锁 Key
*/
public static final String LOCK_TRAIN_STATION_PRICE = "index12306-ticket-service:lock:train_station_price";

/**
* 地区以及车站查询,Key Prefix + ( 车站名称 or 查询方式 )
*/
Expand Down Expand Up @@ -143,4 +138,9 @@ public final class RedisKeyConstant {
* 列车购买令牌桶加载数据 Key
*/
public static final String LOCK_TICKET_AVAILABILITY_TOKEN_BUCKET = "index12306-ticket-service:lock:ticket_availability_token_bucket:%s";

/**
* 令牌获取失败分布式锁 Key
*/
public static final String LOCK_TOKEN_BUCKET_ISNULL = "index12306-ticket-service:lock:token-bucket-isnull:%s";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
* 令牌扣减返回参数
*
* @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:12306)获取项目资料
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class TokenResultDTO {

/**
* Token 为空
*/
private Boolean tokenIsNull;

/**
* 获取 Token 为空站点座位类型和数量
*/
private List<String> tokenIsNullSeatTypeCounts;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
package org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.tokenbucket;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengoofy.index12306.biz.ticketservice.common.enums.VehicleTypeEnum;
import org.opengoofy.index12306.biz.ticketservice.dao.entity.TrainDO;
import org.opengoofy.index12306.biz.ticketservice.dao.mapper.SeatMapper;
import org.opengoofy.index12306.biz.ticketservice.dao.mapper.TrainMapper;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.PurchaseTicketPassengerDetailDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.RouteDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.SeatTypeCountDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.req.PurchaseTicketReqDTO;
import org.opengoofy.index12306.biz.ticketservice.remote.dto.TicketOrderDetailRespDTO;
import org.opengoofy.index12306.biz.ticketservice.remote.dto.TicketOrderPassengerDetailRespDTO;
import org.opengoofy.index12306.biz.ticketservice.service.SeatService;
import org.opengoofy.index12306.biz.ticketservice.service.TrainStationService;
import org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.dto.TokenResultDTO;
import org.opengoofy.index12306.framework.starter.bases.Singleton;
import org.opengoofy.index12306.framework.starter.cache.DistributedCache;
import org.opengoofy.index12306.framework.starter.common.toolkit.Assert;
Expand Down Expand Up @@ -72,7 +73,7 @@ public final class TicketAvailabilityTokenBucket {
private final TrainStationService trainStationService;
private final DistributedCache distributedCache;
private final RedissonClient redissonClient;
private final SeatMapper seatMapper;
private final SeatService seatService;
private final TrainMapper trainMapper;

private static final String LUA_TICKET_AVAILABILITY_TOKEN_BUCKET_PATH = "lua/ticket_availability_token_bucket.lua";
Expand All @@ -84,9 +85,9 @@ public final class TicketAvailabilityTokenBucket {
* 如果返回 {@link Boolean#FALSE} 代表当前访问出发站点和到达站点令牌已被拿完,无法参与购票下单等逻辑
*
* @param requestParam 购票请求参数入参
* @return 是否获取列车车票余量令牌桶中的令牌,{@link Boolean#TRUE} or {@link Boolean#FALSE}
* @return 是否获取列车车票余量令牌桶中的令牌返回结果
*/
public boolean takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
public TokenResultDTO takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
TrainDO trainDO = distributedCache.safeGet(
TRAIN_INFO + requestParam.getTrainId(),
TrainDO.class,
Expand All @@ -96,18 +97,20 @@ public boolean takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
List<RouteDTO> routeDTOList = trainStationService
.listTrainStationRoute(requestParam.getTrainId(), trainDO.getStartStation(), trainDO.getEndStation());
StringRedisTemplate stringRedisTemplate = (StringRedisTemplate) distributedCache.getInstance();
String actualHashKey = TICKET_AVAILABILITY_TOKEN_BUCKET + requestParam.getTrainId();
Boolean hasKey = distributedCache.hasKey(actualHashKey);
String tokenBucketHashKey = TICKET_AVAILABILITY_TOKEN_BUCKET + requestParam.getTrainId();
Boolean hasKey = distributedCache.hasKey(tokenBucketHashKey);
if (!hasKey) {
RLock lock = redissonClient.getLock(String.format(LOCK_TICKET_AVAILABILITY_TOKEN_BUCKET, requestParam.getTrainId()));
lock.lock();
if (!lock.tryLock()) {
throw new ServiceException("购票异常,请稍候再试");
}
try {
Boolean hasKeyTwo = distributedCache.hasKey(actualHashKey);
Boolean hasKeyTwo = distributedCache.hasKey(tokenBucketHashKey);
if (!hasKeyTwo) {
List<Integer> seatTypes = VehicleTypeEnum.findSeatTypesByCode(trainDO.getTrainType());
Map<String, String> ticketAvailabilityTokenMap = new HashMap<>();
for (RouteDTO each : routeDTOList) {
List<SeatTypeCountDTO> seatTypeCountDTOList = seatMapper.listSeatTypeCount(Long.parseLong(requestParam.getTrainId()), each.getStartStation(), each.getEndStation(), seatTypes);
List<SeatTypeCountDTO> seatTypeCountDTOList = seatService.listSeatTypeCount(Long.parseLong(requestParam.getTrainId()), each.getStartStation(), each.getEndStation(), seatTypes);
for (SeatTypeCountDTO eachSeatTypeCountDTO : seatTypeCountDTOList) {
String buildCacheKey = StrUtil.join("_", each.getStartStation(), each.getEndStation(), eachSeatTypeCountDTO.getSeatType());
ticketAvailabilityTokenMap.put(buildCacheKey, String.valueOf(eachSeatTypeCountDTO.getSeatCount()));
Expand All @@ -119,10 +122,10 @@ public boolean takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
lock.unlock();
}
}
DefaultRedisScript<Long> actual = Singleton.get(LUA_TICKET_AVAILABILITY_TOKEN_BUCKET_PATH, () -> {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
DefaultRedisScript<String> actual = Singleton.get(LUA_TICKET_AVAILABILITY_TOKEN_BUCKET_PATH, () -> {
DefaultRedisScript<String> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_TICKET_AVAILABILITY_TOKEN_BUCKET_PATH)));
redisScript.setResultType(Long.class);
redisScript.setResultType(String.class);
return redisScript;
});
Assert.notNull(actual);
Expand All @@ -139,8 +142,11 @@ public boolean takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
List<RouteDTO> takeoutRouteDTOList = trainStationService
.listTakeoutTrainStationRoute(requestParam.getTrainId(), requestParam.getDeparture(), requestParam.getArrival());
String luaScriptKey = StrUtil.join("_", requestParam.getDeparture(), requestParam.getArrival());
Long result = stringRedisTemplate.execute(actual, Lists.newArrayList(actualHashKey, luaScriptKey), JSON.toJSONString(seatTypeCountArray), JSON.toJSONString(takeoutRouteDTOList));
return result != null && Objects.equals(result, 0L);
String resultStr = stringRedisTemplate.execute(actual, Lists.newArrayList(tokenBucketHashKey, luaScriptKey), JSON.toJSONString(seatTypeCountArray), JSON.toJSONString(takeoutRouteDTOList));
TokenResultDTO result = JSON.parseObject(resultStr, TokenResultDTO.class);
return result == null
? TokenResultDTO.builder().tokenIsNull(Boolean.TRUE).build()
: result;
}

/**
Expand Down Expand Up @@ -178,6 +184,17 @@ public void rollbackInBucket(TicketOrderDetailRespDTO requestParam) {
}
}

/**
* 删除令牌,一般在令牌与数据库不一致情况下触发
*
* @param requestParam 删除令牌容器参数
*/
public void delTokenInBucket(PurchaseTicketReqDTO requestParam) {
StringRedisTemplate stringRedisTemplate = (StringRedisTemplate) distributedCache.getInstance();
String tokenBucketHashKey = TICKET_AVAILABILITY_TOKEN_BUCKET + requestParam.getTrainId();
stringRedisTemplate.delete(tokenBucketHashKey);
}

public void putTokenInBucket() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opengoofy.index12306.biz.ticketservice.dto.domain.PurchaseTicketPassengerDetailDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.RouteDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.SeatClassDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.SeatTypeCountDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.domain.TicketListDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.req.CancelTicketOrderReqDTO;
import org.opengoofy.index12306.biz.ticketservice.dto.req.PurchaseTicketReqDTO;
Expand All @@ -70,6 +71,7 @@
import org.opengoofy.index12306.biz.ticketservice.service.TicketService;
import org.opengoofy.index12306.biz.ticketservice.service.TrainStationService;
import org.opengoofy.index12306.biz.ticketservice.service.cache.SeatMarginCacheLoader;
import org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.dto.TokenResultDTO;
import org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.dto.TrainPurchaseTicketRespDTO;
import org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.select.TrainSeatTypeSelector;
import org.opengoofy.index12306.biz.ticketservice.service.handler.ticket.tokenbucket.TicketAvailabilityTokenBucket;
Expand Down Expand Up @@ -104,6 +106,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -113,6 +117,7 @@
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.LOCK_PURCHASE_TICKETS_V2;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.LOCK_REGION_TRAIN_STATION;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.LOCK_REGION_TRAIN_STATION_MAPPING;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.LOCK_TOKEN_BUCKET_ISNULL;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.REGION_TRAIN_STATION;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.REGION_TRAIN_STATION_MAPPING;
import static org.opengoofy.index12306.biz.ticketservice.common.constant.RedisKeyConstant.TRAIN_INFO;
Expand Down Expand Up @@ -363,12 +368,27 @@ public TicketPurchaseRespDTO purchaseTicketsV1(PurchaseTicketReqDTO requestParam
.expireAfterWrite(1, TimeUnit.DAYS)
.build();

private final Cache<String, Object> tokenTicketsRefreshMap = Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();

@Override
public TicketPurchaseRespDTO purchaseTicketsV2(PurchaseTicketReqDTO requestParam) {
// 责任链模式,验证 1:参数必填 2:参数正确性 3:乘客是否已买当前车次等...
purchaseTicketAbstractChainContext.handler(TicketChainMarkEnum.TRAIN_PURCHASE_TICKET_FILTER.name(), requestParam);
boolean tokenResult = ticketAvailabilityTokenBucket.takeTokenFromBucket(requestParam);
if (!tokenResult) {
// 为什么需要令牌限流?余票缓存限流不可以么?详情查看:https://nageoffer.com/12306/question
TokenResultDTO tokenResult = ticketAvailabilityTokenBucket.takeTokenFromBucket(requestParam);
if (tokenResult.getTokenIsNull()) {
Object ifPresentObj = tokenTicketsRefreshMap.getIfPresent(requestParam.getTrainId());
if (ifPresentObj == null) {
synchronized (TicketService.class) {
if (tokenTicketsRefreshMap.getIfPresent(requestParam.getTrainId()) == null) {
ifPresentObj = new Object();
tokenTicketsRefreshMap.put(requestParam.getTrainId(), ifPresentObj);
tokenIsNullRefreshToken(requestParam, tokenResult);
}
}
}
throw new ServiceException("列车站点已无余票");
}
// v1 版本购票存在 4 个较为严重的问题,v2 版本相比较 v1 版本更具有业务特点以及性能,整体提升较大
Expand Down Expand Up @@ -606,6 +626,38 @@ private List<Integer> buildTrainBrandList(List<TicketListDTO> seatResults) {
return trainBrandSet.stream().toList();
}

private final ScheduledExecutorService tokenIsNullRefreshExecutor = Executors.newScheduledThreadPool(1);

private void tokenIsNullRefreshToken(PurchaseTicketReqDTO requestParam, TokenResultDTO tokenResult) {
RLock lock = redissonClient.getLock(String.format(LOCK_TOKEN_BUCKET_ISNULL, requestParam.getTrainId()));
if (!lock.tryLock()) {
return;
}
tokenIsNullRefreshExecutor.schedule(() -> {
try {
List<Integer> seatTypes = new ArrayList<>();
Map<Integer, Integer> tokenCountMap = new HashMap<>();
tokenResult.getTokenIsNullSeatTypeCounts().stream()
.map(each -> each.split("_"))
.forEach(split -> {
int seatType = Integer.parseInt(split[0]);
seatTypes.add(seatType);
tokenCountMap.put(seatType, Integer.parseInt(split[1]));
});
List<SeatTypeCountDTO> seatTypeCountDTOList = seatService.listSeatTypeCount(Long.parseLong(requestParam.getTrainId()), requestParam.getDeparture(), requestParam.getArrival(), seatTypes);
for (SeatTypeCountDTO each : seatTypeCountDTOList) {
Integer tokenCount = tokenCountMap.get(each.getSeatType());
if (tokenCount < each.getSeatCount()) {
ticketAvailabilityTokenBucket.delTokenInBucket(requestParam);
break;
}
}
} finally {
lock.unlock();
}
}, 10, TimeUnit.SECONDS);
}

@Override
public void run(String... args) throws Exception {
ticketService = ApplicationContextHolder.getBean(TicketService.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

local inputString = KEYS[2]
local actualKey = inputString
local colonIndex = string.find(actualKey, ":")
Expand All @@ -9,16 +8,27 @@ end
local jsonArrayStr = ARGV[1]
local jsonArray = cjson.decode(jsonArrayStr)

local result = {}
local tokenIsNull = false
local tokenIsNullSeatTypeCounts = {}

for index, jsonObj in ipairs(jsonArray) do
local seatType = tonumber(jsonObj.seatType)
local count = tonumber(jsonObj.count)
local actualInnerHashKey = actualKey .. "_" .. seatType
local ticketSeatAvailabilityTokenValue = tonumber(redis.call('hget', KEYS[1], tostring(actualInnerHashKey)))
if ticketSeatAvailabilityTokenValue < count then
return 1
tokenIsNull = true
table.insert(tokenIsNullSeatTypeCounts, seatType .. "_" .. count)
end
end

result['tokenIsNull'] = tokenIsNull
if tokenIsNull then
result['tokenIsNullSeatTypeCounts'] = tokenIsNullSeatTypeCounts
return cjson.encode(result)
end

local alongJsonArrayStr = ARGV[2]
local alongJsonArray = cjson.decode(alongJsonArrayStr)

Expand All @@ -33,4 +43,4 @@ for index, jsonObj in ipairs(jsonArray) do
end
end

return 0
return cjson.encode(result)

0 comments on commit 4e9d673

Please sign in to comment.