欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC实战》全系列链接

  1. 用proto生成代码
  2. 服务发布和调用
  3. 服务端流
  4. 客户端流
  5. 双向流
  6. 客户端动态获取服务端地址
  7. 基于eureka的注册发现

客户端为什么要动态获取服务端地址

本文是《java版gRPC实战》系列的第六篇,前面咱们在开发客户端应用时,所需的服务端地址都是按如下步骤设置的:

  • 在application.yml中配置,如下图:

  • 在用到gRPC的bean中,使用注解GrpcClient即可将Stub类注入到成员变量中:

  • 上述操作方式的优点是简单易用好配置,缺点也很明显:服务端的IP地址或者端口一旦有变化,就必须修改application.yml并重启客户端应用;

为什么不用注册中心

  • 您一定会想到解决上述问题最简单的方法就是使用注册中心,如nacos、eureka等,其实我也是这么想的,直到有一天,由于工作原因,我要在一个已有的gRPC微服务环境部署自己的应用,这个微服务环境并非java技术栈,而是基于golang的,他们都使用了go-zero框架( 老扎心了),这个go-zero框架没有提供java语言的SDK,因此,我只能服从go-zero框架的规则,从etcd中取得其他微服务的地址信息,才能调用其他gRPC服务端,如下图所示:

  • 如此一来,咱们之前那种在application.yml中配置服务端信息的方法就用不上了,本篇咱们来开发一个新的gRPC客户端应用,满足以下需求:
  1. 创建Stub对象的时候,服务端的信息不再来自注解GrpcClient,而是来自查询etcd的结果;
  2. etcd上的服务端信息有变化的时候,客户端可以及时更新,而不用重启应用;

本篇概览

  • 本篇要开发名为get-service-addr-from-etcd的springboot应用,该应用从etcd取得local-server应用的IP和端口,然后调用local-server的sayHello接口,如下图:

  1. 开发客户端应用;
  2. 部署gRPC服务端应用;
  3. 部署etcd;
  4. 模拟go-zero的规则,将服务端应用的IP地址和端口写入etcd;
  5. 启动客户端应用,验证能否正常调用服务端的服务;
  6. 重启服务端,重启的时候修改端口;
  7. 修改etcd中服务端的端口信息;
  8. 调用接口触发客户端重新实例化Stub对象;
  9. 验证客户端能否正常调用修改了端口的服务端服务;

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的客户端代码在get-service-addr-from-etcd目录下,如下图:

开发客户端应用

  • 在父工程的build.gradle文件中新增一行,这是etcd相关的库,如下图红框所示:

  • 在父工程grpc-turtorials下面新建名为get-service-addr-from-etcd的模块,其build.gradle内容如下:
plugins {
id 'org.springframework.boot'
} dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation 'io.etcd:jetcd-core'
implementation project(':grpc-lib')
}
  • 配置文件application.yml,设置自己的web端口号和应用名,另外grpc.etcdendpoints是etcd集群的地址信息:
server:
port: 8084
spring:
application:
name: get-service-addr-from-etcd grpc:
# etcd的地址,从此处取得gRPC服务端的IP和端口
etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
  • 启动类DynamicServerAddressDemoApplication.java的代码就不贴了,普通的springboot启动类而已;

  • 新增StubWrapper.java文件,这是个spring bean,要重点关注的是simpleBlockingStub方法,当bean在spring注册的时候simpleBlockingStub方法会被执行,这样每当bean在spring注册时,都会从etcd查询gRPC服务端信息,然后创建SimpleBlockingStub对象:

