redis应用篇

登录

短信登录-基于Session

  1. image-20240314220915520
  2. 集群的session共享问题
    1. 多台Tomcat并不共享session存储空间,当请求切换到不同Tomcat服务时导致数据丢失的问题。
    2. session的替代方案应该满足:数据共享,内存存储,key、value结构

基于Redis实现共享session登录

  1. image-20240315162854734
  2. 以手机号为key存储验证码-String结构
  3. 对象的每个字段独立存储,可以针对单个字段做crud,并且内存占用更少。——Hash类型。以随机Token为key存储用户数据。
  4. 以随机Tokenkey获取用户数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public String login(LoginFormDTO loginForm, HttpSession session) {
// 校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
throw new RuntimeException("请输入正确的手机号");
}
// 校验验证码是否和redis存储的是否相同
String code = loginForm.getCode();
String codeRedis = stringRedisTemplate.opsForValue()
.get(RedisConstants.LOGIN_CODE_KEY + phone);
if (code == null || !code.equals(codeRedis)) {
throw new RuntimeException("验证码有误");
}
// 根据手机号查询用户是否存在
User user = query().eq("phone", phone).one();
if (user == null) {
// 不存在则将其保存到数据库-注册
user = createUser(loginForm);
save(user);
}
// 将 user保存到session中
// session.setAttribute("user", user);

// 将user保存到Redis中
//通过UUID创建key
String userToken = RedisConstants.LOGIN_USER_KEY + UUID.randomUUID();
// 将user对象转换为map集合存储到Redis中
Map<String, Object> map = BeanUtil.beanToMap(user, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((filedName, fieldValue) -> fieldValue.toString())
);
stringRedisTemplate.opsForHash().putAll(userToken, map);
return userToken;
}

登录拦截器的优化

image-20240315171040175

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取Token
String token = request.getHeader("authorization");
// 获取redis中user信息,检查是否存在
if (token == null){
return true;
}
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(token);
if (entries.isEmpty()){
return true;
}
// 存在将其保存到ThreadLocal中
UserDTO dto = BeanUtil.fillBeanWithMap(entries, new UserDTO(), false);
UserDTO userDTO = BeanUtil.copyProperties(dto, UserDTO.class);
UserHolder.saveUser(userDTO);
// 每次请求将token刷新
stringRedisTemplate.expire(token,RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
//放行
return true;

}

缓存

  1. 定义:缓存就是数据交换缓冲区,是存储数据的临时地方,一般读写性能较高。

  2. 作用:

    1. 降低后端负载
    2. 提高读写效率,降低响应时间。
  3. 成本

    1. 数据一致性成本
    2. 代码维护成本
    3. 运维成本
  4. 添加Redis缓存

    1. 缓存作用模型

    2. image-20240315175136106

    3. 根据ID查询商铺缓存的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 2. 用于解决缓存击穿,使用互斥锁的方式
