Redis 是 Remote Dictionary Server 的缩写,是一个开源的、基于内存的键值存储数据库。它不仅支持简单的键值对,还支持丰富的数据结构,如字符串、哈希、列表、集合、位图、HyperLogLogs等。同时,Redis 也支持持久化,能够将内存中的数据异步保存到磁盘上,是内存数据库中最具代表性的产品之一。

与传统数据库相比,Redis 的主要优势在于速度极快,它将数据存储在内存中,而不是硬盘,这使得数据读写操作的速度比传统数据库快了数百倍,适合需要快速响应的场景。

1.Redis的简介

Redis是一个基于内存的key-value结构数据库

  • 基于内存存储,读写性能高
  • 适合存储热点数据(热点商品,咨讯,新闻)

NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是SQL”,

泛指非关系型的数据库。随着互联网web2.0网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的数据库则由于其本身的特点得到了非常迅速的发展。NoSQL数据库的产生就是为了解决大规模数据集合多重数据种类带来的挑战,尤其是大数据应用难题,包括超大规模数据的存储。

https://redis.io

linux安装redis

  • yum install -y gcc tcl 安装依赖
  • tar -zxvf redis.tar.gz 解压
  • make && make install 编译安装
  • 安装目录 /usr/local/bin 在目录中启动redis-server

redis配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 是否在后台执行,yes:后台运行;no:不是后台运行(老版本默认)
daemonize yes

# 当Redis以上述守护进程方式运行时,Redis默认会把进程文件写入/var/run/redis.pid文件
pidfile /var/run/redis.pid

# 是否开启保护模式。如配置里没有指定bind和密码。开启该参数后,redis只允许本地访问,拒绝外部访问
# 要是开启了密码和bind,可以开启。否则最好关闭,设置为no。
protected-mode no

# Redis监听端口号,默认为6379,如果指定0端口,表示Redis不监听TCP连接
port 6379

#配置unix socket来让redis支持监听本地连接。
"redis.conf" 387L, 16784C 35,17 6%

# 是否在后台执行,yes:后台运行;no:不是后台运行(老版本默认)
daemonize yes

# 当Redis以上述守护进程方式运行时,Redis默认会把进程文件写入/var/run/redis.pid文件
pidfile /var/run/redis.pid


# 是否开启保护模式。如配置里没有指定bind和密码。开启该参数后,redis只允许本地访问,拒绝外部访问
# 要是开启了密码和bind,可以开启。否则最好关闭,设置为no。
protected-mode no

# 只允许来自bind指定网卡的Redis请求。如没有指定,则可以接受来自任意一个网卡的Redis请求
bind 127.0.0.1

# 是否将日志输出到系统日志

# 注释掉“save”这一行配置项就可以让保存数据库功能失效


# master检测到slave上次发送的时间超过repl-timeout,即认为slave离线,清除该slave信息。

# 注意:因为redis太快了,每秒可认证15w次密码,简单的很容易被攻破,最好使用一个更复杂的密码

# maxmemory <bytes>

# 但redis如果中途宕机,会导致可能有几分钟的数据丢失,按照上面save条件来策略进行持久化
# Append Only File是另一种持久化方式,可提供更好的持久化特性。
# 在AOF重写或写入rdb文件时,会执行大量IO
# AOF自动重写配置。当目前AOF文件大小超过上一次重写的aof文件大小的百分之多少进行重写
############################ LUA SCRIPTING(LUA 脚本) ###########################
# 只有SCRIPT KILL和SHUTDOWN NOSAVE可以用。第一个可以杀没有调write命令的东西。

############################ LUA SCRIPTING(LUA 脚本) ###########################
# 如果达到最大时间限制(毫秒),redis会记个log,然后返回error。当一个脚本超过了最大时限。
# 只有SCRIPT KILL和SHUTDOWN NOSAVE可以用。第一个可以杀没有调write命令的东西。
# 要是已经调用了write,只能用第二个命令杀。
lua-time-limit 5000

########################### REDIS CLUSTER(Redis集群) ###########################
# 集群开关,默认是不开启集群模式。
# cluster-enabled yes

