支付通道接口异常统计上报
发布于: 1 小时前
支付中心对接第三方支付通道时,常会遇到第三方接口不稳定导致无法支付的问题。因此需要一个失败统计功能:统计各通道的调用失败次数,并在失败次数超过预设阈值时自动降级(切换)支付通道。
接口调用失败的上报类 ReporterUtils :
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Failure reporter for calls to third-party payment channels.
 * <p>
 * Callers invoke {@link #httpFail(ReportInfo)} when a request to a third-party channel fails.
 * Reports are first put into a bounded in-memory queue ({@code msgQueue}) with a non-blocking
 * {@code offer}, and background worker threads push them to MQ in batches.
 */
@Component
public class ReporterUtils {
    private static final Logger log = LogManager.getLogger(ReporterUtils.class);

    /** Pause taken by each worker after every send attempt, in milliseconds. */
    private static final long BATCH_INTERVAL_MILLIS = 10_000L;

    @Autowired
    private ReportConfig config;
    @Autowired
    private MsgProducer msgProducer;

    /** Bounded buffer between reporting callers and the MQ workers. */
    private LinkedBlockingQueue<ReportInfo> msgQueue;
    /** Runs the worker loops; shut down when the bean is destroyed. */
    private ExecutorService executorService;

    /**
     * Creates the report queue and starts {@code config.getWorkerNum()} daemon worker threads.
     */
    @PostConstruct
    private void init() {
        log.debug("----------------- reporter init ------");
        msgQueue = new LinkedBlockingQueue<ReportInfo>(config.getQueueMaxSize());
        int workerNum = config.getWorkerNum();
        // Worker threads: core == max == workerNum, so every submitted Worker gets its own thread.
        executorService = new ThreadPoolExecutor(workerNum, workerNum, Constant.EXECUTOR_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(Constant.EXECUTOR_WORK_QUEUE_SIZE), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = Executors.defaultThreadFactory().newThread(r);
                        t.setDaemon(true);
                        return t;
                    }
                }, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < workerNum; i++) {
            executorService.submit(new Worker());
        }
    }

    /**
     * Stops the workers when the bean is destroyed.
     * {@code shutdownNow()} interrupts the worker threads, which makes their loops exit.
     * Reports still sitting in the queue are dropped.
     */
    @PreDestroy
    private void destroy() {
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    private ReporterUtils() {
    }

    /**
     * Reporting thread.
     * <p>
     * Each iteration does the following:
     * <ol>
     *   <li>Waits for at least one report.</li>
     *   <li>Collects whatever else is already queued, up to {@code batchSize} reports in total.</li>
     *   <li>Sends the batch to MQ as one JSON array of JSON strings.</li>
     *   <li>Pauses for {@code BATCH_INTERVAL_MILLIS}.</li>
     * </ol>
     * A partial batch is sent as soon as it is available, so a low failure volume is still
     * reported instead of waiting for a full batch forever.
     * <p>
     * A failed send is logged and that batch is lost, but the worker keeps running. The worker
     * stops only when it is interrupted.
     */
    class Worker implements Runnable {
        @Override
        public void run() {
            final int batchSize = Math.max(1, config.getBatchSize());
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    List<ReportInfo> batch = new ArrayList<ReportInfo>(batchSize);
                    // Block until at least one report is available...
                    batch.add(msgQueue.take());
                    // ...then take whatever else is already queued, without waiting.
                    msgQueue.drainTo(batch, batchSize - 1);

                    List<String> batchReportInfos = new ArrayList<String>(batch.size());
                    for (ReportInfo reportInfo : batch) {
                        batchReportInfos.add(reportInfo.toString());
                    }
                    String jsonBatchReportInfos = JacksonHelper.toJsonString(batchReportInfos);
                    log.debug("批量上报通讯异常日志:{}", jsonBatchReportInfos);
                    // Failure reports go to MsgListener via MQ (this could also be a direct call).
                    msgProducer.send(Topic.HTTP_FAIL, jsonBatchReportInfos);
                } catch (InterruptedException e) {
                    // Shutdown requested: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception ex) {
                    // Log and keep the worker alive; one failed send must not stop reporting.
                    log.error("report fail. ", ex);
                }
                sleep(BATCH_INTERVAL_MILLIS);
            }
        }
    }

    /**
     * Reports a failed third-party call. Call this after a channel request fails.
     * Never blocks: if the queue is full, the report is dropped.
     *
     * @param reportInfo the failure details; {@code null} is ignored
     * @return {@code true} if the report was queued, {@code false} if it was null or the queue is full
     */
    public boolean httpFail(ReportInfo reportInfo) {
        if (null != reportInfo) {
            return msgQueue.offer(reportInfo);
        }
        return false;
    }

    /**
     * Returns the current time with second precision, as formatted by {@code TimeUtil.getCurrentDateTime()}.
     */
    public static String getCurrentTimestampS() {
        return TimeUtil.getCurrentDateTime();
    }

    /**
     * Returns the current time in epoch milliseconds.
     */
    public static long getCurrentTimestampMs() {
        return System.currentTimeMillis();
    }

    /**
     * Sleeps for the given time.
     * If interrupted, re-sets the thread's interrupt flag so the worker loop can observe it and stop.
     */
    private void sleep(Long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
复制代码
报告失败信息 ReportInfo 类:
/**
 * Failure report for one call to a third-party payment channel.
 * <p>
 * The failure-type flags are stored as integers: {@code 1} = yes, {@code 0} = no.
 */
public class ReportInfo {
    private static final String SEPARATOR = ",";
    private String url; // request URL
    private String orderId; // transaction identifier, e.g. the order number
    private String channel; // payment channel
    private String currentTime; // time of the report, second precision
    private Long elapsedTimeMillis; // actual request duration, in milliseconds
    private Integer connectTimeoutMillis; // connect timeout configured for the request, in milliseconds
    private Integer readTimeoutMillis; // read (response) timeout configured for the request, in milliseconds
    private Integer hasDnsError; // DNS resolution error, or the domain/IP does not exist
    private Integer hasConnectTimeout; // connect timeout occurred
    private Integer hasReadTimeout; // read (server response) timeout occurred
    private Integer hasUnknownException; // any other, unknown error

    /** No-arg constructor, e.g. for JSON deserialization. */
    public ReportInfo() {
    }

    /**
     * Creates a fully populated report.
     * The Boolean flags are converted to {@code 1}/{@code 0}; a {@code null} flag is treated as {@code false}.
     */
    public ReportInfo(String url, String orderId, String channel,
                      String currentTime, Long elapsedTimeMillis,
                      Integer connectTimeoutMillis, Integer readTimeoutMillis,
                      Boolean hasDnsError, Boolean hasConnectTimeout, Boolean hasReadTimeout,
                      Boolean hasUnknownException) {
        this.orderId = orderId;
        this.channel = channel;
        this.currentTime = currentTime;
        this.elapsedTimeMillis = elapsedTimeMillis;
        this.url = url;
        this.connectTimeoutMillis = connectTimeoutMillis;
        this.readTimeoutMillis = readTimeoutMillis;
        this.hasDnsError = toFlag(hasDnsError);
        this.hasConnectTimeout = toFlag(hasConnectTimeout);
        this.hasReadTimeout = toFlag(hasReadTimeout);
        this.hasUnknownException = toFlag(hasUnknownException);
    }

    /** Maps TRUE to 1 and FALSE or null to 0, avoiding an NPE from auto-unboxing a null Boolean. */
    private static Integer toFlag(Boolean value) {
        return Boolean.TRUE.equals(value) ? 1 : 0;
    }

    /** Returns this report as JSON; this is the format that is queued and sent to MQ. */
    @Override
    public String toString() {
        return JacksonHelper.toJsonString(this);
    }

    /**
     * Converts the report to a single comma-separated line.
     * <p>
     * A separator is appended after every field, including the last, so the line ends with
     * {@code ','}. Null fields are written as {@code "null"}, and values are not escaped.
     * This format is kept for compatibility with existing consumers.
     *
     * @return the report as one comma-separated line
     */
    public String toLineString() {
        Object[] fields = new Object[]{url, orderId, channel,
                currentTime, elapsedTimeMillis,
                connectTimeoutMillis, readTimeoutMillis,
                hasDnsError, hasConnectTimeout, hasReadTimeout, hasUnknownException};
        StringBuilder sb = new StringBuilder();
        for (Object field : fields) {
            sb.append(field).append(SEPARATOR);
        }
        return sb.toString();
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public String getCurrentTime() {
        return currentTime;
    }

    public void setCurrentTime(String currentTime) {
        this.currentTime = currentTime;
    }

    public Long getElapsedTimeMillis() {
        return elapsedTimeMillis;
    }

    public void setElapsedTimeMillis(Long elapsedTimeMillis) {
        this.elapsedTimeMillis = elapsedTimeMillis;
    }

    public Integer getConnectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    public void setConnectTimeoutMillis(Integer connectTimeoutMillis) {
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    public Integer getReadTimeoutMillis() {
        return readTimeoutMillis;
    }

    public void setReadTimeoutMillis(Integer readTimeoutMillis) {
        this.readTimeoutMillis = readTimeoutMillis;
    }

    public Integer getHasDnsError() {
        return hasDnsError;
    }

    public void setHasDnsError(Integer hasDnsError) {
        this.hasDnsError = hasDnsError;
    }

    public Integer getHasConnectTimeout() {
        return hasConnectTimeout;
    }

    public void setHasConnectTimeout(Integer hasConnectTimeout) {
        this.hasConnectTimeout = hasConnectTimeout;
    }

    public Integer getHasReadTimeout() {
        return hasReadTimeout;
    }

    public void setHasReadTimeout(Integer hasReadTimeout) {
        this.hasReadTimeout = hasReadTimeout;
    }

    public Integer getHasUnknownException() {
        return hasUnknownException;
    }

    public void setHasUnknownException(Integer hasUnknownException) {
        this.hasUnknownException = hasUnknownException;
    }
}
复制代码
监听 MQ,接收失败报告 MsgListener:
/**
 * MQ consumer for channel-failure reports.
 * <p>
 * Each message is a JSON array whose elements are JSON-encoded {@link ReportInfo} strings.
 * Every element is parsed and forwarded to the channel quality service on its own.
 */
@Component
public class MsgListener {
    private static final Logger logger = LogManager.getLogger(MsgListener.class);
    @Autowired
    private QualityService channelQualityService;

    /**
     * Handles one batch of failure reports.
     * <p>
     * A batch that cannot be parsed is logged and skipped. A bad element is logged and skipped
     * without affecting the other elements, so no exception escapes the listener for bad data.
     *
     * @param content JSON array of JSON-encoded {@link ReportInfo} strings
     */
    @RabbitListener(queues = MqConstant.QUEUE_HTTP_FAIL, priority = "10")
    public void onHttpFail(final String content) {
        logger.debug("接收到上报的通讯异常日志:{}", content);
        List<?> reportList;
        try {
            reportList = JacksonHelper.parseJson(content, ArrayList.class);
        } catch (DataParseException e) {
            logger.error("解析通讯异常日志失败:" + content, e);
            return;
        }
        if (reportList == null) {
            // e.g. the literal payload "null"
            return;
        }
        for (Object item : reportList) {
            try {
                // The cast is done inside the try so that one non-string element cannot
                // throw out of the listener and turn the whole message into a poison message.
                ReportInfo reportInfo = JacksonHelper.parseJson((String) item, ReportInfo.class);
                channelQualityService.onHttpFail(reportInfo);
            } catch (Exception e) {
                logger.error("处理上通讯异常日志失败", e);
            }
        }
    }
}
复制代码
处理通道降级的服务类 ChannelQualityServiceImpl(实现 QualityService 接口):
关键点是通过 Redis 执行 Lua 脚本来实现通道降级:脚本在 Redis 中原子执行,计数与设置过期时间不会被并发请求打断。算法参考了 Spring Cloud 熔断器(如 Hystrix)的思路,包括统计窗口、错误阈值和降级(休眠)窗口:
/**
 * Payment-channel quality-of-service (QoS) service. Handles channel downgrading.
 * <p>
 * Counts failures per channel in Redis. When a channel's failure count within the statistics
 * window exceeds the threshold, the channel is added to a shared "downgraded" set in Redis for
 * the sleep window. Currently only the failure count is used; the failure-type flags on
 * {@link ReportInfo} are not consulted.
 *
 * @author machunlin
 * @date 2018/4/28
 */
@Service
public class ChannelQualityServiceImpl implements QualityService {
    private static final Logger logger = LogManager.getLogger(ChannelQualityServiceImpl.class);

    /** Prefix of the error-count script result that signals the threshold has been exceeded. */
    private static final String DOWNGRADE_TRIGGERED_PREFIX = "downgraded on trigger";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Whether channel downgrading is enabled.
     */
    @Value("${channel-downgraded.enabled}")
    private Boolean enabled;

    /**
     * Length of the statistics window, in seconds; all failures within the window are counted.
     * <p>
     * NOTE(review): the window starts at the first failure (EXPIRE is set only when the count is 1),
     * so this is a fixed window rather than a true rolling window.
     */
    @Value("${channel-downgraded.rollingstats-window-seconds}")
    private Integer rollingstatsWindowSeconds;

    /**
     * Failure threshold; a failure count above this value triggers a downgrade.
     */
    @Value("${channel-downgraded.error-threshold}")
    private Integer errorThreshold;

    /**
     * How long, in seconds, a channel stays in the "downgraded" state.
     */
    @Value("${channel-downgraded.sleep-window-seconds}")
    private Integer sleepWindowSeconds;

    private static final DefaultRedisScript<String> errorCountScript = new DefaultRedisScript<>();
    private static final StringBuilder luaIncrExpire = new StringBuilder(500);
    private static final DefaultRedisScript<Long> channelDowngradedScript = new DefaultRedisScript<>();
    private static final StringBuilder luaSAddExpire = new StringBuilder(200);

    static {
        /*
         * Error counting:
         *   KEYS[1] = error-count key for the channel
         *   KEYS[2] = statistics window in seconds (used as the EXPIRE on the first failure)
         *   KEYS[3] = error threshold
         * Returns "done: ..." while count <= threshold, otherwise "downgraded on trigger: ...".
         *
         * NOTE(review): KEYS[2]/KEYS[3] are not Redis keys. Redis expects non-key values in ARGV;
         * on Redis Cluster, passing them as KEYS can cause CROSSSLOT errors.
         */
        luaIncrExpire.append(" local errorCount").
                append(" errorCount = redis.call('INCR', KEYS[1])").
                append(" errorCount = tonumber(errorCount)").
                append(" if errorCount == 1 then"). // first failure: start the window by setting the TTL
                append(" redis.call('EXPIRE',KEYS[1], KEYS[2])").
                append(" return 'done: errorCount == '..errorCount").
                append(" elseif(errorCount <= tonumber(KEYS[3])) then").
                append(" return 'done: errorCount is '..errorCount").
                append(" else").
                append(" return 'downgraded on trigger: errorCount is '..errorCount").
                append(" end");
        errorCountScript.setScriptText(luaIncrExpire.toString());
        errorCountScript.setResultType(String.class);

        /*
         * Add a channel to the downgraded-channel SET (duplicate values are ignored by SADD):
         *   KEYS[1] = set key
         *   KEYS[2] = channel
         *   KEYS[3] = TTL in seconds
         * Returns the SADD result (1 = newly added, 0 = already present).
         *
         * NOTE(review): EXPIRE applies to the whole set. Adding a new channel therefore
         * resets the TTL of every channel already in it, and all of them expire together.
         */
        luaSAddExpire.append(" local result").
                append(" result = redis.call('SADD', KEYS[1],KEYS[2])").
                append(" if(result>0) then"). // 0 means the value was already present: leave the TTL unchanged
                append(" redis.call('EXPIRE',KEYS[1],KEYS[3])").
                append(" end").
                append(" return result");
        channelDowngradedScript.setScriptText(luaSAddExpire.toString());
        channelDowngradedScript.setResultType(Long.class);
    }

    /**
     * Handles one failed channel call.
     * Does nothing if downgrading is disabled or if the report has no channel.
     *
     * @param reportInfo the failure report
     */
    @Override
    public void onHttpFail(final ReportInfo reportInfo) {
        if (!Boolean.TRUE.equals(enabled)) {
            logger.warn("通道质量服务(QoS)未开启");
            return;
        }
        if (reportInfo == null || reportInfo.getChannel() == null) {
            // Without a channel the failure cannot be attributed; counting it would create a bogus "...null" key.
            logger.warn("忽略无效的通讯异常日志,缺少通道信息: {}", reportInfo);
            return;
        }
        handleDowngraded(reportInfo);
    }

    /**
     * Returns all currently downgraded channels, read directly from the shared Redis set.
     *
     * @return the downgraded channel names, as returned by {@code SMEMBERS}
     */
    @Override
    public Set<String> getDowngraded() {
        Set<String> channelsDowngraded = redisTemplate.opsForSet().members(Constant.CACHE_KEY_CHANNEL_DOWN);
        return channelsDowngraded;
    }

    /**
     * Counts the failure and, if the threshold is exceeded, downgrades the channel.
     *
     * @param reportInfo the failure report; its channel is non-null (checked by the caller)
     */
    private void handleDowngraded(final ReportInfo reportInfo) {
        String channel = reportInfo.getChannel();

        List<String> keysCount = new ArrayList<>();
        keysCount.add(Constant.CACHE_KEY_PREFIX_CHANNEL_ERR_COUNT + channel);
        keysCount.add(String.valueOf(rollingstatsWindowSeconds)); // statistics window
        keysCount.add(String.valueOf(errorThreshold)); // error threshold
        String resp = redisTemplate.execute(errorCountScript, keysCount);
        logger.debug("失败请求已提交 : {}", resp);
        // A null result (e.g. when executed inside a pipeline/transaction) is treated as "not triggered".
        if (resp == null || !resp.startsWith(DOWNGRADE_TRIGGERED_PREFIX)) {
            return;
        }

        /*
         * Downgraded channels are currently stored directly in Redis, and the other PayService
         * instances in the cluster query Redis. Possible later optimizations:
         *   1. Keep downgraded channels in local memory and broadcast updates to the other servers.
         *   2. Route channels using the local in-memory data.
         *   3. Run a background thread that listens for "channel downgraded" broadcasts and updates memory.
         */
        List<String> keysChannelDown = new ArrayList<>();
        keysChannelDown.add(Constant.CACHE_KEY_CHANNEL_DOWN); // key
        keysChannelDown.add(channel); // value
        keysChannelDown.add(String.valueOf(sleepWindowSeconds)); // TTL
        Long result = redisTemplate.execute(channelDowngradedScript, keysChannelDown);
        if (result != null && result > 0) {
            logger.debug("{}通道被降级,降级时长为{}秒", channel, sleepWindowSeconds);
        }
    }
}
复制代码
划线
评论
复制
发布于: 1 小时前阅读数: 6
版权声明: 本文为 InfoQ 作者【try catch】的原创文章。
原文链接:【http://xie.infoq.cn/article/2e7175feac52ba6c9df7756fb】。文章转载请联系作者。
try catch
关注
还未添加个人签名 2012.07.23 加入
还未添加个人简介
评论