新增:私聊消息发送成功后保存会话列表

This commit is contained in:
安浩浩 2024-03-16 11:57:52 +08:00
parent 77f3131ef3
commit ff88d53b3b
11 changed files with 152 additions and 59 deletions

View File

@ -13,7 +13,8 @@ import lombok.Getter;
public enum ImConversationTypeEnum { public enum ImConversationTypeEnum {
PRIVATE(1, "单聊"), PRIVATE(1, "单聊"),
GROUP(2, "群聊"); GROUP(2, "群聊"),
NOTICE(4, "通知会话");
/** /**
* 类型 * 类型

View File

@ -24,7 +24,7 @@ public class ImConversationPageReqVO extends PageParam {
private Integer conversationType; private Integer conversationType;
@Schema(description = "单聊时,用户编号;群聊时,群编号", example = "21454") @Schema(description = "单聊时,用户编号;群聊时,群编号", example = "21454")
private String targetId; private Long targetId;
@Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId") @Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId")
private String no; private String no;

View File

@ -26,7 +26,7 @@ public class ImConversationRespVO {
@Schema(description = "单聊时,用户编号;群聊时,群编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "21454") @Schema(description = "单聊时,用户编号;群聊时,群编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "21454")
@ExcelProperty("单聊时,用户编号;群聊时,群编号") @ExcelProperty("单聊时,用户编号;群聊时,群编号")
private String targetId; private Long targetId;
@Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId") @ExcelProperty("会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId")

View File

@ -24,7 +24,7 @@ public class ImConversationSaveReqVO {
@Schema(description = "单聊时,用户编号;群聊时,群编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "21454") @Schema(description = "单聊时,用户编号;群聊时,群编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "21454")
@NotEmpty(message = "单聊时,用户编号;群聊时,群编号不能为空") @NotEmpty(message = "单聊时,用户编号;群聊时,群编号不能为空")
private String targetId; private Long targetId;
@Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId", requiredMode = Schema.RequiredMode.REQUIRED)
@NotEmpty(message = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId不能为空") @NotEmpty(message = "会话标志 单聊s_{userId}_{targetId},需要排序 userId 和 targetId 群聊g_groupId不能为空")

View File

@ -39,7 +39,7 @@ public class ImConversationDO extends BaseDO {
/** /**
* 单聊时用户编号群聊时群编号 * 单聊时用户编号群聊时群编号
*/ */
private String targetId; private Long targetId;
/** /**
* 会话标志 单聊s_{userId}_{targetId}需要排序 userId targetId 群聊g_groupId * 会话标志 单聊s_{userId}_{targetId}需要排序 userId targetId 群聊g_groupId
*/ */

View File

@ -52,4 +52,11 @@ public interface ImConversationService {
PageResult<ImConversationDO> getConversationPage(ImConversationPageReqVO pageReqVO); PageResult<ImConversationDO> getConversationPage(ImConversationPageReqVO pageReqVO);
/**
* 保存私聊会话
*
* @param fromUserId 发送者
* @param receiverId 接收者
*/
void savePrivateConversation(Long fromUserId, Long receiverId);
} }

View File

@ -6,6 +6,8 @@ import cn.iocoder.yudao.module.im.controller.admin.conversation.vo.ImConversatio
import cn.iocoder.yudao.module.im.controller.admin.conversation.vo.ImConversationSaveReqVO; import cn.iocoder.yudao.module.im.controller.admin.conversation.vo.ImConversationSaveReqVO;
import cn.iocoder.yudao.module.im.dal.dataobject.conversation.ImConversationDO; import cn.iocoder.yudao.module.im.dal.dataobject.conversation.ImConversationDO;
import cn.iocoder.yudao.module.im.dal.mysql.conversation.ConversationMapper; import cn.iocoder.yudao.module.im.dal.mysql.conversation.ConversationMapper;
import cn.iocoder.yudao.module.im.enums.conversation.ImConversationTypeEnum;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
@ -67,4 +69,28 @@ public class ImConversationServiceImpl implements ImConversationService {
return conversationMapper.selectPage(pageReqVO); return conversationMapper.selectPage(pageReqVO);
} }
@Override
public void savePrivateConversation(Long fromUserId, Long receiverId) {
// 创建并保存会话
createAndSaveConversation(fromUserId, receiverId);
createAndSaveConversation(receiverId, fromUserId);
}
private void createAndSaveConversation(Long userId, Long targetId) {
// 创建会话
ImConversationDO conversation = new ImConversationDO();
conversation.setUserId(userId);
conversation.setConversationType(ImConversationTypeEnum.PRIVATE.getType());
conversation.setTargetId(targetId);
conversation.setNo("s_" + userId + "_" + targetId);
conversation.setPinned(false);
// 根据 no 查询是否存在,不存在则新增
QueryWrapper<ImConversationDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("no", conversation.getNo());
if (conversationMapper.selectOne(queryWrapper) == null) {
conversationMapper.insert(conversation);
}
}
} }

