We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client.
You can use this client by adding a dependency on the client jar using the following example Maven coordinates (you can change the version numbers with new releases): groupId org.apache.kafka, artifactId kafka-clients.
Examples showing how to use the producer are given in the
In short: use the new Java client in place of the old Scala client.
A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
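import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");               // ack mode; "all" waits for the full commit and is the slowest option
props.put("retries", 0);                // whether to retry on failure; enabling retries can produce duplicate records
props.put("batch.size", 16384);         // batch buffer size for each partition
props.put("linger.ms", 1);              // how long to wait while a batch is not yet full; 1 adds up to 1 ms of latency
props.put("buffer.memory", 33554432);   // total memory the producer may use for buffering
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();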
The buffer.memory setting controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer is full, further send calls block, and once the time spent blocking reaches max.block.ms, send throws a TimeoutException.
Note: send is asynchronous and places each record in the buffer first, which is why a full buffer blocks the caller.
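The blocking behaviour described above can be modelled without a Kafka cluster. The following is a minimal sketch in plain Java, not the producer's actual implementation: the class `BoundedSendBuffer`, its methods, and the unchecked `BufferTimeoutException` are hypothetical names, with a semaphore standing in for buffer.memory and a wait limit standing in for max.block.ms.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the producer's buffer; not Kafka's implementation. */
final class BoundedSendBuffer {
    /** Unchecked stand-in for the TimeoutException mentioned in the text. */
    static final class BufferTimeoutException extends RuntimeException {
        BufferTimeoutException(String message) { super(message); }
    }

    private final Semaphore freeBytes;  // models buffer.memory
    private final long maxBlockMs;      // models max.block.ms

    BoundedSendBuffer(int bufferMemoryBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemoryBytes);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for a record, blocking for at most maxBlockMs when the buffer is full. */
    void send(byte[] record) {
        boolean reserved;
        try {
            reserved = freeBytes.tryAcquire(record.length, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for buffer space", e);
        }
        if (!reserved) {
            throw new BufferTimeoutException("no buffer space after " + maxBlockMs + " ms");
        }
    }

    /** Frees the space held by a record once it has been transmitted to the server. */
    void onTransmitted(byte[] record) {
        freeBytes.release(record.length);
    }

    /** Bytes currently available for new records. */
    int available() {
        return freeBytes.availablePermits();
    }
}
```

With an 8-byte buffer, a first 8-byte send succeeds immediately, while a second send waits for the configured limit and then fails unless onTransmitted has freed space in the meantime.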
ProducerRecord<String, String> record = new ProducerRecord<String, String>("the-topic", key, value);
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            e.printStackTrace();   // the send failed, so there is no metadata to report
        else
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
});
As of the 0.9.0 release we have added a new Java consumer to replace our existing high-level ZooKeeper-based consumer and low-level consumer APIs.
2.2.3 New Consumer API
This new unified consumer API removes the distinction between the 0.8 high-level and low-level consumer APIs. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
groupId org.apache.kafka, artifactId kafka-clients
Examples showing how to use the consumer are given in the
Detecting Consumer Failures
The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned.
session.timeout.ms: By increasing the session timeout, you can give the consumer more time to handle a batch of records returned from poll. The only drawback is that it will take longer for the server to detect hard consumer failures, which can cause a delay before a rebalance can be completed. However, clean shutdown with close() is not impacted since the consumer will send an explicit message to the server to leave the group and cause an immediate rebalance.
max.poll.records: Processing time in the poll loop is typically proportional to the number of records processed, so it's natural to want to set a limit on the number of records handled at once. This setting provides that. By default, there is essentially no limit.
Note: session.timeout.ms is configurable. To guarantee that no data is lost, you usually do not rely on auto commit; instead you commit manually once the processing logic has finished. If processing takes so long that the consumer has already timed out, the subsequent commit fails with an exception.
Automatic Offset Committing
An example of automatic offset committing:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");        // commit offsets automatically
props.put("auto.commit.interval.ms", "1000");   // interval between automatic commits
props.put("session.timeout.ms", "30000");       // consumer liveness timeout
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));   // subscribe to two topics, foo and bar
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);   // 100 is the poll timeout in ms
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
Manual Offset Control
An example of committing offsets manually:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");   // turn off automatic commits
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();   // commit offsets only after the batch has been written
        buffer.clear();
    }
}
The approach above commits offsets synchronously, once per batch.
The above example uses commitSync to mark all received messages as committed. In some cases you may wish to have even finer control over which messages have been committed by specifying an offset explicitly. In the example below we commit the offset after we finish handling the messages in each partition.
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {   // handle one partition at a time
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);   // records of this partition
            for (ConsumerRecord<String, String> record : partitionRecords) {   // handle each record
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();   // last offset processed
            // commit the offset of this partition on its own
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}
Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
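The "last processed offset plus one" rule is easy to get wrong, so here is a minimal sketch of it in plain Java. It does not use the Kafka client: partitions are represented by their integer ids, and the class `CommitOffsets` and its method `offsetsToCommit` are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating which offset to commit for each partition. */
final class CommitOffsets {
    /**
     * Takes, per partition id, the offsets processed so far in order, and returns
     * the offset to commit for each partition: the last processed offset plus one,
     * that is, the offset of the next message the application will read.
     */
    static Map<Integer, Long> offsetsToCommit(Map<Integer, List<Long>> processedByPartition) {
        Map<Integer, Long> toCommit = new HashMap<>();
        for (Map.Entry<Integer, List<Long>> entry : processedByPartition.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue;   // nothing processed for this partition, so nothing to commit
            }
            long lastProcessed = offsets.get(offsets.size() - 1);
            toCommit.put(entry.getKey(), lastProcessed + 1);
        }
        return toCommit;
    }
}
```

For a partition whose processed offsets are 5, 6 and 7, the value to commit is 8. Committing 7 instead would cause the record at offset 7 to be read again after a restart.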
Manual Partition Assignment
In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group.
However, in some cases you may need finer control over the specific partitions that are assigned. For example:
To use this mode, instead of subscribing to the topic using subscribe, you just call assign with the full list of partitions that you want to consume.
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
The call to consumer.assign makes this consumer read partitions 0 and 1 of topic "foo".
Once assigned, you can call poll in a loop, just as in the preceding examples, to consume records.
The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. In other words, the mapping between consumer and partitions stays fixed until assign is called again.
Controlling The Consumer's Position
In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records.
Kafka allows the consumer to specify the new position explicitly, which resets the offset of a given partition.
Special methods for seeking to the earliest and latest offsets the server maintains are also available.
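To make the effect of moving the position concrete, the following is a small in-memory model written in plain Java. It is only a sketch under stated assumptions: `PartitionCursor` and its methods are hypothetical names that mirror the operations described above, not the signatures of the Kafka consumer, and the partition is just a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a consumer's position within one partition. */
final class PartitionCursor {
    private final List<String> log;   // the partition's records, indexed by offset
    private int position = 0;         // offset of the next record to be returned

    PartitionCursor(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    /** Moves the position to an arbitrary offset, forwards or backwards. */
    void seek(int offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    /** Moves to the earliest offset, so older records are consumed again. */
    void seekToBeginning() {
        position = 0;
    }

    /** Moves past the latest record, skipping everything in between. */
    void seekToEnd() {
        position = log.size();
    }

    /** Returns up to maxRecords records starting at the current position and advances it. */
    List<String> poll(int maxRecords) {
        if (maxRecords < 0) {
            throw new IllegalArgumentException("maxRecords must not be negative");
        }
        int end = (int) Math.min((long) log.size(), (long) position + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }
}
```

Seeking back to offset 0 after a poll makes the next poll return the first record again, while seeking to the end makes the next poll return nothing until new records arrive.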
Consumption Flow Control
If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have little or no data to consume.
One such case is stream processing, where a processor fetches from two topics and performs a join on these two streams. When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead in order to let the lagging stream catch up. Another example is bootstrapping on consumer startup, when there is a lot of historical data to catch up on: applications usually want to get the latest data on some of the topics before considering fetching other topics.
Kafka supports dynamic control of consumption flows: consumption can be paused on specified assigned partitions and later resumed on specified paused partitions, with the change taking effect in future poll calls.
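The join scenario above comes down to a simple decision: which streams are too far ahead and should stop being fetched for now. The sketch below shows one way to make that decision in plain Java. Everything in it is an assumption for illustration: the class `JoinFlowControl`, the notion of "progress" as a single number per stream (for example a timestamp), and the `maxSkew` threshold are not part of Kafka.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical policy deciding which input streams of a join to pause. */
final class JoinFlowControl {
    /**
     * Returns the names of all streams whose progress is more than maxSkew ahead
     * of the slowest stream. Pausing these lets the lagging stream catch up;
     * streams not in the result should be consumed (or resumed).
     */
    static Set<String> streamsToPause(Map<String, Long> progress, long maxSkew) {
        Set<String> toPause = new TreeSet<>();
        if (progress.isEmpty()) {
            return toPause;
        }
        long slowest = Collections.min(progress.values());
        for (Map.Entry<String, Long> entry : progress.entrySet()) {
            if (entry.getValue() - slowest > maxSkew) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }
}
```

A processor would evaluate such a policy between polls, pause the partitions of the returned streams, and resume them once a later evaluation no longer lists them.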
Managing Consumer Groups
With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
test-consumer-group
To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group
GROUP                TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
test-consumer-group  test-foo  0          1               3               2    test-consumer-group_postamac.local-1456198719410-29ccd54f-0
When you're using the new consumer API, where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flag:
> bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list
As of the 0.10.0 release we have added a new client library named Kafka Streams to let users implement their stream processing applications with data stored in Kafka topics.
Maven coordinates: groupId org.apache.kafka, artifactId kafka-streams
Kafka Streams allows for performing continuous computation on input coming from one or more input topics and sends output to zero or more output topics.
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages.
Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
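In short, disabling retries on the producer and committing the offset as soon as a message is received gives at-most-once semantics. In theory Kafka alone cannot provide exactly-once delivery; it needs cooperation from another system, such as the storage system, to deduplicate.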
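The difference between the two consumer-side orderings can be seen in a tiny simulation. The sketch below is plain Java and purely illustrative: the class `DeliverySemantics`, the single simulated crash, and the counting of how often each message is processed are assumptions made for the example, not Kafka behaviour reproduced exactly.

```java
/** Hypothetical simulation of commit-then-process versus process-then-commit. */
final class DeliverySemantics {
    /**
     * Consumes messages 0..n-1, crashes once while handling message crashAt,
     * restarts from the committed offset, and returns how many times each
     * message was processed.
     *
     * commitFirst = true  models committing before processing (at most once).
     * commitFirst = false models processing before committing (at least once).
     */
    static int[] run(int n, int crashAt, boolean commitFirst) {
        int[] processedCount = new int[n];
        int committed = 0;

        // First run: ends with a crash in the middle of handling message crashAt.
        for (int i = committed; i < n; i++) {
            if (commitFirst) {
                committed = i + 1;
                if (i == crashAt) break;   // crash after the commit, before processing
                processedCount[i]++;
            } else {
                processedCount[i]++;
                if (i == crashAt) break;   // crash after processing, before the commit
                committed = i + 1;
            }
        }

        // Second run: restart from the last committed offset, no further crash.
        for (int i = committed; i < n; i++) {
            processedCount[i]++;
        }
        return processedCount;
    }
}
```

With five messages and a crash at message 2, committing first leaves message 2 unprocessed (lost), while processing first handles message 2 twice (duplicated). That duplicate is what the destination system has to detect when exactly-once results are required.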
Availability and Durability Guarantees
When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1, or all (-1) replicas.
Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in-sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails. Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability.
In other words, "all replicas" here means the in-sync replicas (ISR), not the assigned replicas (AR): even with three replicas, if only one is alive, an acknowledgement from that single replica counts as acknowledgement by all.
Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:
1. Disable unclean leader election. The leader must then be a replica in the ISR; if no such replica is available, the partition goes offline. This lowers the chance of losing data at the cost of availability.
2. Specify a minimum ISR size. If the number of in-sync replicas falls below this value, the partition stops accepting writes.
The minimum ISR setting only takes effect when acks=all: a write counts as acknowledged by all once at least the minimum number of in-sync replicas have acknowledged it. This clearly also reduces availability considerably.
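The interaction between the acks setting, the current ISR, and a minimum ISR size can be written down as two small rules. The sketch below is plain Java and a simplification: `AckPolicy` and its methods are hypothetical, and the parameter `minIsrSize` stands in for the minimum ISR size setting described above.

```java
/** Hypothetical summary of the acknowledgement rules described in the text. */
final class AckPolicy {
    /** Value of acks meaning "all in-sync replicas". */
    static final int ALL = -1;

    /**
     * Returns true if the partition accepts a write. The minimum ISR size is
     * only enforced for acks=all; for acks=0 or acks=1 a live leader is enough.
     */
    static boolean writeAccepted(int acks, int isrSize, int minIsrSize) {
        if (isrSize < 1) {
            return false;   // no in-sync replica left to act as leader
        }
        if (acks == ALL) {
            return isrSize >= minIsrSize;
        }
        return true;
    }

    /** Number of replicas that must have the message before the write is acknowledged. */
    static int replicasWaitedFor(int acks, int isrSize) {
        return acks == ALL ? isrSize : acks;
    }
}
```

With a minimum of 1 and a single surviving replica, an acks=all write is accepted and acknowledged by just that replica, which is the durability risk described above. Raising the minimum to 2 makes the same write fail instead.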
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance.
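The retention rule can be illustrated with a short sketch in plain Java. It is a simplification and not the broker's algorithm: the class `LogCompactionSketch` is hypothetical, records are modelled as key/value string pairs, and the sketch keeps exactly the last value per key, whereas Kafka only guarantees that at least the last value is retained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of what survives compaction of one partition's log. */
final class LogCompactionSketch {
    /**
     * Each record is a two-element array {key, value}. Returns the records that
     * remain when only the latest value of every key is kept, in their original
     * log order.
     */
    static List<String[]> compact(List<String[]> log) {
        // Remember the index of the last record seen for each key.
        Map<String, Integer> lastIndexOfKey = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndexOfKey.put(log.get(i)[0], i);
        }
        // Keep a record only if it is the last one for its key.
        List<String[]> compacted = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndexOfKey.get(log.get(i)[0]) == i) {
                compacted.add(log.get(i));
            }
        }
        return compacted;
    }
}
```

An application that rebuilds a cache from the compacted log ends up with the same final state as one that replays the full log, which is why compaction suits the restore and reload scenarios mentioned above.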
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.
By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
In short, quota.producer.default and quota.consumer.default set a fixed quota in bytes/sec for each client-id. Note that this limit applies per broker, not per cluster.
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error; rather, it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under its quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior, which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
Typically, having large measurement windows (e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays, which is not great in terms of user experience.
Note: when a broker detects a quota violation it does not reject the request but delays the response, which keeps the impact on the client small. Traffic is measured over multiple small time windows, which makes the measurement more accurate.
It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. Client-id overrides are written to ZooKeeper under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster.
In other words, the quota of an individual client can be changed through dynamic configuration.
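The idea of delaying a response just long enough to bring a client back under its quota can be made concrete with a little arithmetic. The sketch below is plain Java and rests on an assumption: the delay is chosen so that the bytes seen in the measurement window, spread over the window plus the delay, average out to exactly the quota. The class `QuotaThrottle` and this formula are illustrative, and the broker's actual computation may differ.

```java
/** Hypothetical calculation of how long to delay a response for a client over quota. */
final class QuotaThrottle {
    /**
     * Returns the delay in milliseconds such that
     *   bytesInWindow / (windowMs + delay) equals quotaBytesPerSec,
     * or 0 if the client is at or below its quota.
     */
    static long delayMs(long bytesInWindow, long windowMs, long quotaBytesPerSec) {
        double observedBytesPerSec = bytesInWindow * 1000.0 / windowMs;
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L;
        }
        double excessRatio = (observedBytesPerSec - quotaBytesPerSec) / quotaBytesPerSec;
        return Math.round(excessRatio * windowMs);
    }
}
```

A client with a 10485760 bytes/sec quota that sends 20971520 bytes within a one-second window would be delayed by 1000 ms under this rule, after which its average rate is back at the quota.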
The following sets the default quota per producer and consumer client-id to 10MB/sec.
quota.producer.default=10485760
quota.consumer.default=10485760
It is also possible to set custom quotas for each client.
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
Updated config for clientId: "clientA".
Here's how to describe the quota for a given client.
> ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
Configs for clients:clientA are producer_byte_rate=1024,consumer_byte_rate=2048
In release, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:
1. Authentication of connections between brokers and clients, other brokers, and tools, using SSL or SASL
2. Authentication of connections between brokers and ZooKeeper
3. Encryption of data in transit using SSL, at some cost in performance
4. Authorization of read and write operations by clients
5. Pluggable authorization, with support for external authorization services
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
In standalone mode all work is performed in a single process. You can start a standalone process with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
> bin/connect-distributed.sh config/connect-distributed.properties
In particular, the following configuration parameters are critical to set before starting your cluster:
(default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure a single partition for the config topic, as auto-created topics may have multiple partitions.
offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions and be replicated
status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated
The four settings above must be specified in the connect-*.properties files.
In distributed mode, however, use the REST API described below to create, modify, and destroy connectors.
- Unique name for the connector. Attempting to register again with the same name will fail.
connector.class - The Java class for the connector
tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
topics - A list of topics to use as input for this connector
REST API
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:
GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
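The request body for POST /connectors is simple enough to assemble by hand. The sketch below builds it in plain Java with no HTTP or JSON library. The class `ConnectorRequest` is a hypothetical helper, and the connector name and configuration values used with it are made up for illustration; only the name and config fields and the configuration keys come from the text above.

```java
import java.util.Map;

/** Hypothetical helper that builds the JSON body for creating a connector. */
final class ConnectorRequest {
    /**
     * Builds a JSON object with a string "name" field and an object "config"
     * field holding the connector configuration as string values. Entries appear
     * in the iteration order of the given map.
     */
    static String body(String name, Map<String, String> config) {
        StringBuilder json = new StringBuilder();
        json.append("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append(quote(entry.getKey())).append(':').append(quote(entry.getValue()));
            first = false;
        }
        return json.append("}}").toString();
    }

    /** Wraps a string in double quotes, escaping backslashes and double quotes. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

The resulting string would be sent as the body of a POST request to /connectors on the Connect worker (port 8083 by default), with a JSON content type.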
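The most valuable changes are the new Producer and Consumer clients; the Consumer in particular greatly reduces the complexity that came with using the old low-level consumer.
As for the Streams API and Connect, they are of marginal value.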