大数据
基础组件
Hive

Hive 04 - 函数及UDF

简介:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

1. Hive调优

  1. Explain解释执行计划

我们可以在HQL语句最前面加上explain关键字,来获得HQL语句执行计划的阶段情况:

  • hive> explain select * from hive1.table3;
  • OK
  • STAGE DEPENDENCIES:
  • Stage-0 is a root stage
  • STAGE PLANS:
  • Stage: Stage-0
  • Fetch Operator
  • limit: -1
  • Processor Tree:
  • TableScan
  • alias: table3
  • Statistics: Num rows: 4 Data size: 35 Basic stats: PARTIAL Column stats: NONE
  • Select Operator
  • expressions: id (type: int), name (type: string), age (type: int), province (type: string), city (type: string)
  • outputColumnNames: _col0, _col1, _col2, _col3, _col4
  • Statistics: Num rows: 4 Data size: 35 Basic stats: PARTIAL Column stats: NONE
  • ListSink
  • Time taken: 1.701 seconds, Fetched: 17 row(s)

在上面的执行假话中,并没有涉及到MapReduce的操作,这是由于简单的查询操作是本地执行的;当我们的HQL语句涉及到一些复杂的操作,就会触发MapReduce过程:

  • hive> explain select count(*) from hive1.table3;
  • OK
  • STAGE DEPENDENCIES:
  • Stage-1 is a root stage
  • Stage-0 depends on stages: Stage-1
  • STAGE PLANS:
  • Stage: Stage-1
  • Map Reduce
  • Map Operator Tree:
  • TableScan
  • alias: table3
  • Statistics: Num rows: 4 Data size: 35 Basic stats: PARTIAL Column stats: NONE
  • Select Operator
  • Statistics: Num rows: 4 Data size: 35 Basic stats: PARTIAL Column stats: NONE
  • Group By Operator
  • aggregations: count()
  • mode: hash
  • outputColumnNames: _col0
  • Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
  • Reduce Output Operator
  • sort order:
  • Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
  • value expressions: _col0 (type: bigint)
  • Reduce Operator Tree:
  • Group By Operator
  • aggregations: count(VALUE._col0)
  • mode: mergepartial
  • outputColumnNames: _col0
  • Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
  • File Output Operator
  • compressed: false
  • Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
  • table:
  • input format: org.apache.hadoop.mapred.SequenceFileInputFormat
  • output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
  • serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  • Stage: Stage-0
  • Fetch Operator
  • limit: -1
  • Processor Tree:
  • ListSink
  • Time taken: 0.412 seconds, Fetched: 42 row(s)

在解释的执行计划中,一个stage可以是一个MapReduce任务,也可以是一个抽样阶段,或者是一个合并阶段,还可以是一个limit阶段,以及Hive需要的其他某个任务的一个阶段。

我们还可以使用explain extended来获取更加详尽的执行计划:

  • hive> explain extended select count(*) from hive1.table3;
  • OK
  • STAGE DEPENDENCIES:
  • Stage-1 is a root stage
  • Stage-0 depends on stages: Stage-1
  • STAGE PLANS:
  • Stage: Stage-1
  • ...
  1. 启用limit调优

我们可以通过设置hive.limit.optimize.enabletrue来启用limit调优,使用抽样机制避免在某些查询操作中执行全表扫描。

  1. 使用Map端JOIN操作也可以对查询过程进行优化。

  2. 设置本地模式

在处理小数据集时,设置本地模式可以保持在单台机器上执行任务。通过设置hive.exec.mode.local.autotrue设置本地模式:

  • hive> set hive.exec.mode.local.auto=true;
  • hive> set mapreduce.framework.name=local;
  • hive> select count(*) from hive1.table3;
  • WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  • Query ID = ubuntu_20170626054751_ed84bc11-c56c-43f2-9a67-b4d81cfc1df5
  • Total jobs = 1
  • Launching Job 1 out of 1
  • Number of reduce tasks determined at compile time: 1
  • In order to change the average load for a reducer (in bytes):
  • set hive.exec.reducers.bytes.per.reducer=<number>
  • In order to limit the maximum number of reducers:
  • set hive.exec.reducers.max=<number>
  • In order to set a constant number of reducers:
  • set mapreduce.job.reduces=<number>
  • Job running in-process (local Hadoop)
  • 2017-06-26 05:47:54,713 Stage-1 map = 100%, reduce = 100%
  • Ended Job = job_local780490405_0001
  • MapReduce Jobs Launched:
  • Stage-Stage-1: HDFS Read: 327 HDFS Write: 0 SUCCESS
  • Total MapReduce CPU Time Spent: 0 msec
  • OK
  • 12
  • Time taken: 2.905 seconds, Fetched: 1 row(s)

