// TransmogrifAI "Titanic survival" example project.
package com.salesforce.hw
import com.salesforce.op._
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._
import com.salesforce.op.readers.DataReaders
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelsToTry._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class Passenger
(
id: Int,
survived: Int,
pClass: Option[Int],
name: Option[String],
sex: Option[String],
age: Option[Double],
sibSp: Option[Int],
parCh: Option[Int],
ticket: Option[String],
fare: Option[Double],
cabin: Option[String],
embarked: Option[String]
)
object OpTitanicSimple {
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
println("You need to pass in the CSV file path as an argument")
sys.exit(1)
}
val csvFilePath = args(0)
println(s"Using user-supplied CSV file path: $csvFilePath")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$"))
implicit val spark = SparkSession.builder.config(conf).getOrCreate()
val survived = FeatureBuilder.RealNN[Passenger].extract(_.survived.toRealNN).asResponse
val pClass = FeatureBuilder.PickList[Passenger].extract(_.pClass.map(_.toString).toPickList).asPredictor
val name = FeatureBuilder.Text[Passenger].extract(_.name.toText).asPredictor
val sex = FeatureBuilder.PickList[Passenger].extract(_.sex.map(_.toString).toPickList).asPredictor
val age = FeatureBuilder.Real[Passenger].extract(_.age.toReal).asPredictor
val sibSp = FeatureBuilder.Integral[Passenger].extract(_.sibSp.toIntegral).asPredictor
val parCh = FeatureBuilder.Integral[Passenger].extract(_.parCh.toIntegral).asPredictor
val ticket = FeatureBuilder.PickList[Passenger].extract(_.ticket.map(_.toString).toPickList).asPredictor
val fare = FeatureBuilder.Real[Passenger].extract(_.fare.toReal).asPredictor
val cabin = FeatureBuilder.PickList[Passenger].extract(_.cabin.map(_.toString).toPickList).asPredictor
val embarked = FeatureBuilder.PickList[Passenger].extract(_.embarked.map(_.toString).toPickList).asPredictor
val familySize = sibSp + parCh + 1
val estimatedCostOfTickets = familySize * fare
val pivotedSex = sex.pivot()
val normedAge = age.fillMissingWithMean().zNormalize()
val ageGroup = age.map[PickList](_.value.map(v => if (v > 18) "adult" else "child").toPickList)
val passengerFeatures = Seq(
pClass, name, age, sibSp, parCh, ticket,
cabin, embarked, familySize, estimatedCostOfTickets,
pivotedSex, ageGroup
).transmogrify()
val sanityCheck = true
val finalFeatures = if (sanityCheck) survived.sanityCheck(passengerFeatures) else passengerFeatures
val prediction =
BinaryClassificationModelSelector.withTrainValidationSplit(
modelTypesToUse = Seq(OpLogisticRegression)
).setInput(survived, finalFeatures).getOutput()
val evaluator = Evaluators.BinaryClassification().setLabelCol(survived).setPredictionCol(prediction)
import spark.implicits._
val trainDataReader = DataReaders.Simple.csvCase[Passenger](
path = Option(csvFilePath),
key = _.id.toString
)
val workflow =
new OpWorkflow()
.setResultFeatures(survived, prediction)
.setReader(trainDataReader)
val fittedWorkflow = workflow.train()
println(s"Summary: ${fittedWorkflow.summary()}")
println("Scoring the model")
val (dataframe, metrics) = fittedWorkflow.scoreAndEvaluate(evaluator = evaluator)
println("Transformed dataframe columns:")
dataframe.columns.foreach(println)
println("Metrics:")
fittedWorkflow .save("/tmp/my-model1")
println("model_saved")
val loadedModel = workflow.loadModel("/tmp/my-model1")
println("model_loaded")
val passengersDatas = DataReaders.Simple.csvCase[Passenger](
Option(csvFilePaths),
key = _.row_id.toString)
val scores = loadedModel.setReader(passengersDatas).score()
print("model_scored")
scores.show(true)
println(scores.collectAsList())
}
}
// NOTE(review): the output is `scores`. `scores.show()` prints only the first 20 rows,
// while `scores.collectAsList()` materializes every row on the driver (risky for large data).
// To save all results in CSV format, write the DataFrame directly, e.g.:
//   scores.coalesce(1).write.option("header", "true").csv("/tmp/scores-csv")