# 集群配置文件的名称,每个节点都有一个集群相关的配置文件,持久化保存集群的信息。
# 这个文件无需手动配置,这个配置文件有Redis生成并更新,每个Redis集群节点需要一个单独的配置文件,# 请确保与实例运行的系统中配置文件名称不冲突
# cluster-config-file nodes-6379.conf

2.Redis的常用的数据类型

  • 字符串string 普通字符串
  • 哈希hash 散列,类似于Java的HashMap结构
  • 列表list 按照插入顺序排序,可以有重复元素
  • 集合set 无序集合,没有重复元素,类似Java的HashMap
  • 有序集合sorted set / zset 每个元素关联一个分数,根据分数升序排序,没有重复元素

3.Redis常用的操作

1.Redis通用操作

SELECT index

SELECT 1 切换到一号redis数据库

KEYS pattern

KEYS * 匹配所有

KEYS *name 匹配以name结尾的

DEL key 返回删除个数

DEL name 删除name

DEL k1 k2 k3 k4 k5 删除多个

EXISTS key 存在1 不存在为0

EXISTS num 是否存在num

EXPIRE key second

EXPIRE test 10 设置test存在20秒

TTL key -2为不存在 -1为永久存储 其他的为剩余时间

TTL test 查看test的剩余时间

1.string操作

SET key value 设置指定key的值

SET name rc

GET key 获取指定key的值

GET name

SETEX key seconds value 设置指定key的value,并将key的过期时间设置为seconds秒

SETEX name 20 rc

SETNX key value 只有key不存在时才设置

SETNX key age

INCR key 自增1

INCR age

INCRBY key num 让key增加num,num可以为负数

INCRBY age -1

INCRBYFLOAT key num 让key增加num,num可以为负数

INCRBYFLOAT height 0.1

2.哈希操作

HSET key field value 设置指定字段的值

HGET key field 获取指定字段的值

HDEL key field 删除指定字段

HKEYS key 获取哈希表的所有的字段

HVALS key 获取哈希表的所有的值

3.列表操作命令

LPUSH key value1 [value2] 将一个或者多个值插入到列表头部

LPUSH mylist 1 2 3

RPUSH key value1 [value2] 将一个或者多个值插入到列表尾部

RPUSH mylist 4 5 6

LRANGE key start stop 获取列表指定范围内的元素

LRANGE mylist 1 2

RPOP key count 移除并获取列表最后count个元素

RPOP mylist 1

LPOP key count 移除并获取列表前count个元素

LPOP mylist 1

LLEN key 获取列表长度

LLEN mulist

BLPOP key timeout 如果列表为空则等待timeout,否则获取列表头元素

BLPOP mylist 20

BRPOP key timeout 如果列表为空则等待timeout,否则获取列表尾元素

BRPOP mylist 20

4.集合操作命令

SADD key member1 [member2] 添加成员

SADD myset 1 2 3 4

SMEMBERS key 返回所有成员

SMEMBERS myset

SCARD key 获取集合的成员数

SCARD myset

SINTER key1 [key2] 返回集合的交集

SDIFF key1 [key2] 返回集合的差集

SUNION key1 [key1] 返回集合的并集

SREM key member1 [member2] 删除集合中一个或多个成员

SISMEMBER key value 判断集合是否有value

SISMEMBER myset 1

5.有序集合

SortedSet

可排序的set,元素不能重复,查询速度快

ZADD key score1 member1 [score2 member2] 添加成员

ZADD ss 85 Alice 92 Bob 78 Cindy 88 Dave 95 Eve

ZRANGE key min max 按照排名排序

ZRANGE m 0 2
ZREVRANGE key min max 查询倒数几名

ZREVRANGE m 0 2

ZRANK ket member 获取sorted set中的指定元素的排名

ZRANK ss Alice

ZINCRBY key increment member 有序集合中对指定成员的分数加上增量increment

ZINCRBY ss 2 Alice

SREM key member1 [member2 …] 删除集合中一个或多个成员

ZSCORE key member 获取指定元素的score值

ZCOUNT key min max 统计大于min小于max的人数

ZDIFF,ZINTER,ZUNION 求差集,交集,并集

ZRANGEBYSCORE key min max 获取位于min-max的score的属性值

ZRANGEBYSCORE m 0 80

4.Jedis

1.Jedis基本使用

