Kettle精讲

一、kettle简介

1. kettle的发展史

Kettle最早是一个开源的ETL工具,全称为KDE Extraction, Transportation, Transformation and Loading Environment。KDE源于最开始的计划是在K Desktop Environment(www.kde.org)上开发这个软件,但这个计划被取消。在2006年,Pentaho公司收购了Kettle项目,原Kettle项目发起人Matt Casters加入了Pentaho团队,成为Pentaho套件数据集成架构师,从此,Kettle成为企业级数据集成及商业智能套件Pentaho的主要组成部分,Kettle亦重命名为Pentaho Data Integration(PDI)。Pentaho公司于2015年被Hitachi(日立) Data Systems收购,Hitachi Data Systems于2017年改名为Hitachi Vantara。

Pentaho Data Integration以Java开发,支持跨平台运行,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。Pentaho Data Integration分为商业版与开源版,开源版的截止2021年1月的累计下载量达836万,其中19%来自中国。在中国,一般人仍习惯把Pentaho Data Integration(PDI)的开源版称为Kettle。

2. kettle 与ETL

ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程)

在各个企业中,对数据的处理几乎成为其数字化发展的必要流程,而数据的处理,无外乎抽取、统计分析、转换、装载,因此,各个企业目前都需要ETL工程师来完成数据的处理工作。

kettle是一个ETL工具,允许管理来自不同异构数据源的数据,并在基于图形化的工具中,完成对ETL的操作。

3. kettle的架构

Transformation:转换

  • 在大部分场景下,可以直接称之为“数据流”

  • 它可以完成对数据的【输入】-->【处理】-->【输出】

  • 一旦启动一个转换任务,则其中的所有组件会同时启动,并根据配置,逐条处理数据

Job:作业

  • 作业可以称之为步骤流或者控制流

  • 在作业中,可以挂载(调用)转换任务,也可以挂载(调用)job任务

  • 作业中的各个组件,按照顺序执行,可以对执行的结果进行判断并处理分支

  • 作业可以检测数据表、文件是否存在,执行Shell脚本,执行SQL脚本,获取数据,发送邮件等

核心组件:

组件

描述

spoon

【勺子】是kettle的图形化工具,可以通过简单的拖拉拽方式完成kettle任务的设计、运行与调试,为kettle最常用的组件。

Pan

【煎锅】Transformation执行器(命令行方式),Pan用于在终端执行Transformation,没有图形界面

Kitchen

【厨房】Job执行器(命令行方式),Kitchen用于在终端执行Job,没有图形界面。

Carte

嵌入式Web服务,用于远程执行Job或Transformation,Kettle通过Carte建立集群

4. kettle的特点

  1. 免费开源:基于java的免费开源的软件,对商业用户也没有限制,可以在任何的公司中使用。

  2. 容易配置:可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。

  3. 兼容各种数据源:ETL工具集,它允许你管理来自不同数据库的数据。

  4. 简单开发:通过图形界面设计与开发任务,无需写代码实现。

二、kettle的安装

下载地址:https://community.hitachivantara.com/s/article/data-integration-kettle

目前最新的版本为9.3版本,在此我们使用9.1版本的kettle进行安装

安装要求:

  • 安装所在的服务器或者Windows中,需要jdk1.8

在获取到压缩包之后,将压缩包解压至无中文路径下即可,注意,是整体路径中,任何一级目录中都不包含中文

解压后的目录结构如下:

三、kettle的初体验

需求:将一个csv文件中的数据内容输出到Excel文件中

1. 新增转换任务

新增一个转换任务的方式:

2. 配置csv输入组件(step)

3. 配置Excel输出组件(step)

1)通过拖拽的方式将Excel输出放入编辑页面中

2)将输入组件与输出组件连接到一起:

3)双击Excel输出组件,对内容进行配置

4. 创建作业(job)

1)双击【主对象树】中的作业或点击【文件】-【新建】-【作业】

2)每个任务由一个start组件开始

5. 在作业中挂载转换任务

