要用python操作Cassandra颤陶,首先需要安裝Cassandra的驅(qū)動模塊(cassandra-driver)浸船,可以通過pip安裝褪储。
1. 基本操作
- 連接Cassandra
from cassandra.cluster import Cluster #引入Cluster模塊
cluster = Cluster() #連接本地數(shù)據(jù)庫蟀架,如果是本地地址峻呛,寫不寫都可以
cluster = Cluster(['127.0.0.1'])#連接本地數(shù)據(jù)庫
- 創(chuàng)建了Cluster后,并不會自動連接上數(shù)據(jù)庫辜窑,需要我們執(zhí)行連接操作钩述。
session = cluster.connect()#簡單的連接
session = cluster.connect('keyspacename')#指定連接keyspace,相當(dāng)于sql中use dbname
session.set_keyspace('otherkeyspace') #設(shè)置穆碎、修改keyspace
session.execute('use keyspacename')#設(shè)置牙勘、修改keyspace
- 查詢:查詢操作使用execute(),將cql語句拼接作為參數(shù)傳入即可所禀。
rows = session.execute('select * from emp')
for row in rows:#遍歷查詢的結(jié)果
print(str(row[0])+row[1]+row[2]+row[3]+str(row[4])) #如果你的row[0] 不是varchar 或者text類型方面,需要轉(zhuǎn)一下類型,不然python會報錯
for (emp_id,emp_city,emp_email,emp_name,emp_phone) in rows:#也可以用這種方式遍歷查詢的結(jié)果
print(str(emp_id)+emp_city+emp_email+emp_name+str(emp_phone))
- 傳參查詢
(1)位置傳參:
session.execute(
"""
INSERT INTO emp (emp_id,emp_city,emp_email,emp_name,emp_phone)
VALUES (%s, %s, %s, %s, %s)
""",
(4, 'tianjin', '156.com','pon',145645)
)
session.execute("INSERT INTO emp (emp_id) VALUES (%s)", (5,)) #如果只傳一個參數(shù)色徘,用tuple的形式必須后面加“恭金,”,或者用list的形式
session.execute("INSERT INTO emp (emp_id) VALUES (%s)", [6])
(2)名字傳參
通常用這種方式傳遞數(shù)據(jù)褂策,像keyspace名横腿、表名、列名必須在開始就設(shè)定好斤寂。
session.execute(
"""
INSERT INTO emp (emp_id,emp_city,emp_email,emp_name,emp_phone)
VALUES (%(emp_id)s, %(emp_city)s, %(emp_email)s, %(emp_name)s, %(emp_phone)s)
""",{'emp_id': 7, 'emp_city': 'xian', 'emp_email': '777777.qq.com', 'emp_name': 'xiaoming', 'emp_phone': 55555})
- 關(guān)閉連接
cluster.shutdown()
cluster.is_shutdown #查看是否關(guān)閉連接
Out[5]: True
2. 批量插入數(shù)據(jù)
如果只是一條一條插入耿焊,會非常慢,我試了下遍搞,5萬條數(shù)據(jù)大概需要4分鐘罗侯,如果用batch批量插入數(shù)據(jù),就可以非诚常快了钩杰,差不多1s插入1萬條纫塌,是不是很爽,哈哈讲弄,趕緊試一下吧~
tic = time.time()
i=0
sql = 'BEGIN BATCH\n'
with open(r'C:\Users\admin\Desktop\output\cassandra116w.csv', 'r') as f:
while True:
line = f.readline().strip()
if (line == '' or line == np.nan):
if(sql != 'BEGIN BATCH\n'):
sql += 'APPLY BATCH;'
session.execute(sql)
break
ll = line.split(',')
sql += 'INSERT INTO lead2(name,current_title,current_company,location,id) VALUES (' + '\''+ll[0]+'\'' + ','+'\''+ll[1]+'\''+',' +'\''+ ll[2] +'\''+ ',' +'\''+ ll[3]+'\'' +',' +'\''+ ll[4] +'\''+');\n'
i=i+1
if (i>300):
sql += 'APPLY BATCH;'
session.execute(sql)
i=0
sql = 'BEGIN BATCH\n'
toc = time.time()
print('vectorized version:' +str((toc - tic)) +'s')
vectorized version:116.4514513015747s #插入116萬條數(shù)據(jù)措左,用時116秒