public <R,T> R queryWithLock(String keyPrefix, T id, Class<R> type, Function<T,R> handleDB, Long time, TimeUnit unit) {
// 1. 从Redis中根据ID查询店铺
String key = keyPrefix + id;
String res = redisTemplate.opsForValue().get(key);
// 2. 查到了返回即可
if (!StrUtil.isBlankIfStr(res)) {
return JSONUtil.toBean(res, type);
}
// 如果返回shop为空值"",抛出异常
if (res != null && res.equals("")) {
throw new RuntimeException("该店铺不存在");
}

R s;
try {
if (!addLock(id)) {
Thread.sleep(100);
return queryWithLock(keyPrefix, id, type, handleDB, time, unit);
}

// 3. 未查到,到数据库中查询
s = handleDB.apply(id);

// 4. 未查到抛出异常即可
if (s == null) {
redisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
throw new RuntimeException("未查到该店铺");
}
// 将对象转换成JSON字符串存储到Redis中
String shopBean = JSONUtil.toJsonStr(s);

// 5. 查到了,将其缓存到Redis中
redisTemplate.opsForValue().set(key, shopBean, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
removeLock(id);
}
return s;
}

缓存更新策略

  1. image-20240315203830925
  2. 业务场景
    1. 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存
    2. 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存。
  3. 主动更新策略
    1. 读操作:缓存命中则直接返回,缓存未命中则查询数据库,并写入缓存,设定超时时间。
    2. 写操作:先写数据库,然后再删除缓存,要确保数据库与缓存操作的原子性。

缓存穿透

  1. 产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会达到数据库,当有不怀好意的人创建无数个线程发起这样的请求,将会击垮数据库。
  2. 常见的两种解决方案
    1. :o:缓存空对象
      1. 优点:实现简单,维护方便
      2. 缺点:额外的内存消耗,可能造成短期的不一致性
      3. image-20240315212446430
    2. 布隆过滤
      1. 优点:内存占用较少,没有多余key
      2. 缺点:实现复杂,存在误判可能。
      3. image-20240315212816143
    3. 增强ID的复杂度,避免被猜测id规律,做好参数校验
    4. 做好热点参数的限流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 1. 用于解决缓存穿透的方法
// 1.1 将对象序列化存储到Redis中
public void set(String key, Object o, Long time, TimeUnit unit){
redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(o),time,unit);
}
// 1.2 根据key查询缓存,并反序列化指定类型
public <R,T> R handleStrike(String keyPrefix, T id, Class<R> type, Function<T,R> handleDB, Long time, TimeUnit unit ){
// 1. 从Redis中根据ID查询店铺
String key = keyPrefix + id;
String shop = redisTemplate.opsForValue().get(key);
// 2. 查到了返回即可
if(!StrUtil.isBlankIfStr(shop)){
return JSONUtil.toBean(shop, type);
}
// 如果返回shop为空值"",抛出异常
if(shop != null){
throw new RuntimeException("该店铺不存在");
}
// 3. 未查到,到数据库中查询
R res = handleDB.apply(id);

// 4. 未查到抛出异常即可
if (res == null){
redisTemplate.opsForValue().set(key,"",time, unit);
throw new RuntimeException("未查到该店铺");
}
// 5. 查到了,将其缓存到Redis中
this.set(key,res,time,unit);
return res;
}

缓存雪崩

  1. 产生原因:在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
  2. image-20240315215601739
  3. 解决方案
    1. 给不同的key的TTL添加随机值
    2. 利用Redis集群提高服务的可用性
    3. 给缓存业务添加降级限流策略
    4. 给业务添加多级缓存

缓存击穿

  1. 缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务比较复杂的key突然失效了,无数的请求访问数据库会瞬间给数据库带来巨大的冲击。

  2. image-20240315220625432

  3. 解决方案

    1. 互斥锁

      image-20240315220832060

    2. 逻辑过期

      image-20240315221414110

      image-20240315221446643

基于互斥锁方式解决缓存击穿问题
  1. 总结:也就是说查询数据库只能一个来,而不能多人同时来,当第一个来查询Redis中没有查询到,他就上锁,自己去查询数据库,然后将结果返回的同时将其存储到Redis中,最后将锁释放;其他人来时会遇到锁,就递归调用查询Redis,确保它不能查询到数据库,缓解数据库的压力。锁是Redis中String类型同setnx指令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 2. 用于解决缓存击穿,使用互斥锁的方式
public <R,T> R queryWithLock(String keyPrefix, T id, Class<R> type, Function<T,R> handleDB, Long time, TimeUnit unit) {
// 1. 从Redis中根据ID查询店铺
String key = keyPrefix + id;
String res = redisTemplate.opsForValue().get(key);
// 2. 查到了返回即可
if (!StrUtil.isBlankIfStr(res)) {
return JSONUtil.toBean(res, type);
}
// 如果返回shop为空值"",抛出异常
if (res != null && res.equals("")) {
throw new RuntimeException("该店铺不存在");
}

R s;
try {
if (!addLock(id)) {
Thread.sleep(100);
return queryWithLock(keyPrefix, id, type, handleDB, time, unit);
}

// 3. 未查到,到数据库中查询
s = handleDB.apply(id);

// 4. 未查到抛出异常即可
if (s == null) {
redisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
throw new RuntimeException("未查到该店铺");
}
// 将对象转换成JSON字符串存储到Redis中
String shopBean = JSONUtil.toJsonStr(s);

// 5. 查到了,将其缓存到Redis中
redisTemplate.opsForValue().set(key, shopBean, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
removeLock(id);
}
return s;
}
1
2
3
4
5
6
7
8
9
10
11
// 添加锁
private <T> boolean addLock(T id){
String key = RedisConstants.LOCK_SHOP_KEY + id;
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "");
return BooleanUtil.isTrue(flag);
}
//删除锁
private <T> void removeLock(T id){
redisTemplate.delete(RedisConstants.LOCK_SHOP_KEY+id);
}

