作者:Eddy  历史版本:1  最后编辑:龚清  更新时间:2024-09-20 15:13

业务开发如何支持ES检索

适用于v3.8.8+
该方案是延时同步,列表数据会根据logstash设置的数据同步时间出现延时,即数据不会马上在列表中展示;

软件及配置准备

  • Elasticsearch 6.8.1
  • Logstash 6.8.1
  • ibps-logstash-client.jar,用于接收IBPS推送出来的logstash配置文件并添加到logstashpipelines.yml配置文件中;
  • RabbitMQ3.x,配置文件推送使用的是MQ广播,暂时只支持RabbitMQ,它与ibps使用的是同一个MQ同一个用户同一个虚拟主机路径;
  • 数据库驱动文件,本文以mysql为例;
  • ik分词配置文件logstash.ik.json

软件安装

Elasticsearch安装

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv0uf532kaj

Logstash安装

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv11f1sq13t

RabbitMQ3安装

http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkinltsmcie9

ibps-logstash-client.jar安装

如果logstash有多台服务器,那么需要修改源码工程ibps-provider-logstash-clientcom.lc.ibps.logstash.client.rabbitmq.consumer.RabbitLogstashQueueConsumer的注解@Queue,广播消息需要同步配置到每一个logstash服务器中并保存对应的数据同步配置文件;

  • 该程序是ibps提供的一个客户端程序,直接打包源码工程ibps-provider-logstash-client即可;

  • 配置参数(通过环境变量方式设置,以windows为例)

    • logstash管道配置文件路径,LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
    • logstash同步文件目录(即ibps在数据模版功能中生成的logstash同步mysql数据至elasticsearch的配置文件存放的目录),LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
    • RabbitMQ主机IP,SPRING_RABBITMQ_HOST=192.168.3.220
    • RabbitMQ用户名,SPRING_RABBITMQ_USERNAME=ibpsoptimize
    • RabbitMQ密码,SPRING_RABBITMQ_PASSWORD=ibpsoptimize
    • RabbitMQ虚拟主机路径,SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
  • 客户端的启动脚本start.batstart.sh

    • start.bat

      @echo off
      set LOGSTASH_PIPELINES=D:\\docker\\logstash\\conf.6.8.1\\pipelines.yml
      set LOGSTASH_PIPELINES_DIR=D:\\docker\\logstash\\conf.6.8.1\\logstash\\mysql
      set SPRING_RABBITMQ_HOST=192.168.3.118
      set SPRING_RABBITMQ_USERNAME=ibpsoptimize
      set SPRING_RABBITMQ_PASSWORD=ibpsoptimize
      set SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
      java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ibps-logstash-client.jar
      pause
    • start.sh

      #!/bin/bash
      shome=$(cd `dirname $0`; pwd)
      # set variables
      export LOGSTASH_PIPELINES=/opt/docker/logstash-es/logstash_conf/pipelines.yml
      export LOGSTASH_PIPELINES_DIR=/opt/docker/logstash-es/logstash_conf/logstash/mysql
      export SPRING_RABBITMQ_HOST=192.168.3.220
      export SPRING_RABBITMQ_USERNAME=ibpsoptimize
      export SPRING_RABBITMQ_PASSWORD=ibpsoptimize
      export SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
      # run client
      java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ${shome}/ibps-logstash-client.jar >/dev/null 2>&1 &

数据库设计需要添加固定字段逻辑删除

  • 默认使用字段deleted_,属性deleted
  • 可以根据自己的需求调整字段名称;

仓库实现类实现Elasticsearch相关方法

  • 实现参数方法getPoClass

      @Override
      protected Class<PartyEmployeePo> getPoClass() {
          return PartyEmployeePo.class;
      }
  • 实现索引获取方法getInternalElasticsearchIndex

      @Override
      public String getInternalElasticsearchIndex() {
          return "ibps_party_employee";
      }

Mapper文件中resultMap的ID属性一定要设置为实体类的简单类名

以员工employee列表为例;

    <resultMap id="PartyEmployeePo" type="com.lc.ibps.org.party.persistence.entity.PartyEmployeePo">
        <id property="id" column="ID_" jdbcType="VARCHAR"/>
        <result property="name" column="NAME_" jdbcType="VARCHAR"/>
        <result property="status" column="STATUS_" jdbcType="VARCHAR"/>
        <result property="profile" column="PROFILE_" jdbcType="VARCHAR"/>
        <result property="gender" column="GENDER_" jdbcType="VARCHAR"/>
        <result property="email" column="EMAIL_" jdbcType="VARCHAR"/>
        <result property="address" column="ADDRESS_" jdbcType="VARCHAR"/>
        <result property="mobile" column="MOBILE_" jdbcType="VARCHAR"/>
        <result property="qq" column="QQ_" jdbcType="VARCHAR"/>
        <result property="photo" column="PHOTO_" jdbcType="VARCHAR"/>
        <result property="positions" column="POSITIONS_" jdbcType="VARCHAR"/>
        <result property="groupID" column="GROUP_ID_" jdbcType="VARCHAR"/>
        <result property="createTime" column="CREATE_TIME_" jdbcType="TIMESTAMP"/>
        <result property="updateTime" column="UPDATE_TIME_" jdbcType="TIMESTAMP"/>
        <result property="wcAccount" column="WC_ACCOUNT_" jdbcType="VARCHAR"/>
        <result property="tenantId" column="TENANT_ID_" jdbcType="VARCHAR"/>

        <result property="orgName" column="ORG_NAME_" jdbcType="VARCHAR"/>
        <result property="account" column="ACCOUNT_" jdbcType="VARCHAR"/>
        <result property="isSuper" column="IS_SUPER_" jdbcType="VARCHAR"/>
    </resultMap>

手动编写logstash数据同步配置文件

input {
  jdbc {    
    jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-5.1.36.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.3.240:3306/ibps_boot_v3_optimize_05?useUnicode=true&characterEncoding=utf-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "100000"
    schedule => "* * * * *"
    statement => "SELECT    * FROM IBPS_PARTY_EMPLOYEE LEFT JOIN ( SELECT ID_ USER_ID_, ACCOUNT_, IS_SUPER_ FROM IBPS_PARTY_USER ) A ON USER_ID_ = ID_ LEFT JOIN ( SELECT ID_ ORG_ID_, NAME_ ORG_NAME_ FROM IBPS_PARTY_ORG ) B ON ORG_ID_ = GROUP_ID_ WHERE update_time_ >= :sql_last_value order by update_time_ asc"
    type => "jdbc"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time_"
    record_last_run => true
    lowercase_column_names => true
    last_run_metadata_path => "/usr/share/logstash/data/sync_point_of_ibps_party_employee"
  }
}

filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    ruby {
        code => "event.set('create_time_', event.get('create_time_').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('update_time_', event.get('update_time_').time.localtime + 8*60*60)" 
    }
}

output {
    elasticsearch {
        hosts => ["192.168.3.118:9200"]
        index => "ibps_party_employee"
        document_id => "%{id_}"
        document_type => "ibps_party_employee"
        template_overwrite => true
        template => "/usr/share/logstash/pipeline/logstash.ik.json"
    }
    stdout {
        codec => json_lines
    }
}

IBPS启用Elasticsearch

以员工employee列表为例;
业务配置key规则elasticsearch.[repository实现类完整类型].[method方法名].enabled

  • 启用全局配置com.lc.db.elasticsearch.enabled:true
  • 启用业务列表的ES检索配置elasticsearch.com.lc.ibps.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true