jq获取input的输入值(java安装教程详细)

译者简介

何城波,就职于深信服 PaaS 平台部门,从事 Kafka、Pulsar 等消息中间件基础设施建设。

概述

Pulsar Functions[1]是 Apache Pulsar 的无服务器(Serverless)计算框架。默认情况下,一个 Pulsar Function 作为一个单实例运行。如果你期望它作为多个实例运行,你需要在创建时指定 Function 的并行度(即要运行多少个实例)。当你想要去调整运行实例的个数时,需要收集指标来判断是否需要扩缩容并手动调整并行度。但是,在 Kubernetes 中使用 Function Mesh 运行 Pulsar Functions 时,则无需这种手动调整。

Function Mesh[2]允许用户充分发挥 K8s 的应用部署、扩展伸缩和管理的能力,是一个能在 Kubernetes 上原生运行 Pulsar Functions 和 Connectors 的 Kubernetes operator。例如,Function Mesh 基于 Kubernetes 的调度功能,确保 Functions 具有容错能力,并使得 Functions 在任意时候都能被正确调度。

在Kubernetes Horizontal Pod Autoscaler (HPA)[3]的帮助下,Function Mesh 能自动扩缩 Pulsar Functions 的所需的实例个数。对于已经配置了 HPA 的 Functions,HPA 控制器会监控 Function 的 Pod,并在必要的时候添加或者删除 Pod 副本。

有两种方式去自动扩缩 Function Mesh。第一种是使用 Function Mesh 基于 CPU 和内存使用预定义的自动扩缩策略,如果你的场景只需要 CPU 和内存作为 HPA 的指标,我们推荐这种易于实施的方式。具体方法细节请参考文档[4]。

第二种方法是根据 Pulsar Functions 的指标自定义自动扩缩策略。这个方法实现起来更加复杂,但是用户可以根据自己的场景去自定义 HPA(该功能于 2021 年 6 月在Function Mesh 0.1.7 版本[5]首次发布)。预定义的指标[6]能帮助我们了解 Function 运行实例的工作负载和状态。Pulsar Functions 以 Prometheus 的格式将这些指标暴露出来,然后我们又可以通过 Kubernetes 的自定义指标 API (Kubernetes Custom Metrics API) 将这些指标提供给 HPA 以用来支持基于指标的自动扩缩。

这篇博客会通过 1)在 Function 自定义资源(Custom Resource)中定义 maxReplicas 以启用 HPA 控制器与 2)自定义 autoScalingMetrics 以指定指标列表,向你逐步展示如何使用自定义指标去启用 Pulsar Functions 的自动扩缩功能。

准备工作

在开始本教程之前,请先注意以下事项:

??安装 Kubernetes 1.17 ~ 1.21 版本

?? HPA v2beta2 在 Kubernetes 版本 v1.12 中发布。

?? Apache Pulsar 和 Prometheus Metrics Adapter 要求 Kubernetes 版本在 1.14 以上。

?? CustomResourceDefinition 的 API 版本 apiextensions.k8s.io/v1beta1 自 v1.22 起不再支持, – Function Mesh 尚未移到 apiextensions.k8s.io/v1。

??本教程基于Kubernetes v1.20[7]。

??安装 Apache Pulsar 2.8 及以上版本

??为了通过实际的工作负载测试 Function 实例,你需要一个现成的 Apache Pulsar 集群。

??本教程使用StreamNative 提供的 Helm Chart[8]将 Apache Pulsar 部署至 K8s 集群。

??安装Function Mesh 0.1.9[9]版本

??Prometheus(与 Apache Pulsar 和 Function Mesh 部署在一起)

??本教程使用 kube-prometheus 安装集群范围内的 Prometheus,并使用 Prometheus 收集、存储和查询指标。

??安装 Prometheus Metrics Adapter[10]

??本教程使用 Prometheus Metrics Adapter 将 Prometheus 的指标作为自定义指标暴露给 Kubernetes API 服务器,你也可以使用其他提供自定义指标 API 的APIservices[11]。

??你可以在 kube-prometheus 配置中启用 custom-metrics.libsonnet 来使用 kube-prometheus 部署 Prometheus Metrics Adapter。更多细节请参考Customizing Kube-Prometheus[12]。

步骤

在假设你的 Kubernetes 集群还没有部署任何服务的前提下,开始下面的步骤。

1. 安装需要的软件

??下载并安装 Helm3[13]

??安装 Apache Pulsar[14]

??安装 Function Mesh[15]

??安装 kube-prometheus[16]

??安装 Prometheus Metrics Adapter[17]

2. 创建 ServiceMonitor 以从 Pulsar Functions 收集指标

