
StormCrawler not fetching / indexing pages for Elasticsearch

Jonas Pohlmann · 7 years ago

    I am using StormCrawler with the Elasticsearch example, and crawled web pages are not displayed with the status FETCHED in Kibana when crawling http://books.toscrape.com/

    Still, judging from the console output, the pages seem to be fetched and parsed:

    48239 [Thread-26-fetcher-executor[3 3]] INFO  c.d.s.b.FetcherBolt - [Fetcher #3] Threads : 0    queues : 1      in_queues : 1
    48341 [FetcherThread #7] INFO  c.d.s.b.FetcherBolt - [Fetcher #3] Fetched http://books.toscrape.com/catalogue/category/books_1/index.html with status 200 in msec 86
    48346 [Thread-46-parse-executor[5 5]] INFO  c.d.s.b.JSoupParserBolt - Parsing : starting http://books.toscrape.com/catalogue/category/books_1/index.html
    48362 [Thread-46-parse-executor[5 5]] INFO  c.d.s.b.JSoupParserBolt - Parsed http://books.toscrape.com/catalogue/category/books_1/index.html in 13 msec
    

    Also, the Elasticsearch index does seem to receive some items, even though these items have no title:

    Screenshot of Kibana

    I extended com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt to additionally store the metadata of the web pages in a local file, but it does not seem to receive any tuples at all. Since the indexer bolt is also what sets a URL's status to FETCHED, this would explain the observation in Kibana mentioned above.

    Is there any explanation for this behaviour? I have already reverted the crawler configuration to the standard configuration, except for the indexer bolt in crawler.flux, which points to my own class.

    The topology configuration:

    name: "crawler"
    
    includes:
        - resource: true
          file: "/crawler-default.yaml"
          override: false
    
    
    
        - resource: false
          file: "es-conf.yaml"
          override: true
    
    spouts:
      - id: "spout"
        className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
        parallelism: 10
    
    bolts:
      - id: "partitioner"
        className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
        parallelism: 1
      - id: "fetcher"
        className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
        parallelism: 1
      - id: "sitemap"
        className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
        parallelism: 1
      - id: "parse"
        className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
        parallelism: 1
      - id: "index"
        className: "de.hpi.bpStormcrawler.IndexerBolt"
        parallelism: 1
      - id: "status"
        className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
        parallelism: 1
      - id: "status_metrics"
        className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
        parallelism: 1
    
    streams:
      - from: "spout"
        to: "partitioner"
        grouping:
          type: SHUFFLE
    
      - from: "spout"
        to: "status_metrics"
        grouping:
          type: SHUFFLE
    
      - from: "partitioner"
        to: "fetcher"
        grouping:
          type: FIELDS
          args: ["url"]
    
      - from: "fetcher"
        to: "sitemap"
        grouping:
          type: LOCAL_OR_SHUFFLE
    
      - from: "sitemap"
        to: "parse"
        grouping:
          type: LOCAL_OR_SHUFFLE
    
      - from: "parse"
        to: "index"
        grouping:
          type: LOCAL_OR_SHUFFLE
    
      - from: "fetcher"
        to: "status"
        grouping:
          type: FIELDS
          args: ["url"]
          streamId: "status"
    
      - from: "sitemap"
        to: "status"
        grouping:
          type: FIELDS
          args: ["url"]
          streamId: "status"
    
      - from: "parse"
        to: "status"
        grouping:
          type: FIELDS
          args: ["url"]
          streamId: "status"
    
      - from: "index"
        to: "status"
        grouping:
          type: FIELDS
          args: ["url"]
          streamId: "status"
    

    The modified indexer bolt:

    package de.hpi.bpStormcrawler;
    
    /**
     * Licensed to DigitalPebble Ltd under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * DigitalPebble licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    
    import static com.digitalpebble.stormcrawler.Constants.StatusStreamName;
    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    
    import java.io.*;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.storm.metric.api.MultiCountMetric;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.elasticsearch.action.DocWriteRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.digitalpebble.stormcrawler.Metadata;
    import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
    import com.digitalpebble.stormcrawler.indexing.AbstractIndexerBolt;
    import com.digitalpebble.stormcrawler.persistence.Status;
    import com.digitalpebble.stormcrawler.util.ConfUtils;
    
    /**
     * Sends documents to ElasticSearch. Indexes all the fields from the tuples or a
     * Map <String,Object> from a named field.
     */
    @SuppressWarnings("serial")
    public class IndexerBolt extends AbstractIndexerBolt {
    
        private static final Logger LOG = LoggerFactory
                .getLogger(IndexerBolt.class);
    
        private static final String ESBoltType = "indexer";
    
        static final String ESIndexNameParamName = "es.indexer.index.name";
        static final String ESDocTypeParamName = "es.indexer.doc.type";
        private static final String ESCreateParamName = "es.indexer.create";
    
        private OutputCollector _collector;
    
        private String indexName;
        private String docType;
    
        // whether the document will be created only if it does not exist or
        // overwritten
        private boolean create = false;
        File indexFile;
    
        private MultiCountMetric eventCounter;
    
        private ElasticSearchConnection connection;
    
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public void prepare(Map conf, TopologyContext context,
                            OutputCollector collector) {
            super.prepare(conf, context, collector);
            _collector = collector;
    
            indexName = ConfUtils.getString(conf, IndexerBolt.ESIndexNameParamName,
                    "fetcher");
            docType = ConfUtils.getString(conf, IndexerBolt.ESDocTypeParamName,
                    "doc");
            create = ConfUtils.getBoolean(conf, IndexerBolt.ESCreateParamName,
                    false);
    
            try {
                connection = ElasticSearchConnection
                        .getConnection(conf, ESBoltType);
            } catch (Exception e1) {
                LOG.error("Can't connect to ElasticSearch", e1);
                throw new RuntimeException(e1);
            }
    
            this.eventCounter = context.registerMetric("ElasticSearchIndexer",
                    new MultiCountMetric(), 10);
    
            indexFile = new File("/Users/jonaspohlmann/code/HPI/BP/stormCrawlerSpike/spikeStormCrawler2/index.log");
        }
    
        @Override
        public void cleanup() {
            if (connection != null)
                connection.close();
        }
    
        @Override
        public void execute(Tuple tuple) {
    
            String url = tuple.getStringByField("url");
    
            // Distinguish the value used for indexing
            // from the one used for the status
            String normalisedurl = valueForURL(tuple);
    
            Metadata metadata = (Metadata) tuple.getValueByField("metadata");
            String text = tuple.getStringByField("text");
    
    
            // BP: added content field (raw page bytes; currently unused below)
            String content = new String(tuple.getBinaryByField("content"));
    
            boolean keep = filterDocument(metadata);
            if (!keep) {
                eventCounter.scope("Filtered").incrBy(1);
                // treat it as successfully processed even if
                // we do not index it
                _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                        Status.FETCHED));
                _collector.ack(tuple);
                return;
            }
    
            try {
                XContentBuilder builder = jsonBuilder().startObject();
    
                // display text of the document?
                if (fieldNameForText() != null) {
                    builder.field(fieldNameForText(), trimText(text));
                }
    
                // send URL as field?
                if (fieldNameForURL() != null) {
                    builder.field(fieldNameForURL(), normalisedurl);
                }
    
    
                // which metadata to display?
                Map<String, String[]> keyVals = filterMetadata(metadata);
    
                Iterator<String> iterator = keyVals.keySet().iterator();
                while (iterator.hasNext()) {
                    String fieldName = iterator.next();
                    String[] values = keyVals.get(fieldName);
                    if (values.length == 1) {
                        builder.field(fieldName, values[0]);
                        try {
                            saveStringToFile(indexFile, fieldName + "\t" + values[0]);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } else if (values.length > 1) {
                        builder.array(fieldName, values);
                    }
                }
    
                builder.endObject();
    
                String sha256hex = org.apache.commons.codec.digest.DigestUtils
                        .sha256Hex(normalisedurl);
    
                IndexRequest indexRequest = new IndexRequest(indexName, docType,
                        sha256hex).source(builder);
    
                DocWriteRequest.OpType optype = DocWriteRequest.OpType.INDEX;
    
                if (create) {
                    optype = DocWriteRequest.OpType.CREATE;
                }
    
                indexRequest.opType(optype);
    
                connection.getProcessor().add(indexRequest);
    
                eventCounter.scope("Indexed").incrBy(1);
    
                _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                        Status.FETCHED));
                _collector.ack(tuple);
    
            } catch (IOException e) {
                LOG.error("Error sending log tuple to ES", e);
                // do not send to status stream so that it gets replayed
                _collector.fail(tuple);
            }
        }
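        /** Appends one line to the local debug file, creating its parent folder on first use. */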
        private void saveStringToFile(File file, String stringToWrite) throws IOException {
            String pathName = file.getPath();
            File folder = file.getParentFile();
    
            if (!folder.exists() && !folder.mkdirs()) {
                throw new IOException("Couldn't create the storage folder: " + folder.getAbsolutePath() + " does it already exist ?");
            }
    
            try (PrintWriter out = new PrintWriter(new FileOutputStream(file, true))) {
                out.append(stringToWrite + '\n');
    
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    1 Answer

Julien Nioche · 7 years ago

    Have you merged all the configuration (i.e. the generic StormCrawler part + the Elasticsearch-specific part) into a single es-conf.yaml? If not, your Flux file is probably missing

       - resource: false
         file: "crawler-conf.yaml"
         override: true
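
    For reference, a sketch of the complete includes section with both files present (file names taken from this question and the crawler archetype defaults; adjust to your setup):

    includes:
        - resource: true
          file: "/crawler-default.yaml"
          override: false

        - resource: false
          file: "crawler-conf.yaml"
          override: true

        - resource: false
          file: "es-conf.yaml"
          override: true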
    

    In crawler-conf.yaml, the indexer configuration usually looks like this:

      indexer.url.fieldname: "url"
      indexer.text.fieldname: "content"
      indexer.canonical.name: "canonical"
      indexer.md.mapping:
      - parse.title=title
      - parse.keywords=keywords
      - parse.description=description
      - domain=domain
    

    Not having any md mapping defined would explain why your modified indexer does not write anything to the file, and why the index contains URLs but no other fields.
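
    To make that concrete, here is a minimal self-contained sketch (plain Java, not the actual AbstractIndexerBolt code; key names and values are invented for illustration) of what filterMetadata() effectively does with such a mapping:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MdMappingSketch {
        public static void main(String[] args) {
            // Page metadata as the parser might produce it.
            Map<String, String[]> pageMetadata = new HashMap<>();
            pageMetadata.put("parse.title", new String[] { "All products | Books to Scrape" });
            pageMetadata.put("fetch.statusCode", new String[] { "200" });

            // Entries from indexer.md.mapping: "source=target" pairs.
            String[] mdMapping = { "parse.title=title", "parse.keywords=keywords" };

            // Only mapped keys survive, renamed to their target name.
            Map<String, String[]> keyVals = new LinkedHashMap<>();
            for (String mapping : mdMapping) {
                String[] parts = mapping.split("=", 2);
                String[] values = pageMetadata.get(parts[0]);
                if (values != null) {
                    keyVals.put(parts[1], values);
                }
            }

            // Prints "kept fields: [title]". With an empty indexer.md.mapping,
            // keyVals stays empty, the loop over keyVals in the custom
            // IndexerBolt never runs, and nothing is written to the file.
            System.out.println("kept fields: " + keyVals.keySet());
        }
    }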

    Note that the "index" index (excuse the terminology) does not contain the status of the URLs; the status lives in a separate status index (named status by default in the ES example). See https://stackoverflow.com/a/49316316/432844 for an explanation of status vs. index.