Context 上下文
1、Content
语法
Context 基本语法
create context context_name partition [by] event_property [and event_property [and ...]] from stream_def [, event_property [...] from stream_def] [, ...]
context_name:上下文名字,唯一。event_property:事件的属性名,多个属性名之间用and或逗号连接。stream_def:事件流的定义。
举例
create context NewUser partition by id and name from User -- id 和 name 是 User 的属性
如果 context 包含多个流,例子如下:
-- sid 是 Student 的属性,tid 是 Teacher 的属性 create context Person partition by sid from Student, tid from Teacher
注意: 多个流中每个流的中用于 context 的属性的数量要一样,数据类型也要一致。比如下面 错误示例:
-- 错误:sid是int,tname是String,数据类型不一致 create context Person partition by sid from Student, tname from Teacher -- 错误:Student 有一个属性,Teacher 有两个属性,属性数量不一致 create context Person partition by sid from Student, tid,tname from Teacher -- 错误:sid对应tname,sname对应tid,并且sname和tname是String,sid和tid是int,属性数量一样,但是对应的数据类型不一致 create context Person partition by sid,sname from Student, tname,tid from Teacher
过滤
实际上可以对进入 context 的事件增加过滤条件,不符合条件的就被过滤掉,就像下面这样:
-- age 大于 20 的 Student 事件才能建立或者进入 context create context Person partition by sid from Student(age > 20)
partition by 后面的属性,就是作为 context 的一个约束,比如说 id,如果 id 相等的则进入同一个 context 里,如果 id 不同,那就新建一个 context。好比根据 id 分组,id 相同的会被分到一个组里,不同的会新建一个组并等待相同的进入。
如果 parition by 后面跟着同一个流的两个属性,那么必须两个属性值一样才能进入 context。比如说 A事件 id=1,name=a,那么会以 1和a 两个值建立 context,有点像数据库里的联合主键。然后 B事件id=1,name=b,则又会新建一个context。接着 C事件 id=1,name=a,那么会进入 A事件建立的context。
如果 partition by 后面跟着两个流的一个属性,那么两个属性值一样才能进入 context。比如说 Student 事件 sid=1,那么会新建一个 context,然后来了个 Teacher 事件 tid=1,则会进入 sid=1 的那个context。多个流也一样,不用关心是什么事件,只用关心事件的属性值一样即可进入同一个context。
2、Built-In Context Properties
Context 本身自带一些属性,最关键的是可以查看所创建的 context 的标识,并帮助我们理解context的语法。
如上所示,name 表示 上下文名称,这个是不会变的。id 是每个上下文的唯一标识,从 0 开始。key1 和 keyN 表示上下文定义时所选择的属性的值,1 和 N 表示属性的位置。例如:
-- key1为sid,key2为sname EPL: create context Person partition by sid, sname from Student
完整示例
为了说明对这几个属性的应用,举一个比较完整的例子。
1、定义事件模型
@Data
@AllArgsConstructor
public class Apple {
private int id;
private int price;
}2、测试
public static void main(String[] args) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPRuntime runtime = epService.getEPRuntime();
// 指定事件模型
String apple = Apple.class.getName();
// 创建 context
String epl1 = "create context appleTest partition by id,price from " + apple;
// context.id 针对不同的 apple 的 id,price 建立一个 context
// 如果事件的id和price相同,则 context.id 也相同,即表明事件进入了同一个context
String epl2 = "context appleTest select context.id,context.name,context.key1,context.key2 from " + apple;
// 添加事后处理
admin.createEPL(epl1);
EPStatement state = admin.createEPL(epl2);
// 注册监听
state.addListener((newEvents, oldEvents) -> {
if (newEvents != null) {
EventBean event = newEvents[0];
System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.key1:" + event.get("key1") + ",context.key2:" + event.get("key2"));
}
});
// 模拟事件发生
System.out.println("sendEvent: id=1, price=20");
runtime.sendEvent(new Apple(1, 20));
System.out.println("sendEvent: id=2, price=30");
runtime.sendEvent(new Apple(2, 30));
System.out.println("sendEvent: id=1, price=20");
runtime.sendEvent(new Apple(1, 20));
System.out.println("sendEvent: id=4, price=20");
runtime.sendEvent(new Apple(4, 20));
}执行结果:
sendEvent: id=1, price=20 context.name:appleTest,context.id:0,context.key1:1,context.key2:20 sendEvent: id=2, price=30 context.name:appleTest,context.id:1,context.key1:2,context.key2:30 sendEvent: id=1, price=20 context.name:appleTest,context.id:0,context.key1:1,context.key2:20 sendEvent: id=4, price=20 context.name:appleTest,context.id:2,context.key1:4,context.key2:20
这个例子说得比较明白,针对不同的 id和price,都会新建一个 context,并 context.id 会从 0 开始增加作为其标识。如果 id和price 一样,事件就会进入之前已经存在的 context,所以 e3事件就和 e1事件一样存在于 context.id=0 的 context 里面。
其他示例
-- context 定义 create context test partition by id from Apple -- 每当 5 个 id 相同的 Apple 事件进入时,统计 price 总和 context test select sum(price) from Apple.win:length_batch(5) -- 根据不同的id,统计 3 秒内进入的事件的平均 price,且 price 必须大于10 context test select avg(price) from Apple(price>10).win:time(3 sec)
关于 .win:length 或者 .win:time,可以理解为一堆事件,后面会介绍。
3、Hash Context
前面介绍的Context语法是以事件属性来定义的,Esper 提供了以 Hash 值为标准定义 Context,通俗一点说就是提供事件属性参与 hash 值的计算,计算的值再对某个值是同余的则进入到同一个 context 中。
语法
create context context_name coalesce [by] hash_func_name(hash_func_param) from stream_def [, hash_func_name(hash_func_param) from stream_def ] [, ...] granularity granularity_value [preallocate] -- create context 上下文名称 coalesce [by] hash函数名称(hash函数参数) from 事件类型 [, hash函数名称(hash函数参数) from 事件类型 ] [, ...] granularity granularity_value [preallocate]
hash_func_name为 hash 函数的名称,Esper提供了CRC32或者使用Java的hashcode函数来计算hash值,分别为consistent_hash_crc32和hash_code。你也可以自己定义hash函数,不过这需要配置。hash_func_param为参与计算的属性列表,比如之前的sid或者tname属性。stream_def就是事件类型,可以是多个。不同于前面的Context语法要求,Hash Context不管有多个少属性作为基础来计算hash值,hash值都只有一个,并且为int型。所以就不用关心这些属性的个数以及数据类型了。granularity是必选参数,表示为最多能创建多少个context。granularity_value就是那个用于取余的“某个值”,因为Esper为了防止内存溢出,就想出了取余这种办法来限制context创建的数量。也就是说context.id=hash函数名称(hash函数参数) % granularity_value。preallocate是一个可选参数,如果使用它,那么Esper会预分配空间来创建granularity_value数量的context。比如说granularity_value 为 1024,那么Esper会预创建1024个context。内存不大的话不建议使用这个参数。
过滤条件
Hash Context 同样可以过滤事件,举个完整的例子:
-- 以 Java 的 hashcode 方法 计算 sid 的值(sid必须大于5) -- 以 CRC32 算法计算 tid 的值,然后对 10 取余后的值来建立 context create context HashPerson coalesce by hash_code(sid) from Student(sid>5), consistent_hash_crc32(tid) from Teacher granularity 10
Hash Context 也有 Built-In Context Properties,只不过只有context.id和context.name了。用法和前面说的一样。
小贴士:
如果用于
hash计算的属性比较多,那么就不建议使用CRC32算法了,因为他会把这些属性值先序列化字节数组以后才能计算hash值。hashcode方法相对它能快很多。如果使用
preallocate参数,建议granularity_value不要超过1000。如果
granularity_value超过65536,引擎查找context会比较费劲,进而影响计算速度。
4、Category Context
Category Context 相对之前的 Context 和 Hash Context 要简单许多,语法说明如下
语法
create context context_name group [by] group_expression as category_label [, group [by] group_expression as category_label] [, ...] from stream_def
group_expression表示分组策略的表达式。category_label为策略定义一个名字。
一个 context 可以有多个策略同时存在,但是特殊的是之能有一个 stream_def。
例如:
create context CategoryByTemp group temp < 5 as cold, group temp between 5 and 85 as normal, group temp > 85 as large from Temperature
Category Context 也有它自带的属性。
label 指明进入的事件所处的 group 是什么。
完整示例
完整例子如下:
1、定义事件模型
@Data
@AllArgsConstructor
public class Apple {
private int id;
private int price;
}2、测试
public static void main(String[] args) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPRuntime runtime = epService.getEPRuntime();
String apple = Apple.class.getName();
String epl1 = "create context appleTest group by id<0 as low, group by id>0 and id<10 as middle, group by id>10 as high from " + apple;
String epl2 = "context appleTest select context.id, context.name, context.label, price from " + apple;
admin.createEPL(epl1);
EPStatement state = admin.createEPL(epl2);
// 注册监听
state.addListener((newEvents, oldEvents) -> {
if (newEvents != null) {
EventBean event = newEvents[0];
System.out.println("context.name:" + event.get("name") + ",context.id:" + event.get("id") + ",context.label:" + event.get("label"));
}
});
System.out.println("sendEvent: id=1, price=20");
runtime.sendEvent(new Apple(1, 20));
System.out.println("sendEvent: id=0, price=30");
runtime.sendEvent(new Apple(0, 30));
System.out.println("sendEvent: id=11, price=20");
runtime.sendEvent(new Apple(11, 20));
System.out.println("sendEvent: id=-1, price=40");
runtime.sendEvent(new Apple(-1, 40));
}执行结果
sendEvent: id=1, price=20 context.name:appleTest,context.id:1,context.label:middle sendEvent: id=0, price=30 sendEvent: id=11, price=20 context.name:appleTest,context.id:2,context.label:high sendEvent: id=-1, price=40 context.name:appleTest,context.id:0,context.label:low
可以发现,id=0 的事件,并没有触发监听器,那是因为 context 里的三个 category 没有包含 id=0 的情况,所以这个事件就被排除掉了。
5、Non-Overlapping Context
这类Context有个特点,是由开始和结束两个条件构成context。语法如下:
语法
create context context_name start start_condition end end_condition
这个 context 有两个条件做限制,形成一个约束范围。当开始条件和结束条件都没被触发时,引擎会观察事件的进入是否会触发开始条件。如果开始条件被触发了,那么就新建一个 context,并且观察结束条件是否被触发。如果结束条件被触发,那么 context 结束,引擎继续观察开始条件何时被触发。所以说这类 Context 的另一个特点是,要么 context 存在并且只有一个,要么条件都没被触发,也就一个 context 都没有了。
start_condition 和 end_condition 可以是时间,或者是事件类型。比如说:
-- 9点到17点此context才可用(以引擎的时间为准)。如果事件进入的时间不在此范围内,则不受该context影响 create context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *)
完整示例
以某类事件开始,以某类事件结束。
1、定义事件模型
/**
* 开始事件
*/
public class StartEvent {
}
/**
* 结束事件
*/
public class EndEvent {
}
/**
* 其他事件
*/
@Getter
@AllArgsConstructor
public class OtherEvent {
private int id;
}2、测试类
public static void main(String[] args) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPRuntime runtime = epService.getEPRuntime();
String start = StartEvent.class.getName();
String end = EndEvent.class.getName();
String other = OtherEvent.class.getName();
// 以StartEvent事件作为开始条件,EndEvent事件作为结束条件
String epl1 = "create context NoOverLapping start " + start + " end " + end;
String epl2 = "context NoOverLapping select * from " + other;
admin.createEPL(epl1);
EPStatement state = admin.createEPL(epl2);
// 注册监听
state.addListener((newEvents, oldEvents) -> {
EventBean event = newEvents[0];
System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id"));
});
System.out.println("sendEvent: StartEvent");
runtime.sendEvent(new StartEvent());
System.out.println("sendEvent: OtherEvent");
runtime.sendEvent(new OtherEvent(2));
System.out.println("sendEvent: EndEvent");
runtime.sendEvent(new EndEvent());
System.out.println("sendEvent: OtherEvent");
runtime.sendEvent(new OtherEvent(4));
}执行结果
sendEvent: StartEvent sendEvent: OtherEvent Class:com.esper.non_overlapping_context.OtherEvent, id:2 sendEvent: EndEvent sendEvent: OtherEvent
由此看出,在 NoOverLapping Context 下监控 OtherEvent,必须是在 StartEvent 被触发才能监控到,所以在 EndEvent 发送后,再发送一个 OtherEvent 是不会触发 Listener 的。
6、OverLapping
OverLapping 和 NoOverLapping 一样都有两个条件限制,但是区别在于 OverLapping 的初始条件可以被触发多次,并且只要被触发就会新建一个 context,但是当终结条件被触发时,之前建立的所有 context 都会被销毁。
语法
create context context_name initiated [by] initiating_condition terminated [by] terminating_condition
initiating_condition 和 terminating_condition 可以为事件类型,事件或者别的条件表达式。
完整示例
1、定义事件模型
/**
* 初始事件
*/
public class InitialEvent {
}
/**
* 终止事件
*/
public class TerminateEvent {
}
/**
* 其他事件
*/
@Getter
@AllArgsConstructor
public class SomeEvent {
private int id;
}2、测试类
public static void main(String[] args) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
EPRuntime runtime = epService.getEPRuntime();
String initial = InitialEvent.class.getName();
String terminate = TerminateEvent.class.getName();
String some = SomeEvent.class.getName();
// 以 InitialEvent 事件作为初始事件,TerminateEvent 事件作为终结事件
String epl1 = "create context OverLapping initiated " + initial + " terminated " + terminate;
String epl2 = "context OverLapping select context.id from " + initial;
String epl3 = "context OverLapping select * from " + some;
admin.createEPL(epl1);
EPStatement state = admin.createEPL(epl2);
EPStatement state1 = admin.createEPL(epl3);
// 注册监听
state.addListener((newEvents, oldEvents) -> {
if (newEvents != null) {
EventBean event = newEvents[0];
System.out.println("context.id:" + event.get("id") + ", id:" + event.get("id"));
}
});
state1.addListener((newEvents, oldEvents) -> {
if (newEvents != null) {
EventBean event = newEvents[0];
System.out.println("Class:" + event.getUnderlying().getClass().getName() + ", id:" + event.get("id"));
}
});
System.out.println("sendEvent: InitialEvent");
runtime.sendEvent(new InitialEvent());
System.out.println("sendEvent: SomeEvent");
runtime.sendEvent(new SomeEvent(2));
System.out.println("sendEvent: InitialEvent");
runtime.sendEvent(new InitialEvent());
System.out.println("sendEvent: TerminateEvent");
runtime.sendEvent(new TerminateEvent());
System.out.println("sendEvent: SomeEvent");
runtime.sendEvent(new SomeEvent(4));
}执行结果
sendEvent: InitialEvent context.id:0, id:0 sendEvent: SomeEvent Class:com.esper.context_sample.overlapping.SomeEvent, id:2 sendEvent: InitialEvent context.id:1, id:1 sendEvent: TerminateEvent sendEvent: SomeEvent
从结果可以看得出来,每发送一个 InitialEvent,都会新建一个 context,以至于 context.id=0和1。并且当发送 TerminateEvent 后,再发送 SomeEvent 监听器也不会被触发了。
另外,context.id 是每一种 Context 都会有的自带属性,而且针对 OverLapping,还增加了 startTime 和 endTime 两种属性,表明 context 的开始时间和结束时间。
7、Context Condition
Context Condition 主要包含 Filter,Pattern,Crontab 以及 Time Period
Filter是对属性值的过滤,比如:create context NewUser partition by id from User(id > 10)Pattern是复杂事件流的代表,比如:”A事件到达后跟着B事件到达” 这是一个完整的Pattern。Crontab是定时任务,主要用于NoOverLapping,就像前面提到的(0, 9, *, *, *),括号里的五项代表(分,时,天,月,年)。Time Period在这里只有一种表达式,就是after time_period_expression时间周期,举例如下。
-- 以 0秒 为时间初始点,新建一个 context,于 10秒 后开始,1 分钟后结束。下一个 context 从 1分20秒 开始 create context NonOverlap10SecFor1Min start after 10 seconds end after 1 minute
8、Context Nesting
Context 也可以嵌套,意义就是多个 Context 联合在一起组成一个大的 Context,以满足复杂的限制需求。
语法
create context context_name context nested_context_name [as] nested_context_definition , context nested_context_name [as] nested_context_definition [, ...]
举例
create context NineToFiveSegmented context NineToFive start (0, 9, *, *, *) end (0, 17, *, *, *), context SegmentedByUser partition by userId from User
用法和普通的 Context 没区别。
另外针对嵌套 Context,其自带的属性使用方式会有些变化。比如针对上面这个,若想查看 NineToFive 的 startTime 和 SegmentedByUser 的第一个属性值,要按照下面这样写:
context NineToFiveSegmented select context.NineToFive.startTime, context.SegmentedByUser.key1 from User
9、Output When Context Partition Ends
当 Context 销毁时,如果你想同时查看此时 Context 里的东西,那么Esper提供了一种办法来输出其内容。例如:
create context OverLapping initiated InitialEvent terminated TerminateEvent context OverLapping select * from User output snapshot when terminated
当终结事件发送到引擎后,会立刻输出 OverLapping 的快照。如果你想以固定的频率查看 Context 的内容,Esper也支持。例如:
-- 每两分钟输出OverLapping的事件 context OverLapping select * from User output snapshot every 2 minute
以上的内容算是包含了Context的所有方面,可能还有些细节需要各位自己去研读他的手册,并且多加练习。
补充一下 Context和 和 Group by 有什么区别?
其实如果只是很简单的用Context,那么确实没太大区别,无非是在Context下select可以不包含group by修饰的属性。但是Group by明显没有Context强大,很多复杂的分组Group by是没法做到的。不过在能达到同样效果的情况下,我还是建议使用Group by,毕竟Context的名字是不能重复的,而且在高并发的情况下Context会短时间锁住。至于原因,这已经是Esper的高级篇了,这里暂且不说。
未经允许请勿转载:程序喵 » Esper教程 —— Context上下文有哪些(4)
程序喵