代码之家  ›  专栏  ›  技术社区  ›  ByeBye

KairosDB-缺失值的线性插值

  •  0
  • ByeBye  · 技术社区  · 8 年前

    我的kairos客户端是用Java编写的。KairosDB版本1.1.3-1。

    我有数据点:

    t1 - 1000, v1 - 100

    t2 - 2000, v2 - 200

    t3 - 3000, v3 - 300

    t4 - 4000, v4 - 500

    我想生成一个输出,它是对这些数据点的线性插值:起始时间 ts = 1500,采样周期 tp = 1000。因此,输出应如下所示(插值后的数据点):

    [1500,150],[2500,250],[3500,400]

    可以直接用原生的 KairosDB 实现吗?或者有没有可用的外部库?

    2 回复  |  直到 8 年前
        1
  •  0
  •   Georg Muehlenberg    8 年前

    据我所知,Kairosdb中没有线性插值聚合器。我已经能够在GAPS聚合器的帮助下编写一个。以下代码说明了基于值的采样和线性插值。Kairosdb版本1.1.3和Java客户端版本2.2.0。

    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Preconditions;
    import com.google.common.collect.Range;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.kairosdb.client.HttpClient;
    import org.kairosdb.client.builder.*;
    import org.kairosdb.client.builder.aggregator.SamplingAggregator;
    import org.kairosdb.client.response.QueryResponse;
    import org.kairosdb.client.response.Result;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.annotation.Nonnull;
    import java.io.IOException;
    import java.net.MalformedURLException;
    import java.net.URISyntaxException;
    import java.time.Duration;
    import java.time.Instant;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.time.temporal.ChronoUnit;
    import java.util.*;
    
    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.core.Is.is;
    import static org.hamcrest.number.IsCloseTo.closeTo;
    
    /**
     * Integration test that demonstrates client-side linear interpolation on top of KairosDB.
     *
     * <p>KairosDB is asked to down-sample a metric with the {@code avg} aggregator and to mark the
     * missing sampling slots with {@code null} via the {@code gaps} aggregator. The {@code null}
     * slots are then filled on the client by interpolating linearly between the nearest known
     * samples on both sides.
     *
     * <p>The test needs a KairosDB instance reachable at {@code http://localhost:8083}.
     */
    public class KairosBulkLinearInterpolationTest {
    
        // constants
        private static final ZoneId UTC = ZoneId.of("UTC");
        private static final String DB_HOST = "localhost";
        private static final String DB_PORT = "8083";
    
        private static final Logger logger = LoggerFactory.getLogger(KairosBulkLinearInterpolationTest.class);
    
        private HttpClient client = null;
        private String metricName;
    
        /**
         * Connects to KairosDB and pushes four data points under a randomly suffixed metric name.
         * The timestamps are epoch milliseconds (1000 s, 2000 s, 3000 s and 4000 s).
         */
        @Before
        public void setUp() throws Exception {
            client = new HttpClient("http://" + DB_HOST + ":" + DB_PORT);
    
            // random suffix keeps repeated or parallel runs from reading each other's data
            metricName = "bulkInterpolationTest" + new Random().nextInt(10000);
            MetricBuilder metricBuilder = MetricBuilder.getInstance();
            Metric metric = metricBuilder.addMetric(metricName);
            metric.addTag("test", "test");
            metric.addDataPoint(1000000, 100);
            metric.addDataPoint(2000000, 200);
            metric.addDataPoint(3000000, 300);
            metric.addDataPoint(4000000, 500);
    
            client.pushMetrics(metricBuilder);
        }
    
        /**
         * Samples the stored metric every 500 seconds and expects the slots between the stored
         * points to carry linearly interpolated values.
         */
        @Test
        public void bulkInterpolation() throws IOException, URISyntaxException {
            ZonedDateTime start = Instant.ofEpochMilli(1000000).atZone(UTC);
            Map<ZonedDateTime, Double> valuesMap = interpolationBulk(metricName, new HashMap<>(), Range.closed(start, start.plusSeconds(3500)),
                                                                     Duration.ofSeconds(500));
            // the returned map is sorted by time, so positional access is chronological
            List<Map.Entry<ZonedDateTime, Double>> list = new ArrayList<>(valuesMap.entrySet());
    
            assertValueOf(list.get(0), 1000L, 100d);
            assertValueOf(list.get(1), 1500L, 150d);
            assertValueOf(list.get(2), 2000L, 200d);
            assertValueOf(list.get(3), 2500L, 250d);
            assertValueOf(list.get(4), 3000L, 300d);
            assertValueOf(list.get(5), 3500L, 400d);
            assertValueOf(list.get(6), 4000L, 500d);
        }
    
        /**
         * Asserts that an entry sits at the given epoch second and carries the given value.
         *
         * @param entry   the entry under test
         * @param seconds expected timestamp in epoch seconds
         * @param value   expected value
         */
        protected void assertValueOf(Map.Entry<ZonedDateTime, Double> entry, long seconds, double value) {
            assertThat(entry.getKey().toEpochSecond(), is(seconds));
            // interpolated values are computed in floating point, so allow a tiny tolerance
            assertThat(entry.getValue(), closeTo(value, 1e-9));
        }
    
        /**
         * Deletes the test metric and shuts the client down.
         */
        @After
        public void tearDown() throws Exception {
            if (client == null) {
                // setUp failed before the client was created; nothing to clean up
                return;
            }
            try {
                client.deleteMetric(metricName);
            } finally {
                // release the client even when deleting the metric fails
                client.shutdown();
            }
        }
    
        /**
         * Loads a metric from KairosDB, sampled at a fixed period, and fills missing sampling slots by
         * linear interpolation between the nearest known samples.
         *
         * <p>Gaps that lie before the first or after the last known sample have no neighbour on one
         * side and therefore cannot be interpolated; they are left out of the result.
         *
         * @param metricName     name of the metric to query
         * @param tags           tags the queried series must carry; may be empty
         * @param inRange        time range of the query; a missing bound is not sent to KairosDB
         * @param samplingPeriod distance between two samples; only whole seconds are used
         * @return the samples sorted by time without {@code null} values, or an empty map when the
         *         query reported errors or failed with an I/O or URI syntax problem
         * @throws NullPointerException     if any argument is {@code null}
         * @throws IllegalArgumentException if {@code samplingPeriod} is shorter than one second or
         *                                  longer than {@link Integer#MAX_VALUE} seconds
         */
        @VisibleForTesting
        protected Map<ZonedDateTime, Double> interpolationBulk(@Nonnull String metricName,
                                                               @Nonnull Map<String, String> tags,
                                                               @Nonnull Range<ZonedDateTime> inRange,
                                                               @Nonnull Duration samplingPeriod) throws MalformedURLException {
            Preconditions.checkNotNull(metricName, "Missing required parameter 'metricName'!");
            Preconditions.checkNotNull(tags, "Missing required parameter 'tags'!");
            Preconditions.checkNotNull(inRange, "Missing required parameter 'inRange'!");
            Preconditions.checkNotNull(samplingPeriod, "Missing required parameter 'samplingPeriod'!");
    
            // the aggregators take an int number of seconds; reject periods that would be truncated
            // to zero or overflow instead of silently sending a bogus sampling rate
            long samplingSeconds = samplingPeriod.getSeconds();
            Preconditions.checkArgument(samplingSeconds >= 1 && samplingSeconds <= Integer.MAX_VALUE,
                                        "Parameter 'samplingPeriod' must be between 1 and %s whole seconds, but was %s!",
                                        Integer.MAX_VALUE, samplingPeriod);
            int sampling = (int) samplingSeconds;
    
            // load the values in the given range
            QueryBuilder queryBuilder = QueryBuilder.getInstance();
            QueryMetric queryMetric = queryBuilder.addMetric(metricName);
            queryMetric.addTags(tags);
    
            // first average all the given values - we are sampling, remember
            queryMetric.addAggregator(new SamplingAggregator("avg", sampling, TimeUnit.SECONDS));
            // and then mark the remaining with null
            queryMetric.addAggregator(new SamplingAggregator("gaps", sampling, TimeUnit.SECONDS));
    
            if (inRange.hasLowerBound()) {
                queryBuilder.setStart(new Date(inRange.lowerEndpoint().toInstant().toEpochMilli()));
            }
            if (inRange.hasUpperBound()) {
                queryBuilder.setEnd(new Date(inRange.upperEndpoint().toInstant().toEpochMilli()));
            }
    
            try {
                QueryResponse queryResponse = client.query(queryBuilder);
                List<String> errors = queryResponse.getErrors();
                if (!errors.isEmpty()) {
                    logger.error("Error while querying Kairosb! {}", StringUtils.join(errors, ", "));
                    return new TreeMap<>();
                }
    
                List<Result> results = queryResponse.getQueryResponse(metricName).getResults();
                return fillGapsLinearly(readSamples(results));
            } catch (IOException ioex) {
                logger.error("I/O-Exception while querying the Kairos database!", ioex);
                return new TreeMap<>();
            } catch (URISyntaxException e) {
                logger.error("URISyntaxException while querying the Kairos database!", e);
                return new TreeMap<>();
            }
        }
    
        /**
         * Copies the data points of all results into one time-sorted map; gap markers become
         * {@code null} values. Data points that cannot be read as a double are logged and skipped.
         * If several results contain the same timestamp, the later one overwrites the earlier one.
         */
        private static NavigableMap<ZonedDateTime, Double> readSamples(List<Result> results) {
            NavigableMap<ZonedDateTime, Double> samples = new TreeMap<>();
            for (Result result : results) {
                if (CollectionUtils.isEmpty(result.getDataPoints())) {
                    continue;
                }
                for (DataPoint dataPoint : result.getDataPoints()) {
                    try {
                        Double value = null;
                        if (dataPoint.getValue() != null) {
                            value = dataPoint.doubleValue();
                        }
                        samples.put(Instant.ofEpochMilli(dataPoint.getTimestamp()).atZone(UTC), value);
                    } catch (DataFormatException dfe) {
                        logger.error("Data format exception while reading the Kairos database response!", dfe);
                    }
                }
            }
            return samples;
        }
    
        /**
         * Returns a copy of the samples in which every {@code null} value that has a known sample on
         * both sides is replaced by its linear interpolation; {@code null} values without a known
         * sample on one side are dropped. Runs of consecutive gaps are supported.
         */
        private static NavigableMap<ZonedDateTime, Double> fillGapsLinearly(NavigableMap<ZonedDateTime, Double> samples) {
            NavigableMap<ZonedDateTime, Double> filled = new TreeMap<>();
            // gaps seen since the last known sample, waiting for their right-hand neighbour
            List<ZonedDateTime> pendingGaps = new ArrayList<>();
            ZonedDateTime leftTime = null;
            double leftValue = 0d;
    
            for (Map.Entry<ZonedDateTime, Double> sample : samples.entrySet()) {
                ZonedDateTime time = sample.getKey();
                Double value = sample.getValue();
                if (value == null) {
                    pendingGaps.add(time);
                    continue;
                }
                if (leftTime != null) {
                    for (ZonedDateTime gapTime : pendingGaps) {
                        filled.put(gapTime, interpolate(leftTime, leftValue, time, value, gapTime));
                    }
                } else if (!pendingGaps.isEmpty()) {
                    logger.debug("Dropping {} gap(s) before the first known sample.", pendingGaps.size());
                }
                pendingGaps.clear();
                filled.put(time, value);
                leftTime = time;
                leftValue = value;
            }
            if (!pendingGaps.isEmpty()) {
                logger.debug("Dropping {} gap(s) after the last known sample.", pendingGaps.size());
            }
            return filled;
        }
    
        /**
         * Evaluates at {@code at} the straight line through {@code (leftTime, leftValue)} and
         * {@code (rightTime, rightValue)}. The caller guarantees {@code leftTime < at < rightTime},
         * so the time span is never zero.
         */
        private static double interpolate(ZonedDateTime leftTime, double leftValue,
                                          ZonedDateTime rightTime, double rightValue,
                                          ZonedDateTime at) {
            double spanMillis = Duration.between(leftTime, rightTime).toMillis();
            double offsetMillis = Duration.between(leftTime, at).toMillis();
            return leftValue + (rightValue - leftValue) * (offsetMillis / spanMillis);
        }
    
    }
    
        2
  •  0
  •   ByeBye    8 年前

    不幸的是,KairosDB 没有线性插值聚合器,也没有任何适用于 1.1.3-1 版本的可用外部库。

    唯一的方法是编写自己的聚合器,并将其作为插件添加到kairos实例中。

    推荐文章