在上面的例子中,可以从打印信息的Job running in-process (local Hadoop)得知使用了本地模式,本地模式查询的过程没有进行MapReduce的操作,性能明显提高了。

  1. 开启并行任务

如果在HQL语句执行的过程中,各个Job之间没有依赖关系,可以并发执行以缩短执行时间,我们需要设置hive.exec.paralleltrue以开启并行执行。

  1. strict模式

可以设置Hive的一些严格模式禁止耗时及耗性能的操作,早期的版本中通过设置hive.mapred.modestrictnonstrict来开启或者关闭严格模式。新版本的Hive将严格模式分为了几个配置:

  • hive.strict.checks.large.query=true,开启后会禁用以下的操作(这只是一个查询模式,与数据量无关):
  1. 不指定limit的OrderBy操作;
  2. 对分区表不指定分区的进行查询的操作;
  • hive.strict.checks.type.safety=true,开启严格类型的安全检查,将不允许以下操作:
  1. bigint数据和string数据之间的比较;
  2. bigint数据和double数据之间的比较;
  • hive.strict.checks.cartesian.product=true,开启这个严格模式后将不允许笛卡尔积的连接查询操作(即全连接)。
  1. 调整Reducer的个数

我们可以通过hive.exec.reducers.bytes.per.reducer的值来指定每个Reducer Task处理的字节数,该值默认为256MB,也就是说当我们的输入数据量为1GB时,将会使用4个Reducer进行处理;

另外,我们还可以通过hive.exec.reducers.max值来指定Reducer的最大值,该值默认为1009;如果我们设置了mapreduce.job.reduces为负值,Hive将会在自动计算Reducer个数时使用该值作为Reducer Task的最大个数。

  1. JVM重用

实现JVM在一个Job中重复使用多次。需要配置以下几个值:

  1. mapreduce.job.ubertask.enable=true,开启Uber模式使用JVM重用;
  2. mapreduce.job.ubertask.maxmap=9,Map的个数,可以降低;
  3. mapreduce.job.ubertask.maxreduces=1,Reduce的个数。

注:这几个配置都在mapred-site.xml文件中。

  • hive> set mapreduce.job.ubertask.enable=true;
  • hive> set mapreduce.job.ubertask.maxmap=9;
  • hive> set mapreduce.job.ubertask.maxreduces=1;
  • hive> select province,count(*) from hive1.table3 group by province;
  • WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
  • Query ID = ubuntu_20170626064638_07a1371e-cca9-4dee-be95-741115c79750
  • Total jobs = 1
  • Launching Job 1 out of 1
  • Number of reduce tasks not specified. Estimated from input data size: 1
  • In order to change the average load for a reducer (in bytes):
  • set hive.exec.reducers.bytes.per.reducer=<number>
  • In order to limit the maximum number of reducers:
  • set hive.exec.reducers.max=<number>
  • In order to set a constant number of reducers:
  • set mapreduce.job.reduces=<number>
  • Job running in-process (local Hadoop)
  • 2017-06-26 06:46:40,065 Stage-1 map = 100%, reduce = 100%
  • Ended Job = job_local1133036260_0002
  • MapReduce Jobs Launched:
  • Stage-Stage-1: HDFS Read: 684 HDFS Write: 0 SUCCESS
  • Total MapReduce CPU Time Spent: 0 msec
  • OK
  • CA 3
  • FL 4
  • WA 5
  • Time taken: 2.069 seconds, Fetched: 3 row(s)
  1. 在查询前为数据集建立索引也是非常常用的优化方式。

  2. 动态分区的调整

