写点什么

Android 抖音爆红的口红挑战爬坑总结

用户头像
Android架构
关注
发布于: 2 小时前

资源文件下载

关于资源文件的下载,我是选择 okdownload。okdownload 是一个支持多线程,多任务,断点续传,可靠,灵活,高性能以及强大的下载引擎。详情可以去看 okdownload GitHub 地址


  • 依赖方式


implementation 'com.liulishuo.okdownload:okdownload:1.0.5'implementation 'com.liulishuo.okdownload:okhttp:1.0.5'


  • 简单实用示例


单文件下载


DownloadTask task = new DownloadTask.Builder(url, parentFile).setFilename(filename)// the minimal interval millisecond for callback progress.setMinIntervalMillisCallbackProcess(30)// do re-download even if the task has already been completed in the past..setPassIfAlreadyCompleted(false).build();


task.enqueue(listener);


// canceltask.cancel();


// execute task synchronizedtask.execute(listener);


多文件下载


final DownloadTask[] tasks = new DownloadTask[2];tasks[0] = new DownloadTask.Builder("url1", "path", "filename1").build();tasks[1] = new DownloadTask.Builder("url2", "path", "filename1").build();DownloadTask.enqueue(tasks, listener);


  • 结合 Rxjava 实现文件下载


public class DownLoadUtils {


/**


  • 从中控下载文件到本地

  • @param url

  • @param parentPath 保存到本地文件的父文件路径

  • @param downloadFileName 保存到本地的文件名

  • @return*/public static Observable<File> getDownLoadFile(String url,String parentPath,String downloadFileName){// 下载本地没有的文件 MainActivity.appendAndScrollLog(String.format("开始下载资源文件 %s\n", url));final DownloadTask task = new DownloadTask.Builder(url, parentPath, downloadFileName).build();return Observable.create(new ObservableOnSubscribe<File>() {@Overridepublic void subscribe(final ObservableEmitter<File> emitter) throws Exception {task.enqueue(new DownloadListener2() {@Overridepublic void taskStart(DownloadTask task) {


}


@Overridepublic void taskEnd(DownloadTask task, EndCause cause, Exception realCause) {if (cause != EndCause.COMPLETED) {MainActivity.appendAndScrollLog(String.format("资源文件下载失败 %s %s\n", cause.toString(), task.getUrl()));emitter.onNext(new File(""));emitter.onComplete();return;}File file = task.getFile();MainActivity.appendAndScrollLog(String.format("资源文件下载完成 %s\n", file.getAbsolutePath()));emitter.onNext(file);emitter.onComplete();}});}}).retry();}


/**


  • 从中控下载文件到本地

  • @param url

  • @param saveFile 保存到本地的文件

  • @return*/public static Observable<File> getDownLoadFile(String url, File saveFile){return getDownLoadFile(url,saveFile.getParentFile().getAbsolutePath(),saveFile.getName());}}

屏蔽下拉菜单和底部导航栏

像娃娃机和格子机这些设备都是在线下直接面向用户的,因此我们不能将我们的 Android 设备全部都展现给我们的用户,我们需要对用户的行为做些限制,例如禁止用户通过导航栏或者下拉菜单退出当前程序,防止他们做出一些危险的操作。我的解决方案是把当前的 rocket 程序设置为默认启动和桌面应用程序,并将 Android 设备中自带的 launcher 程序 和 systemui 程序给禁用掉,那么设备一开始启动的时候就会启动我们的 rocket 应用,并成功的禁止了用户使用导航栏和下拉菜单来做非法的操作。


  • 查找 Android 设备中自带的 launcher 程序 和 systemui 程序的对应包名

  • 我们使用 adb shell pm list packages 就可以找出设备中已经安装的程序列表,主要是以包名显示的。

  • 查找 launcher 程序的包名,找出包名为:com.android.launcher3


LW-PC0920@lw1002022 MINGW64 ~/Desktop$ adb shell pm list packages | grep launcherpackage:com.android.launcher3


  • 查找 systemui 程序的包名:找出包名为:com.android.systemui


LW-PC0920@lw1002022 MINGW64 ~/Desktop$ adb shell pm list packages | grep systemuipackage:com.android.systemui


  • 禁止 Android 设备中自带的 launcher 程序 和 systemui 程序的使用

  • 禁止 launcher 程序的使用


adb shell pm disable com.android.launcher3


  • 禁止 systemui 程序的使用


adb shell pm disable com.android.systemui


  • 代码实现禁止 Android 设备中自带的 launcher 程序 和 systemui 程序的使用


