doclist 阅读(21) 评论(0)

hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。

Metastore (hive元数据)
Hive将元数据存储在数据库中,比如mysql ,derby.Hive中的元数据包括表的名称,表的列和分区及其属性,表的数据所在的目录

Hive数据存储在HDFS,大部分的查询、计算由mapreduce完成
Hive数据仓库于数据库的异同

 (1)由于Hive采用了SQL的查询语言HQL,因此很容易将Hive理解为数据库。其实从结构上来看,Hive和数据库除了拥有类似的查询语言,
 再无类似之处。
(2)数据存储位置。  hdfs   raw local fs
(3)数据格式。 分隔符
(4)数据更新。hive读多写少。Hive中不支持对数据的改写和添加,所有的数据都是在加载的时候中确定好的。
INSERT INTO … VALUES添加数据,使用UPDATE … SET修改数据  不支持的
HDFS 一次写入多次读取
(5) 执行。hive通过MapReduce来实现的  而数据库通常有自己的执行引擎。
(6)执行延迟。由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致Hive执行延迟高的因素是MapReduce框架
(7)可扩展性
(8)数据规模。
hive几种基本表类型:内部表、外部表、分区表、桶表
内部表(管理表)和外部表的区别:

创建表
外部表创建表的时候,不会移动数到数据仓库目录中(/user/hive/warehouse),只会记录表数据存放的路径
内部表会把数据复制或剪切到表的目录下
删除表
外部表在删除表的时候只会删除表的元数据信息不会删除表数据
内部表删除时会将元数据信息和表数据同时删除
表类型一、管理表或内部表Table Type: MANAGED_TABLE

create table if not exists dept(
deptno int,
deptname string,
address string
)
row  format delimited fields terminated by '\t';

//加载HDFS文件到Hive表中
load data inpath '/input/dept.txt' into table dept;

//用来指定原文件的列分隔符
row  format delimited fields terminated by '\t';
load 如果操作的HDFS上的文件,代表着会移动或者剪切文件

desc formatted dept;  //描述表结构信息
Location:               hdfs://bigdata/user/hive/warehouse/db01.db/dept  
Table Type:             MANAGED_TABLE 

 


表类型二、外部表

create  external table emp(
empno int,
empname string,
empjob string,
mgno int,
birthday string,
salary float,
bonus float,
depno int
)
row  format delimited fields terminated by '\t'
location '/input/demo';

//描述表结构
desc formatted emp;
Location:               hdfs://bigdata/input/demo    
Table Type:             EXTERNAL_TABLE

删除内部表
drop table dept;

删除外部表
drop table emp;

清空表数据
truncate table student;

 


表类型三、分区表

分区表创建表的时候需要指定分区字段,分区字段与普通字段的区别:分区字段会在HDFS表目录下生成一个分区字段名称的目录,而普通字段则不会,查询的时候可以当成普通字段来使用,一般不直接和业务直接相关。

create  table emp_part(
 empno int,
 empname string,
 empjob string,
 mgrno int,
 birthday string,
 salary float,
 bonus float,
 deptno   int
 )
 partitioned by (province string)
row format delimited fields terminated by '\t';

//向分区表加载数据
load data local inpath '/home/user01/emp.txt' into table emp_part partition (province='CHICAGO');

//描述表信息
desc formatted  emp_part;

//查询全表数据
select * from emp_part;

//查询分区字段表数据
select * from emp_part where province='CHICAGO';

//查看分区信息
show partitions emp_part;

//增加分区
aler table emp_part add [if not exist] partition(provine='zhejiang',city='hangzhou')

//删除分区
aler table emp_part drop [if  exist] partition(provine='zhejiang',city='hangzhou')

 


外部分区表

create external  table  dept_part(
deptno int,
deptname string,
address string
)
partitioned by (province string)
row  format delimited fields terminated by '\t'
location '/input/demo';

//手动增加分区字段及外部目录:
alter table dept_part add partition (province='BOSTON') location '/input/demo/BOSTON';

//手动增加分区字段(自动生成分区目录)
alter table dept_part add partition (province='NEW YORK');


表类型四:桶表

将内部表,外部表和分区表进一步组织成桶表
可以将表的列通过Hash算法进一步分解成不同的文件存储