使用分区能够将原数据集进行一定的划分,对于分区也有一些配置项可用于控制分区的配置:

  • hive.exec.dynamic.partition.mode,这个配置项用于控制动态分区的严格模式,默认为nonstrict;
  • hive.exec.max.dynamic.partitions,这个配置项用于设置最大的分区数;
  • hive.exec.max.dynamic.partitions.prenode,这个配置项用于设置每个节点的最大分区数。
  1. 推测执行,可以让多个Mapper或Reducer实例并发执行,这些配置在mapred-site.xml文件中:
  • mapreduce.map.speculative,用于设置Mapper的推测执行,默认为true;
  • mapreduce.reduce.speculative,用于设置Reducer的推测执行,默认为true;
  1. 多个分组优化。若多个Group By操作使用的是一个相同的字段,则这些Group By可以生产一个MapReduce进行处理。通过配置hive.multigroupby.singlereducer为true开启该模式,该配置默认为true。

  2. 虚拟列;在数据库中有一些虚拟列并不在表中展示,但是我们可以查询这些虚拟列,如果需要查询这些虚拟列,要通过设置hive.exec.rowoffset为true来开启:

  • hive> set hive.exec.rowoffset=true;
  • hive> select INPUT__FILE__NAME,BLOCK__OFFSET__INSIDE__FILE from hive1.table3;
  • OK
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/city=Sacramento/persons 0
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/city=Sacramento/persons 9
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/city=Sacramento/persons 19
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=FL/city=Tallahassee/000000_0 0
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=FL/city=Tallahassee/000000_0 9
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=FL/city=Tallahassee/000000_0 19
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=FL/city=Tallahassee/000000_0 29
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=WA/city=Olympia/000000_0 0
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=WA/city=Olympia/000000_0 10
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=WA/city=Olympia/persons2 0
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=WA/city=Olympia/persons2 10
  • hdfs://s100/user/ubuntu/data/hive1.db/table3/province=WA/city=Olympia/persons2 20
  • Time taken: 0.134 seconds, Fetched: 12 row(s)

2. 压缩

Hive支持对压缩数据进行处理,这里的压缩与Hadoop中的压缩是类似的;

  1. 可以通过以下命令查看已经开启的压缩编解码器:
  • ubuntu@s100:~$ hive -e "set io.compression.codecs";
  • ...
  • io.compression.codecs is undefined
  1. 通过设置下面的配置来决定是否启用中间结果压缩:
  • hive> set hive.exec.compress.intermediate=true;
  1. 还可以通过修改mapred-site.xml中的配置类修改Map输出结果的压缩,默认值为DefaultCodec:
  • <property>
  • <name>mapred.map.output.compression.codec</name>
  • <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  • </property>
  1. 通过下面的配置设置最终的Job输出结果是否为压缩格式:
  • 首先需要配置是否对Hive的查询结果进行压缩,该值默认为false:
  • >hive set hive.exec.compress.output=true;
  • 具体的压缩配置需要在mapred-site.xml中配置:
  • <property>
  • <name>mapred.output.compression.codec</name>
  • <value>org.apache.hadoop.io.compress.GzipCodec</value>
  • </propertt>
  1. 还可以直接使用SequenceFile作为表数据的存储格式:
  • hive> create table hive1.table5(id int,name string,age int)
  • > row format delimited
  • > fields terminated by '\t'
  • > lines terminated by '\n'
  • > stored as sequencefile;
  • OK
  • Time taken: 0.386 seconds
  • hive> insert into hive1.table5(id,name,age) select id,name,age from hive1.table3;
  • ...
  • OK
  • Time taken: 63.828 seconds
  • hive> dfs -ls -R /;
  • ...
  • drwxr-xr-x - ubuntu supergroup 0 2017-06-26 08:29 /user/ubuntu/data/hive1.db/table5
  • -rwxr-xr-x 3 ubuntu supergroup 350 2017-06-26 08:29 /user/ubuntu/data/hive1.db/table5/000000_0
  • hive> dfs -cat /user/ubuntu/data/hive1.db/table5/000000_0;
  • 0EQ"orgTomache.20oop.i1.BytesWT-+1ble22.apac1e.hadooT-+1.Tex24U•‡* 3ÂZO¬×-UT-+2 26 1 T-+1 24 3 T-+2 26 3 P0+- T-+ 18 1ic+ Jac+ 20 J-h+ 32
  • 2 Jerry 22
  • hive> dfs -text /user/ubuntu/data/hive1.db/table5/000000_0;
  • 0 Tom 20
  • 1 Tom1 22
  • 1 Tom1 24
  • 3 Tom2 26
  • 1 Tom1 24
  • 3 Tom2 26
  • 3 Polo 21
  • 4 Mick 29
  • 5 John 32
  • 0 Tom 18
  • 1 Jack 20
  • 2 Jerry 22
  1. 可以控制在SequenceFile作为表数据存储格式的情况下,Job端输出时的文件的压缩类型:
  • <property>
  • <name>mapred.output.compression.type</name>
  • <value>BLOCK</value>
  • </property>