Function Mesh 为每个 Function 都创建一个与之绑定的 Service。Prometheus-operator 的 ServiceMonitor 会监控该 Service,并通过此 Service 去收集 Pulsar Function 的指标。为了创建一个监控 Pulsar Functions 的 ServiceMonitor,创建一个 YAML 文件(如下所示)并通过kubectl apply -f sample-pulsar-functions-service-monitor.yaml将这个文件部署到 Kubernetes 上。

Sample-pulsar-functions-service-monitor.yaml:

apiVersion:monitoring.coreos.com/v1kind:ServiceMonitormetadata:name:function-monitornamespace:defaultspec:endpoints:-path:/metricsport:”metrics”selector:matchLabels:app:function-meshcomponent:functionpodTargetLabels:-component-pulsar-component-pulsar-namespace-pulsar-tenant-pulsar-cluster-name-app

部署这个 ServiceMonitor 到 Kubernetes 之后,你可以通过kubectl get servicemonitor来查看资源。

$kubectlgetservicemonitorNAMEAGEfunction-monitor7s3. 配置 prometheus-metrics-adapter 并且添加 seriesQuery,以将 Pulsar Function 的指标作为自定义指标暴露出去。

Function Mesh 创建一个叫做 pulsar-function 的 function 容器,Pulsar Function 的指标以pulsar_function_开头。我们添加以下配置到适配器配置中以将 Pulsar Function 指标作为自定义指标暴露出去。

-“seriesQuery”:”{__name__=~\”^pulsar_function_.*\”,container=\”pulsar-function\”,namespace!=\”\”,pod!=\”\”}””metricsQuery”:”sum(<<.Series>>{<<.LabelMatchers>>})by(<<.GroupBy>>)””resources”:”template”:”<<.Resource>>”

当你安装 Prometheus Metrics Adapter 之后,会创建一个和适配器部署名称同名的 Prometheus Metrics Adapter 的 ConfigMap。你可以使用kubectl get configmap获取该 ConfigMap 的名称。

$kubectlgetconfigmap-nmonitoringNAMEDATAAGEadapter-config165m

kubectleditconfigmapprometheus-adapter-oyaml

请注意,自定义规则会将 “pulsar-function” 容器中所有以 “pulsarfunction” 开头的 Prometheus 指标映射到 Kubernetes 中的自定义指标。

4. 部署一个 Function

我们使用 Apache Pulsar 的一个示例UserMetricFunction并添加用户定义的指标 LetterCount。

publicclassUserMetricFunctionimplementsFunction<String,Void>{@OverridepublicVoidprocess(Stringinput,Contextcontext){context.recordMetric(“LetterCount”,input.length());returnnull;

}

}

为了部署该 Function 到 Function Mesh 中,我们需要创建以下的 YAML 文件,并使用kubectl apply将其部署到 Kubernetes 集群中。

apiVersion:compute.functionmesh.io/v1alpha1kind:Functionmetadata:labels:pulsar-cluster:pulsarpulsar-component:metrics-hpa-java-fnpulsar-namespace:defaultpulsar-tenant:publicname:metrics-hpa-java-fnnamespace:defaultspec:className:org.apache.pulsar.functions.api.examples.UserMetricFunctioncleanupSubscription:trueclusterName:pulsarforwardSourceMessageProperty:trueimage:streamnative/pulsar-all:2.8.1.29input:sourceSpecs:persistent://public/default/metrics-hpa-java-fn-input:isRegexPattern:falseschemaProperties:{}topics:-persistent://public/default/metrics-hpa-java-fn-inputtypeClassName:java.lang.Stringjava:extraDependenciesDir:/pulsar/instances/depsjar:/pulsar/examples/api-examples.jaroutput:producerConf:maxPendingMessages:0maxPendingMessagesAcrossPartitions:0useThreadLocalProducers:falsetypeClassName:java.lang.Voidpod:labels:pulsar-cluster:pulsarpulsar-component:metrics-hpa-java-fnpulsar-namespace:defaultpulsar-tenant:publicautoScalingMetrics:-type:Podspods:metric:name:pulsar_function_received_total_1minselector:matchLabels:pulsar_cluster:pulsarpulsar_component:metrics-hpa-java-fnpulsar_namespace:defaultpulsar_tenant:publictarget:type:AverageValueaverageValue:”1″pulsar:pulsarConfig:pulsar-function-mesh-configreplicas:1maxReplicas:10resources:limits:cpu:”1″memory:”1181116006″requests:cpu:”1″memory:”1073741824″retainKeyOrdering:false

retainOrdering:false

