Tech, November 15, 2022

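Overall architecture

* A log agent is deployed alongside every application instance

* The agent sends logs to Kafka in real time

* Storm processes the logs in real time

* Storm saves the computed results to HBase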
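
The data flow in the list above can be sketched with plain JDK classes. This is only an in-memory illustration under loud assumptions: the BlockingQueue stands in for a Kafka topic, the counting loop for a Storm bolt, and the HashMap for an HBase table. The names PipelineSketch, agentSend and process are hypothetical and do not come from the original project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineSketch {
    // Stand-in for a Kafka topic: agents append, the computation layer polls.
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stand-in for an HBase table: row key -> call counter.
    static final Map<String, Long> store = new HashMap<>();

    // The agent forwards one log record as soon as it is produced.
    static void agentSend(String instanceName, String methodName) {
        topic.add(instanceName + "|" + methodName);
    }

    // The computation layer drains the queue and updates per-key call counts.
    static void process() {
        String record;
        while ((record = topic.poll()) != null) {
            store.merge(record, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        agentSend("app-1", "login");
        agentSend("app-1", "login");
        agentSend("app-2", "query");
        process();
        System.out.println(store);
    }
}
```

The queue decouples the agents from the computation layer, which is the role Kafka plays in the real setup: agents never wait for Storm, and Storm can catch up after a pause.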

Consuming Kafka from Storm

  • Create the real-time computation project and add the Storm and Kafka dependencies.
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>
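  • Create the spout that consumes Kafka. The KafkaSpout shipped with Storm can be used as-is.
  • Create the bolt that handles the data read from Kafka. JsonBolt parses the JSON read from Kafka and emits it to the next bolt for further processing (the downstream bolt is not shown here; any class that extends BaseRichBolt can process the tuples).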
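import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(JsonBolt.class);

    // Names of the JSON keys to extract; also the names of the emitted fields.
    private Fields fields;
    private OutputCollector collector;

    public JsonBolt() {
        this.fields = new Fields("hostIp", "instanceName", "className",
                "methodName", "createTime", "callTime", "errorCode");
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void execute(Tuple tuple) {
        String spanDataJson = tuple.getString(0);
        LOG.info("source data:{}", spanDataJson);
        Object parsed = JSONValue.parse(spanDataJson);
        if (!(parsed instanceof Map)) {
            // Malformed or non-object JSON: ack it so it is not replayed, then skip.
            LOG.warn("skipping unparsable record:{}", spanDataJson);
            this.collector.ack(tuple);
            return;
        }
        Map<String, Object> map = (Map<String, Object>) parsed;
        // Emit the values in the order of the declared fields; missing keys become null.
        Values values = new Values();
        for (int i = 0, size = this.fields.size(); i < size; i++) {
            values.add(map.get(this.fields.get(i)));
        }
        // Anchor the new tuple to the input tuple, then acknowledge the input.
        this.collector.emit(tuple, values);
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.fields);
    }
}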
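
The core of execute is a projection: it reads the parsed JSON map and emits the values in the order of the declared fields. Below is a minimal sketch of just that step using only JDK classes, so it can be tried without a Storm cluster. FieldProjection and project are hypothetical names introduced for illustration; a plain List stands in for Storm's Values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldProjection {
    // Same field order as the Fields object declared in JsonBolt.
    static final List<String> FIELDS = Arrays.asList(
            "hostIp", "instanceName", "className",
            "methodName", "createTime", "callTime", "errorCode");

    // Picks the value of every declared field, in declaration order.
    // Map.get returns null for absent keys, so the result always has FIELDS.size() entries.
    static List<Object> project(Map<String, Object> parsed) {
        List<Object> values = new ArrayList<>(FIELDS.size());
        for (String field : FIELDS) {
            values.add(parsed.get(field));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new HashMap<>();
        parsed.put("hostIp", "10.0.0.1");
        parsed.put("methodName", "login");
        parsed.put("callTime", 35L);
        // prints [10.0.0.1, null, null, login, null, 35, null]
        System.out.println(project(parsed));
    }
}
```

Because absent keys turn into null positions rather than being dropped, any downstream bolt should be prepared to receive null for a field.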
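  • Create the topology MyTopology. First set up SpoutConfig, the configuration for KafkaSpout, which carries the ZooKeeper address, port and root node. Then connect the spout registered under the id KAFKA_SPOUT_ID to the jsonBolt object with shuffleGrouping.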
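import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class MyTopology {
    private static final String TOPOLOGY_NAME = "SPAN-DATA-TOPOLOGY";
    private static final String KAFKA_SPOUT_ID = "kafka-stream";
    private static final String JsonProject_BOLT_ID = "jsonProject-bolt";

    public static void main(String[] args) throws Exception {
        String zks = "132.122.252.51:2181";   // ZooKeeper used by Kafka
        String topic = "span-data-topic";
        String zkRoot = "/kafka-storm";       // root node under which offsets are stored

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,
                KAFKA_SPOUT_ID);
        // Decode every Kafka message as a single string field.
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        // ZooKeeper host and port where the spout commits its offsets.
        spoutConf.zkServers = Arrays.asList("132.122.252.51");
        spoutConf.zkPort = 2181;

        JsonBolt jsonBolt = new JsonBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConf));
        builder.setBolt(JsonProject_BOLT_ID, jsonBolt).shuffleGrouping(
                KAFKA_SPOUT_ID);

        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length == 0) {
            // No arguments: run in an in-process local cluster for testing.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config,
                    builder.createTopology());
            // Let the topology run for 100 seconds (Utils.sleep takes milliseconds).
            Utils.sleep(100 * 1000L);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            // First argument is the topology name to use on the real cluster.
            StormSubmitter.submitTopology(args[0], config,
                    builder.createTopology());
        }
    }
}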
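
shuffleGrouping spreads the spout's tuples randomly across the tasks of the bolt while keeping the load even, which only becomes visible once the bolt runs with more than one task. The sketch below models that idea as a shuffled round-robin in plain Java. It is an illustration of the concept, not Storm's actual implementation, and the class ShuffleGroupingSketch with its chooseTask method is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ShuffleGroupingSketch {
    private final List<Integer> tasks = new ArrayList<>();
    private final Random random;
    private int next = 0;

    ShuffleGroupingSketch(int taskCount, long seed) {
        for (int i = 0; i < taskCount; i++) {
            tasks.add(i);
        }
        random = new Random(seed);
        Collections.shuffle(tasks, random);
    }

    // Returns the index of the task that receives the next tuple.
    int chooseTask() {
        if (next == tasks.size()) {
            // Every task has been used once: reshuffle and start a new round.
            Collections.shuffle(tasks, random);
            next = 0;
        }
        return tasks.get(next++);
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(3, 42L);
        int[] received = new int[3];
        for (int i = 0; i < 9; i++) {
            received[grouping.chooseTask()]++;
        }
        // prints [3, 3, 3]: the order is random, the load is even
        System.out.println(Arrays.toString(received));
    }
}
```

For JsonBolt this kind of grouping is a reasonable fit because each record is parsed independently, so it does not matter which task receives it.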
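  • For a local test, run the class without any program arguments. When submitting to a cluster, pass the topology name as the argument.
  • Also note that KafkaSpout by default resumes from the position where the previous run stopped rather than consuming the topic from the beginning again, because by default it commits the Kafka offset to ZooKeeper every 2 seconds. Consuming from the beginning on every run can be enabled through configuration.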
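
The resume behaviour described in the last point can be modelled in a few lines of plain Java. This is a simplified sketch, not KafkaSpout code: the static field committedOffset stands in for the offset stored in ZooKeeper, the list stands in for a Kafka partition, and the names OffsetResumeSketch, run and ignoreStoredOffset are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetResumeSketch {
    // Stand-in for the offset stored in ZooKeeper; -1 means nothing stored yet.
    static long committedOffset = -1;

    // Consumes `log` for one run and returns the records read in that run.
    // ignoreStoredOffset = true models "always start from the beginning".
    static List<String> run(List<String> log, boolean ignoreStoredOffset) {
        long start = ignoreStoredOffset ? 0 : committedOffset + 1;
        List<String> consumed = new ArrayList<>();
        for (long offset = start; offset < log.size(); offset++) {
            consumed.add(log.get((int) offset));
            // Simplification: the real spout commits periodically, not per record.
            committedOffset = offset;
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println(run(log, false)); // first run reads everything: [a, b, c]
        log.add("d");
        System.out.println(run(log, false)); // second run resumes after the stored offset: [d]
        System.out.println(run(log, true));  // ignoring the stored offset replays: [a, b, c, d]
    }
}
```

In storm-kafka 1.0.x the corresponding switches should be the public fields ignoreZkOffsets and startOffsetTime on the spout configuration; verify this against the version actually deployed. Because the commit is periodic, a restart may also re-read the records consumed since the last commit, so downstream processing should tolerate duplicates.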