|
@@ -0,0 +1,122 @@
|
|
|
+package com.wtkj.handler;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
|
+import com.wtkj.config.KafkaConstant;
|
|
|
+import com.wtkj.entity.FileAndFolder;
|
|
|
+import com.wtkj.handler.es.Document;
|
|
|
+import com.wtkj.service.IFileAndFolderService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.elasticsearch.action.index.IndexRequest;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
+import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.action.update.UpdateRequest;
|
|
|
+import org.elasticsearch.client.RequestOptions;
|
|
|
+import org.elasticsearch.client.RestHighLevelClient;
|
|
|
+import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
+import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.index.query.TermQueryBuilder;
|
|
|
+import org.elasticsearch.search.SearchHits;
|
|
|
+import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.kafka.support.Acknowledgment;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import static com.wtkj.config.MagicValue.PROJECT_RESOURCE_INDEX;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Blizzard
|
|
|
+ * @create at 2023-09-27 11:31
|
|
|
+ * @describe
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class EsFileHandler {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private RestHighLevelClient client;
|
|
|
+ @Resource
|
|
|
+ private IFileAndFolderService fileAndFolderService;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 文件上传转为图片以后通知消息 --->保存至es
|
|
|
+ */
|
|
|
+ @KafkaListener(groupId = "parse-file-group", topics = KafkaConstant.PARSE_SUCCESS_MESSAGE, containerFactory = "ackContainerFactory")
|
|
|
+ public void handSaveFileToEs(ConsumerRecord record, Acknowledgment ack) {
|
|
|
+ try {
|
|
|
+ String message = (String) record.value();
|
|
|
+ log.info("收到文件解析成功消息,即将处理存入es逻辑:{}", message);
|
|
|
+
|
|
|
+ JSONObject msg = (JSONObject) JSONObject.parse(message);
|
|
|
+ Long fileId = msg.getLong("fileId");
|
|
|
+ String content = msg.getString("content");
|
|
|
+ Integer currentPage = msg.getInteger("currentPage");
|
|
|
+
|
|
|
+ LambdaQueryWrapper<FileAndFolder> lqw = new LambdaQueryWrapper<>();
|
|
|
+ lqw.eq(FileAndFolder::getBladeFileId, fileId);
|
|
|
+ lqw.eq(FileAndFolder::getType, 1);
|
|
|
+ List<FileAndFolder> files = fileAndFolderService.list(lqw);
|
|
|
+ if (!CollectionUtils.isEmpty(files)) {
|
|
|
+ for (FileAndFolder file : files) {
|
|
|
+ Long projectId = file.getProjectId();
|
|
|
+ //使用自定义类作为数据存储容器
|
|
|
+ Document document = new Document();
|
|
|
+ document.setId(IdWorker.getIdStr());
|
|
|
+ document.setContent(content);
|
|
|
+ document.setFileId(fileId);
|
|
|
+ document.setCurrentPage(currentPage);
|
|
|
+ document.setProjectId(projectId);
|
|
|
+
|
|
|
+ //存在即更新 不存在即插入
|
|
|
+ SearchRequest request = new SearchRequest();
|
|
|
+ request.indices(PROJECT_RESOURCE_INDEX);
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
|
|
|
+ .must(new TermQueryBuilder("projectId", projectId))
|
|
|
+ .must(new TermQueryBuilder("fileId", fileId))
|
|
|
+ .must(new TermQueryBuilder("currentPage", currentPage));
|
|
|
+ searchSourceBuilder.query(boolQuery);
|
|
|
+ searchSourceBuilder.from((0));
|
|
|
+ searchSourceBuilder.size(1);
|
|
|
+ request.source(searchSourceBuilder);
|
|
|
+ SearchResponse response = client.search(request, RequestOptions.DEFAULT);
|
|
|
+ if (response != null) {
|
|
|
+ SearchHits hits = response.getHits();
|
|
|
+ int size = hits.getHits().length;
|
|
|
+ if (size > 0) {
|
|
|
+ //更新
|
|
|
+ for (int i = 0; i < hits.getHits().length; i++) {
|
|
|
+ String docId = hits.getHits()[i].getId();
|
|
|
+ UpdateRequest updateRequest = new UpdateRequest(PROJECT_RESOURCE_INDEX, docId);
|
|
|
+ updateRequest.doc(JSONObject.toJSONString(document), XContentType.JSON);
|
|
|
+ client.update(updateRequest, RequestOptions.DEFAULT);
|
|
|
+ //log.info("文档更新操作结果:" + updateResponse.toString());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //创建文档
|
|
|
+ IndexRequest indexRequest = new IndexRequest(PROJECT_RESOURCE_INDEX);
|
|
|
+ indexRequest.id(document.getId());
|
|
|
+ indexRequest.timeout("1s");
|
|
|
+ //将数据放入请求,将对象装为json格式
|
|
|
+ indexRequest.source(JSONObject.toJSONString(document), XContentType.JSON);
|
|
|
+ client.index(indexRequest, RequestOptions.DEFAULT);
|
|
|
+ //log.info("文档插入操作结果:" + putResponse.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("文件存入es异常:" + e.getMessage());
|
|
|
+ } finally {
|
|
|
+ ack.acknowledge();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|