配置转换任务与结束节点

6. 测试运行

保存任务并执行

四、kettle名词解释

1. 转换

转换(transformation)是ETL解决方案中最主要的部分,它处理抽取、转换、加载各阶段各种对数据的操作。转换包含一个或多个“Step-步骤”,例如读取文件,过滤数据,数据加载等操作都是步骤。转换里的步骤通过“Hop-跳”来连接,跳定义了一个单向通道,允许数据从一个步骤向另一个步骤流动。此外,转换中的每个步骤还可以注释,目的主要是使转换文档化。

每个“Transformation-转换”对应的保存文件名称为“xx.ktr”

2. Step-步骤

Step是转换里的基本组成部分。

“CSV文件输入”和“Excel输出”显示了两个Step步骤。

每个Step都有唯一的一个名字,一个Step可以有多个输出跳,一个“步骤”的数据有多个输出跳时可以设置数据“分发”或者“复制”,“分发”是目标“步骤”轮流接收数据,“复制”是所有的记录被同时发送到所有的目标“步骤”。@

注意:分发会导致一份文件中的内容被发送到不通的文件中,输出到两个文件中的内容不同。

3. Hop-跳

“Hop-跳”就是步骤之间带箭头的连线,跳定义了步骤之间的数据通路。在转换中“跳”不能循环,因为每个步骤都依赖前一个步骤获取字段值。所以转换任务是一个DAG(有向无环图)

“Hop-跳”实际上是两个“Step-步骤”之间的记录行的缓存,缓存数据量可以在转换配置中设置(双击配置页面空白处),当缓存记录数满了,写数据的步骤停止写入,此时输出步骤不会停止,持续的读取缓存数据,直到缓存中有空间,写数据步骤继续运行。当缓存清空后,读取数据的步骤停止读取数据,直到缓存中又有数据。

当单击“跳”时,连线变灰色,代表不使用,再次单击变蓝代表启用。

4. 并行(转换)

当“Transformation-转换”启动后,所有“Step-步骤”都同时启动,这些“Step-步骤”都是并发方式运行,各自从对应的输入跳中读取数据,并把处理过的数据写到输出跳,直到输入跳里不再有数据,就终止步骤的运行,当所有的步骤都终止了,整个转换就停止了。

“Transformation-转换”里的步骤几乎是同时启动的,所以不可能判断出哪个步骤是第一个启动的步骤。如果想要一个任务沿着指定的顺序执行,那么就要使用“Job-作业”。

5. 数据类型

数据以数据行的形式沿着步骤移动。一个数据行是零到多个字段的集合,字段包括下面几种数据类型:

  • String: 字符类型

  • Number: 双精度浮点数(3.14)

  • Integer: 带符号长整型

  • BigNumber: 任意精度数值(3.141592653)

  • Date: 带毫秒精度的日期时间值

  • Boolean: 取值为true和false的布尔值

  • Binary: 二进制字段可以包括图形、声音、视频及其他类型的二进制数据

6. Job-作业

大多数ETL项目都需要完成各种各样的操作,而且这些操作要按照一定顺序完成。因为转换以并行方式执行,就需要一个可以串行执行的作业来处理这些操作。

作业是步骤流,转换是数据流,这是作业和转换的最大区别。

作业的每个步骤必须等到前面的步骤都跑完了,后面的步骤才会执行

而转换会一次性把所有的控件全部先启动(一个控件对应启动一个线程)然后数据流会从第一个控件开始,一条记录,一条记录的流向最后的控件。

一个作业包括一个或多个作业项,这些作业项以某种顺序来执行。作业执行顺序由作业项之间的跳和每个作业项的执行结果来决定。 可以单机作业跳来改变作业跳的状态,有三种状态(锁-必须执行、对号-执行成功时、错号-执行失败时,这三种状态可以通过单击连线进行切换)。

上图中的“Start”是“job-作业”的起点,一个作业只能定义一个“Start”。上图中每个“转换”就是作业的作业项,作业项是作业的基本构成部分。默认情况下作业中作业项都是以串行的方式制定,只是在特殊的情况下以并行方式执行。