create table test_bucket_table(
id int,
name string
)
clustered by (id) into 5 bucket;


创建表的方式
方式一 create + load

create [external] table table_name(
    col1_name col1_type,
    ...
    coln_name coln_type
    )
    row  format delimited fields terminated  by '\t';

   //load加载数据
    laod data [local] inpth '本地文件(linux)/HDFS' [overwrite] into table  table_name;

方式二 like + load

//复制表结构
    create table tableB like tableA;    //首先必须要有tableA

//load加载数据
    laod data [local] inpth '本地文件(linux)/HDFS' [overwrite] into table  table_name;

 


方式三 as 创建表的同时加载数据

create table emp_insert(
id int,
name string,
job string,
salary float
)
row format  delimited fields terminated by ',';

//insert into 加载数据
insert into table emp_insert select empno,empname,empjob,salary from emp_part1 where day='20170308' and  hour='14';

方式四 create + insert

//创建表

create table emp_insert(
id int,
name string,
job string,
salary float
)
row format  delimited fields terminated by ',';

//insert into 加载数据
insert into table emp_insert select empno,empname,empjob,salary from emp_part1 where day='20170308' and  hour='14';


加载数据的方式
加载方式一

//加载本地文件到Hive表   --使用存储介质(移动硬盘)
laod data local inpth '本地文件(linux)' [overwrite] into table  table_name;

加载方式二

//创建表时通过select查询语句加载数据
 create table tableB row format delimited filelds termianted by ','  as select * from tableA;  

 


加载方式三

//创建表时通过select查询语句加载数据
 create table tableB row format delimited filelds termianted by ','  as select * from tableA;  


加载方式四

//创建表时通过select查询语句加载数据
 create table tableB row format delimited filelds termianted by ','  as select * from tableA;  


加载方式五

//先创建表,通过insert into table table_namea  select  *  fom tableB


加载方式六

<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Some select queries can be converted to single FETCH task
    minimizing latency.Currently the query should be single
    sourced not having any subquery and should not have
    any aggregations or distincts (which incurrs RS),
    lateral views and joins.
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
  </description>
</property>



几种导出数据的方式

    1.insert overwrite ... 导出到本地目录
     insert overwrite local directory '/home/user01/export' row format delimited fields terminated by ' ' select * from emp_part1;

    2.insert overwrite ... 导出到HDFS之上
    insert overwrite directory '/export' select * from emp_part1 where day='20170308';

    3.hive -e 'HQL query'  >> test
    bin/hive -e 'select * from db01.student' >> test.txt

    4)sqoop


Hive 自定义函数函数

UDF 一进一出 处理原文件内容某些字段包含 [] “”
UDAF 多进一出 sum() avg() max() min()
UDTF 一进多出 ip -> 国家 省 市
Hive4种排序

    order by //可以指定desc 降序 asc 升序
    order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间。

    sort by 【对分区内的数据进行排序】
    sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受Hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。

    distribute by 【对map输出进行分区】
    distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。

    cluster by
    cluster by除了具有distribute by的功能外还兼具sort by的功能。当distribute by和sort by 是同一个字段的时候可以使用cluster by替代。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。

三种分组的区别

    row_number:不管col2字段的值是否相等,行号一直递增,比如:有两条记录的值相等,但一个是第一,一个是第二
    rank:上下两条记录的col2相等时,记录的行号是一样的,但下一个col2值的行号递增N(N是重复的次数),比如:有两条并列第一,下一个是第三,没有第二
    dense_rank:上下两条记录的col2相等时,下一个col2值的行号递增1,比如:有两条并列第一,下一个是第二

Hive优化

    1.fetch task任务不走MapReduce,可以在hive配置文件中设置最大化和最小化fetch task任务;通常在使用hiveserver2时调整为more;

设置参数的优先级:在命令行或者代码设置参数 > hive-site.xml>hive-default.xml
 set hive.fetch.task.conversion=more;   //单次交互模式下有效,
 bin/hive --hiveconf hive.fetch.task.conversion=more

上面的两种方法都可以开启了Fetch任务,但是都是临时起作用的;如果你想一直启用这个功能,可以在${HIVE_HOME}/conf/hive-site.xml里面加入以下配置:

