大数据开发之常用命令大全
Linux(vi/vim)
一般模式
编辑模式
指令模式
压缩和解压 gzip/gunzip 压缩
(1)只能压缩文件不能压缩目录
(2)不保留原来的文件
gzip 压缩:gzip hello.txt
gunzip 解压缩文件:gunzip hello.txt.gz
zip/unzip 压缩可以压缩目录且保留源文件
zip 压缩(压缩 1.txt 和 2.txt,压缩后的名称为 mypackage.zip):zip hello.zip hello.txt world.txt
unzip 解压:unzip hello.zip
unzip 解压到指定目录:unzip hello.zip -d /opt
tar 打包 tar 压缩多个文件:tar -zcvf hello.txt world.txt
tar 压缩目录:tar -zcvf hello.tar.gz opt/
tar 解压到当前目录:tar -zxvf hello.tar.gz
tar 解压到指定目录:tar -zxvf hello.tar.gz -C /opt
RPMRPM 查询命令:rpm -qa |grep firefox
RPM 卸载命令:
rpm -e xxxxxx
rpm -e --nodeps xxxxxx(不检查依赖)
RPM 安装命令:
rpm -ivh xxxxxx.rpm
rpm -ivh --nodeps fxxxxxx.rpm(--nodeps,不检测依赖进度)
Shell 输入/输出重定向
脚本编辑
Hadoop 启动类命令
hadoop fs/hdfs dfs 命令
yarn 命令
Zookeeper 启动命令
基本操作
四字母命令
Kafka「注:」 这里机器我只写一个。命令你们也可使用 ./bin/xx.sh (如:./bin/kafka-topics.sh)
查看当前服务器中的所有 topickafka-topics --zookeeper xxxxxx:2181 --list --exclude-internal
说明:
exclude-internal:排除 kafka 内部 topic
比如: --exclude-internal --topic "test_.*"
创建 topickafka-topics --zookeeper xxxxxx:2181 --create--replication-factor--partitions 1--topic topic_name
说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数删除 topic「注意:」 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
kafka-topics --zookeeper xxxxxx:2181 --delete --topic topic_name 生产者 kafka-console-producer --broker-list xxxxxx:9092 --topic topic_name
可加:--property parse.key=true(有 key 消息)消费者 kafka-console-consumer --bootstrap-server xxxxxx:9092 --topic topic_name
注:可选
--from-beginning:会把主题中以往所有的数据都读取出来
--whitelist '.*' :消费所有的 topic
--property print.key=true:显示 key 进行消费
--partition 0:指定分区消费
--offset:指定起始偏移量消费查看某个 Topic 的详情 kafka-topics --zookeeper xxxxxx:2181 --describe --topic topic_name 修改分区数 kafka-topics --zookeeper xxxxxx:2181 --alter --topic topic_name --partitions 6 查看某个消费者组信息 kafka-consumer-groups --bootstrap-server xxxxxx:9092 --describe --group group_name 删除消费者组 kafka-consumer-groups --bootstrap-server xxxxxx:9092 ---delete --group group_name 重置 offsetkafka-consumer-groups --bootstrap-server xxxxxx:9092 --group group_name
--reset-offsets --all-topics --to-latest --executeleader 重新选举指定 Topic 指定分区用重新 PREFERRED:优先副本策略 进行 Leader 重选举
kafka-leader-election --bootstrap-server xxxxxx:9092--topic topic_name --election-type PREFERRED --partition 0 所有 Topic 所有分区用重新 PREFERRED:优先副本策略 进行 Leader 重选举
kafka-leader-election --bootstrap-server xxxxxx:9092--election-type preferred --all-topic-partitions 查询 kafka 版本信息 kafka-configs --bootstrap-server xxxxxx:9092--describe --version
增删改配置
topic 添加/修改动态配置
kafka-configs --bootstrap-server xxxxxx:9092--alter --entity-type topics --entity-name topic_name--add-config file.delete.delay.ms=222222,retention.ms=999999
topic 删除动态配置
kafka-configs --bootstrap-server xxxxxx:9092--alter --entity-type topics --entity-name topic_name--delete-config file.delete.delay.ms,retention.ms 持续批量拉取消息单次最大消费 10 条消息(不加参数意为持续消费)
kafka-verifiable-consumer --bootstrap-server xxxxxx:9092--group group_name--topic topic_name --max-messages 10 删除指定分区的消息删除指定 topic 的某个分区的消息删除至 offset 为 1024
json 文件 offset-json-file.json
{"partitions": [{"topic": "topic_name","partition": 0,"offset": 1024}],"version": 1}kafka-delete-records --bootstrap-server xxxxxx:9092--offset-json-file offset-json-file.json 查看 Broker 磁盘信息查询指定 topic 磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090--describe --topic-list topic1,topic2
查询指定 Broker 磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090--describe --topic-list topic1 --broker-list 0
Hive
启动类
hive 启动元数据服务(metastore 和 hiveserver2)和优雅关闭脚本
启动:hive.sh start 关闭:hive.sh stop 重启:hive.sh restart 状态:hive.sh status
脚本如下
#!/bin/bashHIVE_LOG_DIR=$HIVE_HOME/logs
mkdir -p $HIVE_LOG_DIR
#检查进程是否运行正常,参数 1 为进程名,参数 2 为进程端口 function check_process(){pid=1 | awk '{print 2}')ppid=(netstat -nltp 2>/dev/null | grep 2 | awk '{print 7}' | cut -d '/' -f 1)echo pid" =~ "ppid" ]] && [ "ppid" ] && return 0 || return 1}
function hive_start(){metapid=HIVE_LOG_DIR/metastore.log 2>&1 &"cmd=cmd" sleep4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"[ -z "metapid" ] && eval (check_process HiveServer2 10000)cmd="nohup hive --service hiveserver2 >HIVE_LOG_DIR/hiveServer2.log 2>&1 &"[ -z "server2pid" ] && eval $cmd || echo "HiveServer2 服务已启动"}
function hive_stop(){metapid=metapid" ] && kill (check_process HiveServer2 10000)[ "server2pid" ] && kill server2pid || echo "HiveServer2 服务未启动"}
case 1 in"start")hive_start;;"stop")hive_stop;;"restart")hive_stopsleep 2hive_start;;"status")check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常";;*)echo Invalid Args!echo 'Usage: '(basename $0)' start|stop|restart|status';;esac
常用交互命令
SQL 类(特殊的)
内置函数
(1) NVL
给值为 NULL 的数据赋值,它的格式是 NVL( value,default_value)。它的功大数据培训能是如果 value 为 NULL,则 NVL 函数返回 default_value 的值,否则返回 value 的值,如果两个参数都为 NULL ,则返回 NULL
select nvl(column, 0) from xxx;
(2)行转列
(3)列转行(一列转多行)
「Split(str, separator):」 将字符串按照后面的分隔符切割,转换成字符 array。
「EXPLODE(col):」将 hive 一列中复杂的 array 或者 map 结构拆分成多行。
「LATERAL VIEW」
用法:
LATERAL VIEW udtf(expression) tableAlias AS columnAlias 解释:lateral view 用于和 split, explode 等 UDTF 一起使用,它能够将一行数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
lateral view 首先为原始表的每行调用 UDTF,UDTF 会把一行拆分成一或者多行,lateral view 再把结果组合,产生一个支持别名表的虚拟表。
「准备数据源测试」
「SQL」
SELECT movie,category_nameFROM movie_infolateral VIEWexplode(split(category,",")) movie_info_tmp AS category_name ;
「测试结果」
《功勋》 记录《功勋》 剧情《战狼 2》 战争《战狼 2》 动作《战狼 2》 灾难
窗口函数(1)OVER()
定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。
(2)CURRENT ROW(当前行)
n PRECEDING:往前 n 行数据
n FOLLOWING:往后 n 行数据(3)UNBOUNDED(无边界)
UNBOUNDED PRECEDING 前无边界,表示从前面的起点
UNBOUNDED FOLLOWING 后无边界,表示到后面的终点「SQL 案例:由起点到当前行的聚合」
selectsum(money) over(partition by user_id order by pay_time rows between UNBOUNDED PRECEDING and current row)from or_order;「SQL 案例:当前行和前面一行做聚合」
selectsum(money) over(partition by user_id order by pay_time rows between 1 PRECEDING and current row)from or_order;「SQL 案例:当前行和前面一行和后一行做聚合」
selectsum(money) over(partition by user_id order by pay_time rows between 1 PRECEDING AND 1 FOLLOWING )from or_order;「SQL 案例:当前行及后面所有行」
selectsum(money) over(partition by user_id order by pay_time rows between current row and UNBOUNDED FOLLOWING )from or_order;
(4)LAG(col,n,default_val)
往前第 n 行数据,没有的话 default_val
(5)LEAD(col,n, default_val)
往后第 n 行数据,没有的话 default_val
「SQL 案例:查询用户购买明细以及上次的购买时间和下次购买时间」
selectuser_id,,pay_time,money,
lag(pay_time,1,'1970-01-01') over(PARTITION by name order by pay_time) prev_time,
lead(pay_time,1,'1970-01-01') over(PARTITION by name order by pay_time) next_timefrom or_order;
(6)FIRST_VALUE(col,true/false)
当前窗口下的第一个值,第二个参数为 true,跳过空值。
(7)LAST_VALUE (col,true/false)
当前窗口下的最后一个值,第二个参数为 true,跳过空值。
「SQL 案例:查询用户每个月第一次的购买时间 和 每个月的最后一次购买时间」
selectFIRST_VALUE(pay_time)over(partition by user_id,month(pay_time) order by pay_timerows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) first_time,
LAST_VALUE(pay_time)over(partition by user_id,month(pay_time) order by pay_time rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) last_timefrom or_order;
(8)NTILE(n)
把有序窗口的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。(用于将分组数据按照顺序切分成 n 片,返回当前切片值)
「SQL 案例:查询前 25%时间的订单信息」
select * from (select User_id,pay_time,money,
) twhere sorted = 1;
4 个 By(1)Order By
全局排序,只有一个 Reducer。
(2)Sort By
分区内有序。
(3)Distrbute By
类似 MR 中 Partition,进行分区,结合 sort by 使用。
(4) Cluster By
当 Distribute by 和 Sorts by 字段相同时,可以使用 Cluster by 方式。Cluster by 除了具有 Distribute by 的功能外还兼具 Sort by 的功能。但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC。
在生产环境中 Order By 用的比较少,容易导致 OOM。
在生产环境中 Sort By+ Distrbute By 用的多。
排序函数
(1)RANK()
排序相同时会重复,总数不会变
11335
(2)DENSE_RANK()
排序相同时会重复,总数会减少
11223(3)ROW_NUMBER()
会根据顺序计算
12345 日期函数 datediff:返回结束日期减去开始日期的天数
datediff(string enddate, string startdate)
select datediff('2021-11-20','2021-11-22')date_add:返回开始日期 startdate 增加 days 天后的日期
date_add(string startdate, int days)
select date_add('2021-11-20',3)date_sub:返回开始日期 startdate 减少 days 天后的日期
date_sub (string startdate, int days)
select date_sub('2021-11-22',3)
Redis 启动类
key
String
List
Set
Hash
zset(Sorted set)
Flink
启动
./start-cluster.sh
run
./bin/flink run [OPTIONS]
./bin/flink run -m yarn-cluster -c com.wang.flink.WordCount /opt/app/WordCount.jar
info
./bin/flink info [OPTIONS]
list
./bin/flink list [OPTIONS]
stop
./bin/flink stop [OPTIONS] <Job ID>
cancel(弱化)
./bin/flink cancel [OPTIONS] <Job ID>
savepoint
./bin/flink savepoint [OPTIONS] <Job ID>
原创作者:王了个博
评论