
Spark | PySpark

Reading data

## Read from HDFS
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.enableHiveSupport().appName('test').getOrCreate()
start = time.time()

## Load parquet files from HDFS
input_path = '/aaa/bbb/ccc'
data = spark.read.parquet(input_path)
data.show(5)  ## preview the first 5 rows
print('elapsed: %.1fs' % (time.time() - start))  ## report the timing started above
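Writing goes through the same interface: DataFrame.write returns a DataFrameWriter. A minimal sketch of the reverse direction, assuming a hypothetical output path:

## Write the DataFrame back to HDFS as parquet (output path is a made-up placeholder)
output_path = '/aaa/bbb/ccc_out'
data.write.mode('overwrite').parquet(output_path)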
from pyspark.sql import SparkSession, HiveContext

spark = SparkSession.builder.enableHiveSupport().appName('test').getOrCreate()
hive_context = HiveContext(spark)
sql = '''
select start_date
, end_date
from db_name.table_name
where pt = '2021-07-30'
and type = 'type1'
'''
df = hive_context.sql(sql)
df.show(5)  ## preview the first 5 rows
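HiveContext has been deprecated since Spark 2.0; once the session is built with enableHiveSupport(), the SparkSession can run Hive queries itself, so the block above reduces to:

## Equivalent query without HiveContext (Spark 2.0+)
df = spark.sql(sql)
df.show(5)  ## preview the first 5 rows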
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .appName('xxxx') \
    .getOrCreate()

## Read data
sql = '''
select a
from b
'''

df_spark = spark.sql(sql)
df = df_spark.toPandas()

## ... process df with pandas here to produce df_res ...
df_res = df

## Write data
df_spark_new = spark.createDataFrame(df_res)
df_spark_new = df_spark_new.coalesce(1)  ## coalesce returns a new DataFrame; reassign it
df_spark_new.createOrReplaceTempView("tmp_table")

sql = '''
insert overwrite table ${DB_NAME}.app_xx_yy_zz partition (pt = '${bizdate}')
select a
, b
, c
from tmp_table
'''

spark.sql(sql)
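The ${DB_NAME} and ${bizdate} tokens are left as-is for the scheduling platform to substitute before the job is submitted. If the script runs standalone you would fill them in yourself; a minimal sketch under that assumption (the database name and date are hypothetical):

## Hypothetical manual substitution of the scheduler placeholders
sql = '''
insert overwrite table {db}.app_xx_yy_zz partition (pt = '{dt}')
select a, b, c from tmp_table
'''.format(db='my_db', dt='2021-07-30')
spark.sql(sql)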
from pyhive import hive
import os
import pandas as pd

## Connect to HiveServer2 over LDAP; credentials are taken from environment variables
conn = hive.connect(
    host=os.environ['PYHIVE_HOST'],
    port=os.environ['PYHIVE_PORT'],
    username=os.environ['JUPYTER_HADOOP_USER'],
    password=os.environ['HADOOP_USER_PASSWORD'],
    auth='LDAP',
    configuration={
        'mapreduce.job.queuename': os.environ['JUPYTER_HADOOP_QUEUE'],
        'hive.resultset.use.unique.column.names': 'false'  ## drop table prefixes from column names
    }
)

sql = '''
select a
from table
where pt = '%s'
''' % (
    '2022-03-20'
)

df = pd.read_sql_query(sql, conn)
df.to_csv('filename.csv', encoding='utf_8_sig', index=False)  ## utf_8_sig so Excel reads non-ASCII correctly
conn.close()
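Building the query with % string formatting is fine for a trusted, fixed date; for values that come from elsewhere it is safer to let the driver do the escaping. A minimal sketch, assuming pyhive's pyformat parameter style and pandas forwarding params to the cursor:

## Hypothetical parameterized variant; the driver escapes the value
sql = '''
select a
from table
where pt = %(dt)s
'''
df = pd.read_sql_query(sql, conn, params={'dt': '2022-03-20'})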