基于逻辑过期解决缓存击穿问题

?

缓存工具封装

  1. 基于StringRedisTemplate封装一个缓存工具类
  2. 解决缓存穿透
    1. 将任意java对象序列化为JSON并存储到String类型的key中,并可以设置ttl过期时间
    2. 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  3. 解决缓存击穿
    1. 将任意java对象序列化为JSON并存储到String类型的key中,并可以设置逻辑过期时间,用于处理缓存击穿问题
    2. 根据指定的key查询缓存,并反序列化为指定类型,利用逻辑过期的方式解决缓存击穿问题

秒杀

全局唯一ID

  1. 原因:如订单表如果使用数据库自增ID就存在一些问题
    1. id的规律太明显
    2. 受单表数量的限制
  2. 全局ID生成器:是一种在分布式系统下用来生成全局唯一的工具,一般要满足下列特性
    1. 高可用
    2. 唯一性
    3. 高性能
    4. 递增性
    5. 安全性
  3. 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息
    1. ID的组成部分
      1. 符号位:1bit,永远为0
      2. 时间戳:31bit,以秒为单位,可以使用69年
      3. 序列号:32bit,秒内的计数器,支持每秒产生2^32^个不同ID
    2. image-20240316194651030
  4. 全局唯一ID生成策略
    1. UUID
    2. Redis自增
    3. snowflake算法
    4. 数据库自增
  5. Redis自增ID策略
    1. 每天一个key,方便统计订单量
    2. id构造为时间戳+计数器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	private final static long BEGIN_TIME = 1704067200L;
private final static Integer BIT = 32;
/**
* 生成一个全局唯一的ID。
* 该方法通过结合当前时间戳和序列号来生成一个唯一的ID。时间戳部分确保了ID的时间顺序,而序列号部分则确保了在同一时间点内可以生成多个不同的ID。
* 使用Redis的自增操作来获取序列号,确保了在高并发情况下的唯一性。
* @param keyPrefix Redis键的前缀,用于区分不同的ID生成序列。
* @return 生成的唯一ID。
*/
public long nextId(String keyPrefix) {
// 获取当前时间戳,单位为秒
// 1. 生成时间戳
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
// 计算自定义起始时间以来的秒数差,用于时间戳部分
long timeStamp = now - BEGIN_TIME;

// 获取当前日期,用于序列号部分的键名
// 2. 生成序列号
String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 以指定的键名在Redis中自增,获取当前日期的序列号
long count = redisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 将时间戳部分和序列号部分组合成最终的ID
// 3. 连接时间戳和序列号
return timeStamp << BIT | count;
}

超卖问题

  1. 超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁。

  2. 悲观锁

    1. 认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行(一个个来执行)
    2. 例如Synchronized,Lock都属于悲观锁。
  3. 乐观锁

    1. 认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改

    2. 如果没有修改则认为是安全的,自己才更新数据。

    3. 如果已经被其他线程修改说明发生了安全问题,此时可以重试或异常。

    4. 版本号法

      image-20240317134334006

    5. CAS(比较and修改)法(推荐)

      image-20240317134539700

      改进:库存大于0即可

一人一单

  1. 优惠券秒杀,要求同一个优惠券,一个用户只能抢购一单。
  2. 查询数据库判断该用户是否语句抢购了优惠券,如果抢购了,则抛出异常即可,但是为了避免高并发问题,需要悲观锁。
  3. 一人一单的并发安全问题
    1. 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
    2. 我们将访问启动两份,端口分别为8081和8082
    3. image-20240317145641913
    4. 然后修改Nginx的conf目录下订单Nginx.conf文件,配置反向代理和负载均衡
    5. image-20240317145728959
    6. 现在,用户请求会在两个节点上负载均衡,再次测试下是否存在线程安全问题。
    7. image-20240317151102680
    8. 解决方式:多个JVM共用一把锁,解决集群模式下的并发问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Transactional
