molyeo

This article walks through the Kafka producer with hands-on code, using version kafka_2.11-0.10.1.0.

Installation

For installation on a Linux cluster, see http://www.cnblogs.com/molyeo/p/7151949.html
The Windows installation steps are as follows:

Download the ZooKeeper package (zookeeper-3.4.6), extract it to D:\Program\zookeeper, and set the environment variables:

  • Add the system variable ZOOKEEPER_HOME=D:\Program\zookeeper and append %ZOOKEEPER_HOME%\bin to path

  • Rename zoo_sample.cfg to zoo.cfg and change its contents to:

      tickTime=4000
      initLimit=10
      syncLimit=5
      dataDir=D:/Program/zookeeper/data
      clientPort=2181
      maxClientCnxns=60
      server.1=localhost:2888:3888
  • Create a file named myid in the D:/Program/zookeeper/data directory, open it in a text editor, and enter the number 1

Download the Kafka package (kafka_2.11-0.10.1.0), extract it to D:\Program\kafka, and set the environment variables:

  • Add the system variable KAFKA_HOME=D:\Program\kafka and append %KAFKA_HOME%\bin to path

  • Edit the configuration file D:\Program\kafka\config\server.properties as follows:

      broker.id=0
      advertised.listeners=PLAINTEXT://LAPTOP-2CBRDCI0:9092
      advertised.port=9092
      advertised.host.name=LAPTOP-2CBRDCI0
      log.dirs=D:/Program/kafka/data/kafka-logs
      zookeeper.connect=localhost:2181/kafka
      zookeeper.connection.timeout.ms=60000

Start ZooKeeper
Double-click the script D:\Program\zookeeper\bin\zkServer.cmd

Start Kafka

Run from the command line:
D:\Program\kafka\bin\windows\kafka-server-start.bat D:/Program/kafka/config/server.properties

Create a topic in Kafka, then describe it to verify
D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --create --topic TEST --replication-factor 1 --partitions 3

D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --describe --topic TEST

Practice

Dependencies

In Kafka 0.10.1.0, the KafkaProducer object is used to send messages to the Kafka broker cluster.
Before writing any code, add the required dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>

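Basic configuration and send flow

KafkaProducer is thread safe, so a single KafkaProducer instance can be shared across threads. We start with a single-threaded example to understand how Kafka sends messages.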

package com.molyeo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by zhangkh on 2018/7/11.
 */
public class SinglekafkaProducerDemo {
    public static void  main(String[] args){
        // Producer settings; each one is explained after the listing.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous: it only appends the record to a buffer.
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        // Push out anything still buffered, then release the producer's resources.
        producer.flush();
        producer.close();
    }
}

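We first build a props instance to hold the Kafka configuration.
acks
The acks setting controls how messages are acknowledged.
acks=0 means the producer does not wait for any acknowledgement from the server. The message is added to the socket buffer immediately and considered sent. Because the client cannot know whether the message was really delivered, the retries setting has no effect (no retry happens), and the offset returned for every message is -1.
acks=1 means the message is considered sent as soon as the partition leader has received it, without waiting for the follower replicas to acknowledge. If the leader's node crashes right after receiving the message, before the followers have had time to replicate it, the message is lost.
acks=all (or acks=-1) means the message is considered sent only after the leader and all in-sync follower replicas have received it. This is the strictest acknowledgement mode: the message is not lost as long as at least one in-sync replica remains alive. The broker-side setting min.insync.replicas sets the minimum number of in-sync replicas that must acknowledge for the write to succeed.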
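To make the three acknowledgement levels concrete, here is a minimal sketch that builds the producer settings for a chosen acks value. The class AcksConfigSketch and the method producerProps are hypothetical names introduced for illustration; the sketch uses only java.util.Properties, so it runs without a broker or the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of the article's code): builds producer
// settings for a given acknowledgement level using plain string keys.
final class AcksConfigSketch {

    static Properties producerProps(String bootstrapServers, String acks) {
        // Kafka 0.10.1 accepts exactly these values for acks.
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all") && !acks.equals("-1")) {
            throw new IllegalArgumentException("acks must be 0, 1, all or -1, got: " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", acks);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Strictest mode: wait for the leader and all in-sync replicas.
        Properties safest = producerProps("localhost:9092", "all");
        System.out.println("acks=" + safest.get("acks"));
        // The result would be passed to new KafkaProducer<>(safest),
        // which requires kafka-clients on the classpath.
    }
}
```

Rejecting unknown values early is a design choice of this sketch; the real client performs its own validation when the producer is constructed.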

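retries
If a send request fails because of a network problem or another transient error, the producer can retry it up to the number of times given by retries.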

batch.size
The producer maintains a buffer of unsent messages for each partition. The size of each buffer is batch.size, which defaults to 16384 bytes (16 KB).

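linger.ms
The linger time. It defaults to 0, meaning messages are sent immediately even if the batch still has unused space.
To reduce load on the server, we can delay sending for a short time so that more messages accumulate and go out as one batch.
Put simply, the producer's sender thread sends a request as soon as either the batch.size or the linger.ms condition is met. For the details, see the class org.apache.kafka.clients.producer.internals.Sender.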
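The "either condition" rule above can be written down as a small predicate. This is a simplified model for illustration only, not the actual logic in Sender or its accumulator, which considers further conditions; the class BatchReadySketch and its parameter names are assumptions.

```java
// Simplified model of when a batch becomes sendable. An assumption for
// illustration; the real producer checks additional conditions.
final class BatchReadySketch {

    static boolean readyToSend(int batchedBytes, int batchSize, long waitedMs, long lingerMs) {
        if (batchedBytes <= 0) {
            return false; // nothing buffered, nothing to send
        }
        boolean full = batchedBytes >= batchSize;      // batch.size reached
        boolean lingerExpired = waitedMs >= lingerMs;  // linger.ms elapsed
        return full || lingerExpired;
    }

    public static void main(String[] args) {
        System.out.println(readyToSend(16384, 16384, 0, 5)); // full batch
        System.out.println(readyToSend(100, 16384, 5, 5));   // linger time elapsed
        System.out.println(readyToSend(100, 16384, 2, 5));   // still waiting
        System.out.println(readyToSend(100, 16384, 0, 0));   // linger.ms=0 sends at once
    }
}
```

With the default linger.ms=0 the second condition is always true for a non-empty batch, which matches the immediate-send behaviour described above.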

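buffer.memory
The total memory the producer can use to buffer messages waiting to be sent. Once it is exhausted, send blocks for up to max.block.ms, after which the send fails with an exception.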
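The blocking behaviour can be pictured as a bounded pool of bytes with a timed wait. The sketch below models it with a java.util.concurrent.Semaphore; it is an analogy under that assumption, not how the producer's buffer pool is implemented, and BufferMemorySketch, tryAllocate, and release are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Analogy for buffer.memory and max.block.ms: a pool of bytes where an
// allocation waits a bounded time for free space. Not the real implementation.
final class BufferMemorySketch {
    private final Semaphore freeBytes;
    private final long maxBlockMs;

    BufferMemorySketch(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for a record, waiting up to maxBlockMs; false means timed out. */
    boolean tryAllocate(int bytes) {
        try {
            return freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    /** Returns space to the pool, as happens once a batch has been sent. */
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    public static void main(String[] args) {
        BufferMemorySketch buffer = new BufferMemorySketch(100, 50);
        System.out.println(buffer.tryAllocate(80)); // fits in the empty pool
        System.out.println(buffer.tryAllocate(30)); // only 20 bytes free: times out
        buffer.release(80);
        System.out.println(buffer.tryAllocate(30)); // space is available again
    }
}
```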

For the other producer settings, see http://kafka.apache.org/0101/documentation.html#brokerconfigs

The send method of KafkaProducer deserves a closer look:

public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

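Both send methods are asynchronous: they return as soon as the record has been stored in the buffer of records waiting to be sent. This lets many records be sent in parallel without blocking to wait for a response after each one.
The second method, send(ProducerRecord<K, V> record, Callback callback), takes a callback that is invoked when the send request completes, whether it succeeded or failed. On success, the RecordMetadata result gives the partition the record was sent to, the offset it was assigned, and the record's timestamp.
To send synchronously and block until the result is known, call get on the returned Future: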

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
 // get() blocks until the send completes and throws if the send failed
 producer.send(record).get();

For a fully asynchronous send, pass a Callback that is invoked when the request completes:

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(record,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });

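Concurrent sending with multiple threads

To take full advantage of Kafka's high throughput, the producer side can send messages concurrently from multiple threads. As noted earlier, KafkaProducer is thread safe, so a single KafkaProducer instance can be shared across threads.
The Kafka settings used here are kept in the class KafkaCommonConfig: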

package com.molyeo.kafka;

/**
 * Created by zhangkh on 2018/7/10.
 */
public class KafkaCommonConfig {
    public static final String BOOTSTRAP_SERVERS="LAPTOP-2CBRDCI0:9092";
    public static final String ACKS="all";
    public static final int RETRIES=0;
    public static final int BATCH_SIZE=16384;
    public static final int LINGER_MS=1;
    public static final int BUFFER_MEMORY=33554432;
    public static final String KEY_SERIALIZER_CLASS="org.apache.kafka.common.serialization.StringSerializer";
    public static final String VALUE_SERIALIZER_CLASS= "org.apache.kafka.common.serialization.StringSerializer";
}

Sending messages

package com.molyeo.kafka;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhangkh on 2018/7/5.
 */
public class MultiKafkaProducerDemo {
    private static final int PRODUCER_THREAD_NUM = 5;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_THREAD_NUM);
        Producer<String, String> producer = new KafkaProducer<String, String>(getProducerConfig());
        String topic = "TEST";
        try {
            for (int i = 0; i < 20; i++) {
                Thread.sleep(20);
                String key = Integer.toString(i);
                String value = Long.toString(System.currentTimeMillis());
                ProducerRecord<String, String> record = new ProducerRecord<>(topic,i%3, key, value);
                executorService.submit(new CommonProducerThread<>(producer, record));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            //Block for a while
            Thread.sleep(60 * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
            executorService.shutdown();
        }
    }

    public static Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.ACKS_CONFIG, KafkaCommonConfig.ACKS);
        props.put(ProducerConfig.RETRIES_CONFIG, KafkaCommonConfig.RETRIES);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, KafkaCommonConfig.BATCH_SIZE);
        props.put(ProducerConfig.LINGER_MS_CONFIG, KafkaCommonConfig.LINGER_MS);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, KafkaCommonConfig.BUFFER_MEMORY);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.KEY_SERIALIZER_CLASS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaCommonConfig.VALUE_SERIALIZER_CLASS);
        return props;
    }
}


class CommonProducerThread<K, V> implements Runnable {
    Logger logger = LoggerFactory.getLogger(CommonProducerThread.class.getSimpleName());

    private final Producer<K, V> producer;
    private final ProducerRecord<K, V> record;

    public CommonProducerThread(Producer<K, V> producer, ProducerRecord<K, V> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        logger.info("prepare to send msg:thread name={},key={},value={}", Thread.currentThread().getName(), record.key(), record.value());
        // Asynchronous send; the callback logs the outcome once the request completes.
        producer.send(record, new ProducerAckCallback<>(System.currentTimeMillis(), record.key(), record.value()));
    }
}

class ProducerAckCallback<K, V> implements Callback {
    Logger logger = LoggerFactory.getLogger(ProducerAckCallback.class.getSimpleName());
    private final long startTime;
    private final K key;
    private final V value;

    public ProducerAckCallback(long startTime, K key, V value) {
        this.startTime = startTime;
        this.key = key;
        this.value = value;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        // A null exception means the send succeeded and metadata is usable.
        if (exception == null) {
            logger.info("send success:key {},value {}, sent to partition {},offset {} in {} ms", key, value, metadata.partition(), metadata.offset(), elapsedTime);
        } else {
            logger.error("send failed:key {},value {}", key, value, exception);
        }
    }
}

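The program uses 5 threads to send 20 messages, specifies the partition for each message explicitly, and pauses 20 ms between messages. The output is as follows: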

18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=0,value=1531360688703
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=1,value=1531360688748
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=2,value=1531360688853
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=3,value=1531360688878
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=4,value=1531360688900
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=5,value=1531360688921
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=6,value=1531360688942
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 3,value 1531360688878, sent to partition 0,offset 59 in 91 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 0,value 1531360688703, sent to partition 0,offset 60 in 241 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 6,value 1531360688942, sent to partition 0,offset 61 in 17 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 1,value 1531360688748, sent to partition 1,offset 68 in 121 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 4,value 1531360688900, sent to partition 1,offset 69 in 71 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 2,value 1531360688853, sent to partition 2,offset 33 in 95 ms
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 5,value 1531360688921, sent to partition 2,offset 34 in 19 ms
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=7,value=1531360688976
18/07/12 09:58:08 INFO ProducerAckCallback: send success:key 7,value 1531360688976, sent to partition 1,offset 70 in 5 ms
18/07/12 09:58:08 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=8,value=1531360688997
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 8,value 1531360688997, sent to partition 2,offset 35 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=9,value=1531360689022
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 9,value 1531360689022, sent to partition 0,offset 62 in 3 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=10,value=1531360689043
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 10,value 1531360689043, sent to partition 1,offset 71 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=11,value=1531360689065
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 11,value 1531360689065, sent to partition 2,offset 36 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=12,value=1531360689089
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 12,value 1531360689089, sent to partition 0,offset 63 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=13,value=1531360689118
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 13,value 1531360689118, sent to partition 1,offset 72 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=14,value=1531360689141
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 14,value 1531360689141, sent to partition 2,offset 37 in 7 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-5,key=15,value=1531360689174
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 15,value 1531360689174, sent to partition 0,offset 64 in 5 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-2,key=16,value=1531360689198
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 16,value 1531360689198, sent to partition 1,offset 73 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-1,key=17,value=1531360689220
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 17,value 1531360689220, sent to partition 2,offset 38 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-3,key=18,value=1531360689245
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 18,value 1531360689245, sent to partition 0,offset 65 in 4 ms
18/07/12 09:58:09 INFO CommonProducerThread: prepare to send msg:thread name=pool-1-thread-4,key=19,value=1531360689266
18/07/12 09:58:09 INFO ProducerAckCallback: send success:key 19,value 1531360689266, sent to partition 1,offset 74 in 6 ms
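The partition numbers in the log follow directly from the explicit partition i % 3 passed to ProducerRecord, where the key is the loop index i. The standalone sketch below reproduces that mapping for the 20 keys; PartitionAssignmentSketch and its methods are hypothetical names, and no Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Reproduces the explicit partition choice used in the demo (i % 3).
// Hypothetical helper for illustration; it does not talk to Kafka.
final class PartitionAssignmentSketch {

    static int partitionFor(int i, int numPartitions) {
        return i % numPartitions;
    }

    /** Groups message indexes 0..messageCount-1 by their target partition. */
    static Map<Integer, List<Integer>> group(int messageCount, int numPartitions) {
        Map<Integer, List<Integer>> byPartition = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            byPartition.computeIfAbsent(partitionFor(i, numPartitions), p -> new ArrayList<>()).add(i);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Same shape as the demo: 20 messages over the 3 partitions of the topic.
        group(20, 3).forEach((partition, keys) ->
                System.out.println("partition " + partition + " <- keys " + keys));
    }
}
```

Keys 0, 3, 6, and so on land in partition 0, which agrees with the log lines for those keys. Within a partition, the offsets of the first few messages do not follow key order because the worker threads send concurrently.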

This article summarized the commonly used Kafka producer settings and showed how to send messages from multiple threads.

References:
http://kafka.apache.org/0101/documentation.html#brokerconfigs