代码之家  ›  专栏  ›  技术社区  ›  keyboardsamurai

获取StormCrawler上的拓扑以正确编写warc文件

  •  0
  • keyboardsamurai  · 技术社区  · 7 年前

    在我的项目中,stormcrawler maven原型似乎不能很好地处理warc模块。目前,它只创建名称为“crawl-20180802121925-00000”的空0字节文件。warc.gz公司". 我是不是漏了什么?

    我尝试通过创建一个默认项目来启用warc编写,如下所示:

    mvn archetype:generate -DarchetypeGroupId=com.digitalpebble.stormcrawler -DarchetypeArtifactId=storm-crawler-archetype -DarchetypeVersion=1.10
    

    然后在pom.xml文件就像这样

        <dependency>
            <groupId>com.digitalpebble.stormcrawler</groupId>
            <artifactId>storm-crawler-warc</artifactId>
            <version>1.10</version>
        </dependency>
    

    然后我将WARCHdfsBolt添加到fetch分组中,同时尝试写入本地文件系统目录。

    public class CrawlTopology extends ConfigurableTopology {
    
        public static void main(String[] args) throws Exception {
            ConfigurableTopology.start(new CrawlTopology(), args);
        }
    
        @Override
        protected int run(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
    
            String[] testURLs = new String[] { "http://www.lequipe.fr/",
                    "http://www.lemonde.fr/", "http://www.bbc.co.uk/",
                    "http://storm.apache.org/", "http://digitalpebble.com/" };
    
            builder.setSpout("spout", new MemorySpout(testURLs));
    
            builder.setBolt("partitioner", new URLPartitionerBolt())
                    .shuffleGrouping("spout");
    
            builder.setBolt("fetch", new FetcherBolt())
                    .fieldsGrouping("partitioner", new Fields("key"));
    
            builder.setBolt("warc", getWarcBolt())
                    .localOrShuffleGrouping("fetch");
    
            builder.setBolt("sitemap", new SiteMapParserBolt())
                    .localOrShuffleGrouping("fetch");
    
            builder.setBolt("feeds", new FeedParserBolt())
                    .localOrShuffleGrouping("sitemap");
    
            builder.setBolt("parse", new JSoupParserBolt())
                    .localOrShuffleGrouping("feeds");
    
            builder.setBolt("index", new StdOutIndexer())
                    .localOrShuffleGrouping("parse");
    
            Fields furl = new Fields("url");
    
            // can also use MemoryStatusUpdater for simple recursive crawls
            builder.setBolt("status", new StdOutStatusUpdater())
                    .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
                    .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
                    .fieldsGrouping("feeds", Constants.StatusStreamName, furl)
                    .fieldsGrouping("parse", Constants.StatusStreamName, furl)
                    .fieldsGrouping("index", Constants.StatusStreamName, furl);
    
            return submit("crawl", conf, builder);
        }
    
        private WARCHdfsBolt getWarcBolt() {
            String warcFilePath = "/Users/user/Documents/workspace/test/warc";
    
            FileNameFormat fileNameFormat = new WARCFileNameFormat()
                    .withPath(warcFilePath);
    
            Map<String,String> fields = new HashMap<>();
            fields.put("software:", "StormCrawler 1.0 http://stormcrawler.net/");
            fields.put("conformsTo:", "http://www.archive.org/documents/WarcFileFormat-1.0.html");
    
            WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt()
                    .withFileNameFormat(fileNameFormat);
            warcbolt.withHeader(fields);
    
            // can specify the filesystem - will use the local FS by default
    //        String fsURL = "hdfs://localhost:9000";
    //        warcbolt.withFsUrl(fsURL);
    
            // a custom max length can be specified - 1 GB will be used as a default
            FileSizeRotationPolicy rotpol = new FileSizeRotationPolicy(50.0f,
                    FileSizeRotationPolicy.Units.MB);
            warcbolt.withRotationPolicy(rotpol);
            return warcbolt;
        }
    }
    

    不管我在本地运行它有没有通量,似乎都没有什么区别。您可以在此处查看演示回购: https://github.com/keyboardsamurai/storm-test-warc

    1 回复  |  直到 7 年前
        1
  •  1
  •   Julien Nioche    6 年前

    谢谢你问这个。理论上,当

    1. 同步策略中设置了显式同步 which we have by default at 10 tuples
    2. 有一个自动的,通过滴答元组发生的 every 15 secs by default
    3. 文件是旋转的-在您的情况下,这应该发生在内容达到50MB时

    由于作为起点使用的拓扑不是递归的,并且不处理5个以上的URL,因此从不满足条件1和3。

    builder.setBolt("status", new MemoryStatusUpdater())
    

    相反。这样,新的网址将不断处理。或者,您可以添加

    warcbolt.withSyncPolicy(new CountSyncPolicy(1));
    

    以便在每个元组之后触发同步。实际上,在一个URL不断出现的真正的爬网中,您不需要这样做。

    现在奇怪的是,不管同步是由条件1还是条件2触发的,我都看不到对文件的任何更改,它仍然是0字节。版本1.8并非如此

    <dependency>
        <groupId>com.digitalpebble.stormcrawler</groupId>
        <artifactId>storm-crawler-warc</artifactId>
        <version>1.8</version>
    </dependency> 
    

    所以这可能是因为在那之后代码发生了变化。

    我知道有些用户一直依赖FileTimeSizeRotationPolicy,它可以根据时间触发上面的条件3。

    :压缩条目时出现错误,现已修复,并将成为下一个SC版本的一部分。

    issue 由OP寄出。