<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Some select queries can be converted to single FETCH task
    minimizing latency.Currently the query should be single
    sourced not having any subquery and should not have
    any aggregations or distincts (which incurrs RS),
    lateral views and joins.
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
  </description>
</property>

 
    2.strict mode:严格模式设置,严格模式下将会限制一些查询操作
    文件格式,ORC PARQUET 等
    分区表
    select 查询不加where过滤条件,不会执行

开启严格模式
hive提供的严格模式,禁止3种情况下的查询模式。
a:当表为分区表时,where字句后没有分区字段和限制时,不允许执行。
b:当使用order by语句时,必须使用limit字段,因为order by 只会产生一个reduce任务。
c:限制笛卡尔积的查询。sql语句不加where不会执行

<property>
  <name>hive.mapred.mode</name>
  <value>nonstrict</value>
  <description>The mode in which the Hive operations are being performed.
     In strict mode, some risky queries are not allowed to run. They include:
       Cartesian Product.
       No partition being picked up for a query.
       Comparing bigints and strings.
       Comparing bigints and doubles.
       Orderby without limit.
  </description>
</property>

 


    3.优化sql语句,如先过滤再join,先分组再做distinct;

Select count(*) cnt
From store_sales ss
     join household_demographics hd on (ss.ss_hdemo_sk = hd.hd_demo_sk)
     join time_dim t on (ss.ss_sold_time_sk = t.t_time_sk)
     join store s on (s.s_store_sk = ss.ss_store_sk)
Where
     t.t_hour = 8
     t.t_minute >= 30
     hd.hd_dep_count = 2
order by cnt;

 


    4.MapReduce过程的map、shuffle、reduce端的snappy压缩

    需要先替换hadoop的native本地包开启压缩
    在mapred-site.xml文件设置启用压缩及压缩编码
    在执行SQL执行时设置启用压缩和指定压缩编码

set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org apache.hadoop.io.compress.SnappyCodec;


    5.大表拆分成子表,提取中间结果集,减少每次加载数据
    多维度分析,多个分析模块
    每个分析模块涉及字段不一样,而且并不是表的全部字段

    6.分区表及外部表
    设计二级分区表(一级字段为天,二级字段设置小时)
    创建的的是外部表,创建表时直接指定数据所在目录即可,不用再用load加载数据

    7.设置map和reduce个数:默认情况下一个块对应一个map任务,map数据我们一般不去调整,reduce个数根据reduce处理的数据量大小进行适当调整体现“分而治之”的思想

hive-site.xml
hive.mapred.reduce.tasks.speculative.execution=true;
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for reducers should be turned on. </description>
</property>

 


    8.JVM重用:一个job可能有多个map reduce任务,每个任务会开启一个JVM虚拟机,默认情况下一个任务对应一个JVM,任务运行完JVM即销毁,我们可以设置JVM重用参数,一般不超过5个,这样一个JVM内可以连续运行多个任务

JVM重用是Hadoop调优参数的内容,对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop默认配置是使用派生JVM来执行map和reduce任务的,这是jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。
JVM重用可以使得JVM实例在同一个JOB中重新使用N次,N的值可以在Hadoop的mapre-site.xml文件中进行设置(建议参考5~10)
mapred.job.reuse.jvm.num.tasks(旧版)
mapreduce.job.jvm.numtasks(新版)
hadoop.apache.org/docs/r2.5.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
http://hadoop.apache.org/docs/r2.5.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

也可在hive的执行设置:

hive-site.xml
hive.mapred.reduce.tasks.speculative.execution=true;
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for reducers should be turned on. </description>
</property>

 
    9.推测执行:例如一个Job应用有10个MapReduce任务(map 及reduce),其中9个任务已经完成,那么application Master会在另外启动一个相同的任务来运行未完成的那个,最后哪个先运行完成就把另一个kill掉

    启用speculative最大的好处是,一个map执行的时候,系统会在其他空闲的服务器上启动相同的map来同时运行,哪个运行的快就使用哪个的结果,另一个运行慢的在有了结果之后就会被kill。

hive-site.xml
hive.mapred.reduce.tasks.speculative.execution=true;
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for reducers should be turned on. </description>
</property>


数据倾斜