引入依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class JedisTest {
private Jedis jedis;
@Before
public void setUp(){
jedis = new Jedis("127.0.0.1",6379);
//jedis.auth(""); 验证密码
jedis.select(0); //选择库 默认0
}

@Test
public void testString(){
String result = jedis.set("name","richu");
System.out.println("result:" + result);
String name = jedis.get("name");
System.out.println("name:" + name);
}

@Test
public void testHash(){
jedis.hset("rc:server","name","rc");
jedis.hset("rc:server","age","32");

String name = jedis.hget("rc:server", "name");
String age = jedis.hget("rc:server", "age");
System.out.println("name:"+name+","+"age:"+age);
}

@After
public void resume(){
if (jedis != null){
jedis.close();
}
}
}

2.Jedis连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class RedisClient {
private static JedisPool jedisPool;

static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(8); //最大等待
jedisPoolConfig.setMinIdle(0); //最小等待
jedisPoolConfig.setMaxTotal(8); //最大连接
jedisPoolConfig.setMaxWait(Duration.of(1000, TimeUnit.MILLISECONDS.toChronoUnit()));
jedisPool = new JedisPool(jedisPoolConfig,"127.0.0.1",6379,2000);
}

public static Jedis getJedis(){
return jedisPool.getResource();
}
}

5.SpringDataRedis

1.基本模版

1.导入坐标

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

2.配置redis数据源

1
2
3
4
5
6
spring:
redis:
host: localhost
port: 6379
password:
database: 0

3.新建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
@Slf4j
public class RedisConfiguration {

@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory connectionFactory){
//创建redisTemplate对象
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
//设置key的序列化
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
//设置value的序列化
redisTemplate.setValueSerializer(RedisSerializer.string());
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
}

4.获取操作对象

1
2
3
4
5
ValueOperations valueOperations = redisTemplate.opsForValue();
ListOperations listOperations = redisTemplate.opsForList();
HashOperations hashOperations = redisTemplate.opsForHash();
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
SetOperations setOperations = redisTemplate.opsForSet();

2.操作string

1
2
3
4
5
6
7
@Test
public void test1() {
redisTemplate.opsForValue().set("city","纽约");
System.out.println(redisTemplate.opsForValue().get("city"));
redisTemplate.opsForValue().set("token","123", 3, TimeUnit.MINUTES);
redisTemplate.opsForValue().setIfAbsent("city","华盛顿");
}

3.操作hash

1
2
3
4
5
6
7
8
HashOperations hashOperations = redisTemplate.opsForHash();
hashOperations.put("100","name","tom");
hashOperations.put("200","age","20");
String name = (String)hashOperations.get("100","name");
System.out.println(name);
hashOperations.delete("100","name");
Set keys = hashOperations.keys("100");
List values = hashOperations.values("100");

4.操作list

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test3(){
ListOperations listOperations = redisTemplate.opsForList();
listOperations.leftPushAll("200","age","20");
listOperations.leftPush("200","name");

listOperations.range("200",1,2);

Long size = listOperations.size("200");
System.out.println(size);
}

5.操作集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void test4(){
SetOperations setOperations = redisTemplate.opsForSet();
setOperations.add("200","age","20");

Set members = setOperations.members("200");
System.out.println(members);

Set intersect = setOperations.intersect("key1", "key2");
System.out.println(intersect);

Set union = setOperations.union("key1", "key2");
System.out.println(union);

setOperations.remove("200","age","20");
}

6.操作有序集合

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test5(){
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
zSetOperations.add("key1","a",10);

Set key1 = zSetOperations.range("key1", 0, -1);
key1.forEach(System.out::println);

zSetOperations.incrementScore("key1","a",9);

zSetOperations.remove("key1","a");
}

7.通用操作

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test6(){
Set keys = redisTemplate.keys("key1");
keys.forEach(System.out::println);

Boolean key11 = redisTemplate.hasKey("key1");

DataType key1 = redisTemplate.type("key1");


redisTemplate.delete("key1");
}

8.StringRedisTemplate

使用stringRedisTemplate默认为string序列器,因此需要手动序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
StringRedisTemplate stringRedisTemplate;

