Tech, November 20, 2022

Kafka 0.10.0.0 released

 

The concept of interceptors most likely comes from Flume.

Reference: http://blog.csdn.net/xiao_jun_0820/article/details/38111305

For example, Flume provides the following interceptors:

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor

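These interceptors can decorate the messages flowing through them, for example by inserting a timestamp or the host name. They can also perform ETL-style operations such as filtering.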
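To make the idea concrete, here is a minimal plain-Java sketch of two Flume-style interceptors. One decorates an event with a timestamp header, and the other drops events whose body matches a regex. The `Event`, `SimpleInterceptor`, `TimestampInterceptor`, `RegexExcludeInterceptor` and `InterceptorChain` names are hypothetical stand-ins written for illustration, not Flume's actual API. Returning `null` to signal "drop this event" is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.regex.Pattern;

// Hypothetical event: a mutable header map plus a string body.
final class Event {
    final Map<String, String> headers = new HashMap<>();
    final String body;

    Event(String body) {
        this.body = body;
    }
}

// Hypothetical contract: return the (possibly decorated) event, or null to drop it.
interface SimpleInterceptor {
    Event intercept(Event event);
}

// Decorates every event with a "timestamp" header read from the supplied clock.
final class TimestampInterceptor implements SimpleInterceptor {
    private final LongSupplier clock;

    TimestampInterceptor(LongSupplier clock) {
        this.clock = clock;
    }

    @Override
    public Event intercept(Event event) {
        event.headers.put("timestamp", Long.toString(clock.getAsLong()));
        return event;
    }
}

// Drops any event whose body contains a match for the exclusion regex.
final class RegexExcludeInterceptor implements SimpleInterceptor {
    private final Pattern exclude;

    RegexExcludeInterceptor(String regex) {
        this.exclude = Pattern.compile(regex);
    }

    @Override
    public Event intercept(Event event) {
        return exclude.matcher(event.body).find() ? null : event;
    }
}

// Passes each event through the interceptors in order, stopping at the first drop.
final class InterceptorChain {
    static List<Event> apply(List<SimpleInterceptor> chain, List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event e : events) {
            Event current = e;
            for (SimpleInterceptor interceptor : chain) {
                current = interceptor.intercept(current);
                if (current == null) {
                    break; // dropped: later interceptors never see it
                }
            }
            if (current != null) {
                out.add(current);
            }
        }
        return out;
    }
}
```

For example, a chain of `new TimestampInterceptor(System::currentTimeMillis)` followed by `new RegexExcludeInterceptor("DEBUG")` stamps every event and discards debug lines.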

 

Accordingly, Kafka provides similar interceptor interfaces on both the producer side and the consumer side.

 

ProducerInterceptor

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

/**
 * A plugin interface to allow things to intercept events happening to a producer record,
 * such as sending a producer record or getting an acknowledgement when a record gets published.
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value get serialized.
     * @param record the record from the client
     * @return either the original record passed to this method or a new record with a modified key and value
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This is called when the send has been acknowledged.
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
     *                 The metadata may be only partially filled if an error occurred. The topic is always set,
     *                 and if the partition is not -1, it is the partition assigned to this record.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}

onSend() will be called in KafkaProducer.send(), before the key and value get serialized and before the partition is assigned.

If the implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object.
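A minimal sketch of that rule, written in plain Java so it runs without the Kafka client jar. `Rec` is a hypothetical immutable stand-in for ProducerRecord that keeps only the topic, key and value. `UpperCaseValueInterceptor` is a hypothetical interceptor. Because every field is final, changing the value means building a new object, which mirrors the requirement above. With the real client, the class would implement ProducerInterceptor<String, String> and construct a new ProducerRecord inside onSend().

```java
import java.util.Locale;
import java.util.Objects;

// Hypothetical immutable stand-in for ProducerRecord: fields are final, so "modifying" means copying.
final class Rec {
    final String topic;
    final String key;
    final String value;

    Rec(String topic, String key, String value) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.key = key;
        this.value = value;
    }
}

// Hypothetical onSend-style hook that upper-cases the record value.
final class UpperCaseValueInterceptor {
    Rec onSend(Rec record) {
        if (record.value == null) {
            return record; // nothing to change: hand back the original record
        }
        // Modified value: return a NEW record and leave the caller's object untouched.
        return new Rec(record.topic, record.key, record.value.toUpperCase(Locale.ROOT));
    }
}
```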

onAcknowledgement() will be called when the send is acknowledged. It has the same signature as Callback.onCompletion() and is called just before Callback.onCompletion() is called.
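The ordering can be modelled in a few lines. `AckDispatcher` below is a hypothetical stand-in for the producer's completion path, not Kafka code. For each acknowledged record, it first invokes every interceptor's onAcknowledgement-style hook and only then the user's callback, passing both the same (metadata, exception) pair. For brevity, metadata is reduced to a string here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical model of the completion path: interceptors first, then the user callback.
final class AckDispatcher {
    private final List<BiConsumer<String, Exception>> interceptorAcks = new ArrayList<>();

    void addInterceptor(BiConsumer<String, Exception> onAcknowledgement) {
        interceptorAcks.add(onAcknowledgement);
    }

    // Called once per acknowledged (or failed) record.
    void complete(String metadata, Exception exception, BiConsumer<String, Exception> callback) {
        for (BiConsumer<String, Exception> ack : interceptorAcks) {
            ack.accept(metadata, exception); // same arguments the callback will receive
        }
        if (callback != null) {
            callback.accept(metadata, exception); // the user callback runs last
        }
    }
}
```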

Multiple interceptors can be chained together.
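Here is a sketch of how chaining can work, in plain Java: each interceptor receives the record returned by the previous one, in the configured order. In the real client, the order comes from the producer's `interceptor.classes` setting, a list of class names. As far as I know, Kafka's own chaining code also catches and logs an exception thrown by one interceptor and carries on with the last good record instead of failing the send. The sketch imitates that behaviour. `ChainedOnSend`, and modelling each interceptor as a `UnaryOperator<String>`, are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical chain: each interceptor maps a record to a (possibly new) record.
final class ChainedOnSend {
    private final List<UnaryOperator<String>> interceptors;
    final List<String> errors = new ArrayList<>();

    ChainedOnSend(List<UnaryOperator<String>> interceptors) {
        this.interceptors = new ArrayList<>(interceptors); // order = configured order
    }

    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current); // output of one feeds the next
            } catch (RuntimeException e) {
                // Do not fail the send: remember the error and keep the last good record.
                errors.add(e.getMessage());
            }
        }
        return current;
    }
}
```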

ProducerInterceptor methods will be called from multiple threads. onSend() will be called on the submitting (application) thread, and onAcknowledgement() will be called on the producer I/O thread.
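Because onSend() and onAcknowledgement() run on different threads, any state an interceptor shares between them must be thread-safe. Below is a minimal sketch of a monitoring interceptor's counters, assuming the two hooks are reduced to plain methods. `CountingHooks` is hypothetical, not a Kafka class. The AtomicLong fields let the submitting threads and the I/O thread update the counters without locks.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counters for a monitoring interceptor; safe to call from any thread.
final class CountingHooks {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Would be called from onSend() on the application's submitting thread(s).
    void onSend() {
        sent.incrementAndGet();
    }

    // Would be called from onAcknowledgement() on the producer I/O thread.
    void onAcknowledgement(Exception exception) {
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    // Records sent but not yet acknowledged or failed.
    long inFlight() {
        return sent.get() - acked.get() - failed.get();
    }

    long sent() { return sent.get(); }
    long acked() { return acked.get(); }
    long failed() { return failed.get(); }
}
```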

 

ConsumerInterceptor

package org.apache.kafka.clients.consumer;

import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

/**
 * A plugin interface to allow things to intercept consumer events such as receiving a record or a record being consumed
 * by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non-consumable)
     * @return either the original 'records' passed to this method or a modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * This is called when offsets get committed.
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.
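Below is a minimal sketch of a filtering onConsume(). ConsumerRecords is reduced to a plain List of hypothetical `Msg` values so that it runs without the Kafka jar. The sketch drops records whose value is null (for example, tombstones on a compacted topic) and returns the rest in their original order. With the real client, you would instead build a new ConsumerRecords from a per-partition map of the records you keep. `Msg` and `DropNullValues` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified consumer record: partition, offset and value only.
final class Msg {
    final int partition;
    final long offset;
    final String value;

    Msg(int partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical onConsume-style filter that runs just before poll() hands records to the application.
final class DropNullValues {
    List<Msg> onConsume(List<Msg> records) {
        List<Msg> kept = new ArrayList<>(records.size());
        for (Msg m : records) {
            if (m.value != null) {
                kept.add(m); // keep consumable records, preserving their order
            }
        }
        return kept;
    }
}
```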

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since the new consumer is single-threaded, ConsumerInterceptor methods will be called from a single thread.
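Since all calls happen on the consumer's single thread, an onCommit()-style hook can keep its bookkeeping in a plain HashMap with no synchronization. The sketch below remembers the last committed offset per partition. TopicPartition and OffsetAndMetadata are reduced to a String key and a Long value, and `CommitTracker` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical commit bookkeeping. NOT thread-safe, which is fine for a single-threaded consumer.
final class CommitTracker {
    private final Map<String, Long> committed = new HashMap<>();

    // Would be called from onCommit() with the offsets the broker has just acknowledged.
    void onCommit(Map<String, Long> offsets) {
        committed.putAll(offsets); // newer commits overwrite older ones per partition
    }

    // Last committed offset for the partition, or -1 if nothing has been committed yet.
    long lastCommitted(String partition) {
        return committed.getOrDefault(partition, -1L);
    }
}
```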

 

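Summary:

As plugins, interceptors can do lightweight work on messages such as decorating, cleaning, or filtering them, but their main use is monitoring and tracing messages.

Interceptors can be chained.

Interceptors must be lightweight: they run inline on the send and consume paths, so a slow interceptor reduces the throughput of the whole pipeline.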

 

Confluent also provides interceptor products for monitoring data streams:

http://docs.confluent.io/3.0.0/control-center/docs/clients.html

 

In addition, for better monitoring and auditing, the following was proposed:

Currently, RecordMetadata contains the topic/partition, offset, and timestamp (KIP-32).

We propose to add the remaining record metadata to RecordMetadata: checksum and record size. Both checksum and record size are useful for monitoring and auditing.

For symmetry, we also propose to expose the same metadata on the consumer side and make it available to interceptors.

We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

public final class RecordMetadata {
    private final long offset;
    private final TopicPartition topicPartition;
    private final long checksum;   // NEW: checksum of the record
    private final int size;        // NEW: record size in bytes (before compression)
}

public final class ConsumerRecord<K, V> {
    // .......
    private final long checksum;   // NEW: checksum of the record
    private final int size;        // NEW: record size in bytes (after decompression)
}
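The following sketch illustrates how checksum and size enable auditing. A producer-side interceptor records a (checksum, size) fingerprint for every acknowledged record, and a consumer-side interceptor records the same for every consumed record. Comparing the two multisets then reveals lost or corrupted messages. Because the producer-side size is taken before compression and the consumer-side size after decompression, the two should agree for the same record. The sketch computes its own CRC32 over the UTF-8 value bytes with java.util.zip.CRC32. That is an assumption for illustration: the checksum in the proposal is computed by Kafka over the stored record. `AuditLog` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical audit log keyed by "checksum:size", counting how often each fingerprint was seen.
final class AuditLog {
    private final Map<String, Integer> counts = new HashMap<>();

    // Fingerprint = CRC32 of the UTF-8 value bytes plus their length (assumption: not Kafka's exact CRC).
    static String fingerprint(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue() + ":" + bytes.length;
    }

    // Would be called from onAcknowledgement() (producer side) or onConsume() (consumer side).
    void record(String value) {
        counts.merge(fingerprint(value), 1, Integer::sum);
    }

    // Fingerprints seen here more often than in the other log, with the surplus count.
    Map<String, Integer> missingFrom(AuditLog other) {
        Map<String, Integer> missing = new HashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            int surplus = e.getValue() - other.counts.getOrDefault(e.getKey(), 0);
            if (surplus > 0) {
                missing.put(e.getKey(), surplus);
            }
        }
        return missing;
    }
}
```

In this sketch, `produced.missingFrom(consumed)` lists records that were acknowledged but never seen by consumers.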
