0%

Redis

Redis特点

NoSQL特点

  1. 大数据高性能(Redis一秒写8w次,读11w次,细粒度的关村,性能较高)
  2. 数据类型多样性,不需要实现设计数据库,随取随用,键值对存储,列存储,文档存储,图形数据库
  3. 方便扩展(数据之间没有关系,容易扩展)
  4. 没有固定查询语言
  5. 最终一致性
  6. CAP定理和BASE (异地多活)

大数据特点

  • 3V:描述问题
    • 海量Volume
    • 多样Variety
    • 实时Veiocity
  • 3高:对程序的要求
    • 高并发
    • 高可用(随时水平拆分)
    • 高性能(保证用户使用)

NoSQL分类

  • KV键值对:Redis

  • 文档型数据库:

    • MongoDB:分布式文件存储的数据库,c++变编写,一个介于关系型和非关系型之间的产品
    • CouchDB
  • 列存储数据库:HBase

  • 图关系数据库:Neo4j,InfoGrid

    用来存储对象之间的关系网图,如社交关系网

Redis:Remote DIctionary Server

特点

  • 内存存储,持久化(rdb,aof)
  • 效率高,用于高速存储
  • 发布订阅信息
  • 地图信息分析
  • 计时器、计数器(浏览量等)
  • Redis的常见用途:数据库缓存消息中间件MQ

为什么Redis是单线程的?

Redis给予内存操作,Redis的性能瓶颈不是CPU,而是机器的内存和网络带宽,既然可以使用单线程来实现,就没必要用多线程

  • 避免线程上下文切换开销
  • 避免线程同步机制的开销
  • 如果是多线程模型就需要设计底层线程安全的数据结构,这会让redis更加复杂

Redis是多线程吗?

  • Redis4.0(引入多线程处理异步任务)
  • Redis6.0(在网络模型中实现多线程IO)

一般讨论的单线程Redis一般指Redis6.0之前的单线程多路复用网络模型

但Redis6.0执行实际任务仍然是单线程,除非是非阻塞命令,如:UNLINK, FLUSHALL ASYNC, FLUSHDB ASYNC

Redis配置

安装和运行

Redis官方推荐在linux上进行redis的部署,github的windows版本已经停更许久

redis默认端口号6379

操作流程:

  1. 直接解压redis-6.2.5.tar.gz

  2. 安装依赖yum install gcc-c++

    gcc -v检查版本

  3. redis目录中运行make

  4. redis默认不是后台启动的,需要修改配置文件,我们可以将make文件同级目录的redis.conf文件拷贝一份到我们指定的目录,以防止错误配置报错,同时对其进行修改,开启守护进程模式

    image-20210920153359292

  5. redis默认指令在make目录下的src目录,进入后输入指令,指定配置文件并运行redis服务器

    1
    ./redis-server ../redis.conf #后面的参数使我们自己的配置文件路径
  6. 启动客户端

    1
    ./redis-cli [-h 主机ip] -p 6379 #默认主机ip为本机,可以不写
  7. 客户端测试连接ping,如果出现PONG的返回提示即运行成功

  8. 查看进程ps -ef|grep redis

    image-20210920153854917

  9. 客户端关闭服务shutdown

测试性能

redis-benchmark:压力测试工具

指令示例

1
2
#100个并发客户端 100000条请求
redis-benchmark -p 6379 -c 100 -n 100000

image-20210920162142997