综上所述,对于压缩有以下的几种配置:

  • mapred.map.outputt.compression.codec:Map端输出使用的Codec;
  • hive.exec.compress.intermediate:设置中间结果是否压缩;
  • hive.exec.compress.output:设置Job输出是否压缩;
  • mapred.output.compression.codec:设置Job输出格式的Codec;
  • mapred.output.compression.type:设置Job输出格式的压缩类型。

3. 存档分区

我们可以将某个表中的某些分区通过Hadoop的存档技术打包为Har文件,Har文件用于存放量多的小文件,对大量的文件进行存档可以减轻NameNode管理这些文件的压力。Har文件并不能改善查询效率,并且也不是压缩的。接下来我们将演示如何打包一个分区内的文件为Har包。

注:需要注意的是,Har打包只能针对管理表,外部表无法使用Har进行打包。

我们首先需要启动Hive的归档打包功能:

  • hive> set hive.archive.enabled=true;

然后使用alter对表的指定分区进行打包:

  • hive> alter table hive1.table3 archive partition(province='CA');
  • intermediate.archived is hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ARCHIVED
  • intermediate.original is hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ORIGINAL
  • Creating data.har for hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA
  • in hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/.hive-staging_hive_2017-06-27_05-50-39_690_4113257115906811694-1/-ext-10000/partlevel
  • Please wait... (this may take a while)
  • FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/tools/HadoopArchives

打印信息显示org/apache/hadoop/tools/HadoopArchives这个类出现了错误,这是由于Hadoop的相关工具类需要拷贝到${HIVE_HOME}/lib目录下:

  • ubuntu@s100:~$ cp /soft/hadoop/share/hadoop/tools/lib/hadoop-archives-2.7.2.jar /soft/hive/lib/

然后退出Hive重新执行:

  • hive> set hive.archive.enabled=true;
  • hive> alter table hive1.table3 archive partition(province='CA');
  • intermediate.archived is hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ARCHIVED
  • intermediate.original is hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ORIGINAL
  • Creating data.har for hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA
  • in hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/.hive-staging_hive_2017-06-27_05-54-24_213_6306081865254274136-1/-ext-10000/partlevel
  • Please wait... (this may take a while)
  • Moving hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA/.hive-staging_hive_2017-06-27_05-54-24_213_6306081865254274136-1/-ext-10000/partlevel to hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ARCHIVED
  • Moving hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA to hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ORIGINAL
  • Moving hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA_INTERMEDIATE_ARCHIVED to hdfs://s100/user/ubuntu/data/hive1.db/table3/province=CA
  • OK
  • Time taken: 32.098 seconds

这次就归档成功了;我们可以查看HDFS中的归档文件:

  • hive> dfs -ls -R /;
  • drwxr-xr-x - ubuntu supergroup 0 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3
  • -rwxr-xr-x 3 ubuntu supergroup 9 2017-06-24 04:00 /user/ubuntu/data/hive1.db/table3/000000_0
  • drwxr-xr-x - ubuntu supergroup 0 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA
  • drwxr-xr-x - ubuntu supergroup 0 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA/data.har
  • -rw-r--r-- 3 ubuntu supergroup 0 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA/data.har/_SUCCESS
  • -rw-r--r-- 5 ubuntu supergroup 404 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA/data.har/_index
  • -rw-r--r-- 5 ubuntu supergroup 23 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA/data.har/_masterindex
  • -rw-r--r-- 3 ubuntu supergroup 30 2017-06-27 05:54 /user/ubuntu/data/hive1.db/table3/province=CA/data.har/part-0
  • ...