public void getOrderId(VoucherOrder voucher) {
Long id = voucher.getUserId();
// 实现一人一单功能,查询该用户
int count = query().eq("user_id", id).eq("voucher_id", voucher.getVoucherId()).count();
if (count > 0) {
throw new RuntimeException("每人只限一张秒杀优惠券哦");
}
// 5. 数据库优惠券-1
boolean success = seckillVoucherService.update().setSql("stock = stock -1")
.eq("voucher_id", voucher.getVoucherId()).gt("stock", 0)
.update();
if (!success) {
throw new RuntimeException("秒杀优惠券已被抢完");
}
// 6. 添加一条订单
VoucherOrder order = new VoucherOrder();
long orderId = redisIdCreator.nextId("order");
order.setId(orderId);
order.setVoucherId(voucher.getVoucherId());
order.setUserId(id);
save(order);
}

分布式锁

  1. 定义:满足分布式系统或集群模式下多进程可见并且互斥的锁。

  2. 满足的要求

    1. 多进程可见
    2. 互斥
    3. 高可用
    4. 高性能
    5. 安全性
  3. 分布式锁的核心是实现多进程之间互斥

    image-20240317155038783

  4. 实现方法

    1. 获取锁

      1. 互斥:确保只能有一个线程获取锁

    2. 释放锁

      1. 手动释放

      2. 超时释放:获取锁时添加一个超时时间

        image-20240317161722988

    3. 改进Redis的分布式锁

      1. 在获取锁时存入线程标识(可以使用UUID)
      2. 在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致
        1. 如果一致则释放锁
        2. 如果不一致则不释放锁

Redis的Lua脚本

  1. Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。

  2. Redis提供的调用函数

    image-20240319201315077

    1. 例如我们要执行set name Jack,则脚本为

      image-20240319201357667

    2. 若要先执行set name Jack,再执行get name,则脚本为

      image-20240319201443968

  3. 使用Redis调用脚本

    image-20240319201801663

    执行Redis.call(‘set’,’name’,’jack’)这个脚本(无参数)

    image-20240319201853446

  4. 如果脚本中key,value不想写死,可以作为参数传递,key类型参数会放入keys数组,其他参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数。

    image-20240319203039702

    image-20240319205004934

基于Lua脚本实现分布式锁的释放锁逻辑

  1. RedisTemplate调用Lua脚本的API
  2. image-20240320181225512
  3. image-20240321152743211
  4. image-20240319204631961
  5. image-20240319210037602

RedisSon

  1. Redisson是一个在Redis的基础上实现的java主内存数据网格。它不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

    image-20240319211127248

Redisson入门

  1. 引入依赖

    image-20240319211217801

  2. 配置Redisson客户端image-20240319211233188

  3. 使用Redisson的分布式锁

    image-20240319211513978

Redisson可重入锁原理

image-20240319214517501

  1. 获取锁的Lua脚本

  2. image-20240319214537405

  3. 释放锁的Lua脚本

    image-20240319214805819

  4. 总结:基于Redis的Hash数据类型,判断锁标识是否是自己的,如果是自己的,则获取锁成功且锁+1,重置锁的有效期,否则获取锁失败;

Redisson分布式锁原理

  1. image-20240319223234624
  2. Redisson可解决以下问题
  3. image-20240320165125813
  4. 可重入:利用Hash结构记录线程id和重入次数;利用watchDog延续锁时间,利用信号量控制锁重试等待。
    1. 缺陷:Redis宕机引起锁失效问题。
  5. 可重试:利用信号量和pubSub功能实现等待,唤醒,获取锁失败的重试机制
  6. 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置重试时间
  7. 主从一致性:
    1. image-20240320165910349
    2. 原理:多个独立的Redis节点,必须在所有结点都获取重入锁,才算获取成功。
    3. 缺陷:运维成本高,实现复杂。

异步秒杀

image-20240320173906420

image-20240321165316889

Redis消息队列实现异步秒杀

  1. 消息队列(Message Queue):存放消息的队列。最简单的消息队列模型包括3个角色
    1. 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
    2. 生产者:发送消息到消息队列
    3. 消费者:从消息队列获取消息并处理消息
    4. image-20240321165743821
  2. Redis提供了三种不同的方式来实现消息队列
    1. List结构:基于List结构模拟消息队列
    2. PubSub:基本的点对点消息模型
    3. Stream:比较完善的消息队列模型

基于List结构模拟消息队列

  1. 优点
    1. 利用Redis存储,不受限于JVM内存上限
    2. 基于Redis的持久化机制,数据安全性有保证
    3. 可以满足消息有序性
  2. 缺点
    1. 无法避免消息丢失
    2. 只支持消费者

