事件总线
概述
Beaver IoT 平台提供了事件总线,用于在系统内部各个服务、集成之间进行通信,开发者可以通过订阅事件来实现自己的业务逻辑。平台事件总线支持基于Key表达式的事件订阅,以支持更细粒度的订阅。 当前平台支持的事件类型有:设备事件、实体事件、实体值数据交换事件(Exchange),开发者也可以自定义事件类型。
事件定义
Event 事件
Beaver IoT的某些资源发生变化时,会触发相关的事件Event
,订阅这些事件的处理方法就会被触发,这是整个系统运行的基础。
事件分为: 设备事件DeviceEvent
/ 实体事件EntityEvent
/ 实体值数据交换事件ExchangeEvent
DeviceEvent
表示设备元数据相关事件,事件的类型(DeviceEvent.EventType
)有:创建CREATED
/ 更新UPDATED
/ 删除DELETED
。这个事件会携带发生变化的设备。这个事件在保存设备元数据时会自动触发,一般不需要集成开发者发送事件。
DeviceEvent对应的Payload为Device
对象。
EntityEvent
表示实体元数据相关事件,事件的类型(EntityEvent.EventType
)有:创建CREATED
/ 更新UPDATED
/ 删除DELETED
。这个事件会携带发生变化的实体。这个事件在保存实体元数据时会自动触发,一般不需要集成开发者发送事件。
EntityEvent对应的Payload为Entity
对象。
ExchangeEvent
表示实体值数据相关事件,事件的类型(ExchangeEvent.EventType
)有:下行DOWN
/ 上行UP
。
ExchangeEvent的Payload为ExchangePayload
对象,如果Payload是采用注解方式定义的实体,那么这个实体类需要继承自ExchangePayload
。
Beaver IoT 平台提供了事件总线,开发者也可以自定义事件类型,只需实现自定义的Event事件和Payload即可,其中Event事件需要实现Event
接口,Payload需要实现IdentityKey
接口。
Event Payload
ExchangePayload
ExchangePayload是ExchangeEvent事件的载荷,用于携带实体上下行的数据,ExchangePayload在Beaver IoT平台中广泛应用。ExchangePayload是一个Map结构,其中包含了Exchange事件的Key-Value数据,Key是一个字符串,Value是一个Object对象。另外包含了Context上下文对象,用于携带在Exchange过程中额外需要的参数信息。
ExchangePayload的构建
//方式1: 构建单一值的ExchangePayload
ExchangePayload payload = ExchangePayload.create(key, value);
//方式2: 构建多值的ExchangePayload(传递Map)
ExchangePayload payload = ExchangePayload.create(Map.of("key1", value1));
//方式3: 通过已有的ExchangePayload,来构建ExchangePayload(将同时拷贝ExchangePayload值及Context上下文)
ExchangePayload payload = ExchangePayload.createFrom(exchangePayload);
//方式4: 构建空的ExchangePayload
ExchangePayload payload = ExchangePayload.empty();
ExchangePayload常用方法
- Payload相关的方法
// 根据指定的Key获取Value
Object getPayload(String key);
// 获取所有的Payload
Map<String, Object> getAllPayloads();
// 根据指定的EntityType获取Payload
Map<String, Object> getPayloadsByEntityType(EntityType entityType);
// 获取对应的实体
Map<String, Entity> getExchangeEntities();
- Context上下文相关的方法
// 获取Context上下文
Map<String, Object> getContext();
// 设置Context上下文
void setContext(Map<String, Object> context);
// 根据指定的Key获取Context上下文
Object getContext(String key);
// 根据指定的Key获取Context上下文,如果不存在则返回默认值
<T> T getContext(String key, T defaultValue);
// 设置Context上下文
void putContext(String key, Object value);
继承ExchangePayload实现自定义的实体注解对象
在上面章节中,我们已经介绍了如何基于注解构 建实体, 在实体定义中,我们可以通过继承ExchangePayload实现自定义的实体注解对象,实体注解对象一方面可以用来接收ExchangePayload数据,另一方面也可以采用ExchangePayload.createProxy(...)来创建一个实体注解的代理对象,这样我们就可以直接操作实体对象的属性来构建ExchangePayload对象了。
- 接收ExchangePayload数据示例
@EventSubscribe(payloadKeyExpression = "my-integration.integration.*", eventType = ExchangeEvent.EventType.DOWN)
public void onAddDevice(Event<MyIntegrationEntities> event) {
MyIntegrationEntities myIntegrationEntities = event.getPayload();
// 同样可以获取子Entity对象
String ip = myIntegrationEntities.getAddDevice().getIp();
...
}
- 构建ExchangePayload对象示例
@EventSubscribe(payloadKeyExpression = "my-integration.integration.*", eventType = ExchangeEvent.EventType.DOWN)
public void onAddDevice(Event<MyIntegrationEntities> event) {
// mark benchmark done
MyIntegrationEntities myIntegrationEntities = ExchangePayload.createProxy(MyIntegrationEntities.class);
myIntegrationEntities.setDetectStatus(MyIntegrationEntities.DetectStatus.STANDBY.ordinal());
myIntegrationEntities.setDetectReport(null);
// 同样适用于子Entity对象
MyIntegrationEntities.DetectReport detectReport = myIntegrationEntities.getDetectReport();
detectReport.setConsumedTime(endTimestamp - startTimestamp);
detectReport.setOnlineCount(activeCount.get());
detectReport.setOfflineCount(inactiveCount.get());
exchangeFlowExecutor.syncExchangeUp(donePayload);
...
}
构建ExchangePayload对象时,需要采用ExchangePayload.createProxy(MyIntegrationEntities.class)方式进行构建,此处返回的是一个代理对象,代理对象的属性值会被映射到ExchangePayload对象中。
事件发布
Beaver IoT平台提供了ExchangePayload事件发布的流程执行器,开发者可以通过调用ExchangeFlowExecutor
服务实现ExchangeEvent事件的发布。可支持同步和异步方式发布,另外支持指定EventType(即上行或下行)。
事件发布流程
ExchangeEvent
事件我们会触发通用内置流程,流程包含实体数据校验、实体当前值保存(仅属性实体)、实体历史值保存,然后才触发相关订阅的方法。
无论是同步请求还是异步请求,相关监听器都会被调用。
同步请求的情况下,某些监听器的执行抛出了异常,会先捕捉并收集,然后在所有同步监听器执行完毕后抛出一个异常。如果没有抛出异常,则返回所有同步监听器的执行结果。
代码示例
- 同步
- 异步
@Service
public class ExchangeDemoService {
@Autowired
private ExchangeFlowExecutor exchangeFlowExecutor;
public EventResponse exchangeUp(ExchangePayload payload){
//上行
return exchangeFlowExecutor.syncExchangeUp(payload);
}
public EventResponse exchangeDown(ExchangePayload payload){
//下行
return exchangeFlowExecutor.syncExchangeDown(payload);
}
}
@Service
public class ExchangeDemoService {
@Autowired
private ExchangeFlowExecutor exchangeFlowExecutor;
public void exchangeUp(ExchangePayload payload){
//上行
exchangeFlowExecutor.asyncExchangeUp(payload);
}
public void exchangeDown(ExchangePayload payload){
//下行
exchangeFlowExecutor.asyncExchangeDown(payload);
}
}
Beaver IoT 平台同步调用支持返回响应,同时支持多个监听器的执行,返回EventResponse
响应对象。
事件订阅
平台提供了@EventSubscribe注解,用于订阅事件,当前支持的事件类型有:设备事件、实体事件、实体值数据交换事件(Exchange)。同时支持Key通配符表达式匹配。
注解说明
- @EventSubscribe注解
- eventType:事件类型,参见事件定义章节的说明
- payloadKeyExpression: Key表达式,用于匹配事件的Payload对象,支持*通配符,例如:
my-integration.*
:匹配所有以my-integration.
开头的Key
事件订阅
订阅ExchangeEvent事件
@Service
public class MyDeviceService {
@EventSubscribe(payloadKeyExpression = "my-integration.integration.add_device.*", eventType = ExchangeEvent.EventType.DOWN)
public void onAddDevice(Event<MyIntegrationEntities.AddDevice> event) {
MyIntegrationEntities.AddDevice addDevice = event.getPayload(); //可使用实体注解对象作为入参接收ExchangePayload请求
String ip = addDevice.getIp();
// ...
}
@EventSubscribe(payloadKeyExpression = "my-integration.integration.delete_device", eventType = ExchangeEvent.EventType.DOWN)
public EventResponse onDeleteDevice(ExchangeEvent event) {
Device device = (Device) event.getPayload().getContext("device");
deviceServiceProvider.deleteById(device.getId());
//同步返回响应状态
return EventResponse.of("connectResult", device.getId());
}
}
- 可使用实体注解对象作为入参接收ExchangePayload请求,如:MyIntegrationEntities.AddDevice
- @EventSubscribe注解订阅事件,其中eventType为事件类型,非必填,当为空这表示订阅所有事件类型。
- Beaver IoT 平台异步订阅时将以异步方式执行,同步订阅时将以同步方式执行。开 发者可以自行添加@Async注解异步执行,实现业务异步线程池隔离。
订阅DeviceEvent事件
@Service
public class MyDeviceService {
@EventSubscribe(payloadKeyExpression = "my-integration.device.*", eventType = DeviceEvent.EventType.CREATED)
public void onSaveDevice(DeviceEvent event) {
...
}
}
Beaver IoT 平台开放设备添加、删除、修改的事件,开发者可以通过订阅这些事件来实现自己的业务逻辑。通常情况下开发者不需要关注这些事件。
订阅EntityEvent事件
@Service
public class MyEntityService {
@EventSubscribe(payloadKeyExpression = "my-integration.device.*", eventType = DeviceEvent.EventType.CREATED)
public void onSaveEntity(EntityEvent event) {
...
}
}
Beaver IoT 平台开放实体添加、删除、修改的事件,开发者可以通过订阅这些事件来实现自己的业务逻辑。通常情况下开发者不需要关注这些事件。