当前位置: 首页 > news >正文

建设一个网站用什么搭建国际新闻热点事件

建设一个网站用什么搭建,国际新闻热点事件,三门峡网站建设推广,莱芜雪野湖酒店总览 根据kafka的3.1.0的源码example模块进行分析#xff0c;如下图所示#xff0c;一般实例代码就是我们分析源码的入口。 可以将produce的发送主要流程概述如下#xff1a; 拦截器对发送的消息拦截处理#xff1b; 获取元数据信息#xff1b; 序列化处理#xff1b;…总览 根据kafka的3.1.0的源码example模块进行分析如下图所示一般实例代码就是我们分析源码的入口。 可以将produce的发送主要流程概述如下 拦截器对发送的消息拦截处理 获取元数据信息 序列化处理 分区处理 批次添加处理 发送消息。 总的大概是上面六个步骤下面将结合源码对每个步骤进行分析。 1. 拦截器  消息拦截器在消息发送开始阶段进行拦截this method does not throw exceptions注释加上代码可以看出即使拦截器抛出异常也不会中止我们的消息发送。 使用场景发送消息的统一处理类似spring的拦截器动态切入功能自定义拦截器打印日志、统计时间、持久化到本地数据库等。 Overridepublic FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions//1.拦截器对发送的消息拦截处理ProducerRecordK, V interceptedRecord this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}public ProducerRecordK, V onSend(ProducerRecordK, V record) {ProducerRecordK, V interceptRecord record;for (ProducerInterceptorK, V interceptor : this.interceptors) {try {interceptRecord interceptor.onSend(interceptRecord);} catch (Exception e) {// do not propagate interceptor exception, log and continue calling other interceptors// be careful not to throw exception from hereif (record ! null)log.warn(Error executing interceptor onSend callback for topic: {}, partition: {}, record.topic(), record.partition(), e);elselog.warn(Error executing interceptor onSend callback, e);}}return interceptRecord;} 2. 获取元数据信息 下图是发送消息主线程和发送网络请求sender线程配合获取元数据的流程: 首先找到获取kafka的元数据信息的入口maxBlockTimeMs最大的等待时间是60s: try {//2.获取元数据信息clusterAndWaitTime waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException(Producer closed while send in progress, e);throw e;}.define(MAX_BLOCK_MS_CONFIG,Type.LONG,60 * 1000,atLeast(0),Importance.MEDIUM,MAX_BLOCK_MS_DOC) 这里唤醒sender线程然后阻塞等待元数据信息 metadata.add(topic, nowMs elapsed);int version metadata.requestUpdateForTopic(topic);//唤醒线程更新元数据sender.wakeup();try {//阻塞等待metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException ex) {// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMsthrow new TimeoutException(String.format(Topic %s not present in metadata after %d ms.,topic, maxWaitMs));} 这里可以看一下 sender线程的初始化参数 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) 初始化内存池的大小为32M;.define(MAX_REQUEST_SIZE_CONFIG,Type.INT,1024 * 1024,atLeast(0),Importance.MEDIUM,MAX_REQUEST_SIZE_DOC) 默认单条消息最大为1M;构造函数中初始化了sender,并作为守护线程在后台运行 this.sender newSender(logContext, kafkaClient, this.metadata);String ioThreadName NETWORK_THREAD_PREFIX | clientId;this.ioThread new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start(); KafkaProducer(ProducerConfig config,SerializerK keySerializer,SerializerV valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptorsK, V interceptors,Time time) {try {...this.maxRequestSize config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);this.totalMemorySize config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);this.compressionType CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.maxBlockTimeMs config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);int deliveryTimeoutMs configureDeliveryTimeout(config, log);this.apiVersions new ApiVersions();this.transactionManager configureTransactionState(config, logContext);this.accumulator new RecordAccumulator(logContext,config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));...this.errors this.metrics.sensor(errors);this.sender newSender(logContext, kafkaClient, this.metadata);String ioThreadName NETWORK_THREAD_PREFIX | clientId;this.ioThread new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug(Kafka producer started);} catch (Throwable t) {// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121close(Duration.ofMillis(0), true);// now propagate the exceptionthrow new KafkaException(Failed to construct kafka producer, t);}} .define(ACKS_CONFIG,Type.STRING,all,in(all, -1, 0, 1),Importance.LOW,ACKS_DOC) 默认所有broker同步消息才算发送成功 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,Type.INT,5,atLeast(1),Importance.LOW,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) 默认允许最多5个连接来发送消息如果需要保证顺序消息需要将其设置为1.Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {int maxInflightRequests configureInflightRequests(producerConfig);int requestTimeoutMs producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);ChannelBuilder channelBuilder ClientUtils.createChannelBuilder(producerConfig, time, logContext);ProducerMetrics metricsRegistry new ProducerMetrics(this.metrics);Sensor throttleTimeSensor Sender.throttleTimeSensor(metricsRegistry.senderMetrics);KafkaClient client kafkaClient ! null ? kafkaClient : new NetworkClient(new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics, time, producer, channelBuilder, logContext),metadata,clientId,maxInflightRequests,producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),requestTimeoutMs,producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,throttleTimeSensor,logContext);short acks configureAcks(producerConfig, log);return new Sender(logContext,client,metadata,this.accumulator,maxInflightRequests 1,producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),acks,producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),metricsRegistry.senderMetrics,time,requestTimeoutMs,producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),this.transactionManager,apiVersions);} 等待元数据的版本更新挂起当前线程直到超时或者唤醒 public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {long currentTimeMs time.milliseconds();long deadlineMs currentTimeMs timeoutMs 0 ? Long.MAX_VALUE : currentTimeMs timeoutMs;time.waitObject(this, () - {// Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.maybeThrowFatalException();return updateVersion() lastVersion || isClosed();}, deadlineMs);if (isClosed())throw new KafkaException(Requested metadata update after close);}public void waitObject(Object obj, SupplierBoolean condition, long deadlineMs) throws InterruptedException {synchronized (obj) {while (true) {if (condition.get())return;long currentTimeMs milliseconds();if (currentTimeMs deadlineMs)throw new TimeoutException(Condition not satisfied before deadline);obj.wait(deadlineMs - currentTimeMs);}}} Sende通过NetWorkClient向kafak集群拉取元数据信息 Sendervoid runOnce() { client.poll(pollTimeout, currentTimeMs); }NetWorkClient:public ListClientResponse poll(long timeout, long now) {ensureActive();if (!abortedSends.isEmpty()) {// If there are aborted sends because of unsupported version exceptions or disconnects,// handle them immediately without waiting for Selector#poll.ListClientResponse responses new ArrayList();handleAbortedSends(responses);completeResponses(responses);return responses;}long metadataTimeout metadataUpdater.maybeUpdate(now);try {this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));} catch (IOException e) {log.error(Unexpected error during I/O, e);}// process completed actionslong updatedNow this.time.milliseconds();ListClientResponse responses new ArrayList();handleCompletedSends(responses, updatedNow);handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutConnections(responses, updatedNow);handleTimedOutRequests(responses, updatedNow);completeResponses(responses);return responses;} 如下代码 handleCompletedReceives处理返回元数据的响应然后调用handleSuccessfulResponse处理成功的响应最后调用ProducerMetadata更新本地元数据信息并唤醒了主线程。主线程获取到元数据后进行下面流程。 //NetWorkClient private void handleCompletedReceives(ListClientResponse responses, long now) {...if (req.isInternalRequest response instanceof MetadataResponse)metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);}public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {...this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);}//ProducerMetadata public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {super.update(requestVersion, response, isPartialUpdate, nowMs);// Remove all topics in the response that are in the new topic set. Note that if an error was encountered for a// new topics metadata, then any work to resolve the error will include the topic in a full metadata update.if (!newTopics.isEmpty()) {for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {newTopics.remove(metadata.topic());}}notifyAll();} 3. 序列化处理 根据初始化的序列化器将消息的key和value进行序列化以便后续发送网络请求 byte[] serializedKey;try {serializedKey keySerializer.serialize(record.topic(), record.headers(), record.key());System.out.println(serializedKey: Arrays.toString(serializedKey));} catch (ClassCastException cce) {throw new SerializationException(Cant convert key of class record.key().getClass().getName() to class producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() specified in key.serializer, cce);}byte[] serializedValue;try {serializedValue valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException(Cant convert value of class record.value().getClass().getName() to class producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() specified in value.serializer, cce);} 4.分区处理 消息有设置key根据hash值分区没有key采用粘性分区的方式详情可以看下面博客Kafka生产者的粘性分区算法_张家老院子的博客-CSDN博客 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;} 5.批次添加处理 如果第一次添加会为分区初始化一个双端队列然后获取批次为空会 ProducerBatch batch new ProducerBatch(tp, recordsBuilder, nowMs)创建一个新的批次放到队列中dq.addLast(batch); public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock,boolean abortOnNewBatch,long nowMs) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer null;if (headers null) headers Record.EMPTY_HEADERS;try {// check if we have an in-progress batchDequeProducerBatch dq getOrCreateDeque(tp);synchronized (dq) {if (closed)throw new KafkaException(Producer closed while send in progress);RecordAppendResult appendResult tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult ! null)return appendResult;}// we dont have an in-progress record batch try to allocate a new batchif (abortOnNewBatch) {// Return a result that will cause another call to append.return new RecordAppendResult(null, false, false, true);}byte maxUsableMagic apiVersions.maxUsableProduceMagic();int size Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));log.trace(Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms, size, tp.topic(), tp.partition(), maxTimeToBlock);// 内存池中分配内存buffer free.allocate(size, maxTimeToBlock);// Update the current time in case the buffer allocation blocked above.nowMs time.milliseconds();synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException(Producer closed while send in progress);RecordAppendResult appendResult tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult ! null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesnt happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch new ProducerBatch(tp, recordsBuilder, nowMs);FutureRecordMetadata future Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,callback, nowMs));dq.addLast(batch);incomplete.add(batch);// Dont deallocate this buffer in the finally block as its being used in the record batchbuffer null;return new RecordAppendResult(future, dq.size() 1 || batch.isFull(), true, false);}} finally {if (buffer ! null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}} 然后主流程中会再次调用添加此时有了批次将能够添加成功 result accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); 6.发送消息 批次满了或者创建了新的批次将唤醒消息发送线程 if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), partition);this.sender.wakeup();} 后续将更加深入分析kafka的NIO源码,探究它怎么多到高性能的。
http://www.dnsts.com.cn/news/37060.html