基于PubSub的消息队列

  1. Pubsub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
    1. subscribe channel[channel]:订阅一个或多个频道
    2. publish channel msg: 向一个频道发送消息
    3. psubscribe pattern[pattern]:订阅与pattern 格式匹配的所有频道
    4. image-20240321173257854
  2. 优点
    1. 采用发布订阅模型,支持多生产,多消费者
  3. 缺点
    1. 不支持数据持久化
    2. 无法避免消息丢失
    3. 消息堆积有上限,超出时数据丢失

:person_with_blond_hair: 基于Stream的消息队列

  1. Stream是Redis5.0引入的一种数据类型,可以实现一个功能非常完善的消息队列。

  2. 发送消息命令

    image-20240321174826373

  3. 读取消息

    方式一:xread

    image-20240321175441007

    xread阻塞方式,读取最新的消息

    Stream类型消息队列的xread命令特点

    1. 消息可回溯
    2. 应该消息可以被多个消费者读取
    3. 可以阻塞读取
    4. 有消息漏读的风险
  4. 消费者组

    1. 将多个消费者划分到一个组中,监听同一个队列。具备下列特点

    2. ==消息分流==:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。

    3. ==消息标示==:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息,确保每一个消息会被消费。

    4. ==消息确认==:消费者获取消息后,消息处于pending(d待处理)状态,并存入一个pending-list。当处理完成后需要通过xack来确认消息,标记消息为已处理,才会从pending-list移除。

    5. 创建消费者组

      1. image-20240321202134452
      2. image-20240321202143040
    6. 从消费者组读取消息

      image-20240321203440211

    7. xreadgroup命令的特点

      1. 消息可回溯
      2. 可以多消费者争抢消息,加快消费速度
      3. 可以阻塞读取
      4. 没有消息漏读的风险
      5. 有消息确认机制,保证消息至少被消费一次。
  5. Redis消息队列

    image-20240321204114995

点赞功能

  1. 实现步骤
    1. 给Blog类添加一个isLike字段,标示是否被当前用户点赞。
    2. 修改电站功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1.
    3. 修改根据ID查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段。
    4. 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 点赞修改