当作业中有多条路径时,会采用回溯算法来执行作业项,如下图:

回溯算法就是:假设扫行到了图里的一条路径的某个节点时,要依次扫行这个节点的所有子路径,直到没有再可以执行的子路径,就返回该节点的上一节点,再反复这个过程。

上图中的三个作业的执行顺序如下:

首先“开始”作业项搜索所有下一个节点作业项,找到了“A”和“C”

  1. 执行“A”

  2. 搜索“A”后面的作业项,发现了“B”

  3. 执行“B”

  4. 搜索“B”后面的作业项,没有找到任何作业项

  5. 回到“A”,也没有发现其他作业项(需要被执行的作业项)

  6. 回到Start,发现另一个要执行的作业项“C”

  7. 执行“C”

  8. 搜索“C”后面的作业项,没有找到任何作业项

  9. 回到Start,没有找到任何作业项

  10. 作业结束。

以上执行过程就是Start->A->B->C,也有可能是Start->C->A->B。

作业除了以上串行执行外,还可以并行执行:

每个“Job-作业”对应的保存文件名称为“xx.kjb”。

五、转换核心对象

1. 输入

1) CSV文件输入

2) Excel输入

通过一个需求来熟悉一下Excel输入:

同时读取将两个Excel文件,并将其输出到指定的结果Excel文件中

  1. 创建Excel输入

注意:kettle不支持读取使用Excel2007创建的Excel2003文件

  1. 创建Excel输出

  1. 保存执行

3) 文本文件输入

  1. 创建文本文件输入组件

  1. 根据具体内容设置文件文件的配置

  1. 获取字段信息,完成组件配置

4) 生成记录

可以通过该步骤,完成测试数据的生成

补充:如何将任务放入到linux中执行

  • 首先将kettle的压缩包放入linux中,并通过unzip进行解压,如果没有指令可以通过yum install unzip 进行安装

  • 解压指令:unzip 包名 -d /opt/installs

  • 将配置好的任务传入linux中,建议在kettle的目录中新建一个job目录,将其放入

  • vim 配置文件.ktr 将其中的输出与输入目录改为linux中的具体目录

  • 在kettle的主目录中,执行:./pan.sh -file=./job/generate_input.ktr

5) 表输入

Kettle支持抽取数据库表中的数据,以MySQL为例, 需要将mysql的驱动包放入Kettle解压目录“...\data-integration\lib”下,这里放入之后,需要重新启动Kettle。

在新建的一个【转换】配置了mysql数据库的连接,但在其他的转换任务中,无法直接使用,需要将此数据库连接共享才可以完成所有转换任务的共同使用。

配置表输入step

2. 输出

1) Excel输出/MicrosoftExcel输出

Excel输出”和“MicrosoftExcel输出”都是Excel输出,不同的是“Excel输出”写出支持“xls”格式,“MicrosoftExcel输出”支持“xls”和“xlsx”格式。

2) SQL文件输出

在某些场景下,需要对数据库中的某个表进行备份或者迁移的动作,可以使用【SQL文件输出】步骤进行数据的处理。

生成结果示例:

3) 文本文件输出

文本文件输出可以将各种数据源中的数据生成为文本文件,大多数的使用场景是将数据放入linux中,供hive等数据库数据分析使用。

4) 表输出

注意:如果数据中存在中文,则需要在数据库配置处添加命名参数

命名参数:characterEncoding

值:utf8

3. 转换

1) Concat fields

在输出的时候,记得设定合并后的字段的长度不能太短

2) 值映射

当遇到需要将某个字段中的值,根据值的内容来进行数据映射的时候,使用【值映射】步骤

类似于 0 代表男 1 代表女,输出的时候不输出 0 或者1 ,而是输出 男或者女

要求:在值映射的组件中进行字段值的配置时,“源值”与“目标值”的字段类型需要一致。

