读取数据
1 2 3 4 5 6 7 8 9 10 11 12
# Example 1: read a Parquet dataset from HDFS with Spark and preview it.
from pyspark.sql import SparkSession, HiveContext, DataFrameWriter
import time

spark = SparkSession.builder.enableHiveSupport().appName('test').getOrCreate()
start = time.time()  # wall-clock start, for timing the read

# Renamed from `input`, which shadowed the Python builtin of the same name.
input_path = '/aaa/bbb/ccc'
data = spark.read.parquet(input_path)
data.show(5)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Example 2: run a Hive SQL query through Spark and preview the result.
from pyspark.sql import SparkSession, HiveContext
import pandas as pd

spark = SparkSession.builder.enableHiveSupport().appName('test').getOrCreate()

sql = ''' select start_date , end_date from db_nam.table_name where pt = '2021-07-30' and type = 'type1' '''
# HiveContext is deprecated since Spark 2.0; a SparkSession created with
# enableHiveSupport() runs Hive SQL directly, so no separate context is needed.
df = spark.sql(sql)
df.show(5)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# Example 3: pull a Spark SQL result into pandas, process it, and write the
# result back into a partitioned Hive table via a temp view.
from pyspark.sql import SparkSession  # fix: SparkSession was used but never imported
import pandas as pd

spark = SparkSession \
    .builder \
    .appName('xxxx') \
    .getOrCreate()  # fix: original had the typo .appNmae(), an AttributeError

sql = ''' select a from b '''
df_spark = spark.sql(sql)
df = df_spark.toPandas()

# ... pandas-side processing of `df` would go here (omitted in the original) ...

# fix: `sqlContext` and `df_res` were undefined names; build the Spark frame
# from the pandas frame via the existing session instead.
df_spark_new = spark.createDataFrame(df)
# fix: coalesce() returns a NEW DataFrame — the original discarded the result.
df_spark_new = df_spark_new.coalesce(1)
df_spark_new.createOrReplaceTempView("tmp_table")

sql = ''' insert overwrite table ${DB_NAME}.app_xx_yy_zz partition (pt = '${bizdate}') select a , b , c from tmp_table '''
spark.sql(sql)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# Example 4: query Hive over PyHive/LDAP and export the result to CSV.
from pyhive import hive
import contextlib
import os
import pandas as pd

# fix: the original never closed the connection; contextlib.closing releases
# it even if the query or the CSV write raises.
with contextlib.closing(hive.connect(
    host=os.environ['PYHIVE_HOST'],
    port=os.environ['PYHIVE_PORT'],
    username=os.environ['JUPYTER_HADOOP_USER'],
    password=os.environ['HADOOP_USER_PASSWORD'],
    auth='LDAP',
    configuration={
        'mapreduce.job.queuename': os.environ['JUPYTER_HADOOP_QUEUE'],
        # keep bare column names (no "table." prefix) in the result set
        'hive.resultset.use.unique.column.names': 'false',
    },
)) as conn:
    # NOTE(review): %-interpolation is fine for this hard-coded date literal,
    # but switch to a parameterized query if the value ever comes from a user.
    sql = ''' select a from table where pt = '%s' ''' % ('2022-03-20')
    df = pd.read_sql_query(sql, conn)
    # utf_8_sig adds a BOM so Excel opens the CSV with correct encoding
    df.to_csv('filename.csv', encoding='utf_8_sig', index=False)
|
参考资料
Thank you for your approval.
支付宝
微信支付
WeChat Pay