该 Pulsar Function 实例自动启用了 Prometheus 收集并使用 autoScalingMetrics 下的pulsar_function_received_total_1min作为自定义指标。为了启用自动扩缩,设置 maxReplicas 大于 1。

该 Function 部署之后,你会看到一个 StatefulSet、一个 Service 和一个 HPAv2beta2 实例,它们都带有metrics-hpa-java-fn前缀。

然后 HPA 就能使用 Pulsar Function 的pulsar_function_received_total_1min指标,并且当这个指标平均值大于 1 时扩展 Function。你也可以在 autoScalingMetrics 自定义自己的 HPA 规则。

注意:为了更方便观察自动扩缩,我们将平均值设置为 1。

更多关于 Function Mesh 的 HPA 的内容,请查看 Function Mesh 文档Scaling[18]的部分。

5. 验证指标

在该 Function 准备就绪并且运行之后,Prometheus 就可以开始从其 Pod 中收集各类指标,并且自定义指标 API 应该显示在探索发现中。你可以尝试获取其发现的信息:

$kubectlget–raw/apis/custom.metrics.k8s.io/v1beta1

因为我们已经设置了 Prometheus 收集 Pulsar Functions 的指标,你应该能看到一个pods/pulsar_function_received_total_1min资源,并且你可以使用下面的 kubectl 命令从 Kubernetes API 查询自定义的指标。

$kubectlget–raw/apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/*/pulsar_function_received_total_1min|jq–color-output{“kind”:”MetricValueList”,”apiVersion”:”custom.metrics.k8s.io/v1beta1″,”metadata”:{“selfLink”:”/apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/*/pulsar_function_received_total_1min”},”items”:[{“describedObject”:{“kind”:”Pod”,”namespace”:”default”,”name”:”metrics-hpa-java-fn-function-0″,”apiVersion”:”/v1″},”metricName”:”pulsar_function_received_total_1min”,”timestamp”:”2022-01-06T01:16:12Z”,”value”:”0″,”selector”:null}]}当你能从自定义指标 API 中通过上述的命令获取指标时,这说明 HPA 已经准备就绪,所以你能观察相关的指标。$kubectlgethpaNAMEREFERENCETARGETSMINPODSMAXPODSREPLICASAGEmetrics-hpa-java-fn-functionFunction/metrics-hpa-java-fn0/1110123h$kubectldescribehpametrics-hpa-java-fn-functionName:metrics-hpa-java-fn-functionNamespace:defaultLabels:app=function-meshcomponent=functionname=metrics-hpa-java-fnnamespace=defaultAnnotations:<none>CreationTimestamp:Wed,05Jan202210:15:07 0800Reference:Function/metrics-hpa-java-fnMetrics:(current/target)”pulsar_function_received_total_1min”onpods:0/1Minreplicas:1Maxreplicas:10Functionpods:1current/1desiredConditions:TypeStatusReasonMessage———————–AbleToScaleTrueReadyForNewScalerecommendedsizematchescurrentsizeScalingActiveTrueValidMetricFoundtheHPAwasabletosuccessfullycalculateareplicacountfrompodsmetricpulsar_function_received_total_1minScalingLimitedTrueTooFewReplicasthedesiredreplicacountislessthantheminimumreplicacount

从 kubectl describe 的输出中,我们能看到 HPA 的状态是AbleToScale和ScalingActive,这说明此 HPA 已经准备可用。6. 对 Function 产生负载

我们可以创建一个生产者,向这个 Function 生产大量消息,以下是一个生产者示例。