public static void enableLauncher(Boolean enabled) {List<PackageInfo> piList = MainActivity.instance.packageManager.getInstalledPackages(0);ArrayList<String> packages = new ArrayList();for (PackageInfo pi : piList) {String name = pi.packageName;if (name.contains("systemui") || name.contains("launcher")) {packages.add(name);}}for (String packageName : packages) {su(String.format("pm %s %s\n", enabled ? "enable" : "disable", packageName));}}


/**


  • 执行 adb 指令


*/public static int su(String cmd) {try {Process p = Runtime.getRuntime().exec("su");DataOutputStream os = new DataOutputStream(p.getOutputStream());os.writeBytes(cmd);os.writeBytes("exit\n");os.flush();os.close();return p.waitFor();} catch (Exception ex) {return -1;}}

Iot 的实现

关于 IoT 的实现,我们这边使用的是阿里的《微消息队列 for IoT》服务,关于《微消息队列 for IoT》服务,阿里的解释如下:


微消息队列 for IoT 是消息队列(MQ)的子产品。针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,消息队列(MQ) 通过推出微消息队列 for IoT 开放了对 MQTT 协议的完整支持


  • MQTT 协议?

  • MQTT 的全称是:Message Queuing Telemetry Transport( 消息队列遥测传输),是一种轻量的,基于发布订阅模型的即时通讯协议。该协议设计开放,协议简单,平台支持丰富,几乎可以把所有联网物品和外部连接起来,因此在移动互联网和物联网领域拥有众多优势。

  • MQTT 的特点

  • 使用发布/订阅(Pub/Sub)消息模式,提供一对多的消息分发,解除了应用程序之间的耦合;

  • 对负载内容屏蔽的消息传输;

  • 使用 TCP/IP 提供基础的网络连接;

  • 有三种级别的消息传递服务;

  • 小型传输,开销很小(头部长度固定为 2 字节),协议交换最小化,以降低网络流量。

  • 关键名词的解释


Android 中实现 iot

关于显示 iot 连接的实现过程是这样的:首先我们将设备的三元组从管理后台中批量生成,文件名的格式为 deviceName.json(例如:00001.json),里面是关于每个设备的三元组信息;接着我们将装有三元组文件的 U 盘插入到 Android 设备中(娃娃机或者口红挑战);rocket 程序会自动监测到 U 盘的插入并将文件剪切到 Android 设备的制定目录下;再接着 Android 设备可以去读取指定文件中三元组信息;最后使用此三元组进行连接 mqtt。


  • 添加依赖


implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'


  • 关于三元组

  • 在 Android 设备中需要关心的三个东西,mqtt 协议中用来识别一个设备的必要三要素,如果存在相同的三元组,那么必然出错,导致 mqtt 频繁断开重连。三元组这个主要是在阿里的管理后台生成的,Android 设备这端只需要拿来用就可以了。



  • 关于订阅的 topic

  • 关于 topic 是在阿里云的后台管理中进行设置的,我们的收发消息都是通过这些 topic 来进行的。

  • 代码实现 iot 连接

  • 剪切三元组配置文件


/**


  • 剪切配置文件(三元组)

  • @param packageName*/public static void moveConfig(String packageName) {File usbConfigDir = new File(UsbStorage.usbPath, Config.wejoyConfigDirInUsb);File extProjectDir = new File(Environment.getExternalStorageDirectory(), Config.resourceDirName);File extConfigFile = new File(extProjectDir, Config.wejoyConfigFileInSdcard);if (!usbConfigDir.exists() || extConfigFile.exists()) {return;}extProjectDir.mkdirs();File[] configFiles = usbConfigDir.listFiles();if (configFiles.length > 0) {Arrays.sort(configFiles);moveFile(configFiles[0], extConfigFile);}}


public static void moveFile(File src, File dst) {su(String.format("mv -f %s %s\n", src.getAbsolutePath(), dst.getAbsolutePath()));}


  • 读取指定路径的配置文件信息(三元组)


public static File configFile = new File(new File(Environment.getExternalStorageDirectory(), "WejoyRes"), "Config.json");


static void read() throws IOException {if (configFile.exists()) {RandomAccessFile in = new RandomAccessFile(configFile, "r");byte[] buf = new byte[(int) configFile.length()];in.read(buf);in.close();instance = JsonIterator.deserialize(new String(buf, "utf-8"), Config.class);} else {instance = new Config();}mqttRequestTopic = String.format("/sys/%s/%s/rrpc/request/", instance.productKey, instance.deviceName);mqttResponseTopic = String.format("/sys/%s/%s/rrpc/response/", instance.productKey, instance.deviceName);mqttPublishTopic = String.format("/%s/%s/update", instance.productKey, instance.deviceName);}


  • 连接 mqtt


