Reprinted from https://www.w3cschool.cn/apache_kafka/apache_kafka_simple_producer_example.html
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        String topicName = "zhaox";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // Kafka broker address
        props.put("acks", "all");                            // wait for the full ISR to acknowledge each record
        props.put("retries", 0);                             // do not retry failed sends
        props.put("batch.size", 16384);                      // per-partition batch size in bytes
        props.put("linger.ms", 1);                           // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 33554432);                // total memory for buffering unsent records
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send ten records whose keys and values are "10" through "19".
        for (int i = 10; i < 20; i++) {
            producer.send(new ProducerRecord<String, String>(
                    topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("send success");
        producer.close();   // flushes any buffered records before exiting
    }
}
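Note that send() is asynchronous: the "send success" line only means the records were handed to the producer's buffer, not that the broker acknowledged them. A minimal sketch of a synchronous variant, not part of the original article, which blocks on the Future returned by send(); the class name KafkaProducerSyncDemo is made up for illustration and it assumes the same topic "zhaox" and broker localhost:9092:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerSyncDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Blocking on the Future makes the send synchronous, so the message
        // below is only printed after the broker has acknowledged the record.
        RecordMetadata meta = producer.send(
                new ProducerRecord<String, String>("zhaox", "key-1", "value-1")).get();
        System.out.println("send success: partition=" + meta.partition()
                + " offset=" + meta.offset());
        producer.close();
    }
}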
I am not very familiar with Linux, so this was debugged on Windows.
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
public class KafkaConsumerDemo implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<String, String> consumer;

    public void run() {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "0");                      // consumer group id
            props.put("enable.auto.commit", "true");         // commit offsets automatically
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("zhaox"));

            while (!closed.get()) {
                // Block for up to 10 seconds waiting for new records.
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (final ConsumerRecord<String, String> rc : records) {
                    System.out.println("msg=" + rc.value());
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (WakeupException e) {
            // Ignore the wakeup if we are shutting down; otherwise rethrow.
            if (!closed.get())
                throw e;
        } finally {
            consumer.close();
        }
    }

    // Can be called from another thread to stop the poll loop.
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();   // interrupts a blocking poll()
    }

    public static void main(String[] args) {
        KafkaConsumerDemo sub1 = new KafkaConsumerDemo();
        Thread tsub1 = new Thread(sub1);
        tsub1.start();
    }
}
This runs end to end.
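The demo's main() starts the consumer thread but never calls shutdown(), so the loop only ends when the process is killed. A minimal sketch, not from the original article, of wiring shutdown() to a JVM shutdown hook so the consumer closes cleanly on exit; it reuses the KafkaConsumerDemo class above and assumes Java 8 or later for the lambda:

    public static void main(String[] args) {
        KafkaConsumerDemo sub1 = new KafkaConsumerDemo();
        Thread tsub1 = new Thread(sub1);
        tsub1.start();

        // On Ctrl+C or normal JVM exit, shutdown() wakes up a blocking poll()
        // and lets run() fall through to consumer.close().
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            sub1.shutdown();
            try {
                tsub1.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }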