* @param id 博客Id
* @return ok
*/
@Override
public Result likeBlog(Long id) {
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.BLOG_LIKED_KEY + id;
// 查询当前用户是否点赞过了
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null) {
// 如果未点赞,则点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
if (isSuccess){
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else {
// 如果为点赞了,则取消点赞
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
if (isSuccess){
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
}
return Result.ok();
}

点赞排行榜

  1. 使用Redis的setSorted来保存点赞的用户,以时间戳作为score,用户Id作为value存储。
  2. 将前n个从其中取出,封装为DTO返回到前端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 获取点赞排行榜top5
* @param id Blog Id
* @return ok
*/
@Override
public Result getBlogByLike(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY + id;
// 从Redis中获取点赞前五条
Set<String> set = stringRedisTemplate.opsForZSet().range(key, 0, 4);
// 为空返回
if (set == null || set.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 不为空,解析Id出来从数据库中查询点赞的用户
List<Long> ids = set.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);

// 封装为List<UserDTO>,返回
List<UserDTO> userDTOS = userService.query().in("id", ids).last("order by field(id," + idStr + ")").list()
.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOS);
}

好友关注

关注和取关

  1. 数据库的增和删
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 关注和取关
* @param followId 博主Id
* @param isFollow 关注、取关
* @return ok
*/
@Override
public Result follow(Long followId, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
// redis关注用户key
String key = "follow:" + userId;
if (isFollow) {
Follow follow = new Follow();
follow.setFollowUserId(followId);
follow.setUserId(userId);
boolean isSuccess = save(follow);
// 将关注的用户保存到Redis中
if (isSuccess) {
stringRedisTemplate.opsForSet().add(key,followId.toString());
}
}else {
boolean isSuccess = followMapper.deleteFollow(followId, userId);
// 删除Redis中的关注集合
if (isSuccess){
stringRedisTemplate.opsForZSet().remove(key,followId.toString());
}
}
return Result.ok();
}

共同关注

  1. 将关注的对象id存放到Redis的Set集合中,以”follow”+自己id作为key,关注对象id作为value。
  2. 使用Redis的Set集合的sinter指令获取交集,将关注对象id解析为Long类型,根据关注对象id查询其,封装为DTO返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 求共同关注
* @param id 目标用户ID
* @return ok
*/
@Override
public Result commonFollow(Long id) {
// 求好友和该用户的共同关注交集
Long userId = UserHolder.getUser().getId();
String key1 = "follow:"+userId;
String key2 = "follow:"+id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
if (intersect == null || intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}

// 解析intersect获取共同关注的用户ID
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());

//根据ids查询共同关注的用户列表
List<User> userList = userService.listByIds(ids);
// 将userList封装为UserDTO返回
List<UserDTO> userDTOList = userList.stream().map(user -> BeanUtil.toBean(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(userDTOList);
}

关注推送–Feed流

  1. 关注推送也叫做feed流,直译为投喂。为用户持续的提供”沉浸式”的体验,通过无限下拉刷新获取新的信息
  2. image-20240325175457703
  3. feed流的模式
    1. timeline:不做内容筛选,简单的按照发布时间进行排序,常用于好友或关注。例如朋友圈
      1. 优点:信息全面,不会有缺失。并且实现也相对简单
      2. 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
    2. 智能排序:利用智能算法屏蔽 违规的,用户不感兴趣的内容,推送用户感兴趣的信息来吸引用户
      1. 优点:投喂优化感兴趣信息,用户粘度很高,容易沉迷。
      2. 缺点:如果算法不精确,可能起到反作用
  4. timeline模式,实现的三种方案
    1. 拉模式:也叫做读扩散
      1. image-20240325180426216
      2. 缺点:延迟,如果关注的人多,内容多则大大延迟
      3. 优点:内存占用小
    2. 推模式:也叫做写扩散
      1. image-20240325180743673
      2. 缺点:如果粉丝多则需要发多份,内存占用大
      3. 优点:延迟快
    3. 推拉结合模式:也叫做读写混合,兼具推和啦两种模式的优点。:rabbit:
      1. image-20240325181331274
      2. image-20240325181349624
  5. feed流的分页问题-list数据类型
    1. feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式
    2. image-20240326195745091
    3. 会出现重复查询的情况
  6. feed流的滚动分页-zset数据类型
    1. image-20240326200051405
    2. 因此使用Redis的zset数据类型来存储。
  7. 滚动分页查询参数
    1. max: 第一次为当前时间戳 | 上一次查询的最小时间戳
    2. min:0
    3. offset:第一次为0 | 在上一次的结果中,与最小值一样的元素的个数
    4. count:3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 滚动分页查询
* @param max 上一次查询最小时间戳
* @param offset 偏移量
* @return ok
*/
@Override
public Result queryBlogList(Long max, Integer offset) {
// 获取当前id
Long userId = UserHolder.getUser().getId();
String key = RedisConstants.FEED_KEY + userId;
// 根据id查询blogId
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate
.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);

if (typedTuples == null) {
return Result.ok();
}
// 将Id存储起来,从数据库根据Id查找blog
List<Long> ids = new ArrayList<>();
long minTime = 0;
// 偏移量os
int os = 1;
// 解析数据
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
ids.add(Long.valueOf(Objects.requireNonNull(tuple.getValue())));
long time = Objects.requireNonNull(tuple.getScore()).longValue();
if(time == minTime){
os++;
}else {
minTime = time;
os = 1;
}
}
// 根据ids查询博客返回排序好的List
String idStr = StrUtil.join(",", ids);
List<Blog> blogList = query().in("id", ids).last("order by field(id," + idStr + ")").list();
for (Blog blog : blogList) {
getBlog(blog);
isBlogLike(blog);
}
// 封装dto返回
ScrollResult result = new ScrollResult();
result.setList(blogList);
result.setOffset(os);
result.setMinTime(minTime);
return Result.ok(result);
}

附近商铺

GEO数据结构

  1. GEO就是Geolocation的简写形式,代表地理坐标,允许存储地理坐标信息,帮助我们根据经纬来检索数据
  2. geoadd:添加一个独立空间信息,包含:经度(longitude),纬度(latitude),值(member)
  3. geodist:计算指定的两个点之间的距离并返回
  4. geohash:将指定member的坐标作为hash字符串形式返回
  5. geopos:返回指定member的坐标
  6. georadius:指定圆心,半径,找到该圆内包含的member,并按照与圆心之间的距离排序后返回,6.2以后已废弃
  7. geosearch:在指定范围内搜索member,并按照指定点之间的距离排序后返回。范围可以是圆形或矩形
  8. geosearchstore:与geosearch功能一致,不过可以把结果存储到一个指定key。

