1 storm消息的容错机制
说明:数据在处理中出现异常,需要保证数据被完整处理;需求:Spout---A---B---C---D,当其中一个环节出现异常时,Spout能够重新发送一份数据问题:1、Spout如何知道一条消息的处理状态? 成功:ack(Object msgId) 失败:fail(Object msgId) 2、Bolt如何告知Spout的处理状态? collector.emit(new Value()); collector.ack(tuple);//当消息处理成功时 collector.fail(tuple);//当消息处理失败时
1.1 Storm中的ack机制
1.1.2 Spout发送一条数据出去,需要知道数据处理成功和失败的状态,如果失败进行消息重新发送
步骤: a.自定义Spout实现BaseRichSpout,覆写ack和fail的方法;b.在自定义Spout发送数据的时候,需要指定messageId,messageId其实是一个Object;c.当消息处理成功或失败的时候,Storm框架会将messageId传输回来。如果消息要重发,直接通过messageId找到或直接转化成数据内容进行重发;d.自定义Bolt实现BaseRichBolt,在Bolt的execute的方法中进行两个操作: ①:发送数据时需要指定血缘关系(锚点),即collector.emit(父tuple,new 子tuple) ②:当executor处理完业务逻辑的时候,需要告诉Storm框架当前阶段处理的状态,即collector.ack(tuple)或collector.fail(tuple)
1.1.3编写Storm程序时,在bolt环节忘记手动ack或fail,Storm框架会等待反馈,达到超时的阈值之后,直接给fail
1.1.4编写Storm程序时,在bolt环节忘记标识锚点,Storm框架会认为你不关心后面阶段处理状况
说明:Storm框架针对BaseRichBolt的api过于繁琐,Storm框架引入另外的api:BaseBasicBolt。如果实现了BaseBasicBolt,就不需要手动指定锚点、手动ack和fail
1.2 Storm框架ack机制的原理分析
1.3 Acker机制总结
1、Spout发送一条消息,会由系统生成一个RootId;2、由于用户在发送数据的时候,指定了MessageId,所以在发送数据的时候会创建PendingMap,PendingMap以RootId为key,以用户的MessageId为Value;3、Spout将数据发送给Bolt的时候,不仅发送一份DataTuple消息给下游的Bolt,还发送一份AckTuple消息给AckerBolt。DataTuple和AckTuple的区别是AckTuple没有数据只有RootId和锚点Id;4、Bolt处理完业务逻辑之后,发送一份ack信号,将ack信号转成AckTuple。