View File

@ -60,7 +60,7 @@ public interface ImMessageService {
* @param fromUserId 发送人编号 * @param fromUserId 发送人编号
* @return id * @return id
*/ */
Long savePrivateMessage(ImSendMessage imSendMessage, Long fromUserId); ImMessageDO savePrivateMessage(ImSendMessage imSendMessage, Long fromUserId);
/** /**
* 更新消息状态 * 更新消息状态

View File

@ -76,24 +76,25 @@ public class ImMessageServiceImpl implements ImMessageService {
@Override @Override
public Long savePrivateMessage(ImSendMessage message, Long senderId) { public ImMessageDO savePrivateMessage(ImSendMessage message, Long senderId) {
ImMessageSaveReqVO imMessageSaveReqVO = new ImMessageSaveReqVO(); ImMessageDO imMessageDO = new ImMessageDO();
imMessageSaveReqVO.setClientMessageId(message.getClientMessageId()); imMessageDO.setClientMessageId(message.getClientMessageId());
imMessageSaveReqVO.setSenderId(senderId); imMessageDO.setSenderId(senderId);
imMessageSaveReqVO.setReceiverId(message.getReceiverId()); imMessageDO.setReceiverId(message.getReceiverId());
//查询发送人昵称和发送人头像 //查询发送人昵称和发送人头像
AdminUserRespDTO user = adminUserApi.getUser(senderId); AdminUserRespDTO user = adminUserApi.getUser(senderId);
imMessageSaveReqVO.setSenderNickname(user.getNickname()); imMessageDO.setSenderNickname(user.getNickname());
imMessageSaveReqVO.setSenderAvatar(user.getAvatar()); imMessageDO.setSenderAvatar(user.getAvatar());
imMessageSaveReqVO.setConversationType(message.getConversationType()); imMessageDO.setConversationType(message.getConversationType());
imMessageSaveReqVO.setContentType(message.getContentType()); imMessageDO.setContentType(message.getContentType());
imMessageSaveReqVO.setConversationNo(senderId + "_" + message.getReceiverId()); imMessageDO.setConversationNo(senderId + "_" + message.getReceiverId());
imMessageSaveReqVO.setContent(message.getContent()); imMessageDO.setContent(message.getContent());
//消息来源 100-用户发送200-系统发送一般是通知不能为空 //消息来源 100-用户发送200-系统发送一般是通知不能为空
imMessageSaveReqVO.setSendFrom(100); imMessageDO.setSendFrom(100);
imMessageSaveReqVO.setSendTime(TimeUtil.now()); imMessageDO.setSendTime(TimeUtil.now());
imMessageSaveReqVO.setMessageStatus(ImMessageStatusEnum.SENDING.getStatus()); imMessageDO.setMessageStatus(ImMessageStatusEnum.SENDING.getStatus());
return createMessage(imMessageSaveReqVO); imMessageMapper.insert(imMessageDO);
return imMessageDO;
} }
@Override @Override

View File

@ -3,9 +3,9 @@ package cn.iocoder.yudao.module.im.websocket;
import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils;
import cn.iocoder.yudao.module.im.controller.admin.inbox.vo.ImInboxSaveReqVO; import cn.iocoder.yudao.module.im.controller.admin.inbox.vo.ImInboxSaveReqVO;
import cn.iocoder.yudao.module.im.dal.dataobject.message.ImMessageDO;
import cn.iocoder.yudao.module.im.dal.redis.inbox.SequenceGeneratorRedisDao; import cn.iocoder.yudao.module.im.dal.redis.inbox.SequenceGeneratorRedisDao;
import cn.iocoder.yudao.module.im.enums.conversation.ImConversationTypeEnum; import cn.iocoder.yudao.module.im.enums.conversation.ImConversationTypeEnum;
import cn.iocoder.yudao.module.im.enums.message.ImMessageStatusEnum; import cn.iocoder.yudao.module.im.enums.message.ImMessageStatusEnum;
@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import java.util.List;
/** /**
* WebSocket im * WebSocket im
* *
@ -31,52 +29,97 @@ import java.util.List;
public class ImWebSocketMessageListener implements WebSocketMessageListener<ImSendMessage> { public class ImWebSocketMessageListener implements WebSocketMessageListener<ImSendMessage> {
@Resource @Resource
private WebSocketMessageSender webSocketMessageSender; private WebSocketMessageSender webSocketMessageSender; // WebSocket消息发送器
@Resource @Resource
private ImMessageService imMessageService; private ImMessageService imMessageService; // IM消息服务
@Resource @Resource
private ImConversationService imConversationService; private ImConversationService imConversationService; // IM会话服务
@Resource @Resource
private ImInboxService imInboxService; private ImInboxService imInboxService; // IM收件箱服务
@Resource @Resource
private SequenceGeneratorRedisDao sequenceGeneratorRedisDao; private SequenceGeneratorRedisDao sequenceGeneratorRedisDao; // 序列生成器Redis DAO
@Resource
private WebSocketSessionManager webSocketSessionManager;
/**
* 处理WebSocket消息
*
* @param session WebSocket会话
* @param message 发送的IM消息
*/
@Override @Override
public void onMessage(WebSocketSession session, ImSendMessage message) { public void onMessage(WebSocketSession session, ImSendMessage message) {
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); // 获取登录用户ID
//1插入消息表
Long messageId = imMessageService.savePrivateMessage(message, fromUserId);
// 私聊 // 如果是私人消息处理私人消息
if (message.getConversationType().equals(ImConversationTypeEnum.PRIVATE.getType())) { if (message.getConversationType().equals(ImConversationTypeEnum.PRIVATE.getType())) {
//2插入收件箱表私聊两条群聊每个群有一条 ImMessageDO imMessageDO = imMessageService.savePrivateMessage(message, fromUserId); // 保存私人消息
imInboxService.createInbox(new ImInboxSaveReqVO(message.getReceiverId(), messageId, sequenceGeneratorRedisDao.generateSequence(message.getReceiverId()))); handlePrivateMessage(fromUserId, imMessageDO, message);
imInboxService.createInbox(new ImInboxSaveReqVO(fromUserId, messageId, sequenceGeneratorRedisDao.generateSequence(fromUserId)));
//3推送消息
// 3.1判断是否在线
List<WebSocketSession> sessions = (List<WebSocketSession>) webSocketSessionManager.getSessionList(UserTypeEnum.ADMIN.getValue(), message.getReceiverId());
if (sessions.isEmpty()) {
//更新消息状态,为发送失败
imMessageService.updateMessageStatus(messageId, ImMessageStatusEnum.FAILURE.getStatus());
return;
}
//3.2发送
ImReceiveMessage toMessage = new ImReceiveMessage();
toMessage.setFromId(fromUserId);
toMessage.setConversationType(ImConversationTypeEnum.PRIVATE.getType());
toMessage.setContentType(message.getContentType());
toMessage.setContent(message.getContent());
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getReceiverId(), // 给指定用户
"im-message-receive", toMessage);
//4更新消息状态,为发送成功
imMessageService.updateMessageStatus(messageId, ImMessageStatusEnum.SUCCESS.getStatus());
} }
} }
/**
* 处理私人消息
*
* @param fromUserId 发送者用户ID
* @param imMessageDO IM消息数据对象
* @param message 发送的IM消息
*/
private void handlePrivateMessage(Long fromUserId, ImMessageDO imMessageDO, ImSendMessage message) {
Long fromUserSequence = sequenceGeneratorRedisDao.generateSequence(fromUserId); // 生成发送者序列
Long fromUserInboxId = createAndSaveInbox(fromUserId, imMessageDO.getId(), fromUserSequence); // 创建并保存发送者收件箱
Long receiverSequence = sequenceGeneratorRedisDao.generateSequence(fromUserId); // 生成接收者序列
Long receiverInboxId = createAndSaveInbox(message.getReceiverId(), imMessageDO.getId(), receiverSequence); // 创建并保存接收者收件箱
// 发送消息给接收者和发送者
sendMessage(fromUserId, receiverInboxId, imMessageDO, message, "im-message-receive", fromUserSequence);
sendMessage(fromUserId, fromUserInboxId, imMessageDO, message, "im-message-receive", receiverSequence);
// 更新消息状态为成功
imMessageService.updateMessageStatus(imMessageDO.getId(), ImMessageStatusEnum.SUCCESS.getStatus());
// 保存私人会话
imConversationService.savePrivateConversation(fromUserId, message.getReceiverId());
}
/**
* 创建并保存收件箱
*
* @param userId 用户ID
* @param messageId 消息ID
* @param sequence 序列
* @return 收件箱ID
*/
private Long createAndSaveInbox(Long userId, Long messageId, Long sequence) {
ImInboxSaveReqVO inboxSaveReqVO = new ImInboxSaveReqVO(userId, messageId, sequence); // 创建收件箱保存请求VO
return imInboxService.createInbox(inboxSaveReqVO); // 创建收件箱
}
/**
* 发送消息
*
* @param fromUserId 发送者用户ID
* @param inboxId 收件箱ID
* @param imMessageDO IM消息数据对象
* @param message 发送的IM消息
* @param messageType 消息类型
* @param sequence 序列
*/
private void sendMessage(Long fromUserId, Long inboxId, ImMessageDO imMessageDO, ImSendMessage message, String messageType, Long sequence) {
ImReceiveMessage receiveMessage = new ImReceiveMessage(); // 创建接收消息
receiveMessage.setFromId(fromUserId); // 设置发送者ID
receiveMessage.setConversationType(ImConversationTypeEnum.PRIVATE.getType()); // 设置会话类型为私人
receiveMessage.setContentType(message.getContentType()); // 设置内容类型
receiveMessage.setContent(message.getContent()); // 设置内容
receiveMessage.setMessageId(imMessageDO.getId()); // 设置消息ID
receiveMessage.setInboxId(inboxId); // 设置收件箱ID
receiveMessage.setSendTime(imMessageDO.getSendTime()); // 设置发送时间
receiveMessage.setSequence(sequence); // 设置序列
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), fromUserId, messageType, receiveMessage); // 发送消息
}
/**
* 获取类型
*
* @return 类型
*/
@Override @Override
public String getType() { public String getType() {
return "im-message-send"; return "im-message-send";

View File

@ -2,8 +2,11 @@ package cn.iocoder.yudao.module.im.websocket.message;
import cn.iocoder.yudao.module.im.dal.dataobject.message.body.ImMessageBody; import cn.iocoder.yudao.module.im.dal.dataobject.message.body.ImMessageBody;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Data; import lombok.Data;
import java.time.LocalDateTime;
@Schema(description = "管理后台 - 消息发送 receive") @Schema(description = "管理后台 - 消息发送 receive")
@Data @Data
public class ImReceiveMessage { public class ImReceiveMessage {
@ -20,4 +23,16 @@ public class ImReceiveMessage {
@Schema(description = "消息内容", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "消息内容", requiredMode = Schema.RequiredMode.REQUIRED)
private String content; private String content;
@Schema(description = "消息编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "12454")
private Long messageId;
@Schema(description = "发送时间", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime sendTime;
@Schema(description = "收件箱编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18389")
private Long inboxId;
@Schema(description = "序号,按照 user 递增", requiredMode = Schema.RequiredMode.REQUIRED)
private Long sequence;
} }