代码之家  ›  专栏  ›  技术社区  ›  Vaibhav Sharma

使用Servlet连接KafkaProducer

  •  0
  • Vaibhav Sharma  · 技术社区  · 8 年前

    我正在尝试使用servlet连接KafkaProducer,并在eclipse中设置我的项目。但当我运行KafkaProducer时,我遇到了一个例外:

    组织。阿帕奇。卡特琳娜。果心StandardWrapperValve调用严重: Servlet。上下文中servlet[simple\u kafka.SimpleProducer]的service() with path[/kafka\u web]引发异常[Servlet执行引发 异常]与根本原因java。lang.ClassNotFoundException: 组织。阿帕奇。卡夫卡。客户。制作人卡夫卡制作人 组织。阿帕奇。卡特琳娜。装载机。WebappClassLoaderBase。loadClass(WebappClassLoaderBase.java:1291) 在 组织。阿帕奇。卡特琳娜。装载机。WebappClassLoaderBase。loadClass(WebappClassLoaderBase.java:1119) 在simple_kafka。SimpleProducer。doPost(SimpleProducer.java:56)位于 javax。servlet。http。HttpServlet。服务(HttpServlet.java:661) javax。servlet。http。HttpServlet。服务(HttpServlet.java:742) 在 组织。阿帕奇。卡特琳娜。果心应用程序过滤器链。doFilter(ApplicationFilterChain.java:166) 在 组织。阿帕奇。公猫websocket。服务器WsFilter。doFilter(WsFilter.java:52) 在 组织。阿帕奇。卡特琳娜。果心应用程序过滤器链。internalDoFilter(ApplicationFilterChain.java:193) 在 组织。阿帕奇。卡特琳娜。果心应用程序过滤器链。doFilter(ApplicationFilterChain.java:166) 在 组织。阿帕奇。卡特琳娜。果心标准包装阀。调用(StandardWrapperValve.java:199) 在 在 组织。阿帕奇。卡特琳娜。验证器。AuthenticatorBase。调用(AuthenticatorBase.java:478) 在 组织。阿帕奇。卡特琳娜。果心标准主阀。调用(StandardHostValve.java:140) 在 组织。阿帕奇。卡特琳娜。阀门。错误报告阀。调用(ErrorReportValve.java:81) 在 组织。阿帕奇。卡特琳娜。阀门。抽象逻辑阀。调用(AbstractAccessLogValve.java:650) 在 组织。阿帕奇。卡特琳娜。果心标准发动机阀。调用(StandardEngineValve.java:87) 在 组织。阿帕奇。卡特琳娜。连接器。郊狼适配器。服务(CoyoteAdapter.java:342) 在 组织。阿帕奇。郊狼。http11.http11处理器。服务(Http11Processor.java:803) 在 组织。阿帕奇。郊狼。AbstractProcessorLight。进程(AbstractProcessorLight.java:66) 在 组织。阿帕奇。郊狼。AbstractProtocol$ConnectionHandler。过程(AbstractProtocol.java:868) 在 组织。阿帕奇。公猫util。网NioEndpoint$SocketProcessor。doRun(NioEndpoint.java:1459) 在 组织。阿帕奇。公猫util。网SocketProcessorBase。运行(SocketProcessorBase.java:49) 在 在 Java语言util。同时发生的ThreadPoolExecutor$Worker。运行(ThreadPoolExecutor.java:624) 组织。阿帕奇。公猫util。螺纹。TaskThread$WrappingRunnable。运行(TaskThread.java:61) 在java。lang.Thread。运行(Thread.java:748)

    我的KafkaProducerServlet如下:

    package simple_kafka;
    
    import java.io.IOException;
    import java.util.Properties;
    
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    //import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    /**
     * Servlet implementation class SimpleProducer
     */
    @WebServlet("/producer")
    public class SimpleProducer extends HttpServlet {
        private static final long serialVersionUID = 1L;
    
        /**
         * @see HttpServlet#HttpServlet()
         */
        public SimpleProducer() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        /**
         * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
         */
        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
    //      response.getWriter().append("Served at: ").append(request.getContextPath());
        }
    
        /**
         * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse response)
         */
        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
            // TODO Auto-generated method stub
            doGet(request, response);
            response.getWriter().append("Served at: ").append(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
            String topicName = "ReplicaTopic";
              String key = "Key1";
              String value = "Value-1";
    
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.59:9092,192.168.1.59:9093");
              props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");         
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
              KafkaProducer<String, String> producer = new KafkaProducer <String, String>(props);
              try {
                  ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
                  producer.send(record);           
                  producer.close();
              }
              catch(Exception e) {
                  response.getWriter().append("Exception at: ").append((CharSequence) e);
              }
    //        response.sendRedirect("/consumer");
        }
    }
    

    我在项目属性的BuildPath中包含了以下JAR:

    1. 卡夫卡2.12-0.11.0.1。罐子
    2. 卡夫卡客户端-0.11.0.1。罐子
    3. slf4j-api-1.7.25。罐子
    1 回复  |  直到 8 年前
        1
  •  0
  •   Vaibhav Sharma    8 年前

    经过一天的搜索,我发现您需要包含JAR文件WEB-INF->lib文件夹,您需要从eclipse中删除服务器;重新配置。这样做之后,我就可以运行卡夫卡制作人了。

    推荐文章