PRC 即远程服务调用,是实现分布式最基本的方式,而这一实现基于的又是java的反射功能所实现的动态代理。
完全可以自己写一个不到200行的rpc服务:
只需要实现四个类:
0.本地客户端
1. 远程服务的本地代理
* 功能:* 0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用* 1. 创建Socket客户端,根据指定地址调用远程服务真正实现者* 2. 同步阻塞获取服务返回应答
2.远程的服务代理
* 功能:
* 0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行* 1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果* 2.将执行结果序列化,通过Socket发送给客户端
3.远程服务的真正实现者
key : 为什么要用反射呢?因为远程请求来的时候,并不知道请求的是哪个类的哪个方法,所以需要动态反射出来类,并得到相应调用的方法。
result:最终的效果就是调用远程的服务就像调用本地的一样,客户端并不用管调用的服务在哪里,框架将一切都封装好了。
代码:
/**
* 本地客户端
*/
public class RPCClient {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
RPCExporter.exporter("locolhost", 8080);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
RPCImporter
EchoService echo = importer.importer(EchoServiceIpml.class,
new InetSocketAddress("locolhost", 8080));
System.out.println(echo.echo("Are you ok? "));
}
}
/**
* 远程服务的本地代理
* 功能:
* 0. 将本地的接口调用转化成JDK动态代理,在动态代理中实现接口的远程调用
* 1. 创建Socket客户端,根据指定地址调用远程服务真正实现者
* 2. 同步阻塞获取服务返回应答
*/
public class RPCImporter
public T importer(Class> serviceClass, InetSocketAddress address) {
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
new Class>[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
socket = new Socket();
socket.connect(address);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(serviceClass.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readShort();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) socket.close();
if (inputStream != null) inputStream.close();
if (outputStream != null) outputStream.close();
}
return null;
}
});
}
}
/**
* 远程的服务代理
* 功能:
* 0.监听TCP连接,收到新的连接后,将其封装成task,交给线程池去执行
* 1.将客户端发送的码流反序列化成对象,调用服务实现,获取执行结果
* 2.将执行结果序列化,通过Socket发送给客户端
*/
public class RPCExporter {
private static Executor threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void exporter(String hostName, int port) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(hostName, port));
try {
while (true) {
threadPool.execute(new ServerTask(serverSocket.accept()));
}
} finally {
serverSocket.close();
}
}
private static class ServerTask implements Runnable {
private Socket _client;
public ServerTask(Socket client) {
_client = client;
}
@Override
public void run() {
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
inputStream = new ObjectInputStream(_client.getInputStream());
String interfaceName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class> service = Class.forName(interfaceName);
Class>[] paramsType = (Class>[]) inputStream.readObject();
Method method = service.getMethod(methodName, paramsType);
Object[] args = (Object[]) inputStream.readObject();
Object result = method.invoke(service.newInstance(), args);
outputStream = new ObjectOutputStream(_client.getOutputStream());
outputStream.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 远程的服务真正实现者
*/
public class EchoServiceIpml implements EchoService {
@Override
public String echo(String ping) {
return ping != null ? ping + "---> I'm ok" : "I'm ok";
}
}