
Dynamic Kafka Consumers with AOP

  • Vasu  ·  6 years ago

    I have a few dynamic Kafka consumers (based on department ID, etc.); you can find the code below.

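    Basically, I want to log the time taken by each onMessage() call, so I created a custom method-level annotation, @LogExecutionTime, and added it to the onMessage() method. However, logExecutionTime() in my LogExecutionTimeAspect is never called, even though onMessage() is invoked whenever a message arrives on the topic and everything else works fine.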

    Could you help me figure out what I am missing in the LogExecutionTimeAspect class to make it work?

    LogExecutionTime:

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LogExecutionTime {
    }
    

    LogExecutionTimeAspect class:

    @Aspect
    @Component
    public class LogExecutionTimeAspect {
        @Around("within(com.myproject..*) && @annotation(LogExecutionTime)")
        public Object logExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
            long startTime = System.currentTimeMillis();
            Object object = joinPoint.proceed();
            long endTime = System.currentTimeMillis();
            System.out.println(" Time taken by Listener ::"+(endTime-startTime)+"ms");
            return object;
        }
    }
    

    DepartmentsMessageConsumer class:

    @Component
    public class DepartmentsMessageConsumer implements MessageListener  {
    
        @Value(value = "${spring.kafka.bootstrap-servers}" )
        private String bootstrapAddress;
    
        @PostConstruct
        public void init() {
            Map<String, Object> consumerProperties = new HashMap<>();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                                         bootstrapAddress);
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "DEPT_ID_HERE");
            ContainerProperties containerProperties = 
                new ContainerProperties("com.myproj.depts.topic");
            containerProperties.setMessageListener(this);
            DefaultKafkaConsumerFactory<String, Department> consumerFactory =
                    new DefaultKafkaConsumerFactory<>(consumerProperties, 
                        new StringDeserializer(), 
                        new JsonDeserializer<>(Department.class));
            ConcurrentMessageListenerContainer<String, Department> container =
                    new ConcurrentMessageListenerContainer<>(consumerFactory, 
                                containerProperties);
            container.start();
        }
    
        @Override
        @LogExecutionTime
        public void onMessage(Object message) {
            ConsumerRecord record = (ConsumerRecord) message;
            Department department = (Department)record.value();
            System.out.println(" department :: "+department);
        }
    }
    

    ApplicationLauncher class:

    @SpringBootApplication
    @EnableKafka
    @EnableAspectJAutoProxy
    @ComponentScan(basePackages = { "com.myproject" })
    public class ApplicationLauncher extends SpringBootServletInitializer { 
        public static void main(String[] args) {
            SpringApplication.run(ApplicationLauncher.class, args);
        }
    }
    

    EDIT:

    I have tried @EnableAspectJAutoProxy(exposeProxy=true), but it did not work.

    1 answer  |  as of 6 years ago
  •   Artem Bilan    6 years ago

    You should consider turning on this option of @EnableAspectJAutoProxy:

    /**
     * Indicate that the proxy should be exposed by the AOP framework as a {@code ThreadLocal}
     * for retrieval via the {@link org.springframework.aop.framework.AopContext} class.
     * Off by default, i.e. no guarantees that {@code AopContext} access will work.
     * @since 4.3.1
     */
    boolean exposeProxy() default false;
    

    On the other hand, there is something like this, which would be a better fit than AOP:

    /**
     * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
     * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
     *
     * <p>
     * This class will get consumer config properties via <code>configure()</code> method, including clientId assigned
     * by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be
     * sharing consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
     * <p>
     * Exceptions thrown by ConsumerInterceptor methods will be caught, logged, but not propagated further. As a result, if
     * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
     * just log the errors.
     * <p>
     * ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
     * <p>
     * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
     */
    public interface ConsumerInterceptor<K, V> extends Configurable {
    

    UPDATE

    @EnableAspectJAutoProxy(exposeProxy=true) did not work, and I know that I could use an interceptor, but I wanted to make it work with AOP.

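    Then I suggest you consider separating the DepartmentsMessageConsumer and the ConcurrentMessageListenerContainer. I mean, move that ConcurrentMessageListenerContainer into a separate @Configuration class. The ApplicationLauncher is a good candidate. Declare it as a @Bean that depends on your DepartmentsMessageConsumer through injection. The point is that you need to give AOP a chance to instrument your DepartmentsMessageConsumer, but inside @PostConstruct it is too early to instantiate the container and start consuming from Kafka: at that point `this` is still the raw, unproxied object, so the container ends up calling onMessage() directly and the aspect is never involved.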
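
    The effect can be reproduced without Spring or Kafka. The following is only a minimal sketch under stated assumptions, not what Spring does internally: a JDK dynamic proxy stands in for the AOP proxy (Spring may use CGLIB instead), and every name in it (ProxyBypassDemo, Listener, Consumer, proxyOf, deliver, adviceLog) is hypothetical. It shows that the "advice" only runs when the caller holds the proxy reference, which is what registering `this` from inside the bean prevents.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxyBypassDemo {

    /** Hypothetical, simplified stand-in for Spring Kafka's MessageListener. */
    public interface Listener {
        void onMessage(String message);
    }

    /** Hypothetical stand-in for DepartmentsMessageConsumer. */
    static class Consumer implements Listener {
        @Override
        public void onMessage(String message) {
            // real message handling would go here
        }
    }

    /** One entry per advised call; plays the role of the timing aspect's output. */
    static final List<String> adviceLog = new ArrayList<>();

    /** Wraps the target so that timing "advice" runs around every call made through the proxy. */
    static Listener proxyOf(Listener target) {
        InvocationHandler handler = (proxy, method, args) -> {
            long start = System.nanoTime();
            try {
                return method.invoke(target, args);
            } finally {
                adviceLog.add(method.getName() + " took " + (System.nanoTime() - start) + " ns");
            }
        };
        return (Listener) Proxy.newProxyInstance(
                Listener.class.getClassLoader(), new Class<?>[] {Listener.class}, handler);
    }

    /** Stand-in for the listener container: it calls whatever reference it was handed. */
    static void deliver(Listener registered, String message) {
        registered.onMessage(message);
    }

    public static void main(String[] args) {
        Consumer raw = new Consumer();
        Listener proxied = proxyOf(raw);

        // Like setMessageListener(this) in @PostConstruct: the raw object is called, advice is skipped.
        deliver(raw, "dept-1");
        System.out.println("after raw call: " + adviceLog.size() + " advised call(s)");     // 0

        // Like handing the container the injected bean: the call goes through the proxy.
        deliver(proxied, "dept-2");
        System.out.println("after proxied call: " + adviceLog.size() + " advised call(s)"); // 1
    }
}
```

    In the Spring application, the counterpart of the second call is to build the container in a @Bean method that receives the DepartmentsMessageConsumer as a parameter, so that the reference passed to setMessageListener() is the one Spring has already had a chance to wrap.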