当前位置: 首页 > news >正文

福建省建设资格注册中心网站视频制作软件app下载

福建省建设资格注册中心网站,视频制作软件app下载,asp网站 手机登录,网站编程培训机构文章目录Pulsar Functions(轻量级计算框架)基础定义工作流程函数运行时处理保证和订阅类型窗口函数定义窗口类型滚动窗口滑动窗口函数配置函数示例有状态函数示例窗口函数示例自定义函数开发定义原生语言接口示例Pulsar函数SDK示例Pulsar Functions(轻量级计算框架) 基础定义 … 文章目录Pulsar Functions(轻量级计算框架)基础定义工作流程函数运行时处理保证和订阅类型窗口函数定义窗口类型滚动窗口滑动窗口函数配置函数示例有状态函数示例窗口函数示例自定义函数开发定义原生语言接口示例Pulsar函数SDK示例Pulsar Functions(轻量级计算框架) 基础定义 Function instance(函数实例)是函数执行框架的核心元素由以下元素组成: 消费来自不同输入主题的消息的消费者的集合。调用函数的执行程序。将函数的结果发送到输出主题的生产者。 函数实例的内部工作流 一个函数可以有多个实例每个实例执行一个函数的副本。可以在配置文件中指定实例数。函数实例中的使用者使用FQFN作为订阅者名以基于订阅类型在多个实例之间实现负载平衡。订阅类型可以在函数级别指定。每个函数都有一个单独的FQFN状态存储。可指定一个状态接口以便在BookKeeper中持久化中间结果。其他用户可以查询函数的状态并提取这些结果。 工作流程 Function worker 是一个逻辑组件用于在Pulsar Functions的集群模式部署中监视、编排和执行单个函数。每个函数实例都可以作为线程或进程执行具体取决于所选的配置。如果Kubernetes集群可用则可以在Kubernetes中以StatefulSets的形式生成函数。 Function worker的内部架构和工作流如下 用户向REST服务器发送请求以执行函数实例。REST服务器响应请求并将请求传递给功能元数据管理器。函数元数据管理器将请求更新写入函数元数据主题。并跟踪所有与元数据相关的消息并使用函数元数据主题持久化函数的状态更新。函数元数据管理器从函数元数据主题读取更新并触发调度管理器计算分配。日程管理器将作业更新写入作业主题。函数运行时管理器侦听分配主题读取分配更新并更新其内部状态该状态包含所有工作人员的所有分配的全局视图如果更新更改了工作对象上的赋值函数运行时管理器将通过启动或停止函数实例的执行来具体化新的赋值。会员管理要求协调主题选举一个领导工作者所有工作人员都订阅故障转移订阅中的协调主题但活动的工作人员成为领导者并执行分配从而保证该主题只有一个活动消费者。成员管理器从协调主题读取更新。 函数运行时 函数实例是在运行时内调用的许多实例可以并行运行。Pulsar支持三种不同成本和隔离保证的函数运行时类型以最大限度地提高部署灵活性。可以根据需要使用其中之一来运行函数。 线程运行时每个实例都作为一个线程运行。由于线程模式的代码是用Java编写的所以它只适用于Java实例。当函数以线程模式运行时它与函数工作者运行在同一个Java虚拟机(JVM)上。进程运行时每个实例都作为一个进程运行。当函数以进程模式运行时它运行在函数工作者运行的同一台机器上。Kubernetes运行时函数由worker以Kubernetes StatefulSet的形式提交每个函数实例作为pod运行。Pulsar支持在启动函数时向Kubernetes StatefulSets和服务添加标签这有助于选择目标Kubernetes对象。 处理保证和订阅类型 Pulsar提供了三种不同的消息传递语义可以将它们应用于一个函数。根据ack时间节点确定不同的传递语义实现。 At-most-once最多一次发送到函数的每个消息都将尽最大努力处理。不能保证消息是否会被处理。当选择此语义时autoAck配置必须设置为true否则启动将失败(autoAck配置将在将来的版本中弃用)。Ack时间节点:函数处理之前。At-least-once最少一次发送到函数的每个消息都可以被处理多次(以防处理失败或重新交付)。如果创建函数时没有指定——processing- guaranteed标志则该函数提供至少一次交付保证。Ack时间节点:发送消息到输出后。Effectively-once精确一次送到函数的每条消息都可以被处理多次但它只有一个输出。重复的消息将被忽略。有效地在至少一次处理和有保证的服务器端重复数据删除的基础上实现一次。这意味着一个状态更新可以发生两次但是相同的状态更新只应用一次另一个重复的状态更新在服务器端被丢弃。Ack时间节点:发送消息到输出后。Manual当选择这个语义时框架不会执行任何ack操作需要在函数中调用context.getCurrentRecord().ack()方法来手动执行ack操作。Ack时间节点:在函数方法中自定义。 提示 默认情况下Pulsar函数提供至少一次交付保证。如果创建函数时没有为——processing guaranteed标志提供值则该函数提供至少一次保证。排他订阅类型在Pulsar函数中不可用原因 如果只有一个实例exclusive等于故障转移。如果有多个实例exclusive可能会在函数重新启动时崩溃并重新启动。在这种情况下排他不等于故障转移。因为当主消费者断开连接时所有未确认的和后续的消息都被传递到下一个。 要将订阅类型从shared更改为key_shared可以在pulse -admin中使用- retain-key- ordered选项。 可以在创建函数时设置函数的处理保证。如下面的命令创建了一个应用了“精确一次”保证的函数。 bin/pulsar-admin functions create \--name my-effectively-once-function \--processing-guarantees EFFECTIVELY_ONCE \可以使用update命令更改应用于函数的处理保证。 bin/pulsar-admin functions update \--processing-guarantees ATMOST_ONCE \窗口函数 定义 目前窗口函数仅在Java中可用并且不支持MANUAL和effective -once delivery语义。窗口函数是跨数据窗口(即事件流的有限子集)执行计算的函数。如下图所示流被划分为“桶”其中可以应用函数。 函数的数据窗口定义包含两个策略: 清除策略:控制在窗口中收集的数据量。触发策略:控制何时触发一个函数并执行该函数以根据清除策略处理窗口中收集的所有数据。 触发策略和驱逐策略都由时间或计数驱动。 提示 同时支持处理时间和事件时间。处理时间是根据函数实例构建和处理窗口时的壁时间定义的。窗口完整性的判断很简单您不必担心数据到达混乱。 窗口类型 滚动窗口 滚动窗口将元素分配给具有指定时间长度或计数的窗口。滚动窗口的驱逐策略总是基于窗口已满。因此只需要指定触发器策略基于计数或基于时间。在具有基于计数的触发策略的滚动窗口中如以下示例所示触发策略被设置为2。当窗口中有两个项目时无论时间如何都会触发并执行每个函数。 相反如下面的示例所示滚动窗口的窗口长度为10秒这意味着当10秒的时间间隔过去时函数将被触发而不管窗口中有多少事件。 滑动窗口 滑动窗口方法通过设置清除策略来限制保留用于处理的数据量并使用滑动间隔设置触发器策略来定义固定的窗口长度。如果滑动间隔小于窗口长度则存在数据重叠这意味着同时落入相邻窗口的数据将被多次用于计算。如下面的示例所示窗口长度为2秒这意味着任何超过2秒的数据都将被清除不会在计算中使用。滑动间隔被配置为1秒这意味着该函数每秒执行一次以处理整个窗口长度内的数据。 函数配置 在独立的Pulsar中创建和验证函数(包括有状态函数和窗口函数)的分步说明和示例 在conf/broker.conf文件(对于Pulsar standalone, conf/standalone.conf)中将functionsWorkerEnabled设置为true。vim conf/broker.conf functionsWorkerEnabledtrue如果是standalone Pulsar 在conf/standalone.conf文件中增加上面的字段。 重启broker bin/pulsar-daemon stop broker bin/pulsar-daemon start broker检查Pulsar Function集群 bin/pulsar-admin functions-worker get-cluster函数示例 使用官方的函数示例演示查看根目录下examples文件夹 创建租户和命名空间 bin/pulsar-admin tenants create my-test bin/pulsar-admin namespaces create my-test/my-namespace bin/pulsar-admin namespaces list my-test修改 vim examples/example-function-config.yaml tenant: my-test namespace: my-namespace name: example className: org.apache.pulsar.functions.api.examples.ExclamationFunction inputs: [persistent://my-test/my-namespace/test_src] userConfig:PublishTopic: persistent://my-test/my-namespace/test_resultoutput: persistent://my-test/my-namespace/test_result autoAck: true parallelism: 1创建函数 bin/pulsar-admin functions create \--function-config-file examples/example-function-config.yaml \--jar examples/api-examples.jar查看函数的配置 bin/pulsar-admin functions get \--tenant my-test \--namespace my-namespace \--name example查看状态 bin/pulsar-admin functions status \--tenant my-test \--namespace my-namespace \--name example消费消息 bin/pulsar-client consume persistent://my-test/my-namespace/test_result -s my-subscription -p Earliest -n 0生产消息 bin/pulsar-client produce persistent://my-test/my-namespace/test_src --messages test-messages-date -n 10查看消费者的输出 有状态函数示例 在BookKeeper中启用streamStorage服务。目前服务使用的是NAR包需要在conf/bookkeeper.conf文件中进行配置。vim conf/bookkeeper.conf ### Grpc Server ### # ## the grpc server port to listen on. default is 4181 storageserver.grpc.port4181 # #### Dlog Settings for table service ### # ##### Replication Settings dlog.bkcEnsembleSize3 dlog.bkcWriteQuorumSize2 dlog.bkcAckQuorumSize2 # #### Storage ### # ## local storage directories for storing table ranges data (e.g. rocksdb sst files) storage.range.store.dirsdata/bookkeeper/ranges # ## whether the storage server capable of serving readonly tables. default is false. storage.serve.readonly.tablesfalse # ## the cluster controller schedule interval, in milliseconds. default is 30 seconds. storage.cluster.controller.schedule.interval.ms30000创建vim examples/example-stateful-function-config.yaml tenant: my-test namespace: my-namespace name: word_count className: org.apache.pulsar.functions.api.examples.WordCountFunction inputs: [persistent://my-test/my-namespace/wordcount_src] # this function will read messages from these topics autoAck: true parallelism: 1创建函数 bin/pulsar-admin functions create \--function-config-file examples/example-stateful-function-config.yaml \--jar examples/api-examples.jar查询带有itxs键的函数的状态表。该操作监视与itxs相关的更改。 bin/pulsar-admin functions querystate \--tenant my-test \--namespace my-namespace \--name word_count -k itxs -w消费消息 bin/pulsar-client consume persistent://my-test/my-namespace/wordcount_result -s my-subscription -p Earliest -n 0bin/pulsar-client consume test_wordcount_dest -s my-subscription -p Earliest -n 0生产消息 bin/pulsar-client produce persistent://my-test/my-namespace/wordcount_src --messages itxs -n 10窗口函数示例 创建vim examples/example-stateful-function-config.yaml tenant: my-test namespace: my-namespace name: window-example className: org.apache.pulsar.functions.api.examples.AddWindowFunction inputs: [persistent://my-test/my-namespace/window_src] userConfig:PublishTopic: persistent://my-test/my-namespace/window_resultoutput: persistent://my-test/my-namespace/window_result autoAck: true parallelism: 1 windowConfig:windowLengthCount: 10slidingIntervalCount: 5创建函数 bin/pulsar-admin functions create \--function-config-file examples/example-window-function-config.yaml \--jar examples/api-examples.jar消费消息 bin/pulsar-client consume -s test-sub -n 0 persistent://my-test/my-namespace/window_result生产消息 bin/pulsar-client produce -m 3 -n 10 persistent://my-test/my-namespace/window_src查看消费窗口输出 自定义函数开发 定义 Pulsar 函数支持Java、Python和Go等语言如果是Java语言则支持下面三类接口 使用原生语言接口不需要特定于Pulsar的库或特殊依赖(只需要JDK核心库)适合于不需要访问上下文的函数。使用Pulsar函数SDK特定于脉冲星的库提供了语言本机接口中无法提供的一系列功能例如状态管理或用户配置适用于需要访问上下文的函数。扩展Pulsar函数SDK对特定于pulse的库的扩展在Java中提供初始化和关闭接口。适用于需要初始化和释放外部资源的函数。 原生语言接口示例 新建Maven工程Pom文件内容如下 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.itxs/groupIdartifactIdpulsar-demo/artifactIdversion1.0/versionpropertiesmaven.compiler.source17/maven.compiler.sourcemaven.compiler.target17/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesbuildpluginspluginartifactIdmaven-assembly-plugin/artifactIdconfigurationappendAssemblyIdfalse/appendAssemblyIddescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefsarchivemanifestmainClasssn.itxs.pulsar.function.NativeFunctionDemo/mainClass/manifest/archive/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalassembly/goal/goals/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.10.1/version/plugin/plugins/build /project创建NativeFunctionDemo.java package sn.itxs.pulsar.function;import java.util.function.Function;public class NativeFunctionDemo implements FunctionString, String {Overridepublic String apply(String s) {return String.format(hahaha,native implement %s!, s);} }打包生成pulsar-demo-1.0.jar,上传到安装Pulsar服务器上的这里就放在pulsar根目录下的examples文件夹后续的操作就和前面函数示例一样 创建函数描述文件,vim examples/native-example-function-config.yaml tenant: my-test namespace: my-namespace name: native-example className: sn.itxs.pulsar.function.NativeFunctionDemo inputs: [persistent://my-test/my-namespace/native_src] userConfig:PublishTopic: persistent://my-test/my-namespace/native_resultoutput: persistent://my-test/my-namespace/native_result autoAck: true parallelism: 1创建函数 bin/pulsar-admin functions create \--function-config-file examples/native-example-function-config.yaml \--jar examples/pulsar-demo-1.0.jar消费消息 bin/pulsar-client consume persistent://my-test/my-namespace/native_result -s my-subscription -p Earliest -n 0生产消息 bin/pulsar-client produce persistent://my-test/my-namespace/native_src --messages actual pulsar -n 10查看消费者的输出 Pulsar函数SDK示例 由于依赖Pulsar函数SDK因此JDK需要选择17,在前面的工程添加Pom依赖 propertiespulsar.version2.11.0/pulsar.version/propertiesdependenciesdependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-functions-api/artifactIdversion${pulsar.version}/version/dependency/dependencies打包指定sn.itxs.pulsar.function.SdkFunctionDemo 创建SdkFunctionDemo.java package sn.itxs.pulsar.function;import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function;public class SdkFunctionDemo implements FunctionString, String {Overridepublic String process(String input, Context context) {return String.format(hahaha,pulsar sdk implement %s!, input);} }打包生成pulsar-demo-1.0.jar,上传到安装Pulsar服务器上的这里还是覆盖pulsar根目录下的examples文件夹文件其他和前面一样 创建函数描述文件,vim examples/sdk-example-function-config.yaml tenant: my-test namespace: my-namespace name: sdk-example className: sn.itxs.pulsar.function.SdkFunctionDemo inputs: [persistent://my-test/my-namespace/sdk_src] userConfig:PublishTopic: persistent://my-test/my-namespace/sdk_resultoutput: persistent://my-test/my-namespace/sdk_result autoAck: true parallelism: 1创建函数 bin/pulsar-admin functions create \--function-config-file examples/sdk-example-function-config.yaml \--jar examples/pulsar-demo-1.0.jar消费消息 bin/pulsar-client consume persistent://my-test/my-namespace/sdk_result -s my-subscription -p Earliest -n 0生产消息 bin/pulsar-client produce persistent://my-test/my-namespace/sdk_src --messages actual pulsar -n 10查看消费者的输出 本人博客网站IT小神 www.itxiaoshen.com
http://www.dnsts.com.cn/news/123757.html

相关文章:

  • zencart 网站入侵seo搜索优化费用
  • 游戏网站如何做宿迁做网站公司
  • 中航长江建设工程有限公司网站长沙关键词快速排名
  • 做彩票网站是违法的吗海宁网站设计公司
  • 服装定制网站源码腾讯公司
  • 深圳宝安上市公司网站建设报价地图定位网站开发
  • 最新设计网站大全wordpress文章加音频
  • 哪个网站教人做美食百度推广按效果付费是多少钱
  • 公司网站自己创建wordpress替换图片不显示
  • 郑州网站的建设网站排名优化推广
  • 网站设计理念哪个网站可以做分销
  • 网易做相册的网站宜昌市建设监理协会网站
  • 深圳最好的网站开发公司自己做电影网站有没有钱赚
  • 公共场所建设网站东莞代码网站建设
  • 门户网站开发多少钱怎么做电商
  • 天津市网站建设天津市住房和城乡建设部网站
  • 企业咨询端app厦门百度快照优化排名
  • 网站开发从零到跨境电商选品
  • 郴州建设网站的公司做网站谁家好
  • 一级门户网站建设费用dede 中英文网站
  • 建一个网站报价登封网站建设公司
  • 网站开发的开发工具好看的免费的小说网站模板
  • 网站正在建设中 源码下载织梦iis7搭建网站
  • 可以上传自己做的视频的网站wordpress站群管理系统
  • 龙岗网站多少钱创业平台app有哪些
  • 有什么网站可以接单做兼职的自己怎么做外贸英文网站
  • 360网站提交入口地址兴义住房和城乡建设局网站
  • 网站建设教程l0元首充的手游平台
  • 深圳市建设局网站首页优化搜索曝光次数的方法
  • 怎样做网站的源代码马克斯网站建设