<返回更多

Mysql和Elasticsearch的数据同步

2022-06-30    Java热点
加入收藏

Elasticsearch的数据来自MySQL数据库中,所以当我们的MySQL发生改变时,Elasticsearch也要跟着改变,这时候我们的es的数据就要和mysql同步了

同步实现思路

常见的数据同步方案有三种:

方案一:

Mysql和Elasticsearch的数据同步

 

也就是说MySQL修改完去修改es的数据

方案二

Mysql和Elasticsearch的数据同步

 

方式三:

Mysql和Elasticsearch的数据同步

 

在这里使用的是第二种实现方案:使用MQ来写

同步案例代码

用来操控ES的代码(负责监听MQ队列)

/**
     * 监听增加和修改的队列
     * 因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
     * 所以队列过来的id不管是新增还是修改es都可以判断如果有这个数据id那么就先删除再新增,如果没有这个数据就直接新增,所以新增和修改他俩用一个方法就行了
     *
     * @param id 队列中需要进行操作的id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),
            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
            key = MqConstants.HOTEL_INSERT_KEY
    ))
    public void insertAndUpdate(Long id) {
        if (id == null) {
            return;
        }
        log.info("入参:{}", id);
        //监听到以后拿到id去数据库查询整个数据
        Hotel hotel = iHotelService.getById(id);
        //因为查的mysql数据和es的数据有些不一样所以需要做转换
        HotelDoc hotelDoc = new HotelDoc(hotel);
        //转换为json
        String hotelDocJson = JSON.toJSONString(hotelDoc);
        System.out.println("hotelDocJson = " + hotelDocJson);
        //发送到ES中,因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
        //创建请求语义对象 添加文档数据
        IndexRequest request = new IndexRequest("hotel");
        //这个新增就是PUT在es中
        request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
        //发送请求
        try {
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);
            RestStatus status = response.status();
            log.info("响应结果为:{}", status);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听删除队列
     *
     * @param id 队列中需要进行操作的id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
            key = MqConstants.HOTEL_DELETE_KEY
    ))
    public void deleteByMqId(Long id) {
        if (id == null) {
            return;
        }
        log.info("入参:{}", id);
        //先创建语义对象,直接就可以给里面写id的字段
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        //发送请求
        try {
            DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
            RestStatus status = response.status();
            log.info("响应结果为:{}", status);
        } catch (IOException e) {
            e.printStackTrace();
        }

用来操作MySQL的代码:

@RestController
@RequestMApping("hotel")
public class HotelController {

    //注入和RabbitMQ链接
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private IHotelService hotelService;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ) {
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
    }
}

当然也是可以不使用注解来写,直接在配置文件中写队列绑定的交换机

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>