3) 增加序列

相当于 row_number() over()

4) 字段选择

当需要对抽取的数据只获取其中的某几列字段的时候,可以使用字段选择step

【字段选择】step还可以对字段的名称进行更改,以及变更其数据类型

5) 拆分字段

如果遇到需要将数据由一个字段拆分成多个字段的需求,可以使用【拆分字段】step

比如,有字段数据如下:

“中国,河南,郑州,高新区”

“中国,北京,,昌平区”

将其拆分成:

国家

省份

地市

中国

河南

郑州

高新区

中国

北京


昌平

准备数据:

zhangsan    中国,河南,郑州,高新区
lisi    中国,北京,,昌平区
wangwu    中国,河南,郑州,金水区
zhaoliu    中国,河南,郑州,惠济区

6) 列拆分为多行(炸裂)

当需要将输入的数据进行“炸裂”处理时,可以使用【列拆分为多行】step

源数据:

zhangsan        中国,河南,郑州,高新区
lisi        中国,北京,,昌平区
wangwu        中国,河南,郑州,金水区
zhaoliu        中国,河南,郑州,惠济区
wangba        中国,河南,郑州

7)字段处理step汇总

  1. 字符串替换

  1. 字符串操作

  1. 剪切字符串

8)排序记录

9)去除重复记录

4. 应用

1) 替换null值

将数据流中某些为空的字段值替换成指定的值

2)写日志

通常在调试阶段使用写日志step,用来查看数据的流通情况

注意:在启动任务时,会出现启动配置,此时,启动配置中的日志级别需要大于等于任务中【写日志】step中配置的日志级别。

5. 流程

1) switch/case

“Switch/case”类似Java中的Switch...case...操作,可以根据匹配不同的值做不同的操作,可以让数据流由一路到多路

注意:该step只能通过相等的判断对数据进行分流

2) 过滤记录

“过滤记录”可以根据指定条件获取true或者false让数据流数据由一路到两路

6. 脚本

1) 执行sql脚本

“执行SQL脚本”可以执行自己定义的SQL语句,例如执行一个update语句,用来更新某个表中的数据。

7. 查询

1) 数据库查询

当在处理一个文本文件或者Excel文件的时候,需要去关联数据库中的数据,将数据库中的数据(字段)关联出来,作为输出,可以使用【数据库查询】step

2) 流查询

在【数据库查询】查询的基础上,流查询可以将任意类型的数据进行融合关联,完成指定需求字段的输出。

比如:可以将两个文本文件一起完成left join的操作

8. 连接

1) 记录集连接

“记录集连接”可以做类似数据库中左连接、右连接、内连接、外连接操作。在对流数据进行连接之前,需要对记录集进行排序

9. 统计-分组

“分组”是按照某一个或者某几个字段进行分组,同时可以将其余字段按照某种规则进行合并。需要对记录集进行排序。

根据需求来进行认识【统计】-【分组】step的功能:

统计一个部门下,每个性别的平均工资、工资总和以及平均年龄

测试数据:

