read_sq()
multiprocessing
需要在单独的文件中。该文件如下所示:
import pandas as pd
from sqlalchemy import event, create_engine
from math import radians, cos, sin, asin, sqrt
import numpy as np
engine = create_engine('engine-path')
data = pd.read_sql("SELECT * from SCHEMA.TABLE", engine)
def cartesian_product_simplified(left, right):
la, lb = len(left), len(right)
ia2, ib2 = np.broadcast_arrays(*np.ogrid[:la,:lb])
return pd.DataFrame(np.column_stack([left.values[ia2.ravel()], right.values[ib2.ravel()]]))
def haversine_np(lon1, lat1, lon2, lat2):
lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
c = 2 * np.arcsin(np.sqrt(a))
m = 3956.269 * c
return m
def getDistance(chunk):
df = cartesian_product_simplified(chunk, data)
df = df.rename(columns={1:'lat1',2:'lon1',6:'lat2',7:'lon2'})
df = df.astype({"lat1": float,"lon1": float,"lat2": float,"lon2": float})
m = haversine_np(df['lon1'],df['lat1'],df['lon2'],df['lat2'])
dist = pd.DataFrame(m.values)
result = df.join(dist)
result = result.rename(columns={0:'dist'})
result = result[result['dist']<=3]
return result
主笔记本如下所示:
import pandas as pd
from dist_func import getDistance
from multiprocessing import Pool
if __name__ == '__main__':
global result
p = Pool(20)
for chunk in pd.read_sql("select top 10 * from SCHEMA.SecondTable", engine, chunksize=1):
result = p.map(getDistance, chunk)
p.terminate()
p.join()
这将导致此回溯:
Traceback (most recent call last):
File "C:\Users\filepath\anaconda\lib\multiprocessing\pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "C:\Users\filepath\anaconda\lib\multiprocessing\pool.py", line 44, in mapstar
return list(map(*args))
File "C:\Users\filepath\dist_func.py", line 30, in getDistance
df = cartesian_product_simplified(chunk, vendor_geo)
File "C:\Users\filepath\dist_func.py", line 18, in cartesian_product_simplified
return pd.DataFrame(np.column_stack([left.values[ia2.ravel()], right.values[ib2.ravel()]]))
AttributeError: 'str' object has no attribute 'values'
这是指
cartesian_product_simplified
提供给
getDistance
功能。但是,当我删除多处理并简单地通过
read_sql()
像这样的查询。。。
for chunk in pd.read_sql("select top 100 * from SCHEMA.SecondTable", engine, chunksize=10):
df = cartesian_product_simplified(chunk, data)
df = df.astype({"lat1": float,"lon1": float,"lat2": float,"lon2": float})
df = df.astype({"lat1": float,"lon1": float,"lat2": float,"lon2": float})
m = haversine_np(df['lon1'],df['lat1'],df['lon2'],df['lat2'])
dist = pd.DataFrame(m.values)
result = df.join(dist)
result = result.rename(columns={0:'dist'})
result = result[result['dist']<=3]
df_list.append(result)
…不会抛出这样的错误。这是使用完全相同的函数。为什么这个错误会发生,当它看起来像是被输入两个数据帧,并且它工作时没有涉及多个处理?