相关文章:

  • 论坛网站模板广州专业展台制作价格
  • 杭州seo整站优化老薛主机wordpress慢
  • 上海做网站seo东营区建设局网站
  • 手机怎么样做网站网络推广方法有
  • 可以做兼职的网站有哪些工作室导航网站好处
  • 建设资格执业注册中心网站湖南省住房与城乡建设网站
  • 网站面包屑导航怎么做的wordpress侧边栏显示子分类文字数
  • 免费观看行情软件网站下载余姚网站建设 熊掌号
  • 老版本网站开发工具温州网络推广服务好吗
  • 广西网站建设价格低淄博网站建设专家
  • 网站的总体架构写wordpress
  • 网站开发公司安心加盟建站平台哪个最好
  • 陕西建设网官方网站怎么学网站开发
  • 做暧在线网站门户网站的建设
  • 建设工程网站什么时候可以同步做网站讯息
  • 网站信息内容建设责任制落实情况建德营销型网站建设
  • 鄢陵县北京网站建设网络营销上市公司
  • 开通网站空间旅游网站开题报告
  • 怎么做北京pk10的网站jsp网站开发源码实例
  • 东莞企业网站建设哪家好英文网站建设需要注意的五点问题
  • 成华区微信网站建设公农产品电子商务网站建设要求
  • 南昌企业网站开发网页制作软件frontpage2000属于
  • 境内境外网站区别wordpress调用置顶文章
  • 网站建设清单网络销售平台
  • 网上做网站赚钱劳务派遣好还是外包好
  • 定制开发app方案情感网站seo
  • 建设微网站网站制作需要多少钱?
  • 广州网站营销优化开发飞飞cms官网
  • 给公司建立网站京东当前网站做的营销活动
  • wordpress怎么加快网站打开速度关键词优化和seo