2020/3/6
Kafka Producer API
-
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
(1) ProducerRecord - the record to be sent asynchronously to the topic
(2) callback - a user-supplied callback that is invoked when the server acknowledges the record -
public void send(KeyedMessage<k,v> message)
-sends the data to a single topic, partitioned by key using either sync or async producer. -
public void send(List<KeyedMessage<k,v>>messages)
-sends data to multiple topics. - SETTING PRODUCERCONFIG
Properties prop = new Properties();
prop.put("producer.type", "async"); // "async" = asynchronous
ProducerConfig config = new ProducerConfig(prop);
-
public void flush()
-ensures that all previously sent records have actually been completed -
public Map metrics()
-partitionsFor(topic): gets the partition metadata of the specified topic - producer API
- client.id
- producer.type
-async/sync - acks ???
acks配置控制生產者請求被視為完成的標準。 (The acks setting controls the criteria under which a producer request is considered complete.)
- bootstrapping broker list
- linger.ms
- retry
-setting linger.ms higher than a certain value reduces the number of requests - key.serializer
- value.serializer
- batch.size
- buffer.memory
- ProducerRecord
public ProducerRecord(String topic, int partition, K key, V value)
-
public ProducerRecord(String topic, K key, V value)
-no partition -
public ProducerRecord(String topic, V value)
-neither partition nor key
SimpleProducer application
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
// Create a Java class named "SimpleProducer"
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “l(fā)ocalhost:9092");
//your ip:host based on config/servers.properties
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
}
System.out.println(“Message sent successfully");
producer.close();
}
}
-
compile
javac -cp "$KAFKA_HOME/libs/*" SimpleProducer.java
-
run
java -cp "$KAFKA_HOME/libs/*":. SimpleProducer <topic-name>
kafka consumer API
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
- consumer api
-
public java.util.Set<TopicPartition> assignment()
-gets the set of partitions currently assigned to the consumer -
public string subscription()
-get dynamic signature in specific topics -
public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener)
-subscribes to the given list of topics to get dynamically assigned partitions. -
public void unsubscribe()
-cancel subscription -
public void subscribe(java.util.List< java.lang.String> topics)
-if the list of topics is empty, it is treated the same as unsubscribe() - public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
public void assign(java.util.List<TopicPartition> partitions)
-
poll()
-get data from partition -
public void commitSync()
-commits the offsets returned by the last poll() -
public void seek(TopicPartition partition,long offset)
-the consumer will use the given offset on the next poll() - public void resume()
public void wakeup()
ConsumerRecord API
-
public ConsumerRecord(String topic, int partition, long offset, K key, V value)
-get record from kafka group -
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord<K,V>>> records)
-as the container of record-
public int count()
- count of all topics -
public Set partitions()
- returns the set of partitions that have data in this record set - public Iterator iterator()
public List records()
-
ConsumerClient API
- broker list
- group.id
- enable.auto.commit
- auto.commit.interval.ms
- session.timeout.ms
SimpleConsumer
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}