写点什么

支付通道接口异常统计上报

用户头像
try catch
关注
发布于: 1 小时前

支付中心对接第三方通道时,可能会因为第三方接口不稳定而无法完成支付。因此需要一个失败统计功能:统计各通道接口调用失败的次数,并在超过预定阈值时自动切换(降级)支付通道。

接口调用失败的上报类 ReporterUtils :

import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;

/** * 日志上报类。 * 调用第三方通道的接口异常时,上报。 * <p> * 维护一个内存队列msgQueue,日志先放入队列,然后由后台线程池推送给MQ。 */@Componentpublic class ReporterUtils { private static final Logger log = LogManager.getLogger(ReporterUtils.class);
@Autowired private ReportConfig config;
@Autowired private MsgProducer msgProducer;
private static LinkedBlockingQueue<ReportInfo> msgQueue; private static ExecutorService executorService;
@PostConstruct private void init() { log.debug("----------------- reporter init ------");
msgQueue = new LinkedBlockingQueue<ReportInfo>(config.getQueueMaxSize());
// 添加处理线程 executorService = new ThreadPoolExecutor(config.getWorkerNum(), config.getWorkerNum(), Constant.EXECUTOR_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(Constant.EXECUTOR_WORK_QUEUE_SIZE), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t; } }, new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < config.getWorkerNum(); i++) { executorService.submit(new Worker()); } }
private ReporterUtils() { }
/** * 上报线程。 */ class Worker implements Runnable { /** * 持续扫描内存队列中的日志并上报给MQ。 */ @Override public void run() { try {
while (true) { List<String> batchReportInfos = new ArrayList(config.getBatchSize());
ReportInfo reportInfo; for (int j = 0; j < config.getBatchSize(); j++) { /*获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作*/ reportInfo = msgQueue.take(); batchReportInfos.add(reportInfo.toString()); }
String jsonBatchReportInfos = JacksonHelper.toJsonString(batchReportInfos); log.debug("批量上报通讯异常日志:" + jsonBatchReportInfos); //失败报告通过MQ发送给MsgListener类处理(也可以改为接口调用方式) msgProducer.send(Topic.HTTP_FAIL, jsonBatchReportInfos); sleep(10_000L); } } catch (Exception ex) { log.error("report fail. ", ex); } } }
/** * 调用第三方接口失败后,调用此方法,报告失败。 * * @param reportInfo * @return */ public boolean httpFail(ReportInfo reportInfo) { if (null != reportInfo) { return msgQueue.offer(reportInfo); } return false; }

public static String getCurrentTimestampS() { //精确到秒 return TimeUtil.getCurrentDateTime(); }
public static long getCurrentTimestampMs() { //精确到毫秒 return System.currentTimeMillis(); }
private void sleep(Long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { //catched } }
}
复制代码

报告失败信息 ReportInfo 类:

public class ReportInfo {
private static final String SEPARATOR = ",";
private String url; //请求地址 private String orderId; // 交易的标识,比如订单号 private String channel; //支付通道
private String currentTime; // 上报时的时间戳,单位秒 private Long elapsedTimeMillis; // 实际请求耗时,单位毫秒
private Integer connectTimeoutMillis; // 请求时设置的连接超时时间,单位毫秒 private Integer readTimeoutMillis; // 请求时设置的服务端返回超时时间,单位毫秒
private Integer hasDnsError; // 是否DNS解析错误,或域名(IP)不存在 private Integer hasConnectTimeout; // 是否连接超时 private Integer hasReadTimeout; // 是否服务端返回超时
private Integer hasUnknownException; //其他未知的错误
public ReportInfo() { }
public ReportInfo(String url, String orderId, String channel, String currentTime, Long elapsedTimeMillis, Integer connectTimeoutMillis, Integer readTimeoutMillis, Boolean hasDnsError, Boolean hasConnectTimeout, Boolean hasReadTimeout, Boolean hasUnknownException) { this.orderId = orderId; this.channel = channel; this.currentTime = currentTime; this.elapsedTimeMillis = elapsedTimeMillis; this.url = url; this.connectTimeoutMillis = connectTimeoutMillis; this.readTimeoutMillis = readTimeoutMillis; this.hasDnsError = hasDnsError ? 1 : 0; this.hasConnectTimeout = hasConnectTimeout ? 1 : 0; this.hasReadTimeout = hasReadTimeout ? 1 : 0;
this.hasUnknownException = hasUnknownException ? 1 : 0; }
@Override public String toString() { return JacksonHelper.toJsonString(this); }
/** * 转换成 csv 格式。 * 每条记录转换为一行,都好分割。 * * @return */ public String toLineString() {
Object[] objects = new Object[]{url, orderId, channel, currentTime, elapsedTimeMillis, connectTimeoutMillis, readTimeoutMillis, hasDnsError, hasConnectTimeout, hasReadTimeout, hasUnknownException}; StringBuffer sb = new StringBuffer(); for (Object obj : objects) { sb.append(obj).append(SEPARATOR); } try { return sb.toString(); } catch (Exception ex) { return null; } }

public String getUrl() { return url; }
public void setUrl(String url) { this.url = url; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getChannel() { return channel; }
public void setChannel(String channel) { this.channel = channel; }
public String getCurrentTime() { return currentTime; }
public void setCurrentTime(String currentTime) { this.currentTime = currentTime; }
public Long getElapsedTimeMillis() { return elapsedTimeMillis; }
public void setElapsedTimeMillis(Long elapsedTimeMillis) { this.elapsedTimeMillis = elapsedTimeMillis; }
public Integer getConnectTimeoutMillis() { return connectTimeoutMillis; }
public void setConnectTimeoutMillis(Integer connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; }
public Integer getReadTimeoutMillis() { return readTimeoutMillis; }
public void setReadTimeoutMillis(Integer readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; }
public Integer getHasDnsError() { return hasDnsError; }
public void setHasDnsError(Integer hasDnsError) { this.hasDnsError = hasDnsError; }
public Integer getHasConnectTimeout() { return hasConnectTimeout; }
public void setHasConnectTimeout(Integer hasConnectTimeout) { this.hasConnectTimeout = hasConnectTimeout; }
public Integer getHasReadTimeout() { return hasReadTimeout; }
public void setHasReadTimeout(Integer hasReadTimeout) { this.hasReadTimeout = hasReadTimeout; }
public Integer getHasUnknownException() { return hasUnknownException; }
public void setHasUnknownException(Integer hasUnknownException) { this.hasUnknownException = hasUnknownException; }}
复制代码

监听 MQ,接收失败报告 MsgListener:

/**
 * MQ listener that receives the batched failure reports and hands each report to the
 * channel quality service.
 */
@Component
public class MsgListener {
    private static final Logger logger = LogManager.getLogger(MsgListener.class);

    @Autowired
    private QualityService channelQualityService;

    /**
     * Handles one message from the HTTP-failure queue.
     * <p>
     * The message body is parsed as a JSON array whose elements are themselves JSON strings,
     * each describing one {@code ReportInfo}. A message that cannot be parsed, or that parses
     * to no list at all, is logged and dropped. A failure on a single element is logged and
     * the remaining elements are still processed.
     *
     * @param content message body
     */
    @RabbitListener(queues = MqConstant.QUEUE_HTTP_FAIL, priority = "10")
    public void onHttpFail(final String content) {
        logger.debug("接收到上报的通讯异常日志:" + content);

        List<?> reportList;
        try {
            reportList = JacksonHelper.parseJson(content, ArrayList.class);
        } catch (DataParseException e) {
            logger.error("解析通讯异常日志失败:" + content, e);
            return;
        }

        // NOTE(review): assumes parseJson may return null (e.g. for a JSON "null" body) - confirm.
        if (reportList == null) {
            logger.warn("empty http-fail report list, content: {}", content);
            return;
        }

        for (Object item : reportList) {
            try {
                // The cast happens inside the try so that a non-String element is logged
                // here instead of escaping from the listener.
                ReportInfo reportInfo = JacksonHelper.parseJson((String) item, ReportInfo.class);
                channelQualityService.onHttpFail(reportInfo);
            } catch (Exception e) {
                logger.error("处理上通讯异常日志失败", e);
            }
        }
    }
}
复制代码

处理通道降级的服务类 ChannelQualityServiceImpl(实现 QualityService 接口):

关键点是通过 redis 执行 lua 脚本来实现通道降级,算法参考了 spring-cloud 熔断器的思路:


/** * 支付通道 质量服务(QoS) 类。 * 处理通道降级。 * <p> * 统计各通道的失败次数,根据频率(某个时间段)、失败类型(UnknownHostException, ConnectTimeoutException,SocketTimeoutException,Exception)等, * 确定是否需要触发通道降级。 * * @author machunlin * @date 2018/4/28 */@Servicepublic class ChannelQualityServiceImpl implements QualityService {
private static final Logger logger = LogManager.getLogger(ChannelQualityServiceImpl.class);
@Autowired private RedisTemplate<String, String> redisTemplate;
/** * 是否开启通道降级功能。 */ @Value("${channel-downgraded.enabled}") private Boolean enabled; /** * 统计时长的滚动窗口,默认一分钟。 * 即:统计一分钟之内所有错误请求的总次数。 */ @Value("${channel-downgraded.rollingstats-window-seconds}") private Integer rollingstatsWindowSeconds;
/** * 错误次数。 * 超过此阀值则触发降级。 */ @Value("${channel-downgraded.error-threshold}") private Integer errorThreshold;
/** * 触发短路的时长。 * 即:在指定时长内,保持“已降级”状态。 */ @Value("${channel-downgraded.sleep-window-seconds}") private Integer sleepWindowSeconds;
private static DefaultRedisScript<String> errorCountScript = new DefaultRedisScript<>(); private static final StringBuilder luaIncrExpire = new StringBuilder(500);
private static DefaultRedisScript<Long> channelDowngradedScript = new DefaultRedisScript<>(); private static final StringBuilder luaSAddExpire = new StringBuilder(200);
static { /** * 错误次数统计: * KEYS[1] = channel * KEYS[2] = 统计时长的滚动窗口, 默认为1分钟 * KEYS[3] = 错误次数(阀值),默认4次 */ luaIncrExpire.append(" local errorCount"). append(" errorCount = redis.call('INCR', KEYS[1])"). append(" errorCount = tonumber(errorCount)"). append(" if errorCount == 1 then"). //如果是第一次报错,将设置key值的有效期 append(" redis.call('EXPIRE',KEYS[1], KEYS[2])"). append(" return 'done: errorCount == '..errorCount"). append(" elseif(errorCount <= tonumber(KEYS[3])) then"). append(" return 'done: errorCount is '..errorCount"). append(" else"). append(" return 'downgraded on trigger: errorCount is '..errorCount"). append(" end"); errorCountScript.setScriptText(luaIncrExpire.toString()); errorCountScript.setResultType(String.class);
/** * 降级通道新增channel值。 * (使用SET集合,value重复的会被忽略): * KEYS[1] = key * KEYS[2] = value * KEYS[3] = ttl seconds * */ luaSAddExpire.append(" local result"). append(" result = redis.call('SADD', KEYS[1],KEYS[2])"). append(" if(result>0) then"). // 返回0表示value重复,不作处理 append(" redis.call('EXPIRE',KEYS[1],KEYS[3])"). append(" end"). append(" return result"); channelDowngradedScript.setScriptText(luaSAddExpire.toString()); channelDowngradedScript.setResultType(Long.class); }

/** * 失败处理 * * @param reportInfo */ @Override public void onHttpFail(final ReportInfo reportInfo) { if (!enabled) { logger.warn("通道质量服务(QoS)未开启"); return; }
handleDowngraded(reportInfo); }
/** * 获取全部已降级通道 * * @return */ @Override public Set<String> getDowngraded() { Set<String> channelsDowngraded = redisTemplate.opsForSet().members(Constant.CACHE_KEY_CHANNEL_DOWN); return channelsDowngraded; }
/** * 处理通道降级。 * * @param reportInfo */ private void handleDowngraded(final ReportInfo reportInfo) { List<String> keysCount = new ArrayList<>(); keysCount.add(Constant.CACHE_KEY_PREFIX_CHANNEL_ERR_COUNT + reportInfo.getChannel()); keysCount.add(rollingstatsWindowSeconds + ""); // 统计时长的滚动窗口 keysCount.add(errorThreshold + ""); // 错误阀值 String resp = redisTemplate.execute(errorCountScript, keysCount);
logger.debug("失败请求已提交 : " + resp);
if (resp.startsWith("downgraded on trigger")) { /** * 目前“被降级的通道”是直接放到redis中,集群中的其他PayService向redis查询。 * 后期可以优化: * 1、已降级的通道,放到本机内存中,然后广播通知集群内的其他服务器更新; * 2、通道路由时,直接取本机内存中的数据。 * 3、本机开启一个后台线程,监听"通道降级"的广播,并更新内存数据。 */ List<String> keysChannelDown = new ArrayList<>(); keysChannelDown.add(Constant.CACHE_KEY_CHANNEL_DOWN); // key keysChannelDown.add(reportInfo.getChannel()); // value keysChannelDown.add(sleepWindowSeconds + ""); // 有效期 Long result = redisTemplate.execute(channelDowngradedScript, keysChannelDown);
if (result > 0) { logger.debug("{}通道被降级,降级时长为{}秒", reportInfo.getChannel(), sleepWindowSeconds); } } }

}
复制代码


发布于: 1 小时前阅读数: 6
用户头像

try catch

关注

还未添加个人签名 2012.07.23 加入

还未添加个人简介

评论

发布
暂无评论
支付通道接口异常统计上报