name,age,sex,salary,dept
Rjwocifmu,45,0,20101,dept_A
Ieskrwckk,46,0,21534,dept_A
Skgleqosu,37,0,20259,dept_A
Sfheedjid,30,1,16486,dept_A
Ssfdjgdrp,32,0,19925,dept_A
Coakcbugt,23,1,18207,dept_A
Kyvirozly,43,1,15335,dept_A
Reebtrsqk,45,1,19498,dept_A
Kgxiasvuh,20,1,19482,dept_A
Uwlagtmvm,27,1,19120,dept_A
Pvewwinwx,40,0,14804,dept_B
Yxzevcmmg,18,1,18178,dept_B
Bgorfjbbe,24,1,20737,dept_B
Smwkgpene,33,0,14889,dept_B
Dtiwcqqrj,39,0,19409,dept_B
Zmycjvdhh,51,1,13692,dept_B
Vgglsssuv,28,1,20920,dept_B
Qfjxejhhk,38,1,15277,dept_B
Wbeduyvjj,23,0,13080,dept_B
Tasongucb,25,0,21229,dept_B
Vvxnjkuej,36,1,17332,dept_B
Yvkkwqhza,32,0,21576,dept_B
Wpaeogpck,38,0,16441,dept_B
Ywwxagilk,24,1,14263,dept_B
Qvzbeseqe,53,1,20635,dept_C
Vasljxiee,37,0,21092,dept_C
Ualdzgpfi,32,0,20405,dept_C
Kpfsxsjve,49,1,17139,dept_C
Tugooglcv,37,1,15113,dept_C
Smclndumc,32,0,13602,dept_C
Bfowgnhmu,37,1,13348,dept_C
Dhhoiywlq,36,0,14326,dept_C
Jtoubmytz,47,1,15001,dept_C
Pjetadrpw,34,1,21592,dept_C
Edyrinujs,25,0,14743,dept_C
Nwtxrxlku,41,1,19254,dept_C
Nimowqjzw,52,0,17987,dept_C

先说一下为什么要排序:

kettle处理数据的时候,总是将数据放入内存(类似数组)

如果要做数据统计的话:

  1. 没有顺序:需要根据分组字段,识别下一条数据是否与之相同,如果相同则算为一组进行累计,数据没有顺序的情况下,输入的数据会出现多个分组(不相邻即识别为新的分组),而导致的统计误差。

  1. 已对数据进行排序:数据在处理时,持续的遇到相同组的数据,累加器可以正常工作,对数据进行完整的统计处理。

六、作业核心对象

1. start

“Start”是定义作业的起点,一个作业只能有一个开始作业项。在start组件中,可以定义启动周期与定时启动。

2. 转换

“转换”作业项可以包含一个转换,我们可以将详细的操作过程存入一个转换,然后包含到一个“转换”作业项中。

可以通过这种方式,来完成多个转换任务之间的顺序执行,以满足更加复杂的数据处理需求。

3. 邮件-发送邮件

当作业或转换运行出现问题或者运行成功等运行状态出现,维护人员想要在第一时间获取到任务的运行情况,可以通过【邮件】-【发送邮件】这个step来完成通知。

163邮箱SMTP服务器地址: smtp.163.com 端口:25

配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,可以在163网页端进行配置。

4. 执行shell脚本

首先要确认的一点是:shell一定是在linux服务器中运行的,也就意味着编写的作业是需要在linux中运行的。

而且,调用的shell也需要在linux存放。

需要注意的点:

  1. 在填写文件的时候,需要给一个绝对路径

  2. 在编写好shell脚本之后,要chmod 777 shell脚本名

七. 变量

1) 全局参数(变量)

在ETL过程中可能需要一些预先定义好的参数,在ETL业务中可以使用这些参数。这些参数可以定义成全局参数。

全局参数定义是通过当前用户下.kettle目录中的kettle.properties文件来定义的,定义方式使用Key=Value方式,例如:ID=1。注意:在定义配置全局参数后需要重启Kettle才会生效。定义好的全局参数在任何转换和作业中可以直接使用。使用参数有两种方式:“%%变量名%%”和“${变量名}”。

在企业中,应用进行版本的迭代,数据的生成发生了变化,历史数据不再进行抽取处理,或者查询等操作

2) 转换命名参数(局部变量)

转换命名参数就是在转换的内部定义变量,作用范围在转换的内部,相当于是局部参数。设置转换命名参数可以在转换空白处双击打开“转换属性”->“命名参数”进行设置

3) 设置变量 / 获取变量

在转换中有“作业”分类,在里面有“设置变量”和“获取变量”的步骤,“设置变量”可以定义一些值,这些值不能在当前转换中使用,需要在下一个转换中使用,在下一个转换中通过“获取变量”来使用。

