淘先锋技术网

首页 1 2 3 4 5 6 7

转载自:https://www.w3cschool.cn/apache_kafka/apache_kafka_simple_producer_example.html

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Minimal Kafka producer example.
 *
 * <p>Connects to a broker at {@code localhost:9092} and publishes ten records to the
 * topic {@code zhaox}. Each record uses the decimal string of an integer in the range
 * 10..19 as both its key and its value.
 */
public class KafkaProducerDemo {

   /**
    * Sends the ten demo records and prints {@code send success} once every record has
    * been acknowledged.
    *
    * @param args unused
    * @throws Exception if any record cannot be delivered or the wait for its
    *         acknowledgement is interrupted
    */
   public static void main(String[] args) throws Exception {
      String topicName = "zhaox";

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      // No automatic retries: a failed send is reported by the Future returned from send().
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      // try-with-resources guarantees the producer is closed even when a send fails.
      try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
         for (int i = 10; i < 20; i++) {
            String payload = Integer.toString(i);
            // send() is asynchronous; get() waits for the result and rethrows any
            // delivery failure, so the success message below is only printed when
            // every record was actually accepted.
            producer.send(new ProducerRecord<String, String>(topicName, payload, payload)).get();
         }
         System.out.println("send success");
      }
   }
}

对 Linux 不太熟悉,所以是在 Windows 下调试运行的:
在这里插入图片描述

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

//http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

/**
 * Minimal Kafka consumer example that polls the topic {@code zhaox} on its own thread
 * and prints the value of every record it receives.
 *
 * <p>{@link #run()} owns the consumer: it creates it, polls it and closes it.
 * {@link #shutdown()} is meant to be called from a different thread to stop the loop.
 */
public class KafkaConsumerDemo implements Runnable {
      /** Set to {@code true} by {@link #shutdown()} to ask the poll loop to stop. */
      private final AtomicBoolean closed = new AtomicBoolean(false);

      /**
       * Created inside {@link #run()}; stays {@code null} until then. Volatile because
       * it is written by the polling thread and read by the thread calling
       * {@link #shutdown()}.
       */
      volatile KafkaConsumer<String, String> consumer;

      /**
       * Creates the consumer, subscribes to {@code zhaox} and prints each record value
       * as {@code msg=<value>} until {@link #shutdown()} is called.
       */
      @Override
      public void run() {
            try {
                  Properties props = new Properties();
                  props.put("bootstrap.servers", "localhost:9092");
                  props.put("group.id", "0");
                  props.put("enable.auto.commit", "true");
                  props.put("auto.commit.interval.ms", "1000");
                  props.put("session.timeout.ms", "30000");
                  props.put("key.deserializer",
                              "org.apache.kafka.common.serialization.StringDeserializer");
                  props.put("value.deserializer",
                              "org.apache.kafka.common.serialization.StringDeserializer");
                  consumer = new KafkaConsumer<>(props);
                  consumer.subscribe(Arrays.asList("zhaox"));
                  while (!closed.get()) {
                        // poll() itself blocks for up to 10 s waiting for records, so no
                        // additional sleep is needed between iterations.
                        ConsumerRecords<String, String> records = consumer.poll(10000);
                        for (final ConsumerRecord<String, String> rc : records) {
                              System.out.println("msg=" + rc.value());
                        }
                  }
            } catch (WakeupException e) {
                  // A wakeup is expected only as part of shutdown(); anything else is an error.
                  if (!closed.get()) {
                        throw e;
                  }
            } finally {
                  // The consumer is still null if its constructor threw; guard so that
                  // the original exception is not replaced by a NullPointerException.
                  KafkaConsumer<String, String> active = consumer;
                  if (active != null) {
                        active.close();
                  }
            }
      }

      /**
       * Requests the poll loop to stop. Safe to call before {@link #run()} has created
       * the consumer: the flag is set first, so the loop exits on its first check.
       */
      public void shutdown() {
            closed.set(true);
            KafkaConsumer<String, String> active = consumer;
            if (active != null) {
                  // Interrupts a poll() that is currently blocked.
                  active.wakeup();
            }
      }

      /**
       * Starts a single consumer on a new thread.
       *
       * @param args unused
       */
      public static void main(String[] args) {
            KafkaConsumerDemo sub1 = new KafkaConsumerDemo();
            Thread tsub1 = new Thread(sub1);
            tsub1.start();
      }
}

以上程序可以正常跑通。
在这里插入图片描述