static void init() {instance = new IoT();DeviceInfo deviceInfo = new DeviceInfo();deviceInfo.productKey = Config.instance.productKey;deviceInfo.deviceName = Config.instance.deviceName;deviceInfo.deviceSecret = Config.instance.deviceSecret;final LinkKitInitParams params = new LinkKitInitParams();params.deviceInfo = deviceInfo;params.connectConfig = new IoTApiClientConfig();LinkKit.getInstance().registerOnPushListener(instance);initDisposable = Observable.interval(0, Config.instance.mqttConnectIntervalSeconds, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).map(new Function<Long, Boolean>() {@Overridepublic Boolean apply(Long aLong) throws Exception {if (!initialized) {LinkKit.getInstance().init(MainActivity.instance, params, instance);}return initialized;}}).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {if (aBoolean) {initDisposable.dispose();}}});}


  • 发送消息: 发送消息的时候,我们需要指定 topic,否则服务器无法接收到我们的消息。


static void publish(String json) {Log.e(TAG, "publish: "+json );MqttPublishRequest res = new MqttPublishRequest();res.isRPC = false;res.topic = Config.mqttPublishTopic;res.payloadObj = json;LinkKit.getInstance().publish(res, new IConnectSendListener() {@Overridepublic void onResponse(ARequest aRequest, AResponse aResponse) {}


@Overridepublic void onFailure(ARequest aRequest, AError aError) {}});}


  • 接收消息: 接收消息的时候,我们也需要判断是来自哪个 topic 中的,除了我们指定的 topic,其他的 topic 我们都不做处理;当我们接收到服务器中发送来的消息的时候,我们是先判断消息的类型,然后根据相对应的类型做出不同的反应。例如我们收到后台请求给娃娃机的上分的指令,那么我们就向设备中的硬件模块发送上分的指令,并等待设备反应并给后台发送一条响应信息。这条响应的消息是需要在指定的时间内完成,否则认为超时。


