Spring Cloud is an ordered collection of frameworks. It leverages the development convenience of Spring Boot to simplify building distributed systems: service discovery, service gateways, routing, distributed tracing, and more. Rather than reinventing the wheel, Spring Cloud integrates and wraps mature existing components, which cuts the cost of developing each module yourself. In short: Spring Cloud is the "full package" for building distributed systems.

1. Nacos

1. Download and install

Download the Nacos server package from the official releases page: https://github.com/alibaba/nacos/releases

2. Add dependencies

Parent project (dependencyManagement):

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

Client:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

Configure the Nacos address:

spring:
  application:
    name: item-service
  cloud:
    nacos:
      server-addr: 192.168.3.133:8848
Nacos server environment file (custom.env) used by the Docker container:

PREFER_HOST_MODE=hostname
MODE=standalone
SPRING_DATASOURCE_PLATFORM=mysql
MYSQL_SERVICE_HOST=192.168.3.133    # set to your own IP
MYSQL_SERVICE_DB_NAME=nacos
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=root
MYSQL_SERVICE_PASSWORD=123
MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai

Start the Nacos server:

docker run -d --name nacos --env-file /root/nacos/custom.env -p 8848:8848 -p 9848:9848 -p 9849:9849 --restart=always nacos/nacos-server:v2.1.1-slim

2. OpenFeign

1. Add dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

2. Enable Feign with the @EnableFeignClients annotation

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableFeignClients
@MapperScan("com.cartService.mapper")
@SpringBootApplication
public class CartServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(CartServiceApplication.class, args);
    }
}

3. Write a FeignClient

import com.cartService.domain.dto.ItemDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

@FeignClient("item-service")
public interface ItemClient {
    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") List<Long> ids);
}

4. Use the FeignClient to make a remote call

private final ItemClient itemClient;

List<ItemDTO> items = itemClient.queryItemByIds(itemIds);

3. OpenFeign with OkHttp

1. Add the dependency

<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-okhttp</artifactId>
</dependency>

2. Enable OkHttp (it brings connection pooling)

feign:
  okhttp:
    enabled: true

4. Best practice

When several microservices call the same client, the same client code gets written over and over, and every copy must be updated when the API changes. Extract shared clients into a dedicated module:

1. Create a new module richu-api and add the OpenFeign dependency

2. Put the DTOs and clients in it

3. Reference it from the other modules:

<dependency>
    <groupId>com.richu</groupId>
    <artifactId>richu-api</artifactId>
    <version>1.0.0</version>
</dependency>

4. Specify the package to scan

@EnableFeignClients(basePackages = "com.hmall.api.client")

5. Logging

OpenFeign only emits logs when the logger level is DEBUG. The Feign log levels are:

  • NONE: log nothing (the default)
  • BASIC: log only the request method, URL, response status code, and execution time
  • HEADERS: BASIC plus the request and response headers
  • FULL: log full request and response details, including headers, bodies, and metadata
import feign.Logger;
import org.springframework.context.annotation.Bean;

public class DefaultFeignConfig {
    @Bean
    public Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;
    }
}

Per-client configuration, declared on the @FeignClient annotation:

@FeignClient(value = "item-service",configuration = DefaultFeignConfig.class)
public interface ItemClient {

    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
}

Global configuration, declared on @EnableFeignClients:

@EnableFeignClients(basePackages = "com.hmall.api.client",defaultConfiguration = DefaultFeignConfig.class)
@MapperScan("com.cartService.mapper")
@SpringBootApplication
public class CartServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(CartServiceApplication.class, args);
    }
}

6. Gateway

1. Usage

The gateway is the system's entry point (the "network gate"): it routes and forwards requests and performs authentication.

1. Create a new module

2. Add the gateway dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
</dependencies>

3. Write the application class

@SpringBootApplication
public class GatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class,args);
    }
}

4. Configure routing rules

spring:
  application:
    name: gateway
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.3.1:8848
    gateway:
      routes:
        - id: item-service        # route id, custom and unique
          uri: lb://item-service  # target microservice; lb = load-balanced
          predicates:             # route predicates
            - Path=/items/**      # match on the request path
        - id: cart-service
          uri: lb://cart-service
          predicates:
            - Path=/carts/**
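The Path predicate above matches request paths against patterns like /items/**. The idea can be sketched in plain Java; this is a simplified illustration handling only the trailing /** case, not Spring's actual AntPathMatcher:

```java
// Simplified sketch of a Path predicate: supports only exact paths and a
// trailing "/**" wildcard (Spring's real matcher supports far more).
class PathPredicate {
    public static boolean matches(String pattern, String path) {
        if (pattern.endsWith("/**")) {
            String prefix = pattern.substring(0, pattern.length() - 3); // drop "/**"
            return path.equals(prefix) || path.startsWith(prefix + "/");
        }
        return path.equals(pattern);
    }
}
```

With the routes above, a request to /items/123 is forwarded to item-service, while /carts/1 matches the cart-service route instead.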

2. Route properties

  • id: the route's unique identifier
  • uri: the route's target address

Route predicates:

  • After: route only after the given time
  • Before: route only before the given time
  • Between: route only between the given times
  • Cookie: [Cookie=cookie name, regex for the cookie value]
  • Header: [Header=header name, regex for the header value]
  • Host: [Host=host name (multiple allowed, wildcards supported)]
  • Method: [Method=request method]
  • Path: [Path=request path]
  • Query: [Query=parameter name, parameter value]

Route filters

3. Custom filters

GatewayFilter: a route filter applied to whichever routes you specify

GlobalFilter: a global filter applied to all routes; it takes effect automatically once declared

public interface GlobalFilter {
	// exchange: the request context (request and response); chain: the filter chain
	Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

3.1.GlobalFilter

@Component
public class GlobalFilters implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // TODO: request validation logic
        ServerHttpRequest request = exchange.getRequest();
        HttpHeaders headers = request.getHeaders();
        System.out.println("headers = " + headers);
        // let the request through
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

3.2 GatewayFilter

@Component
public class GateWayFilter extends AbstractGatewayFilterFactory {
    @Override
    public GatewayFilter apply(Object config) {
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return chain.filter(exchange);
            }
        };
    }
}

Wrapping the filter in an OrderedGatewayFilter lets you specify its order:

@Component
public class GateWayFilter extends AbstractGatewayFilterFactory {
    @Override
    public GatewayFilter apply(Object config) {
        return new OrderedGatewayFilter(new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return chain.filter(exchange);
            }
        },1);
    }
}

Implementing login verification:

package com.richu.gateway.filters;

import cn.hutool.core.text.AntPathMatcher;
import com.hmall.common.exception.UnauthorizedException;
import com.richu.gateway.config.AuthProperties;
import com.richu.gateway.util.JwtTool;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.List;

@Component
@RequiredArgsConstructor
public class GlobalFilters implements GlobalFilter, Ordered {

    private final AuthProperties authProperties;

    private final JwtTool jwtTool;
    
    private final AntPathMatcher antPathMatcher = new AntPathMatcher();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // get the request
        ServerHttpRequest request = exchange.getRequest();
        // check whether this path is excluded from login interception
        if (isExclude(request.getPath().toString())) {
            return chain.filter(exchange);
        }
        // get the token; kept as a local variable because the filter is a
        // singleton, so a token field would not be thread-safe
        String token = null;
        List<String> headers = request.getHeaders().get("Authorization");
        if (headers != null && !headers.isEmpty()) {
            token = headers.get(0);
        }
        Long userid = null;
        // validate the token
        try {
            userid = jwtTool.parseToken(token);
        } catch (UnauthorizedException e) {
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.UNAUTHORIZED);
            return response.setComplete();
        }
        // TODO: pass the user info downstream
        System.out.println(userid);
        // let the request through
        return chain.filter(exchange);
    }

    private boolean isExclude(String path) {
        // AntPathMatcher.match takes (pattern, path), in that order
        for (String pathPattern : authProperties.getExcludePaths()) {
            if (antPathMatcher.match(pathPattern, path)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

4. Passing information from the gateway to microservices

// pass the user info via a request header
String userinfo = userid.toString();
ServerWebExchange swe = exchange.mutate()
    .request(builder -> builder.header("user-info", userinfo)).build();
// let the request through
return chain.filter(swe);

Read the header in an interceptor in the microservices' common module, so the rest of the business code can use the user info directly.

1. Define the interceptor class

public class UseInfoInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // get the logged-in user info from the header set by the gateway
        String userinfo = request.getHeader("user-info");
        // if present, store it in the ThreadLocal
        if (StrUtil.isNotBlank(userinfo)) {
            UserContext.setUser(Long.parseLong(userinfo));
        }
        // let the request through
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // clean up the ThreadLocal so user info does not leak across pooled threads
        UserContext.removeUser();
    }
}

2. Configuration class

@Configuration
public class MvcConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new UseInfoInterceptor());
    }
}

3. Registration

Register the config classes in META-INF/spring.factories so they are picked up by auto-configuration:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.hmall.common.config.MyBatisConfig,\
  com.hmall.common.config.MvcConfig,\
  com.hmall.common.config.JsonConfig

MvcConfig would also be loaded in environments without Spring MVC (such as the gateway), which causes an error.

// Fix: add this annotation to MvcConfig
@ConditionalOnClass(DispatcherServlet.class)

7. Configuration management

1. Shared configuration in Nacos

Reduces duplicated configuration and simplifies development.

Nacos console -> Configuration Management

Move the YAML configuration into Nacos:

<!-- Nacos configuration client -->
<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<!-- bootstrap configuration support -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>

bootstrap.yaml:

spring:
  cloud:
    nacos:
      server-addr: 192.168.3.133
      config:
        file-extension: yaml
        shared-configs:
          - data-id: shared-jdbc.yaml
          - data-id: shared-log.yaml
          - data-id: shared-swagger.yaml

Configuration hot update

Configuration changes take effect without restarting the service.

Prerequisites:

1. The configuration lives in Nacos and is bound with @ConfigurationProperties:

@Data
@Component
@ConfigurationProperties(prefix = "hm.cart")
public class CartProperties {
	private Integer num;
}

2. Dynamic routing

After a route-configuration change is detected, use RouteDefinitionWriter to update the route table.

package com.richu.gateway.routers;


import cn.hutool.json.JSONUtil;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionWriter;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

@Slf4j
@Component
@RequiredArgsConstructor
public class DynamicRoute {
    private final NacosConfigManager nacosConfigManager;

    private final RouteDefinitionWriter writer;

    private final String dataId = "gateway-routes.json";

    private final String group = "DEFAULT_GROUP";

    private final Set<String> routeIds = new HashSet<>();

    // on startup, pull the configuration once and register a configuration listener
    @PostConstruct
    public void init() throws NacosException {
        String configInfo = nacosConfigManager.getConfigService().getConfigAndSignListener(dataId, group, 5000, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                // a configuration change was detected: update the route table
                updateConfigInfo(configInfo);
            }
        });
        // first read of the configuration
        updateConfigInfo(configInfo);
    }

    public void updateConfigInfo(String config) {
        // 1. parse the configuration
        List<RouteDefinition> list = JSONUtil.toList(config, RouteDefinition.class);
        // 2. delete the old routes
        for (String id : routeIds) {
            writer.delete(Mono.just(id)).subscribe();
        }
        routeIds.clear();
        // 3. save the new routes
        for (RouteDefinition routeDefinition : list) {
            writer.save(Mono.just(routeDefinition)).subscribe();
            // record the route id
            routeIds.add(routeDefinition.getId());
        }
    }
}

Write the route definitions into a gateway-routes.json configuration in Nacos.

8. Microservice protection and distributed transactions

The avalanche problem: when one service in a microservice call chain fails, every microservice along the chain can become unavailable.

1. Solutions

Request rate limiting: limit the number of concurrent requests reaching a microservice.

Thread isolation: the bulkhead pattern; give each business its own thread pool or semaphore.

Circuit breaking: a circuit breaker tracks the ratio of exceptions or slow calls; once a threshold is exceeded, the business is broken and requests to that interface are intercepted.

| | Sentinel | Hystrix |
| --- | --- | --- |
| Isolation strategy | semaphore isolation | thread-pool / semaphore isolation |
| Circuit-breaking strategy | based on slow-call ratio or exception ratio | based on failure ratio |
| Real-time metrics | sliding window | sliding window (based on RxJava) |
| Rule configuration | multiple data sources | multiple data sources |
| Extensibility | multiple extension points | plugin-based |
| Annotation support | supported | supported |
| Rate limiting | QPS-based, supports limits based on call relations | limited support |
| Traffic shaping | slow start, uniform queueing | not supported |
| Adaptive system protection | supported | not supported |
| Dashboard | out of the box: rule configuration, second-level monitoring, machine discovery | incomplete |
| Framework adapters | Servlet, Spring Cloud, Dubbo, gRPC, etc. | Servlet, Spring Cloud Netflix |

2. Sentinel

1. Install the dashboard

Download the dashboard jar from the alibaba/Sentinel releases page on GitHub.

Start it:

java -server -Xms64m -Xmx256m -Dserver.port=8090 -Dcsp.sentinel.dashboard.server=localhost:8090 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

Access it at [Sentinel Dashboard](http://localhost:8090/#/login); the username and password are both sentinel.

2. Add the dependency

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

3. Configuration

spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8090

4. Cluster point links

The single-machine call chain: the chain of resources monitored by Sentinel once a request enters the service. By default Sentinel only monitors each Spring MVC endpoint (HTTP interface).

RESTful APIs can share the same request path (differing only in HTTP method), which makes cluster-point resource names collide. Distinguish resources by method:

spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8090
      http-method-specify: true

5. Request rate limiting

In the dashboard, click "flow control" on the resource and set the single-machine QPS threshold.

6. Thread isolation

Click "flow control", choose "concurrent threads", and set the single-machine threshold.

7.Fallback

feign:
  sentinel:
    enabled: true
  okhttp:
    enabled: true

Define a FallbackFactory:

@Slf4j
public class ItemClientFallbackFactory implements FallbackFactory<ItemClient> {

    @Override
    public ItemClient create(Throwable cause) {
        return new ItemClient() {
            @Override
            public List<ItemDTO> queryItemByIds(Collection<Long> ids) {
                // handle the failure and return a fallback value
                log.error("查询商品失败", cause);
                return null;
            }
        };
    }
}

Register it as a bean:

@Bean
public ItemClientFallbackFactory itemClientFallbackFactory(){
    return new ItemClientFallbackFactory();
}

Declare the fallbackFactory on the client:

@FeignClient(value = "item-service",fallbackFactory = ItemClientFallbackFactory.class)
public interface ItemClient {

    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
}

8. Circuit breaking

The circuit breaker tracks the exception ratio and slow-call ratio of service calls; once a threshold is exceeded, the service is broken and all requests to it are intercepted. When the service recovers, the breaker lets requests through again.
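The idea can be sketched in plain Java. This is a toy model, not Sentinel's implementation: real breakers also have a half-open probing state and time-based statistics windows.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy circuit breaker: opens when the failure ratio over the last
// `windowSize` calls exceeds `threshold`.
class SimpleCircuitBreaker {
    private final int windowSize;
    private final double threshold;
    private final Deque<Boolean> results = new ArrayDeque<>(); // true = failure
    private boolean open = false;

    SimpleCircuitBreaker(int windowSize, double threshold) {
        this.windowSize = windowSize;
        this.threshold = threshold;
    }

    void record(boolean failed) {
        if (results.size() == windowSize) {
            results.removeFirst(); // slide the window
        }
        results.addLast(failed);
        long failures = results.stream().filter(f -> f).count();
        open = results.size() == windowSize
                && (double) failures / windowSize > threshold;
    }

    boolean allowRequest() {
        return !open; // an open breaker intercepts all requests
    }
}
```

With a window of 4 and a threshold of 0.5, three failures out of the last four calls open the breaker and further requests are rejected until the statistics recover.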

3. Distributed transactions

In a distributed transaction, the transaction initiator, the resources and their resource managers, and the transaction coordinator sit on different nodes of a distributed system. In the transfer example, the "user A -100" operation and the "user B +100" operation do not happen on the same node. In essence, distributed transactions exist to guarantee that data operations execute correctly in a distributed setting.

1. Seata

Seata transaction management has three roles:

TC (Transaction Coordinator): maintains the state of global and branch transactions, and coordinates the global commit or rollback.

TM (Transaction Manager): defines the scope of a global transaction; begins, commits, or rolls back the global transaction.

RM (Resource Manager): manages branch transactions; talks to the TC to register branch transactions and report their status.

2. Deploy the TC server

1. Import the database tables

2. Import the configuration files

3. Deploy with Docker

docker load -i  seata-1.5.2.tar
docker inspect mysql
docker run --name seata -p 8099:8099 -p 7099:7099 -e SEATA_IP=192.168.3.133 -v ./seata:/seata-server/resources --privileged=true --network richu -d seataio/seata-server:1.5.2

3. Integrate Seata into the microservices

1. Add dependencies

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

2. Add the configuration in Nacos:

seata:
  registry:
    type: nacos                 # registry center
    nacos:
      server-addr: <nacos address>
      namespace: ""
      group: DEFAULT_GROUP
      application: service      # Seata service name
      username: nacos
      password: nacos
  tx-service-group: <transaction group name>
  service:
    vgroup-mapping:
      <transaction group name>: "default"

4. XA mode

Add the data-source proxy mode to the same configuration:

seata:
  registry:
    type: nacos                 # registry center
    nacos:
      server-addr: <nacos address>
      namespace: ""
      group: DEFAULT_GROUP
      application: service      # Seata service name
      username: nacos
      password: nacos
  tx-service-group: <transaction group name>
  service:
    vgroup-mapping:
      <transaction group name>: "default"
  data-source-proxy-mode: XA

Annotate the entry method with @GlobalTransactional.

Annotate the other participating methods with @Transactional.

5. AT mode

AT mode is the mode Seata promotes by default.

In phase one, Seata intercepts the "business SQL": it parses the SQL semantics, finds the business data the SQL will update, and saves that data as a "before image" before the update. It then executes the business SQL, saves the updated data as an "after image", and finally acquires a row lock. All of this happens inside one database transaction, which guarantees the atomicity of phase one.

1. Add the undo_log table to every microservice database

2. Seata uses AT mode by default, so no extra proxy-mode configuration is needed.
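The before/after-image mechanism can be illustrated with a hypothetical in-memory sketch; the real undo log is stored as rows in the undo_log table and the "row" is a database record, not a Java map:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of AT-mode phase one: snapshot the row before and after
// the "business SQL", and restore the before image on global rollback.
class UndoLogSketch {
    private Map<String, Object> beforeImage;
    private Map<String, Object> afterImage;

    void updateWithUndoLog(Map<String, Object> row, String column, Object value) {
        beforeImage = new HashMap<>(row); // save the before image
        row.put(column, value);           // execute the "business SQL"
        afterImage = new HashMap<>(row);  // save the after image
    }

    void rollback(Map<String, Object> row) {
        row.clear();
        row.putAll(beforeImage);          // restore the before image
    }
}
```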

9. RabbitMQ

A high-performance asynchronous messaging component.

Advantages of synchronous calls: strong timeliness; the caller waits for and gets the result.

Disadvantages of synchronous calls:

  • poor extensibility
  • degraded performance
  • cascading failures

Asynchronous calls involve three roles:

  • Publisher: the message sender, i.e. the original caller
  • Consumer: receives and processes messages, i.e. the original service provider
  • Broker: manages, buffers, and forwards messages, similar to a server

1. Install RabbitMQ

docker run -e RABBITMQ_DEFAULT_USER=richu -e RABBITMQ_DEFAULT_PASS=12345678 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 --network richu -d rabbitmq:3.8-management

2. Sending and receiving with the Java client

The messaging protocol is AMQP, which is language-agnostic and fits the independence requirements of microservices.

1. Create a queue

2. Add the dependency in the parent project:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3. Configure YAML

spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678

4. Test sending

@SpringBootTest
class PublisherApplicationTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testqueue() {
        String queueName = "queue1";
        String message = "hello mq";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

5. Test receiving

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "queue1")
    public void receive1(String msg) {
        log.info("收到的消息为:{}",msg);
    }
}

3. Work Queues

Multiple consumers listen on the same queue.

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "queue1")
    public void receive1(String msg) {
        log.info("1收到的消息为:{}",msg);
    }

    @RabbitListener(queues = "queue1")
    public void receive2(String msg) {
        System.out.printf("2收到的消息为:%s\n",msg);
    }
}

@Test
public void testqueue() {
    String queueName = "queue1";
    for (int i = 0; i < 10; i++) {
        String message = "hahaha" + i;
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
2收到的消息为:hahaha1
2收到的消息为:hahaha3
2收到的消息为:hahaha5
2收到的消息为:hahaha7
08-28 13:19:07:463  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha0
2收到的消息为:hahaha9
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha2
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha4
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha6
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha8

A given message is processed by only one consumer.

By default messages are dispatched round-robin, but machines process at different speeds, so the slow consumer stays overloaded while the fast one sits idle.

Change the configuration so each consumer prefetches only one message at a time:

spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    listener:
      simple:
        prefetch: 1

4. Fanout exchange

An exchange receives messages from publishers and routes them to its bound queues.

  • Fanout -> broadcast
  • Direct -> targeted routing
  • Topic -> topic-based routing

Send a message to the exchange:

@Test
public void testqueue1() {
    String exchangeName = "richu.fanout";
    String message = "hahaha";
    // exchange name, routing key, message
    rabbitTemplate.convertAndSend(exchangeName, null, message);
}

Receive the messages:

@RabbitListener(queues = "queue1")
public void receive1(String msg) {
    log.info("1收到的消息为:{}",msg);
}

@RabbitListener(queues = "queue2")
public void receive2(String msg) {
    log.info("2收到的消息为:{}",msg);
}

Result: both consumers receive the message.

08-28 14:06:18:718  INFO 2900 --- [ntContainer#1-1] c.i.consumer.mq.SpringRabbitListener     : 1收到的消息为:hahaha
08-28 14:06:18:718  INFO 2900 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener     : 2收到的消息为:hahaha

Declare two queues and bind both to the exchange.

5. Direct exchange

Only the matching queues receive a given message:

  • every binding between a queue and the exchange carries a BindingKey
  • the publisher sets the message's RoutingKey when sending
  • the exchange routes the message to the queues whose BindingKey matches the RoutingKey

@Test
public void testqueue1() {
    String exchangeName = "richu.fanout";
    String message = "我是消息";
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

6. Topic exchange

Also routes on the RoutingKey, but the key is usually several words joined with "." as a separator.

  • # matches zero or more words
  • * matches exactly one word

Examples: chain.#, #.news
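The wildcard rules can be approximated by translating a binding key into a regular expression. This is a simplified sketch: it does not cover the edge case where # matches zero words (e.g. chain.# matching just chain).

```java
// Simplified topic-key matcher: '*' = exactly one word, '#' = any tail.
class TopicMatcher {
    public static boolean matches(String pattern, String routingKey) {
        String regex = pattern
                .replace(".", "\\.")   // escape the literal dots first
                .replace("*", "[^.]+") // '*' -> exactly one word (no dots)
                .replace("#", ".*");   // '#' -> any remaining characters
        return routingKey.matches(regex);
    }
}
```

So chain.# matches chain.news.today, and *.news matches chain.news but not a.b.news.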

7. Declaring queues and exchanges

  • Queue: declare queues with the QueueBuilder factory
  • Exchange: declare exchanges with the ExchangeBuilder factory
  • Binding: declare queue-exchange bindings with the BindingBuilder factory

@Configuration
public class FanoutConfiguration {
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("richu.exchange");
    }

    @Bean
    public Queue fanoutQueue() {
        //return QueueBuilder.durable("richu.queue").build();
        return new Queue("richu.queue");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

Declaring queues and bindings with Spring AMQP annotations:

// annotation-based declaration
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="queue1",durable = "true"),
    exchange = @Exchange(name = "richu.fanout",type = ExchangeTypes.DIRECT),
    key = {"red","blue"}
))
public void receive1(String msg) {
    log.info("1收到的消息为:{}",msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="queue2",durable = "true"),
    exchange = @Exchange(name = "richu.fanout",type = ExchangeTypes.DIRECT),
    key = {"red","blue"}
))
public void receive2(String msg) {
    log.info("2收到的消息为:{}",msg);
}

8. Message converter

Use JSON serialization instead of the default JDK serialization converter.

1. Add the dependency

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2. Configure it on both the publisher and the consumer

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

10. RabbitMQ advanced

1. Publisher reconnect

Network jitter can make the connection to MQ fail and messages get lost; enable connection retry:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    template:
      retry:
        enabled: true            # enable retry on connection timeout
        initial-interval: 100ms  # wait time after the first failure
        multiplier: 1            # multiplier for the next wait time
        max-attempts: 3          # maximum number of attempts

2. Publisher confirms

When the publisher sends a message, MQ returns a confirmation result to the publisher.

1. Add configuration:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    listener:
      simple:
        prefetch: 1
    publisher-confirm-type: correlated
    publisher-returns: true

publisher-confirm-type values:

  • none: disable the confirm mechanism
  • simple: block synchronously waiting for MQ's confirm
  • correlated: MQ returns the confirm asynchronously via a callback

2. Configure the ReturnsCallback:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class Mqconfig {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("监听到了消息return");
            log.debug("exchange:{}",returned.getExchange());
            log.debug("routingKey:{}",returned.getRoutingKey());
            log.debug("message:{}",returned.getMessage());
            log.debug("replyCode:{}",returned.getReplyCode());
            log.debug("replyText:{}",returned.getReplyText());
        });
    }
}

3. Register a ConfirmCallback when sending:

@Test
public void testqueue1() {
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("处理结果异常", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            if (result.isAck()) {
                log.debug("收到了confirm ack");
            } else {
                log.error("收到了confirm nack! reason:{}", result.getReason());
            }
        }
    });
    String exchangeName = "richu.fanout";
    String message = "我是消息";

    rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
}

3. MQ reliability

By default messages live in memory: if the broker goes down, all of them are lost.

Memory is also limited: when consumers are down or too slow, messages pile up and can block MQ.

@Test
public void testqueue() {
    // build the message manually and mark it persistent
    // (use MessageDeliveryMode.NON_PERSISTENT for a non-persistent message)
    Message message = MessageBuilder.withBody("hello0".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
    String queueName = "queue1";
    for (int i = 0; i < 100000; i++) {
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

4. Lazy Queue

Messages are written straight to disk (persisted) and only read from disk into memory when they are needed.

Lazy behavior is the default from RabbitMQ 3.12 onward; before that, set the queue argument:

x-queue-mode = lazy

4. Consumer reliability

1. Consumer acknowledgement

  • ack: the message was processed successfully; MQ deletes it from the queue
  • nack: processing failed; MQ needs to redeliver the message
  • reject: the message is rejected; MQ deletes it from the queue

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    template:
      retry:
        enabled: true            # enable retry on connection timeout
        initial-interval: 100ms  # wait time after the first failure
        multiplier: 1            # multiplier for the next wait time
        max-attempts: 3          # maximum number of attempts
    listener:
      simple:
        acknowledge-mode: auto

2. Failure retry mechanism

Failed-message handling:

When retry mode is enabled and the message still fails after the retries are exhausted, a MessageRecoverer handles it. There are three implementations:

1. RejectAndDontRequeueRecoverer: reject and drop the message (the default).

2. ImmediateRequeueMessageRecoverer: return nack and requeue the message (not recommended: it can loop forever).

3. RepublishMessageRecoverer: publish the failed message to a designated exchange (recommended).

    listener:
      simple:
        prefetch: 1              # fetch one message at a time; take the next only after finishing ("capable workers do more")
        acknowledge-mode: auto   # none: no ack; manual: manual ack; auto: automatic ack
        retry:
          enabled: true              # enable consumer-side failure retry
          initial-interval: 1000ms   # initial wait after a failure
          multiplier: 1              # next wait = multiplier * last interval
          max-attempts: 3            # maximum number of attempts
          stateless: true            # true = stateless; set false if the business uses transactions
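With these settings, the wait between attempts grows as next = multiplier × previous. A small helper makes the schedule concrete (it ignores max-interval, which the real configuration also supports):

```java
// Compute the waits (ms) between retry attempts from initial-interval,
// multiplier, and max-attempts, as configured above.
class RetrySchedule {
    public static long[] intervals(long initialMs, double multiplier, int maxAttempts) {
        long[] waits = new long[maxAttempts - 1]; // waits sit between attempts
        double interval = initialMs;
        for (int i = 0; i < waits.length; i++) {
            waits[i] = (long) interval;
            interval *= multiplier;
        }
        return waits;
    }
}
```

So with initial-interval: 1000ms, multiplier: 1, max-attempts: 3, the consumer retries twice, waiting 1 second before each retry; with a multiplier of 2 the waits would double each time.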

Configuring a RepublishMessageRecoverer:

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error");
    }

    @Bean
    public Binding bindingError(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error", "error");
    }
}

5. Business idempotency

Executing a business operation once or several times must have the same effect on the business state.

1. Give every message a unique id

Set on the publisher side (not really recommended):

@Bean
public MessageConverter messageConverter(){
    Jackson2JsonMessageConverter jjmc =  new Jackson2JsonMessageConverter();
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

2. Business-level check (judge from the business state itself)
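The message-id approach can be sketched as follows. Assumption: a plain in-memory Set stands in for the real store; in production the seen ids would live in a database or Redis with an atomic insert.

```java
import java.util.HashSet;
import java.util.Set;

// Id-based idempotent consumption: run the business logic only if the
// message id has not been seen before.
class IdempotentConsumer {
    private final Set<String> processed = new HashSet<>();

    /** Returns true if the message was processed, false if it was a duplicate. */
    public boolean handle(String messageId, Runnable business) {
        if (!processed.add(messageId)) { // add() is false if the id was already present
            return false; // duplicate delivery: skip the business logic
        }
        business.run();
        return true;
    }
}
```

A redelivered message with the same id is detected and skipped, so the business state is touched only once.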

6. Delayed messages

1. Dead-letter exchange

A message becomes a dead letter when:

  • the consumer declares the message as failed and its requeue parameter is false
  • the message expires (TTL)
  • the queue is full

If a queue's dead-letter-exchange argument names an exchange, that queue's dead letters are forwarded to it.

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="dlx.queue",durable = "true"),
    exchange = @Exchange(name = "dlx.direct",type = ExchangeTypes.DIRECT),
    key = {"hi"}
))
public void listenexchange(Message msg) {
    log.info("死信交换机和死信队列收到的消息为:{}",msg);
}
@Configuration
@RequiredArgsConstructor
public class NormalConfig {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.exchange");
    }

    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
    }

    @Bean
    public Binding bindingNormal( Queue queue, DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("hi");
    }
}

Send a message with a TTL; when it expires unconsumed in normal.queue, it is routed to the dead-letter exchange:

@Test
void testSendDelayMessage(){
    rabbitTemplate.convertAndSend("normal.exchange","hi","我是一只坤",message->{
        // setExpiration sets the per-message TTL in ms
        // (setDelay belongs to the delayed-message plugin, not to the DLX approach)
        message.getMessageProperties().setExpiration("10000");
        return message;
    });
}

2. Delayed message plugin

Install: docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name="dlx.queue",durable = "true"),
    exchange = @Exchange(name = "dlx.direct",delayed = "true"),
    key = {"hi"}
))
public void delayedexchange(Message msg) {
    log.info("延迟交换机收到的消息为:{}",msg);
}

Send a message to the delayed exchange with a delay:

@Test
void testSendDelayMessage(){
    rabbitTemplate.convertAndSend("dlx.direct","hi","我是一只坤",message->{
        message.getMessageProperties().setDelay(10000);
        return message;
    });
}

11. Elasticsearch

1. Install

  • Elasticsearch:
docker run -d --name es -e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
  -e "discovery.type=single-node" \
  -v es-data:/usr/share/elasticsearch/data \
  -v es-plugins:/usr/share/elasticsearch/plugins \
  --privileged \
  --network richu \
  -p 9200:9200 \
  -p 9300:9300 \
  elasticsearch:7.12.1
  • Kibana (web UI):
docker run -d --name kibana -e ELASTICSEARCH_HOST=http://es:9200 --network=richu -p 5601:5601 kibana:7.12.1

2. Inverted index

Document: each record is one document.

Term: the words a document's text is split into by the analyzer.

Test an analyzer:

POST /_analyze
{
	"analyzer":"standard",
	"text":"我爱学习,我每天学习"
}
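A minimal inverted index can be built in a few lines; here whitespace splitting stands in for a real analyzer such as ik_smart:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Build a tiny inverted index: term -> ids of the documents containing it.
class InvertedIndex {
    public static Map<String, Set<Integer>> build(List<String> docs) {
        Map<String, Set<Integer>> index = new HashMap<>();
        for (int id = 0; id < docs.size(); id++) {
            for (String term : docs.get(id).split("\\s+")) {
                index.computeIfAbsent(term, t -> new TreeSet<>()).add(id);
            }
        }
        return index;
    }
}
```

A query for a term then becomes a single map lookup that returns the matching document ids, which is what makes term search fast.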

Extending the IK analyzer's dictionary, in IKAnalyzer.cfg.xml:

<entry key="ext_dict">ext.dic</entry>

3. Basic concepts

Index: a collection of documents of the same type.

type: the field data type. Common simple types:

  • string: text (analyzed full text), keyword (exact values: brands, countries, IP addresses)
  • numeric: long, integer, short, byte, double, float
  • boolean: boolean
  • date: date
  • object: object

index: whether to build an index for the field; defaults to true

analyzer: which analyzer to use

properties: the field's sub-fields

4. Index CRUD operations

Create:

PUT /<index_name>
{
	"mappings":{
		"properties":{
			"<field_name>":{
				"type":"text",
				"analyzer":"ik_smart"
			}
		}
	}
}

Read:

GET /richu

Delete:

DELETE /richu

Modify (mappings can only be extended with new fields):

PUT /<index_name>/_mapping
{
	"properties":{
		"<new_field_name>":{
			"type":"text",
			"analyzer":"ik_smart"
		}
	}
}

5. Document CRUD

Create a document:

POST /<index_name>/_doc/<doc_id>
{
	"field1":"value1",
	"field2":"value2",
	"field3":"value3"
}

Read:

GET /<index_name>/_doc/<doc_id>

Delete:

DELETE /<index_name>/_doc/<doc_id>

Full update (deletes then re-creates; creates the document if it does not exist):

PUT /<index_name>/_doc/<doc_id>
{
	"field1":"value1",
	"field2":"value2",
	"field3":"value3"
}

Partial (incremental) update:

POST /<index_name>/_update/<doc_id>
{
	"doc":{
		"<field_name>":"<value>"
	}
}

6.批处理请求

POST /_bulk
//create
{"index":{"_index":"index-name","_id":"1"}}
{"field1":"value1","field2":"value2"}

//delete
{"delete":{"_index":"test","_id":"2"}}

//update
{"update":{"_id":"1","_index":"test"}}
{"doc":{"field2":"value"}}
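The _bulk body is newline-delimited JSON: one metadata line per action, and index/update actions are followed by a source line. A hypothetical helper sketching how such a body is assembled (real code should use the client's BulkRequest instead of hand-building strings):

```java
// Builds an NDJSON _bulk request body: one metadata line per action,
// followed by a source line for index/update actions.
// Hypothetical helper, shown only to illustrate the wire format.
public class BulkBodyBuilder {
    private final StringBuilder body = new StringBuilder();

    public BulkBodyBuilder index(String indexName, String id, String sourceJson) {
        body.append("{\"index\":{\"_index\":\"").append(indexName)
            .append("\",\"_id\":\"").append(id).append("\"}}\n");
        body.append(sourceJson).append("\n"); // the source line directly follows the action line
        return this;
    }

    public BulkBodyBuilder delete(String indexName, String id) {
        body.append("{\"delete\":{\"_index\":\"").append(indexName)
            .append("\",\"_id\":\"").append(id).append("\"}}\n"); // delete has no source line
        return this;
    }

    public String build() { return body.toString(); }

    public static void main(String[] args) {
        String ndjson = new BulkBodyBuilder()
            .index("test", "1", "{\"field2\":\"value\"}")
            .delete("test", "2")
            .build();
        System.out.print(ndjson);
    }
}
```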

7.JavaRestClient

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

Spring Boot's dependency management defaults to 7.17.10; override the version property:

<elasticsearch.version>7.12.1</elasticsearch.version>

Create the client

@BeforeEach
void setUp(){
    client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.3.133:9200")));
}
@AfterEach
void destroy() throws IOException {
    if (client != null){
        client.close();
    }
}

Test

@Test
void test(){
    System.out.println("client: " + client);
}

8.Index API

Create

@Test
void createIndex() throws IOException {
    //1. prepare the request object
    CreateIndexRequest request = new CreateIndexRequest("items");
    //2. prepare the request body (the mapping)
    request.source(MAPPING_TEMPLATE, XContentType.JSON);
    //3. send the request
    client.indices().create(request, RequestOptions.DEFAULT);
}


private static final String MAPPING_TEMPLATE = "{\n" +
            "    \"mappings\":{\n" +
            "        \"properties\":{\n" +
            "            \"id\":{\n" +
            "                \"type\":\"keyword\"\n" +
            "            },\n" +
            "            \"name\":{\n" +
            "                \"type\":\"text\",\n" +
            "                \"analyzer\":\"ik_smart\"\n" +
            "            },\n" +
            "            \"price\":{\n" +
            "                \"type\":\"integer\"\n" +
            "            },\n" +
            "            \"category\":{\n" +
            "                \"type\":\"keyword\"\n" +
            "            },\n" +
            "            \"brand\":{\n" +
            "                \"type\":\"keyword\"\n" +
            "            },\n" +
            "            \"sold\":{\n" +
            "                \"type\":\"integer\"\n" +
            "            },\n" +
            "            \"commentCount\":{\n" +
            "                \"type\":\"integer\",\n" +
            "                \"index\":false\n" +
            "            },\n" +
            "            \"isAD\":{\n" +
            "                \"type\":\"boolean\"\n" +
            "            },\n" +
            "            \"updateTime\":{\n" +
            "                \"type\":\"date\"\n" +
            "            }\n" +
            "        }\n" +
            "    }\n" +
            "}";

Query (check existence)

@Test
void selectIndex() throws IOException {
    GetIndexRequest request = new GetIndexRequest("items");
    //check whether the index exists
    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    System.out.println(exists);
}

Delete

@Test
void deleteIndex() throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("items");
    //delete the index
    client.indices().delete(request, RequestOptions.DEFAULT);
}

9.Document API

Create

@Test
void createDoc() throws IOException {
    //prepare the request
    IndexRequest request = new IndexRequest("items").id("1");
    //prepare the document source
    request.source("{\"name\":\"tom\",\"age\":\"18\"}",XContentType.JSON);
    //send the request
    client.index(request,RequestOptions.DEFAULT);
}

Query

@Test
public void getDoc() throws IOException {
    //prepare the request
    GetRequest request = new GetRequest("items","1");
    //send the request
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    //parse the result
    String sourceAsString = response.getSourceAsString();
    System.out.println(sourceAsString);
}

Delete

@Test
public void deleteDoc() throws IOException {
    //prepare the request (a DeleteRequest, not an IndexRequest)
    DeleteRequest request = new DeleteRequest("items","1");
    //send the request
    DeleteResponse rep = client.delete(request, RequestOptions.DEFAULT);
    //parse the result
    System.out.println(rep);
}

Full update

@Test
public void fullUpdateDoc() throws IOException {
    //an index request with an existing id replaces the whole document
    IndexRequest request = new IndexRequest("items").id("1");
    request.source("{\"name\":\"tom\",\"age\":\"19\"}", XContentType.JSON);
    IndexResponse rep = client.index(request, RequestOptions.DEFAULT);

    System.out.println(rep);
}

Partial update

@Test
public void updateDoc() throws IOException {
    //prepare the request
    UpdateRequest request = new UpdateRequest("items","1");
    //only the listed fields are changed
    request.doc(
        "name","zs",
        "age","46"
    );
    //send the request
    client.update(request, RequestOptions.DEFAULT);
}

10.Bulk Operations

@Test
public void bulkIndex() throws IOException {
    //prepare the request
    BulkRequest bulkRequest = new BulkRequest();
    //add sub-requests
    bulkRequest.add(new IndexRequest("items").id("1").
                    source("{\"name\":\"tom\",\"age\":\"18\"}",XContentType.JSON));
    bulkRequest.add(new DeleteRequest("items","1"));
    //send the request
    client.bulk(bulkRequest, RequestOptions.DEFAULT);
}

11.DSL Queries

Query syntax

GET /indexName/_search
{
	"query":{
		"QUERY_TYPE":{
			"QUERY_CONDITION":"CONDITION_VALUE"
		}
	}
}
//match all documents
GET /indexName/_search
{
	"query":{
		"match_all":{}
	}
}

1.Leaf Queries

  • Full-text queries

    • match_query
    • multi_match_query
  • Term-level (exact) queries

    • ids
    • range
    • term
  • Geo queries (search by geographic location)

    • geo_distance
    • geo_bounding_box

match query: analyzes the input text, then searches

GET /indexName/_search
{
	"query":{
		"match":{
			"FIELD":"TEXT"
		}
	}
}

multi_match query: searches multiple fields with the same text

GET /indexName/_search
{
	"query":{
		"multi_match":{
			"query":"TEXT",
			"fields":["FIELD1","FIELD2"]
		}
	}
}

term query (exact value)

GET /indexName/_search
{
	"query":{
		"term":{
			"FIELD":{
				"value":"VALUE"
			}
		}
	}
}

range query

GET /indexName/_search
{
	"query":{
		"range":{
			"FIELD":{
				"gte":VALUE,
				"lte":VALUE
			}
		}
	}
}

ids query: look up documents by id

GET /indexName/_search
{
	"query":{
		"ids":{
			"values":["12","13"]
		}
	}
}

2.Compound Queries

Combine leaf queries with boolean logic:

bool

Adjust the relevance score computed at query time, changing the document ranking:

function_score

dis_max

  • must: AND (contributes to the score)
  • should: OR
  • must_not: NOT (does not affect the score)
  • filter: must match, but does not affect the score
GET /items/_search
{
	"query":{
		"bool":{
			"must":[{"match":{"FIELD":"value"}}],
			"filter":[
				{"term":{"FIELD":"value"}},
				{"range":{"FIELD":{"gte":"value","lte":"value"}}}
			]
		}
	}
}

3.Sorting and Pagination

Sorting with sort

GET /indexName/_search
{
	"query":{
		"match_all":{}
	},
	"sort":{
		"FIELD":"desc"  //asc or desc
	}
}

Pagination

  • from: the offset of the first document to return
  • size: the number of documents to return
GET /indexName/_search
{
	"query":{
		"match_all":{}
	},
	"from":0,
	"size":10,
	"sort":{
		"FIELD":"desc"  //asc or desc
	}
}
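from follows the usual offset arithmetic: for a 1-based page number pageNo and page size size, from = (pageNo - 1) * size. A minimal sketch (hypothetical helper names):

```java
// Computes the "from" offset for Elasticsearch from/size pagination.
public class PageOffset {
    public static int from(int pageNo, int size) {
        return (pageNo - 1) * size; // page 1 starts at offset 0
    }

    public static void main(String[] args) {
        System.out.println(from(3, 10)); // the third page of 10 starts at offset 20
    }
}
```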

4.Deep Pagination

By default Elasticsearch rejects requests where from + size exceeds 10000 (index.max_result_window); for deeper pages, search_after or the scroll API is used instead.

5.Highlighting

GET /items/_search
{
	"query":{
		"match":{
			"FIELD":"value"
		}
	},
	"highlight":{
		"fields":{
			"FIELD":{
				"pre_tags":"<em>",
				"post_tags":"</em>"
			}
		}
	}
}
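Conceptually, highlighting wraps each matched term in the configured pre/post tags. A purely illustrative sketch of the effect (the real work happens server-side on analyzed terms, not with a plain string replace):

```java
// Mimics what a highlight fragment in a search response looks like:
// every occurrence of the matched term is wrapped in pre/post tags.
public class HighlightDemo {
    public static String highlight(String text, String term, String pre, String post) {
        return text.replace(term, pre + term + post);
    }

    public static void main(String[] args) {
        System.out.println(highlight("study hard, study daily", "study", "<em>", "</em>"));
    }
}
```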