
How can I duplicate rows based on two of my columns?

  • CompEng  · Tech Community  · 3 years ago

    I have a large table like this:

    **StartIp,EndIp,value**
    (1.0.0.9,1.0.0.18,a)
    (172.0.0.2,180.0.0.1,c)
    

    I want a result like this:

    (1.0.0.9,a)
    (1.0.0.10,a)
    (1.0.0.11,a)
    .
    .
    .
    (1.0.0.18,a)
    (172.0.0.2,c)
    .
    .
    (180.0.0.1,c)
    

    I wrote this function to find the IPs within a range:

    import java.net.InetAddress
    import org.apache.spark.sql.{Row, SparkSession}
    
    object ExpandIpRange {
      def main(args: Array[String]): Unit = {
        // Define the start and end IP addresses
        val startIp = "192.168.0.1"
        val endIp = "192.168.0.5"
    
        // Create a SparkSession
        val spark = SparkSession.builder()
          .appName("ExpandIpRange")
          .getOrCreate()
    
        try {
          // Define a function to expand an inclusive IPv4 range
          def expandIpRange(startIp: String, endIp: String): Seq[String] = {
            val startAddress = InetAddress.getByName(startIp)
            val endAddress = InetAddress.getByName(endIp)
    
            // Convert the IP addresses to unsigned 32-bit values held in a Long
            val startLong = ipToLong(startAddress)
            val endLong = ipToLong(endAddress)
    
            // Create a sequence of IP addresses
            (startLong to endLong).map(longToIp)
          }
    
          // Convert an IPv4 address to a Long; an Int goes negative from
          // 128.0.0.0 upward, which empties ranges that cross that boundary
          def ipToLong(address: InetAddress): Long = {
            val bytes = address.getAddress
            ((bytes(0) & 0xFFL) << 24) |
              ((bytes(1) & 0xFFL) << 16) |
              ((bytes(2) & 0xFFL) << 8) |
              (bytes(3) & 0xFFL)
          }
    
          // Define a function to convert a Long back to an IP address
          def longToIp(address: Long): String = {
            InetAddress.getByAddress(longToBytes(address)).getHostAddress
          }
    
          // Define a function to convert the low 32 bits of a Long to a byte array
          def longToBytes(address: Long): Array[Byte] = {
            Array(
              ((address >> 24) & 0xFF).toByte,
              ((address >> 16) & 0xFF).toByte,
              ((address >> 8) & 0xFF).toByte,
              (address & 0xFF).toByte
            )
          }
    
          // Expand the IP range
          val ips = expandIpRange(startIp, endIp)
    
          // Convert the IP addresses to rows
          val rows = ips.map(ip => Row(ip))
        } finally {
          // Release the session once the work is done
          spark.stop()
        }
      }
    }
    

    How can I apply this to my DataFrame?

    I have 2 billion records, so I need an efficient way to do this.

  •   Gastón Schabas    3 years ago

    You already have a function that expands an IP range and returns the Seq you need:

    def expandIpRange(startIp: String, endIp: String): Seq[String]
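
    One caveat with a signature that returns a Seq[String]: the whole range is held in memory at once, and if the conversion goes through a signed Int, a range that crosses 128.0.0.0 comes out empty. Below is a minimal sketch of a lazy variant, assuming IPv4 input only. The names IpRange, ipToLong, longToIp and expand are hypothetical and do not come from the original post.

```scala
object IpRange {
  // Parse dotted-quad IPv4 text into an unsigned 32-bit value stored in a Long.
  def ipToLong(ip: String): Long = {
    val parts = ip.trim.split('.')
    require(parts.length == 4, s"not an IPv4 address: $ip")
    parts.foldLeft(0L) { (acc, part) =>
      val octet = part.toInt
      require(octet >= 0 && octet <= 255, s"octet out of range in: $ip")
      (acc << 8) | octet
    }
  }

  // Format an unsigned 32-bit value as dotted-quad text.
  def longToIp(value: Long): String =
    Seq(24, 16, 8, 0).map(shift => (value >> shift) & 0xFF).mkString(".")

  // Lazily enumerate every address from startIp to endIp, inclusive.
  // An Iterator produces one address at a time instead of building a full Seq.
  def expand(startIp: String, endIp: String): Iterator[String] = {
    val start = ipToLong(startIp)
    val end = ipToLong(endIp)
    Iterator.iterate(start)(_ + 1).takeWhile(_ <= end).map(longToIp)
  }
}
```

    With the first sample row from the question, IpRange.expand("1.0.0.9", "1.0.0.18") yields ten addresses, 1.0.0.9 through 1.0.0.18.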
    

    How about something like this:

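    import spark.implicits._ // assumes a SparkSession named spark is in scope

    dataframeWithIpRanges
      .flatMap { row =>
        // a tuple stands in for Row(ip, value); spark.implicits supplies its encoder
        expandIpRange(row.getAs[String]("StartIp"), row.getAs[String]("EndIp"))
          .map(ip => (ip, row.getAs[String]("value")))
      }
      .toDF("ip", "value")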
    

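    I don't know the exact Spark syntax, but the idea is to use flatMap: you pass it a function that builds the list of IPs for the range it receives, then map each IP in that list to an (IP, value) row, which is the result you are looking for.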
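
    The flatMap idea above can be sketched with a typed Dataset as follows. This is only a sketch under stated assumptions: the case classes IpRangeRow and IpValue and the object ExpandRanges are hypothetical names, the column names are taken from the table header in the question, the input is assumed to be IPv4 with no validation, and local[*] is used only so the example runs on one machine.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical row types; field names follow the columns shown in the question.
case class IpRangeRow(StartIp: String, EndIp: String, value: String)
case class IpValue(ip: String, value: String)

object ExpandRanges {
  // Dotted-quad text to an unsigned 32-bit value held in a Long (no validation).
  def ipToLong(ip: String): Long =
    ip.trim.split('.').foldLeft(0L)((acc, part) => (acc << 8) | part.toInt)

  // Unsigned 32-bit value back to dotted-quad text.
  def longToIp(v: Long): String =
    Seq(24, 16, 8, 0).map(shift => (v >> shift) & 0xFF).mkString(".")

  // Emit one (ip, value) row per address in each inclusive range.
  def expand(ranges: Dataset[IpRangeRow]): Dataset[IpValue] = {
    val spark = ranges.sparkSession
    import spark.implicits._
    ranges.flatMap { row =>
      val start = ipToLong(row.StartIp)
      val end = ipToLong(row.EndIp)
      // Returning an Iterator avoids building each expanded range in memory.
      Iterator.iterate(start)(_ + 1)
        .takeWhile(_ <= end)
        .map(v => IpValue(longToIp(v), row.value))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExpandRanges")
      .master("local[*]") // assumption: local run for illustration only
      .getOrCreate()
    import spark.implicits._
    try {
      val ranges = Seq(IpRangeRow("1.0.0.9", "1.0.0.18", "a")).toDS()
      expand(ranges).show(false)
    } finally spark.stop()
  }
}
```

    An existing DataFrame with those three string columns could be converted with .as[IpRangeRow] before calling expand. Note that a single wide range is still expanded by one task: the second sample row, 172.0.0.2 to 180.0.0.1, alone produces roughly 134 million output rows.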