事实上,情况并不清楚,所以可能会有所不同。但是我可以给你一些建议。这不是确切的解决办法,但我认为它可以帮助你给出一个想法。
首先,我阅读表格的细节;
>>> rdd1 = sc.textFile('/home/ali/table1.txt')
>>> table1 = rdd1.map(lambda x: x.split(':')).map(lambda x: (x[0],x[1])).toDF(['col_name','data_type'])
>>> table1.show()
+--------+-------------+
|col_name| data_type|
+--------+-------------+
| ename| varchar(10)|
| eid| smallint(5)|
| esal|numeric(10,3)|
+--------+-------------+
>>> rdd2 = sc.textFile('/home/ali/table2.txt')
>>> table2 = rdd2.map(lambda x: x.split(':')).map(lambda x: (x[0],x[1])).toDF(['col_name','data_type'])
>>> table2.show()
+--------+-----------+
|col_name| data_type|
+--------+-----------+
| sid|smallint(5)|
| sname|varchar(10)|
| sclass|varchar(10)|
+--------+-----------+
>>> from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
>>>
>>> schema1 = StructType([
... StructField("col1", StringType()),
... StructField("col2", IntegerType()),
... StructField("col3", DoubleType())
... ])
>>>
>>> schema2 = StructType([
... StructField("col1", IntegerType()),
... StructField("col2", StringType()),
... StructField("col3", StringType())
... ])
>>>
>>> data1 = spark.read.csv('/home/ali/file1.txt', schema=schema1)
>>> data1.show()
+----+----+---------+
|col1|col2| col3|
+----+----+---------+
| aa| 1|12222.009|
| bb| 2|12345.012|
+----+----+---------+
>>> data2 = spark.read.csv('/home/ali/file2.txt', schema=schema2)
>>> data2.show()
+----+----+---------+
|col1|col2| col3|
+----+----+---------+
| 1| s1|1st_class|
| 2| s2|2nd_class|
+----+----+---------+
我定义了一个函数来检查数据类型是否匹配。但是在定义函数时,应该转换一些数据库数据类型(例如:varchar->字符串,数字->double..)我只转换string、int和double数据类型。如果您想使用更多的数据类型,您应该定义所有这些类型
>>> def matchTableData(t,d):
... matched = []
... for k1,table in t.items():
... table_dtypes = []
... a = True
... for i in [i.data_type for i in table.select('data_type').collect()]:
... if 'char' in i:
... table_dtypes.append('string')
... elif 'int' in i:
... table_dtypes.append('int')
... elif 'numeric' in i:
... table_dtypes.append('double')
... for k2,data in d.items():
... data_dtypes = [i[1] for i in data.dtypes]
... if table_dtypes == data_dtypes:
... matched.append([k1,k2])
... return matched
现在我们可以比较数据类型了。我为表和数据创建了两个dict。
>>> tables = {'table1':table1, 'table2':table2}
>>> data = {'data1':data1, 'data2':data2}
>>> print(matchTableData(tables,data))
[['table1', 'data1'], ['table2', 'data2']]
如您所见,它返回匹配的。正如我之前所说,这可能不是精确的解决方案,但我认为你可以使用其中的一部分