Pub/Sub 是一个分布式系统中的常见模式,它有许多服务用于解偶、异步消息传递。 使用Pub/Sub,您可以在事件消费者与事件生产者解偶的场景中启用。
Dapr 提供了一个可扩展的 Pub/Sub 系统(保证消息至少传递一次),允许开发者发布和订阅主题。 Dapr 为 Pub/Sub 提供组件,使操作者能够使用他们所喜欢的基础设施,例如 Redis Streams 和 Kafka 等。
当发布消息时,必须指定所发送数据的内容类型。 除非指定, Dapr 将假定类型为 text/plain
。 当使用 Dapr 的 HTTP API时,内容类型可以设置在 Content-Type
头中。 gRPC 客户端和 SDK 有一个专用的内容类型参数。
然后发布一条消息给 deathStarStatus
主题:
第一步是设置 Pub/Sub 组件:
运行 dapr init
时默认在本地机器上安装 Redis 流。
在 Linux/MacOS 上打开 ~/.dapr/components/pubsub.yam
或在 Windows 上打开%UserProfile%\.dapr\components\pubsub.yaml
组件文件以验证:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
You can override this file with another Redis instance or another pubsub component by creating a components
directory containing the file and using the flag --components-path
with the dapr run
CLI command.
To deploy this into a Kubernetes cluster, fill in the metadata
connection details of your desired pubsub component in the yaml below, save as pubsub.yaml
, and run kubectl apply -f pubsub.yaml
.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
Dapr 允许两种方法订阅主题:
您可以使用以下自定义资源定义 (CRD) 订阅主题。 创建名为 subscription.yaml
的文件并粘贴以下内容:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2
上面的示例显示了 deathStarStatus
主题的事件订阅,对于pubsub 组件 pubsub
。
route
告诉 Dapr 将所有主题消息发送到应用程序中的 /dsstatus
端点。scopes
为 app1
和 app2
启用订阅。设置组件:
将 CRD 放在 ./components
目录中。 当 Dapr 启动时,它将加载组件和订阅。
注意:默认情况下,在 MacOS/Linux 上从 $HOME/.dapr/components
加载组件,以及 %USERPROFILE%\.dapr\components
在Windows上。
还可以通过将 Dapr CLI 指向组件路径来覆盖默认目录:
dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py
注意:如果你将订阅置于自定义组件路径中,请确保Pub/Sub 组件也存在。
在 Kubernetes 中,将 CRD 保存到文件中并将其应用于群集:
kubectl apply -f subscription.yaml
创建名为 app1.py
的文件,并粘贴以下内容:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
创建名为" app1.py
的文件,并粘贴如下内容:
pip install flask
pip install flask_cors
创建 app1.py
后,确保 flask 和 flask_cors 已经安装了:
dapr --app-id app1 --app-port 5000 run python app1.py
设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js
文件中:
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.post('/dsstatus', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js
文件中:
dapr --app-id app2 --app-port 3000 run node app2.js
创建名为 app1.py
的文件,并粘贴以下内容:
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
在创建 app1.php
并安装 SDK后, 继续启动应用程序:
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
若要订阅主题,请使用您选择的编程语言启动 Web 服务器,并监听以下 GET
终结点: /dapr/subscribe
。 Dapr 实例将在启动时调用到您的应用,并期望对的订阅主题响应 JOSN:
pubsubname
: Dapr 用到的 pub/sub 组件topic
: 订阅的主题route
:当消息来到该主题时,Dapr 需要调用哪个终结点import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'deathStarStatus',
'route': 'dsstatus'}]
return jsonify(subscriptions)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
创建 app1.py
后,确保 flask 和 flask_cors 已经安装了:
pip install flask
pip install flask_cors
然后运行:
dapr --app-id app1 --app-port 5000 run python app1.py
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "deathStarStatus",
route: "dsstatus"
}
]);
})
app.post('/dsstatus', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
运行此应用:
dapr --app-id app2 --app-port 3000 run node app2.js
更新 app1.php
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus'),
]]));
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
运行此应用:
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
/dsstatus
终结点与订阅中定义的 route
相匹配,这是 Dapr 将所有主题消息发送至的位置。
要发布主题,您需要运行一个 Dapr sidecar 的实例才能使用 Pub/Sub Redis 组件。 您可以使用安装在您本地环境中的默认的Redis组件。
用名为 testpubsub
的 app-id 启动一个 Dapr 实例:
dapr run --app-id testpubsub --dapr-http-port 3500
然后发布一条消息给 deathStarStatus
主题:
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'
然后发布一条消息给 deathStarStatus
主题:
curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'
然后发布一条消息给 deathStarStatus
主题:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status": "completed"}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'
Dapr 将在符合 Cloud Events v1.0 的信封中自动包装用户有效负载,对 datacontenttype
属性使用 Content-Type
头值。
为了告诉Dapr 消息处理成功,返回一个 200 OK
响应。 如果 Dapr 收到超过 200
的返回状态代码,或者你的应用崩溃,Dapr 将根据 At-Least-Once 语义尝试重新传递消息。
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.post('/dsstatus', (req, res) => {
res.sendStatus(200);
});
如果您喜欢使用代码发布一个主题,下面就是一个例子。
const express = require('express');
const path = require('path');
const request = require('request');
const bodyParser = require('body-parser');
const app = express();
app.use(bodyParser.json());
const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = 'pubsub';
app.post('/publish', (req, res) => {
console.log("Publishing: ", req.body);
const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
request( { uri: publishUrl, method: 'POST', json: req.body } );
res.sendStatus(200);
});
app.listen(process.env.PORT || port, () => console.log(`Listening on port ${port}!`));
如果您喜欢使用代码发布一个主题,下面就是一个例子。
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory, \Psr\Log\LoggerInterface $logger) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('deathStarStatus')->publish('operational');
$logger->alert('published!');
});
您可以将此保存到 app2.php
当 app1
正在另一个终端中运行时,执行:
dapr --app-id app2 run -- php app2.php
Dapr 自动接收发布请求上发送的数据,并将其包装在CloudEvent 1.0 信封中。 如果您想使用自己自定义的 CloudEvent,请确保指定内容类型为 application/ cloudevents+json
。
Read about content types here, and about the Cloud Events message format.