Spring Cloud is an ordered collection of frameworks. Building on the development convenience of Spring Boot, it simplifies building distributed systems: service discovery, service gateways, service routing, distributed tracing, and so on. Spring Cloud does not reinvent the wheel; it integrates and wraps mature components that already exist, which lowers the development cost of each module. In other words, Spring Cloud provides the "full toolkit" needed to build a distributed system.
1. Nacos
1. Add dependencies
You need the Nacos distribution, which can be downloaded from the official releases page: https://github.com/alibaba/nacos/releases
2. Configuration
Parent project:
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
```
Client microservices:
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
```
Configure the Nacos address:
```yaml
spring:
  application:
    name: item-service
  cloud:
    nacos:
      server-addr: 192.168.3.133:8848
```
Nacos itself can be run in Docker. Put the following environment variables into /root/nacos/custom.env:

```bash
PREFER_HOST_MODE=hostname
MODE=standalone
SPRING_DATASOURCE_PLATFORM=mysql
MYSQL_SERVICE_HOST=192.168.3.133   # set to your own IP
MYSQL_SERVICE_DB_NAME=nacos
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=root
MYSQL_SERVICE_PASSWORD=123
MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
```
```bash
docker run -d --name nacos --env-file /root/nacos/custom.env -p 8848:8848 -p 9848:9848 -p 9849:9849 --restart=always nacos/nacos-server:v2.1.1-slim
```
2. OpenFeign
1. Add dependencies
```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
```
2. Enable Feign with the @EnableFeignClients annotation
```java
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableFeignClients
@MapperScan("com.cartService.mapper")
@SpringBootApplication
public class CartServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(CartServiceApplication.class, args);
    }
}
```
3. Write the Feign client
```java
import com.cartService.domain.dto.ItemDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

@FeignClient("item-service")
public interface ItemClient {
    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") List<Long> ids);
}
```
4. Use the Feign client to make the remote call
```java
private final ItemClient itemClient;

List<ItemDTO> items = itemClient.queryItemByIds(itemIds);
```
3. Integrating OpenFeign with OkHttp
1. Add the dependency
```xml
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-okhttp</artifactId>
</dependency>
```
2. Enable the connection pool
```yaml
feign:
  okhttp:
    enabled: true
```
4. Best practice
When several microservices use the same client, the same code has to be written over and over, and any change has to be applied to every copy.
1. Create a new module richu-api and add the OpenFeign dependency
2. Put the DTOs and Feign clients in this module
3. Add the module as a dependency in the other microservices
```xml
<dependency>
    <groupId>com.richu</groupId>
    <artifactId>richu-api</artifactId>
    <version>1.0.0</version>
</dependency>
```
4. Specify the package to scan
```java
@EnableFeignClients(basePackages = "com.hmall.api.client")
```
5. Logging
OpenFeign only emits its logs when the logging level is DEBUG. The available Feign log levels are:
- NONE: records nothing (the default)
- BASIC: records only the request method, URL, response status code, and execution time
- HEADERS: BASIC plus the request and response headers
- FULL: records the full details of requests and responses, including headers, body, and metadata
```java
import feign.Logger;
import org.springframework.context.annotation.Bean;

public class DefaultFeignConfig {
    @Bean
    public Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;
    }
}
```
Local configuration: declare it in the @FeignClient annotation
```java
@FeignClient(value = "item-service", configuration = DefaultFeignConfig.class)
public interface ItemClient {
    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
}
```
Global configuration: declare it in the @EnableFeignClients annotation
```java
@EnableFeignClients(basePackages = "com.hmall.api.client", defaultConfiguration = DefaultFeignConfig.class)
@MapperScan("com.cartService.mapper")
@SpringBootApplication
public class CartServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(CartServiceApplication.class, args);
    }
}
```
6. Gateway
1. Usage
A gateway is the entry point of the network; it is responsible for routing and forwarding requests and for authentication.
1. Create a new module
2. Add the gateway dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
</dependencies>
```
3. Write the application class
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class GatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }
}
```
4. Configure the routing rules
```yaml
spring:
  application:
    name: gateway
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.3.1:8848
    gateway:
      routes:
        - id: item-service        # route id, custom and unique
          uri: lb://item-service  # target microservice; lb means load-balanced
          predicates:             # route predicates
            - Path=/items/**      # match on the request path
        - id: cart-service
          uri: lb://cart-service
          predicates:
            - Path=/carts/**
```
2. Route properties
- id: unique identifier of the route
- uri: target address of the route
- predicates: route predicates
Built-in predicate factories:
- After: route only after the specified time
- Before: route only before the specified time
- Between: route only between the two specified times
- Cookie: [Cookie=cookie name, regex for the cookie value]
- Header: [Header=header name, regex for the header value]
- Host: [Host=host name (several values and wildcards are allowed)]
- Method: [Method=request method]
- Path: [Path=request path]
- Query: [Query=parameter name, parameter value]
Route filters
3. Custom filters
- GatewayFilter: a route filter that applies to whichever routes you specify
- GlobalFilter: a global filter that applies to all routes and takes effect automatically once declared
```java
public interface GlobalFilter {
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
```
3.1. GlobalFilter
```java
@Component
public class GlobalFilters implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        HttpHeaders headers = request.getHeaders();
        System.out.println("headers = " + headers);
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
```
3.2. GatewayFilter
```java
@Component
public class GateWayFilter extends AbstractGatewayFilterFactory<Object> {

    @Override
    public GatewayFilter apply(Object config) {
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return chain.filter(exchange);
            }
        };
    }
}
```
The OrderedGatewayFilter decorator can be used to specify the filter order:
```java
@Component
public class GateWayFilter extends AbstractGatewayFilterFactory<Object> {

    @Override
    public GatewayFilter apply(Object config) {
        return new OrderedGatewayFilter(new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                return chain.filter(exchange);
            }
        }, 1);
    }
}
```
Implementing login verification:
```java
package com.richu.gateway.filters;

import cn.hutool.core.text.AntPathMatcher;
import com.hmall.common.exception.UnauthorizedException;
import com.richu.gateway.config.AuthProperties;
import com.richu.gateway.util.JwtTool;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.List;

@Component
@RequiredArgsConstructor
public class GlobalFilters implements GlobalFilter, Ordered {

    private final AuthProperties authProperties;
    private final JwtTool jwtTool;
    private final AntPathMatcher antPathMatcher = new AntPathMatcher();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        // skip paths that do not require a login
        if (isExclude(request.getPath().toString())) {
            return chain.filter(exchange);
        }
        // read the token from the Authorization header (kept as a local variable for thread safety)
        String token = null;
        List<String> headers = request.getHeaders().get("Authorization");
        if (headers != null && !headers.isEmpty()) {
            token = headers.get(0);
        }
        Long userId;
        try {
            userId = jwtTool.parseToken(token);
        } catch (UnauthorizedException e) {
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.UNAUTHORIZED);
            return response.setComplete();
        }
        System.out.println(userId);
        return chain.filter(exchange);
    }

    private boolean isExclude(String path) {
        for (String pathPattern : authProperties.getExcludePaths()) {
            if (antPathMatcher.match(pathPattern, path)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
```
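The filter above depends on an AuthProperties class that holds the paths excluded from the login check. The class itself is not shown in these notes, so the following is only a minimal sketch; the configuration prefix hm.auth and the field names are assumptions:

```java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Data
@Configuration
@ConfigurationProperties(prefix = "hm.auth")   // assumed prefix
public class AuthProperties {
    // ant-style path patterns that bypass the login check, e.g. /search/**
    private List<String> excludePaths;
}
```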
4. Passing information from the gateway to microservices
```java
String userInfo = userId.toString();
// mutate the exchange and carry the user id as a request header
ServerWebExchange swe = exchange.mutate()
        .request(builder -> builder.header("user-info", userInfo))
        .build();
return chain.filter(swe);
```
Read the header in an interceptor in the microservices' common module, so that other business code can use the user id directly.
1. Define the interceptor class
```java
public class UserInfoInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // read the user info header set by the gateway
        String userInfo = request.getHeader("user-info");
        if (!StrUtil.isBlankIfStr(userInfo)) {
            UserContext.setUser(Long.parseLong(userInfo));
        }
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
    }
}
```
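UserContext used above is a ThreadLocal holder from the common module; it is not shown in this article, so the following is only a minimal sketch under that assumption (the method names are assumptions as well):

```java
public class UserContext {

    private static final ThreadLocal<Long> TL = new ThreadLocal<>();

    // store the current user's id for this request thread
    public static void setUser(Long userId) {
        TL.set(userId);
    }

    // read the current user's id anywhere in the business code
    public static Long getUser() {
        return TL.get();
    }

    // clear the ThreadLocal to avoid leaking state across pooled threads
    public static void removeUser() {
        TL.remove();
    }
}
```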
2. Configuration class
```java
@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new UserInfoInterceptor());
    }
}
```
3. Register the configuration classes in META-INF/spring.factories so they are auto-configured:
```
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.hmall.common.config.MyBatisConfig,\
  com.hmall.common.config.MvcConfig,\
  com.hmall.common.config.JsonConfig
```
The MvcConfig class would also be loaded by services that do not have Spring MVC on the classpath, which causes an error, so guard it:
```java
@ConditionalOnClass(DispatcherServlet.class)
```
7. Configuration management
1. Configuration hot update
Reduce duplicated configuration and simplify development.
In the Nacos console, go to Configuration Management and move the shared YAML configuration files into Nacos.
```xml
<!-- Nacos config -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- bootstrap configuration -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
```
```yaml
spring:
  cloud:
    nacos:
      server-addr: 192.168.3.133
      config:
        file-extension: yaml
        shared-configs:
          - data-id: shared-jdbc.yaml
          - data-id: shared-log.yaml
          - data-id: shared-swagger.yaml
```
Configuration hot update
Changed configuration takes effect without restarting the service.
Prerequisite: the configuration file is managed in Nacos, and the values are read through a @ConfigurationProperties-bound bean:
```java
@Data
@Component
@ConfigurationProperties(prefix = "hm.cart")
public class CartProperties {
    private Integer num;
}
```
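A minimal sketch of using the bound properties (the service name and business rule below are assumptions). Because the value is read from the bean on every call, a change pushed from Nacos is picked up without a restart:

```java
@Service
@RequiredArgsConstructor
public class CartService {

    // the @ConfigurationProperties bean declared above
    private final CartProperties cartProperties;

    public void checkCartFull(int currentItems) {
        // cartProperties.getNum() always reflects the latest value from Nacos
        if (currentItems >= cartProperties.getNum()) {
            throw new RuntimeException("the cart is full");
        }
    }
}
```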
2. Dynamic routing
After listening for route configuration changes, use RouteDefinitionWriter to update the route table.
```java
package com.richu.gateway.routers;

import cn.hutool.json.JSONUtil;
import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionWriter;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

@Slf4j
@Component
@RequiredArgsConstructor
public class DynamicRoute {

    private final NacosConfigManager nacosConfigManager;
    private final RouteDefinitionWriter writer;

    private final String dataId = "gateway-routes.json";
    private final String group = "DEFAULT_GROUP";
    private final Set<String> routeIds = new HashSet<>();

    @PostConstruct
    public void init() throws NacosException {
        // register a listener for later changes and fetch the current configuration at the same time
        String configInfo = nacosConfigManager.getConfigService()
                .getConfigAndSignListener(dataId, group, 5000, new Listener() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }

                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        updateConfigInfo(configInfo);
                    }
                });
        updateConfigInfo(configInfo);
    }

    public void updateConfigInfo(String config) {
        List<RouteDefinition> list = JSONUtil.toList(config, RouteDefinition.class);
        // remove the old routes first
        for (String id : routeIds) {
            writer.delete(Mono.just(id)).subscribe();
        }
        routeIds.clear();
        // then save the new ones
        for (RouteDefinition routeDefinition : list) {
            writer.save(Mono.just(routeDefinition)).subscribe();
            routeIds.add(routeDefinition.getId());
        }
    }
}
```
Write the route definitions into the gateway-routes.json configuration file in Nacos.
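As an illustration only (the route ids, paths, and service names are assumptions), gateway-routes.json holds a JSON array of RouteDefinition objects that JSONUtil.toList in the class above can deserialize:

```json
[
  {
    "id": "item-service",
    "uri": "lb://item-service",
    "predicates": [
      { "name": "Path", "args": { "_genkey_0": "/items/**" } }
    ],
    "filters": []
  },
  {
    "id": "cart-service",
    "uri": "lb://cart-service",
    "predicates": [
      { "name": "Path", "args": { "_genkey_0": "/carts/**" } }
    ],
    "filters": []
  }
]
```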
8. Microservice protection and distributed transactions
The avalanche problem: when one service in a microservice call chain fails, every microservice on the chain becomes unavailable.
1. Solutions
- Request rate limiting: limit the number of concurrent requests reaching a microservice
- Thread isolation: the bulkhead pattern
- Circuit breaking: a circuit breaker tracks the proportion of failed or slow calls; once a threshold is exceeded the operation is broken and requests to that endpoint are intercepted
Sentinel compared with Hystrix:

| | Sentinel | Hystrix |
|---|---|---|
| Isolation strategy | Semaphore isolation | Thread-pool isolation / semaphore isolation |
| Circuit-breaking strategy | Based on slow-call ratio or exception ratio | Based on failure ratio |
| Real-time metrics | Sliding window | Sliding window (based on RxJava) |
| Rule configuration | Supports multiple data sources | Supports multiple data sources |
| Extensibility | Multiple extension points | Plugin-based |
| Annotation support | Supported | Supported |
| Rate limiting | QPS-based, supports limiting by call relationship | Limited support |
| Traffic shaping | Supports slow start and uniform queueing | Not supported |
| Adaptive system protection | Supported | Not supported |
| Console | Out of the box: rule configuration, second-level monitoring, machine discovery, etc. | Incomplete |
| Common framework adapters | Servlet, Spring Cloud, Dubbo, gRPC, etc. | Servlet, Spring Cloud Netflix |
2. Sentinel
1. Install the dashboard
Download the dashboard jar from alibaba/Sentinel on GitHub (a powerful flow control component enabling reliability, resilience and monitoring for microservices).
Start it:
```bash
java -server -Xms64m -Xmx256m -Dserver.port=8090 -Dcsp.sentinel.dashboard.server=localhost:8090 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
```
Then visit the [Sentinel Dashboard](http://localhost:8090/#/login); the username and the password are both sentinel.
2. Add the dependency
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
```
3. Configuration
```yaml
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8090
```
4. Cluster point links
The single-machine call chain is the chain of resources that Sentinel monitors once a request enters the service; by default Sentinel only monitors each Spring MVC endpoint (HTTP interface).
RESTful APIs can share the same request path, which leads to duplicate resource names, so include the HTTP method in the resource name:
```yaml
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8090
      http-method-specify: true
```
5. Request rate limiting
In the dashboard, click Flow Control and set the single-machine threshold.
6. Thread isolation
Click Flow Control, choose the concurrent-thread-count mode, and set the single-machine threshold.
7. Fallback
```yaml
feign:
  sentinel:
    enabled: true
  okhttp:
    enabled: true
```
Define a FallbackFactory:
```java
@Slf4j
public class ItemClientFallbackFactory implements FallbackFactory<ItemClient> {

    @Override
    public ItemClient create(Throwable cause) {
        return new ItemClient() {
            @Override
            public List<ItemDTO> queryItemByIds(Collection<Long> ids) {
                log.error("Failed to query items", cause);
                return null;
            }
        };
    }
}
```
Register it as a bean:
```java
@Bean
public ItemClientFallbackFactory itemClientFallbackFactory() {
    return new ItemClientFallbackFactory();
}
```
Reference the fallbackFactory on the Feign client:
```java
@FeignClient(value = "item-service", fallbackFactory = ItemClientFallbackFactory.class)
public interface ItemClient {
    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
}
```
8. Circuit breaking
The circuit breaker tracks the exception ratio and slow-call ratio of service calls; once a threshold is exceeded, the service is broken and all requests to it are intercepted. When the service recovers, the circuit breaker lets requests through again.
3. Distributed transactions
In a distributed transaction, the transaction initiator, the resources and their resource managers, and the transaction coordinator sit on different nodes of a distributed system. In the transfer example, the "user A -100" operation and the "user B +100" operation are not on the same node. Essentially, distributed transactions exist to guarantee that data operations execute correctly in a distributed setting.
1. Seata
Seata transaction management has three roles:
- TC (Transaction Coordinator): maintains the state of global and branch transactions and coordinates global commit or rollback
- TM (Transaction Manager): defines the scope of a global transaction, starts it, and commits or rolls back the global transaction
- RM (Resource Manager): manages branch transactions, registering them with the TC and reporting their status
2. Deploy the TC server
1. Import the database tables
2. Import the configuration files
3. Deploy with Docker
```bash
docker load -i seata-1.5.2.tar
docker inspect mysql
```
```bash
docker run --name seata -p 8099:8099 -p 7099:7099 -e SEATA_IP=192.168.3.133 -v ./seata:/seata-server/resources --privileged=true --network richu -d seataio/seata-server:1.5.2
```
3. Integrate Seata into the microservices
1. Add the dependencies
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
```
2. Add the configuration in Nacos
```yaml
seata:
  registry:
    type: nacos            # the registration center is Nacos
    nacos:
      server-addr: <nacos-address>
      namespace: ""
      group: DEFAULT_GROUP
      application: service
      username: nacos
      password: nacos
  tx-service-group: <transaction-group-name>
  service:
    vgroup-mapping:
      <transaction-group-name>: "default"
```
4. XA mode
```yaml
seata:
  registry:
    type: nacos            # the registration center is Nacos
    nacos:
      server-addr: <nacos-address>
      namespace: ""
      group: DEFAULT_GROUP
      application: service
      username: nacos
      password: nacos
  tx-service-group: <transaction-group-name>
  service:
    vgroup-mapping:
      <transaction-group-name>: "default"
  data-source-proxy-mode: XA
```
Annotate the transaction entry method with @GlobalTransactional and the other participating methods with @Transactional, as sketched below.
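A minimal sketch of such an entry method; the Feign clients and their method names here are assumptions, not part of the original notes:

```java
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderService {

    private final ItemClient itemClient;   // assumed Feign client
    private final CartClient cartClient;   // assumed Feign client

    // the entry method opens the global transaction; the local database writes
    // and the remote Feign calls below all join it as branch transactions
    @GlobalTransactional
    public void createOrder(Long itemId, Integer amount) {
        itemClient.deductStock(itemId, amount);
        cartClient.removeByItemId(itemId);
        // ... save the order locally inside the same global transaction
    }
}
```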
5. AT mode
AT mode is the mode Seata promotes by default.
In phase one, Seata intercepts the business SQL: it parses the SQL to find the business data that will be updated, saves that data as a "before image", executes the business SQL, saves the updated data as an "after image", and finally generates row locks. All of this happens inside one database transaction, which guarantees the atomicity of phase one.
1. Add the undo_log table to every microservice's database.
Because Seata uses AT mode by default, no extra configuration is needed.
9. RabbitMQ
A high-performance asynchronous messaging component.
Advantages of synchronous calls:
- Strong timeliness: the caller gets the result before returning
Disadvantages of synchronous calls:
- Poor extensibility
- Degraded performance
- Cascading failures
Asynchronous calls:
- Publisher: the message sender, i.e. the original caller
- Consumer: receives and processes messages, i.e. the original service provider
- Broker: manages, buffers and forwards messages, acting like a server
1. Install RabbitMQ
```bash
docker run -e RABBITMQ_DEFAULT_USER=richu -e RABBITMQ_DEFAULT_PASS=12345678 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 --network richu -d rabbitmq:3.8-management
```
2. Sending and receiving with the Java client
The messaging protocol is AMQP, which is language-agnostic and fits the independence requirement of microservices.
1. Create the queue
2. Add the dependency in the parent project
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
3. Configure the YAML file
```yaml
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
```
4. Test sending
```java
@SpringBootTest
class PublisherApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testqueue() {
        String queueName = "queue1";
        String message = "hello mq";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
```
5. Test receiving
```java
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "queue1")
    public void receive1(String msg) {
        log.info("Received message: {}", msg);
    }
}
```
3.Work Queues
Multiple consumers listen on the same queue.
```java
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "queue1")
    public void receive1(String msg) {
        log.info("Consumer 1 received: {}", msg);
    }

    @RabbitListener(queues = "queue1")
    public void receive2(String msg) {
        System.out.printf("Consumer 2 received: %s\n", msg);
    }
}
```
```java
@Test
public void testqueue() {
    String queueName = "queue1";
    for (int i = 0; i < 10; i++) {
        String message = "hahaha" + i;
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
```
```
Consumer 2 received: hahaha1
Consumer 2 received: hahaha3
Consumer 2 received: hahaha5
Consumer 2 received: hahaha7
08-28 13:19:07:463  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha0
Consumer 2 received: hahaha9
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha2
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha4
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha6
08-28 13:19:07:464  INFO 18392 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha8
```
The same message is processed by only one consumer.
By default messages are dispatched round-robin, but different machines process at different speeds, so the slow consumer stays busy while the fast one sits idle.
Change the configuration so each consumer prefetches only one message at a time:
```yaml
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    listener:
      direct:
        prefetch: 1
```
4. Fanout exchange
An exchange receives messages from publishers and routes them to the queues bound to it.
- Fanout: broadcast
- Direct: targeted routing
- Topic: topic-based routing
Send a message to the exchange:
```java
@Test
public void testqueue1() {
    String exchangeName = "richu.fanout";
    String message = "hahaha";
    // a fanout exchange ignores the routing key, so pass null
    rabbitTemplate.convertAndSend(exchangeName, null, message);
}
```
Receive the messages:
```java
@RabbitListener(queues = "queue1")
public void receive1(String msg) {
    log.info("Consumer 1 received: {}", msg);
}

@RabbitListener(queues = "queue2")
public void receive2(String msg) {
    log.info("Consumer 2 received: {}", msg);
}
```
Result: both consumers receive the message.
```
08-28 14:06:18:718  INFO 2900 --- [ntContainer#1-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 1 received: hahaha
08-28 14:06:18:718  INFO 2900 --- [ntContainer#0-1] c.i.consumer.mq.SpringRabbitListener  : Consumer 2 received: hahaha
```
Declare two queues and bind both of them to the exchange.
5. Direct exchange
Only the matching queues receive the message:
- Every queue is bound to the exchange with a BindingKey
- The publisher specifies a RoutingKey when sending a message
- The exchange routes the message to the queues whose BindingKey matches the RoutingKey
```java
@Test
public void testqueue1() {
    String exchangeName = "richu.fanout";
    String message = "我是消息";
    // the routing key "blue" decides which bound queues receive the message
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
```
6. Topic exchange
Also routes by RoutingKey, but the key is usually a combination of several words separated by dots:
- # matches zero or more words
- * matches exactly one word
Examples: china.# , #.news
A listener bound in this style is sketched below.
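A minimal sketch of a topic listener bound with such a key; the queue and exchange names here are assumptions:

```java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1", durable = "true"),
        exchange = @Exchange(name = "richu.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"   // receives china.news, china.weather, ...
))
public void listenTopic(String msg) {
    log.info("Topic queue received: {}", msg);
}
```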
7. Declaring queues and exchanges
- Queue: declares a queue; build it with the factory class QueueBuilder
- Exchange: declares an exchange; build it with ExchangeBuilder
- Binding: declares the binding between a queue and an exchange; build it with BindingBuilder
```java
@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("richu.exchange");
    }

    @Bean
    public Queue fanoutQueue() {
        return new Queue("richu.queue");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}
```
Declaring with Spring AMQP annotations:
```java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "queue1", durable = "true"),
        exchange = @Exchange(name = "richu.fanout", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void receive1(String msg) {
    log.info("Consumer 1 received: {}", msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "queue2", durable = "true"),
        exchange = @Exchange(name = "richu.fanout", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void receive2(String msg) {
    log.info("Consumer 2 received: {}", msg);
}
```
8. Message converter
Use JSON serialization instead of the default JDK serialization.
1. Add the dependency
```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
```
2. Configure it on both the publisher and the consumer
```java
@SpringBootApplication
public class PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
```
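With the JSON converter in place, plain objects such as maps can be sent and received directly. A small sketch under the assumption that a queue named object.queue exists:

```java
@Test
public void testSendObject() {
    // the Jackson converter serializes this map into a JSON message body
    Map<String, Object> user = new HashMap<>();
    user.put("name", "jack");
    user.put("age", 21);
    rabbitTemplate.convertAndSend("object.queue", user);
}

@RabbitListener(queues = "object.queue")
public void listenObject(Map<String, Object> user) {
    // the consumer-side converter turns the JSON body back into a map
    log.info("Received user: {}", user);
}
```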
10. RabbitMQ advanced
1. Publisher retry
Network jitter can sometimes cause messages sent to MQ to be lost, so enable retries on the template:
```yaml
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    template:
      retry:
        enabled: true
        initial-interval: 100ms
        multiplier: 1
        max-attempts: 3
```
2. Publisher confirms
When the publisher sends a message to MQ, MQ returns a confirmation result to the publisher.
1. Add the configuration
```yaml
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    listener:
      direct:
        prefetch: 1
    publisher-confirm-type: correlated
    publisher-returns: true
```
publisher-confirm-type:
- none: disables the confirm mechanism
- simple: blocks synchronously waiting for MQ's acknowledgement
- correlated: MQ returns the acknowledgement asynchronously via a callback
2. Configure the returns callback
```java
@Slf4j
@Configuration
@RequiredArgsConstructor
public class Mqconfig {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("A returned message was received");
            log.debug("exchange: {}", returned.getExchange());
            log.debug("routingKey: {}", returned.getRoutingKey());
            log.debug("message: {}", returned.getMessage());
            log.debug("replyCode: {}", returned.getReplyCode());
            log.debug("replyText: {}", returned.getReplyText());
        });
    }
}
```
3. Configure the publisher
```java
@Test
public void testqueue1() {
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("Error while waiting for the confirm result", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            if (result.isAck()) {
                log.debug("Received confirm ack");
            } else {
                log.error("Received confirm nack! reason: {}", result.getReason());
            }
        }
    });
    String exchangeName = "richu.fanout";
    String message = "我是消息";
    rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
}
```
3. MQ reliability
- Once the broker goes down, every message held only in memory is lost
- Memory is limited; when consumers fail or process too slowly, messages pile up and block MQ
Mark messages as persistent when sending:
```java
public void testqueue() {
    // build a persistent message so it survives a broker restart
    Message message = MessageBuilder.withBody("hello0".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    String queueName = "queue1";
    for (int i = 0; i < 100000; i++) {
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
```
4.Lazy Queue
- Messages are written straight to disk, so they are persisted
- They are only read from disk into memory when a consumer needs them
- Lazy behaviour is the default since RabbitMQ 3.12; a declaration sketch for older versions follows below
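On broker versions before 3.12 a queue can be declared lazy explicitly. A minimal sketch using QueueBuilder; the queue name is an assumption:

```java
@Bean
public Queue lazyQueue() {
    return QueueBuilder
            .durable("lazy.queue")
            // sets the x-queue-mode=lazy argument so messages go straight to disk
            .withArgument("x-queue-mode", "lazy")
            .build();
}
```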
4. Consumer reliability
1. Consumer acknowledgement
- ack: the message was processed successfully and MQ deletes it from the queue
- nack: processing failed and MQ must deliver the message again
- reject: the message is rejected and MQ deletes it from the queue
```yaml
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.3.133
    port: 5672
    virtual-host: /
    username: richu
    password: 12345678
    template:
      retry:
        enabled: true
        initial-interval: 100ms
        multiplier: 1
        max-attempts: 3
    listener:
      direct:
        acknowledge-mode: auto
```
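With acknowledge-mode set to auto, Spring sends an ack when the listener returns normally and a nack when it throws, so the broker redelivers the message. A small sketch under the assumption that queue1 exists:

```java
@RabbitListener(queues = "queue1")
public void listen(String msg) {
    log.info("Received: {}", msg);
    if (msg.contains("error")) {
        // throwing here makes Spring return a nack, so MQ delivers the message again
        throw new RuntimeException("simulated processing failure");
    }
    // returning normally sends an ack and the message is removed from the queue
}
```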
2. Failure retry mechanism
When retry is enabled and all retries are exhausted but the message still fails, a MessageRecoverer handles it. There are three implementations:
1. RejectAndDontRequeueRecoverer: reject and discard the message after retries are exhausted (the default).
2. ImmediateRequeueMessageRecoverer: return nack after retries are exhausted so the message is requeued (not recommended, it can loop forever).
3. RepublishMessageRecoverer: publish the failed message to a designated exchange after retries are exhausted (recommended).
```yaml
listener:
  simple:
    prefetch: 1
    acknowledge-mode: AUTO
    retry:
      enabled: true
      initial-interval: 1000ms
      multiplier: 1
      max-attempts: 3
      stateless: true
```
RepublishMessageRecoverer
```java
@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("error");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error");
    }

    @Bean
    public Binding bindingError(Queue errorQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
    }

    @Bean
    public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        // failed messages are republished to the "error" exchange with routing key "error"
        return new RepublishMessageRecoverer(rabbitTemplate, "error", "error");
    }
}
```
5. Business idempotency
Executing a business operation once or many times has the same effect on the business state.
1. Give every message a unique id
Set it on the publisher side (not really recommended):
```java
@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // let the converter generate a unique message id for every message
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
```
2. Business-level checks (judge from the business state itself), as sketched below.
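For example, marking an order as paid is naturally idempotent if the current status is checked first. A hedged sketch; the entity, the mapper, and the status codes are assumptions:

```java
@Transactional
public void markOrderPaid(Long orderId) {
    Order order = orderMapper.selectById(orderId);   // assumed MyBatis-Plus mapper
    // business-level check: if the order is already paid, processing the
    // same message a second time must not change anything
    if (order == null || order.getStatus() != 1) {
        return;
    }
    order.setStatus(2);   // 1 = unpaid, 2 = paid (assumed status codes)
    orderMapper.updateById(order);
}
```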
6. Delayed messages
1. Dead-letter exchange
A message becomes a dead letter when:
- the consumer reports a failure and the message's requeue parameter is false
- the message has expired
- the queue is full
If a queue specifies an exchange through its dead-letter-exchange property, that queue's dead letters are delivered to that exchange.
```java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dlx.queue", durable = "true"),
        exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
        key = {"hi"}
))
public void listenexchange(Message msg) {
    log.info("Dead-letter exchange and queue received: {}", msg);
}
```
```java
@Configuration
@RequiredArgsConstructor
public class NormalConfig {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.exchange");
    }

    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
    }

    @Bean
    public Binding bindingNormal(Queue queue, DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("hi");
    }
}
```
Send a message:
```java
@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("normal.exchange", "hi", "我是一只坤", message -> {
        message.getMessageProperties().setDelay(10000);
        return message;
    });
}
```
2. Delayed message plugin
Install it:
```bash
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
```java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dlx.queue", durable = "true"),
        exchange = @Exchange(name = "dlx.direct", delayed = "true"),
        key = {"hi"}
))
public void delayedexchange(Message msg) {
    log.info("Delayed exchange and queue received: {}", msg);
}
```
Send a delayed message:
```java
@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("normal.exchange", "hi", "我是一只坤", message -> {
        message.getMessageProperties().setDelay(10000);
        return message;
    });
}
```
11. Elasticsearch
1. Installation
```bash
docker run -d --name es -e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
  -e "discovery.type=single-node" \
  -v es-data:/usr/share/elasticsearch/data \
  -v es-plugins:/usr/share/elasticsearch/plugins \
  --privileged \
  --network richu \
  -p 9200:9200 \
  -p 9300:9300 \
  elasticsearch:7.12.1
```
```bash
docker run -d --name kibana -e ELASTICSEARCH_HOST=http://es:9200 --network=richu -p 5601:5601 kibana:7.12.1
```
2. Inverted index
- Document: every piece of data is a document
- Term: the words a document is split into by semantic analysis
```
POST /_analyze
{
  "analyzer": "standard",
  "text": "我爱学习,我每天学习"
}
```
Extending the analyzer with a custom dictionary: in IkAnalyzer.cfg.xml add
```xml
<entry key="ext_dict">ext.dic</entry>
```
3. Basic concepts
Index: a collection of documents of the same type.
type, the data type of a field; common simple types:
- String: text (analyzed text) and keyword (exact values such as brands, countries, IP addresses)
- Numeric: long, integer, short, byte, double, float
- Boolean: boolean
- Date: date
- Object: object
Other mapping properties:
- index: whether to build an index for the field, true by default
- analyzer: which analyzer to use
- properties: sub-fields of the field
4. CRUD operations on indexes
```
PUT /index-name
{
  "mappings": {
    "properties": {
      "field-name": {
        "type": "text",
        "analyzer": "ik_smart"
      }
    }
  }
}
```
Query: GET /index-name
Delete: DELETE /index-name
Modify (only new fields can be added to an existing mapping):
```
PUT /index-name/_mapping
{
  "properties": {
    "new-field-name": {
      "type": "text",
      "analyzer": "ik_smart"
    }
  }
}
```
5. CRUD operations on documents
Create a document:
```
POST /index-name/_doc/document-id
{
  "field1": "value1",
  "field2": "value2",
  "field3": "value3"
}
```
Query, delete, and full update (a full update deletes the old document and creates a new one; if the document does not exist it is simply created):
```
PUT /index-name/_doc/document-id
{
  "field1": "value1",
  "field2": "value2",
  "field3": "value3"
}
```
Partial (incremental) update:
```
POST /index-name/_update/document-id
{
  "doc": {
    "field-name": "value"
  }
}
```
6. Bulk requests
```
POST /_bulk
# create
{"index": {"_index": "index-name", "_id": "1"}}
{"field1": "value1", "field2": "value2"}
# delete
{"delete": {"_index": "test", "_id": "2"}}
# update
{"update": {"_id": "1", "_index": "test"}}
{"doc": {"field2": "value"}}
```
7. JavaRestClient
```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
```
The version managed by default is 7.17.10; override it to match the server:
```xml
<elasticsearch.version>7.12.1</elasticsearch.version>
```
Create the client:
```java
@BeforeEach
void setUp() {
    client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.3.133:9200")));
}

@AfterEach
void destroy() throws IOException {
    if (client != null) {
        client.close();
    }
}
```
Test it:
```java
@Test
void test() {
    System.out.println("client = " + client);
}
```
8. Index APIs
Create:
```java
@Test
void createIndex() throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("items");
    request.source(MAPPING_TEMPLATE, XContentType.JSON);
    client.indices().create(request, RequestOptions.DEFAULT);
}

private static final String MAPPING_TEMPLATE = "{\n" +
        "  \"mappings\":{\n" +
        "    \"properties\":{\n" +
        "      \"id\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"name\":{\n" +
        "        \"type\":\"text\",\n" +
        "        \"analyzer\":\"ik_smart\"\n" +
        "      },\n" +
        "      \"price\":{\n" +
        "        \"type\":\"keyword\",\n" +
        "        \"index\":false\n" +
        "      },\n" +
        "      \"category\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"brand\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"sold\":{\n" +
        "        \"type\":\"integer\"\n" +
        "      },\n" +
        "      \"commentCount\":{\n" +
        "        \"type\":\"integer\",\n" +
        "        \"index\":false\n" +
        "      },\n" +
        "      \"isAD\":{\n" +
        "        \"type\":\"boolean\"\n" +
        "      },\n" +
        "      \"updateTime\":{\n" +
        "        \"type\":\"date\"\n" +
        "      }\n" +
        "    }\n" +
        "  }\n" +
        "}";
```
Check whether the index exists:
```java
@Test
void selectIndex() throws IOException {
    GetIndexRequest request = new GetIndexRequest("items");
    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    System.out.println(exists);
}
```
Delete:
```java
@Test
void deleteIndex() throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("items");
    client.indices().delete(request, RequestOptions.DEFAULT);
}
```
9. Document APIs
Create:
```java
@Test
void createDoc() throws IOException {
    IndexRequest request = new IndexRequest("items").id("1");
    request.source("{\"name\":\"tom\",\"age\":\"18\"}", XContentType.JSON);
    client.index(request, RequestOptions.DEFAULT);
}
```
Query:
```java
@Test
public void getDoc() throws IOException {
    GetRequest request = new GetRequest("items", "1");
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    String sourceAsString = response.getSourceAsString();
    System.out.println(sourceAsString);
}
```
Delete:
```java
@Test
public void deleteDoc() throws IOException {
    DeleteRequest request = new DeleteRequest("items", "1");
    client.delete(request, RequestOptions.DEFAULT);
}
```
Full update (re-index the whole document):
```java
@Test
public void fullUpdateDoc() throws IOException {
    IndexRequest request = new IndexRequest("items").id("1");
    request.source("{\"name\":\"tom\",\"age\":\"18\"}", XContentType.JSON);
    IndexResponse rep = client.index(request, RequestOptions.DEFAULT);
    System.out.println(rep);
}
```
Partial update:
```java
@Test
public void updateDoc() throws IOException {
    UpdateRequest request = new UpdateRequest("items", "1");
    request.doc(
            "name", "zs",
            "age", "46"
    );
    client.update(request, RequestOptions.DEFAULT);
}
```
10. Bulk operations
```java
@Test
public void bulkIndex() throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(new IndexRequest("items").id("1")
            .source("{\"name\":\"tom\",\"age\":\"18\"}", XContentType.JSON));
    bulkRequest.add(new DeleteRequest("items", "1"));
    client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
```
12. DSL queries
Query syntax:
```
GET /indexName/_search
{
  "query": {
    "QUERY_TYPE": {
      "QUERY_CONDITION": "CONDITION_VALUE"
    }
  }
}
```
Query all documents:
```
GET /indexName/_search
{
  "query": {
    "match_all": {}
  }
}
```
1. Leaf queries
- Full-text queries: match_query, multi_match_query
- Exact-value queries
- Geo queries (search by geographic location): geo_distance, geo_bounding_box
match query: the search text is analyzed into terms and then queried:
```
GET /indexName/_search
{
  "query": {
    "match": {
      "FIELD": "TEXT"
    }
  }
}
```
multi_match query: searches several fields at the same time:
```
GET /indexName/_search
{
  "query": {
    "multi_match": {
      "query": "TEXT",
      "fields": ["FIELD1", "FIELD2"]
    }
  }
}
```
term exact-value query:
```
GET /indexName/_search
{
  "query": {
    "term": {
      "FIELD": {
        "value": "VALUE"
      }
    }
  }
}
```
range query:
```
GET /indexName/_search
{
  "query": {
    "range": {
      "FIELD": {
        "gte": VALUE,
        "lte": VALUE
      }
    }
  }
}
```
ids query (by document id):
```
GET /indexName/_search
{
  "query": {
    "ids": {
      "values": ["12", "13"]
    }
  }
}
```
2. Compound queries
- bool: combines leaf queries with logical operators to build compound conditions
- function_score and dis_max: adjust the relevance score computed at query time and thereby change the document ranking
bool clauses:
- must: AND
- should: OR
- must_not: NOT
- filter: must match, but does not participate in scoring
```
GET /items/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "FIELD": "VALUE" } }
      ],
      "filter": [
        { "term": { "FIELD": "VALUE" } },
        { "range": { "FIELD": { "gte": VALUE, "lte": VALUE } } }
      ]
    }
  }
}
```
3. Sorting and pagination
Sorting with sort (the order is asc or desc):
```
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": {
    "FIELD": "desc"
  }
}
```
Pagination:
- from: the offset of the first document to return
- size: how many documents to return
```
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "from": 0,
  "size": 10,
  "sort": {
    "FIELD": "desc"
  }
}
```
4. Deep pagination and highlighting
```
GET /items/_search
{
  "query": {
    "match": {
      "FIELD": "VALUE"
    }
  },
  "highlight": {
    "fields": {
      "FIELD": {
        "pre_tags": "<em>",
        "post_tags": "</em>"
      }
    }
  }
}
```