博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Strom的trident小例子
阅读量:7020 次
发布时间:2019-06-28

本文共 3956 字,大约阅读时间需要 13 分钟。

 

上代码:

1 public class TridentFunc { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static void main(String[] args) {15         @SuppressWarnings("unchecked")16         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));17         spout.setCycle(true);//让spout循环发送数据18         19         TridentTopology tridentTopology = new TridentTopology();20         tridentTopology.newStream("spoutid",spout)21             .each(new Fields("sentence"), new MyFunction(), new Fields(""));22         23         LocalCluster localCluster = new LocalCluster();24         String simpleName = TridentFunc.class.getSimpleName();25         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());26         //运行结果就是  一直循环打印 1 2 1 2  27     }28 }

 多数据源

1 public class TridentMeger { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static void main(String[] args) {15         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));16         //spout.setCycle(true);//让spout循环发送数据17         18         TridentTopology tridentTopology = new TridentTopology();19         //指定多个数据源,流连接20         Stream newStream1 = tridentTopology.newStream("spoutid1",spout);21         Stream newStream2 = tridentTopology.newStream("spoutid2",spout);22         23         //tridentTopology.newStream("spoutid",spout) 之前是这种  但是只能有 一个数据源  24         tridentTopology.merge(newStream1,newStream2)//使用这种就可以有多个数据源.25             .each(new Fields("sentence"), new MyFunction(), new Fields(""));26         27         LocalCluster localCluster = new LocalCluster();28         String simpleName = TridentMeger.class.getSimpleName();29         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());30     }31 }

 增加过滤器

1 public class TridentFilter { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static class MyFilter extends BaseFilter{
//专门封装了一个Filter功能.15 //对数据进行过滤 如果过滤出的数据不要了就false 保留就ture16 @Override17 public boolean isKeep(TridentTuple tuple) {18 Integer value = tuple.getIntegerByField("sentence");19 return value%2==0?true:false; //只要偶数不要奇数20 }21 }22 23 public static void main(String[] args) {24 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));25 spout.setCycle(true);//让spout循环发送数据26 27 TridentTopology tridentTopology = new TridentTopology();28 tridentTopology.newStream("spoutid",spout) //这个地方只能指定一个数据源,如果想指定多个数据源Spout 看TridentMeger.java29 .each(new Fields("sentence"), new MyFilter())30 .each(new Fields("sentence"), new MyFunction(), new Fields(""));31 32 LocalCluster localCluster = new LocalCluster();33 String simpleName = TridentFilter.class.getSimpleName();34 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());35 }36 }

 

转载于:https://www.cnblogs.com/DreamDrive/p/6675991.html

你可能感兴趣的文章
mkdir 命令
查看>>
linux目录结构详细介绍
查看>>
我的友情链接
查看>>
软工15团队作业2——团队计划
查看>>
shell脚本之-------------if 语句参数
查看>>
一年来,3D电视给力降价
查看>>
多业务安全路由器网关走俏的原因
查看>>
mysql自定义函数
查看>>
我的友情链接
查看>>
C# 25个必须知道的基础概念1
查看>>
this指针
查看>>
centos 零碎学习小记 7.
查看>>
我的友情链接
查看>>
通用权限管理系统组件遭遇VS2008是英文版,OS是日语XP后。。
查看>>
优化JAVA代码的效率
查看>>
Vim(Vi)编辑器的操作大全(2)——修改档案。
查看>>
我的友情链接
查看>>
Mysql修改存储过程相关权限问题
查看>>
百万宝贝影评感受
查看>>
海量数据高性能分页
查看>>