private final ObjectMapper mapper = new ObjectMapper();
@Test
public void testString() throws JsonProcessingException {
User user = new User("1",2,"3");
//手动序列化
String ustring = mapper.writeValueAsString(user);
stringRedisTemplate.opsForValue().set("user:100",ustring);
User nuser = mapper.readValue(stringRedisTemplate.opsForValue().get("user:100"),User.class);
System.out.println("nuser = " + nuser);
}

6.Redis脚本

EVAL命令

Redis中使用EVAL命令来直接执行指定的Lua脚本。

1
EVAL luascript numkeys key [key ...] arg [arg ...]
  • EVAL 命令的关键字。
  • luascript Lua 脚本。
  • numkeys 指定的Lua脚本需要处理键的数量,其实就是 key数组的长度。
  • key 传递给Lua脚本零到多个键,空格隔开,在Lua 脚本中通过 KEYS[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys
  • arg是传递给脚本的零到多个附加参数,空格隔开,在Lua脚本中通过ARGV[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys

如果我们要执行redis.call(‘set’,’name’,’lucy’)这个脚本

EVAL "return redis.call('set','name','lucy')" 0

样例 EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 RC RICHU

JAVA执行redis脚本

1
2
3
4
5
6
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("script.lua"));
}
1
2
3
stringRedisTemplate.execute(UNLOCK_SCRIPT,
keyList,
argvList);

Redisson

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置类RedissonConfig.java

1
2
3
4
5
6
7
8
9
10
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("");
return Redisson.create(config);
}
}

使用

1
2
3
4
5
6
7
8
9
10
@Resource
private RedissonClient redissonClient;

RLock lock = redissonClient.getLock("key");
boolean b = lock.tryLock(1, 10, TimeUnit.SECONDS);
try{
//...
}finally {
lock.unlock();
}

解决主从不一致问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient1(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381");
return Redisson.create(config);
}
}

设置锁

1
2
3
4
5
RLock lock1 = redissonClient1.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");

redissonClient1.getMultiLock(lock1, lock2, lock3);

7 .Redis Pub/Sub

Redis提供了一组简单而强大的Pub/Sub命令,用于管理发布者和订阅者之间的消息传递。

1. 订阅相关命令

  • SUBSCRIBE:订阅一个或多个频道,接收发布到这些频道的消息。

    1
    2
    SUBSCRIBE channel [channel ...]
    1
  • PSUBSCRIBE:按模式订阅一个或多个频道,支持通配符。

    1
    2
    PSUBSCRIBE pattern [pattern ...]
    1
  • UNSUBSCRIBE:取消订阅一个或多个频道,或取消所有频道的订阅。

    1
    2
    UNSUBSCRIBE [channel [channel ...]]
    1
  • PUNSUBSCRIBE:取消按模式订阅一个或多个频道,或取消所有模式的订阅。

    1
    2
    PUNSUBSCRIBE [pattern [pattern ...]]
    1

2. 发布相关命令

  • PUBLISH:向指定频道发布一条消息,所有订阅了该频道的订阅者都会接收到这条消息。

    1
    PUBLISH channel message

3. Redis Pub/Sub的特点

  • 实时性高:消息一旦发布,订阅者几乎可以立即接收到,适用于需要即时响应的应用。
  • 简单易用:Redis提供了简洁的命令集,易于集成和使用。
  • 高效性能:由于Redis基于内存,Pub/Sub消息传递的延迟极低,吞吐量高。
  • 去中心化:发布者和订阅者不需要知道彼此的存在,通过频道进行消息传递,降低了系统耦合度。

4.缺点

  • 不支持数据持久化
  • 无法避免数据丢失

8.Stream

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

1
XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

1
XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

1
XREVRANGE key end start [COUNT count]
  • key :队列名
  • end :结束值, + 表示最大值
  • start :开始值, - 表示最小值
  • count :数量

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 ID

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

1
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。

9.GEO

Redis GEO 主要用于存储地理位置信息,并对存储的信息进行操作,该功能在 Redis 3.2 版本新增。

Redis GEO 操作方法有:

  • GEOADD:添加地理位置的坐标。
  • GEOPOS:获取地理位置的坐标。
  • GEODIST:计算两个位置之间的距离。
  • GEORADIUS:根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。
  • GEORADIUSBYMEMBER:根据储存在位置集合里面的某个地点获取指定范围内的地理位置集合。
  • GEOHASH:返回一个或多个位置对象的 geohash 值。

