写点什么

hive 数据导入:Python 脚本

  • 2022 年 7 月 21 日
  • 本文字数:3271 字

    阅读完需:约 11 分钟

1 前言

在对 hive 表进行数据导入时,针对有分区表插入数据时,总是使用外部临时表映射静态数据文件,然后再用查询的方式插入数据。

这样每次都需要进行创建临时表、上传 HDFS、执行插入 SQL 等一系列操作,这样步骤非常繁琐,也容易在环境中残留一些垃圾文件。

其实 HIVE 也是支持 SQL 语句插入数据的,也就是 insert 语句,当插入大量数据时,可以拼装 insert 语句进行插入。

想要快速使用并且轻量化,就要用到我们今天说的 Python 脚本了。

首先我们先说一下使用的大数据组件 Impala。

2 Impala

什么是 Impala

Impala 是 Cloudera 公司主导开发的新型查询系统,它提供 SQL 语义,能查询存储在 Hadoop 的 HDFS 和 HBase 中的 PB 级大数据。已有的 Hive 系统虽然也提供了 SQL 语义,但由于 Hive 底层执行使用的是 MapReduce 引擎,仍然是一个批处理过程,难以满足查询的交互性。相比之下,Impala 的最大特点也是最大卖点就是它的快速。

想要在 python 中安装 impala,直接 pip install impala 是不行的,需要按照一下步骤依次进行:

pip install sixpip install bit_arraypip install thriftpypip install thrift_saslpip install impyla
复制代码

测试,不报错则表示安装成功:

>>> from impala.dbapi import connect>>>
复制代码

3 Python 脚本

程序在运行时需要使用 impala,并且需要通过 csv 文件导入数据,所以需要引用一下包

import pandas as pdfrom impala.dbapi import connect
复制代码

首先要写一个连接 hive 并且可以执行 sql 的类

class Connect_Hive:    def __init__(self, host, port, auth_mechanism, user, password, database):        self.host = host        self.port = port        self.auth_mechanism = auth_mechanism        self.user = user        self.password = password        self.database = database
def __GetConnect(self): self.conn = connect(host=self.host, port=self.port, auth_mechanism=self.auth_mechanism, user=self.user, password=self.password, database=self.database) cur = self.conn.cursor() if not cur: raise (NameError, "连接数据库失败") else: return cur
#执行查询返回结果list def ExecQuery(self, sql): cur = self.__GetConnect() cur.execute(sql) resList = cur.fetchall()
# 查询完毕后必须关闭连接 self.conn.close() return resList
#无返回值的执行sql def ExecNonQuery(self, sql): cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() #获取数据库列表 def get_databases_list(self): DATABASES_LIST = [] DATABASES = self.ExecQuery('SHOW DATABASES') for i in DATABASES: i = str(i).replace("('", "").replace("',)", "") DATABASES_LIST.append(i) default = [i for i, x in enumerate(DATABASES_LIST) if x == 'dataplatform_beluga'] return DATABASES_LIST, default[0] #获取所有表 def get_tables_list(self): TABLES_LIST = [] TABLES = self.ExecQuery('SHOW TABLES') for i in TABLES: i = str(i).replace("('", "").replace("',)", "") TABLES_LIST.append(i) return TABLES_LIST
复制代码

下面编写一个通过 csv 文件,把要导入的数据转成一个 sql

def insert_info_hive(input_file, table, host, port, auth_mechanism, user, password, database):    if input_file is not None and input_file.name.endswith(".csv"):        data = pd.read_csv(input_file)        resList = Connect_Hive(host, port, auth_mechanism, user, password, database).ExecQuery('DESCRIBE ' + table)        header_list = {}        # 封装查询结果        for value in resList:            if value[0] == '' or '#' in value[0]:                header_list = header_list            else:                header_list[value[0]] = value[1]        headers = header_list        headkeys = str(list(headers.keys())).replace('[', '(').replace(']', ')').replace("'", "")        data_headers = data.columns        Max_value = ''        for i in range(len(data)):            values = []            for head in headers:                if head in data_headers:                    if head == 'dt':                        value = int(data[head].loc[i])                    else:                        value = Type.type_conversion(headers[head], data[head].loc[i])                    values.append(value)                else:                    if head == 'dt':                        value = Type.type_conversion(headers[head], Time.func("$date", 0))                    else:                        if 'decimal' in headers[head]:                            co = list(headers[head].replace('decimal', '').replace('(', '').replace(')', '').split(','))                            d = int(co[1])                            getcontext().prec = d + 1                            value = Decimal(random.uniform(1, 10)) / 1                            value = float(str(value).replace("'Decimal('", "").replace("')", ""))                        else:                            value = Type.type_conversion(headers[head], random.randrange(1, 100, 1))                    values.append(value)            if i == len(data) - 1:                values = str(values).replace('[', '(').replace(']', ')')            else:                values = str(values).replace('[', '(').replace(']', '),')            Max_value = Max_value + values        SQL = f'''INSERT INTO TABLE {table} {headkeys} VALUES {Max_value}'''        return SQL, data
复制代码

最后传入文件,选择好要插入的表,执行 sql 语句即可

DATABASES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[0]index = Connect_Hive(host, port, auth_mechanism, user, password, database).get_databases_list()[1]database = st.selectbox('DataBases List', DATABASES_LIST, index=index)TABLES_LIST = Connect_Hive(host, port, auth_mechanism, user, password, database).get_tables_list()table = st.selectbox('Tables List', TABLES_LIST)input_file = st.file_uploader("Upload a CSV File", type=['csv'], key="upload")    if input_file:        info = insert_info_hive(input_file, table, host, port, auth_mechanism, user, password, database)        DF = st.checkbox('Show dataframe', value=False)        if DF:            st.dataframe(info[1])            st.info(info[0])        Sub = st.button('提交', key='Sub_Button', disabled=False)        if Sub:            st.write('wait some times')            path = f'image/loading.gif'            st.image(get_pic_bytes(path))            Connect_Hive(host, port, auth_mechanism, user, password, database).ExecNonQuery(info[0])            st.success('提交成功')    else:        st.button('提交', key='Sub_Button', disabled=True)
复制代码



结束语

我是怀瑾握瑜,一只大数据萌新,上能 code 下能 teach 的全能奶爸,家有两只吞金兽,嘉与嘉

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2022.07.01 加入

还未添加个人简介

评论

发布
暂无评论
hive数据导入:Python脚本_Python_怀瑾握瑜的嘉与嘉_InfoQ写作社区