业务开发如何支持ES检索
适用于
v3.8.8+
;
该方案是延时同步
,列表数据会根据logstash设置的数据同步时间
出现延时,即数据不会马上
在列表中展示;
软件及配置准备
- Elasticsearch
6.8.1
; - Logstash
6.8.1
; ibps-logstash-client.jar
,用于接收IBPS
推送出来的logstash配置文件
并添加到logstash
的pipelines.yml
配置文件中;- RabbitMQ3.x,配置文件推送使用的是MQ广播,
暂时只支持RabbitMQ
,它与ibps使用的是同一个MQ同一个用户同一个虚拟主机路径; - 数据库驱动文件,本文以
mysql
为例; - ik分词配置文件
logstash.ik.json
;
软件安装
Elasticsearch安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv0uf532kaj
Logstash安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkv11f1sq13t
RabbitMQ3安装
http://doc.bpmhome.cn/docs/ibps_v3_deploy/ibps_v3_deploy-1bkinltsmcie9
ibps-logstash-client.jar安装
如果logstash有多台服务器,那么需要修改源码工程
ibps-provider-logstash-client
中com.lc.ibps.logstash.client.rabbitmq.consumer.RabbitLogstashQueueConsumer
的注解@Queue
,广播消息需要同步配置到每一个logstash服务器中并保存对应的数据同步配置文件;
该程序是ibps提供的一个客户端程序,直接打包源码工程
ibps-provider-logstash-client
即可;配置参数(通过环境变量方式设置,以
windows
为例)- logstash管道配置文件路径,LOGSTASH_PIPELINES=D:\docker\logstash\conf.6.8.1\pipelines.yml
- logstash同步文件目录(即ibps在数据模版功能中生成的logstash同步mysql数据至elasticsearch的配置文件存放的目录),LOGSTASH_PIPELINES_DIR=D:\docker\logstash\conf.6.8.1\logstash\mysql
- RabbitMQ主机IP,SPRING_RABBITMQ_HOST=192.168.3.220
- RabbitMQ用户名,SPRING_RABBITMQ_USERNAME=ibpsoptimize
- RabbitMQ密码,SPRING_RABBITMQ_PASSWORD=ibpsoptimize
- RabbitMQ虚拟主机路径,SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize
客户端的启动脚本
start.bat
、start.sh
;start.bat
@echo off set LOGSTASH_PIPELINES=D:\\docker\\logstash\\conf.6.8.1\\pipelines.yml set LOGSTASH_PIPELINES_DIR=D:\\docker\\logstash\\conf.6.8.1\\logstash\\mysql set SPRING_RABBITMQ_HOST=192.168.3.118 set SPRING_RABBITMQ_USERNAME=ibpsoptimize set SPRING_RABBITMQ_PASSWORD=ibpsoptimize set SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ibps-logstash-client.jar pause
start.sh
#!/bin/bash shome=$(cd `dirname $0`; pwd) # set variables export LOGSTASH_PIPELINES=/opt/docker/logstash-es/logstash_conf/pipelines.yml export LOGSTASH_PIPELINES_DIR=/opt/docker/logstash-es/logstash_conf/logstash/mysql export SPRING_RABBITMQ_HOST=192.168.3.220 export SPRING_RABBITMQ_USERNAME=ibpsoptimize export SPRING_RABBITMQ_PASSWORD=ibpsoptimize export SPRING_RABBITMQ_VIRTUAL_HOST=/ibpsoptimize # run client java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Duser.timezone=GMT+8 -XX:SurvivorRatio=8 -Xms128m -Xmx128m -Xmn80m -jar ${shome}/ibps-logstash-client.jar >/dev/null 2>&1 &
数据库设计需要添加固定字段逻辑删除
- 默认使用字段
deleted_
,属性deleted
; - 可以根据自己的需求调整字段名称;
仓库实现类实现Elasticsearch相关方法
实现参数方法
getPoClass
@Override protected Class<PartyEmployeePo> getPoClass() { return PartyEmployeePo.class; }
实现索引获取方法
getInternalElasticsearchIndex
@Override public String getInternalElasticsearchIndex() { return "ibps_party_employee"; }
Mapper文件中resultMap
的ID属性一定要设置为实体类的简单类名
以员工
employee
列表为例;
<resultMap id="PartyEmployeePo" type="com.lc.ibps.org.party.persistence.entity.PartyEmployeePo">
<id property="id" column="ID_" jdbcType="VARCHAR"/>
<result property="name" column="NAME_" jdbcType="VARCHAR"/>
<result property="status" column="STATUS_" jdbcType="VARCHAR"/>
<result property="profile" column="PROFILE_" jdbcType="VARCHAR"/>
<result property="gender" column="GENDER_" jdbcType="VARCHAR"/>
<result property="email" column="EMAIL_" jdbcType="VARCHAR"/>
<result property="address" column="ADDRESS_" jdbcType="VARCHAR"/>
<result property="mobile" column="MOBILE_" jdbcType="VARCHAR"/>
<result property="qq" column="QQ_" jdbcType="VARCHAR"/>
<result property="photo" column="PHOTO_" jdbcType="VARCHAR"/>
<result property="positions" column="POSITIONS_" jdbcType="VARCHAR"/>
<result property="groupID" column="GROUP_ID_" jdbcType="VARCHAR"/>
<result property="createTime" column="CREATE_TIME_" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="UPDATE_TIME_" jdbcType="TIMESTAMP"/>
<result property="wcAccount" column="WC_ACCOUNT_" jdbcType="VARCHAR"/>
<result property="tenantId" column="TENANT_ID_" jdbcType="VARCHAR"/>
<result property="orgName" column="ORG_NAME_" jdbcType="VARCHAR"/>
<result property="account" column="ACCOUNT_" jdbcType="VARCHAR"/>
<result property="isSuper" column="IS_SUPER_" jdbcType="VARCHAR"/>
</resultMap>
手动编写logstash数据同步配置文件
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/pipeline/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.3.240:3306/ibps_boot_v3_optimize_05?useUnicode=true&characterEncoding=utf-8&autoReconnect=true"
jdbc_user => "root"
jdbc_password => "root"
jdbc_paging_enabled => "true"
jdbc_page_size => "100000"
schedule => "* * * * *"
statement => "SELECT * FROM IBPS_PARTY_EMPLOYEE LEFT JOIN ( SELECT ID_ USER_ID_, ACCOUNT_, IS_SUPER_ FROM IBPS_PARTY_USER ) A ON USER_ID_ = ID_ LEFT JOIN ( SELECT ID_ ORG_ID_, NAME_ ORG_NAME_ FROM IBPS_PARTY_ORG ) B ON ORG_ID_ = GROUP_ID_ WHERE update_time_ >= :sql_last_value order by update_time_ asc"
type => "jdbc"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time_"
record_last_run => true
lowercase_column_names => true
last_run_metadata_path => "/usr/share/logstash/data/sync_point_of_ibps_party_employee"
}
}
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
ruby {
code => "event.set('create_time_', event.get('create_time_').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('update_time_', event.get('update_time_').time.localtime + 8*60*60)"
}
}
output {
elasticsearch {
hosts => ["192.168.3.118:9200"]
index => "ibps_party_employee"
document_id => "%{id_}"
document_type => "ibps_party_employee"
template_overwrite => true
template => "/usr/share/logstash/pipeline/logstash.ik.json"
}
stdout {
codec => json_lines
}
}
IBPS启用Elasticsearch
以员工
employee
列表为例;
业务配置key规则elasticsearch.[repository实现类完整类型].[method方法名].enabled
;
- 启用全局配置
com.lc.db.elasticsearch.enabled:true
; - 启用业务列表的
ES检索
配置elasticsearch.com.lc.ibps.org.party.repository.impl.PartyEmployeeRepositoryImpl.query.enabled: true
;