GEOADD

GEOADD用于存储指定的地理空间位置,可以将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中。

GEOADD语法格式如下:

1
GEOADD key longitude latitude member [longitude latitude member ...]

GEOADD g1 116.407396 39.904252 bj

GEOADD g1 121.473701 31.230416 sh

GEOADD g1 113.264356 23.129378 gz

GEOPOS

GEOPOS用于从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。

GEOPOS语法格式如下:

1
GEOPOS key member [member ...]

GEOPOS g1 bj

GEODIST

GEODIST用于返回两个给定位置之间的距离。

GEODIST语法格式如下:

1
GEODIST key member1 member2 [m|km|ft|mi]

member1 member2 为两个地理位置。

最后一个距离单位参数说明:

  • m :米,默认单位。
  • km :千米。
  • mi :英里。
  • ft :英尺。

GEODIST g1 bj sh

GEODIST g1 bj sh km

GEORADIUS、GEORADIUSBYMEMBER

GEORADIUS以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。

GEORADIUSBYMEMBER和 GEORADIUS 命令一样, 都可以找出位于指定范围内的元素, 但是 georadiusbymember 的中心点是由给定的位置元素决定的, 而不是使用经度和纬度来决定中心点。

GEORADIUS与 GEORADIUSBYMEMBER语法格式如下:

1
2
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

参数说明:

  • m :米,默认单位。
  • km :千米。
  • mi :英里。
  • ft :英尺。
  • WITHDIST: 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。
  • WITHCOORD: 将位置元素的经度和纬度也一并返回。
  • WITHHASH: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大。
  • COUNT 限定返回的记录数。
  • ASC: 查找结果根据距离从近到远排序。
  • DESC: 查找结果根据从远到近排序。

GEOHASH

Redis GEO 使用 GEOHASH来保存地理位置的坐标。

GEOHASH用于获取一个或多个位置元素的 GEOHASH值。

GEOHASH语法格式如下:

1
GEOHASH key member [member ...]

GEOHASH g1 bj

10.HyperLogLog

序号 命令及描述
1 [PFADD key element element …] 添加指定元素到 HyperLogLog 中。
2 [PFCOUNT key key …] 返回给定 HyperLogLog 的基数估算值。
3 [PFMERGE destkey sourcekey sourcekey …] 将多个 HyperLogLog 合并为一个 HyperLogLog

11.RDB

Redis数据备份文件,将redis的数据存储到硬盘中

RDB 持久化方式是 Redis 将当前内存中的数据快照(snapshot)保存到硬盘的过程。换句话说,Redis 会创建一个代表某一时刻的数据集的磁盘文件。

save 主进程执行备份,会阻塞运行

bgsave 子进程执行RDB,不会阻塞运行

1.触发RDB生成:

触发 RDB 文件的生成有以下两种方式:

手动触发:通过执行 SAVE 或 BGSAVE 命令。

自动触发:基于 Redis 配置文件中的 save 指令设置的条件。(默认是通过 BGSAVE 命令来触发的)

1
2
3
4
redis 配置文件 save 指令设置: 
save 3600 1 # 3600秒内如果超过1个key被修改则生成 RDB
save 300 100 # 300秒内如果超过100个key被修改则生成 RDB
save 60 10000 # 60秒内如果超过10000个key被修改则生成 RDB

2.创建子进程:

当执行 BGSAVE 命令时,Redis 主进程(父进程)会执行 fork 操作来创建一个子进程。这是一个昂贵的操作,尤其当数据集很大时。但好处是,一旦 fork 完成,父进程可以立即返回处理其他客户端请求,而不需要等待 RDB 的生成过程。

这里涉及到操作系统级别的知识。当 fork 操作执行后,子进程会获得父进程内存中的数据副本。但由于操作系统使用写时复制(Copy-On-Write, COW)技术,任何在父进程(Redis主进程)上发生的写操作不会影响子进程中的数据。这确保了子进程中的数据是隔离的,不受父进程中数据更改的影响。

3.子进程生成RDB文件:

子进程将开始遍历整个数据集,将所有的数据写入一个新的临时RDB文件。这是一个纯I/O操作,并且是线性的,所以非常快。子进程不需要处理任何客户端请求,只专注于写 RDB 文件,所以效率很高。

