作为一名在Java后端摸爬滚打十年的老兵,Redis在我眼里早已不是简单的“缓存工具”——它更像一把瑞士军刀,能在分布式系统的各种场景中“救场”。从电商秒杀的库存守护,到直播平台的在线人数统计,再到O2O项目的附近门店查询,Redis用其灵活的数据结构和高性能特性,解决了无数数据库扛不住、代码写起来麻烦的难题。今天,我将结合真实项目案例,拆解Redis的8大核心非缓存场景,补充底层原理、进阶优化和避坑细节,所有代码均可直接落地生产。
一、分布式锁:高并发下的“线程秩序官”
在分布式系统中,多服务、多线程同时操作共享资源(如库存、订单)时,数据库行锁会成为瓶颈,而本地锁(如synchronized)又无法跨服务生效——这时候Redis分布式锁就是“救命稻草”。
1. 为什么不推荐自己实现?
很多新手会尝试用setnx + expire实现分布式锁,但这种方案存在致命缺陷:
- 原子性问题:
setnx和expire是两步操作,若执行完setnx后网络中断,锁会永久存在,导致死锁; - 锁超时问题:业务执行时间超过锁过期时间,会导致“锁提前释放”,其他线程误抢锁;
- 集群一致性问题:主从复制异步执行,主节点挂了后从节点未同步锁信息,会出现“锁丢失”。
2. 生产级方案:Redisson 一站式解决
Redisson是Redis官方推荐的Java客户端,内置分布式锁实现,解决了上述所有问题,核心特性包括:
- 自动续期(Watch Dog机制):业务未执行完时,自动延长锁过期时间;
- 原子性操作:底层用
SET key value NX EX命令,一步完成加锁和过期设置; - 集群兼容:支持RedLock算法,多主节点部署确保锁不丢失。
实战代码(Spring Boot集成)
<!-- 引入Redisson依赖(建议3.20+版本,修复大量旧bug) -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.3</version>
</dependency>
@Service
public class StockService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private StockMapper stockMapper;
// 秒杀扣库存:key格式=业务类型:资源ID(如stock:1001表示商品1001的库存锁)
public boolean deductStock(Long goodsId, Integer quantity) {
// 1. 获取锁(命名规范:业务+资源,避免key冲突)
RLock lock = redissonClient.getLock("stock:" + goodsId);
try {
// 2. 尝试加锁:最多等待30秒(获取锁的超时时间),锁自动续期30秒(释放时间)
// 等待时间:防止线程一直阻塞,超过则放弃;释放时间:防止死锁
boolean locked = lock.tryLock(30, 30, TimeUnit.SECONDS);
if (!locked) {
// 3. 加锁失败:返回“秒杀拥挤”提示
return false;
}
// 4. 加锁成功:执行业务逻辑(扣库存)
Stock stock = stockMapper.selectById(goodsId);
if (stock == null || stock.getRemaining() < quantity) {
return false; // 库存不足
}
// 数据库扣库存(建议加事务)
stock.setRemaining(stock.getRemaining() - quantity);
return stockMapper.updateById(stock) > 0;
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态,避免线程状态异常
return false;
} finally {
// 5. 释放锁:仅释放当前线程持有的锁,避免误删他人锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
3. 进阶优化:锁的细粒度与集群适配
- 细粒度锁:避免用“stock:all”这类粗粒度锁,改用“stock:goodsId”,减少锁竞争(如商品1001和1002的锁互不影响);
- RedLock配置:Redis集群部署时,启用RedLock确保高可用:
Config config = new Config(); // 配置3个主节点(RedLock需至少3个独立主节点) config.useRedLock() .addNodeAddress("redis://192.168.0.101:6379") .addNodeAddress("redis://192.168.0.102:6379") .addNodeAddress("redis://192.168.0.103:6379"); RedissonClient redisson = Redisson.create(config); - 避免锁重入问题:Redisson的
RLock支持重入(类似synchronized),但需注意:同一线程多次加锁后,需对应次数的unlock,或用lock.unlockAsync()异步释放。
二、计数器:高并发下的“性能救火员”
Redis的INCR/DECR命令是原子操作,支持秒级万级并发,远超数据库的UPDATE性能,适合在线人数、接口限流、订单号生成等场景。
1. 经典场景:直播在线人数统计
痛点回顾
早期用数据库统计在线人数:
UPDATE live_room SET online_count = online_count + 1 WHERE room_id = 1001;
QPS仅200就出现数据库连接池满、锁等待超时——因为online_count字段会触发行锁,所有请求串行执行。
Redis方案:原子计数+过期自动清理
@Service
public class LiveRoomService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String ONLINE_COUNT_KEY = "live:online:room:";
private static final long EXPIRE_SECONDS = 86400; // 24小时过期(避免僵尸key)
// 观众进入直播间:在线人数+1
public Long userEnter(Long roomId) {
String key = ONLINE_COUNT_KEY + roomId;
// 1. 原子递增:INCR命令是单线程执行,无并发问题
Long newCount = redisTemplate.opsForValue().increment(key);
// 2. 首次计数时设置过期时间(避免重复执行EXPIRE,节省性能)
if (newCount != null && newCount == 1) {
redisTemplate.expire(key, EXPIRE_SECONDS, TimeUnit.SECONDS);
}
return newCount;
}
// 观众离开直播间:在线人数-1(需防止负数,比如异常退出未触发时)
public Long userLeave(Long roomId) {
String key = ONLINE_COUNT_KEY + roomId;
// 1. 原子递减:用DECRBY确保减少指定数量(此处减1)
Long newCount = redisTemplate.opsForValue().decrement(key);
// 2. 若计数<=0,删除key(避免负数占用内存)
if (newCount != null && newCount <= 0) {
redisTemplate.delete(key);
}
return newCount != null ? Math.max(newCount, 0) : 0;
}
// 获取当前在线人数(兜底:若key不存在,返回0)
public Long getOnlineCount(Long roomId) {
String key = ONLINE_COUNT_KEY + roomId;
String countStr = redisTemplate.opsForValue().get(key);
return countStr == null ? 0 : Long.parseLong(countStr);
}
}
2. 进阶场景:接口限流(滑动窗口算法)
固定窗口限流(如“1分钟最多100次请求”)存在“边界漏洞”(如59秒和1秒分别请求100次,实际2秒内200次),而滑动窗口算法可解决此问题——用ZSET记录每次请求的时间戳,实时清理过期请求。
滑动窗口限流代码
@Component
public class RateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String RATE_LIMIT_KEY = "rate:limit:user:";
private static final int MAX_REQUESTS = 100; // 每分钟最多100次请求
private static final long WINDOW_SIZE = 60 * 1000; // 窗口大小:60秒(毫秒)
public boolean allowRequest(Long userId) {
String key = RATE_LIMIT_KEY + userId;
long now = System.currentTimeMillis();
// 1. 开启Redis事务(确保操作原子性)
redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.multi(); // 事务开始
ZSetOperations zSetOps = operations.opsForZSet();
// 2. 步骤1:删除窗口外的请求(清理60秒前的记录)
zSetOps.removeRangeByScore(key, 0, now - WINDOW_SIZE);
// 3. 步骤2:记录当前请求的时间戳(value和score都用时间戳,确保唯一)
zSetOps.add(key, now + ":" + Thread.currentThread().getId(), now);
// 4. 步骤3:设置key过期时间(避免内存泄漏)
operations.expire(key, WINDOW_SIZE + 1000, TimeUnit.MILLISECONDS);
operations.exec(); // 事务提交
return null;
}
});
// 5. 统计当前窗口内的请求数
Long count = redisTemplate.opsForZSet().zCard(key);
// 6. 判断是否超过限流阈值
return count != null && count <= MAX_REQUESTS;
}
}
3. 避坑指南
- 避免计数溢出:Redis的
INCR支持64位整数,日常场景足够,但需注意:若用INCRBY递增较大值,需提前校验是否超过业务上限; - 过期时间必设:计数器key必须加过期时间,否则会成为“僵尸key”,长期占用内存(如直播结束后,在线人数key需自动清理);
- 集群分片问题:若用户ID分布不均,可能导致某一分片的计数器key过多,需用“key+随机后缀”分散(如
rate:limit:user:1001:shard:3)。
三、排行榜:ZSET的“凡尔赛时刻”
Redis的ZSET(有序集合)是天然的排行榜工具,支持按分数排序、范围查询、排名获取,性能远超数据库的ORDER BY + LIMIT——尤其适合点赞榜、销量榜、积分榜等场景。
1. 经典场景:博客点赞排行榜
需求拆解
- 用户点赞某篇博客,该博客的点赞数+1;
- 展示点赞Top10的博客(含点赞数);
- 查询某篇博客的实时排名。
实战代码(Spring Data Redis)
@Service
public class BlogRankService {
@Autowired
private StringRedisTemplate redisTemplate;
// ZSET的key:业务类型+时间范围(如blog:like:rank:202509表示2025年9月的点赞榜)
private static final String RANK_KEY_TEMPLATE = "blog:like:rank:%s";
// 1. 点赞:给指定博客的分数+1(分数=点赞数)
public void likeBlog(Long blogId) {
String key = getCurrentMonthRankKey();
// incrementScore:原子递增分数,不存在则创建(分数初始为0,递增后为1)
redisTemplate.opsForZSet().incrementScore(key, blogId.toString(), 1);
// 设置过期时间:保留3个月(避免历史数据占用内存)
redisTemplate.expire(key, 3, TimeUnit.MONTHS);
}
// 2. 取消点赞:给指定博客的分数-1(需防止分数为负)
public void cancelLike(Long blogId) {
String key = getCurrentMonthRankKey();
// 先查询当前分数,避免减到负数
Double currentScore = redisTemplate.opsForZSet().score(key, blogId.toString());
if (currentScore != null && currentScore > 0) {
redisTemplate.opsForZSet().incrementScore(key, blogId.toString(), -1);
// 若分数<=0,删除该博客(避免无效数据)
if (currentScore - 1 <= 0) {
redisTemplate.opsForZSet().remove(key, blogId.toString());
}
}
}
// 3. 获取Top10点赞榜(倒序:分数高的在前)
public List<BlogRankVO> getTop10Blogs() {
String key = getCurrentMonthRankKey();
// reverseRangeWithScores:倒序查询(0-9表示前10条),包含分数
Set<ZSetOperations.TypedTuple<String>> top10 = redisTemplate.opsForZSet()
.reverseRangeWithScores(key, 0, 9);
if (top10 == null || top10.isEmpty()) {
return Collections.emptyList();
}
// 转换为VO返回(含博客ID、点赞数、排名)
List<BlogRankVO> result = new ArrayList<>();
int rank = 1;
for (ZSetOperations.TypedTuple<String> tuple : top10) {
BlogRankVO vo = new BlogRankVO();
vo.setBlogId(Long.parseLong(tuple.getValue()));
vo.setLikeCount(tuple.getScore().longValue());
vo.setRank(rank++);
result.add(vo);
}
return result;
}
// 4. 查询某篇博客的实时排名(reverseRank:倒序排名,0表示第1名)
public Integer getBlogRank(Long blogId) {
String key = getCurrentMonthRankKey();
Long rank = redisTemplate.opsForZSet().reverseRank(key, blogId.toString());
return rank == null ? null : rank.intValue() + 1; // 转换为1开始的排名
}
// 辅助方法:获取当前月份的排行榜key(按月份拆分,避免单key过大)
private String getCurrentMonthRankKey() {
LocalDate now = LocalDate.now();
String month = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
return String.format(RANK_KEY_TEMPLATE, month);
}
// 排行榜VO
@Data
public static class BlogRankVO {
private Long blogId;
private Long likeCount;
private Integer rank;
}
}
2. 进阶优化:带时间权重的排行榜
若需“近期点赞权重更高”(如昨天点赞算10分,上周点赞算1分),可在递增分数时加入时间系数:
// 点赞时按时间计算分数:当前时间与博客发布时间的间隔越短,权重越高
public void likeBlogWithTimeWeight(Long blogId, LocalDateTime publishTime) {
String key = getCurrentMonthRankKey();
// 计算时间差(天):发布时间距今越近,权重越高
long days = ChronoUnit.DAYS.between(publishTime, LocalDateTime.now());
double weight = days <= 1 ? 10.0 : (days <= 7 ? 5.0 : 1.0); // 1天内10分,1周内5分,超过1周1分
// 按权重递增分数
redisTemplate.opsForZSet().incrementScore(key, blogId.toString(), weight);
}
3. 性能优化要点
- 拆分key:按时间(如每月、每周)拆分ZSET,避免单key数据量过大(建议单key不超过10万条,否则查询变慢);
- 冷热分离:Top100的“热榜”用ZSET实时查询,100名后的“冷榜”每天凌晨归档到数据库(如MySQL的
blog_rank_history表); - 批量操作:若需初始化历史数据,用
addAll批量添加ZSET元素,避免循环调用add(减少Redis网络请求)。
四、消息队列:轻量级的“异步快递站”
Redis的List和Pub/Sub可实现轻量级消息队列,适合中小规模的异步场景(如订单通知、日志收集),虽然功能不如RabbitMQ/Kafka完善,但胜在部署简单、无额外组件依赖。
1. 两种模式对比与选型
| 模式 | 数据结构 | 核心特性 | 优点 | 缺点 | 适合场景 |
|---|---|---|---|---|---|
| 队列模式 | List | FIFO(先进先出) | 支持持久化、重试、消息回溯 | 不支持广播、无消息确认机制 | 订单处理、日志收集、任务调度 |
| 发布订阅 | Pub/Sub | 广播(一对多) | 实时性高、支持多订阅者 | 不支持持久化、消息易丢失 | 实时通知、聊天系统、监控告警 |
2. 队列模式:可靠消费(带重试机制)
需求:订单创建后异步发送短信通知
@Service
public class OrderMessageService {
@Autowired
private StringRedisTemplate redisTemplate;
// 队列key:业务类型+队列用途(如order:queue:sms表示订单短信通知队列)
private static final String ORDER_SMS_QUEUE = "order:queue:sms";
// 重试队列key:失败的消息放入重试队列,延迟重试
private static final String ORDER_SMS_RETRY_QUEUE = "order:queue:sms:retry";
// 最大重试次数
private static final int MAX_RETRY_COUNT = 3;
// 1. 生产者:订单创建后,发送消息到队列
public void sendSmsMessage(OrderSmsVO smsVO) {
try {
// 序列化消息(用JSON格式,便于反序列化)
String message = JSON.toJSONString(smsVO);
// lpush:从队列头部添加消息(消费者用rpop从尾部获取,实现FIFO)
redisTemplate.opsForList().leftPush(ORDER_SMS_QUEUE, message);
// 设置队列过期时间(避免无消费者时消息堆积)
redisTemplate.expire(ORDER_SMS_QUEUE, 24, TimeUnit.HOURS);
} catch (Exception e) {
// 异常处理:如记录日志、报警,避免消息丢失
log.error("发送订单短信消息失败,orderId:{}", smsVO.getOrderId(), e);
}
}
// 2. 消费者:监听队列,处理消息(建议用独立线程池,避免阻塞业务线程)
@PostConstruct // 服务启动后自动执行
public void startConsumer() {
Executors.newSingleThreadExecutor().submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// brpop:阻塞读取队列(0表示一直阻塞,直到有消息)
// 返回值:List[0]是队列名,List[1]是消息内容
List<String> messageList = redisTemplate.opsForList()
.rightPop(ORDER_SMS_QUEUE, 0, TimeUnit.SECONDS);
if (messageList == null || messageList.size() < 2) {
continue;
}
String message = messageList.get(1);
OrderSmsVO smsVO = JSON.parseObject(message, OrderSmsVO.class);
// 处理消息(发送短信)
boolean success = processSmsMessage(smsVO);
if (!success) {
// 处理失败:放入重试队列,记录重试次数
handleRetryMessage(smsVO);
}
} catch (Exception e) {
log.error("消费订单短信消息异常", e);
// 避免异常导致循环过快,休眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
});
}
// 处理短信发送
private boolean processSmsMessage(OrderSmsVO smsVO) {
try {
// 调用短信服务商API(如阿里云短信)
smsClient.send(smsVO.getPhone(), smsVO.getContent());
log.info("订单短信发送成功,orderId:{}", smsVO.getOrderId());
return true;
} catch (Exception e) {
log.error("订单短信发送失败,orderId:{}", smsVO.getOrderId(), e);
return false;
}
}
// 处理重试消息:超过最大重试次数则归档到失败表
private void handleRetryMessage(OrderSmsVO smsVO) {
// 1. 读取当前重试次数(消息中携带,初始为0)
int retryCount = smsVO.getRetryCount() == null ? 0 : smsVO.getRetryCount();
retryCount++;
smsVO.setRetryCount(retryCount);
if (retryCount > MAX_RETRY_COUNT) {
// 2. 超过最大重试次数:归档到数据库(后续人工处理)
orderSmsFailMapper.insert(new OrderSmsFail(smsVO));
log.warn("订单短信重试超过最大次数,归档失败记录,orderId:{}", smsVO.getOrderId());
} else {
// 3. 未超过:放入重试队列,延迟重试(指数退避:1s、2s、4s)
long delay = (long) Math.pow(2, retryCount - 1); // 重试延迟:1s、2s、4s
String message = JSON.toJSONString(smsVO);
// 用zadd实现延迟队列:score=当前时间+延迟时间(毫秒)
redisTemplate.opsForZSet().add(ORDER_SMS_RETRY_QUEUE, message,
System.currentTimeMillis() + delay * 1000);
// 启动重试队列消费者(单独线程)
startRetryConsumer();
}
}
// 重试队列消费者:定时扫描延迟到期的消息
private void startRetryConsumer() {
Executors.newSingleThreadExecutor().submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
long now = System.currentTimeMillis();
// 1. 查询score<=当前时间的消息(延迟到期的消息)
Set<String> messages = redisTemplate.opsForZSet()
.rangeByScore(ORDER_SMS_RETRY_QUEUE, 0, now);
if (messages == null || messages.isEmpty()) {
Thread.sleep(1000); // 无消息,休眠1秒
continue;
}
// 2. 处理每条到期的消息
for (String message : messages) {
OrderSmsVO smsVO = JSON.parseObject(message, OrderSmsVO.class);
// 重新发送到主队列
redisTemplate.opsForList().leftPush(ORDER_SMS_QUEUE, message);
// 从重试队列删除
redisTemplate.opsForZSet().remove(ORDER_SMS_RETRY_QUEUE, message);
}
} catch (Exception e) {
log.error("消费重试短信消息异常", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
});
}
// 短信消息VO
@Data
public static class OrderSmsVO {
private Long orderId;
private String phone;
private String content;
private Integer retryCount; // 重试次数
}
}
3. 发布订阅模式:实时通知
需求:用户下单后,多个模块(库存、积分、日志)接收通知
@Service
public class OrderPubSubService {
@Autowired
private StringRedisTemplate redisTemplate;
// 频道名:业务类型+事件(如order:channel:created表示订单创建事件)
private static final String ORDER_CREATED_CHANNEL = "order:channel:created";
// 1. 发布者:订单创建后发布事件
public void publishOrderCreatedEvent(OrderCreatedEvent event) {
try {
String message = JSON.toJSONString(event);
// convertAndSend:向指定频道发布消息
redisTemplate.convertAndSend(ORDER_CREATED_CHANNEL, message);
} catch (Exception e) {
log.error("发布订单创建事件失败,orderId:{}", event.getOrderId(), e);
}
}
// 2. 订阅者1:库存模块接收通知(扣库存)
@PostConstruct
public void subscribeStock() {
// 订阅指定频道
redisTemplate.getConnectionFactory().getConnection().subscribe(
new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
// 解析消息
String eventJson = new String(message.getBody(), StandardCharsets.UTF_8);
OrderCreatedEvent event = JSON.parseObject(eventJson, OrderCreatedEvent.class);
// 处理库存扣减
stockService.deductStock(event.getGoodsId(), event.getQuantity());
}
},
ORDER_CREATED_CHANNEL.getBytes(StandardCharsets.UTF_8)
);
}
// 3. 订阅者2:积分模块接收通知(加积分)
@PostConstruct
public void subscribePoint() {
redisTemplate.getConnectionFactory().getConnection().subscribe(
new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
String eventJson = new String(message.getBody(), StandardCharsets.UTF_8);
OrderCreatedEvent event = JSON.parseObject(eventJson, OrderCreatedEvent.class);
// 处理积分增加
pointService.addPoint(event.getUserId(), event.getPoint());
}
},
ORDER_CREATED_CHANNEL.getBytes(StandardCharsets.UTF_8)
);
}
// 订单创建事件
@Data
public static class OrderCreatedEvent {
private Long orderId;
private Long userId;
private Long goodsId;
private Integer quantity;
private Integer point; // 订单可获得的积分
}
}
4. 消息队列避坑指南
- 持久化配置:开启Redis的RDB或AOF持久化,避免Redis重启导致队列消息丢失;
- 消息幂等:消费者必须实现幂等(如用
orderId作为唯一键,避免重复处理),因为Redis队列可能出现消息重复(如重试机制导致); - 不适合大规模场景:若QPS超过1万、需要消息追踪或事务消息,建议改用Kafka/RabbitMQ,Redis队列仅适合中小规模。
五、会话共享:分布式系统的“登录管家”
在分布式系统中,用户登录后Session默认存储在单个服务器的内存中,若请求被转发到其他服务器,会出现“登录状态丢失”——Redis可作为分布式Session存储,实现跨服务登录状态共享。
1. 传统Session的痛点与Redis解决方案
| 问题 | 传统Session(内存存储) | Redis分布式Session |
|---|---|---|
| 跨服务共享 | 不支持,用户需重新登录 | 支持,所有服务共享Redis中的Session |
| 服务器重启 | Session丢失,用户需重新登录 | Session持久化,重启不丢失 |
| 内存占用 | 服务器内存有限,支持用户数少 | Redis内存可扩展,支持大规模用户 |
| 过期管理 | 依赖服务器内存回收,不稳定 | 支持精确过期时间,自动清理 |
2. Spring Session集成Redis(零侵入)
Spring Session可无缝替换Tomcat的默认Session,自动将Session存储到Redis,无需修改业务代码。
步骤1:引入依赖
<!-- Spring Session + Redis -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<!-- Redis客户端(Lettuce,Spring Boot默认) -->
<dependency>
<groupId>io.lettuce.core</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
步骤2:配置Redis与Session
@Configuration
// 启用Redis Session,设置过期时间(30分钟)
@EnableRedisHttpSession(
maxInactiveIntervalInSeconds = 1800,
redisNamespace = "spring:session:mall" // 命名空间,避免与其他项目冲突
)
public class RedisSessionConfig {
// 配置Redis连接(若application.yml已配置,可省略)
@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("127.0.0.1");
config.setPort(6379);
// 若Redis有密码,添加配置:config.setPassword(RedisPassword.of("password"));
return new LettuceConnectionFactory(config);
}
// 自定义Session序列化(默认是JDK序列化,建议改用JSON,可读性更高)
@Bean
public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
// 使用Jackson2JsonRedisSerializer序列化Session
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule()); // 支持LocalDateTime等JDK8时间类型
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
serializer.setObjectMapper(objectMapper);
return serializer;
}
}
步骤3:业务代码(与传统Session完全一致)
@Controller
@RequestMapping("/user")
public class UserController {
// 登录:存储用户信息到Session
@PostMapping("/login")
@ResponseBody
public String login(HttpSession session, @RequestParam String username, @RequestParam String password) {
// 1. 校验用户名密码(省略,实际项目需查数据库)
User user = userService.login(username, password);
if (user == null) {
return "登录失败";
}
// 2. 存储用户信息到Session(自动同步到Redis)
session.setAttribute("loginUser", user);
// 3. 返回SessionID(可选,前端可存储到Cookie)
return "登录成功,SessionID:" + session.getId();
}
// 个人中心:从Session获取用户信息(跨服务可共享)
@GetMapping("/profile")
@ResponseBody
public User profile(HttpSession session) {
// 从Session获取用户信息(自动从Redis读取)
User user = (User) session.getAttribute("loginUser");
if (user == null) {
throw new RuntimeException("请先登录");
}
return user;
}
// 退出登录:销毁Session(自动删除Redis中的Session)
@PostMapping("/logout")
@ResponseBody
public String logout(HttpSession session) {
session.invalidate(); // 销毁Session
return "退出成功";
}
}
3. 安全与性能优化
- SessionID安全:启用HTTPS,防止SessionID被劫持;SessionID生成时加入随机盐值(Spring Session默认支持);
- 避免大对象存储:Session中仅存储用户ID、角色等核心信息,不存储大对象(如用户头像、详细资料),避免Redis内存占用过大;
- Session刷新:用户活跃时自动刷新Session过期时间(Spring Session默认支持,可通过
maxInactiveIntervalInSeconds配置); - 集群适配:Redis集群部署时,启用
RedisClusterConfiguration,确保Session在集群中均匀分布。
六、地理位置(GEO):“附近的人”背后的技术
Redis的GEO数据结构专门用于存储地理位置信息(经纬度),支持“附近的点查询”“两点距离计算”等操作,底层基于Geohash算法,查询效率远超数据库的经纬度计算(如MySQL的ST_Distance函数)。
1. 经典场景:O2O平台查询附近门店
需求拆解
- 存储门店的经纬度信息;
- 查询用户当前位置附近5公里的门店(按距离排序);
- 计算用户与某门店的直线距离。
实战代码
@Service
public class ShopGeoService {
@Autowired
private StringRedisTemplate redisTemplate;
// GEO的key:业务类型+资源(如shop:geo:locations表示门店地理位置)
private static final String SHOP_GEO_KEY = "shop:geo:locations";
// 距离单位:公里(可改为METERS米、MILES英里)
private static final Metrics DISTANCE_METRIC = Metrics.KILOMETERS;
// 1. 存储门店地理位置(添加或更新)
public void addShopLocation(Long shopId, double longitude, double latitude) {
try {
// geoAdd:添加经纬度(注意:Redis GEO的参数是“经度、纬度、成员ID”)
redisTemplate.opsForGeo().add(
SHOP_GEO_KEY,
new Point(longitude, latitude), // Point(longitude, latitude)
shopId.toString()
);
// 设置过期时间(若门店信息不会频繁变更,可省略;否则建议定期更新)
redisTemplate.expire(SHOP_GEO_KEY, 30, TimeUnit.DAYS);
} catch (Exception e) {
log.error("添加门店地理位置失败,shopId:{}", shopId, e);
}
}
// 2. 查询附近门店(按距离升序,返回前20条)
public List<ShopGeoVO> findNearbyShops(double userLongitude, double userLatitude, double radius) {
// 1. 构建查询条件:圆心(用户位置)、半径、单位
Circle circle = new Circle(
new Point(userLongitude, userLatitude),
new Distance(radius, DISTANCE_METRIC)
);
// 2. 配置查询参数:包含距离、按距离升序、限制20条
RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
.includeDistance() // 包含距离信息
.includeCoordinates() // 包含经纬度(可选)
.sortAscending() // 按距离升序
.limit(20); // 最多返回20条
// 3. 执行查询
GeoResults<RedisGeoCommands.GeoLocation<String>> results = redisTemplate.opsForGeo()
.radius(SHOP_GEO_KEY, circle, args);
if (results == null || results.getContent().isEmpty()) {
return Collections.emptyList();
}
// 4. 转换为VO返回
List<ShopGeoVO> shopList = new ArrayList<>();
for (GeoResult<RedisGeoCommands.GeoLocation<String>> result : results.getContent()) {
RedisGeoCommands.GeoLocation<String> location = result.getContent();
ShopGeoVO vo = new ShopGeoVO();
vo.setShopId(Long.parseLong(location.getName()));
// 获取距离(保留2位小数)
double distance = result.getDistance().getValue();
vo.setDistance(Math.round(distance * 100.0) / 100.0);
// 获取经纬度(可选)
vo.setLongitude(location.getPoint().getX());
vo.setLatitude(location.getPoint().getY());
shopList.add(vo);
}
return shopList;
}
// 3. 计算两个门店之间的直线距离
public Double calculateDistance(Long shopId1, Long shopId2) {
try {
// geoDist:计算两个成员之间的距离
Distance distance = redisTemplate.opsForGeo().distance(
SHOP_GEO_KEY,
shopId1.toString(),
shopId2.toString(),
DISTANCE_METRIC
);
return distance != null ? distance.getValue() : null;
} catch (Exception e) {
log.error("计算门店距离失败,shopId1:{}, shopId2:{}", shopId1, shopId2, e);
return null;
}
}
// 4. 删除门店地理位置(如门店关闭)
public void removeShopLocation(Long shopId) {
redisTemplate.opsForGeo().remove(SHOP_GEO_KEY, shopId.toString());
}
// 门店地理位置VO
@Data
public static class ShopGeoVO {
private Long shopId;
private double longitude; // 经度
private double latitude; // 纬度
private double distance; // 与用户的距离(公里)
}
}
2. 底层原理:Geohash算法
Redis GEO底层用Geohash算法将二维经纬度转换为一维字符串(如“wx4g0s8q”),通过字符串前缀匹配实现“附近查询”——前缀相同的点距离较近,查询时只需扫描前缀相同的区域,效率极高。
3. 避坑与优化
- 经纬度精度:经纬度保留6位小数即可(精确到1米),过多小数位无意义且浪费存储空间;
- 数据量限制:单个GEO key建议存储不超过10万个点,否则查询性能会下降(超过时按区域拆分key,如“shop:geo:beijing”“shop:geo:shanghai”);
- 距离误差:Geohash算法在边界区域可能存在误差(约5%),若需高精度场景(如导航),建议结合其他地图服务(如高德、百度地图API)。
七、其他实战场景:Redis的“野路子”用法
除了上述核心场景,Redis还有很多“冷门但好用”的功能,能解决一些特殊业务问题。
1. 分布式事务:秒杀防超卖(WATCH + MULTI)
Redis的WATCH + MULTI + EXEC可实现简单的分布式事务,适合秒杀场景的“库存预扣减”,确保库存不超卖。
public boolean seckillGoods(Long goodsId, Long userId) {
String stockKey = "seckill:stock:" + goodsId;
String userKey = "seckill:user:" + goodsId; // 记录已秒杀的用户,防止重复秒杀
Jedis jedis = null;
try {
jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
// 1. WATCH:监控库存和用户集合(若这两个key被修改,事务会取消)
jedis.watch(stockKey, userKey);
// 2. 检查库存和是否已秒杀
String stockStr = jedis.get(stockKey);
if (stockStr == null || Integer.parseInt(stockStr) <= 0) {
return false; // 库存不足
}
if (jedis.sismember(userKey, userId.toString())) {
return false; // 已秒杀过,防止重复
}
// 3. MULTI:开启事务
Transaction tx = jedis.multi();
// 4. 执行业务操作:库存-1,添加用户到已秒杀集合
tx.decr(stockKey);
tx.sadd(userKey, userId.toString());
// 5. EXEC:提交事务(若WATCH的key被修改,返回null)
List<Object> results = tx.exec();
return results != null && results.size() == 2; // 事务执行成功
} catch (Exception e) {
log.error("秒杀商品失败,goodsId:{}, userId:{}", goodsId, userId, e);
return false;
} finally {
if (jedis != null) {
jedis.unwatch(); // 解除WATCH
jedis.close();
}
}
}
2. 实时在线用户:SET集合去重
用SET存储在线用户ID,支持快速添加、删除和去重,查询在线用户数比数据库SELECT COUNT(DISTINCT user_id)快10倍以上。
@Service
public class OnlineUserService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String ONLINE_USER_KEY = "online:user:set";
private static final long EXPIRE_SECONDS = 30; // 用户30秒无活动则视为离线
// 用户上线:添加到SET,并设置过期时间(需配合心跳机制刷新)
public void userOnline(Long userId) {
redisTemplate.opsForSet().add(ONLINE_USER_KEY, userId.toString());
// 刷新过期时间(用户每操作一次,调用此方法刷新)
redisTemplate.expire(ONLINE_USER_KEY, EXPIRE_SECONDS, TimeUnit.SECONDS);
}
// 用户下线:从SET中删除
public void userOffline(Long userId) {
redisTemplate.opsForSet().remove(ONLINE_USER_KEY, userId.toString());
}
// 获取在线用户数
public Long getOnlineUserCount() {
return redisTemplate.opsForSet().size(ONLINE_USER_KEY);
}
// 获取在线用户列表(前1000条,避免数据量过大)
public Set<String> getOnlineUserList() {
return redisTemplate.opsForSet().members(ONLINE_USER_KEY);
}
// 心跳机制:用户定期调用,刷新在线状态
@Scheduled(fixedRate = 20000) // 每20秒执行一次
public void heartbeat() {
// 刷新SET的过期时间(所有在线用户共享一个过期时间,简化实现)
redisTemplate.expire(ONLINE_USER_KEY, EXPIRE_SECONDS, TimeUnit.SECONDS);
}
}
3. 位运算:用户签到(BITCOUNT)
Redis的SETBIT和BITCOUNT支持位运算,适合存储用户签到记录——1个字节可存储8天的签到状态,1年仅需46字节,极大节省内存。
@Service
public class SignInService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String SIGN_IN_KEY_TEMPLATE = "sign:in:user:%d:year:%d";
// 用户签到:标记某天已签到(1表示签到,0表示未签到)
public boolean signIn(Long userId, LocalDate date) {
int year = date.getYear();
int dayOfYear = date.getDayOfYear() - 1; // 天数从0开始(0表示1月1日)
String key = String.format(SIGN_IN_KEY_TEMPLATE, userId, year);
// SETBIT:设置第dayOfYear位为1(原子操作)
Boolean success = redisTemplate.opsForValue().setBit(key, dayOfYear, true);
// 设置过期时间:保留2年(避免历史数据占用内存)
redisTemplate.expire(key, 2, TimeUnit.YEARS);
return success != null && !success; // 返回false表示当天已签到,true表示首次签到
}
// 查询某天是否签到
public boolean isSignedIn(Long userId, LocalDate date) {
int year = date.getYear();
int dayOfYear = date.getDayOfYear() - 1;
String key = String.format(SIGN_IN_KEY_TEMPLATE, userId, year);
// GETBIT:获取第dayOfYear位的值
Boolean isSigned = redisTemplate.opsForValue().getBit(key, dayOfYear);
return isSigned != null && isSigned;
}
// 查询某段时间内的签到次数
public Long getSignInCount(Long userId, LocalDate startDate, LocalDate endDate) {
if (startDate.getYear() != endDate.getYear()) {
throw new RuntimeException("暂不支持跨年度查询");
}
int year = startDate.getYear();
int startDay = startDate.getDayOfYear() - 1;
int endDay = endDate.getDayOfYear() - 1;
String key = String.format(SIGN_IN_KEY_TEMPLATE, userId, year);
// BITCOUNT:统计startDay到endDay之间的1的个数(签到次数)
return redisTemplate.execute((RedisCallback<Long>) connection ->
connection.bitCount(key.getBytes(), startDay, endDay)
);
}
// 查询某月的签到日历(返回签到的天数列表)
public List<Integer> getSignInCalendar(Long userId, int year, int month) {
LocalDate monthStart = LocalDate.of(year, month, 1);
LocalDate monthEnd = monthStart.plusMonths(1).minusDays(1);
int startDay = monthStart.getDayOfYear() - 1;
int endDay = monthEnd.getDayOfYear() - 1;
String key = String.format(SIGN_IN_KEY_TEMPLATE, userId, year);
List<Integer> signDays = new ArrayList<>();
for (int day = startDay; day <= endDay; day++) {
Boolean isSigned = redisTemplate.opsForValue().getBit(key, day);
if (isSigned != null && isSigned) {
// 转换为当月的天数(如startDay是31,monthStart是2月1日,则当月天数是1)
int dayOfMonth = monthStart.plusDays(day - startDay).getDayOfMonth();
signDays.add(dayOfMonth);
}
}
return signDays;
}
}
八、老码农的10条血泪经验(避坑指南)
- 数据结构优先于命令:Redis的核心是数据结构(List/Set/ZSET/GEO),选对数据结构比记住命令更重要(如排行榜用ZSET,而非List排序);
- key命名规范:用“业务类型:资源:ID:属性”格式(如
stock:goods:1001:remaining),避免key冲突,便于维护; - 过期时间必设:除了持久化数据(如用户Session),所有临时数据(计数器、排行榜、队列)必须加过期时间,避免内存泄漏;
- 避免大key:单key的value不超过10KB(如大JSON、长列表),否则会阻塞Redis主线程,可拆分大key为多个小key;
- 集群分片注意热点:热点key(如秒杀商品的库存key)会导致某一分片压力过大,需用“key+随机后缀”分散(如
seckill:stock:1001:shard:2); - 批量操作减少网络请求:用
mget/mset/addAll等批量命令,避免循环调用单条命令(减少Redis与应用的网络交互次数); - 事务与Lua脚本:复杂业务(如秒杀防超卖)用Lua脚本实现原子操作,比
MULTI/EXEC更灵活,且支持条件判断; - 监控比调优重要:用Prometheus+Grafana监控Redis的核心指标(内存使用率、命中率、碎片率、QPS),内存碎片率超过1.5时需重启Redis;
- 持久化配置:生产环境建议开启AOF+RDB混合持久化(AOF保证数据不丢失,RDB用于快速恢复);
- 不要过度依赖Redis:Redis适合做“辅助工具”,核心业务数据(如订单、用户信息)仍需存储在数据库,避免Redis宕机导致业务中断。
总结:Redis的核心价值
Redis之所以成为后端开发的“瑞士军刀”,并非因为它能替代所有工具,而是因为它用简单的架构、灵活的数据结构、极致的性能,解决了分布式系统中的“小而美”的问题——从缓存到分布式锁,从计数器到排行榜,从消息队列到地理位置查询,Redis用少量代码就能实现复杂功能,这才是它的真正魅力。
最后送给大家一句话:Redis的强大,不在于它能做多少事,而在于你是否真正理解它的数据结构,并用对场景。别再只把它当缓存用了,多尝试这些“不务正业”的玩法,你会发现很多难题都能迎刃而解。
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接
文章评论