适用场景:

  1. 某业务系统,每日产生数据量过大,单个mysql库无法存放一日的完整数据,需要不同服务器上的两个数据库进行数据存放。

  2. 数据在中午12点开始进行存放位置切换,会同步向两个服务器中的mysql库进行insert,持续3分钟

  3. 在并行存放数据3分钟后,停止向即将存储满的第一台服务器中的mysql进行insert,此时数据只向第二台服务器中的mysql中插入数据。

  4. 需求:抽取全天数据,且保证数据的不重复

  5. 实现:在抽取完第一台服务器中mysql的表数据后,通过分组获取数据ID的最大值,并通过【设置变量】进行变量的设置,新建一个转换任务,在转换中使用【获取变量】对上一步转换设置的变量进行获取,然后将新增的局部变量放入SQL条件中(ID > ${最大值变量}),用来完成数据的范围限定,从而保证数据的不重复。

注意:在进行变量的设置与获取时,变量的名称应全程一致,不然在表输入中无法获取到变量的中的值。

八、整体案例

1. 批量采集mysql数据到文件中

确认需求:将一个数据库中的所有表数据,全部输出到文件中,每个表生成一个以表名作为文件名的文件。

方案构思:

  1. 创建相应数量的转换任务,一个个执行。(可以但不推荐)

  2. 创建一个转换任务,不停的进行表名和文件的替换,然后重跑。(可以但不推荐)

  3. 有没有一种可能,可以让kettle进行循环

    1. 循环依据:将数据库中的表名放入一个数组,对数组进行循环

    2. 循环过程:每次从数组中获取一个表名,将这个表名放入两个地方,分别是表输入中的from后面的表名,与文本文件输出中的文件名。

    3. 调用起来,循环执行。

在需要获取数据库中的所有表名的时候,可以使用以下SQL语句进行查询:

select table_name from  information_schema.TABLES
where TABLE_SCHEMA = 'test1'

2. 根据文件的后缀不同,分别处理

需求:在一个文件夹下有很多文件,且这些文件的后缀有多种,现在要求通过一个转换将文件中的数据分发不同的目的地

3. 输出数据到hdfs

1)配置hdfs环境

  1. 添加驱动:

驱动位于“D:\pdi-ce-9.1.0.0-324\data-integration\ADDITIONAL-FILES\drivers”路径下:

添加之后一定要重启Kettle

  1. 新建集群

注意:如果在执行过程中出现编码问题可以修改“D:\pdi-ce-9.1.0.0-324\data-integration\Spoon.bat”文件第127行,添加以下内容,指定编码为UTF-8(注意添加时,有空格),添加后重启Kettle

"-DHADOOP_USER_NAME=root" "-Dfile.encoding=UTF-8"

如果在测试连接成功的情况下,还是无法在步骤中使用hdfs的step完成数据的抽取或者输出,则将两个地方的缓存文件夹删除即可:

  1. C:\Users\admin.kettle

  2. D:\BigData-2204\14.kettle\pdi-ce-9.1.0.0-324\data-integration\system\karaf\caches

注意:在抽取hdfs文件的时候,如果点击【获取字段】出现报错,需要变更文件的格式,具体如下:

4. 输出数据到hive

1)配置hive环境

  1. 修改plugin.properties配置(根据自己安装目录)

进入D:\pdi-ce-9.1.0.0-324\data-integration\plugins\pentaho-big-data-plugin修改plugin.properties文件

# 这里指的是一个文件夹的名字,是相对路径
#D:\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations
active.hadoop.configuration=hdp30
  1. 将hdfs-site.xml,core-site.xml,mapred-site.xml,hive-site.xml配置文件拷贝到此目录下(根据自己安装目录)

D:\pdi-ce-9.1.0.0-324\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30
  1. 将以下Jar包放入D:\pdi-ce-9.1.0.0-324\data-integration\lib下,操作后,重启Kettle

  • hadoop-common-*.jar(在hadoop安装目录下 /opt/installs/hadoop3.1.4/share/hadoop/common 中)

  • hive-*.jar(在hive安装目录下lib中)

2)配置ETL

【表输入(MySQL)】--【表输出(hive)】