数据采集 ETL 工具 bboss-datatran v6.7.7 发布,支持 Elasticsearch 8 以及其他 Elasticsearch 低版本和 Opensearch 之间数据同步。

  1. 新增轻量级但功能强大的大数据指标分析计算模块,可以非常方便地实现基于时间窗口的多种维度的实时指标计算和离线指标计算功能,适用于有限维度指标 key 和无限维度指标 key,同时可以非常方便地将指标分析计算结果存储到各种数据库,以极低成本快速构建企业级大数据分析应用,导入以下包即可:
<dependency>
  <groupId>com.bbossgroups.plugins</groupId>
  <artifactId>bboss-datatran-metrics</artifactId>
  <version>6.7.7</version></dependency>

使用案例

https://gitee.com/bboss/bboss-elastic-tran/tree/master/bboss-datatran-core/src/test/java/org/frameworkset/tran/metrics

  1. 增量采集状态表 increament_tab 结构调整,增加以下字段:

    jobType varchar (500),用于保存输入插件类型,不能为空,不同的插件类型对应的值见下面的表格说明,避免不同类型作业加载增量状态时,互相干扰;

    jobId varchar (500),用于保存外部设置的作业 id,可为空,相同的进程内启动多个同类型的输入插件的作业时,必须指定每个作业的 jobId, 避免各个作业加载增量状态时,互相干扰

插件类型jobTypejobId
HttpInputDataTranPluginHttpInputDataTranPlugin
DBInputDataTranPluginDBInputDataTranPlugin
ElasticsearchInputDataTranPluginElasticsearchInputDataTranPlugin
FileInputDataTranPluginFileInputDataTranPlugin
HBaseInputDatatranPluginHBaseInputDatatranPlugin
Kafka2InputDatatranPluginKafka2InputDatatranPlugin
MongoDBInputDatatranPluginMongoDBInputDatatranPlugin

升级注意事项:升级 6.7.7 前,需要手动增加 jobType 和 jobId 两个字段,并修改 increament_tab 表中的状态记录,根据作业输入插件类型,填写正确的 jobType,然后再启动作业,这样作业才能继续正常工作。

  1. 优化 kafka 输出插件任务状态记录管理功能,采用指标分析 Metrics 对数据发送情况进行聚合统计,按照指定的时间窗口进行聚合计算后,执行回调任务处理 success 方法,任务 taskMetrics 为聚合计算后的统计信息,可以通过开关控制是否进行预聚合功能:
kafkaOutputConfig.setEnableMetricsAgg(true);//启用预聚合功能kafkaOutputConfig.setMetricsAggWindow(60);//指定统计时间窗口,单位:秒,默认值60秒

4. 优化 kafka 输入插件拦截器功能:定时记录统计插件消费 kafka 数据记录情况,并调用任务拦截器的 aftercall 方法输出统计 jobMetrics 信息,可以指定统计时间间隔:

kafka2InputConfig.setMetricsInterval(300 * 1000L);//300秒做一次任务拦截调用,默认值

5. 部分插件增加字段映射功能,涉及插件:日志采集插件、excel 采集插件、生成日志 /excel 文件插件、kafka 输入插件

文件采集插件字段映射配置示例:

FileInputConfig fileInputConfig = new FileInputConfig();_fileInputConfig = fileInputConfig;FileConfig fileConfig = new FileConfig();
    fileConfig.setFieldSplit(";");//指定日志记录字段分割符
  //指定字段映射配置
    fileConfig.addDateCellMapping(0, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());

    fileConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
    fileConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

kafka 映射配置示例:

Kafka2InputConfig kafka2InputConfig = new Kafka2InputConfig();

    kafka2InputConfig.setFieldSplit(";");//指定kafka记录字段分割符
  //指定字段映射配置
    kafka2InputConfig.addDateCellMapping(0, //记录切割得到的字段列表位置索引,从0开始
                excelCellMapping.getFieldName(), //映射的字段名称
                                          cellType, //字段值类型
                                          excelCellMapping.getDefaultValue(), //字段默认值
                                         excelCellMapping.getDataFormat());//字段格式:日期格式或者数字格式

    kafka2InputConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
    kafka2InputConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

cellType 取值范围:

public static final int CELL_BOOLEAN = 5;public static final int CELL_DATE = 3;public static final int CELL_NUMBER = 2;public static final int CELL_NUMBER_INTEGER = 6;public static final int CELL_NUMBER_LONG = 7;public static final int CELL_NUMBER_FLOAT = 8;public static final int CELL_NUMBER_SHORT = 9;public static final int CELL_STRING = 1;

6. 增加全局 JobContext,用于存放在作业中使用的初始化数据

7. 作业拦截器、任务拦截器异常方法参数 Exception 类型调整为 Throwable 类型

更多变更,参考提交记录:https://gitee.com/bboss/bboss-elastic-tran/commits/master

数据同步作业开发视频教程

https://www.bilibili.com/video/BV1xf4y1Z7xu

bboss 案例大全

https://esdoc.bbossgroups.com/#/bboss-datasyn-demo

Quick Start

https://esdoc.bbossgroups.com/#/quickstart

开发交流

https://www.bbossgroups.com/forum.html