RedisKey基础操作

  • redis默认有16个数据库,默认使用第’0’个

    可以使用select 编号来切换数据库

  • 查看db大小dbsize

  • 清空数据库(不会清空已经持久化的数据)

    1
    2
    flushdb #清除当前数据库
    flushall #清除全部数据库
  • keys * #查看所有的key
    set <key> <value> #设置键值对
    get <key> #获取key对应的value值
    exists <key> #查看指定key是否存在
    move <key> <db_id> #将指定k-v键值对移动至指定数据库
    expire <key> <seconds> #指定过期时间
    ttl <key> #查看指定key剩余时间
    type <key> #查看指定key对应value类型
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ## Redis配置文件

    + 配置文件对大小写不敏感

    可以`include xxx.config`导入多个配置文件

    + 可以通过客户端对配置文件进行修改(如果有密码要求已经认证),如

    ```bash
    config set requirepass "" #修改密码
  • 网络

    1
    2
    3
    bind 127.0.0.1 #绑定ip
    protected-mode yes #保护模式
    port 6379 #端口
  • 通用

    1
    2
    3
    4
    5
    6
    daemonize yes #守护者进程运行(后台运行)
    pidfile /var/run/redis_6379.pid #守护者模式需要指定pid文件
    loglevel notice #日志级别: debug verbose notice warning
    logfile "" #日志文件名
    database 16 #数据库数量
    always-show-log yes #是否显示logo
  • 快照

    持久化:在规定时间内执行多少次操作会持久化到.rdb.aof

    1
    2
    3
    4
    5
    6
    7
    8
    save 900 1 #900s内至少1个key修改了一次就进行持久化操作,下面两个同理
    save 300 10
    save 60 10000

    stop-writes-on-bgsave-error yes #异常之后是否继续工作
    rdbcompression yes #是否压缩rdb文件,需要消耗cpu资源
    rdbchecksum yes #保存rdb文件时是否错误校验
    dir ./ # rdb文件保存的位置
  • 主从复制

    1
    2
    replicaof <masterip> <masterport> #从机配置文件添加主机的ip和端口
    masterauth <master-password> #如果主机有认证密码则在这里配置
  • 安全

    1
    requirepass xxx #设置认证密码,默认为空
    1
    auth xxx #在客户端访问时进行密码认证
  • 客户端限制

    1
    maxclients 10000 #最大客户端数量
  • 内存

    1
    2
    maxmemory <bytes> #最大内存容量
    maxmemory-policy noeviction #内存达到上限的策略
    1
    2
    3
    4
    5
    6
    7
    #内存策略
    volatile-lru #只对设置了过期时间的的key进行lru(默认)
    allkeys-lru #删除lru算法的key
    volatile-random #随机删除即将过期key
    allkeys-random #随机删除
    volatile-ttl #删除季建国七的
    noeviction #永不过期,返回错误
  • append only模式

    1
    2
    3
    appendonly no #默认不开启aof模式,而是使用rdb模式
    appendfilename "appendonly.aof" #持久化文件的名字
    appendfsync everysec #每秒执行一次sync,可能丢失数据,其它选项 always no

Redis数据

五大数据类型

String

String类型操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
append <key> "someString" #向指定key的value字符串后面追加自定义字符串 / 如果当前key不存在就相当于set key value
strlen <key> #获取指定key的value字符串长度

#getrange <key> 0 -1表示获取全部
getrange <key> left right #截取字符串,相当于切片
setrange <key> offset <value> #将偏移量为offset的值开始向后替换为所指的value

setex <key> <seconds> <value> #set with expire 设置过期时间
setnx <key> <value>#set not exists 如果不存在设置,在分布式所常见

#批量操作都是原子性操作,同时成功或失败
mset <key1> <value1> <key2> <value> ... #批量设置值
mget <key1> <key2> ... #批量获取值

getset <key> <value> #先get再set

int类型操作

1
2
3
4
incr <key> #i++ 如果指定key不存在则会自动自动设置kv,且v=1
decr <key> #i-- 如果指定key不存在则会自动自动设置kv,且v=-1
incrby <key> num #数值增加num 同上,自动设置值为num
decrby <key> num #数值减少num 同上,自动设置值为-num

对象操作

1
set user:1 {name:zhangsan,age:3} #以对象形式保存

List

可以将其看做一个双向链表,在两头进行操作效率最高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
lpush <list> <value> #左入栈(新建list)
rpush <list> <value> #右入栈(新建list)

#与getrange类似 lrange 0 -1表示获取全部
lrange <list> left right #从左到右排列

#其他的操作也可以通过l和r作以区分
lpop <list> #左出栈
rpop <list> #右出栈

lset <list> i <newItem> #更新下标为i的值

lindex <list> i #获取list中第i个值

llen <list> #获取list长度

lrem <list> num <value> #移除list中num个为value的值

ltrim <list> left right #切片(与range不同并且会改变原list)

lpoprpush <list1> <list2> #将list1中左端元素移至list2最右端

linsert <list> before/after <value1> <value2> #在value1值得前/后插入value2

Set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sadd <set> <value> #向set中添加value值对(新建set)
smember <set> #查看指定set的所有值
sismenmber <set> <value> #判断某个值是否是set中的元素
scard <set> #查看set中元素的个数
srem <set> <value> #移除set中某个元素

srandmember <set> [i] #随机抽出i个数的元素(默认一个)

spop <set> #随机出栈元素
smove <set1> <set2> <value>#将set1中指定值元素移至set2中

#集合相关
sdiff <set1> <set2> #求差集
sinter <set1> <set2> #求交集
sunion <set1> <set2> #求并集

Hash

key-map,可以存一些变更数据,且更适合对象的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
hset <map> <key> <value> #给map中填入kv值(新建map)
hset <map> <key1> <value2> <key2> <value2> ... #可以同时赋多个kv值

hgetall <map> #获取一个map中的全部kv值
hdel <map> <key> #删除一个map中指定的kv值

hlen <map> #获取map长度

hexists <map> <key> #判断一个map中指定kv是否存在

hkeys <map> #获取一个map所有key
hvals <map> #获取一个map所有value

incr/decr <map> <key> #自增/自减

hsetnx <map> <key> <value> #不存在则设置

Zset

有序集合

可以用来是先重要消息,以及带权重任务

1
2
3
4
5
6
7
8
9
10
11
zadd <zset> <score> <value> #给zset插入值score和value
zrange <zset> left right #查看从左到右排列

#left和right可以使用+/-inf表示无穷
zrangebyscore <zset> left right #按score排序,区间为left~right
zrevrangebyscore <zset> right left #同上降序排序
zrevrange <zset> left right #按照索引排序,与上面不一样!

zrem <set> <value> #移除指定值的元素
zcard <set> #获取集合中的个数
zcount <set> low high #同级score在low和high之间的个数

特殊数据类型

geospatial

地理位置信息geospatial简称GEO

1
2
3
4
5
6
7
8
9
10
11
12
13
#lot:经度,lat纬度,member详细信息
#规则:南北极无法直接添加,一般会下载城市数据通过java一次性导入
#可以使用key和member联合确定,如key=china:city member=beijing
geoadd <key> <lot> <lat> <member> #添加geo数据
geopos <key> <member1> [<member2> ...] #获取位置信息

geodist <key> <member1> <member2> #计算直线距离

#如附近的人
georadius <key> <lot> <lat> 5 km #给出某一个位置指定半径之内的元素
georadiusbymember <key> <member> 5 km #给出某一个元素指定半径之内的元素

geohash <key> <member1> <member2> #将二维经纬度转换为一维的字符串,两个字符串越相近,距离也越相近

获取附近的人示例:

1
2
#获取以北京为中心,500km为半径距离最近的前三个城市的全部信息
georadiusbymember china:city beijing 500 km withdist withcoord count 3

可用的距离单位:

  • m :米,默认单位。
  • km :千米。
  • mi :英里。
  • ft :英尺。

底层其实是zset,我们可以使用zset相关命令来操作geo,如:

1
2
zrange <ket> left right #查看元素
zrem <key> left right #删除元素

Hyperloglog

基数:不重复的元素

image-20210920223331016

1
2
3
pfadd <key> <value1> <value3> ... #添加数据
pfcount <key> #统计数据
pfmerge <key1> <key2> #合并集合(包括去重)

如果允许容错就可以使用Hyperloglog,不允许容错则使用set()集合或自定义数据类型去重

Bitmaps

位存储

image-20210920224558909

1
2
setbit <key> <offset> <value> #给位移量为offset的位置放入元素value,当然value只能为0,1
bitcount <key> #统计为1的位数

Redis高级特性

Redis的事务

  • 为了保持简单,redis事务保证了其中的一致性和隔离性,不满足原子性和持久性;
  • 一次性,顺序性,排他性的执行一系列的命令

事务指令

1
2
3
4
multi #开启事务
... #输入一系列指令
exec #执行事务中所有的指令
discard #取消事务

不保证原子性的原因

  • 编译型异常:代码有问题,事务中所有指令都不会执行

    image-20210921131338267

  • 运行时异常:逻辑错误,只有事务中出错的指令不会执行,其他正确指令正常运行

    image-20210921131647150

  • 悲观锁:认为什么时候都会出问题,任何操作都会加锁
  • 乐观锁:认为什么时候都不会出问题,所以不会上锁,更新数据的时候判断数据是否已被修改

Redis监视变量,可以视为乐观锁操作

1
2
3
4
5
# 先watch某个key再开启事务,如果exec时key对应的value改变,则事务中的所有指令提交失败,事务回滚
watch <key>
multi

unwatch #取消之前的watch指令

如果watch发现value已经改变,事务执行失败:

  • unwatch先解锁监视
  • watch获取最新值,重新监视
  • 重新提交事务

订阅发布

image-20210921204012895

一些常用命令

image-20210921204032829

image-20210921204110130

主从复制

基本概念

image-20210921204255540

image-20210921205757888

image-20210921205827739

复制原理

image-20210921212534965

1
2
3
4
5
6
7
8
9
10
11
12
13
info replication #查看从属关系
#返回信息
role:master #角色:master,每台redis服务器默认都是master
connected_slaves:0 #已连接的从机
master_failover_state:no-failover
master_replid:20e2fc4f1bef8039f45fc11e89e7abb7c6dbd2dd
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

环境测试

将原配置文件拷贝三份redis-m1.config redis-s1.config redis-s2.config

修改其中的属性:端口,pid,log,dump.rdb

然后分别运行,检查情况

image-20210921210922412

一主二从,在从机的客户端使用命令salveof <ip> <port>使其成为某个主机的从机(暂时),使用salveof no one使其重新变为主机

然后在主机查看结果

image-20210921211506064

  • 只有主机可以进行写操作,从机进行写操作会进行报错,且从机可以读到主机新写入的数据(增量复制
  • 主机断开连接,从机依旧保持从机状态,不能进行写操作,主机如果重新上线,从机依旧可以读取主机写的数据
  • 如果是命令行进行的配置,从机重启后会重新变为主机,只要变回从机,会立即从主机中获取值,进行完全复制

主机宕机的解决方案

如果主机m1突然断开连接,如何选取一个新的主机?

  • 手动选取某个从机为主机salveof no one

  • 哨兵模式(自动选取主机的模式)

    image-20210921215322848

    image-20210921215405539

    image-20210921213724681

    image-20210921213850531

    当主机客观下线之后,所有哨兵会根据算法投票选取一个从机成为新的主机

    哨兵模式配置文件sentinel.conf

    最基础的配置文件可以只配置sentinel monitor mymaster 127.0.0.1 6379 1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    # Example sentinel.conf
    # 哨兵sentinel实例运行的端口 默认26379
    port 26379
    # 哨兵sentinel的工作目录
    dir /tmp

    # 哨兵sentinel监控的redis主节点的 ip port
    # master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
    # quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
    # sentinel monitor <master-name> <ip> <redis-port> <quorum>
    # 最后的1表示总共有1个哨兵观测到宕机就判断主机宕机
    sentinel monitor mymaster 127.0.0.1 6379 1

    # 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
    # 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
    # sentinel auth-pass <master-name> <password>
    sentinel auth-pass mymaster MySUPER--secret-0123passw0rd

    # 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
    # sentinel down-after-milliseconds <master-name> <milliseconds>
    sentinel down-after-milliseconds mymaster 30000

    # 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
    # 这个数字越小,完成failover所需的时间就越长,
    # 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
    # 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
    # sentinel parallel-syncs <master-name> <numslaves>
    sentinel parallel-syncs mymaster 1

    # 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
    # 1. 同一个sentinel对同一个master两次failover之间的间隔时间。
    # 2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
    # 3.当想要取消一个正在进行的failover所需要的时间。
    # 4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
    # 默认三分钟
    # sentinel failover-timeout <master-name> <milliseconds>
    sentinel failover-timeout mymaster 180000

    # SCRIPTS EXECUTION

    # 配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
    # 对于脚本的运行结果有以下规则:
    # 若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
    # 若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
    # 如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
    # 一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
    # 通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
    # 通知脚本
    # sentinel notification-script <master-name> <script-path>
    sentinel notification-script mymaster /var/redis/notify.sh

    # 客户端重新配置主节点参数脚本
    # 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
    # 以下参数将会在调用脚本时传给脚本:
    # <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
    # 目前<state>总是“failover”,
    # <role>是“leader”或者“observer”中的一个。
    # 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
    # 这个脚本应该是通用的,能被多次调用,不是针对性的。
    # sentinel client-reconfig-script <master-name> <script-path>
    sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

    如果主机宕机之后重新启动,他不会重新成为主机,而是成为新主机的一个从机

    • 优点:
      • 哨兵集群基于主从复制,主从复制的优点都有
      • 主从可以切换,故障可以转移,系统可用性更高
      • 哨兵模式是主从模式的升级,健壮性更高

缓存穿透与雪崩

服务的高可用问题

缓存穿透

查不到

image-20210921220552480

解决方案

  • 布隆过滤器

    是一种数据结构,对所有可能查询的数据以hash形式存储,在控制层先进行校验,不符合则丢弃,避免对底层存储系统的压力

    image-20210921220800881

  • 缓存空对象

    当存储层不命中后,即使返回空对象也将其缓存起来,并设置过期时间,之后相同的查询请求会从缓存中获取,保护后端数据源

    image-20210921220932184

    缺点:

    • 缓存需要更多地存储空间来保存许多值为空的key
    • 即使设置了过期时间,缓存层和存储层的数据还是会存在一段时间的不一致性(缓存中为空但已经向存储层存放了新值),对于需要保持一致性的业务会有影响

缓存击穿

量大,缓存过期

image-20210921221322161

解决方案

  • 设置热点数据不过期

  • 加互斥锁

    保证缓存过期后同时仅有一个线程能够查询数据

缓存雪崩

缓存集体失效或Redis宕机

image-20210921221700754

比如双十一:停掉一些服务,保证主要业务可用

解决方案

  • redis高可用

    多设Redis服务器,搭建大型redis集群

  • 限流降级

    缓存失效后通过加锁或队列来控制数据库写缓存的线程数量,比如某个key仅允许一个线程查询和写缓存

  • 数据预热

    正式部署之前把可能的数据先预先访问一遍,大量访问的数据就会加载到缓存中,在即将发生大的并发访问时手动触发加载不同的key设置不同的过期时间,让缓存失效的时间点尽量均匀

数据淘汰策略

配置文件对应项:maxmemory-policy

Redis提供了5种数据淘汰策略:

  • volatile-lru:使用LRU算法进行数据淘汰(淘汰上次使用时间最早的,且使用次数最少的key),只淘汰设定了有效期的key
  • allkeys-lru:使用LRU算法进行数据淘汰,所有的key都可以被淘汰
  • volatile-random:随机淘汰数据,只淘汰设定了有效期的key
  • allkeys-random:随机淘汰数据,所有的key都可以被淘汰
  • volatile-ttl:淘汰剩余有效期最短的key
  • no-eviction:不进行去主数据,直接报错(默认,但不推荐使用)

Redis4.0新增策略

  • volatile-lfu:从已设置过期时间的数据集挑选使用频率最低的数据淘汰。
  • allkeys-lfu:从数据集中挑选使用频率最低的数据淘汰。

​ 最好为Redis指定一种有效的数据淘汰策略以配合maxmemory设置,避免在内存使用满后发生写入失败的情况。

​ 一般来说,推荐使用的策略是volatile-lru,并辨识Redis中保存的数据的重要性。对于那些重要的,绝对不能丢弃的数据(如配置类数据等),应不设置有效期,这样Redis就永远不会淘汰这些数据。对于那些相对不是那么重要的,并且能够热加载的数据(比如缓存最近登录的用户信息,当在Redis中找不到时,程序会去DB中读取),可以设置上有效期,这样在内存不够时Redis就会淘汰这部分数据。

整合Java

阿里云redis连接失败的原因

  • 阿里云安全组策略是否开启对应端口?
  • redis-server配置文件中是否绑定0.0.0.0?
  • server密码问题
  • 服务器防火墙是否开放对应端口,如CentOS7系统:
    • 开放防火墙对应端口firewall-cmd --zone=public --add-port=6379/tcp --permanent
    • 查看端口开放情况netstat -ntlp

Jedis

java实例

导包

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
//Jedis实例能够调用方法以执行任何redis命令
System.out.println(jedis.ping());
jedis.flushDB();
System.out.println(jedis.set("key", "value"));
System.out.println(jedis.get("key"));
//记得结束后关闭客户端
jedis.close();
}

Jedis事务

1
2
3
4
5
6
7
8
9
10
11
12
public void multi(){
Jedis jedis = new Jedis("127.0.0.1", 6379);
//开启事务
Transaction multi = jedis.multi();
try {
System.out.println(multi.set("key", "value"));
System.out.println(multi.get("key"));
multi.exec();
} finally {
jedis.close();
}
}

整合SpringBoot

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

springboot2.x后原来的jedis被替换为了lettuce

  • jedis:采用直连方法,多个线程操作是不安全的,避免这种情况需要使用jedis pool,类似BIO模式
  • lettuce:采用netty,实例可以在多个线程中共享,不存在线程安全问题,减少线程数据,类似NIO模式

使用默认的RedisTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowired
private RedisTemplate redisTemplate;

@Test
void redisTest(){
//获取连接对象
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
connection.flushDb();

//先用opsForXXX()获取专门处理某类业务的工具
//如valueOperations专门操作字符串
ValueOperations valueOperations = redisTemplate.opsForValue();
//再调用valueOperations的方法来执行具体操作
valueOperations.set("key","value");
System.out.println(valueOperations.get("key"));

}

自定义RedisTemplate

自定义配置类给容器中注入自定义Bean,默认的Bean就会失效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);

//自定义Jackson序列化配置
Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
jsonRedisSerializer.setObjectMapper(objectMapper);

//key使用String的序列化方式
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
//hash的key也是用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
//value的key使用jackson的序列化方式
template.setValueSerializer(jsonRedisSerializer);
//hash的value也是用jackson的序列化方式
template.setHashValueSerializer(jsonRedisSerializer);
template.afterPropertiesSet();

return template;
}
}

自定义工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}


/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}


// ============================String=============================

/**
* 普通缓存获取
* @param key 键
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}

/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/

public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/

public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}


/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}


// ================================Map=================================

/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}

/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}


/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}


/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}


/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}


// ============================set=============================

/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/

public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

// ===============================list=================================

/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/

public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/

public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}

持久化

RDB(Redis DataBase)

主从复制中rdb可以在从机备用

image-20210921200555786

image-20210921200652340

rdb文件命默认为 dump.rdb,可以在配置文件中修改dbfilename进行修改

生成rdb文件的触发规则:

  • 满足save规则,自动生成rdb
  • 执行flushall命令,自动生成rdb(flushdb不会)
  • 退出redis,生成rdb文件

如何恢复rdb文件:

  • 将rdb文件放在redis启动目录,redis启动时会自动检查dump.rdb并恢复其中的数据

  • 查看需要存放的位置config get dir

  • 优点

    • 适合大规模数据恢复
    • 对数据完整性要求不高
  • 缺点

    • 需要一定的时间进行进程操作,如果redis意外宕机,最后一次修改的数据就会消失
    • fork进程的时候也会占用一定的内存空间

一般来说,rdb的默认机制就足够我们日常使用了

AOF(Append Only File)

将我们所有的命令都几乎下来,相当于history,恢复时即将所有命令全部记录下来

aof文件命默认为 appendonly.aof,可以在配置文件中修改dbfilename进行修改

  • 默认不开启,需要手动更改配置文件中appendonly 修改为yes进行开启

  • 优点(三种不同的同步策略)

    • 每一次修改都同步,文件完整性好
    • 每秒同步一次,可能丢失一秒的数据
    • 从不同步,效率最高
  • 缺点

    • 相对于数据文件来说,aof远远大于rdb,修复速度也比rdb慢
    • aof运行效率也比rdb慢,因此redis默认配置就是rdb持久化

扩展

image-20210921202615736

image-20210921202658568