publicclassLoadProducer{publicstaticvoidmain(String[]args)throwsPulsarClientException{PulsarClientclient=PulsarClient.builder().serviceUrl(“http://localhost:8080”).build();Producer<byte[]>producer=client.newProducer().topic(“persistent://public/default/metrics-hpa-java-fn-input”).create();for(inti=0;i<1000000;i ){producer.sendAsync(“helloworld”.getBytes(StandardCharsets.UTF_8));}}}

当生产者运行时,我们可以进行下一步验证 HPA 的状态。

7. 观察自动扩缩

随着消息进入到输入主题,我们能看到最少有 2 个或者 3 个新的 Pod 被创建并运行来处理积压的消息。

运行kubectl get pods来验证是否有多个以metrics-hpa-java-fn为名称前缀的 Pod,如图 1 所示。为了深入了解该 HPA,你可以使用kubectl describe hpa获取更多详细的输出,来了解 HPA 添加和删除副本的原因。

jq获取input的输入值(java安装教程详细)

图 1

你可以使用kubectl get hpa来获取 HPA 名称。假设创建的名称是metrics-hpa-java-fn-hpa,你可以使用以下watch命令观察 HPA,如图 2watch -n 1 “kubectl describe hpa metrics-hpa-java-fn-hpa”所示。

图 2

根据图 1 和图 2 可以看出,当指标增加的时候,HPA 能扩展到 10(即最大的副本数),并且在负载生产者完成消息泛洪(message flood)之后,这个 Function 的 pulsar_function_received_total_1min 指标最终会减少到 0,然后 Function 的副本数也会重置为最小值,如图 3 所示。

图 3

未来展望:自动扩缩到/从 0

我们希望尽快带来 Function Mesh 的归零(scale-to-zero)功能。当一个 Function 的输入主题没有消息积压的时候,该功能可以帮助 Function Mesh 将该 Function 缩小到 0 副本以降低成本。然而,在当前的 Kubernetes 稳定版本(v1.19)中,HPA 默认不支持将副本缩小至 0,你只能在启用Kubernetes Feature Gates HPAScaleToZero[19]后将其作为一个 alpha 功能。Kubernetes 社区正在积极开发稳定版本的归零功能(参见issue #2021[20]和PR #2022)[21],希望很快能看到这个功能的进展与实现。

与此同时,我们将可能借助第三方工具(比如KEDA[22]),通过实现一种空闲机制(比如 OpenShift 中的service-idler[23])来探索如何给 Function Mesh 带来归零功能,并同时尽量减少启用这个功能所带来的资源消耗。

参考资料

??Function Mesh[24]

??Introducing Function Mesh -Simplify Complex Streaming Jobs in Cloud[25]

??Kubernetes -Horizontal Pod Autoscaling[26]

??Kubernetes -Metrics API[27]

??Prometheus Operator[28]

??Prometheus Adapter end-to-end walkthrough[29]

??Prometheus Adapter configuration walkthrough[30]

引用链接

[1]Pulsar Functions:https://pulsar.apache.org/docs/en/functions-overview/[2]Function Mesh:https://streamnative.io/blog/release/2021-05-03-function-mesh-open-source[3]Kubernetes Horizontal Pod Autoscaler (HPA):https://cloud.google.com/kubernetes-engine/docs/concepts/horizontalpodautoscaler[4]文档:https://functionmesh.io/docs/next/releases/release-note-0-1-7/#function-mesh-provides-multiple-options-for-auto-scaling-the-number-of-pods[5]Function Mesh 0.1.7 版本:https://functionmesh.io/docs/0.1.7/[6]预定义的指标:https://pulsar.apache.org/docs/en/reference-metrics/#pulsar-functions[7]Kubernetes v1.20:https://kubernetes.io/blog/2020/12/08/kubernetes-1-20-release-announcement/[8]StreamNative 提供的 Helm Chart:https://github.com/streamnative/charts[9]Function Mesh 0.1.9:https://functionmesh.io/[10]安装 Prometheus Metrics Adapter:https://github.com/kubernetes-sigs/prometheus-adapter[11]APIservices:https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/apiserver-aggregation/[12]Customizing Kube-Prometheus:https://github.com/prometheus-operator/kube-prometheus#customizing-kube-prometheus[13]下载并安装 Helm3:https://helm.sh/docs/intro/install/[14]安装 Apache Pulsar:https://github.com/streamnative/charts/blob/master/charts/pulsar/README.md[15]安装 Function Mesh:https://functionmesh.io/docs/next/install-function-mesh#install-function-mesh-through-helm[16]安装 kube-prometheus:https://prometheus-operator.dev/[17]安装 Prometheus Metrics Adapter:https://github.com/kubernetes-sigs/prometheus-adapter[18]Scaling:https://functionmesh.io/docs/scaling[19]Kubernetes Feature Gates HPAScaleToZero:https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/[20]issue #2021:https://github.com/kubernetes/enhancements/issues/2021[21]PR #2022):https://github.com/kubernetes/enhancements/pull/2022[22]KEDA:https://keda.sh/[23]service-idler:https://github.com/openshift/service-idler[24]Function Mesh:https://functionmesh.io/[25]Simplify Complex Streaming Jobs in Cloud:https://streamnative.io/blog/release/2021-05-03-function-mesh-open-source/[26]Horizontal Pod Autoscaling:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/[27]Metrics API:https://github.com/kubernetes/metrics[28]Prometheus Operator:https://prometheus-operator.dev/[29]Prometheus Adapter end-to-end walkthrough:https://github.com/kubernetes-sigs/prometheus-adapter/blob/master/docs/walkthrough.md[30]Prometheus Adapter configuration walkthrough:https://github.com/kubernetes-sigs/prometheus-adapter/blob/master/docs/config-walkthrough.md

????加入ApachePulsar中文交流群????

发表评论

登录后才能评论