@Overridepublic void onNotify(String s, final String topic, final AMessage aMessage) {if (!topic.startsWith(Config.mqttRequestTopic)) {return;}Observable.create(new ObservableOnSubscribe<MqttMessage>() {@Overridepublic void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {MqttMessage msg = JsonIterator.deserialize(new String((byte[]) aMessage.data, "utf-8"), MqttMessage.class);if (msg == null) {return;}emitter.onNext(msg);emitter.onComplete();}}).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).flatMap(new Function<MqttMessage, ObservableSource<MqttMessage>>() {@Overridepublic ObservableSource<MqttMessage> apply(MqttMessage msg) throws Exception {Log.e(TAG, "收到消息 key:"+msg.key+" msg:"+msg.body.m);switch (msg.key) {case "h": {//SetHeartBeatDownstream setHeartBeatDownstream = msg.body.m.as(SetHeartBeatDownstream.class);// 和设备进行通信,并等待设备的响应 return Device.setHeartBeat(setHeartBeatDownstream);}case "b": {//AddCoinsDownstream addCoinsDownstream = msg.body.m.as(AddCoinsDownstream.class);// 和设备进行通信,并等待设备的响应 return Device.addCoins(addCoinsDownstream);}case "g": {//// 和设备进行通信,并等待设备的响应 return Device.getParam();}case "s": {//SetParamDownstream setParamDownstream = msg.body.m.as(SetParamDownstream.class);// 和设备进行通信,并等待设备的响应 return Device.setParam(setParamDownstream);}}return Observable.never();}}).observeOn(Scheduler


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


s.io()).map(new Function<MqttMessage, Boolean>() {@Overridepublic Boolean apply(MqttMessage msg) throws Exception {MqttPublishRequest res = new MqttPublishRequest();res.isRPC = false;res.topic = topic.replace("request", "response");//res.msgId = topic.split("/")[6];res.payloadObj = JsonStream.serialize(msg);LinkKit.getInstance().publish(res, new IConnectSendListener() {@Overridepublic void onResponse(ARequest aRequest, AResponse aResponse) {}


@Overridepublic void onFailure(ARequest aRequest, AError aError) {}});return true;}}).subscribe();}

Android 和硬件通信

在娃娃机和口红挑战的这两个设备中,我们都需要和设备进行通信,例如:娃娃机投币、娃娃机出礼反馈、按下选中口红的格子等等这些都是需要和硬件模块进行通信的。在关于串口通信的框架选择方面,我们主要是选择 Google 的 android-serialport-api 来实现。项目原地址


  • 依赖方式


  1. 在根 build.gradle 中添加


allprojects {repositories {...maven { url 'https://jitpack.io' }}}


  1. 子 module 添加依赖


dependencies {implementation 'com.github.licheedev.Android-SerialPort-API:serialport:1.0.1'}


  • 修改 su 路径


// su 默认路径为 "/system/bin/su"// 可通过此方法修改 SerialPort.setSuPath("/system/xbin/su");


  • 连接方式


连接串口的时候需要指定串口号以及波特率,之后定时处理机器发送的指令。


static void init() throws IOException {SerialPort.setSuPath("/system/xbin/su");// 设置串口号及波特率 serialPort = new SerialPort(Config.serialPort, Config.baudrate);// 接收指令流 inputStream = serialPort.getInputStream();// 发送指令流 outputStream = serialPort.getOutputStream();// 每隔 100ms 处理机器信息 Observable.interval(100, TimeUnit.MILLISECONDS).observeOn(serialScheduler).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {// 处理机器发送的指令 handleRecv();}});}


  • 向机器发送指令


向机器发送指令的时候是结合 Rxjava 来实现的。除此之外,向机器发送指令是需要有规定格式的(内部制定的通信协议),我们发送及接收数据都是一个字节数组,因此我们格式是需要严格按照我们制定的协议进行的,如下是娃娃机投币的简单示例:


static ObservableSource<MqttMessage> addCoins(final AddCoinsDownstream msg) {return Observable.create(new ObservableOnSubscribe<MqttMessage>() {@Overridepublic void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {currentUser = msg.u;currentHeadUrl = msg.h;currentNickname = msg.nk;byte[] buf = new byte[]{0x11, addCoinsCmd, msg.num, msg.c, 0, 0x00, 0x00};byte[] ret = sign(buf);try {outputStream.write(ret);} catch (IOException e) {e.printStackTrace();}penddingCmd = addCoinsCmd;penddingEmitter = emitter;}}).subscribeOn(serialScheduler);}


  • 接收机器指令


关于接受机器消息这一块是每隔 100ms 进行的,在处理机器指令的时候,首先需要过滤到无效的字节,之后再按照我们制定的协议来处理消息,判断是娃娃机上分,还是游戏结果等信息,最后并对机器的数据返回进行 CRC16 校验。


static void handleRecv() {try {for (; ; ) {int len = inputStream.available();if (len <= 0) {break;}len = inputStream.read(buf, bufReadOffset, buf.length - bufReadOffset);//Log.d("serialPort", String.format("read: %s", byteToHex(buf, bufReadOffset, len)));bufReadOffset += len;for (; ; ) {if (bufParseEnd == -1) {for (; bufParseStart < bufReadOffset; bufParseStart++) {if (buf[bufParseStart] == (byte) 0xAA) {bufParseEnd = bufParseStart + 1;break;}}}if (bufParseEnd != -1) {for (; bufParseEnd < bufReadOffset; bufParseEnd++) {if (buf[bufParseEnd] == (byte) 0xAA) {bufParseStart = bufParseEnd;bufParseEnd += 1;continue;}if (buf[bufParseEnd] == (byte) 0xDD) {if (bufParseEnd - bufParseStart >= 5) {bufParseEnd += 1;byte size = buf[bufParseStart + 1];byte index = buf[bufParseStart + 2];byte cmd = buf[bufParseStart + 3];byte check = (byte) (size ^ index ^ cmd);for (int i = bufParseStart + 4; i < bufParseEnd - 2; i++) {check ^= buf[i];}if (check == buf[bufParseEnd - 2]) {//Log.d("serialPort", String.format("protocol: %s, size: %d, index: %d, cmd: %d, check: %d, data: %s", byteToHex(buf, bufParseStart, bufParseEnd - bufParseStart), size, index, cmd, check, byteToHex(buf, bufParseStart + 4, size - 3)));switch (cmd) {// 心跳 case heartBeatCmd: {}break;


// 上分 case addCoinsCmd: {


}break;


// 游戏结果 case gameResultCmd: {boolean gift = buf[bufParseStart + 7] != 0;IoT.sendGameResult(gift);if (gift) {// 发送用户信息到中控,进行排行榜显示 WSSender.getInstance().sendUserInfo(currentUser, currentHeadUrl, currentNickname);}}break;default:break;}}}bufParseStart = bufParseEnd;bufParseEnd = -1;break;}}}if (bufParseStart >= bufReadOffset || bufParseEnd >= bufReadOffset) {break;}}if (bufReadOffset == buf.length) {System.arraycopy(buf, bufParseStart, buf, 0, bufReadOffset - bufParseStart);if (bufParseEnd != -1) {bufParseEnd -= bufParseStart;bufReadOffset = bufParseEnd;} else {bufReadOffset = 0;}bufParseStart = 0;}}} catch (IOException e) {e.printStackTrace();}}

websocket 通信

在中控和娃娃机进行通信的方式我们是选择 websocket 进行的。中控端是 server,然后娃娃机是 client。

server

  • Server 的实现:目前 server 的实现只是为了接收娃娃机的数据反馈,所以并没有什么复杂的操作。


class WSServer extends WebSocketServer {private MainActivity mainActivity;


public void setMainActivity(MainActivity mainActivity) {this.mainActivity = mainActivity;}


WSServer(InetSocketAddress address) {super(address);}


@Overridepublic void onOpen(WebSocket conn, ClientHandshake handshake) {mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已连接\n");}


@Overridepublic void onClose(WebSocket conn, int code, String reason, boolean remote) {mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已断开\n");}


@Overridepublic void onMessage(WebSocket conn, final String message) {Observable.create(new ObservableOnSubscribe<SocketMessage>() {@Overridepublic void subscribe(ObservableEmitter<SocketMessage> emitter) throws Exception {final SocketMessage socketMessage = JsonIterator.deserialize(message, SocketMessage.class);emitter.onNext(socketMessage);emitter.onComplete();}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<SocketMessage>() {@Overridepublic void accept(SocketMessage socketMessage) throws Exception {if (socketMessage.getCode() == SocketMessage.TYPE_USER) {// 夹到娃娃


} else if (socketMessage.getCode() == SocketMessage.TYPE_SAY_HELLO) {// 连接招呼语}}});


}


@Overridepublic void onError(WebSocket conn, Exception ex) {}


@Overridepublic void onStart() {


}}


  • 简单使用方式


appendAndScrollLog("初始化 WebSocket 服务...\n");WSServer wsServer = new WSServer(18104);wsServer.setMainActivity(MainActivity.this);wsServer.setConnectionLostTimeout(5);wsServer.setReuseAddr(true);wsServer.start();appendAndScrollLog("初始化 WebSocket 服务完成\n");

client

在 client 端,目前需要做的人物有断开重连以及数据发送的操作。断开重连的时候需要在新的子线程中进行,否则会报如下错误:


You cannot initialize a reconnect out of the websocket thread. Use reconnect in another thread to insure a successful cleanup


因此,我们每次断开重新的时候是需要在新的子线程中进行的。除此之外,在发送数据的时候,如果刚好 socket 没有连接上,那么发送数据是会报异常的,因此我们有数据要发送的时候如果 socket 没有连接,那么就先缓存到本地,等到 socket 连接上之后再把滞留的数据一次性发送出去。


  • 依赖配置


implementation 'org.java-websocket:Java-WebSocket:1.3.9'


  • WSClient.java


class WSClient extends WebSocketClient {


private static final String TAG = "WSClient";private static WSClient instance;private static URI sUri;private WSReceiver mWSReceiver;private Disposable mReconnectDisposable;private ConnectCallback mConnectCallback;


/**


  • step 1:需要先调用,设置 url

  • @param uri*/public static void setUri(URI uri){sUri = uri;}


/**


  • step 1:

  • 需要先调用,设置服务端的 url

  • @param ipAddress

  • @param port*/public static void setUri(String ipAddress,int port){try {sUri = new URI(String.format("ws://%s:%d", ipAddress, port));} catch (URISyntaxException e) {e.printStackTrace();}}


public static WSClient getInstance(){if (instance == null) {synchronized (WSClient.class){if (instance == null) {instance = new WSClient(sUri);}}}return instance;}


/**


  • step 2:连接 websocket*/public void onConnect(){setConnectionLostTimeout(Config.instance.webSocketTimeoutSeconds);setReuseAddr(true);connect();}


private WSClient(URI server) {super(server);// 初始化消息发送者 WSSender.getInstance().setWSClient(this);// 初始化消息接收者 mWSReceiver = new WSReceiver();mWSReceiver.setWSClient(this);mWSReceiver.setWSSender(WSSender.getInstance());


}


@Overridepublic void onOpen(ServerHandshake handshakedata) {Log.d(TAG, "onOpen: ");MainActivity.appendAndScrollLog("websocket 已连接\n");Observable.just("").subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {if (mConnectCallback != null) {mConnectCallback.onWebsocketConnected();}}});


// 清除滞留的所有消息 WSSender.getInstance().clearAllMessage();


}


@Overridepublic void onMessage(String message) {Log.d(TAG, "onMessage: ");mWSReceiver.handlerMessage(message);}


@Overridepublic void onClose(int code, String reason, boolean remote) {Log.d(TAG, "onClose: ");MainActivity.appendAndScrollLog(String.format("websocket 已断开,断开原因:%s\n",reason));Observable.just("").subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {if (mConnectCallback != null) {mConnectCallback.onWebsocketClosed();}}});onReconnect();}


@Overridepublic void onError(Exception ex) {if (ex != null) {Log.d(TAG, "onError: "+ex.getMessage());MainActivity.appendAndScrollLog(String.format("websocket 出现错误,错误原因:%s\n",ex.getMessage()));}onReconnect();}


public void onReconnect() {if (mReconnectDisposable != null&& !mReconnectDisposable.isDisposed()){return;}mReconnectDisposable = Observable.timer(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Log.d(TAG, "websocket reconnect");WSClient.this.reconnect();mReconnectDisposable.dispose();}});


}


public void setConnectCallback(ConnectCallback mConnectCallback) {this.mConnectCallback = mConnectCallback;}


public interface ConnectCallback{void onWebsocketConnected();void onWebsocketClosed();}}


  • WSSender.java


/**


  • Created by runla on 2018/10/26.

  • 文件描述:Websocket 的消息发送者*/


public class WSSender {private static final String TAG = "WSSender";public static final int MAX_MESSAGE_COUNT = 128;private static WSSender instance;private WSClient mWSClientManager;// 消息队列 private LinkedList<String> mMessageList = new LinkedList<>();


private WSSender() {}


public static WSSender getInstance() {if (instance == null) {synchronized (WSSender.class) {if (instance == null) {instance = new WSSender();}}}return instance;}


public void setWSClient(WSClient wsClientManager) {this.mWSClientManager = wsClientManager;}


/**


  • 发送所有滞留的消息*/public void clearAllMessage() {if (mWSClientManager == null) {return;}


while (mMessageList.size() > 0&& mMessageList.getFirst() != null) {Log.d(TAG, "sendMessage: " + mMessageList.size());mWSClientManager.send(mMessageList.getFirst());mMessageList.removeFirst();}}


/**


  • 发送消息,如果消息发送不出去,那么就等到连接成功后再次尝试发送

  • @param msg

  • @return*/public boolean sendMessage(String msg) {if (mWSClientManager == null) {throw new NullPointerException("websocket client is null");}if (TextUtils.isEmpty(msg)) {return false;}// 将需要发送的数据添加到队列的尾部 mMessageList.addLast(msg);


while (mMessageList.size() > 0&& mMessageList.getFirst() != null) {Log.d(TAG, "sendMessage: " + mMessageList.size());if (!mWSClientManager.isOpen()) {// 尝试重连 mWSClientManager.onReconnect();break;} else {mWSClientManager.send(mMessageList.getFirst());mMessageList.removeFirst();}}


// 如果消息队列中超过我们设置的最大容量,那么移除最先添加进去的消息 if (mMessageList.size() >= MAX_MESSAGE_COUNT) {mMessageList.removeFirst();}return false;}}


  • WSReceiver.java


/**


  • Created by runla on 2018/10/26.

  • 文件描述:Websocket 的消息接收者*/


public class WSReceiver {private WSClient mWSClientManager;private WSSender mWSSender;private OnMessageCallback onMessageCallback;public WSReceiver() {}


public void setWSClient(WSClient mWSClientManager) {this.mWSClientManager = mWSClientManager;}


public void setWSSender(WSSender mWSSender) {this.mWSSender = mWSSender;}


/**


  • 处理接收消息

  • @param message*/public void handlerMessage(String message){


if (onMessageCallback != null){onMessageCallback.onHandlerMessage(message);}}


public void setOnMessageCallback(OnMessageCallback onMessageCallback) {this.onMessageCallback = onMessageCallback;

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
Android 抖音爆红的口红挑战爬坑总结