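This article consolidates the code from the earlier posts. The goal is not a demo but something that could go into production, on versions as recent as practical. Most articles online about this stack either stop at the concepts or simply rerun the demo from the official site, which is not much help.
I have Istio, K8s, and Docker installed locally. Each can be installed by following its official documentation, so I skip that here.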
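Versions used locally:
jdk: 17
spring-boot-starter-parent: 2.7.9. I did not move to 3.0.0 because RocketMQ did not support it yet (RocketMQ failed at startup after the upgrade), so I stayed on the newest release before 3.0.0 available at the time.
spring-cloud-starter-kubernetes-client-all: 2.1.6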
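Installing Istio into K8s also brings the Envoy proxy/gateway and, depending on the installation profile and add-ons, the Kiali dashboard.
Envoy acts as the north-south traffic gateway, handling request routing, rate limiting, and so on.
Envoy also acts as the east-west service gateway, handling load balancing between services.
service-provider and service-consumer each expose a gRPC interface and call each other.
service-provider sends RocketMQ messages, and several consumer groups in service-consumer consume them at the same time.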
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.9</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>le</groupId>
  <artifactId>service-provider</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>service-provider</name>
  <description>service-provider</description>
  <modules>
    <module>service-provider-proto</module>
    <module>service-provider-start</module>
    <module>service-provider-dto</module>
  </modules>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <type>pom</type>
        <scope>import</scope>
        <version>2.7.9</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-consumer-proto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-provider-proto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-provider-dto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
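This module mainly holds DTO classes, so that other services can depend on just the DTO module in their pom instead of on the whole service. Here it contains a single Order class.
This module holds the gRPC interfaces exposed by service-provider. Here I expose one HelloService interface.
It also needs the gRPC dependencies and plugins configured. The module is then referenced by this service (which implements the interface) and by other services (which call it). Here is the pom of the service-provider-proto module: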
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>service-provider</artifactId>
    <groupId>le</groupId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <groupId>le</groupId>
  <artifactId>service-provider-proto</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>service-provider-proto</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- gRpc -->
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-spring-boot-starter</artifactId>
      <version>2.14.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.52.1</version>
    </dependency>
    <dependency> <!-- necessary for Java 9+ -->
      <groupId>org.apache.tomcat</groupId>
      <artifactId>annotations-api</artifactId>
      <version>6.0.53</version>
    </dependency>
  </dependencies>
  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
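This module contains the startup class. I also put some business code in it, which strictly speaking is not good practice, but every company splits its modules differently.
I paste all the code without much commentary, so that anyone following along can type it in.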
package ample.service.provider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class ServiceProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServiceProviderApplication.class, args);
    }
}
This is the implementation of the HelloService interface exposed above.
package ample.service.provider.facade;

import ample.service.provider.DemoConfig;
import ample.service.provider.api.HelloServiceGrpc;
import ample.service.provider.api.SayHelloData;
import ample.service.provider.api.SayHelloRequest;
import ample.service.provider.api.SayHelloResponse;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
@GrpcService
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {

    @Autowired
    private DemoConfig demoConfig;

    @Override
    public void sayHello(SayHelloRequest request, io.grpc.stub.StreamObserver<SayHelloResponse> responseObserver) {
        log.info("Received say hello gRPC request from consumer");
        // The message comes from the dynamic configuration, so a config change shows up in the reply.
        SayHelloData helloData = SayHelloData.newBuilder()
                .setName("maple")
                .setMessage(demoConfig.getMessage())
                .build();
        SayHelloResponse.Builder builder = SayHelloResponse.newBuilder()
                .setCode(0)
                .setMessage("success")
                .setSuccess(true)
                .setData(helloData);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }
}
This class is mainly for testing whether dynamic configuration takes effect.
package ample.service.provider;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "bean")
public class DemoConfig {

    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
This is the client for the gRPC interface exposed by service-consumer, called from service-provider, because in production services certainly call each other over RPC.
package ample.service.provider;

import sumer.api.MemberQueryRequest;
import sumer.api.MemberQueryResponse;
import sumer.api.MemberQueryServiceGrpc;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MemberQueryGrpcClient {

    // "service-consumer" must match the client name configured under grpc.client
    @GrpcClient("service-consumer")
    private MemberQueryServiceGrpc.MemberQueryServiceBlockingStub memberQueryBlockingStub;

    public String queryMember() {
        MemberQueryRequest request = MemberQueryRequest.newBuilder()
                .setMemberId("111")
                .setUsername("Wang Hong Bo")
                .build();
        MemberQueryResponse response = memberQueryBlockingStub.queryMember(request);
        log.info("MemberQueryResponse.code:{}", response.getCode());
        log.info("MemberQueryResponse.message:{}", response.getMessage());
        log.info("MemberQueryResponse.success:{}", response.getSuccess());
        log.info("MemberQueryResponse.data:{}", response.getData());
        log.info("MemberQueryResponse.address1:{}", response.getData().getAddressList().get(0).getAddress());
        log.info("MemberQueryResponse.address2:{}", response.getData().getAddressList().get(1).getAddress());
        return response.toString();
    }
}
This is the service through which the MQ producer sends messages. The line
String sceneStr = SceneEnum.destination(scene) converts the scene into a destination. The topic is split in the middle by "|", for example: TP_S_1100|EC_EVENT_0001
Note that the RocketMQ product sold on Alibaba Cloud does not accept "|" in a topic name because it treats it as a special character. My RocketMQ is installed locally, and it accepts it.
package ample.service.provider.producer;

import com.alibaba.fastjson.JSON;
import ample.ums.SceneEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RocketMQProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Plain send.
     */
    public void send(SceneEnum scene, Object payload) {
        String sceneStr = SceneEnum.destination(scene);
        log.info("producer.sendMessage: 【{}】,【{}】", sceneStr, JSON.toJSONString(payload));
        rocketMQTemplate.convertAndSend(sceneStr, payload);
    }

    /**
     * Synchronous send.
     */
    public SendResult sendSync(SceneEnum scene, Object payload) {
        String sceneStr = SceneEnum.destination(scene);
        SendResult sendResult = rocketMQTemplate.syncSend(sceneStr, payload);
        log.info("producer.sendMessage: 【{}】,【{}】, sendResult:{}", sceneStr, JSON.toJSONString(payload), sendResult);
        return sendResult;
    }

    /**
     * Asynchronous send.
     */
    public void sendASync(SceneEnum scene, Object payload) {
        rocketMQTemplate.asyncSend(SceneEnum.destination(scene), payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Async send succeeded: " + sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Async send failed: " + throwable.getMessage());
            }
        });
    }

    /**
     * Delayed send.<br/>
     * In the starter version, delayed messages use 18 fixed levels:
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendDelay(SceneEnum scene, Object payload, int delayLevel) {
        // 2000 is the send timeout in milliseconds
        rocketMQTemplate.syncSend(SceneEnum.destination(scene), MessageBuilder.withPayload(payload).build(), 2000, delayLevel);
    }

    /**
     * One-way send (the result is ignored, e.g. for logs).
     */
    public void sendOneWayMsg(SceneEnum scene, Object payload) {
        rocketMQTemplate.sendOneWay(SceneEnum.destination(scene), payload);
    }
}
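The 18 delay levels listed in the sendDelay comment are positional: level 1 is 1s, level 18 is 2h. As a rough illustration, the sketch below picks the smallest level that covers a wanted delay. DelayLevelSketch and levelFor are hypothetical names, not part of RocketMQ, and the table assumes the broker keeps the level list quoted above.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper: maps a wanted delay onto the 18 fixed delay levels quoted above. */
final class DelayLevelSketch {

    // Level n (1-based) corresponds to LEVELS.get(n - 1); assumes the level list from the comment above.
    static final List<Duration> LEVELS = List.of(
            Duration.ofSeconds(1), Duration.ofSeconds(5), Duration.ofSeconds(10), Duration.ofSeconds(30),
            Duration.ofMinutes(1), Duration.ofMinutes(2), Duration.ofMinutes(3), Duration.ofMinutes(4),
            Duration.ofMinutes(5), Duration.ofMinutes(6), Duration.ofMinutes(7), Duration.ofMinutes(8),
            Duration.ofMinutes(9), Duration.ofMinutes(10), Duration.ofMinutes(20), Duration.ofMinutes(30),
            Duration.ofHours(1), Duration.ofHours(2));

    /** Returns the smallest level whose delay is at least the wanted one, capped at the last level. */
    static int levelFor(Duration wanted) {
        for (int i = 0; i < LEVELS.size(); i++) {
            if (LEVELS.get(i).compareTo(wanted) >= 0) {
                return i + 1;
            }
        }
        return LEVELS.size();
    }
}
```

The returned value could then be passed as the delayLevel argument of sendDelay. Rounding up rather than down is a design choice here, so that a message is never delivered earlier than requested.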
This is the scene enum. In production there will certainly be many scene definitions.
package ample.ums;

public enum SceneEnum {

    ORDER_CREATE("11000001", "ORDER_CREATE"),
    ORDER_CONFIRM("11000002", "ORDER_CONFIRM"),
    ;

    private String sceneCode;
    private String desc;

    SceneEnum(String sceneCode, String desc) {
        this.sceneCode = sceneCode;
        this.desc = desc;
    }

    public String getSceneCode() {
        return sceneCode;
    }

    public String getDesc() {
        return desc;
    }

    // Builds "topic:tag": the first 4 digits and the last 4 digits of the scene code form the topic.
    public static String destination(SceneEnum scene) {
        String topic = "TP_S_" + scene.getSceneCode().substring(0, 4) + "|" + "EC_EVENT_" + scene.getSceneCode().substring(4, 8);
        String tag = scene.getDesc();
        return topic + ":" + tag;
    }
}
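To make the conversion concrete, here is a standalone sketch of the same string construction, taking the scene code and tag as plain strings. SceneDestinationSketch is a hypothetical name used only for illustration, and the length check is an addition that the enum itself does not have.

```java
/** Hypothetical standalone version of the destination construction, for illustration only. */
final class SceneDestinationSketch {

    /** Builds "TP_S_<first 4 digits>|EC_EVENT_<last 4 digits>:<tag>" from an 8-character scene code. */
    static String destination(String sceneCode, String tag) {
        if (sceneCode == null || sceneCode.length() != 8) {
            // Added guard (an assumption): substring(4, 8) would otherwise throw on shorter codes.
            throw new IllegalArgumentException("sceneCode must have exactly 8 characters");
        }
        String topic = "TP_S_" + sceneCode.substring(0, 4) + "|" + "EC_EVENT_" + sceneCode.substring(4, 8);
        return topic + ":" + tag;
    }

    public static void main(String[] args) {
        System.out.println(destination("11000001", "ORDER_CREATE"));
    }
}
```

For the scene code 11000001 and tag ORDER_CREATE this yields TP_S_1100|EC_EVENT_0001:ORDER_CREATE, where the part before the colon is the topic and the part after it is the tag.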
This class is what I use to test the service's features.
package ample.ller;

import ample.Order;
import ample.service.provider.DemoConfig;
import ample.service.provider.MemberQueryGrpcClient;
import ample.ums.SceneEnum;
import ample.service.provider.producer.RocketMQProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RefreshScope
@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {

    @Value("${maple-test.message}")
    private String mapleTestMessage;

    @Autowired
    private DemoConfig demoConfig;

    @Autowired
    private RocketMQProducerService producerService;

    @Autowired
    private MemberQueryGrpcClient memberQueryGrpcClient;

    @GetMapping("/provider-hello")
    public String sayHello() {
        log.info("hello world");
        return "hello world";
    }

    @GetMapping("/grpc/queryMember")
    public String queryMember() {
        log.info("service-provider calls the query member interface of service-consumer over gRPC");
        return memberQueryGrpcClient.queryMember();
    }

    @GetMapping("/sendMq")
    public String sendMq() {
        Order order = new Order();
        order.setOrderName("Apple");
        for (int i = 0; i < 5; i++) {
            order.setPrice(i);
            producerService.sendSync(SceneEnum.ORDER_CREATE, order);
        }
        return "send message complete";
    }

    @GetMapping("/provider-value-config")
    public String valueConfig() {
        log.info("Config read directly via @Value: {}", mapleTestMessage);
        return mapleTestMessage;
    }

    @GetMapping("/demo-config")
    public String demoConfig() {
        log.info("Config read via @ConfigurationProperties: {}", demoConfig.getMessage());
        return demoConfig.getMessage();
    }
}
This is the bootstrap configuration, so it contains only the K8s-related settings.
spring:
  application:
    name: service-provider
  cloud:
    kubernetes:
      reload:
        enabled: true # refresh automatically after the K8s ConfigMap changes; the default reload strategy and timing apply
      config:
        name: ${spring.application.name} # name of the configuration
        namespace: service-k8s-demo
        sources:
          - name: ${spring.application.name} # the configuration actually referenced; the ConfigMap for the active profile is found automatically; specific to this service
            namespace: service-k8s-demo
          - name: service-common-config # a shared configuration that every service in the architecture can reference
            namespace: service-k8s-demo
management:
  endpoint:
    restart:
      enabled: true
    health:
      enabled: true
      probes:
        enabled: true
      show-details: always
    prometheus:
      enabled: true
    info:
      enabled: true
  endpoints:
    web:
      exposure:
        include: '*'
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>le</groupId>
    <artifactId>service-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>service-provider-start</artifactId>
  <packaging>jar</packaging>
  <name>service-provider-start</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-consumer-proto</artifactId>
    </dependency>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-provider-proto</artifactId>
    </dependency>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-provider-dto</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- kubernetes -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
      <version>2.1.6</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- RocketMQ -->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.2.3</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
      <version>1.18.26</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <classifier>exec</classifier> <!-- build the executable jar -->
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Under the service-provider-start module there is a Dockerfile. Its content is simple.
FROM arm64v8/openjdk:20-slim-buster
ADD service-provider-start-0.0.1-SNAPSHOT-exec.jar service-provider.jar
ENTRYPOINT java -jar service-provider.jar
The configuration file for the development environment:
apiVersion: v1
kind: ConfigMap
metadata:
  name: service-provider-dev
  namespace: service-k8s-demo
  labels:
    spring.cloud.kubernetes.config: "true"
data:
  application.yml: |-
    spring:
      profiles: dev
    server:
      port: 30000
    grpc:
      server:
        port: 9090
    rocketmq:
      producer:
        group: SERVICE_PRODUCER
      name-server: 124.222.91.116:9876
    bean:
      message: Hello World! --dev
    maple-test:
      message: maple for dev config map in k8s --dev
    redis:
      ip: k8s --dev
The configuration file for the production environment:
apiVersion: v1
kind: ConfigMap
metadata:
  name: service-provider-prod
  namespace: service-k8s-demo
  labels:
    spring.cloud.kubernetes.config: "true"
data:
  application.yml: |-
    spring:
      profiles: prod
    server:
      port: 30000
    grpc:
      server:
        port: 9090
    rocketmq:
      producer:
        group: SERVICE_PRODUCER
      name-server: 124.222.91.116:9876
    bean:
      message: Hello World! --prod
    maple-test:
      message: maple for dev config map in k8s --prod
    redis:
      ip: k8s --prod
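The shared configuration file. It mainly defines the gRPC service addresses and ports, since services call each other in production. Many articles online write address: 'static://service-consumer:9091' where I use address: 'dns:///service-consumer:9091'. With the static form, callers can fail to reach the service after its instances go offline and come back, so I use DNS to discover the service dynamically.
apiVersion: v1
kind: ConfigMap
metadata:
  name: service-common-config
  namespace: service-k8s-demo
  labels:
    spring.cloud.kubernetes.config: "true"
data:
  application.yml: |-
    grpc:
      client:
        GLOBAL:
          negotiation-type: plaintext
          enable-keep-alive: true
          keep-alive-without-calls: true
        service-consumer:
          address: 'dns:///service-consumer:9091'
        service-provider:
          address: 'dns:///service-provider:9090'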
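The official documentation also shows a way to keep all environments in a single ConfigMap and tell them apart by profile. Putting every environment into one file gets bloated, though, and a ConfigMap should not grow too large: it cannot exceed 1 MiB. If the configuration is larger than that, consider mounting it as a directory on a volume instead.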
This is the yaml file for deploying the service in K8s.
apiVersion: v1
kind: Namespace
metadata:
  name: service-k8s-demo
  labels:
    name: service-k8s-demo
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: service-k8s-demo
  namespace: service-k8s-demo
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: service-k8s-demo
  name: service-k8s-demo
rules:
  - apiGroups:
      - ""
    resources:
      - services
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
      - namespaces
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: service-k8s-demo
  namespace: service-k8s-demo
subjects:
  - kind: ServiceAccount
    name: service-k8s-demo
    namespace: service-k8s-demo
roleRef:
  kind: ClusterRole
  name: service-k8s-demo
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: service-provider
  namespace: service-k8s-demo
  labels:
    app: service-provider
spec:
  replicas: 3
  template:
    metadata:
      name: service-provider
      labels:
        app: service-provider
    spec:
      containers:
        - name: service-provider
          image: service-provider:1.0
          imagePullPolicy: IfNotPresent
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "dev"
          ports:
            - name: http
              protocol: TCP
              containerPort: 30000
            - name: grpc
              protocol: TCP
              containerPort: 9090
      serviceAccountName: service-k8s-demo
      restartPolicy: Always
  selector:
    matchLabels:
      app: service-provider
---
apiVersion: v1
kind: Service
metadata:
  name: service-provider
  namespace: service-k8s-demo
spec:
  selector:
    app: service-provider
  ports:
    - port: 80
      targetPort: 30000
      name: http
    - port: 9090
      targetPort: 9090
      name: grpc
  type: NodePort
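You can see that I specify the profile here. In production you could keep one yaml file per environment, or keep a single yaml file and pass the profile in dynamically. I will come back and update this once I have looked into it.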
This is the configuration for Envoy as the north-south gateway, which handles request routing, rate limiting, and so on. So far I have only tested routing. I will update this once rate limiting has been tested.
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: service-k8s-demo-gateway
  namespace: service-k8s-demo
spec:
  selector:
    istio: ingressgateway # use istio default controller
  servers:
    - port:
        number: 31400
        name: http
        protocol: HTTP
      hosts:
        - "*"
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: service-k8s-demo-virtual-service
  namespace: service-k8s-demo
spec:
  hosts:
    - "*"
  gateways:
    - service-k8s-demo-gateway
  http:
    - match:
        - uri:
            prefix: /consumer
      route:
        - destination:
            host: service-consumer
            port:
              number: 80
    - match:
        - uri:
            prefix: /provider
      route:
        - destination:
            host: service-provider
            port:
              number: 80
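That completes all the code for the service-provider module. Do not expect it to run locally as-is: the profile is specified in the deployment yaml, so the yaml has to be applied in K8s for the matching ConfigMap to be found automatically. You can still try starting it to see whether anything else fails. My local start-up failed because, without a K8s environment, the configuration could not be found, which in turn produced a RocketMQ error. That does not matter, because these problems disappear once the service runs in K8s.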
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.9</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>le</groupId>
  <artifactId>service-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>service-consumer</name>
  <description>service-consumer</description>
  <modules>
    <module>service-consumer-proto</module>
    <module>service-consumer-start</module>
  </modules>
  <properties>
    <java.version>17</java.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <type>pom</type>
        <scope>import</scope>
        <version>2.7.9</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-consumer-proto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-provider-proto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>le</groupId>
        <artifactId>service-provider-dto</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
syntax = "proto3";

option java_multiple_files = true;
option java_package = "sumer.api";
option java_outer_classname = "MemberQueryServiceProto";

service MemberQueryService {
  rpc queryMember (MemberQueryRequest) returns (MemberQueryResponse) {}
}

message MemberQueryRequest {
  string memberId = 1;
  string username = 2;
}

message MemberQueryResponse {
  int32 code = 1;
  string message = 2;
  bool success = 3;
  MemberQueryData data = 4;
}

message MemberQueryData {
  string memberId = 1;
  string username = 2;
  int32 age = 3;
  repeated AddressData address = 4;
  map<string, string> extMap = 5;
}

message AddressData {
  string address = 1;
  string phone = 2;
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>service-consumer</artifactId>
    <groupId>le</groupId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <groupId>le</groupId>
  <artifactId>service-consumer-proto</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>service-consumer-proto</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- gRpc -->
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-spring-boot-starter</artifactId>
      <version>2.14.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.52.1</version>
    </dependency>
    <dependency> <!-- necessary for Java 9+ -->
      <groupId>org.apache.tomcat</groupId>
      <artifactId>annotations-api</artifactId>
      <version>6.0.53</version>
    </dependency>
  </dependencies>
  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>le</groupId>
    <artifactId>service-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>service-consumer-start</artifactId>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-provider-proto</artifactId>
    </dependency>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-consumer-proto</artifactId>
    </dependency>
    <dependency>
      <groupId>le</groupId>
      <artifactId>service-provider-dto</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- kubernetes -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
      <version>2.1.6</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- RocketMQ -->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.2.3</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
      <version>1.18.26</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <classifier>exec</classifier>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
This is the implementation class of the interface exposed by service-consumer.
package sumer.facade;

import sumer.api.*;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Slf4j
@GrpcService
public class MemberQueryServiceImpl extends MemberQueryServiceGrpc.MemberQueryServiceImplBase {

    @Override
    public void queryMember(MemberQueryRequest request, io.grpc.stub.StreamObserver<MemberQueryResponse> responseObserver) {
        log.info("Received query member gRPC request from another service, parameters: {}", request);
        List<AddressData> addressList = new ArrayList<>();
        addressList.add(AddressData.newBuilder().setAddress("杭州").setPhone("110").build());
        addressList.add(AddressData.newBuilder().setAddress("马来西亚").setPhone("911").build());
        Map<String, String> extMap = Maps.newHashMap();
        extMap.put("secondField", "2");
        extMap.put("lastField", "last");
        // Echo the request's member id and username back in the response data.
        MemberQueryData queryData = MemberQueryData.newBuilder()
                .setMemberId(request.getMemberId())
                .setUsername(request.getUsername())
                .setAge(18)
                .addAllAddress(addressList)
                .putExtMap("firstField", "test-maple")
                .putAllExtMap(extMap)
                .build();
        MemberQueryResponse.Builder builder = MemberQueryResponse.newBuilder()
                .setCode(0)
                .setMessage("success")
                .setSuccess(true)
                .setData(queryData);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }
}
package sumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient
@SpringBootApplication
public class ServiceConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServiceConsumerApplication.class, args);
    }
}
This consumer handles the order-created message and then sends a notification to the user.
package sumer.mq;

import com.alibaba.fastjson.JSON;
import ample.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Order created successfully: send the gotone notification.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "TP_S_1100|EC_EVENT_0001", consumerGroup = "CREATE_ORDER_GOTONE_CONSUMER")
public class CreateOrderGotoneConsumer implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        log.info("consumer message: 【CreateOrderGotoneConsumer】,【{}】", JSON.toJSONString(order));
    }
}
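This consumer also handles the order-created message, this time to deduct inventory. The two listeners belong to different consumer groups, so both receive every message. In this setup idempotent handling is essential: a failed consumption triggers retries and the message keeps being redelivered, so a handler can see a message it has already processed successfully. Every consumer must therefore tolerate duplicates.
package sumer.mq;

import com.alibaba.fastjson.JSON;
import ample.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Order created successfully: deduct inventory.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "TP_S_1100|EC_EVENT_0001", consumerGroup = "DEDUCT_INVENTORY_CONSUMER")
public class DeductInventoryConsumer implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        log.info("consumer message: 【DeductInventoryConsumer】,【{}】", JSON.toJSONString(order));
    }
}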
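The idempotency requirement can be sketched roughly as follows. This is a minimal in-memory illustration, not the article's implementation: IdempotentHandlerSketch and the businessKey parameter are hypothetical, and a real service would more likely rely on a database unique key or a similar shared store so that deduplication survives restarts and works across several pods.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory deduplication guard, keyed per consumer group and business key. */
final class IdempotentHandlerSketch {

    // Keys that have already been handled; an assumption standing in for a persistent store.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    // Counts how often the (simulated) business logic actually ran.
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Runs the business logic at most once per (consumerGroup, businessKey); returns false for duplicates. */
    boolean handle(String consumerGroup, String businessKey) {
        String dedupKey = consumerGroup + ":" + businessKey;
        if (!processed.add(dedupKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        sideEffects.incrementAndGet(); // stands in for e.g. deducting inventory
        return true;
    }

    int sideEffects() {
        return sideEffects.get();
    }
}
```

The key includes the consumer group because each group is expected to process every message once. If the business logic can fail after the key has been recorded, the key would need to be removed again (or recorded in the same transaction as the side effect) so that a retry is not silently dropped.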
This is the client through which service-consumer calls the gRPC interface of service-provider.
package pc;

import ample.service.provider.api.HelloServiceGrpc;
import ample.service.provider.api.SayHelloRequest;
import ample.service.provider.api.SayHelloResponse;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;

@Service
public class ProviderServiceGrpcClient {

    // "service-provider" must match the client name configured under grpc.client
    @GrpcClient("service-provider")
    private HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub;

    public String sayHello() {
        SayHelloRequest request = SayHelloRequest.newBuilder().setName("maple123").build();
        SayHelloResponse sayHelloResponse = helloServiceBlockingStub.sayHello(request);
        return sayHelloResponse.toString();
    }
}
This is the entry point for testing the consumer service.
package ller;

import pc.ProviderServiceGrpcClient;
import sumer.ServiceConsumerApplication;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@Slf4j
@RequiredArgsConstructor
@RefreshScope
@RestController
@RequestMapping("/consumer")
public class ConsumerController {

    private final DiscoveryClient discoveryClient;
    private final ProviderServiceGrpcClient providerServiceGrpcClient;

    public static void main(String[] args) {
        SpringApplication.run(ServiceConsumerApplication.class, args);
    }

    @Value("${consumer.body}")
    private String consumerBody;

    @GetMapping("/grpc/hello")
    public String sayHello() {
        log.info("service-consumer calls service-provider over gRPC");
        return providerServiceGrpcClient.sayHello();
    }

    @GetMapping("/consumerBody")
    public String consumerBody() {
        log.info("Config consumerBody: {}", consumerBody);
        return consumerBody;
    }

    @GetMapping("/consumers/services")
    public List<String> findServices() {
        log.info("All services currently registered");
        List<String> services = discoveryClient.getServices();
        services.stream()
                .map(discoveryClient::getInstances)
                .forEach(v -> v.forEach(s -> System.out.printf("%s:%s uri:%s%n", s.getHost(), s.getPort(), s.getUri())));
        return services;
    }
}
This is the bootstrap configuration of service-consumer. The rocketmq setting is placed here as well, because a consumer initializes itself as soon as the application starts.
spring:
  application:
    name: service-consumer
  cloud:
    kubernetes:
      reload:
        enabled: true
      config:
        name: ${spring.application.name}
        namespace: service-k8s-demo
        sources:
          - name: ${spring.application.name}
            namespace: service-k8s-demo
          - name: service-common-config
            namespace: service-k8s-demo
management:
  endpoint:
    restart:
      enabled: true
    health:
      enabled: true
    info:
      enabled: true
  endpoints:
    web:
      exposure:
        include: '*'
rocketmq:
  name-server: 124.222.91.116:9876
FROM arm64v8/openjdk:20-slim-buster
ADD service-consumer-start-0.0.1-SNAPSHOT-exec.jar service-consumer.jar
ENTRYPOINT java -jar service-consumer.jar
apiVersion: v1
kind: ConfigMap
metadata:
  name: service-consumer-dev
  namespace: service-k8s-demo
  labels:
    spring.cloud.kubernetes.config: "true"
data:
  application.yml: |-
    spring:
      profiles: dev
    server:
      port: 30001
    grpc:
      server:
        port: 9091
    consumer:
      body: "1234567890--dev"
apiVersion: v1
kind: ConfigMap
metadata:
  name: service-consumer-prod
  namespace: service-k8s-demo
  labels:
    spring.cloud.kubernetes.config: "true"
data:
  application.yml: |-
    spring:
      profiles: prod
    server:
      port: 30001
    grpc:
      server:
        port: 9091
    consumer:
      body: "1234567890--prod"
apiVersion: v1
kind: Namespace
metadata:
  name: service-k8s-demo
  labels:
    name: service-k8s-demo
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: service-k8s-demo
  namespace: service-k8s-demo
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: service-k8s-demo
  name: service-k8s-demo
rules:
  - apiGroups:
      - ""
    resources:
      - services
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
      - namespaces
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: service-k8s-demo
  namespace: service-k8s-demo
subjects:
  - kind: ServiceAccount
    name: service-k8s-demo
    namespace: service-k8s-demo
roleRef:
  kind: ClusterRole
  name: service-k8s-demo
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: service-consumer
  namespace: service-k8s-demo
  labels:
    app: service-consumer
spec:
  replicas: 1
  template:
    metadata:
      name: service-consumer
      labels:
        app: service-consumer
    spec:
      containers:
        - name: service-consumer
          image: service-consumer:1.0
          imagePullPolicy: IfNotPresent
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "prod"
          ports:
            - name: http
              protocol: TCP
              containerPort: 30001
            - name: grpc
              protocol: TCP
              containerPort: 9091
      serviceAccountName: service-k8s-demo
      restartPolicy: Always
  selector:
    matchLabels:
      app: service-consumer
---
apiVersion: v1
kind: Service
metadata:
  name: service-consumer
  namespace: service-k8s-demo
spec:
  selector:
    app: service-consumer
  ports:
    - port: 80
      targetPort: 30001
      name: http
    - port: 9091
      targetPort: 9091
      name: grpc
  type: NodePort
That completes all the code for the service-consumer module as well.
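0. Prepare the environment: docker, K8s, Istio.
1. Build the images for service-provider and service-consumer. I build them straight into the local repository. This is the Docker part (docker build -t service-provider:1.0 . and so on).
2. Apply the config of service-provider and service-consumer in K8s. This is the K8s part (kubectl apply -f service-consumer-dev.yaml and so on).
3. Deploy service-provider and service-consumer to K8s. This is also the K8s part (kubectl apply -f service-consumer-deploy.yaml and so on). After deployment, check the related pods, services, configs, and so on.
At this point both services have their ports, so the REST endpoints in the controllers can be used to test them against each other. Try having service-consumer call the gRPC interface of service-provider: although service-provider has 3 instances, every request lands on the same pod. The reason is that gRPC runs on HTTP/2 with multiplexing over a long-lived connection, while K8s balances at L4, per connection, so its load balancing has no effect on the individual requests. An L7 (application-layer) load balancer is needed instead, which Envoy, Linkerd, and similar proxies provide. Linkerd is quite simple to set up.
4. Enable Envoy sidecar injection for our namespace (pods that are already running need to be recreated to receive the sidecar):
kubectl label namespace service-k8s-demo istio-injection=enabled
5. Start the Istio dashboard Kiali:
istioctl dashboard kiali &
6. Run the tests. The port in my Gateway is 31400.
Test the gRPC calls in both directions and watch the load balancing: the problem is now solved.
Test RocketMQ production and consumption.
Test dynamic configuration.
With that, testing is complete.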
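The difference between connection-level and request-level balancing described in the deployment step can be illustrated with a small simulation. It is a toy model only: GrpcBalancingSketch is a hypothetical name, no real networking is involved, and round-robin is just one policy an L7 proxy might apply.

```java
/** Toy model contrasting connection-level (L4) and request-level (L7) balancing. */
final class GrpcBalancingSketch {

    /** L4: a pod is chosen once when the connection opens, and every multiplexed request reuses it. */
    static int[] perConnection(int pods, int requests) {
        int[] hits = new int[pods];
        int chosen = 0; // picked once at connect time (assumed to be pod 0 here)
        for (int i = 0; i < requests; i++) {
            hits[chosen]++;
        }
        return hits;
    }

    /** L7: the proxy picks a pod for each request, here with simple round-robin. */
    static int[] perRequest(int pods, int requests) {
        int[] hits = new int[pods];
        for (int i = 0; i < requests; i++) {
            hits[i % pods]++;
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(perConnection(3, 9)));
        System.out.println(java.util.Arrays.toString(perRequest(3, 9)));
    }
}
```

With 3 pods and 9 requests, the first model sends all 9 requests to one pod, which matches what was observed before the sidecar was injected, while the second spreads them 3 per pod.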
Published: 2024-01-28 19:51:16