
Efficient string suffix detection

  •  6
  •  Sotos  ·  7 years ago

    from pyspark.sql.types import StringType

    dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).toDF('domains')
    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |something.google.com                    |
    |something.google.com.somethingelse.ac.uk|
    |something.good.com.cy                   |
    |something.good.com.cy.mal.org           |
    +----------------------------------------+  
    
    dd1 =  spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
    +-----------+
    |gooddomains|
    +-----------+
    |google.com |
    |good.com.cy|
    +-----------+
    

    Now I want to filter the rows in domains against gooddomains.

    What I want to do is filter out the rows in dd that end with one of the domains in dd1. In the example above, I want to filter out rows 1 and 3, to end up with:

    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |something.google.com.somethingelse.ac.uk|
    |something.good.com.cy.mal.org           |
    +----------------------------------------+  
    

    My current solution (below) can only account for suffixes up to three labels long. It would fail if I added, say, verygood.co.ac.uk to dd1:

    import pyspark.sql.functions as F

    def split_filter(x, whitelist):
        # Split each domain into its dot-separated labels
        splitted1 = x.select(F.split(x['domains'], r'\.').alias('splitted_domains'))
        # Rebuild the last two labels (e.g. "google.com")
        last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
           F.lit('.'), \
           splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
        # Rebuild the last three labels (e.g. "good.com.cy")
        last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
           F.lit('.'), \
           splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
           F.lit('.'), \
           splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
        # Line the frames up again by id and anti-join each suffix against the whitelist
        x = x.withColumn('id', F.monotonically_increasing_id())
        last_two = last_two.withColumn('id', F.monotonically_increasing_id())
        last_three = last_three.withColumn('id', F.monotonically_increasing_id())
        final_d = x.join(last_two, ['id']).join(last_three, ['id'])
        df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['gooddomains'], how = 'left_anti')
        df2 = df1.join(whitelist, df1['last_three'] == whitelist['gooddomains'], how = 'left_anti')
        return df2.drop('id')
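
    For reference, the helper above can be invoked on the toy frames like this (a minimal sketch; it happens to work on this small example, but still breaks once a whitelist entry has more than three labels):

    split_filter(dd, dd1).select('domains').show(truncate=False)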
    

    I am using Spark 2.3.0 with Python 2.7.5.

    2 Answers  |  7 years ago
        1
  •  12
  •   user10938362 zero323  ·  7 years ago

    Let's extend domains for slightly better coverage:

    domains = spark.createDataFrame([
        "something.google.com",  # OK
        "something.google.com.somethingelse.ac.uk", # NOT OK 
        "something.good.com.cy", # OK 
        "something.good.com.cy.mal.org",  # NOT OK
        "something.bad.com.cy",  # NOT OK
        "omgalsogood.com.cy", # NOT OK
        "good.com.cy",   # OK 
        "sogood.example.com",  # OK Match for shorter redundant, mismatch on longer
        "notsoreal.googleecom" # NOT OK
    ], "string").toDF('domains')
    
    good_domains =  spark.createDataFrame([
        "google.com", "good.com.cy", "alsogood.com.cy",
        "good.example.com", "example.com"  # Redundant case
    ], "string").toDF('gooddomains')
    

    A naive solution, using nothing but Spark SQL primitives, is to simplify your current approach. Since you have stated that it is safe to assume that these are valid public domains, we can define a function like this:

    from pyspark.sql.functions import col, regexp_extract
    
    def suffix(c): 
        return regexp_extract(c, "([^.]+\\.[^.]+$)", 1) 
    

    which extracts the top-level domain and the first-level subdomain:

    domains_with_suffix = (domains
        .withColumn("suffix", suffix("domains"))
        .alias("domains"))
    good_domains_with_suffix = (good_domains
        .withColumn("suffix", suffix("gooddomains"))
        .alias("good_domains"))
    
    domains_with_suffix.show()
    
    +--------------------+--------------------+
    |             domains|              suffix|
    +--------------------+--------------------+
    |something.google.com|          google.com|
    |something.google....|               ac.uk|
    |something.good.co...|              com.cy|
    |something.good.co...|             mal.org|
    |something.bad.com.cy|              com.cy|
    |  omgalsogood.com.cy|              com.cy|
    |         good.com.cy|              com.cy|
    |  sogood.example.com|         example.com|
    |notsoreal.googleecom|notsoreal.googleecom|
    +--------------------+--------------------+
    

    Now we can outer join:

    from pyspark.sql.functions import (
        col, concat, lit, monotonically_increasing_id, sum as sum_
    )
    
    candidates = (domains_with_suffix
        .join(
            good_domains_with_suffix,
            col("domains.suffix") == col("good_domains.suffix"), 
            "left"))
    

    and filter the result:

    is_good_expr = (
        col("good_domains.suffix").isNotNull() &      # Match on suffix
        (
    
            # Exact match
            (col("domains") == col("gooddomains")) |
            # Subdomain match
            col("domains").endswith(concat(lit("."), col("gooddomains")))
        )
    )
    
    not_good_domains = (candidates
        .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion
        .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
        .filter(~col("any_good"))
        .drop("any_good"))
    
    not_good_domains.show(truncate=False)     
    
    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |omgalsogood.com.cy                      |
    |notsoreal.googleecom                    |
    |something.good.com.cy.mal.org           |
    |something.google.com.somethingelse.ac.uk|
    |something.bad.com.cy                    |
    +----------------------------------------+
    

    This is better than the Cartesian product required for a direct join with LIKE, but it is still brute-force, and in the worst case requires two shuffles: one for the join (this can be skipped if good_domains is small enough to be broadcast), and another one for the group_by + agg.
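
    For example, assuming good_domains fits in executor memory, the broadcast can be requested explicitly; a minimal sketch reusing the frames defined above:

    from pyspark.sql.functions import broadcast

    candidates = (domains_with_suffix
        .join(
            broadcast(good_domains_with_suffix),
            col("domains.suffix") == col("good_domains.suffix"),
            "left"))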

    Unfortunately, Spark SQL doesn't allow a custom partitioner to use only one shuffle for both (it is, however, possible with a composite key in the RDD API), and the current optimizer is not yet smart enough to combine join(_, "key1") and .groupBy("key1", _).

    If you can accept approximate results, you can go probabilistic. First, let's build a probabilistic counter (here using bounter, with a little help from toolz):

    from pyspark.sql.functions import concat_ws, reverse, split
    from bounter import bounter
    from toolz.curried import identity, partition_all
    
    # These settings are only for testing on toy examples; in practice use more realistic values
    size_mb = 20      
    chunk_size = 100
    
    def reverse_domain(c):
        return concat_ws(".", reverse(split(c, "\\.")))
    
    def merge(acc, xs):
        acc.update(xs)
        return acc
    
    counter = sc.broadcast((good_domains
        .select(reverse_domain("gooddomains"))
        .rdd.flatMap(identity)
        # Chunk data into groups so we reduce the number of update calls
        .mapPartitions(partition_all(chunk_size))
        # Use tree aggregate to reduce pressure on the driver, 
        # when number of partitions is large*
        # You can use depth parameter for further tuning
        .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))
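
    The reversal is what makes the lookups below work: once the labels are flipped, every good domain becomes a string prefix of the domains it covers. A quick driver-side check of what reverse_domain produces (a sketch on the toy data):

    good_domains.select(reverse_domain("gooddomains").alias("reversed")).show(truncate=False)
    # e.g. "google.com" becomes "com.google" and "good.com.cy" becomes "cy.com.good"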
    

    Next, define a user-defined function like the one below:

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from toolz import accumulate
    
    def is_good_counter(counter):
        def is_good_(x):
            return any(
                x in counter.value 
                for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))
            )
    
        @pandas_udf("boolean", PandasUDFType.SCALAR)
        def _(xs):
            return xs.apply(is_good_)
        return _
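
    The accumulate call enumerates every dotted prefix of the reversed domain, and a row is considered good as soon as one prefix is found in the counter. A driver-side illustration of the values the UDF tests (hypothetical input):

    from toolz import accumulate

    parts = "com.google.something".split(".")
    print(list(accumulate(lambda x, y: "{}.{}".format(x, y), parts)))
    # ['com', 'com.google', 'com.google.something']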
    

    and filter the data:

    domains.filter(
        ~is_good_counter(counter)(reverse_domain("domains"))
    ).show(truncate=False)
    
    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |something.google.com.somethingelse.ac.uk|
    |something.good.com.cy.mal.org           |
    |something.bad.com.cy                    |
    |omgalsogood.com.cy                      |
    |notsoreal.googleecom                    |
    +----------------------------------------+
    

    In Scala this could be done with bloomFilter:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    import org.apache.spark.util.sketch.BloomFilter
    
    def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))
    
    val checker = good_domains.stat.bloomFilter(
      // Adjust values depending on the data
      reverseDomain($"gooddomains"), 1000, 0.001 
    )
    
    def isGood(checker: BloomFilter) = udf((s: String) => 
      s.split('.').toStream.scanLeft("") {
        case ("", x) => x
        case (acc, x) => s"${acc}.${x}"
    }.tail.exists(checker mightContain _))
    
    
    domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)
    
    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |something.google.com.somethingelse.ac.uk|
    |something.good.com.cy.mal.org           |
    |something.bad.com.cy                    |
    |omgalsogood.com.cy                      |
    |notsoreal.googleecom                    |
    +----------------------------------------+
    

    If needed, it shouldn't be hard to call such code from Python.
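
    One possible route is to build the filter through the JVM handle of the DataFrame; note that this is only a sketch relying on PySpark internals (_jdf and the py4j bridge are not a stable public API), and the filter below is queried on the driver only:

    # Build the Java-side Bloom filter on the reversed good domains.
    reversed_good = good_domains.select(reverse_domain("gooddomains").alias("rev"))
    checker = reversed_good._jdf.stat().bloomFilter("rev", 1000, 0.001)

    checker.mightContain("com.google")  # True, up to the configured false-positive rate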

    This might still not be fully satisfying, due to its approximate nature. If you require an exact result, you can try to leverage the redundant nature of the data, for example with a trie (here using the datrie implementation).

    If good_domains is relatively small, you can create a single model, in a similar way as in the probabilistic variant:

    import string
    import datrie
    
    
    def seq_op(acc, x):
        acc[x] = True
        return acc
    
    def comb_op(acc1, acc2):
        acc1.update(acc2)
        return acc1
    
    trie = sc.broadcast((good_domains
        .select(reverse_domain("gooddomains"))
        .rdd.flatMap(identity)
        # string.printable is a bit excessive if you need standard domain
        # and not enough if you allow internationalized domain names.
        # In the latter case you'll have to adjust the `alphabet`
        # or use different implementation of trie.
        .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))
    

    define a user-defined function:

    def is_good_trie(trie):
        def is_good_(x):
            if not x:
                return False
            else:
                return any(
                    x == match or x[len(match)] == "."
                    for match in trie.value.iter_prefixes(x)
                )
    
        @pandas_udf("boolean", PandasUDFType.SCALAR)
        def _(xs):
            return xs.apply(is_good_)
    
        return _
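
    To see what the prefix check does, here is a small driver-side illustration. iter_prefixes yields the stored keys that are string prefixes of the query, and the x[len(match)] == "." guard rejects look-alikes such as com.googleecom (the values below are hypothetical):

    import string
    import datrie

    t = datrie.Trie(string.printable)
    t[u"com.google"] = True

    print(list(t.iter_prefixes(u"com.google.something")))  # ['com.google'] -> next char is "."
    print(list(t.iter_prefixes(u"com.googleecom")))        # ['com.google'] -> next char is "e", rejected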
    

    and apply it to the data:

    domains.filter(
        ~is_good_trie(trie)(reverse_domain("domains"))
    ).show(truncate=False)

    +----------------------------------------+
    |domains                                 |
    +----------------------------------------+
    |something.google.com.somethingelse.ac.uk|
    |something.good.com.cy.mal.org           |
    |something.bad.com.cy                    |
    |omgalsogood.com.cy                      |
    |notsoreal.googleecom                    |
    +----------------------------------------+
    

    This specific approach works under the assumption that all good domains can be compressed into a single trie, but it can be easily extended to handle cases where that assumption is not satisfied. For example, you can build a single trie per top-level domain or suffix (as defined in the naive solution):

    (good_domains
        .select(suffix("gooddomains"), reverse_domain("gooddomains"))
        .rdd
        .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))
    

    and then either load the models on demand from a serialized version, or use RDD operations.

    The two non-native methods can be further adjusted depending on the data, the business requirements (like false-negative tolerance in the case of the approximate solution), and the available resources (driver memory, executor memory, cardinality of suffixes, access to a POSIX-compliant distributed file system, and so on). There are also trade-offs to consider when choosing between applying them on DataFrames and RDDs (memory usage, communication and serialization overhead).


    * See Understanding treeReduce() in Spark

        2
  •  4
  •   pault Tanjin  ·  7 years ago

    If I understand correctly, you just want a left anti join using a simple SQL string matching pattern.

    from pyspark.sql.functions import expr
    
    dd.alias("l")\
        .join(
            dd1.alias("r"), 
            on=expr("l.domains LIKE concat('%', r.gooddomains)"), 
            how="leftanti"
        )\
        .select("l.*")\
        .show(truncate=False)
    #+----------------------------------------+
    #|domains                                 |
    #+----------------------------------------+
    #|something.google.com.somethingelse.ac.uk|
    #|something.good.com.cy.mal.org           |
    #+----------------------------------------+
    

    The expression concat('%', r.gooddomains) prepends a wildcard to r.gooddomains.

    Next, we use l.domains LIKE concat('%', r.gooddomains) to find the rows that match this pattern.

    Finally, we specify how="leftanti" in order to keep only the rows that don't match.


    Update: As discussed in the comments by @user10938362, there are two flaws with this approach:

    1. This only checks for matching suffixes, and there are edge cases where that produces incorrect results. For example, example.com should match example.com and subdomain.example.com, but not fakeexample.com, which the plain LIKE pattern above would also match.

    There are two ways to address this. The first is to modify the LIKE expression to handle it. Since we know these are all valid domains, we can check for an exact match or for the good domain preceded by a dot:

    like_expr = " OR ".join(
        [
            "(l.domains = r.gooddomains)",
            "(l.domains LIKE concat('%.', r.gooddomains))"
        ]
    )
    
    dd.alias("l")\
        .join(
            dd1.alias("r"), 
            on=expr(like_expr), 
            how="leftanti"
        )\
        .select("l.*")\
        .show(truncate=False)
    

    Similarly, one can use RLIKE with a regular-expression pattern to the same effect.

    2. The larger issue is that, as explained here, joining on a LIKE expression will cause a Cartesian product. If dd1 is small enough to be broadcast, then this isn't a problem, but in a production environment it may not be.
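
    If dd1 does qualify, the broadcast can be requested explicitly; a sketch assuming the like_expr defined above:

    from pyspark.sql.functions import broadcast, expr

    dd.alias("l")\
        .join(
            broadcast(dd1.alias("r")),
            on=expr(like_expr),
            how="leftanti"
        )\
        .select("l.*")\
        .show(truncate=False)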


    For more on the Spark SQL LIKE operator, see the Apache HIVE docs:

    A LIKE B:

    TRUE if string A matches the SQL simple regular expression B, otherwise FALSE. The comparison is done character by character. The _ character in B matches any character in A (similar to . in POSIX regular expressions), and the % character in B matches an arbitrary number of characters in A (similar to .* in POSIX regular expressions). For example, 'foobar' LIKE 'foo' evaluates to FALSE, whereas 'foobar' LIKE 'foo___' evaluates to TRUE and so does 'foobar' LIKE 'foo%'. To escape %, use \ (% matches one % character). If the data contains a semicolon and you want to search for it, it needs to be escaped: columnValue LIKE 'a\;b'
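
    A quick sanity check of these semantics in Spark SQL (a sketch):

    spark.sql(
        "SELECT 'foobar' LIKE 'foo', 'foobar' LIKE 'foo___', 'foobar' LIKE 'foo%'"
    ).show()
    # false, true, true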


    Note: this exploits the trick of using pyspark.sql.functions.expr to pass in a column value as a parameter to a function.