package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8; /**
* @author will (zq2599@gmail.com)
* @version 1.0
* @description: 包装了SimpleBlockingStub实例的类,发起gRPC请求时需要用到SimpleBlockingStub实例
* @date 2021/5/8 19:34
*/
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper { /**
* 这是etcd中的一个key,该key对应的值是grpc服务端的地址信息
*/
private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server"; /**
* 配置文件中写好的etcd地址
*/
private String etcdendpoints; private SimpleGrpc.SimpleBlockingStub simpleBlockingStub; /**
* 从etcd查询gRPC服务端的地址
* @return
*/
public String[] getGrpcServerInfo() {
// 创建client类
KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient(); GetResponse response = null; // 去etcd查询/grpc/local-server这个key的值
try {
response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
} catch (Exception exception) {
log.error("get grpc key from etcd error", exception);
} if (null==response || response.getKvs().isEmpty()) {
log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
return null;
} // 从response中取得值
String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8); // rawAddrInfo是“192.169.0.1:8080”这样的字符串,即一个IP和一个端口,用":"分割,
// 这里用":"分割成数组返回
return null==rawAddrInfo ? null : rawAddrInfo.split(":");
} /**
* 每次注册bean都会执行的方法,
* 该方法从etcd取得gRPC服务端地址,
* 用于实例化成员变量SimpleBlockingStub
*/
@PostConstruct
public void simpleBlockingStub() {
// 从etcd获取地址信息
String[] array = getGrpcServerInfo(); log.info("create stub bean, array info from etcd {}", Arrays.toString(array)); // 数组的第一个元素是gRPC服务端的IP地址,第二个元素是端口
if (null==array || array.length<2) {
log.error("can not get valid grpc address from etcd");
return;
} // 数组的第一个元素是gRPC服务端的IP地址
String addr = array[0];
// 数组的第二个元素是端口
int port = Integer.parseInt(array[1]); // 根据刚才获取的gRPC服务端的地址和端口,创建channel
Channel channel = ManagedChannelBuilder
.forAddress(addr, port)
.usePlaintext()
.build(); // 根据channel创建stub
simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
}
}
  • GrpcClientService是封装了StubWrapper的服务类:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; @Service
public class GrpcClientService { @Autowired(required = false)
@Setter
private StubWrapper stubWrapper; public String sendMessage(final String name) {
// 很有可能simpleStub对象为null
if (null==stubWrapper) {
return "invalid SimpleBlockingStub, please check etcd configuration";
} try {
final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
return response.getMessage();
} catch (final StatusRuntimeException e) {
return "FAILED with " + e.getStatus().getCode().name();
}
}
}
  • 新增一个controller类GrpcClientController,提供一个http接口,里面会调用GrpcClientService的方法,最终完成远程gRPC调用:
package com.bolingcavalry.dynamicrpcaddr;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; @RestController
public class GrpcClientController { @Autowired
private GrpcClientService grpcClientService; @RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "will") String name) {
return grpcClientService.sendMessage(name);
}
}
  • 接下来新增一个controller类RefreshStubInstanceController,对外提供一个http接口refreshstub,作用是删掉stubWrapper这个bean,再重新注册一次,这样每当外部调用refreshstub接口,就可以从etcd取得服务端信息再重新实例化SimpleBlockingStub成员变量,这样就达到了客户端动态获取服务端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; @RestController
public class RefreshStubInstanceController implements ApplicationContextAware { private ApplicationContext applicationContext; @Autowired
private GrpcClientService grpcClientService; @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} @RequestMapping("/refreshstub")
public String refreshstub() { String beanName = "stubWrapper"; //获取BeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory(); // 删除已有bean
defaultListableBeanFactory.removeBeanDefinition(beanName); //创建bean信息.
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class); //动态注册bean.
defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition()); // 更新引用关系(注意,applicationContext.getBean方法很重要,会触发StubWrapper实例化操作)
grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class)); return "Refresh success";
}
}
  • 编码完成,开始验证;

部署gRPC服务端应用

部署gRPC服务端应用很简单,启动local-server应用即可:

部署etcd

  • 为了简化操作,我这里的etcd集群是用docker部署的,对应的docker-compose.yml文件内容如下:
version: '3'
services:
etcd1:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd1'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd1:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd1:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2379:2379
volumes:
- ./store/etcd1/data:/etcd_data
etcd2:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd2'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd2:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd2:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2380:2379
volumes:
- ./store/etcd2/data:/etcd_data
etcd3:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd3'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd3:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd3:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2381:2379
volumes:
- ./store/etcd3/data:/etcd_data
  • 准备好上述文件后,执行docker-compose up -d即可创建集群;