注意 :线性指的是数据在磁盘上是连续写入的。

4.RDB文件替换

一旦子进程完成了新的 RDB 文件的写入,它会替换掉旧的 RDB 文件,并发送一个信号通知父进程任务完成。然后子进程退出。

当 Redis 重新启动时,如果配置为使用 RDB 持久化,它会查找 RDB 文件,并加载它。由于 RDB 文件是一个紧凑的二进制表示形式,数据加载非常快。

5.RDB 配置详解

Redis 默认是不会开启持久化选项的,只要重启 redisredis 之前保存的数据都会丢失的。因此在实际的生产环境中,我们都会配置 Redis 的 RDB 配置。

redis.conf 中关于 RDB 的所有配置:

1
2
3
4
5
6
7
8
save <seconds> <changes>
stop-writes-on-bgsave-error
rdbcompression
rdbchecksum
sanitize-dump-payload
dbfilename
rdb-del-sync-files
dir

12.AOF

全量备份总是耗时的,有时候我们提供一种更加高效的方式AOF,工作机制很简单,redis会将每一个收到的写命令都通过write函数追加到文件中。通俗的理解就是日志记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# AOF持久化三种同步策略:
# 1) no:不同步(不执行fsync),数据不会持久化
# 2) always:每次有数据发生变化时都会写入appendonly.aof(慢,安全)
# 3) everysec:每秒同步一次到appendonly.aof,可能会导致丢失这1s数据(折中选择,默认值)
appendonly no


# 指定aof文件名,默认为appendonly.aof
# appendfilename appendonly.aof

# 在AOF重写或写入rdb文件时,会执行大量IO
no-appendfsync-on-rewrite no

# AOF自动重写配置。当目前AOF文件大小超过上一次重写的aof文件大小的百分之多少进行重写
# 即当AOF文件增长到一定大小时,Redis能调用bgrewriteaof对日志文件进行重写。
# 当前AOF文件大小是上次日志重写得到AOF文件大小的二倍(设置为100)时,自动启动新的日志重写过程。
auto-aof-rewrite-percentage 100


# 设置允许重写的最小AOF文件大小,避免了达到约定百分比但尺寸仍然很小的情况还要重写
auto-aof-rewrite-min-size 64mb

aof-load-truncated yes

12.搭建主从Redis

修改配置文件的bind

./redis-server ../redis.conf 启动redis

slaveof <masterip> <masterport> 或者 replicaof <masterip> <masterport>

slaveof 192.168.3.133 6380

slaveof 192.168.3.133 6381

slaveof on one 取消成为slave节点

查看主从节点 info replication

配置sentinel

vim redis-sentinel-6381.conf

1
2
3
4
5
6
7
8
port 26379
daemonize yes
logfile "26379.log"
dir /usr/local/redis/redis03
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-millisenconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000

./redis-sentinel ../redis-sentinel-6381.conf

13.Java客户端使用主从redis

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yaml

1
2
3
4
5
6
7
8
spring:
redis:
sentinel:
master: mymaster # 主节点名称
nodes:
- 192.168.3.133:6379
- 192.168.3.133:6380
- 192.168.3.133:6381

application.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public RedisConnectionFactory lettuceConnectionFactory(RedisProperties redisProperties) {
// 配置哨兵节点以及主节点
RedisSentinelConfiguration redisSentinelConfiguration = new RedisSentinelConfiguration(
redisProperties.getSentinel().getMaster(), new HashSet<>(redisProperties.getSentinel().getNodes())
);

// 配置读写分离
LettucePoolingClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder()
// 读写分离,这里的ReadFrom是配置Redis的读取策略,是一个枚举,包括下面选择
// MASTER 仅读取主节点
// MASTER_PREFERRED 优先读取主节点,如果主节点不可用,则读取从节点
// REPLICA_PREFERRED 优先读取从节点,如果从节点不可用,则读取主节点
// REPLICA 仅读取从节点
// NEAREST 从最近节点读取
// ANY 从任意一个从节点读取
.readFrom(ReadFrom.REPLICA_PREFERRED)
.build();

return new LettuceConnectionFactory(redisSentinelConfiguration, lettuceClientConfiguration);
}
}