您的位置: 首页 - 站长

shopify建站公司营销型网站的运营配套不包括

当前位置: 首页 > news >正文

shopify建站公司,营销型网站的运营配套不包括,马鞍山网站建设公司,网站注册便宜目录 一、参考二、路由规则#xff08;分片规则#xff09;三、触发重复消费的场景场景一#xff1a;触发rebalance问题描述可能原因实际影响参数在kafka0.10.1 之前:在kafka0.10.1之后#xff1a;解决方案 场景二#xff1a;服务宕机可能原因解决方案 消息幂等性 四、kaf… 目录 一、参考二、路由规则分片规则三、触发重复消费的场景场景一触发rebalance问题描述可能原因实际影响参数在kafka0.10.1 之前:在kafka0.10.1之后解决方案 场景二服务宕机可能原因解决方案 消息幂等性 四、kafka参数特性常见配置 五、死信队列方案一工厂类KafkaListenerContainerFactory配置工厂类死信队列消费者 方案二错误ConsumerAwareListenerErrorHandler配置消费者 六、顺序消费场景 一、参考 kafka如何解决重复消费? Kafka重复消费 二、路由规则分片规则 发一个消息如何知道消息被默认分片到哪里 1.如果没有指定key,是随机分片 2.如果指定了key即 kafkaTemplate.send(topic, null, jsonValue); 可以套用一下公式计算: key.hashCode() % 12例如有一个topic 叫test,有8个patition,key“1”则日志文件在 1.hashCode() % 81在 ***/log/test-1/ 目录下面 三、触发重复消费的场景 场景一触发rebalance 参考使用Kafka时一定要注意防止消费速度过慢触发rebalance而导致的重复消费 参考spring设置kafka超时时间没有生效的解决方法(解决rebalancing问题) rebalance就是kafka认为消费者已经离线或者挂掉就会触发rebalance把消息分配给新的消费者kafka重新平衡是按group 即当消费速度过慢时有可能会触发rebalance, 这批消息被分配到另一个消费者然后新的消费者还会消费过慢再次rebalance, 这样一直恶性循环下去。发生这种情况最明显的标志就是日志里能看到CommitFailedException异常然后还会带上下面一段话 问题描述 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 可能原因 原因1强行kill线程导致消费后的数据offset没有提交消费系统宕机、重启等。原因2设置offset为自动提交关闭kafka时如果在close之前调用 consumer.unsubscribe() 则有可能部分offset没提交下次重启会重复消费。原因3:重复消费最常见的原因消费后的数据当offset还没有提交时partition就断开连接。比如通常会遇到消费的数据处理很耗时导致超过了Kafka的session timeout时间0.10.x版本默认是30秒那么就会re-blance重平衡此时有一定几率offset没提交会导致重平衡后重复消费。原因4当消费者重新分配partition的时候可能出现从头开始消费的情况导致重发问题。原因5当消费者消费的速度很慢的时候可能在一个session周期内还未完成导致心跳机制检测报告出问题。原因6并发很大可能在规定的时间session.time.out默认30s内没有消费完就会可能导致reblance重平衡导致一部分offset自动提交失败然后重平衡后重复消费 实际影响参数 这里我们需要明确一下在Kafka 0.10.1.0以后的版本中影响rebalance触发的参数有三个说明如下 spring.kafka.properties.session.timeout.ms(默认10秒10000) 这个参数定义了当broker多久没有收到consumer的心跳请求后就触发rebalance默认值是10s。在0.10.1.0之前的版本中由于心跳请求是在poll()拉取消息的方法中执行的因此如果当前批次处理消息耗时太长就会导致consumer没有机会按时发送心跳broker认为消费者已死触发rebalance。在0.10.1.0或更新的版本中解决了这个问题心跳请求会在单独的线程中发送因此就不会出现因为消息处理过长而发不出心跳的问题了。而每次发送心跳请求的时间 spring.kafka.properties.heartbeat.interval.ms 3000默认三秒 spring.kafka.properties.max.poll.interval.ms默认值为5分钟300000 这个参数定义了两次poll()之间的最大间隔默认值为5分钟。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键即业务处理消息耗时太长导致一直没有commit确认收到的消息然后超过了消费者设置的最大拉取时间。有人可能会疑惑如果5分钟都没处理完消息那肯定时出了问题其实不然。能否在5min内处理完还取决于你每次拉取了多少条消息如果一次拿到了成千上万条的话5min就够呛了。有也可能是某个消费者节点正在调试导致线程一直阻塞在那里然后超过了最大拉取时间. spring.kafka.consumer.max.poll.records 这个参数定义了poll()方法最多可以返回多少条消息默认值为500。注意这里的用词是最多也就是说如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条就只返回500。这个默认值是比较坑人的如果你的消息处理逻辑比较重比如需要查数据库调用接口甚至是复杂计算那么你很难保证能够在5min内处理完500条消息也就是说如果上游真的突然大爆发生产了成千上万条消息而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话就会触发rebalance, 然后这批消息会被分配到另一个消费者中还是会处理不完又会触发rebalance, 这样这批消息就永远也处理不完而且一直在重复处理。
在kafka0.10.1 之前: 检查整个消费者死亡和检查消费则处理线程使用的同一个线程如果设置的max.poll.interval.ms大于session.timeout.ms遇到一个处理时间过长的消息会由于线程忙于处理消息而无法发送心跳导致kafka认为改消费则已完全死亡进而进行Rebalance 所以推荐设置heartbeat.inerval.ms max.poll.interval.ms session.timeout.ms 在kafka0.10.1之后 session.timeout.ms 和 max.poll.interval.ms 解耦了拆成了两个线程不用再担心它们之间的依赖关系 推荐设置heartbeat.interval.ms session.timeout.ms 解决方案 要避免出现上述问题也很简单那就是提前评估好处理一条消息最长需要多少时间然后务必覆盖默认的max.poll.records参数。在spring-kafka中这个原生参数对应的参数项是max-poll-records。对于消息处理比较重的操作建议把这个值改到50以下会保险一些。 调整几个参数 spring.kafka.properties.max.poll.interval.ms 600000 spring.kafka.consumer.max.poll.records 20 spring.kafka.properties.session.timeout.ms 25000# spring设置kafka参数session超时时间时要小于请求超时时间与处理超时时间例如request.timeout.ms 30000 session.timeout.ms 15000 max.poll.interval.ms 300000session.timeout.ms request.timeout.mssession.timeout.ms max.poll.interval.ms把这个组里比较重要的几个topic移动出去换到其它组(java里只需要改一行) //这里没有显式配置组用的是上方KafkaConfig.java里的commonGroup组 //KafkaListener(topics \({kafka.topic.commit})//改为了显式配置组把这个topic移动到新组 commitGroup KafkaListener(topics \){kafka.topic.commit}, groupId commitGroup)减少每次拉取的消息记录数和增大poll之间的时间间隔拉取到消息之后异步处理保证成功消费 场景二服务宕机 可能原因 消费者宕机、重启等。导致消息已经消费但是没有提交offset。 由于网络问题重复消费不可避免因此消费者需要实现消费幂等。#
解决方案 ①消息表 ②数据库唯一索引 ③缓存消费过的消息id
消息幂等性 可以通过redis.setnx方法 key topic:pardition:offset redis.setnx(key ,alue);如果没设置过返回1设置过返回0 四、kafka参数特性 新版kafka的broker幂等性具体实现原理   kafka每次发送消息会生成PID和Sequence Number并将这两个属性一起发送给brokerbroker会将PID和Sequence Number跟消息绑定一起存起来下次如果生产者重发相同消息broker会检查PID和    Sequence Number如果相同不会再接收。 PID每个新的 Producer 在初始化的时候会被分配一个唯一的 PID这个PID对用户完全是透明的。生产者如果重启则会生成新的PID。 常见配置 fetch.min.byte配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量默认为1B如果小于这个参数配置的值就需要进行等待直到数据量满足这个参数的配置大小。调大可以提交吞吐量但也会造成延迟 fetch.max.bytes一次拉取数据的最大数据量默认为52428800B也就是50M但是如果设置的值过小甚至小于每条消息的值实际上也是能消费成功的 fetch.wait.max.ms若是不满足fetch.min.bytes时等待消费端请求的最长等待时间默认是500ms max.poll.records单次poll调用返回的最大消息记录数如果处理逻辑很轻量可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完默认值为500 consumer.poll(100) 100 毫秒是一个超时时间一旦拿到足够多的数据fetch.min.bytes 参数设置consumer.poll(100)会立即返回 ConsumerRecordsString, String records。如果没有拿到足够多的数据会阻塞100ms但不会超过100ms就会返回 max.poll.interval.ms两次拉取消息的间隔默认5分钟通过消费组管理消费者时该配置指定拉取消息线程最长空闲时间若超过这个时间间隔没有发起poll操作则消费组认为该消费者已离开了消费组将进行再均衡操作将分区分配给组内其他消费者成员 若超过这个时间则报如下异常 org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.    即无法完成提交因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长这通常意味着poll循环花费了太多的时间来处理消息。 可以通过增加max.poll.interval.ms来解决这个问题也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题 max.partition.fetch.bytes该属性指定了服务器从每个分区返回给消费者的最大字节数默认为 1MB。 session.timeout.ms消费者在被认为死亡之前可以与服务器断开连接的时间默认是 3s将触发再均衡操作 对于每一个Consumer GroupKafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事 维持Group成员的组成。这包括加入新的成员检测成员的存活性清除不再存活的成员。 协调Group成员的行为。 poll机制 ①每次poll的消息处理完成之后再进行下一次poll是同步操作 ②每次poll之前检查是否可以进行位移提交如果可以那么就会提交上一次轮询的位移 ③每次poll时consumer都将尝试使用上次消费的offset作为起始offset然后依次拉取消息 ④poll(long timeout)timeout指等待轮询缓冲区的数据所花费的时间单位是毫秒 五、死信队列 方案一工厂类KafkaListenerContainerFactory 配置工厂类 import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Scope; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.*; import org.springframework.stereotype.Component; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Optional;/
* 消费MQ的消息配置* author demo* create 2022-11-12*/Slf4j Component Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) ConditionalOnProperty(name mq.pipeline, havingValue IotConstant.MQ_KAFKA) public class KafkaReceiverConfig {Resourceprivate ConsumerFactoryString,String consumerFactory;Resourceprivate KafkaTemplateString,String kafkaTemplate;Beanpublic KafkaListenerContainerFactory? retryKafkaFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 最大重试次数2次重试间隔10秒超过2次本身一次重试一次还没成功进入死信队列// 注意目前自动创建主题的配置关闭了需要提前手动去创建好死信队列主题 死信队列主题的命名方式原主题名称 .DLTfactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, kafkaTemplate), (cr, e) - new TopicPartition(cr.topic() .DLT, cr.partition())), new FixedBackOff(10 * 1000L, 2L)));return factory;}}### 消费者java/*** 发送消息* param consumerRecord 消息记录* param topicGroupId 消费组/KafkaListener(topics #{${mq.alarm.inner.topic.name}.split(,)}, containerFactory retryKafkaFactory)public void consumer(ConsumerRecord?, String consumerRecord, Header(KafkaHeaders.GROUP_ID) String topicGroupId) {OptionalString kafkaMessage Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {String id null;try {String json kafkaMessage.get();// todo…} catch (Throwable t) {// 判断是否可恢复异常if(isRecoverable(t)){// …IotCacheUtil.deleteMessageId(id);// 送入死信队列throw t;} else {log.error(消费失败 topic:{}, messageId:{}, offset:{}, partition:{} ,异常:{},consumerRecord.topic(),id,consumerRecord.offset(),consumerRecord.partition(),t);}}}}死信队列消费者 KafkaListener( topics iotAiAlarmInner.DLT)public void messageListenerDLT(ConsumerRecord?, String consumerRecord, Header(KafkaHeaders.GROUP_ID) String topicGroupId) {log.info(告警死信队列 topic:{}, offset:{}, partition:{} ,key:{}, message:{},consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),Optional.ofNullable(consumerRecord.value()).orElse(null));OptionalString kafkaMessage Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {}}方案二错误ConsumerAwareListenerErrorHandler 配置 /** 消费MQ的消息配置* author demo* create 2022-11-12/Slf4j Component Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) ConditionalOnProperty(name mq.pipeline, havingValue IotConstant.MQ_KAFKA) public class KafkaReceiverConfig {Resourceprivate ConsumerFactoryString,String consumerFactory;Resourceprivate KafkaTemplateString,Object kafkaTemplate;/** 针对tag消息过滤* producer 将tag写进header里* return ConcurrentKafkaListenerContainerFactory*/Beanpublic ConcurrentKafkaListenerContainerFactoryString,String filterContainerFactory() {ConcurrentKafkaListenerContainerFactoryString,String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);factory.setRecordFilterStrategy(consumerRecord - {if (Optional.ofNullable(consumerRecord.value()).isPresent()) {for (Header header : consumerRecord.headers()) {if (header.key().equals(IotConstant.MQ_TAG) new String(header.value()).equals(new String(IotConstant.MQ_TAG_VALUE.getBytes(StandardCharsets.UTF_8)))) {return false;}}}//返回true将会被丢弃return true;});return factory;}Beanpublic ConsumerAwareListenerErrorHandler consumerIotAlarmAwareErrorHandler() {return new ConsumerAwareListenerErrorHandler() {Overridepublic Object handleError(Message? message, ListenerExecutionFailedException e, Consumer?, ? consumer) {log.error(consumerAwareErrorHandler receive : {}, error:{},message.getPayload(),e);//获取消息处理异常主题MessageHeaders headers message.getHeaders();/ListString topics headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);ListInteger partitions headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);ListLong offsets headers.get(KafkaHeaders.OFFSET, List.class);/MapTopicPartition, Long offsetsToReset new HashMap(); // String topicheaders.get(kafka_receivedTopic)TOPIC_DLT;String topiciotAlarmInner KafkaAlarmListener.TOPIC_DLT;//放入死信队列kafkaTemplate.send(topic,message.getPayload());return message;}};} } 消费者 import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.net.UnknownHostException; import java.util.Optional;/*** 内部自产自销消费者* author demo* create 2022-11-09/ Slf4j Component ConditionalOnProperty(name mq.pipeline, havingValue IotConstant.MQ_KAFKA) public class KafkaAlarmListener {PostConstructpublic void init() {log.info(启动了通用告警消费者);}public static final String TOPIC_DLT.DLT;Resourceprivate DeviceService deviceService;/** 发送消息* param consumerRecord 消息记录* param topicGroupId 消费组/KafkaListener(topics #{${mq.alarm.inner.topic.name}.split(,)},errorHandler consumerIotAlarmAwareErrorHandler,concurrency 3)public void consumer(ConsumerRecord?, String consumerRecord, Header(KafkaHeaders.GROUP_ID) String topicGroupId) {execute(consumerRecord, false);}/** 死信队列* param consumerRecord 消息记录* param topicGroupId 消费组*/KafkaListener( topics iotAlarmInnerTOPIC_DLT)public void deadConsumer(ConsumerRecord?, String consumerRecord, Header(KafkaHeaders.GROUP_ID) String topicGroupId) {log.info(告警死信队列 topic:{}, offset:{}, partition:{} ,key:{},consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key());execute(consumerRecord, true);}private void execute(ConsumerRecord?, String consumerRecord,boolean dead){String title dead?告警死信:告警;OptionalString kafkaMessage Optional.ofNullable(consumerRecord.value());if (kafkaMessage.isPresent()) {String id null;try {String json kafkaMessage.get();// 解析AlarmMessageDTO message JsonUtil.parse(json,AlarmMessageDTO.class);// 判断消息是否已处理id message.getId();if(json.length()10000){log.info({} topic:{}, offset:{}, partition:{} ,key:{}, messageId:{},title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id);}else{log.info({} topic:{}, offset:{}, partition:{} ,key:{}, content:{}, messageId:{},title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),consumerRecord.key(),id, json);}if(IotCacheUtil.isExistMessageId(id)){log.error({} topic:{}, messageId:{} 重复消息,title,consumerRecord.topic(), id);return;}// 处理deviceService.uploadAlarm(message);} catch (Throwable t) {if(dead){log.error({} 消费异常 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{},title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);return;}// 判断是否可恢复异常if(isRecoverable(t)){// …IotCacheUtil.deleteMessageId(id);// 触发失败errorHandler死信队列throw t;} else {log.error({} 消费失败 topic:{}, offset:{}, partition:{}, messageId:{} ,异常:{},title,consumerRecord.topic(),consumerRecord.offset(),consumerRecord.partition(),id,t);}}}}private boolean isRecoverable(Throwable t){if(t instanceof NullPointerException){return false;}else if( t instanceof UnknownHostException){return false;}return true;}} 六、顺序消费 利用kafka的分区(pardition)功能通过生产者send的时候设置keykafka的broker会根据key计算hash发送到对应的分区 场景 比如用户三次修改名字 我们再发送消息的时候把userId设置为key这样保证三条消息都在一个pardition