归档文件中有一个索引文件,记录了分区的偏移量信息,可以查看:

  • hive> dfs -cat /user/ubuntu/data/hive1.db/table3/province=CA/data.har/_index;
  • %2F dir 1498568065343+493+ubuntu+supergroup 0 0 .hive-staging_hive_2017-06-27_05-54-24_213_6306081865254274136-1 city%3DSacramento
  • %2F.hive-staging_hive_2017-06-27_05-54-24_213_6306081865254274136-1 dir 1498568077929+493+ubuntu+supergroup 0 0
  • %2Fcity%3DSacramento dir 1498304685657+493+ubuntu+supergroup 0 0 persons
  • %2Fcity%3DSacramento%2Fpersons file part-0 0 30 1498304685732+493+ubuntu+supergroup

4. Hive函数

在Hive中,定义了很多内置的函数,同时我们还可以自定义特定功能的函数。我们可以使用下面的操作来查看函数的相关帮助:

  • # 可查看所有函数
  • hive> show functions;
  • # 查看函数的帮助
  • hive> desc function |;
  • OK
  • a | b - Bitwise or
  • Time taken: 0.019 seconds, Fetched: 1 row(s)
  • # 查看函数扩展帮助
  • hive> desc function extended |;
  • OK
  • a | b - Bitwise or
  • Example:
  • > SELECT 3 | 5 FROM src LIMIT 1;
  • 7
  • Time taken: 0.009 seconds, Fetched: 4 row(s)

4.1. 内置函数

  1. 数学函数
  • round:四舍五入
  • select round(数值,小数点位数);
  • ceil:向上取整
  • select ceil(45.6); --46
  • floor:向下取整
  • select floor(45.6); --45
  1. 字符函数
  • lower:转成小写
  • select lower('Hive'); --hive
  • upper:转成大写
  • select lower('Hive'); --HIVE
  • length:长度
  • select length('Hive'); --4
  • concat:拼接字符串
  • select concat('hello','Hive'); --helloHive
  • substr:求子串
  • select substr('hive',2); --ive
  • select substr('hive',2,1); --i
  • trim:去掉前后的空格
  • select trim(' hive '); -hive
  • lpad:左填充
  • # 对hive填充到10位,补位用#
  • select lpad('hive',10,'#'); --######hive
  • rpad:右填充
  • select rpad('hive',10,'#'); --hive######
  1. 收集函数
  • select size(map(1,'yy',2,'xx')); --2 map结合的元素个数
  1. 转换函数
  • select cast(1 as float); --1.0
  • select cast('2016-05-22' as date); --2016-05-22
  1. 日期函数
  • to_date
  • select to_date('2015-05-22 15:34:23'); --2015-05-22
  • year
  • select year('2015-05-22 15:34:23'); --2015
  • month
  • select month('2015-05-22 15:34:23'); --5
  • day
  • select day('2015-05-22 15:34:23'); --22
  • weekofyear
  • select weekofyear('2015-05-22 15:34:23'); --21
  • datediff
  • select datediff('2015-05-22 15:34:23','2015-05-29 15:34:23'); --[-7]
  • date_add
  • select date_add('2015-05-22 15:34:23',2); --2015-05-24
  • date_sub
  • select date_sub('2015-05-22 15:34:23',2); --2015-05-20
  1. 条件函数
  • coalesce:从左到右返回第一个不为null的值
  • case…when…:条件表达式
  • select ename,job,sal,
  • case job when 'president' then sal+100
  • when 'manager' then sal+800
  • else sal+400
  • end
  • from emp;

4.2. 聚合函数

  • count:总数
  • sum:和
  • max:最大值
  • min:最小值
  • avg:平均数

4.3. 表生成函数

表生成函数即UDTF(User Define Table Function)。

  • select explode(map(1,'xx',2,'yy',3,'zz'));

其结果如下:

  • 1 xx
  • 2 yy
  • 3 zz

4.4. 自定义函数(UDF)

