Technology, November 15, 2022

Adapted from Nan Ge (楠哥) on Bilibili: https://space.bilibili.com/434617924

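1. Creating the parent project

Create the parent project hello-spring-cloud-alibaba.

The Spring Cloud Alibaba environment is set up in the parent project. Each microservice component is a child module that inherits the parent's environment.

Spring Boot -> Spring Cloud -> Spring Cloud Alibaba

Before adding dependencies, look up which Spring Boot, Spring Cloud, and Spring Cloud Alibaba versions are compatible with each other.

The pom file is shown below. The parent project must use pom packaging; otherwise the child modules cannot be built.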

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>hello-spring-cloud-alibaba</name>
<description>Demo project for hello-spring-cloud-alibaba</description>
<packaging>pom</packaging>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- BOMs must be imported with type pom and scope import to manage versions -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

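The parent project only needs to declare the required dependencies. It needs no other resources, so its src directory (including resources) can be deleted.

2. Service provider

Add a Spring Boot module named provider to the parent project.

The pom file is: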

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>com.yqd</groupId>
<artifactId>provider</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>provider</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

The child module must include the spring-cloud-starter-alibaba-nacos-discovery dependency so that it can register with Nacos.

The application.yml configuration file is:

spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  application:
    name: provider
server:
  port: 8081

Create an endpoint for external callers:

package com.yqd.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProviderController {

    @Value("${server.port}") // property placeholder, resolved from application.yml
    private String port;

    @GetMapping("/index")
    public String index() {
        return this.port;
    }
}

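Download and unpack Nacos 1.2.1, then run startup.cmd in its bin directory. Open the Nacos console at http://localhost:8848/nacos/index.html; the default username and password are both nacos. The registered instances can be viewed in the console.

3. Service consumer

Add a Spring Boot module named consumer to the parent project.

The pom file is: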

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yqd</groupId>
<artifactId>hello-spring-cloud-alibaba</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>com.yqd</groupId>
<artifactId>consumer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

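The child module must include the spring-cloud-starter-alibaba-nacos-discovery dependency so that it can talk to Nacos. The consumer only calls the provider's endpoint, so in this example its configuration file contains no Nacos settings.

The application.yml configuration file is:

server:
  port: 8180

Remote calls are made with RestTemplate: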

package com.yqd.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.web.client.RestTemplate;

@Configuration
public class ConsumerConfiguration {

    @Bean
    @LoadBalanced // remote calls through this bean go through Ribbon
    @Primary
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    // Plain RestTemplate for calling a concrete instance URL. It must be
    // injected by name; otherwise the call fails with
    // java.lang.IllegalStateException: No instances available for ...
    @Bean
    public RestTemplate restTemplateCustom() {
        return new RestTemplate();
    }
}

Create an endpoint that fetches data from the provider's endpoint:

package com.yqd.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

@RestController
@Slf4j
public class ConsumerController {

    // Auto-configured, so it can be injected directly
    @Autowired
    private DiscoveryClient discoveryClient;

    // Not auto-configured; defined in the configuration class and injected by bean name
    @Autowired(required = false)
    @Qualifier("restTemplateCustom")
    private RestTemplate restTemplateCustom;

    /**
     * Returns the registered instances of the provider service.
     */
    @GetMapping("/instances")
    public List<ServiceInstance> instances() {
        return this.discoveryClient.getInstances("provider");
    }

    @GetMapping("/index")
    public String index() {
        // 1. Find the provider instances and pick one at random
        List<ServiceInstance> list = this.discoveryClient.getInstances("provider");
        int index = ThreadLocalRandom.current().nextInt(list.size());
        ServiceInstance serviceInstance = list.get(index);
        String url = serviceInstance.getUri() + "/index"; // URL = URI + request path
        // 2. Call the provider's index endpoint
        log.info("Calling port {}", serviceInstance.getPort());
        return "Called the service on port " + serviceInstance.getPort()
                + ", result: " + this.restTemplateCustom.getForObject(url, String.class);
    }
}

4. Load balancing with Ribbon

Configuration class and controller:

@Configuration
public class ConsumerConfig {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

@RestController
public class ConsumerController {

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/index")
    public String index() {
        // "provider" is the service name; Ribbon resolves it to an instance
        return "consumer called provider remotely: "
                + this.restTemplate.getForObject("http://provider/index", String.class);
    }
}

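Configure application.yml so that instances are picked at random:

server:
  port: 8180
provider:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

Nacos weight configuration: set a weight for each instance in Nacos; the higher the weight, the more likely the instance is to be called.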

package com.yqd.config;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.ribbon.NacosServer;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class NacosWeightedRule extends AbstractLoadBalancerRule {

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        // Reads the client configuration; nothing to do here
    }

    @Override
    public Server choose(Object o) {
        ILoadBalancer loadBalancer = this.getLoadBalancer();
        BaseLoadBalancer baseLoadBalancer = (BaseLoadBalancer) loadBalancer;
        // Name of the microservice being requested
        String name = baseLoadBalancer.getName();
        // Nacos service discovery API
        NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
        try {
            // Let Nacos pick one healthy instance
            Instance instance = namingService.selectOneHealthyInstance(name);
            log.info("Selected instance port={}, instance={}", instance.getPort(), instance);
            return new NacosServer(instance);
        } catch (NacosException e) {
            log.error("Failed to select an instance of {}", name, e);
            return null;
        }
    }
}

Register the rule in application.yml:

server:
  port: 8180
provider:
  ribbon:
    NFLoadBalancerRuleClassName: com.yqd.config.NacosWeightedRule
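
The rule above leaves the actual choice to Nacos. To illustrate what "a higher weight means a higher chance of being called" amounts to, here is a minimal sketch of weight-proportional random selection in plain Java. The class WeightedChoice and the instance names are hypothetical, and this is not Nacos's internal implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: picks a key with probability proportional to its weight. */
final class WeightedChoice {

    /**
     * @param weights instance id -> non-negative weight (the sum must be positive)
     * @param roll    a value in [0, 1), e.g. ThreadLocalRandom.current().nextDouble()
     */
    static String pick(Map<String, Double> weights, double roll) {
        double total = 0;
        for (double w : weights.values()) {
            total += w;
        }
        if (total <= 0) {
            throw new IllegalArgumentException("total weight must be positive");
        }
        double target = roll * total;
        double cumulative = 0;
        String last = null;
        for (Map.Entry<String, Double> e : weights.entrySet()) {
            cumulative += e.getValue();
            last = e.getKey();
            if (target < cumulative) {
                return last;
            }
        }
        return last; // guards against floating-point rounding at the upper edge
    }

    public static void main(String[] args) {
        Map<String, Double> weights = new LinkedHashMap<>();
        weights.put("provider:8081", 1.0);
        weights.put("provider:8082", 3.0); // three times as likely as 8081
        System.out.println(pick(weights, 0.10));
        System.out.println(pick(weights, 0.90));
    }
}
```

With weights 1 and 3, rolls below 0.25 land on the first instance and all other rolls on the second, so the second instance receives about three quarters of the calls.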

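5. Sentinel: rate limiting and degradation

Avalanche effect -> degradation, rate limiting, circuit breaking

Solutions:

1. Set a timeout on calls

2. Apply rate limiting

3. Use a circuit breaker such as Sentinel or Hystrix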
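
The first option in the list above, putting a timeout on a call so that a slow dependency cannot hold the caller's thread indefinitely, can be sketched in plain Java as follows. TimeoutFallback is a hypothetical helper for illustration only; it is not part of Sentinel or Hystrix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds how long a caller waits for a remote result. */
final class TimeoutFallback {

    /** Waits at most timeoutMillis for the call; returns the fallback on timeout or failure. */
    static String callWithTimeout(CompletableFuture<String> call, long timeoutMillis, String fallback) {
        try {
            return call.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback; // degrade instead of holding the caller's thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> stuck = new CompletableFuture<>(); // never completes
        System.out.println(callWithTimeout(stuck, 100, "fallback"));
        System.out.println(callWithTimeout(CompletableFuture.completedFuture("8081"), 100, "fallback"));
    }
}
```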

Step 1. Add the dependencies to pom.xml of the provider project

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

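Step 2. Add the following to the provider's application.yml

management:
  endpoints:
    web:
      exposure:
        include: '*'
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8080

Step 3. Download the Sentinel dashboard, unpack it, and start it. It is available at http://localhost:8080. The username and password are both sentinel.

Step 4. Start Nacos, then start the provider project and call one of the provider's endpoints. After a short wait, refresh the dashboard and the service information appears. Monitoring data only shows up once there has been a request.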

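5.1 Flow control rules

Direct: limits a given endpoint directly.

Associated: traffic to one endpoint affects access to another endpoint.

Chain: the rule follows the call chain into other layers, for example from the controller layer into the service layer, and applies only to calls arriving through a given entry point.

Code for chain-mode filtering:

Step 1. Add the dependencies to pom.xml of the provider project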

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-web-servlet</artifactId>
<version>1.7.1</version>
</dependency>

Step 2. Add the following to the provider's application.yml

spring:
  cloud:
    sentinel:
      filter:
        enabled: false

Step 3. Add a configuration class

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.CommonFilter;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FilterConfiguration {

    @Bean
    public FilterRegistrationBean registrationBean() {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean();
        registrationBean.setFilter(new CommonFilter());
        registrationBean.addUrlPatterns("/*");
        registrationBean.addInitParameter(CommonFilter.WEB_CONTEXT_UNIFY, "false");
        registrationBean.setName("sentinelFilter");
        return registrationBean;
    }
}

Step 4. Service

@Service
public class HelloService {

    @SentinelResource("test")
    public void test() {
        System.out.println("test");
    }
}

Step 5. Controller

@Autowired
private HelloService helloService;

@GetMapping("/test1")
public String test1() {
    this.helloService.test();
    return "test1";
}

@GetMapping("/test2")
public String test2() {
    this.helloService.test();
    return "test2";
}

Step 6. Call http://localhost:8081/test1 and http://localhost:8081/test2. Both endpoints, test1 and test2, then appear in the resource chain view of the Sentinel dashboard. Add a flow control rule for test1: set the single-machine threshold to 1, the flow control mode to chain, and the entry resource to /test1, then save it. Now refresh both endpoints repeatedly: test1 is rate limited when refreshed too quickly, while test2 is not.

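5.2 Flow control effects

Fail fast

An exception is thrown immediately.

Warm Up

Gives the system time to warm up. At the start of the warm-up period the effective single-machine threshold is one third of the configured value; it rises during the period, and once the period is over the threshold is the configured value.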
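
The warm-up behaviour can be pictured with a small calculation. The sketch below assumes a simple linear ramp from one third of the configured threshold up to the full value; that linear shape is an assumption made for illustration, and Sentinel's real controller computes the curve differently. WarmUpSketch and its method are hypothetical names.

```java
/** Hypothetical illustration of a warm-up threshold; not Sentinel's actual algorithm. */
final class WarmUpSketch {

    static final int COLD_FACTOR = 3; // start at threshold / 3, as described above

    /**
     * @param threshold      configured single-machine threshold (requests per second)
     * @param warmUpSeconds  length of the warm-up period
     * @param elapsedSeconds time since traffic started
     * @return the effective threshold at that moment (assumed linear ramp)
     */
    static double currentThreshold(double threshold, long warmUpSeconds, long elapsedSeconds) {
        double cold = threshold / COLD_FACTOR;
        if (warmUpSeconds <= 0 || elapsedSeconds >= warmUpSeconds) {
            return threshold; // warm-up finished: full threshold
        }
        if (elapsedSeconds <= 0) {
            return cold; // cold start: one third of the threshold
        }
        return cold + (threshold - cold) * elapsedSeconds / warmUpSeconds;
    }

    public static void main(String[] args) {
        for (long t = 0; t <= 10; t += 5) {
            System.out.println("t=" + t + "s threshold=" + currentThreshold(30, 10, t));
        }
    }
}
```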

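Queueing

A request that cannot pass right away is not rejected immediately; it waits for its turn, for at most the configured timeout. If it can be let through within that time, no exception is thrown; if it cannot, an exception is thrown.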
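
A minimal sketch of the queueing effect, assuming requests are spaced evenly and a request is rejected when its wait would exceed the timeout. QueueingLimiter is a hypothetical class written for this illustration; the clock is passed in as a parameter so the behaviour is easy to follow.

```java
/** Hypothetical illustration of uniform-rate queueing with a maximum wait. */
final class QueueingLimiter {

    private final long intervalMillis; // spacing between admitted requests
    private final long maxWaitMillis;  // queueing timeout
    private long lastPassMillis;       // scheduled time of the last admitted request
    private boolean hasPassed;         // false until the first request is admitted

    QueueingLimiter(int permitsPerSecond, long maxWaitMillis) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalMillis = 1000L / permitsPerSecond;
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Returns how many milliseconds the caller should wait, or -1 if the request is rejected. */
    synchronized long tryAcquire(long nowMillis) {
        long expected = hasPassed ? lastPassMillis + intervalMillis : nowMillis;
        if (expected <= nowMillis) {
            hasPassed = true;
            lastPassMillis = nowMillis;
            return 0; // pass immediately
        }
        long wait = expected - nowMillis;
        if (wait > maxWaitMillis) {
            return -1; // would wait longer than the timeout: reject
        }
        lastPassMillis = expected; // reserve the next slot
        return wait;
    }

    public static void main(String[] args) {
        QueueingLimiter limiter = new QueueingLimiter(2, 600); // one request every 500 ms
        System.out.println(limiter.tryAcquire(0));
        System.out.println(limiter.tryAcquire(0));
        System.out.println(limiter.tryAcquire(0));
    }
}
```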

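5.3 Degradation rules

RT

When a single request's response time exceeds the threshold, the resource enters a pre-degraded state. If the next 5 consecutive requests within 1 s also exceed the threshold, the resource is degraded, and stays degraded for the length of the time window.

Exception ratio

When the ratio of exceptions per second to requests passed exceeds the threshold, the resource is degraded for the length of the time window.

Exception count

When the number of exceptions within 1 minute exceeds the threshold, the resource is degraded. The time window should be longer than 60 s; otherwise the resource may enter the next circuit break right after the previous one ends.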
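
To make the exception-ratio rule more concrete, here is a simplified sketch in plain Java. It counts requests and failures per second and opens the circuit for the time window once the ratio exceeds the threshold. ExceptionRatioBreaker, the minimum sample size of 5, and the one-second bucket are assumptions for illustration, not Sentinel's implementation.

```java
/** Hypothetical illustration of degradation by exception ratio. */
final class ExceptionRatioBreaker {

    private static final int MIN_REQUESTS = 5;     // assumed minimum sample size before tripping
    private static final long STAT_MILLIS = 1000L; // statistics are kept per second

    private final double ratioThreshold;
    private final long timeWindowMillis; // how long the resource stays degraded
    private long bucketStart;
    private int total;
    private int errors;
    private long openUntil; // requests are rejected while now < openUntil

    ExceptionRatioBreaker(double ratioThreshold, long timeWindowMillis) {
        this.ratioThreshold = ratioThreshold;
        this.timeWindowMillis = timeWindowMillis;
    }

    /** True if a request arriving at the given time may pass. */
    synchronized boolean allow(long now) {
        return now >= openUntil;
    }

    /** Records the outcome of one request and trips the breaker if the ratio is too high. */
    synchronized void record(boolean failed, long now) {
        if (now - bucketStart >= STAT_MILLIS) { // start a new one-second bucket
            bucketStart = now;
            total = 0;
            errors = 0;
        }
        total++;
        if (failed) {
            errors++;
        }
        if (total >= MIN_REQUESTS && (double) errors / total > ratioThreshold) {
            openUntil = now + timeWindowMillis; // degrade for the time window
            total = 0;
            errors = 0;
        }
    }

    public static void main(String[] args) {
        ExceptionRatioBreaker breaker = new ExceptionRatioBreaker(0.5, 5000);
        boolean[] outcomes = {true, true, true, false, false}; // true = request failed
        for (int i = 0; i < outcomes.length; i++) {
            breaker.record(outcomes[i], i);
        }
        System.out.println(breaker.allow(100));
        System.out.println(breaker.allow(6000));
    }
}
```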

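5.4 Hotspot rules

Hotspot rules are a finer-grained form of flow control: they can limit requests by a specific hot parameter. Once the rule is set, requests carrying the limited parameter are rate limited when their volume exceeds the threshold, measured over the statistics window.

@SentinelResource must be added, because the rule is applied to the resource.

@GetMapping("/hot")
@SentinelResource("hot")
public String hot(
        @RequestParam(value = "num1", required = false) Integer num1,
        @RequestParam(value = "num2", required = false) Integer num2) {
    return num1 + "-" + num2;
}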
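
The idea behind a hotspot rule, counting per parameter value rather than per endpoint, can be sketched as below. HotParamLimiter is a hypothetical class using a simple fixed window; letting requests without the parameter pass is an assumption of this sketch, and Sentinel's own bookkeeping is more elaborate.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of per-parameter-value rate limiting. */
final class HotParamLimiter {

    private final int threshold;     // allowed requests per value per window
    private final long windowMillis; // statistics window length
    private final Map<Object, Integer> counts = new HashMap<>();
    private long windowStart;

    HotParamLimiter(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    /** Returns true if a request carrying this parameter value may pass. */
    synchronized boolean tryPass(Object paramValue, long now) {
        if (paramValue == null) {
            return true; // assumption: requests without the hot parameter are not limited
        }
        if (now - windowStart >= windowMillis) { // new window: forget old counts
            counts.clear();
            windowStart = now;
        }
        int seen = counts.merge(paramValue, 1, Integer::sum);
        return seen <= threshold;
    }

    public static void main(String[] args) {
        HotParamLimiter limiter = new HotParamLimiter(1, 1000);
        System.out.println(limiter.tryPass(1, 0));  // num1=1, first request
        System.out.println(limiter.tryPass(1, 10)); // num1=1 again within the window
        System.out.println(limiter.tryPass(2, 20)); // a different value has its own count
    }
}
```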

5.5 Authorization rules

A flow control application (an extra request parameter) is assigned to the resource, and access can then be controlled per application by adding it to a whitelist or a blacklist.

The flow control application of a request is determined by implementing the RequestOriginParser interface, as shown below.

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.callback.RequestOriginParser;
import org.springframework.util.StringUtils;

import javax.servlet.http.HttpServletRequest;

public class RequestOriginParserDefinition implements RequestOriginParser {

    @Override
    public String parseOrigin(HttpServletRequest httpServletRequest) {
        // The request parameter "name" identifies the calling application
        String name = httpServletRequest.getParameter("name");
        if (StringUtils.isEmpty(name)) {
            throw new RuntimeException("name is null");
        }
        return name;
    }
}

For RequestOriginParserDefinition to take effect, register it in a configuration class.

package com.southwind.configuration;

import com.alibaba.csp.sentinel.adapter.servlet.callback.WebCallbackManager;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class SentinelConfiguration {

    @PostConstruct
    public void init() {
        WebCallbackManager.setRequestOriginParser(new RequestOriginParserDefinition());
    }
}
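
Once the origin of a request is known, the whitelist or blacklist decision itself is simple. The following plain-Java sketch shows that decision; OriginAuthority, Mode, and the application names are hypothetical and only illustrate the rule, they are not Sentinel classes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of a whitelist/blacklist check on the request origin. */
final class OriginAuthority {

    enum Mode { WHITE_LIST, BLACK_LIST }

    /**
     * @param origin    value returned by the origin parser (here: the "name" parameter)
     * @param mode      whether limitApps is a whitelist or a blacklist
     * @param limitApps application names configured in the rule
     */
    static boolean isAllowed(String origin, Mode mode, Set<String> limitApps) {
        boolean listed = origin != null && limitApps.contains(origin);
        // Whitelist: only listed origins pass. Blacklist: listed origins are rejected.
        return mode == Mode.WHITE_LIST ? listed : !listed;
    }

    public static void main(String[] args) {
        Set<String> apps = new HashSet<>(Arrays.asList("admin"));
        System.out.println(isAllowed("admin", Mode.WHITE_LIST, apps));
        System.out.println(isAllowed("guest", Mode.WHITE_LIST, apps));
        System.out.println(isAllowed("admin", Mode.BLACK_LIST, apps));
    }
}
```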

5.6 Custom responses for rule exceptions

Create an exception handler class:

package com.southwind.handler;

import com.alibaba.csp.sentinel.adapter.servlet.callback.UrlBlockHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class ExceptionHandler implements UrlBlockHandler {

    @Override
    public void blocked(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws IOException {
        httpServletResponse.setContentType("text/html;charset=utf-8");
        String msg = "Blocked"; // default, so write() never receives null
        if (e instanceof FlowException) {
            msg = "Rate limited";
        } else if (e instanceof DegradeException) {
            msg = "Degraded";
        }
        httpServletResponse.getWriter().write(msg);
    }
}

Register it:

@Configuration
public class SentinelConfiguration {

    @PostConstruct
    public void init() {
        WebCallbackManager.setUrlBlockHandler(new ExceptionHandler());
    }
}

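6. Integrating RocketMQ

6.1 Installing RocketMQ

Step 1. Upload the archive to the Linux server

Step 2. Unzip it and move the rocketmq folder to /usr/local/

unzip rocketmq-all-4.7.1-bin-release.zip

Step 3. Change into the rocketmq directory and start the NameServer

nohup ./bin/mqnamesrv &

Step 4. Check that it started

netstat -an | grep 9876

Step 5. Start the Broker

Before starting it, edit the startup scripts to lower the JVM memory settings. The default allocation is 4 GB, which is more than our machine can provide.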

cd bin
vim runserver.sh

Change the parameters to:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh

Change the parameters to:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

Start the Broker:

nohup ./mqbroker -n localhost:9876 &

The log can be checked with:

tail -f ~/logs/rocketmqlogs/broker.log

The log shows that the Broker started successfully.

Step 6. Test RocketMQ

Sending messages

cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer

Receiving messages

cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

Step 7. Shut down RocketMQ

cd bin
./mqshutdown broker
./mqshutdown namesrv

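6.2 Installing the RocketMQ console

Step 1. Unzip the RocketMQ console archive, go to rocketmq-externals-rocketmq-console-1.0.0\rocketmq-console\src\main\resources, edit application.yml, and package the project

mvn clean package -Dmaven.test.skip=true

Step 2. Go to target and start the jar

java -jar rocketmq-console-ng-1.0.0.jar

Open localhost:9877 in a browser. If the page reports an error, the cause is that RocketMQ is installed on Linux while the console runs on Windows: the Linux firewall must open the ports before they can be reached. Open ports 9876, 10909, and 10911: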

firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

Restart the console project.

6.3 Sending messages from Java

Step 1. Add the dependency to pom.xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>

Step 2. Produce a message

package com.southwind;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Test {
    public static void main(String[] args) throws Exception {
        // Create the message producer
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("192.168.248.129:9876");
        // Start the producer
        producer.start();
        // Build the message: topic, tag, body
        Message message = new Message("myTopic", "myTag", ("Test MQ").getBytes());
        // Send the message with a 1000 ms timeout
        SendResult result = producer.send(message, 1000);
        System.out.println(result);
        // Shut down the producer
        producer.shutdown();
    }
}

Step 3. Run it directly. If it fails with sendDefaultImpl call timeout, open port 10911

firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

Open the RocketMQ console to see the message.

6.4 Consuming messages from Java

package com.southwind.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

@Slf4j
public class ConsumerTest {
    public static void main(String[] args) throws MQClientException {
        // Create the message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
        // Set the NameServer address
        consumer.setNamesrvAddr("192.168.248.129:9876");
        // Subscribe to the topic; "*" matches every tag
        consumer.subscribe("myTopic", "*");
        // Callback invoked when messages arrive
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                log.info("Message=>{}", list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}

6.5 Integrating RocketMQ with Spring Boot

Add the following to provider

Step 1. pom.xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

Step 2. application.yml

rocketmq:
  name-server: 192.168.248.129:9876
  producer:
    group: myprovider

Step 3. Order

package com.southwind.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Integer id;
    private String buyerName;
    private String buyerTel;
    private String address;
    private Date createDate;
}

Step 4. Controller

@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/create")
public Order create() {
    Order order = new Order(
            1,
            "张三",
            "123123",
            "软件园",
            new Date()
    );
    // Publish the order to the myTopic topic
    this.rocketMQTemplate.convertAndSend("myTopic", order);
    return order;
}

Add the following to consumer

Step 1. pom.xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

Step 2. application.yml

rocketmq:
  name-server: 192.168.248.129:9876

Step 3. Service

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "myTopic")
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("New order {}, sending SMS", order);
    }
}

Start provider first, then consumer. Call the provider endpoint http://localhost:8081/create to produce a message. The message is stored on the RocketMQ server; the consumer keeps listening to the server and reads each message as soon as it is stored. If the consumer does not receive messages, restarting it may help.

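7. Service gateway

Spring Cloud Gateway is built on Netty and is not compatible with Servlet, so the gateway project must not contain any Servlet components.

Step 1. pom.xml

Note: the Spring Web dependency must not be present, because Gateway is incompatible with Servlet. Move the spring-boot-starter-web dependency out of the parent project and into the child modules that need it, so that gateway does not inherit it.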

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

Step 2. application.yml

server:
  port: 8010
spring:
  application:
    name: gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: provider_route
          uri: http://localhost:8081
          predicates:
            - Path=/provider/**
          filters:
            - StripPrefix=1
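
To see what the route above does to a request, the following plain-Java sketch mimics the Path predicate and the StripPrefix=1 filter for this one route. StripPrefixSketch is a hypothetical class, and the matching is deliberately simplified compared with the gateway's real path matching.

```java
/** Hypothetical illustration of Path=/provider/** combined with StripPrefix=1. */
final class StripPrefixSketch {

    /** Removes the first `parts` path segments, e.g. ("/provider/index", 1) gives "/index". */
    static String stripPrefix(String path, int parts) {
        StringBuilder sb = new StringBuilder();
        int skipped = 0;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // ignore the empty segment before the leading slash
            }
            if (skipped < parts) {
                skipped++; // drop this segment
                continue;
            }
            sb.append('/').append(segment);
        }
        return sb.length() == 0 ? "/" : sb.toString();
    }

    /** Returns the target URL for provider_route, or null if the path does not match. */
    static String route(String path) {
        if (path.startsWith("/provider/")) { // simplified stand-in for Path=/provider/**
            return "http://localhost:8081" + stripPrefix(path, 1);
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(route("/provider/index"));
        System.out.println(route("/other/index"));
    }
}
```

So a request to the gateway at http://localhost:8010/provider/index is forwarded to http://localhost:8081/index.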

The approach above does not actually use Nacos. Next, let gateway discover services directly from Nacos, which makes the configuration simpler.

Step 1. Add Nacos to pom.xml

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

Step 2. application.yml

server:
  port: 8010
spring:
  application:
    name: gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true

7.1 Gateway rate limiting

7.1.1 Rate limiting by route

Step 1. pom.xml

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>

Step 2. Configuration class

package com.southwind.configuration;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.*;
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
// Exception handler for rate-limited requests
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
// Initial rate-limiting rules
@PostConstruct
public void initGatewayRules(){
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("provider_route")
.setCount(1)
.setIntervalSec(1)
);
GatewayRuleManager.loadRules(rules);
}
// Register the rate-limiting filter
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
// Custom response for rate-limited requests
@PostConstruct
public void initBlockHandlers(){
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map map = new HashMap();
map.put("code",0);
map.put("msg","被限流了");
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}

Step 3. application.yml

server:
  port: 8010
spring:
  application:
    name: gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: provider_route
          uri: http://localhost:8081
          predicates:
            - Path=/provider/**
          filters:
            - StripPrefix=1
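
The rule configured for provider_route, setCount(1) with setIntervalSec(1), allows one request per one-second interval. The sketch below shows that budget with a simple fixed-window counter. FixedWindowCounter is a hypothetical class; Sentinel keeps its statistics differently, so treat this only as an illustration of the count and interval parameters.

```java
/** Hypothetical illustration of "count requests per interval" rate limiting. */
final class FixedWindowCounter {

    private final int count;           // allowed requests per interval
    private final long intervalMillis; // interval length
    private long windowStart;
    private int used;
    private boolean started;

    FixedWindowCounter(int count, long intervalSec) {
        this.count = count;
        this.intervalMillis = intervalSec * 1000L;
    }

    /** Returns true if a request arriving at `now` (milliseconds) may pass. */
    synchronized boolean tryPass(long now) {
        if (!started || now - windowStart >= intervalMillis) {
            started = true;
            windowStart = now; // open a new window
            used = 0;
        }
        if (used < count) {
            used++;
            return true;
        }
        return false; // budget for this window is spent
    }

    public static void main(String[] args) {
        FixedWindowCounter limiter = new FixedWindowCounter(1, 1);
        System.out.println(limiter.tryPass(0));
        System.out.println(limiter.tryPass(500));
        System.out.println(limiter.tryPass(1000));
    }
}
```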

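7.1.2 Rate limiting by API group

Step 1. Modify the configuration class: add a method that defines the API groups, and change the initial rate-limiting rules

package com.southwind.configuration;
import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants;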
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem;
import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler;
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.result.view.ViewResolver;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.*;
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
// Exception handler for rate-limited requests
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}
// Initial rate-limiting rules, one per API group
@PostConstruct
public void initGatewayRules(){
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(new GatewayFlowRule("provider_api1").setCount(1).setIntervalSec(1));
rules.add(new GatewayFlowRule("provider_api2").setCount(1).setIntervalSec(1));
GatewayRuleManager.loadRules(rules);
}
// Register the rate-limiting filter
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
// Custom response for rate-limited requests
@PostConstruct
public void initBlockHandlers(){
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map map = new HashMap();
map.put("code",0);
map.put("msg","被限流了");
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
// Custom API groups
@PostConstruct
private void initCustomizedApis(){
Set<ApiDefinition> definitions = new HashSet<>();
ApiDefinition api1 = new ApiDefinition("provider_api1")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/provider/api1/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
ApiDefinition api2 = new ApiDefinition("provider_api2")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/provider/api2/demo1"));
}});
definitions.add(api1);
definitions.add(api2);
GatewayApiDefinitionManager.loadApiDefinitions(definitions);
}
}

Step 2. Add these methods to the provider's Controller

@GetMapping("/api1/demo1")
public String demo1() {
    return "demo";
}

@GetMapping("/api1/demo2")
public String demo2() {
    return "demo";
}

@GetMapping("/api2/demo1")
public String demo3() {
    return "demo";
}

@GetMapping("/api2/demo2")
public String demo4() {
    return "demo";
}
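
The two API groups defined in the configuration class use different match strategies: provider_api1 matches by prefix, while provider_api2 sets no strategy and therefore matches one exact path. The sketch below works out which of the four gateway URLs fall into which group. ApiGroupMatcher is a hypothetical class with simplified matching, not Sentinel's matcher.

```java
/** Hypothetical illustration of prefix versus exact matching for API groups. */
final class ApiGroupMatcher {

    /** Prefix match for patterns of the form "/a/b/**". */
    static boolean matchesPrefix(String path, String pattern) {
        String prefix = pattern.endsWith("/**")
                ? pattern.substring(0, pattern.length() - 3)
                : pattern;
        return path.equals(prefix) || path.startsWith(prefix + "/");
    }

    /** Exact match: the path must equal the pattern. */
    static boolean matchesExact(String path, String pattern) {
        return path.equals(pattern);
    }

    /** Returns the API group a gateway path belongs to, or null if it is in no group. */
    static String groupOf(String path) {
        if (matchesPrefix(path, "/provider/api1/**")) {
            return "provider_api1";
        }
        if (matchesExact(path, "/provider/api2/demo1")) {
            return "provider_api2";
        }
        return null;
    }

    public static void main(String[] args) {
        String[] paths = {
            "/provider/api1/demo1", "/provider/api1/demo2",
            "/provider/api2/demo1", "/provider/api2/demo2"
        };
        for (String p : paths) {
            System.out.println(p + " -> " + groupOf(p));
        }
    }
}
```

Under these definitions /provider/api2/demo2 belongs to no group, so it is the only one of the four endpoints that is not rate limited.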

7.1.3 Rate limiting with Nacos service discovery

server:
  port: 8010
spring:
  application:
    name: gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true

Change the API group code so that the path starts with the service name registered in discovery.

ApiDefinition api2 = new ApiDefinition("provider_api2")
.setPredicateItems(new HashSet<ApiPredicateItem>(){{
add(new ApiPathPredicateItem().setPattern("/p1/api2/demo1"));
}});

8. Distributed transactions

8.1 Simulating a distributed transaction failure

Step 1. Create two projects, order and pay, with this pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

Step 2. Create two databases, order and pay, one for each microservice.

In the order database create a table named orders with the columns id and username.

In the pay database create a table named pay with the columns id and username.

Step 3. Write application.yml for each service

application.yml for order:

server:
  port: 8010
spring:
  application:
    name: order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC

application.yml for pay:

server:
  port: 8020
spring:
  application:
    name: pay
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://localhost:3306/pay?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC

Step 4. Write a Service for each project

package com.southwind.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void save() {
        this.jdbcTemplate.update("insert into orders(username) values ('张三')");
    }
}

package com.southwind.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class PayService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void save() {
        this.jdbcTemplate.update("insert into pay(username) values ('张三')");
    }
}

5. Write the controllers. The Order controller calls the Pay service through RestTemplate.

package com.southwind.controller;

import com.southwind.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;
    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/save")
    public String save() {
        // Order: insert into the order database
        this.orderService.save();
        // Deliberate failure (ArithmeticException) before the payment call
        int i = 10/0;
        // Payment: call the pay service over HTTP
        this.restTemplate.getForObject("http://localhost:8020/save", String.class);
        return "success";
    }
}

package com.southwind.controller;

import com.southwind.service.PayService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PayController {
    @Autowired
    private PayService payService;

    @GetMapping("/save")
    public String save() {
        this.payService.save();
        return "success";
    }
}

6. Write the startup classes.

package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

package com.southwind;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PayApplication {
    public static void main(String[] args) {
        SpringApplication.run(PayApplication.class, args);
    }
}

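That completes the failure simulation. After Order has stored its row, the exception is thrown, so Pay is never called and stores nothing, but the row already written to the order database is not rolled back.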
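The behaviour described above can be reproduced without Spring or MySQL. The following is a minimal sketch, assuming two in-memory lists stand in for the order and pay databases; the class name LocalCommitDemo, the divisor parameter, and the value "zhangsan" are illustrative and not part of the original projects.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the two services: each "database" is an in-memory list.
class LocalCommitDemo {
    static final List<String> orderDb = new ArrayList<>();
    static final List<String> payDb = new ArrayList<>();

    // Mirrors OrderService.save(): the insert is final as soon as it returns.
    static void saveOrder() {
        orderDb.add("zhangsan");
    }

    // Mirrors PayService.save(), which is only reached through the remote call.
    static void savePay() {
        payDb.add("zhangsan");
    }

    // Mirrors OrderController.save(): order insert, possible failure, then the pay call.
    // divisor = 0 reproduces the tutorial's 10/0 failure; any other value succeeds.
    static String save(int divisor) {
        saveOrder();
        int i = 10 / divisor; // throws ArithmeticException when divisor is 0
        savePay();
        return "success";
    }

    public static void main(String[] args) {
        try {
            save(0);
        } catch (ArithmeticException e) {
            System.out.println("request failed: " + e.getMessage());
        }
        // The order row survives the failure, while pay never received anything.
        System.out.println("order rows = " + orderDb.size());
        System.out.println("pay rows = " + payDb.size());
    }
}
```

Nothing in this flow links the two writes, which is the gap a global transaction is meant to close.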

8.2 Solving It with Seata

1. Download Seata.

2. Unzip it, go into the conf folder, and modify two files.

registry.conf

registry {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}

config {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}

nacos-config.txt

3. Start Nacos, then run nacos-config.sh from a Git Bash command window to import the Seata configuration into Nacos.

Go into conf, right-click, and choose Git Bash Here.

cd conf
sh nacos-config.sh 127.0.0.1

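Once the script succeeds, refresh Nacos; the configuration entries have been added.

The settings from nacos-config.txt are now in effect.

4. Start Seata Server. Note that it fails to start on JDK versions later than 8.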

cd bin
seata-server.bat -p 8090 -m file

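The server starts and registers with Nacos successfully.

The Seata server environment is now set up. Next, add Seata to the applications.

1. Initialize the databases by adding the transaction log table to both of them. Seata already provides the SQL: copy the CREATE TABLE statement from db_undo_log.sql in the conf folder and use it to create the undo_log table in both the order database and the pay database.

2. Run the script directly in both databases.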

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3. Add the Seata component and the Nacos Config component to the pom.xml of both projects.

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

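4. Give JdbcTemplate a proxy data source. A plain local data source cannot coordinate two separate databases; that only works through Seata's proxy data source (DataSourceProxy).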

package com.southwind;

import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.client.RestTemplate;

import javax.sql.DataSource;

@SpringBootApplication
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(new DataSourceProxy(dataSource));
    }
}

package com.southwind;

import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

@SpringBootApplication
public class PayApplication {
    public static void main(String[] args) {
        SpringApplication.run(PayApplication.class, args);
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(new DataSourceProxy(dataSource));
    }
}

5. Copy registry.conf into the resources folder of both projects.

6. Add a bootstrap.yml to each project so that it reads the Nacos configuration.

# order: bootstrap.yml
spring:
  application:
    name: order
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}

# pay: bootstrap.yml
spring:
  application:
    name: pay
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}

tx-service-group must match the name used in the Nacos configuration.

7. Add the @GlobalTransactional annotation to the method where Order calls Pay.

package com.southwind.controller;

import com.southwind.service.OrderService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;
    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/save")
    @GlobalTransactional
    public String save() {
        // Order: insert into the order database
        this.orderService.save();
        // Deliberate failure (ArithmeticException) before the payment call
        int i = 10/0;
        // Payment: call the pay service over HTTP
        this.restTemplate.getForObject("http://localhost:8020/save", String.class);
        return "success";
    }
}
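What the annotation adds can be pictured as a coordinator wrapped around the method body: each branch registers a way to undo its work, and if the body throws, every registered branch is undone. The sketch below is a toy model of that idea only, not Seata's API or protocol; the class GlobalTxSketch, its methods, the in-memory lists, and the value "zhangsan" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy coordinator: shows only the shape of "all branches roll back together".
class GlobalTxSketch {
    // Undo actions registered by the branches, in the order the branches ran.
    private final List<Runnable> undoActions = new ArrayList<>();

    void registerBranch(Runnable undo) {
        undoActions.add(undo);
    }

    // Plays the role of the annotated method boundary.
    <T> T execute(Supplier<T> body) {
        try {
            T result = body.get();
            undoActions.clear(); // success: nothing needs to be undone
            return result;
        } catch (RuntimeException e) {
            // Failure: undo every registered branch, newest first, then rethrow.
            for (int i = undoActions.size() - 1; i >= 0; i--) {
                undoActions.get(i).run();
            }
            undoActions.clear();
            throw e;
        }
    }

    // Same flow as the controller's save(); divisor = 0 reproduces the 10/0 failure.
    static String save(int divisor, List<String> orders, List<String> pay) {
        GlobalTxSketch tx = new GlobalTxSketch();
        return tx.execute(() -> {
            orders.add("zhangsan");
            tx.registerBranch(() -> orders.remove("zhangsan"));
            int i = 10 / divisor; // throws ArithmeticException when divisor is 0
            pay.add("zhangsan");
            tx.registerBranch(() -> pay.remove("zhangsan"));
            return "success";
        });
    }

    public static void main(String[] args) {
        List<String> orders = new ArrayList<>();
        List<String> pay = new ArrayList<>();
        try {
            save(0, orders, pay);
        } catch (ArithmeticException e) {
            System.out.println("rolled back after: " + e.getMessage());
        }
        System.out.println("orders = " + orders + ", pay = " + pay);
    }
}
```

With the failure placed before the pay call, as in the controller above, only the order branch has anything to undo; after the rollback both stores are empty instead of the order row being left behind.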