支付通道接口异常统计上报
发布于: 1 小时前
支付中心对接第三方支付通道时,常会遇到第三方接口不稳定导致无法支付的问题。因此需要一个失败统计功能:当某个通道的失败次数达到预定阈值时,自动切换(降级)该支付通道。
接口调用失败的上报类 ReporterUtils :
import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
/** * 日志上报类。 * 调用第三方通道的接口异常时,上报。 * <p> * 维护一个内存队列msgQueue,日志先放入队列,然后由后台线程池推送给MQ。 */@Componentpublic class ReporterUtils {    private static final Logger log = LogManager.getLogger(ReporterUtils.class);
    @Autowired    private ReportConfig config;
    @Autowired    private MsgProducer msgProducer;
    private static LinkedBlockingQueue<ReportInfo> msgQueue;    private static ExecutorService executorService;
    @PostConstruct    private void init() {        log.debug("-----------------   reporter init ------");
        msgQueue = new LinkedBlockingQueue<ReportInfo>(config.getQueueMaxSize());
        // 添加处理线程        executorService = new ThreadPoolExecutor(config.getWorkerNum(), config.getWorkerNum(), Constant.EXECUTOR_KEEP_ALIVE_TIME, TimeUnit.SECONDS,                new LinkedBlockingQueue<Runnable>(Constant.EXECUTOR_WORK_QUEUE_SIZE), new ThreadFactory() {            @Override            public Thread newThread(Runnable r) {                Thread t = Executors.defaultThreadFactory().newThread(r);                t.setDaemon(true);                return t;            }        }, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < config.getWorkerNum(); i++) {            executorService.submit(new Worker());        }    }
    private ReporterUtils() {    }
    /**     * 上报线程。     */    class Worker implements Runnable {        /**         * 持续扫描内存队列中的日志并上报给MQ。         */        @Override        public void run() {            try {
                while (true) {                    List<String> batchReportInfos = new ArrayList(config.getBatchSize());
                    ReportInfo reportInfo;                    for (int j = 0; j < config.getBatchSize(); j++) {                        /*获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作*/                        reportInfo = msgQueue.take();                        batchReportInfos.add(reportInfo.toString());                    }
                    String jsonBatchReportInfos = JacksonHelper.toJsonString(batchReportInfos);                    log.debug("批量上报通讯异常日志:" + jsonBatchReportInfos);                    //失败报告通过MQ发送给MsgListener类处理(也可以改为接口调用方式)                    msgProducer.send(Topic.HTTP_FAIL, jsonBatchReportInfos);                    sleep(10_000L);                }            } catch (Exception ex) {                log.error("report fail. ", ex);            }        }    }
    /**     * 调用第三方接口失败后,调用此方法,报告失败。     *     * @param reportInfo     * @return     */    public boolean httpFail(ReportInfo reportInfo) {        if (null != reportInfo) {            return msgQueue.offer(reportInfo);        }        return false;    }
    public static String getCurrentTimestampS() {        //精确到秒        return TimeUtil.getCurrentDateTime();    }
    public static long getCurrentTimestampMs() {        //精确到毫秒        return System.currentTimeMillis();    }
    private void sleep(Long millis) {        try {            Thread.sleep(millis);        } catch (InterruptedException e) {            //catched        }    }
}
复制代码
 报告失败信息 ReportInfo 类:
public class ReportInfo {
    private static final String SEPARATOR = ",";
    private String url; //请求地址    private String orderId;  // 交易的标识,比如订单号    private String channel; //支付通道
    private String currentTime;   // 上报时的时间戳,单位秒    private Long elapsedTimeMillis; // 实际请求耗时,单位毫秒
    private Integer connectTimeoutMillis;  // 请求时设置的连接超时时间,单位毫秒    private Integer readTimeoutMillis;  // 请求时设置的服务端返回超时时间,单位毫秒
    private Integer hasDnsError;  //  是否DNS解析错误,或域名(IP)不存在    private Integer hasConnectTimeout; // 是否连接超时    private Integer hasReadTimeout; // 是否服务端返回超时
    private Integer hasUnknownException; //其他未知的错误
    public ReportInfo() {    }
    public ReportInfo(String url, String orderId, String channel,                      String currentTime, Long elapsedTimeMillis,                      Integer connectTimeoutMillis, Integer readTimeoutMillis,                      Boolean hasDnsError, Boolean hasConnectTimeout, Boolean hasReadTimeout,                      Boolean hasUnknownException) {        this.orderId = orderId;        this.channel = channel;        this.currentTime = currentTime;        this.elapsedTimeMillis = elapsedTimeMillis;        this.url = url;        this.connectTimeoutMillis = connectTimeoutMillis;        this.readTimeoutMillis = readTimeoutMillis;        this.hasDnsError = hasDnsError ? 1 : 0;        this.hasConnectTimeout = hasConnectTimeout ? 1 : 0;        this.hasReadTimeout = hasReadTimeout ? 1 : 0;
        this.hasUnknownException = hasUnknownException ? 1 : 0;    }
    @Override    public String toString() {        return JacksonHelper.toJsonString(this);    }
    /**     * 转换成 csv 格式。     * 每条记录转换为一行,都好分割。     *     * @return     */    public String toLineString() {
        Object[] objects = new Object[]{url, orderId, channel,                currentTime, elapsedTimeMillis,                connectTimeoutMillis, readTimeoutMillis,                hasDnsError, hasConnectTimeout, hasReadTimeout, hasUnknownException};        StringBuffer sb = new StringBuffer();        for (Object obj : objects) {            sb.append(obj).append(SEPARATOR);        }        try {            return sb.toString();        } catch (Exception ex) {            return null;        }    }
    public String getUrl() {        return url;    }
    public void setUrl(String url) {        this.url = url;    }
    public String getOrderId() {        return orderId;    }
    public void setOrderId(String orderId) {        this.orderId = orderId;    }
    public String getChannel() {        return channel;    }
    public void setChannel(String channel) {        this.channel = channel;    }
    public String getCurrentTime() {        return currentTime;    }
    public void setCurrentTime(String currentTime) {        this.currentTime = currentTime;    }
    public Long getElapsedTimeMillis() {        return elapsedTimeMillis;    }
    public void setElapsedTimeMillis(Long elapsedTimeMillis) {        this.elapsedTimeMillis = elapsedTimeMillis;    }
    public Integer getConnectTimeoutMillis() {        return connectTimeoutMillis;    }
    public void setConnectTimeoutMillis(Integer connectTimeoutMillis) {        this.connectTimeoutMillis = connectTimeoutMillis;    }
    public Integer getReadTimeoutMillis() {        return readTimeoutMillis;    }
    public void setReadTimeoutMillis(Integer readTimeoutMillis) {        this.readTimeoutMillis = readTimeoutMillis;    }
    public Integer getHasDnsError() {        return hasDnsError;    }
    public void setHasDnsError(Integer hasDnsError) {        this.hasDnsError = hasDnsError;    }
    public Integer getHasConnectTimeout() {        return hasConnectTimeout;    }
    public void setHasConnectTimeout(Integer hasConnectTimeout) {        this.hasConnectTimeout = hasConnectTimeout;    }
    public Integer getHasReadTimeout() {        return hasReadTimeout;    }
    public void setHasReadTimeout(Integer hasReadTimeout) {        this.hasReadTimeout = hasReadTimeout;    }
    public Integer getHasUnknownException() {        return hasUnknownException;    }
    public void setHasUnknownException(Integer hasUnknownException) {        this.hasUnknownException = hasUnknownException;    }}
复制代码
 监听 MQ,接收失败报告 MsgListener:
/**
 * MQ consumer for batched channel-failure reports.
 * <p>
 * Each message is expected to be a JSON array whose elements are JSON strings, one serialized
 * {@code ReportInfo} per element. Every report is forwarded to the {@link QualityService}.
 */
@Component
public class MsgListener {
    private static final Logger logger = LogManager.getLogger(MsgListener.class);

    @Autowired
    private QualityService channelQualityService;

    /**
     * Handles one batch message.
     * <p>
     * Malformed elements are logged and skipped individually. The method does not throw for bad
     * content, so one bad element or message cannot abort the whole batch.
     *
     * @param content the raw message body (a JSON array of JSON strings)
     */
    @RabbitListener(queues = MqConstant.QUEUE_HTTP_FAIL, priority = "10")
    public void onHttpFail(final String content) {
        logger.debug("接收到上报的通讯异常日志:" + content);

        // Parsed as a raw list; element types are checked below instead of trusting an
        // unchecked cast to List<String>.
        List<?> reportList;
        try {
            reportList = JacksonHelper.parseJson(content, ArrayList.class);
        } catch (DataParseException e) {
            logger.error("解析通讯异常日志失败:" + content, e);
            return;
        }
        if (reportList == null) {
            logger.warn("通讯异常日志内容为空:" + content);
            return;
        }

        for (Object item : reportList) {
            if (!(item instanceof String)) {
                logger.warn("忽略非字符串格式的通讯异常日志:" + item);
                continue;
            }
            try {
                ReportInfo reportInfo = JacksonHelper.parseJson((String) item, ReportInfo.class);
                channelQualityService.onHttpFail(reportInfo);
            } catch (Exception e) {
                logger.error("处理上通讯异常日志失败", e);
            }
        }
    }
}
复制代码
 处理通道降级的服务类 ChannelQualityService:
关键点是通过 Redis 执行 Lua 脚本(保证计数、设置过期时间等操作的原子性)来实现通道降级,算法参考了 Spring Cloud 熔断器的思路:
/** * 支付通道 质量服务(QoS) 类。 * 处理通道降级。 * <p> * 统计各通道的失败次数,根据频率(某个时间段)、失败类型(UnknownHostException, ConnectTimeoutException,SocketTimeoutException,Exception)等, * 确定是否需要触发通道降级。 * * @author machunlin * @date 2018/4/28 */@Servicepublic class ChannelQualityServiceImpl implements QualityService {
    private static final Logger logger = LogManager.getLogger(ChannelQualityServiceImpl.class);
    @Autowired    private RedisTemplate<String, String> redisTemplate;
    /**     * 是否开启通道降级功能。     */    @Value("${channel-downgraded.enabled}")    private Boolean enabled;    /**     * 统计时长的滚动窗口,默认一分钟。     * 即:统计一分钟之内所有错误请求的总次数。     */    @Value("${channel-downgraded.rollingstats-window-seconds}")    private Integer rollingstatsWindowSeconds;
    /**     * 错误次数。     * 超过此阀值则触发降级。     */    @Value("${channel-downgraded.error-threshold}")    private Integer errorThreshold;
    /**     * 触发短路的时长。     * 即:在指定时长内,保持“已降级”状态。     */    @Value("${channel-downgraded.sleep-window-seconds}")    private Integer sleepWindowSeconds;
    private static DefaultRedisScript<String> errorCountScript = new DefaultRedisScript<>();    private static final StringBuilder luaIncrExpire = new StringBuilder(500);
    private static DefaultRedisScript<Long> channelDowngradedScript = new DefaultRedisScript<>();    private static final StringBuilder luaSAddExpire = new StringBuilder(200);
    static {        /**         * 错误次数统计:         * KEYS[1] = channel         * KEYS[2] = 统计时长的滚动窗口, 默认为1分钟         * KEYS[3] = 错误次数(阀值),默认4次         */        luaIncrExpire.append("     local errorCount").                append(" errorCount = redis.call('INCR', KEYS[1])").                append(" errorCount = tonumber(errorCount)").                append(" if errorCount == 1 then").               //如果是第一次报错,将设置key值的有效期                append("    redis.call('EXPIRE',KEYS[1], KEYS[2])").                append("    return 'done: errorCount == '..errorCount").                append(" elseif(errorCount <= tonumber(KEYS[3])) then").                append("    return 'done: errorCount is '..errorCount").                append(" else").                append("    return 'downgraded on trigger: errorCount is '..errorCount").                append(" end");        errorCountScript.setScriptText(luaIncrExpire.toString());        errorCountScript.setResultType(String.class);
        /**         * 降级通道新增channel值。         * (使用SET集合,value重复的会被忽略):         * KEYS[1] = key         * KEYS[2] = value         * KEYS[3] = ttl seconds         *         */        luaSAddExpire.append("  local result").                append(" result = redis.call('SADD', KEYS[1],KEYS[2])").                append(" if(result>0) then").                // 返回0表示value重复,不作处理                append("    redis.call('EXPIRE',KEYS[1],KEYS[3])").                append(" end").                append(" return result");        channelDowngradedScript.setScriptText(luaSAddExpire.toString());        channelDowngradedScript.setResultType(Long.class);    }
    /**     * 失败处理     *     * @param reportInfo     */    @Override    public void onHttpFail(final ReportInfo reportInfo) {        if (!enabled) {            logger.warn("通道质量服务(QoS)未开启");            return;        }
        handleDowngraded(reportInfo);    }
    /**     * 获取全部已降级通道     *     * @return     */    @Override    public Set<String> getDowngraded() {        Set<String> channelsDowngraded = redisTemplate.opsForSet().members(Constant.CACHE_KEY_CHANNEL_DOWN);        return channelsDowngraded;    }
    /**     * 处理通道降级。     *     * @param reportInfo     */    private void handleDowngraded(final ReportInfo reportInfo) {        List<String> keysCount = new ArrayList<>();        keysCount.add(Constant.CACHE_KEY_PREFIX_CHANNEL_ERR_COUNT + reportInfo.getChannel());        keysCount.add(rollingstatsWindowSeconds + ""); // 统计时长的滚动窗口        keysCount.add(errorThreshold + ""); // 错误阀值        String resp = redisTemplate.execute(errorCountScript, keysCount);
        logger.debug("失败请求已提交 : " + resp);
        if (resp.startsWith("downgraded on trigger")) {            /**             * 目前“被降级的通道”是直接放到redis中,集群中的其他PayService向redis查询。             * 后期可以优化:             * 1、已降级的通道,放到本机内存中,然后广播通知集群内的其他服务器更新;             * 2、通道路由时,直接取本机内存中的数据。             * 3、本机开启一个后台线程,监听"通道降级"的广播,并更新内存数据。             */            List<String> keysChannelDown = new ArrayList<>();            keysChannelDown.add(Constant.CACHE_KEY_CHANNEL_DOWN); // key            keysChannelDown.add(reportInfo.getChannel()); // value            keysChannelDown.add(sleepWindowSeconds + ""); // 有效期            Long result = redisTemplate.execute(channelDowngradedScript, keysChannelDown);
            if (result > 0) {                logger.debug("{}通道被降级,降级时长为{}秒", reportInfo.getChannel(), sleepWindowSeconds);            }        }    }
}
复制代码
 划线
评论
复制
发布于: 1 小时前阅读数: 6
版权声明: 本文为 InfoQ 作者【try catch】的原创文章。
原文链接:【http://xie.infoq.cn/article/2e7175feac52ba6c9df7756fb】。文章转载请联系作者。

try catch
关注
还未添加个人签名 2012.07.23 加入
还未添加个人简介











 
    
评论