Java 实现 RPC(源码),java 常见面试题
分布式系统的成员之一,解决服务之间的调用问题。远程调用时,RPC 实现了调用远程服务能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
动态代理
序列化
网络传输
调用服务 API 接口
本次 RPC 搭建使用的接口代理模式、Java 默认 Serializable 序列化以及 BIO 网络传输方式
动态代理模块封装
package com.ruider.Invoker;
/**
动态代理工厂,对相应的 class 生产其对应代理类
*/
import com.ruider.common.ServiceInformation;
import com.ruider.networkCommunication.BIOService;
import com.ruider.networkCommunication.NetIO;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class InvokerUtils implements InvocationHandler {
/**
调用服务类
*/
private Class clazz;
/**
调用服务管理
*/
private ServiceInformation serviceInformation;
/**
获取代理对象
@param T
@param url
@param port
@return
*/
public Object getBean (Class T, String url, int port) {
this.clazz = T;
ServiceInformation serviceInformation = new ServiceInformation();
serviceInformation.setServiceURL(url);
serviceInformation.setPort(port);
serviceInformation.setClassName(T.getName());
this.serviceInformation = serviceInformation;
return Proxy.newProxyInstance(T.getClassLoader(), new Class[]{T}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
NetIO BIO = new BIOService();
this.serviceInformation.setMethod(method);
this.serviceInformation.setArgs(args);
return BIO.send(serviceInformation);
}
}
网络传输模块封装
package com.ruider.networkCommunication;
import com.ruider.common.ServiceInformation;
/**
BIO 服务
*/
public interface NetIO {
/**
客户端发送请求到其他服务端
@param serviceInformation
@return
*/
Object send (ServiceInformation serviceInformation);
/**
本机作为服务端接受请求并且处理请求
@param port
*/
void recv (int port);
}
package com.ruider.networkCommunication;
import com.ruider.common.ServiceInformation;
import com.ruider.common.UserApi;
import com.ruider.server.UserService;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/**
使用 BIO 通信
*/
public class BIOService implements NetIO {
/**
主机 IP
*/
private String url;
/**
端口
*/
private int port;
/**
客户端 socket
*/
private Socket clientSocket;
/**
服务端 socket
*/
private ServerSocket serverSocket;
public BIOService() {}
public BIOService(String url, int port) {
this.url = url;
this.port = port;
}
/**
客户端采用 BIO 通信方式发送请求信息
@param serviceInformation
@return
*/
@Override
public Object send(ServiceInformation serviceInformation) {
ObjectOutputStream objectOutputStream = null;
ObjectInputStream objectInputStream = null;
try {
this.clientSocket = new Socket(serviceInformation.getServiceURL(), serviceInformation.getPort());
String className = serviceInformation.getClassName();
String methodName = serviceInformation.getMethod().getName();
Class<?>[] parameterTypes = serviceInformation.getMethod().getParameterTypes();
Object[] args = serviceInformation.getArgs();
objectOutputStream = new ObjectOutputStream(this.clientSocket.getOutputStream());
objectOutputStream.writeUTF(className);
objectOutputStream
.writeUTF(methodName);
objectOutputStream.writeObject(parameterTypes);
objectOutputStream.writeObject(args);
objectInputStream = new ObjectInputStream(this.clientSocket.getInputStream());
Object o = objectInputStream.readObject();
System.out.println("Object = " + o);
return o;
}
catch (Exception e) {
if (this.clientSocket != null) {
try {
this.clientSocket.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
if (objectInputStream != null) {
try {
objectInputStream.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
return null;
}
finally {
if (this.clientSocket != null) {
try {
this.clientSocket.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
if (objectInputStream != null) {
try {
objectInputStream.close();
}catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
/**
服务端采用 BIO 通信获取数据
@return
*/
@Override
public void recv (int port) {
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
this.serverSocket = new ServerSocket(port);
System.out.println("启动远程服务监听...");
//监听客户端发来消息
while (true){
Socket socket = serverSocket.accept();
objectInputStream = new ObjectInputStream(socket.getInputStream());
//客户端传输类名
String className = objectInputStream.readUTF();
String methodName = objectInputStream.readUTF();
Class<?>[] parameterTypes = (Class<?>[])objectInputStream.readObject();
Object[] arguments = (Object[]) objectInputStream.readObject();
Class clazz = null;
//服务匹配
if(className.equals(UserApi.class.getName())){
clazz = UserService.class;
}
//clazz = UserService.class;
Method method = clazz.getMethod(methodName,parameterTypes);
Object result = method.invoke(clazz.newInstance(),arguments);
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
socket.close();
}
} catch (Exception e) {
try {
if (objectOutputStream != null) {
objectOutputStream.close();
}
if (objectInputStream != null) {
objectInputStream.close();
}
if (this.serverSocket != null) {
this.serverSocket.close();
}
}
catch (IOException e1) {
e1.printStackTrace();
}
评论