A Simple Introduction to Pig

Summary: Compared with Java's MapReduce API, Pig provides a higher-level abstraction for processing large datasets. It offers richer data structures than MapReduce, typically multi-valued and nested, along with a more powerful set of data transformations, including the join operation that MapReduce itself leaves out.

1. Pig

Pig consists of two parts:

  • A language for expressing data flows, called Pig Latin.
  • An execution environment for running Pig Latin programs; there are currently two: local execution in a single JVM, and distributed execution on a Hadoop cluster.

Internally, every Pig operation or transformation takes input data, processes it, and produces output; these transformations are compiled into a series of MapReduce jobs. Pig hides how this translation happens, so engineers can focus on the data rather than on the details of execution.
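
As a taste of the language, here is a minimal Pig Latin data flow, the classic word count (a sketch; the input path /tmp/words.txt is an assumption):

  • lines = LOAD '/tmp/words.txt' AS (line:chararray);
  • words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
  • grouped = GROUP words BY word;
  • counts = FOREACH grouped GENERATE group, COUNT(words);
  • DUMP counts;

Each statement only declares a step in the data flow; Pig compiles the pipeline into MapReduce jobs when DUMP (or STORE) forces execution.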

1.1. Installing Pig

Installing Pig is very simple: just unpack the tarball and configure the environment variables:

  • ubuntu@s100:~/software$ tar zxf pig-0.15.0.tar.gz -C /soft/
  • ubuntu@s100:~/software$ cd /soft/
  • ubuntu@s100:/soft$ ln -s pig-0.15.0 pig
  • ...
  • PIG_HOME=/soft/pig
  • PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/soft/jdk/bin:/soft/hadoop/bin:/soft/hadoop/sbin:/soft/eclipse:/soft/maven/bin:/soft/hive/bin:/soft/hbase/bin:/soft/zk/bin:/soft/pig/bin"

After installing, run pig -version to check that the installation succeeded:

  • ubuntu@s100:/soft$ pig -version
  • Apache Pig version 0.15.0 (r1682971)
  • compiled Jun 01 2015, 11:44:35

1.2. Local mode

Grunt is Pig's shell. In local mode, Pig runs in a single JVM and accesses the local file system; this mode is meant for testing and for processing small datasets:

  • ubuntu@s100:~$ pig -x local
  • SLF4J: Class path contains multiple SLF4J bindings.
  • SLF4J: Found binding in [jar:file:/soft/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-hive.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-pig.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-thin-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  • SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  • 2017-07-13 08:19:21,310 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015, 11:44:35
  • 2017-07-13 08:19:21,311 [main] INFO org.apache.pig.Main - Logging error messages to: /home/ubuntu/pig_1499959161278.log
  • 2017-07-13 08:19:21,364 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/ubuntu/.pigbootup not found
  • 2017-07-13 08:19:21,570 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
  • grunt>

1.3. MapReduce mode

  • ubuntu@s100:~$ pig -x mapreduce
  • 2017-07-13 08:22:46,928 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015, 11:44:35
  • 2017-07-13 08:22:46,928 [main] INFO org.apache.pig.Main - Logging error messages to: /home/ubuntu/pig_1499959366916.log
  • 2017-07-13 08:22:46,954 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/ubuntu/.pigbootup not found
  • 2017-07-13 08:22:47,532 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://s100/
  • SLF4J: Class path contains multiple SLF4J bindings.
  • SLF4J: Found binding in [jar:file:/soft/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-hive.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-pig.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/phoenix-4.9.0-HBase-1.2-thin-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: Found binding in [jar:file:/soft/hbase-1.2.4/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  • SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  • SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Note that MapReduce mode requires Hadoop's job history server process (historyserver) to be started:

  • ubuntu@s100:~$ mr-jobhistory-daemon.sh start historyserver
  • starting historyserver, logging to /soft/hadoop-2.7.2/logs/mapred-ubuntu-historyserver-s100.out

1.4. Common Grunt shell commands

  1. sh: execute a shell command
  • grunt> sh ls -al /
  2. help: display help
  • grunt> help
  • Commands:
  • <pig latin statement>; - See the PigLatin manual for details: http://hadoop.apache.org/pig
  • File system commands:
  • fs <fs arguments> - Equivalent to Hadoop dfs command: http://hadoop.apache.org/common/docs/current/hdfs_shell.html
  • Diagnostic commands:
  • describe <alias>[::<alias] - Show the schema for the alias. Inner aliases can be described as A::B.
  • explain [-script <pigscript>] [-out <path>] [-brief] [-dot|-xml] [-param <param_name>=<param_value>]
  • [-param_file <file_name>] [<alias>] - Show the execution plan to compute the alias or for entire script.
  • -script - Explain the entire script.
  • -out - Store the output into directory rather than print to stdout.
  • -brief - Don't expand nested plans (presenting a smaller graph for overview).
  • -dot - Generate the output in .dot format. Default is text format.
  • -xml - Generate the output in .xml format. Default is text format.
  • -param <param_name - See parameter substitution for details.
  • -param_file <file_name> - See parameter substitution for details.
  • alias - Alias to explain.
  • dump <alias> - Compute the alias and writes the results to stdout.
  • Utility Commands:
  • exec [-param <param_name>=param_value] [-param_file <file_name>] <script> -
  • Execute the script with access to grunt environment including aliases.
  • -param <param_name - See parameter substitution for details.
  • -param_file <file_name> - See parameter substitution for details.
  • script - Script to be executed.
  • run [-param <param_name>=param_value] [-param_file <file_name>] <script> -
  • Execute the script with access to grunt environment.
  • -param <param_name - See parameter substitution for details.
  • -param_file <file_name> - See parameter substitution for details.
  • script - Script to be executed.
  • sh <shell command> - Invoke a shell command.
  • kill <job_id> - Kill the hadoop job specified by the hadoop job id.
  • set <key> <value> - Provide execution parameters to Pig. Keys and values are case sensitive.
  • The following keys are supported:
  • default_parallel - Script-level reduce parallelism. Basic input size heuristics used by default.
  • debug - Set debug on or off. Default is off.
  • job.name - Single-quoted name for jobs. Default is PigLatin:<script name>
  • job.priority - Priority for jobs. Values: very_low, low, normal, high, very_high. Default is normal
  • stream.skippath - String that contains the path. This is used by streaming.
  • any hadoop property.
  • help - Display this message.
  • history [-n] - Display the list statements in cache.
  • -n Hide line numbers.
  • quit - Quit the grunt shell.

1.5. Local-mode examples

  1. Loading local data

Given the following local data:

  • ubuntu@s100:~$ cat customer.data
  • 1,Tom,18
  • 2,John,20
  • 3,Jack,22
  • 4,Rose,21

Load it into the Pig environment:

  • grunt> customer = LOAD '/home/ubuntu/customer.data' USING PigStorage(',');

This command runs in local mode and loads the data from the local file into a Pig variable named customer, splitting each line into fields on the ',' delimiter. customer is an instance of Pig's relation data type: a relation is a bag, or more precisely an outer bag, and corresponds to a table in a database.

  2. Viewing a relation's data with dump
  • grunt> dump customer;
  • ...
  • (1,Tom,18)
  • (2,John,20)
  • (3,Jack,22)
  • (4,Rose,21)

Each row that dump prints is a tuple, an ordered collection of fields; each individual value in a tuple is a field, a single piece of data; and a bag is a collection of tuples.
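
In Pig's textual notation, these types look as follows (illustrative values only):

  • (1,Tom,18)                 -- a tuple with three fields
  • {(1,Tom,18),(2,John,20)}   -- a bag containing two tuples
  • [name#Tom]                 -- a map, written as key#value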

  3. Defining a relation with a schema
  • grunt> customer = LOAD '/home/ubuntu/customer.data' USING PigStorage(',') as (id:int,name:chararray,age:int);

The data loaded this way is no different from the above, but each field now has a corresponding schema (a name and a type).
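
Before the schema was declared, fields could still be referenced by position as $0, $1, and so on (a sketch reusing the schema-less LOAD from step 1; customer_raw and names are aliases introduced here for illustration):

  • grunt> customer_raw = LOAD '/home/ubuntu/customer.data' USING PigStorage(',');
  • grunt> names = foreach customer_raw generate $1;
  • grunt> dump names;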

  4. Viewing a relation's schema
  • grunt> describe customer;
  • customer: {id: int,name: chararray,age: int}
  5. Viewing the plans (logical, physical, and MapReduce) used to compute a relation
  • grunt> explain customer;
  • 2017-07-13 08:36:05,404 [main] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
  • 2017-07-13 08:36:05,404 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}
  • #-----------------------------------------------
  • # New Logical Plan:
  • #-----------------------------------------------
  • customer: (Name: LOStore Schema: id#13:int,name#14:chararray,age#15:int)
  • |
  • |---customer: (Name: LOForEach Schema: id#13:int,name#14:chararray,age#15:int)
  • | |
  • | (Name: LOGenerate[false,false,false] Schema: id#13:int,name#14:chararray,age#15:int)ColumnPrune:OutputUids=[13, 14, 15]ColumnPrune:InputUids=[13, 14, 15]
  • | | |
  • | | (Name: Cast Type: int Uid: 13)
  • | | |
  • | | |---id:(Name: Project Type: bytearray Uid: 13 Input: 0 Column: (*))
  • | | |
  • | | (Name: Cast Type: chararray Uid: 14)
  • | | |
  • | | |---name:(Name: Project Type: bytearray Uid: 14 Input: 1 Column: (*))
  • | | |
  • | | (Name: Cast Type: int Uid: 15)
  • | | |
  • | | |---age:(Name: Project Type: bytearray Uid: 15 Input: 2 Column: (*))
  • | |
  • | |---(Name: LOInnerLoad[0] Schema: id#13:bytearray)
  • | |
  • | |---(Name: LOInnerLoad[1] Schema: name#14:bytearray)
  • | |
  • | |---(Name: LOInnerLoad[2] Schema: age#15:bytearray)
  • |
  • |---customer: (Name: LOLoad Schema: id#13:bytearray,name#14:bytearray,age#15:bytearray)RequiredFields:null
  • #-----------------------------------------------
  • # Physical Plan:
  • #-----------------------------------------------
  • customer: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-27
  • |
  • |---customer: New For Each(false,false,false)[bag] - scope-26
  • | |
  • | Cast[int] - scope-18
  • | |
  • | |---Project[bytearray][0] - scope-17
  • | |
  • | Cast[chararray] - scope-21
  • | |
  • | |---Project[bytearray][1] - scope-20
  • | |
  • | Cast[int] - scope-24
  • | |
  • | |---Project[bytearray][2] - scope-23
  • |
  • |---customer: Load(/home/ubuntu/customer.data:PigStorage(',')) - scope-16
  • 2017-07-13 08:36:05,416 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
  • 2017-07-13 08:36:05,416 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
  • 2017-07-13 08:36:05,416 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
  • #--------------------------------------------------
  • # Map Reduce Plan
  • #--------------------------------------------------
  • MapReduce node scope-28
  • Map Plan
  • customer: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-27
  • |
  • |---customer: New For Each(false,false,false)[bag] - scope-26
  • | |
  • | Cast[int] - scope-18
  • | |
  • | |---Project[bytearray][0] - scope-17
  • | |
  • | Cast[chararray] - scope-21
  • | |
  • | |---Project[bytearray][1] - scope-20
  • | |
  • | Cast[int] - scope-24
  • | |
  • | |---Project[bytearray][2] - scope-23
  • |
  • |---customer: Load(/home/ubuntu/customer.data:PigStorage(',')) - scope-16
  • --------
  • Global sort: false
  • ----------------
  6. illustrate: showing, step by step, how the declared statements execute on sample data
  • grunt> illustrate customer;
  • ...
  • 2017-07-13 08:37:24,336 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map - Aliases being processed per job phase (AliasName[line,offset]): M: customer[2,11] C: R:
  • -------------------------------------------------------------
  • | customer | id:int | name:chararray | age:int |
  • -------------------------------------------------------------
  • | | 2 | John | 20 |
  • -------------------------------------------------------------
  7. Storing data: the contents of a bag can be stored locally
  • grunt> store customer into '/home/ubuntu/customer_dir' USING PigStorage(':');
  • ...
  • Input(s):
  • Successfully read 4 records from: "/home/ubuntu/customer.data"
  • Output(s):
  • Successfully stored 4 records in: "/home/ubuntu/customer_dir"
  • Counters:
  • Total records written : 4
  • Total bytes written : 0
  • Spillable Memory Manager spill count : 0
  • Total bags proactively spilled: 0
  • Total records proactively spilled: 0
  • Job DAG:
  • job_local217803336_0002
  • 2017-07-13 08:45:54,375 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

After the job succeeds, inspect the data in the output directory:

  • ubuntu@s100:~$ cd customer_dir/
  • ubuntu@s100:~/customer_dir$ ll
  • total 20
  • drwxrwxr-x 2 ubuntu ubuntu 4096 Jul 13 08:45 ./
  • drwxr-xr-x 14 ubuntu ubuntu 4096 Jul 13 08:45 ../
  • -rw-r--r-- 1 ubuntu ubuntu 39 Jul 13 08:45 part-m-00000
  • -rw-r--r-- 1 ubuntu ubuntu 12 Jul 13 08:45 .part-m-00000.crc
  • -rw-r--r-- 1 ubuntu ubuntu 0 Jul 13 08:45 _SUCCESS
  • -rw-r--r-- 1 ubuntu ubuntu 8 Jul 13 08:45 ._SUCCESS.crc
  • ubuntu@s100:~/customer_dir$ cat part-m-00000
  • 1:Tom:18
  • 2:John:20
  • 3:Jack:22
  • 4:Rose:21
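
The stored files can be read back by giving PigStorage the same delimiter; a sketch (customer2 is an alias introduced here, and loading a directory reads all of its part files):

  • grunt> customer2 = LOAD '/home/ubuntu/customer_dir' USING PigStorage(':') as (id:int,name:chararray,age:int);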
  8. Grouping a relation

First, change the test data as follows:

  • 1,Tom,18,CA
  • 2,John,20,FL
  • 3,Jack,22,OH
  • 5,Rose,21,FL
  • 6,Lee,21,CO
  • 7,Tim,21,CA
  • 8,Mick,21,FL
  • 9,Lily,21,OH
  • 10,lucy,21,PA
  • 11,Ted,21,CO
  • 12,James,21,CA

Several rows were added, together with a state column (declared as province in the schema below); then load the data into Pig:

  • grunt> customer = LOAD '/home/ubuntu/customer.data' USING PigStorage(',') as (id:int,name:chararray,age:int,province:chararray);

Next, group the relation:

  • grunt> g1 = Group customer BY age;
  • grunt> dump g1;
  • ...
  • (18,{(1,Tom,18,CA)})
  • (20,{(2,John,20,FL)})
  • (21,{(12,James,21,CA),(11,Ted,21,CO),(10,lucy,21,PA),(9,Lily,21,OH),(8,Mick,21,FL),(7,Tim,21,CA),(6,Lee,21,CO),(5,Rose,21,FL)})
  • (22,{(3,Jack,22,OH)})

As the output shows, the operation grouped the data by the value of age.
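
A grouped relation is usually consumed with a FOREACH; for example, counting customers per age over the g1 relation above (a sketch; counts is an alias introduced here):

  • grunt> counts = FOREACH g1 GENERATE group AS age, COUNT(customer);
  • grunt> dump counts;

Given the data above, this should emit one tuple per age, for example (21,8) for the eight 21-year-olds.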

  9. Grouping by multiple columns: similar to the example above, several grouping keys can be specified at the same time
  • grunt> g2 = Group customer BY (province,age);
  • grunt> dump g2;
  • ...
  • ((CA,18),{(1,Tom,18,CA)})
  • ((CA,21),{(12,James,21,CA),(7,Tim,21,CA)})
  • ((CO,21),{(11,Ted,21,CO),(6,Lee,21,CO)})
  • ((FL,20),{(2,John,20,FL)})
  • ((FL,21),{(8,Mick,21,FL),(5,Rose,21,FL)})
  • ((OH,21),{(9,Lily,21,OH)})
  • ((OH,22),{(3,Jack,22,OH)})
  • ((PA,21),{(10,lucy,21,PA)})
  10. Group all: putting every row into a single group
  • grunt> g3 = Group customer ALL;
  • grunt> dump g3;
  • ...
  • (all,{(12,James,21,CA),(11,Ted,21,CO),(10,lucy,21,PA),(9,Lily,21,OH),(8,Mick,21,FL),(7,Tim,21,CA),(6,Lee,21,CO),(5,Rose,21,FL),(3,Jack,22,OH),(2,John,20,FL),(1,Tom,18,CA)})
  11. Cogroup: grouping several relations at once. Each output tuple holds the group key followed by one bag per input relation; a bag is empty when that relation has no rows matching the key.

Add another data set with the following content:

  • 1,No01,18.22,1
  • 2,No03,4.10,4
  • 3,No02,5.12,5
  • 4,No04,4.20,3
  • 5,No06,2.22,6
  • 6,No04,23.30,1
  • 7,No08,19.99,2
  • 8,No05,13.00,6
  • 9,No11,1.99,6
  • 10,No9,12.00,7
  • 11,No12,20.90,8

Then load this data into Pig:

  • grunt> orders = LOAD '/home/ubuntu/order.data' USING PigStorage(',') as (id:int,num:chararray,price:float,customer:chararray);

Next, cogroup the two relations:

  • grunt> g4 = COGROUP customer BY age, orders by price;
  • 2017-07-13 09:08:17,585 [main] WARN org.apache.pig.newplan.BaseOperatorPlan - Encountered Warning IMPLICIT_CAST_TO_FLOAT 1 time(s).
  • grunt> dump g4;
  • ...
  • (1.99,{},{(9,No11,1.99,6)})
  • (2.22,{},{(5,No06,2.22,6)})
  • (4.1,{},{(2,No03,4.1,4)})
  • (4.2,{},{(4,No04,4.2,3)})
  • (5.12,{},{(3,No02,5.12,5)})
  • (12.0,{},{(10,No9,12.0,7)})
  • (13.0,{},{(8,No05,13.0,6)})
  • (18.0,{(1,Tom,18,CA)},{})
  • (18.22,{},{(1,No01,18.22,1)})
  • (19.99,{},{(7,No08,19.99,2)})
  • (20.0,{(2,John,20,FL)},{})
  • (20.9,{},{(11,No12,20.9,8)})
  • (21.0,{(12,James,21,CA),(11,Ted,21,CO),(10,lucy,21,PA),(9,Lily,21,OH),(8,Mick,21,FL),(7,Tim,21,CA),(6,Lee,21,CO),(5,Rose,21,FL)},{})
  • (22.0,{(3,Jack,22,OH)},{})
  • (23.3,{},{(6,No04,23.3,1)})
  • grunt> g5 = COGROUP customer BY id, orders by id;
  • 2017-07-13 09:10:56,208 [main] WARN org.apache.pig.newplan.BaseOperatorPlan - Encountered Warning IMPLICIT_CAST_TO_FLOAT 1 time(s).
  • grunt> dump g5;
  • ...
  • (1,{(1,Tom,18,CA)},{(1,No01,18.22,1)})
  • (2,{(2,John,20,FL)},{(2,No03,4.1,4)})
  • (3,{(3,Jack,22,OH)},{(3,No02,5.12,5)})
  • (4,{},{(4,No04,4.2,3)})
  • (5,{(5,Rose,21,FL)},{(5,No06,2.22,6)})
  • (6,{(6,Lee,21,CO)},{(6,No04,23.3,1)})
  • (7,{(7,Tim,21,CA)},{(7,No08,19.99,2)})
  • (8,{(8,Mick,21,FL)},{(8,No05,13.0,6)})
  • (9,{(9,Lily,21,OH)},{(9,No11,1.99,6)})
  • (10,{(10,lucy,21,PA)},{(10,No9,12.0,7)})
  • (11,{(11,Ted,21,CO)},{(11,No12,20.9,8)})
  • (12,{(12,James,21,CA)},{})

2. Pig join operations

  1. Inner join
  • grunt> customers = LOAD '/home/ubuntu/customer.data' USING PigStorage(',') as (id:int,name:chararray,age:int,addr:chararray);
  • grunt> orders = LOAD '/home/ubuntu/order.data' USING PigStorage(',') as (id:int,name:chararray,price:float,cid:int);
  • grunt> r1 = join customers by id , orders by cid;
  • grunt> dump r1;
  • ...
  • (1,Tom,18,CA,6,No04,23.3,1)
  • (1,Tom,18,CA,1,No01,18.22,1)
  • (2,John,20,FL,7,No08,19.99,2)
  • (3,Jack,22,OH,4,No04,4.2,3)
  • (5,Rose,21,FL,3,No02,5.12,5)
  • (6,Lee,21,CO,9,No11,1.99,6)
  • (6,Lee,21,CO,8,No05,13.0,6)
  • (6,Lee,21,CO,5,No06,2.22,6)
  • (7,Tim,21,CA,10,No9,12.0,7)
  • (8,Mick,21,FL,11,No12,20.9,8)
  2. Self join

Self joins are commonly used to assemble hierarchical (cascading) data. For example, given the following city hierarchy table:

  • 0,Alabama
  • 1,Montgomery,0
  • 2,Autauga,0
  • 3,Clarke,0
  • 4,Arizona
  • 5,Bisbee,4
  • 6,Globe,4
  • 7,Kingman,4
  • 8,Maine
  • 9,Auburn,8
  • 10,Augusta,8
  • 11,Bangor,8

Pig cannot join a relation with itself under one alias, so the same file is loaded under two aliases and then joined:

  • grunt> cities = LOAD '/home/ubuntu/cities.data' USING PigStorage(',') as (id:int,name:chararray,pid:int);
  • grunt> cities_copy = LOAD '/home/ubuntu/cities.data' USING PigStorage(',') as (id:int,name:chararray,pid:int);
  • grunt> r2 = join cities by id, cities_copy by pid;
  • grunt> dump r2;
  • ...
  • (0,Alabama,,3,Clarke,0)
  • (0,Alabama,,2,Autauga,0)
  • (0,Alabama,,1,Montgomery,0)
  • (4,Arizona,,7,Kingman,4)
  • (4,Arizona,,6,Globe,4)
  • (4,Arizona,,5,Bisbee,4)
  • (8,Maine,,11,Bangor,8)
  • (8,Maine,,10,Augusta,8)
  • (8,Maine,,9,Auburn,8)
  3. Left outer join
  • grunt> r3 = join customers by id left, orders by cid;
  • grunt> dump r3;
  • ...
  • (1,Tom,18,CA,6,No04,23.3,1)
  • (1,Tom,18,CA,1,No01,18.22,1)
  • (2,John,20,FL,7,No08,19.99,2)
  • (3,Jack,22,OH,4,No04,4.2,3)
  • (5,Rose,21,FL,3,No02,5.12,5)
  • (6,Lee,21,CO,9,No11,1.99,6)
  • (6,Lee,21,CO,8,No05,13.0,6)
  • (6,Lee,21,CO,5,No06,2.22,6)
  • (7,Tim,21,CA,10,No9,12.0,7)
  • (8,Mick,21,FL,11,No12,20.9,8)
  • (9,Lily,21,OH,,,,)
  • (10,lucy,21,PA,,,,)
  • (11,Ted,21,CO,,,,)
  • (12,James,21,CA,,,,)
  4. Right outer join
  • grunt> r4 = join customers by id right, orders by cid;
  • grunt> dump r4;
  • ...
  • (1,Tom,18,CA,6,No04,23.3,1)
  • (1,Tom,18,CA,1,No01,18.22,1)
  • (2,John,20,FL,7,No08,19.99,2)
  • (3,Jack,22,OH,4,No04,4.2,3)
  • (,,,,2,No03,4.1,4)
  • (5,Rose,21,FL,3,No02,5.12,5)
  • (6,Lee,21,CO,9,No11,1.99,6)
  • (6,Lee,21,CO,8,No05,13.0,6)
  • (6,Lee,21,CO,5,No06,2.22,6)
  • (7,Tim,21,CA,10,No9,12.0,7)
  • (8,Mick,21,FL,11,No12,20.9,8)
  5. Full outer join
  • grunt> r5 = join customers by id full, orders by cid;
  • grunt> dump r5;
  • ...
  • (1,Tom,18,CA,6,No04,23.3,1)
  • (1,Tom,18,CA,1,No01,18.22,1)
  • (2,John,20,FL,7,No08,19.99,2)
  • (3,Jack,22,OH,4,No04,4.2,3)
  • (,,,,2,No03,4.1,4)
  • (5,Rose,21,FL,3,No02,5.12,5)
  • (6,Lee,21,CO,9,No11,1.99,6)
  • (6,Lee,21,CO,8,No05,13.0,6)
  • (6,Lee,21,CO,5,No06,2.22,6)
  • (7,Tim,21,CA,10,No9,12.0,7)
  • (8,Mick,21,FL,11,No12,20.9,8)
  • (9,Lily,21,OH,,,,)
  • (10,lucy,21,PA,,,,)
  • (11,Ted,21,CO,,,,)
  • (12,James,21,CA,,,,)
  6. Joining on multiple keys
  • grunt> r6 = join customers by (id,name), orders by (cid,name);
  • grunt> dump r6;
  • ...
  7. Cross (Cartesian product)
  • grunt> r7 = cross customers, orders;
  • grunt> dump r7;
  • ...
  8. Union: like cross, it takes two relations, but it requires them to have the same number and types of fields. The attempt below fails because customers and orders have different schemas.
  • grunt> r8 = union customers, orders;
  • 2017-07-16 00:34:41,066 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1051: Cannot cast to bytearray
  • Details at logfile: /home/ubuntu/pig_1500188876717.log
  • ...
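
For contrast, a union over two relations with identical schemas does work; a sketch that loads the same customer file under a second alias (customers2) purely for illustration:

  • grunt> customers2 = LOAD '/home/ubuntu/customer.data' USING PigStorage(',') as (id:int,name:chararray,age:int,addr:chararray);
  • grunt> r8 = union customers, customers2;
  • grunt> dump r8;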

3. Other relational operations

  1. Split: partition a relation into several relations
  • grunt> split customers into c1 if addr == 'CA', c2 if addr == 'FL';
  • grunt> dump c1;
  • ...
  • (1,Tom,18,CA)
  • (7,Tim,21,CA)
  • (12,James,21,CA)
  • grunt> dump c2;
  • ...
  • (2,John,20,FL)
  • (5,Rose,21,FL)
  • (8,Mick,21,FL)
  2. Filter: analogous to a WHERE clause
  • grunt> r1 = filter customers by (addr == 'OH');
  • grunt> dump r1;
  • ...
  • (3,Jack,22,OH)
  • (9,Lily,21,OH)
  3. Distinct: remove duplicate tuples
  • grunt> r2 = distinct customers;
  • grunt> dump r2;
  • ...
  • (1,Tom,18,CA)
  • (2,John,20,FL)
  • (3,Jack,22,OH)
  • (5,Rose,21,FL)
  • (6,Lee,21,CO)
  • (7,Tim,21,CA)
  • (8,Mick,21,FL)
  • (9,Lily,21,OH)
  • (10,lucy,21,PA)
  • (11,Ted,21,CO)
  • (12,James,21,CA)
  4. Foreach: analogous to a projection
  • grunt> r3 = foreach customers generate id,name;
  • grunt> dump r3;
  • ...
  • (1,Tom)
  • (2,John)
  • (3,Jack)
  • (5,Rose)
  • (6,Lee)
  • (7,Tim)
  • (8,Mick)
  • (9,Lily)
  • (10,lucy)
  • (11,Ted)
  • (12,James)
  5. Limit: return at most the first n tuples
  • grunt> r4 = limit customers 4;
  • grunt> dump r4;
  • ...
  • (1,Tom,18,CA)
  • (2,John,20,FL)
  • (3,Jack,22,OH)
  • (5,Rose,21,FL)
  6. Order: sort the relation
  • grunt> r6 = order customers by age;
  • grunt> dump r6;
  • ...
  • (1,Tom,18,CA)
  • (2,John,20,FL)
  • (12,James,21,CA)
  • (11,Ted,21,CO)
  • (10,lucy,21,PA)
  • (9,Lily,21,OH)
  • (8,Mick,21,FL)
  • (7,Tim,21,CA)
  • (6,Lee,21,CO)
  • (5,Rose,21,FL)
  • (3,Jack,22,OH)
  7. Nested expressions: operators can be nested; the expression below is evaluated from the inside out (order by addr desc, then distinct, then limit to 4 rows, then order by id desc)
  • grunt> r7 = order (limit (distinct (order customers by addr desc)) 4) by id desc;
  • grunt> dump r7;
  • ...
  • (5,Rose,21,FL)
  • (3,Jack,22,OH)
  • (2,John,20,FL)
  • (1,Tom,18,CA)

4. Evaluation functions

  1. Average (AVG)
  • grunt> g1 = group customers all;
  • grunt> r1 = foreach g1 generate (customers.id,customers.name), AVG(customers.age);
  • grunt> dump r1;
  • (({(12),(11),(10),(9),(8),(7),(6),(5),(3),(2),(1)},{(James),(Ted),(lucy),(Lily),(Mick),(Tim),(Lee),(Rose),(Jack),(John),(Tom)}),20.727272727272727)
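
If only the average is wanted, the projected ids and names can be dropped (a sketch over the same g1; avg_age is an alias introduced here):

  • grunt> avg_age = foreach g1 generate AVG(customers.age);
  • grunt> dump avg_age;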
  2. Maximum and minimum (MAX and MIN)
  • grunt> r2 = foreach g1 generate MAX(customers.age);
  • grunt> dump r2;
  • ...
  • (22)
  • ...
  • grunt> r3 = foreach g1 generate MIN(customers.age);
  • grunt> dump r3;
  • ...
  • (18)
  3. Sum (SUM)
  • grunt> r4 = foreach g1 generate SUM(customers.age);
  • grunt> dump r4;
  • ...
  • (228)

5. Aggregate and utility functions

  1. COUNT and COUNT_STAR
  • grunt> r5 = foreach g1 generate COUNT(customers.age);
  • grunt> dump r5;
  • ...
  • (11)

Note: COUNT does not include null values; to count nulls as well, use COUNT_STAR.
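
A sketch of the COUNT_STAR variant over the same grouped relation (r5s is an alias introduced here; since this data contains no null ages, the result should equal the COUNT above):

  • grunt> r5s = foreach g1 generate COUNT_STAR(customers.age);
  • grunt> dump r5s;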

  2. IsEmpty: test whether a bag or map is empty

First, prepare the following two data files:

sales.data:

  • 1,Robin,22,25000,sales
  • 2,BOB,23,30000,sales
  • 3,Maya,23,25000,sales
  • 4,Sara,25,40000,sales
  • 5,David,23,45000,sales
  • 6,Maggy,22,35000,sales

bonus.data:

  • 1,Robin,22,25000,sales
  • 2,Jaya,23,20000,admin
  • 3,Maya,23,25000,sales
  • 4,Alia,25,50000,admin
  • 5,David,23,45000,sales
  • 6,Omar,30,30000,admin

Load both data files into Pig's local environment:

  • grunt> sales = LOAD '/home/ubuntu/sales.data' USING PigStorage(',') as (sno:int, name:chararray, age:int, salary:int, dept:chararray);
  • grunt> bonus = LOAD '/home/ubuntu/bonus.data' USING PigStorage(',') as (sno:int, name:chararray, age:int, salary:int, dept:chararray);

Then cogroup them:

  • grunt> g1 = cogroup sales by sno, bonus by age;
  • grunt> dump g1;
  • ...
  • (1,{(1,Robin,22,25000,sales)},{})
  • (2,{(2,BOB,23,30000,sales)},{})
  • (3,{(3,Maya,23,25000,sales)},{})
  • (4,{(4,Sara,25,40000,sales)},{})
  • (5,{(5,David,23,45000,sales)},{})
  • (6,{(6,Maggy,22,35000,sales)},{})
  • (22,{},{(1,Robin,22,25000,sales)})
  • (23,{},{(5,David,23,45000,sales),(3,Maya,23,25000,sales),(2,Jaya,23,20000,admin)})
  • (25,{},{(4,Alia,25,50000,admin)})
  • (30,{},{(6,Omar,30,30000,admin)})

Then test the bags for emptiness:

  • grunt> empty1 = filter g1 by IsEmpty(sales);
  • grunt> dump empty1;
  • ...
  • (22,{},{(1,Robin,22,25000,sales)})
  • (23,{},{(5,David,23,45000,sales),(3,Maya,23,25000,sales),(2,Jaya,23,20000,admin)})
  • (25,{},{(4,Alia,25,50000,admin)})
  • (30,{},{(6,Omar,30,30000,admin)})
  • grunt> empty2 = filter g1 by IsEmpty(bonus);
  • grunt> dump empty2;
  • ...
  • (1,{(1,Robin,22,25000,sales)},{})
  • (2,{(2,BOB,23,30000,sales)},{})
  • (3,{(3,Maya,23,25000,sales)},{})
  • (4,{(4,Sara,25,40000,sales)},{})
  • (5,{(5,David,23,45000,sales)},{})
  • (6,{(6,Maggy,22,35000,sales)},{})
  • ...
  3. SIZE: compute the size of a field's value (for a chararray, the number of characters)
  • grunt> s1 = foreach customers generate name , SIZE(name);
  • grunt> dump s1;
  • ...
  • (Tom,3)
  • (John,4)
  • (Jack,4)
  • (Rose,4)
  • (Lee,3)
  • (Tim,3)
  • (Mick,4)
  • (Lily,4)
  • (lucy,4)
  • (Ted,3)
  • (James,5)
  4. TOBAG: wrap values into a bag
  • grunt> b1 = foreach orders generate TOBAG(id,cid);
  • grunt> dump b1;
  • ...
  • ({(1),(1)})
  • ({(2),(4)})
  • ({(3),(5)})
  • ({(4),(3)})
  • ({(5),(6)})
  • ({(6),(1)})
  • ({(7),(2)})
  • ({(8),(6)})
  • ({(9),(6)})
  • ({(10),(7)})
  • ({(11),(8)})
  5. TOP(n, column, relation): take the top n tuples of each group, ordered by the given column index
  • grunt> g2 = group orders by cid ;
  • grunt> t1 = FOREACH g2 {top = TOP(1, 0, orders); GENERATE top;}
  • grunt> dump t1;
  • ...
  • ({(6,No04,23.3,1)})
  • ({(7,No08,19.99,2)})
  • ({(4,No04,4.2,3)})
  • ({(2,No03,4.1,4)})
  • ({(3,No02,5.12,5)})
  • ({(9,No11,1.99,6)})
  • ({(10,No9,12.0,7)})
  • ({(11,No12,20.9,8)})
  6. TOTUPLE: wrap values into a tuple
  • grunt> t2 = foreach orders generate TOTUPLE(id,price);
  • grunt> dump t2;
  • ...
  • ((1,18.22))
  • ((2,4.1))
  • ((3,5.12))
  • ((4,4.2))
  • ((5,2.22))
  • ((6,23.3))
  • ((7,19.99))
  • ((8,13.0))
  • ((9,1.99))
  • ((10,12.0))
  • ((11,20.9))
  7. TOMAP: wrap key/value pairs into a map; the key must be a chararray, the value may be any type
  • grunt> t3 = foreach orders generate TOMAP(name,price);
  • grunt> dump t3;
  • ...
  • ([No01#18.22])
  • ([No03#4.1])
  • ([No02#5.12])
  • ([No04#4.2])
  • ([No06#2.22])
  • ([No04#23.3])
  • ([No08#19.99])
  • ([No05#13.0])
  • ([No11#1.99])
  • ([No9#12.0])
  • ([No12#20.9])
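
A map built this way can later be dereferenced with the # operator (a sketch; p1 is an alias introduced here, and the key literal 'No01' is taken from the data above):

  • grunt> p1 = foreach t3 generate $0#'No01';
  • grunt> dump p1;

This yields 18.22 for the row whose map key is No01 and null for every other row.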