
Fetching the latest Kafka offset with Java code

2019-09-21 22:23 Source: unknown

When fetching a topic's offsets from Kafka by timestamp, if the returned offsets array has length 0, the array-index-out-of-bounds exception shown below is thrown. The implementation uses Kafka's getOffsetsBefore() method:

Reposted from:

I previously wrote about fetching the latest Kafka offset with Scala.

Since the package is named api, it must contain the commonly used Kafka APIs.

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException : 0

 

In most cases, however, we need to fetch the latest offset from Java.

I. ApiUtils.scala

at co.gridport.kafka.hadoop.KafkaInputFetcher.getOffset(KafkaInputFetcher.java:126)

Maven dependencies:

Below is the Java code for fetching the latest Kafka offset.

As the name suggests, these are common API helper classes. The methods defined include:

at co.gridport.kafka.hadoop.TestKafkaInputFetcher.testGetOffset(TestKafkaInputFetcher.java:68)

 

GetOffsetShellWrap

  1. readShortString: reads a string length and the string itself from a ByteBuffer. The ByteBuffer's format is a 2-byte string length N followed by N bytes of string data.

  2. writeShortString: the inverse of readShortString; writes the 2-byte length N first, then writes the N string bytes into the ByteBuffer.

  3. shortStringLength: returns the length of a ByteBuffer in the format above, i.e. 2 + N (see the sketch after this list).

  4. readIntInRange: returns the integer at the ByteBuffer's current position and checks whether it falls within the given range, throwing an exception if it does not. In practice callers always pass Int.MaxValue, so the check usually passes. This integer can represent the number of partitions, a partition id, the number of replicas, the ISR size, or the number of topics.

  5. readShortInRange: similar to readIntInRange, but reads a 2-byte short, which is typically used as an error code.

  6. readLongInRange: similar to the previous two, but reads a Long; this method does not appear to be called anywhere.
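
To make the short-string layout used by the first three helpers concrete, here is a minimal Java sketch (my own illustration, not Kafka's actual ApiUtils code), assuming a 2-byte length N followed by N bytes of UTF-8 data:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ShortStringSketch {

    // writeShortString: a 2-byte length N, then the N string bytes
    public static void writeShortString(ByteBuffer buffer, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
    }

    // readShortString: read the 2-byte length, then read that many bytes back into a String
    public static String readShortString(ByteBuffer buffer) {
        short n = buffer.getShort();
        byte[] bytes = new byte[n];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // shortStringLength: the total size of the encoding above, i.e. 2 + N
    public static int shortStringLength(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(shortStringLength("kafka"));
        writeShortString(buffer, "kafka");
        buffer.flip();
        System.out.println(readShortString(buffer)); // prints "kafka"
    }
}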

at co.gridport.kafka.hadoop.TestKafkaInputFetcher.main(TestKafkaInputFetcher.java:80)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class GetOffsetShellWrap {

    private static Logger log = LoggerFactory.getLogger(GetOffsetShellWrap.class);

    private String topic;
    private int port;
    private String host;
    private int time;

    public GetOffsetShellWrap(String topic, int port, String host, int time) {
        this.topic = topic;
        this.port = port;
        this.host = host;
        this.time = time;
    }

    public Map<String, String> getEveryPartitionMaxOffset() {
        // 1. Get all partitions of the topic and each partition's metadata => Map<partitionId, PartitionMetadata>
        TreeMap<Integer, PartitionMetadata> partitionIdAndMeta = findTopicEveryPartition();
        Map<String, String> map = new HashMap<String, String>();
        for (Entry<Integer, PartitionMetadata> entry : partitionIdAndMeta.entrySet()) {
            int leaderPartitionId = entry.getKey();
            // 2. From each partition's metadata, get the host of the leader broker
            String leadBroker = entry.getValue().leader().host();
            String clientName = "Client_" + topic + "_" + leaderPartitionId;
            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            // 3. Fetch the partition's latest offset from the leader broker
            long readOffset = getLastOffset(consumer, topic, leaderPartitionId, clientName);
            map.put(String.valueOf(leaderPartitionId), String.valueOf(readOffset));
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private TreeMap<Integer, PartitionMetadata> findTopicEveryPartition() {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metaData = resp.topicsMetadata();
            if (metaData != null && !metaData.isEmpty()) {
                TopicMetadata item = metaData.get(0);
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    map.put(part.partitionId(), part);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private long getLastOffset(SimpleConsumer consumer, String topic, int leaderPartitionId, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, leaderPartitionId);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            log.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, leaderPartitionId));
            return 0;
        }
        long[] offsets = response.offsets(topic, leaderPartitionId);
        return offsets[0];
    }
}

II. RequestOrResponse.scala


The code is as follows:

GetOffsetShellWrapJavaTest

Kafka has many kinds of client requests. This file defines a Request object that abstracts the properties shared by all requests:

The source is as follows:

 

import java.util.Map;

public class GetOffsetShellWrapJavaTest {
    public static void main(String[] args) {
        int port = 9092;
        String topic = "2017-11-6-test";
        int time = -1;
        GetOffsetShellWrap offsetSearch = new GetOffsetShellWrap(topic, port, "hadoop-01", time);
        Map<String, String> map = offsetSearch.getEveryPartitionMaxOffset();
        for (String key : map.keySet()) {
            System.out.println(key + "---" + map.get(key));
        }
    }
}

OrdinaryConsumerId: the replica id used by follower fetch requests

/*

 

Output:

DebuggingConsumerId: used only for debugging

* Get the partition's offset (Finding Starting Offset for Reads)

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerTest {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);

    private static Properties properties = null;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
        properties.put("producer.type", "sync");
        properties.put("request.required.acks", "1");
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public void produce() {
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(properties);
        ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<byte[], byte[]>(
                "test", "kkk".getBytes(), "vvv".getBytes());
        kafkaProducer.send(kafkaRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (null != e) {
                    // on failure, log the exception instead of touching metadata, which may be null
                    LOG.error(e.getMessage(), e);
                } else {
                    LOG.info("the offset of the send record is {}", metadata.offset());
                }
                LOG.info("complete!");
            }
        });
        kafkaProducer.close();
    }

    public static void main(String[] args) {
        KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
        for (int i = 0; i < 10; i++) {
            kafkaProducerTest.produce();
        }
    }
}

0---16096
1---15930
2---16099

isValidBrokerId: whether a broker id is valid; it must be non-negative

*/
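
Rendered as a rough Java sketch (my own approximation; check kafka.api.Request for the authoritative definitions), the Request object's consumer-id constants and broker-id check described above look roughly like this:

public final class RequestConstantsSketch {
    // Assumed values: follower fetch requests conventionally use -1, debugging consumers -2.
    public static final int ORDINARY_CONSUMER_ID = -1;
    public static final int DEBUGGING_CONSUMER_ID = -2;

    // A broker id is valid only if it is non-negative.
    public static boolean isValidBrokerId(int brokerId) {
        return brokerId >= 0;
    }
}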

 


The file also defines an abstract class. This class is especially important, because every kind of request and response defined later inherits from it.

public Long getOffset(Long time) throws IOException {

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerTest {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
//      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
        kafkaConsumer.subscribe("test");
//      kafkaConsumer.subscribe("*");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
            if (null != results) {
                for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
                    LOG.info("topic {}", entry.getKey());
                    ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
                    List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
                    for (int i = 0, len = records.size(); i < len; i++) {
                        ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
                        LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
                        try {
                            LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
                        } catch (Exception e) {
                            LOG.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }

        kafkaConsumer.close();
    }
}


The RequestOrResponse class, i.e. the request/response class

TopicAndPartition topicAndPartition = new TopicAndPartition(this.topic, this.partition);

 


If it represents a request, the subclass must pass in a requestId indicating the kind of request (the concrete kinds are defined in RequestKeys); if it represents a response, the subclass passes nothing and simply calls the no-argument constructor. This dual-purpose class defines four abstract methods (a rough Java analogue follows the list below):

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

I found that KafkaConsumer's poll method was not implemented:

  1. sizeInBytes: computes the number of bytes the request or response occupies

  2. writeTo: writes the request or response into a ByteBuffer

  3. handleError: mainly used for error handling when processing a request

  4. describe: used only for requests; returns a description string for the request
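
A rough Java analogue of that abstract class (a sketch under the assumption that requests carry an optional requestId while responses do not; the real class is Scala code in kafka.api):

import java.nio.ByteBuffer;

public abstract class RequestOrResponseSketch {

    private final Short requestId; // set for requests (see RequestKeys), null for responses

    protected RequestOrResponseSketch(Short requestId) {
        this.requestId = requestId;
    }

    protected RequestOrResponseSketch() {
        this(null);
    }

    public Short requestId() {
        return requestId;
    }

    // 1. number of bytes this request or response occupies
    public abstract int sizeInBytes();

    // 2. serialize this request or response into the given ByteBuffer
    public abstract void writeTo(ByteBuffer buffer);

    // 3. error handling while serving a request
    public abstract void handleError(Throwable e);

    // 4. human-readable description, used only for requests
    public abstract String describe(boolean details);
}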

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));

 

III. RequestKeys.scala

kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(

@Override
public Map<String, ConsumerRecords<K, V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

This defines all the request kinds, including ProduceKey, FetchKey, OffsetsKey and so on, each with its own number. It also defines a Map that associates each request-kind number with the function that reads that request or response, plus two methods that return, respectively, the name of a request kind and its parsing function.
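
A hypothetical Java sketch of that structure (the ids shown follow the Kafka protocol's ApiKey numbering; the real definitions and readFrom functions live in RequestKeys.scala):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class RequestKeysSketch {

    // Each request kind has its own number (the ApiKey).
    public static final short ProduceKey = 0;
    public static final short FetchKey = 1;
    public static final short OffsetsKey = 2;
    public static final short MetadataKey = 3;

    // Stand-in for "a function that reads a request or response from a ByteBuffer".
    public interface Deserializer {
        Object readFrom(ByteBuffer buffer);
    }

    private static final Map<Short, String> names = new HashMap<Short, String>();
    private static final Map<Short, Deserializer> deserializers = new HashMap<Short, Deserializer>();

    public static void register(short key, String name, Deserializer deserializer) {
        names.put(key, name);
        deserializers.put(key, deserializer);
    }

    // The two lookups the text describes: the name of a request kind, and its parser.
    public static String nameForKey(short key) {
        return names.get(key);
    }

    public static Deserializer deserializerForKey(short key) {
        return deserializers.get(key);
    }
}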

requestInfo, kafka.api.OffsetRequest.CurrentVersion(), this.client_id);

I later switched to an implementation based on kafka.javaapi.consumer.SimpleConsumer, which runs correctly.

IV. GenericRequestAndHeader.scala

OffsetResponse response = this.kafka_consumer.getOffsetsBefore(request);

 

An abstract class that extends RequestOrResponse, so naturally it also has to implement the four abstract methods defined by RequestOrResponse:

if (response.hasError()) {

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumerTest {

    private List<String> borkerList = new ArrayList<String>();

    public KafkaSimpleConsumerTest() {
        borkerList = new ArrayList<String>();
    }

    public static void main(String args[]) {
        KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();
        // maximum number of messages to read
        long maxReadNum = Long.parseLong("3");
        // topic to subscribe to
        String topic = "test";
        // partition to look up
        int partition = Integer.parseInt("0");
        // broker nodes
        List<String> seeds = new ArrayList<String>();
        seeds.add("centos.master");
        seeds.add("centos.slave1");
        seeds.add("centos.slave2");
        // port
        int port = Integer.parseInt("9092");
        try {
            kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
        // fetch the metadata for the given topic and partition
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            System.out.println("can't find metadata for topic and partition. exit");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("can't find leader for topic and partition. exit");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "client_" + topic + "_" + partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (maxReadNum > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                short code = fetchResponse.errorCode(topic, partition);
                System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, topic, partition, port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                maxReadNum--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    /**
     * Find the leader broker for the given topic and partition from the list of live brokers
     * @param seedBrokers
     * @param port
     * @param topic
     * @param partition
     * @return
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata partitionMetadata = null;
        loop: for (String seedBroker : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);

                List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
                for (TopicMetadata topicMetadata : topicMetadatas) {
                    for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {
                        if (pMetadata.partitionId() == partition) {
                            partitionMetadata = pMetadata;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (partitionMetadata != null) {
            borkerList.clear();
            for (Broker replica : partitionMetadata.replicas()) {
                borkerList.add(replica.host());
            }
        }
        return partitionMetadata;
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("unable to find new leader after broker failure. exit");
        throw new Exception("unable to find new leader after broker failure. exit");
    }

}

  1. writeTo: writes the version, correlationId, clientId and body

  2. sizeInBytes: 2 bytes for the version + 4 bytes for the correlation id + (2 + N) bytes for the client id + the size of the body

  3. toString/describe: together these two methods make up the request's description string

log.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(this.topic, this.partition));

V. GenericResponseAndHeader.scala

throw new IOException("Error fetching kafka Offset by time:" + response.errorCode(this.topic, this.partition));

This is the response class corresponding to GenericRequestAndHeader. The code says extends RequestOrResponse(requestId), but since every response should extend RequestOrResponse(), I cautiously suspect this is a mistake; in any case, requestId is never used in the class. Because it inherits from RequestOrResponse, it also has to implement the four methods writeTo, sizeInBytes, toString and describe, which I won't repeat here.

}

 

// if (response.offsets(this.topic, this.partition).length == 0){

Below, each Request and its corresponding Response are discussed together.

// return getOffset(kafka.api.OffsetRequest

VI. ProducerRequest.scala / ProducerResponse.scala

// .EarliestTime());

Before going into each concrete request/response, let's first look at the general structure of Kafka requests and responses (most of the following comes from: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest):

// }

RequestOrResponse => size + (requestMessage or responseMessage), where size is a 32-bit integer giving the length of the request or response that follows.

return response.offsets(this.topic, this.partition)[0];

Request format => ApiKey + ApiVersion + CorrelationId + ClientId + RequestMessage

}
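
Putting the pieces together, here is a self-contained sketch (my own adaptation, in the spirit of the commented-out fallback above, not the original author's final code) that avoids the ArrayIndexOutOfBoundsException by checking the length of the offsets array and retrying with EarliestTime when it comes back empty:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class SafeOffsetLookup {

    // Returns the offset for the given timestamp; if the broker returns an empty offsets
    // array (the cause of the ArrayIndexOutOfBoundsException above), retries with EarliestTime.
    public static long getOffset(SimpleConsumer consumer, String topic, int partition,
                                 long time, String clientId) throws IOException {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new IOException("Error fetching kafka Offset by time: "
                    + response.errorCode(topic, partition));
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets.length == 0) {
            if (time == kafka.api.OffsetRequest.EarliestTime()) {
                throw new IOException("No offset returned even for EarliestTime");
            }
            // Empty array: no offset stored for that timestamp, so retry with the earliest offset.
            return getOffset(consumer, topic, partition,
                    kafka.api.OffsetRequest.EarliestTime(), clientId);
        }
        return offsets[0];
    }
}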

- ApiKey: a SHORT integer identifying the type of this request, for example a metadata request, a producer request or a fetch request; the concrete values are defined in RequestKeys.scala

The returned response object carries the error kafka.common.UnknownException in its offsets, as shown below:

- ApiVersion: a SHORT integer, mainly used for rolling upgrades. Its current value is 0; for instance, if fields were added to a request, the version would become 1. For now it is uniformly 0.

OffsetResponse(0,Map([barrage_detail,0] -> error: kafka.common.UnknownException offsets: ))

- CorrelationId: a 4-byte integer used on the server and client to correlate a response with its request

At the same time, response.hasError() fails to detect the error.

- ClientId: a user-defined name that can be used for logging and monitoring, for example to track the number of requests generated by different applications
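
As a worked example of the layout above, here is a minimal Java sketch (illustrative only, not Kafka's actual serializer) that writes size, ApiKey, ApiVersion, CorrelationId, the ClientId as a short string, and then the request body into a ByteBuffer:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RequestHeaderSketch {

    public static ByteBuffer encode(short apiKey, short apiVersion, int correlationId,
                                    String clientId, byte[] body) {
        byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
        int payloadSize = 2 + 2 + 4 + (2 + clientIdBytes.length) + body.length;
        ByteBuffer buffer = ByteBuffer.allocate(4 + payloadSize);
        buffer.putInt(payloadSize);                    // size: 32-bit length of what follows
        buffer.putShort(apiKey);                       // ApiKey: the request type (RequestKeys.scala)
        buffer.putShort(apiVersion);                   // ApiVersion: currently 0
        buffer.putInt(correlationId);                  // CorrelationId: matches response to request
        buffer.putShort((short) clientIdBytes.length); // ClientId as a short string: 2-byte length...
        buffer.put(clientIdBytes);                     // ...then N bytes of the client id
        buffer.put(body);                              // RequestMessage: the request body itself
        buffer.flip();
        return buffer;
    }
}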
