当前位置: 首页 > biancheng >正文

Kafka教程(二)API开发-生产者、消费者、topic

一、地址

1、实时更新的思维导图

https://www.mubucm.com/doc/4uqlpedefuj

2、图片

二、具体内容

  • 5.producer生产者
    • demo
      • 发送pro.send(new ProducerRecord<String,String>("test","123"))
      • ProducerRecord的属性
        • 重载方法234
          • 只topic和内容
          • 多key
            • hash(key)确定发送的分区
          • 多key和partition
            • key不起作用,发到指定分区
        • topic
        • partiton
        • headers
        • K
          • 用于hash计算分区
        • V
          • 发送的消息值
        • timestamp
      • ack应答机制
        • 用于保证数据发送可靠
    • 必要参数
      • bootstrap.servers:整个集群的地址
    • 发送模式
      • 发后即忘
        • producer.send(rcd)
      • 同步发送sync
        • send(rcd).get()
      • 异步发送async
        • send(rcd,new CallBack(){onCompletion(){xxxx}})
  • 6.consumer消费者
    • demo
      • 订阅多个topic
      • poll(time超时时间)获取消息ConsumerRecord
      • record包含的内容.get
        • offset
        • key
        • value
    • 必要参数
      • group id
      • bootstrap.servers
        • 无需设置整个集群,只需设置单个
    • 消息订阅
      • subscribe订阅
        • 重载方法*4
        • ConsumerRebalanceListener再均衡监听器?
        • 方式
          • 指定集合方式
          • 正则方式订阅
            • 传参Pattern.compile("ods_*")
      • assign订阅
        • consumer.assign(Collection<TopicPartition> collection)
        • 可以手动订阅多个topic的指定分区
        • 单个:Arrays.asList(new TopicPartition("topic1",0))
      • 区别
        • 粒度
          • topic粒度(group管理)
          • topic-partition粒度(自己管理)
        • 是否具有rebalance分区再均衡功能
      • 取消订阅
        • consumer.unsubscribe()
        • subscribe(new ArrayList<T>())
          • String
          • TopicPatition
    • 消费模式
      • 拉取模式poll(time)
        • time为阻塞时间(多久拉一次)
        • 设置为Long.MAX_VALUE,可以提高吞吐率
      • 消息类型ConsumerRecord
        • ConsumerRecords<String,String> records=consumer.poll(100)
        • ConsumerRecord rd=records.record(new TopicPartition("topic1",0))
    • 提交偏移量
      • 指定位移消费
        • consumer.seek(TopicPartition,offset)
      • 自动提交
        • 两个配置
          • auto.commit
          • interval.ms
            • 到点提交各分区最大位移
          • 默认true,5000
        • 存在问题
          • 重复消费
            • 消费者崩溃
          • 丢失消息
            • 拉取消息放入阻塞队列BlockingQueue
            • 阻塞队列的处理线程异常,从上次提交的位移处消费
            • 即实际消费到了3,已经提交了6
      • 手动提交
        • 调用API实现
          • 关闭自动提交:auto.commit设为false
          • 类型
            • 同步提交
              • commitSync()-处理完提交(提交和拉取会阻塞)
              • 含参commitAsync(Map<TopicPatition,OffsetAndMetaData>)
              • record获取offset,+1后作为Meta构造参数
              • 提交的偏移量是消费完record的偏移量+1
            • 异步提交
              • 提交和拉取不会阻塞,提高消费者性能
              • 重载的commitAsync
                • commitAsync()
                • commitAsync(OffsetCommitCallback)-带回调
                • commitAsync(Map<TopicPatition,OffsetAndMetaData>,OffsetCommitCallback)-指定分区&偏移量+回调
        • 手动提交的时机
          • 处理完成前提交
            • 存在漏处理(数据丢失)
            • 实现了at most once语义
          • 处理完成后提交
            • 存在重复处理/消费(数据重复)
            • 原因:处理后提交前出bug
            • 实现了at least once语义
          • 理想语义:Exactly once(精确一次)
            • ★通过kafka的事务机制实现
      • 提交方式总结
        • 全自动
        • 半自动
        • 全手动
        • 提交的位置:__consumer_offset
    • 重要参数介绍
      • 一次拉取的最大最小数据量
      • 拉取的最大等待时长
      • 每个分区拉取的最大数据量
      • 一次拉取的最大条数
      • 等待请求响应、闲置、重试间隔
      • 消息隔离级别:读未提交或已提交
        • read_uncommit:能消费到LSO
        • read_committed:可以消费到HW
      • 超时时长,超过认为消费者已离开cg
  • 7.topic管理
    • 工具类KafkaAdminClient
      • 用于集成内部系统,实现多功能的生态平台
      • 功能:管理broker、配置、ACL、管理topic
      • 创建方式:KafkaAdminClient.create(props)
    • 列出主题
      • listTopics()
    • 查看主题信息
      • describeTopics(Arrays.asList(xx,yy))
    • 创建主题
      • createTopics(new NewTopic(name,replicaAssignment))
    • 删除主题
      • deleteTopics(Arrays.asList(xx,yy))
    • 其他管理
      • 动态参数管理
        • 修改配置:alterConfigs(Map<ConfigResource,Config>)
      • 分区管理
        • 新增分区:createPartitions(Map<String,NewPartitions> map)
以上内容整理于 幕布文档

相关文章:

  • 牛客练习赛#84 F 莫比乌斯反演+杜教筛+技巧+斐波那契数列和gcd的结论+矩阵快速幂
  • ZZNUOJ_用C语言编写程序实现1342:支配值数目(附完整源码)
  • java毕业设计后勤管理系统餐饮评价监督系统(附源码、数据库)
  • 前端基础学习笔记
  • 【TS】联合类型--类型断言--类型推断
  • 谈笑风声的秘密
  • QT影城网上售票系统
  • NetCDF数据在ArcMap中的使用
  • 打怪升级(考验思路)
  • 持续精进,改变自己