我们可以使用Java编写自己的函数,添加到Hive的环境中,使用Hive的SQL语句执行。接下来我们就来演示这个过程:

  • 首先我们需要使用Java编写自己的函数,代码如下:
  • package com.coderap.hive.function;
  • import java.text.DateFormat;
  • import java.text.ParseException;
  • import java.text.SimpleDateFormat;
  • import java.util.Date;
  • import org.apache.hadoop.hive.ql.exec.Description;
  • import org.apache.hadoop.hive.ql.exec.UDF;
  • import org.apache.hadoop.hive.ql.udf.UDFType;
  • /**
  • * ToDate
  • */
  • @Description(name="ToDate",value="This is a Function to convert Date and String",extended="select ToDate('2017/01/21 12:00:00')")
  • @
  • UDFType(deterministic =true, stateful =false)
  • public class ToDate extends UDF {
  • public Date evaluate(){
  • return new Date();
  • }
  • public Date evaluate(String dateStr){
  • try {
  • DateFormat df = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
  • return df.parse(dateStr);
  • } catch (ParseException e) {
  • e.printStackTrace();
  • }
  • return null ;
  • }
  • public Date evaluate(String dateStr,String fmt){
  • try {
  • DateFormat df = new SimpleDateFormat(fmt);
  • return df.parse(dateStr);
  • } catch (ParseException e) {
  • e.printStackTrace();
  • }
  • return null ;
  • }
  • }

接下来需要添加Jar包到Hive的Claspath下,如果是临时添加,只需要执行 下列脚本即可:

  • hive> add jar /home/ubuntu/hive-0.0.1-SNAPSHOT.jar;
  • Added [/home/ubuntu/hive-0.0.1-SNAPSHOT.jar] to class path
  • Added resources: [/home/ubuntu/hive-0.0.1-SNAPSHOT.jar]

注:如果需要永久生效,就需要在hite-site.xml文件中设置hive.aux.jars.path值为相应的路径即可。

然后我们可以在Hive命令行中添加函数声明:

  • hive> create temporary function toDate as 'com.coderap.hive.ToDate';
  • OK
  • Time taken: 0.009 seconds

注:如果需要永久添加,使用CREATE FUNCTION toDate AS 'com.coderap.hive.ToDate' USING JAR 'file:///home/ubuntu/hive_functions/hive-0.0.1-SNAPSHOT.jar';

在上面的声明中toDate就是以后调用该函数的函数名;我们可以在命令行中查看函数的信息:

  • hive> desc function ToDate;
  • OK
  • This is a Function to convert Date and String
  • Time taken: 0.005 seconds, Fetched: 1 row(s)
  • hive> desc function extended ToDate;
  • OK
  • This is a Function to convert Date and String
  • Synonyms: todate
  • select ToDate('2017/01/21 12:00:00')
  • Time taken: 0.008 seconds, Fetched: 3 row(s)

尝试执行函数:

  • hive> select ToDate("2017/06/27 18:00:00");
  • OK
  • {"fasttime":1498611600000,"cdate":null}
  • Time taken: 0.104 seconds, Fetched: 1 row(s)

如果需要删除函数,只需要执行DROP TEMPORARY FUNCTION IF EXISTS ToDate;即可。

4.5. UDAF

4.5.1. UDTF

UDTF通常用在ELT(ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、交互转换(transform)、加载(load)至目的端的过程)数据转换和查询上。

4.5.2. 集成自定义函数的方式

常用的三种集成自定义函数的方式:

首先要求创建的function是永久function,不能是临时function。我们有三种方式可以创建永久的function,并且避免每次都需要进行add jar操作:

第一种:修改hive-site.xml文件,添加参数hive.aux.jars.path,value为jar包的Linux本地路径,要求是以file:///开头的绝对路径。

第二种:直接将jar包移动到Hive安装目录的lib文件夹中。

第三种:将jar包移动到HDFS上,然后在创建function的时候指定function使用的HDFS上的jar文件绝对路径(包括hdfs://namenode:8020/前缀),这样在使用的时候,Hive会自动将jar下载到本地进行缓存。

另外一种Hive集成自定义函数的方式:

我们可以通过修改Hive的源码,进行自定义函数的添加,添加完成后,我们就不需要再手动创建函数。添加步骤如下:

  1. 假设自定义函数的整个包名为com.coderap.ql.udf.UDFTest, jar文件为CoderapUserUDF.jar。将该jar包移动到Hive的lib文件夹中;
  2. 修改hive源文件$HIVE_HOME/src/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java,添加import com.coderap.ql.udf.UDFTest;registerUDF("test", UDFTest.class,false);
  3. 编译Hive后,进行jar包的替换,然后就可以使用函数了。