用户签到

bitMap

  1. 我们按月来统计用户签到信息,签到记录为1,未签到记录为0
  2. image-20240327171710442
  3. 把每一个bit为对应当月的每一天,形成了映射关系。用0和1标识业务状态,这种思路就称为==位图(bitMap)==
  4. redis中是利用string类型数据结构实现bitmap,因此最大上限为512M,转换为bit则是2^32个bit位
  5. 因为bitmap底层是基于string数据结构,因此其操作也都封装在字符串相关操作中了
  6. image-20240327173643421
  1. setbit:向指定位置(offset)存入一个0或1
  2. getbit:获取指定位置(offset)的bit值
  3. bitcount:统计bitmap中值为1的bit位的数量
  4. bitfield:操作(查询,修改,自增)bitmap中bit数组中指定位置(offset)的值
  5. bitfield_ro:获取bitmap中bit数组,并以十进制形式返回
  6. bitop:将多个bitmap的结果做位运算(与或非)
  7. bitpos:查询bit数组 中指定范围内第一个0或1出现的位置。

UserServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 签到功能
* @return ok
*/
@Override
public Result sign() {

// 拼接key
// 获取用户id
Long userId = UserHolder.getUser().getId();
log.info("用户签到,{}",userId);
// 获取当前时间
LocalDateTime time = LocalDateTime.now();
String keyPost = time.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = RedisConstants.USER_SIGN_KEY + userId + keyPost;
// 获取当日是本月的第几天
int dayOfMonth = time.getDayOfMonth();
// 写入redis中
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1,true);
return Result.ok("success");
}

签到统计

  1. 连续签到天数:从最后一次签到开始向前统计,直到遇到第一次为签到为止,计算总的签到次数,技术连续签到天数。
  2. image-20240327181334236
  3. 获取本月到今天为止的所有签到数据:bitfield key get u [dayOfMonth] 0
  4. 从后向前遍历每个bit位:与1做与运算,就能得到最后个bit位,随后右移一位,下一个bit位就成为最后一个bit位。
  5. image-20240327181605422

UserServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 统计连续签到天数
* @return ok
*/
@Override
public Result signCount() {
// 拼接key
// 获取用户id
Long userId = UserHolder.getUser().getId();
log.info("用户签到,{}",userId);
// 获取当前时间
LocalDateTime time = LocalDateTime.now();
String keyPost = time.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = RedisConstants.USER_SIGN_KEY + userId + keyPost;
// 获取当日是本月的第几天
int dayOfMonth = time.getDayOfMonth();
// 获取redis中存放的bitmap
List<Long> field = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands
.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
if (field == null) {
return Result.ok(0);
}
// 获取集合中的第一个元素
Long res = field.get(0);
if (res == 0){
return Result.ok(0);
}
// 定义计数器
int count = 0;
// 循环遍历
while (true){
if ((res & 1) == 0) {
break;
}else {
count++;
}
// 向右一位,去掉最后一个bit位
res >>>= 1;
}
return Result.ok(count);
}

UV统计

hyperLogLog用法

  1. UV:全称为Unique Visitor,也叫做独立访客量,是指通过互联网访问,浏览这个网页的自然人。一天内同一个用户多次访问该网站,值记录一次。
  2. PV:取出Page View,也叫做页面访问量或点击量,用户每访问网站的一个页面,记录一次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
  3. UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis总,数据量会非常恐怖。
  1. HyperLogLog(HLL)是从loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储器所有值。
  2. Redis中HLL是基于string结构实现的,单个hll的内存用于小于16kb,内存占用很低,其测量结果是概率性的,有小于0.81%的误差,可以忽略。
  3. image-20240327211551509

细节

htool工具类

  1. BeanUtil.copyProperties(对象,copy到的类.class):copy属性

    1
    UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
  2. BeanUtil.beanToMap(Object):将一个对象转换成map集合。

  3. BeanUtil.fillBeanWithMap(map,obejct,false):将一个map转换为Object对象

  4. JSONUtil.toBean(json,类.class):将JSON字符串转换成对象。

  5. JSONUtil.toJSON(Object):将对象转换成JSON字符串