貼出一段程序,主要是對postgresql進行讀取并進行spark處理
def operator(x):
    """Normalize one row in place and return it.

    Sets ``x[2]`` to ``'kao'`` and recodes ``x[4]``: ``'male'`` becomes 1,
    ``'female'`` becomes 2, and any other value becomes 0. Prints ``x[1]``
    before the changes and the recoded ``x[4]`` afterwards.

    Args:
        x: A mutable, index-addressable row with at least five elements.

    Returns:
        The same object ``x``, after modification.

    Raises:
        IndexError: If ``x`` has fewer than five elements.
        TypeError: If ``x`` does not support item assignment (e.g. a tuple).
    """
    print(x[1])
    # NOTE(review): the original line was `x[2] == 'kao'`, a comparison whose
    # result was discarded. Assignment is the presumed intent - confirm.
    x[2] = 'kao'
    gender = x[4]
    if gender == 'male':
        x[4] = 1
    elif gender == 'female':
        x[4] = 2
    else:
        x[4] = 0
    print(x[4])
    return x
def tuple_convert_to_list(x):
    """Return a new list containing the elements of ``x`` in order."""
    return list(x)
if __name__ == '__main__':
    # Script entry point: read rows from PostgreSQL, then push them through
    # two Spark map stages (tuple -> list, then operator) and print samples.
    #
    # NOTE(review): `psycopg2` and `SparkContext` are used here but their
    # imports are not visible in this section - confirm they are imported at
    # the top of the file.
    # NOTE(review): connection credentials are hard-coded; consider loading
    # them from the environment or a config file instead.
    conn = psycopg2.connect(host='192.168.0.1',
                            port=5000,
                            user='test',
                            password='test20180910',
                            database='test')
    try:
        cursor = conn.cursor()
        try:
            sql = "select *from unicom_2i_hold_schema.tb_user limit 100000"
            print("sql:"+sql)
            cursor.execute(sql)
            rows = cursor.fetchall()
            print(cursor.rowcount)
            print(rows)
            print(type(rows))
        finally:
            # The original wrote `cursor.close;`, which only references the
            # method without calling it, so the cursor was never closed.
            cursor.close()
    finally:
        # The original never closed the connection.
        conn.close()

    # All rows are already in memory, so the database resources above are
    # released before the Spark work starts.
    sc = SparkContext(appName="test")
    try:
        rdd = sc.parallelize(rows)
        print(type(rdd))
        rdd2 = rdd.map(tuple_convert_to_list)
        rdd3 = rdd2.map(operator)
        print(type(rdd2))
        print(rdd2.take(100))
        print(rdd3.take(1000))
    finally:
        # Stop the context even when a Spark action raises.
        sc.stop()