将服务端应用的IP地址和端口写入etcd

  • 我这边local-server所在服务器IP是192.168.50.5,端口9898,所以执行以下命令将local-server信息写入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898

启动客户端应用

  • 打开DynamicServerAddressDemoApplication.java,点击下图红框位置,即可启动客户端应用:

  • 注意下图红框中的日志,该日志证明客户端应用从etcd获取服务端信息成功:

  • 浏览器访问应用get-service-addr-from-etcd的http接口,成功收到响应,证明gRPC调用成功:

  • 去看local-server的控制台,如下图红框,证明远程调用确实执行了:

重启服务端,重启的时候修改端口

  • 为了验证动态获取服务端信息是否有效,咱们先把local-server应用的端口改一下,如下图红框,改成9899:

  • 改完重启local-server,如下图红框,可见gRPC端口已经改为9899:

  • 这时候再访问get-service-addr-from-etcd的http接口,由于get-service-addr-from-etcd不知道local-server的监听端口发生了改变,因此还是去访问9898端口,毫无意外的返回了失败:

修改etcd中服务端的端口信息

现在执行以下命令,将etcd中的服务端信息改为正确的:

docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899

调用接口触发客户端重新实例化Stub对象

  • 聪明的您一定知道接下来要做的事情了:让StubWrapper的bean重新在spring环境注册,也就是调用RefreshStubInstanceController提供的http接口refreshstub:

  • 查看get-service-addr-from-etcd应用的控制台,如下图红框,StubWrapper已经重新注册了,并且从etcd取得了最新的服务端信息:

验证客户端能否正常调用修改了端口的服务端服务

  • 再次访问get-service-addr-from-etcd应用的web接口,如下图,gRPC调用成功:

  • 至此,在不修改配置不重启服务的情况下,客户端也可以适应服务端的变化了,当然了,本文只是提供基本的操作参考,实际上的微服务环境会更复杂,例如refreshstub接口可能被其他服务调用,这样服务端有了变化可以更加及时地被更新,还有客户端本身也肯能是gRPC服务提供方,那也要把自己注册到etcd上去,还有利用etcd的watch功能监控指定的服务端是否一直存活,以及同一个gRPC服务的多个实例如何做负载均衡,等等,这些都要根据您的实际情况来定制;

  • 本篇内容过多,可见对于这些官方不支持的微服务环境,咱们自己去做注册发现的适配很费时费力的,如果设计和选型能自己做主,我们更倾向于使用现成的注册中心,接下来的文章,咱们就一起尝试使用eureka为gRPC提供注册发现服务;

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...

https://github.com/zq2599/blog_demos

最新文章

  1. Java8新特性——接口的默认方法和类方法
  2. ubuntu 安装mongodb
  3. iOS打包ipa包
  4. QQ视差特效和ListView侧滑删除
  5. MYSQL的慢查询两个方法
  6. [BS-25] IOS中手势UIGestureRecognizer概述
  7. 键盘unicode值对照表
  8. for else
  9. mysql中常用的公式及个人演示
  10. React学习笔记(三) 组件传值
  11. websocket nova vnc proxy
  12. 向着DJANGO奔跑!
  13. pl/sql执行动态sql
  14. 大话NoSql
  15. nicescroll 配置参数
  16. Less命名空间
  17. 不可思议的纯 CSS 滚动进度条效果
  18. REST API设计指导——译自Microsoft REST API Guidelines(三)
  19. 常被问到的十个 Java 面试题
  20. 解决logging模块日志信息重复问题

热门文章

  1. Android达到什么水平才能顺利拿到 20k 无压力?
  2. Promise教程及用法
  3. 一台服务器上部署多个Terracotta的方法
  4. Git-01-安装
  5. 【笔记】随机森林和Extra-Trees
  6. 42岁大龄程序员的迷茫,看我最新尝鲜.net 5+Dapper搭建的WebAPI框架
  7. SIM900A—发送、接收中英文短信
  8. NOIP 模拟 $15\; \rm \text{玫瑰花精}$
  9. COM笔记-引用计数
  10. OAuth2 与OpenID的区别