代码之家  ›  专栏  ›  技术社区  ›  Nurzhan Nogerbek

如何向Dataframe添加一些信息?

  •  1
  • Nurzhan Nogerbek  · 技术社区  · 7 年前

    我有非常简单的数据框架。

    val df = Seq(
        ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
        ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
        ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
        ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
        ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
    
    df.show()
    

    在Scala项目中,我将此数据帧转换为CSV文件。我需要在开头添加一些信息,如示例中所示:

    | REQUEST_DATE | 2019-02-05 20:00:00 |
    | USER         | Kate                |
    | SEARCH_TYPE  | Global              |
    
    | NAME         | START_DATE          | END_DATE            | STATUS |
    | Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
    | Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
    | Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
    | Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
    | Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
    

    我试图创建新的数据框架并将它们连接在一起。不幸的是,你不能用不同的模式连接两个数据帧。

    1 回复  |  直到 7 年前
        1
  •  -1
  •   jgp    7 年前

    假设您不想在将文件写入磁盘后执行此操作,您可以:

    1. 将两个数据帧中的所有内容都转换为字符串。但是,输出将如下所示:

     | REQUEST_DATE | 2019-02-05 20:00:00 |                     |        |
     | USER         | Kate                |                     |        |
     | SEARCH_TYPE  | Global              |                     |        |
     |              |                     |                     |        |
     | NAME         | START_DATE          | END_DATE            | STATUS |
     | Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
     | Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
     | Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
     | Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
     | Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
    
    1. 构建客户输出编写器,在保存前添加标题。你可以找到更多信息 there -查找保存/写入部分。

    使现代化

    如果要执行#1,下面是转换第一个数据帧(使用te数据)的代码:

    Dataset<Row> transitionDf = dataDf
        .withColumn("_c1", dataDf.col("NAME"))
        .withColumn("_c2",
            dataDf.col("START_DATE").cast(DataTypes.StringType))
        .withColumn("_c3",
            dataDf.col("END_DATE").cast(DataTypes.StringType))
        .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
        .drop("NAME")
        .drop("START_DATE")
        .drop("END_DATE")
        .drop("STATUS");
    

    关键是 cast() 你的专栏,这样你就可以使用 unionByName() 合并两个数据帧。在Java中(我不做Scala),整个代码都是这样的:

    package net.jgp.labs.spark.l901Union;
    
    import java.sql.Date;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    /**
     * Use of unionByName() to create a complex header on a dataframe.
     * 
     * @author jgp
     */
    public class UnionApp {
      private SimpleDateFormat format = null;
    
      public UnionApp() {
        this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      }
    
      /**
       * main() is your entry point to the application.
       * 
       * @param args
       * @throws ParseException
       */
      public static void main(String[] args) throws ParseException {
        UnionApp app = new UnionApp();
        app.start();
      }
    
      /**
       * The processing code.
       * 
       * @throws ParseException
       */
      private void start() throws ParseException {
        // Creates a session on a local master
        SparkSession spark = SparkSession.builder()
            .appName("expr()")
            .master("local")
            .getOrCreate();
    
        // DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
        // HH:mm:ss", Locale.ENGLISH);
    
        // Data
        StructType dataSchema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "NAME",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "START_DATE",
                DataTypes.DateType,
                false),
            DataTypes.createStructField(
                "END_DATE",
                DataTypes.DateType,
                false),
            DataTypes.createStructField(
                "STATUS",
                DataTypes.StringType,
                false) });
        List<Row> dataRows = new ArrayList<Row>();
        dataRows.add(RowFactory.create("Alex", toDate("2018-01-01 00:00:00"),
            toDate("2018-02-01 00:00:00"), "OUT"));
        dataRows.add(RowFactory.create("Bob", toDate("2018-02-01 00:00:00"),
            toDate("2018-02-05 00:00:00"), "IN"));
        dataRows.add(RowFactory.create("Mark", toDate("2018-02-01 00:00:00"),
            toDate("2018-03-01 00:00:00"), "IN"));
        dataRows.add(RowFactory.create("Mark", toDate("2018-05-01 00:00:00"),
            toDate("2018-08-01 00:00:00"), "OUT"));
        dataRows.add(RowFactory.create("Meggy", toDate("2018-02-01 00:00:00"),
            toDate("2018-02-01 00:00:00"), "OUT"));
        Dataset<Row> dataDf = spark.createDataFrame(dataRows, dataSchema);
        dataDf.show();
        dataDf.printSchema();
    
        // Header
        StructType headerSchema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "_c1",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c2",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c3",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c4",
                DataTypes.StringType,
                false) });
        List<Row> headerRows = new ArrayList<Row>();
        headerRows.add(RowFactory.create("REQUEST_DATE",
            format.format(new java.util.Date()), "", ""));
        headerRows.add(RowFactory.create("USER", "Kate", "", ""));
        headerRows.add(RowFactory.create("SEARCH_TYPE", "Global", "", ""));
        headerRows.add(RowFactory.create("", "", "", ""));
        headerRows
            .add(RowFactory.create("NAME", "START_DATE", "END_DATE", "STATUS"));
        Dataset<Row> headerDf = spark.createDataFrame(headerRows, headerSchema);
        headerDf.show(false);
        headerDf.printSchema();
    
        // Transition
        Dataset<Row> transitionDf = dataDf
            .withColumn("_c1", dataDf.col("NAME"))
            .withColumn("_c2",
                dataDf.col("START_DATE").cast(DataTypes.StringType))
            .withColumn("_c3",
                dataDf.col("END_DATE").cast(DataTypes.StringType))
            .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
            .drop("NAME")
            .drop("START_DATE")
            .drop("END_DATE")
            .drop("STATUS");
        transitionDf.show(false);
        transitionDf.printSchema();
    
        // Union
        Dataset<Row> unionDf = headerDf.unionByName(transitionDf);
        unionDf.show(false);
        unionDf.printSchema();
      }
    
      private Date toDate(String dateAsText) throws ParseException {
        java.util.Date parsed;
        parsed = format.parse(dateAsText);
        return new Date(parsed.getTime());
      }
    }
    

    我把它作为我人生的一部分 Spark and Java labs and GitHub 。等效的Scala代码可能会更紧凑一些:)。

    推荐文章