对于普通的join操作,会在map端根据key的hash值,shuffle到某一个reduce上去,在reduce端做join连接操作,内存中缓存join左边的表,遍历右边的表,依次做join操作。所以在做join操作时候,将数据量多的表放在join的右边。

当数据量比较大,并且key分布不均匀,大量的key都shuffle到一个reduce上了,就出现了数据的倾斜。

常见的数据倾斜出现在group by和join..on..语句中。

join(数据倾斜)
在进行两个表join的过程中,由于hive都是从左向右执行,要注意讲小表在前,大表在后(小表会先进行缓存)。

map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。hive在跑数据时经常会出现数据倾斜的情况,使的作业经常reduce完成在99%后一直卡住,最后的1%花了几个小时都没跑完,这种情况就很可能是数据倾斜的原因,

hive.groupby.skewindata=true; 

如果是group by过程出现倾斜应将此项设置true。
<property>
  <name>hive.groupby.skewindata</name>
  <value>false</value>
  <description>Whether there is skew in data to optimize group by queries</description>
</property>

hive.optimize.skewjoin.compiletime=true;

如果是join 过程中出现倾斜应将此项设置为true
不影响结果可以考虑过滤空值
<property>
  <name>hive.optimize.skewjoin.compiletime</name>
  <value>false</value>
</property> 

hive.optimize.skewjoin.compiletime=true; 如果是join过程出现倾斜应该设置为true
此时会将join语句转化为两个mapreduce任务,第一个会给jion字段加随机散列
set hive.skewjoin.key=100000; 这个是join的键对应的记录条数超过这个值则会进行优化。

可以在空值前面加随机散列
3种常见的join
Map-side Join

mapJoin的主要意思就是,当链接的两个表是一个比较小的表和一个特别大的表的时候,我们把比较小的table直接放到内存中去,然后再对比较大的表格进行map操作。join就发生在map操作的时候,每当扫描一个大的table中的数据,就要去去查看小表的数据,哪条与之相符,继而进行连接。这里的join并不会涉及reduce操作。map端join的优势就是在于没有shuffle,真好。在实际的应用中,我们这样设置:
***1. set hive.auto.convert.join=true;
这样设置,hive就会自动的识别比较小的表,继而用mapJoin来实现两个表的联合。看看下面的两个表格的连接。

<property>
  <name>hive.auto.convert.join.noconditionaltask.size</name>
  <value>10000000</value> The default is 10MB
 </property>


DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程

这里的第一句话就是运行本地的map join任务,继而转存文件到XXX.hashtable下面,在给这个文件里面上传一个文件进行map join,之后才运行了MR代码去运行计数任务。说白了,在本质上mapjoin根本就没有运行MR进程,仅仅是在内存就进行了两个表的联合。

mapjoin使用场景
1.关联操作中有一张表非常小
2.不等值的链接操作

自动执行

set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize=25;默认值是25mb  
<property>
  <name>hive.mapjoin.smalltable.filesize</name>
  <value>25000000</value>
 </property>


手动执行 A为小表 如果A表超过25M,还想使用map join;
select /+mapjoin(A)/ f.a,f.b from A t join B f on(f.a==t.a)

hive入门学习:join的三种优化方式 - HAHA的专栏 - 博客频道 - CSDN.NET
http://blog.csdn.net/liyaohhh/article/details/50697519
Reduce-side Join

***hive join操作默认使用的就是reduce join
Reduce-side Join原理上要简单得多,它也不能保证相同key但分散在不同dataset中的数据能够进入同一个Mapper,整个数据集合的排序
在Mapper之后的shuffle过程中完成。相对于Map-side Join,它不需要每个Mapper都去读取所有的dataset,这是好处,但也有坏处,
即这样一来Mapper之后需要排序的数据集合会非常大,因此shuffle阶段的效率要低于Map-side Join。
***reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag)

semi join 小表对大表 是reudce join的变种 map阶段过滤掉不需要join的字段 相当于Hivw SQL加的where过滤
SMB Join(sort merge bucket)

SMB 存在的目的主要是为了解决大表与大表间的 Join 问题,分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决之,这是典型的分而治之的思想。

1 set hive.enforce.bucketing=true;
2 set hive.enforce.sorting=true;


表优化数据目标:相同数据尽量聚集在一起