This article is based on the official Apache Flink v1.3 documentation and uses the Scala version of the API. For the basic setup, see "Learning Apache Flink (BASIC)".
Business Scenario
Flink consumes data from Kafka, applies an initial filter to produce a result set, "tags" each record, filters on the tag, and finally writes the result back to Kafka. For example, each record in topic foo has the form "imsi,lac,cell". First, all records whose imsi field starts with 460 are selected; then the lac and cell fields are used to decide whether the record falls inside a specified area, and a new field isSpecifiedLocation is added with the value true or false. The fields finally written to Kafka are "imsi,lac,cell,isSpecifiedLocation,timestamp", and only records with isSpecifiedLocation equal to true are emitted.
Reading Kafka Data with Flink
Note: the Kafka version used in this article is 0.10.0.
The Provided TableSources in the official documentation only offer JSON and Avro formats for Kafka. If the data in the topic is in CSV format, you can model a custom KafkaCsvTableSource and CsvRowDeserializationSchema after Kafka010JsonTableSource and JsonRowDeserializationSchema to parse the CSV data (see the Complete Code section for the implementation). A TableSource can then be registered as follows:
// Register a TableSource
val kafkaTableSource = new KafkaCsvTableSource(
  "foo",
  properties,
  new CsvRowDeserializationSchema(typeInfo),
  typeInfo)

tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")
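The typeInfo and properties used in this snippet are defined later in the complete code; for reference, they are set up as follows (the broker address and group id are just the example values from that code):

val typeInfo = Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)

val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
properties.setProperty("group.id", "test")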
Once you have a Table, you can filter the data with the Table API:
val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
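As a side note, the same filter could also be written as a SQL query against the registered table. This is only an alternative sketch; the rest of the article sticks with the Table API expression above.

// Alternative SQL formulation of the same filter (sketch, not used in the complete code)
val filterResultSql = tableEnv.sql(
  "SELECT imsi, lac, cell FROM KafkaCsvTable WHERE imsi LIKE '460%'")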
Adding Fields to the DataStream Dynamically
- Convert the Table to a DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)
- Add the new fields (a sketch of a more realistic location check follows after this list)
val newDsRows = dsRow.map(row => {
  val ret = new Row(row.getArity() + 2)
  for (i <- 0 to row.getArity() - 1) {
    ret.setField(i, row.getField(i))
  }
  val isSpecifiedLocation = if (ret.getField(1).equals(ret.getField(2))) true else false
  ret.setField(row.getArity(), isSpecifiedLocation)
  ret.setField(row.getArity() + 1, System.currentTimeMillis())
  ret
})
- Register the newly generated DataStream as a Table and apply the final filter
tableEnv.registerDataStream("newTable", newDsRows)
val newKafkaCsvTable = tableEnv.scan("newTable")
val newResult = newKafkaCsvTable
  .filter('isSpecifiedLocation === true)
  .select("imsi,lac,cell,isSpecifiedLocation,timestamp")
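The map above simply treats lac == cell as "inside the specified area", which is only a placeholder. A minimal sketch of a more realistic check, assuming the target area is given as a hypothetical whitelist of (lac, cell) pairs and that the same implicit RowTypeInfo is in scope, might look like this:

// Hypothetical whitelist describing the specified area (assumption, not part of the original code)
val specifiedCells: Set[(String, String)] = Set(("1", "1"), ("2", "3"))

// Same structure as the map above; only the isSpecifiedLocation computation changes.
// Assumes the implicit RowTypeInfo for the 5-field output row is already in scope.
val taggedRows = dsRow.map(row => {
  val ret = new Row(row.getArity() + 2)
  for (i <- 0 to row.getArity() - 1) {
    ret.setField(i, row.getField(i))
  }
  // Fields of the input row: 0 = imsi, 1 = lac, 2 = cell
  val lac = ret.getField(1).toString
  val cell = ret.getField(2).toString
  ret.setField(row.getArity(), specifiedCells.contains((lac, cell)))
  ret.setField(row.getArity() + 1, System.currentTimeMillis())
  ret
})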
Writing Data to Kafka with Flink
This article uses Flink's Kafka09JsonTableSink class to write the result directly in JSON format:
val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
newResult.writeToSink(sink)
Test Case
Run the job:
./bin/flink run -c com.woople.streaming.scala.examples.kafka.FlinkKafkaDemo /opt/flink-tutorials-1.0-bundle.jar
Then write the record

4601234,1,1

to topic foo; topic bar then receives

{"imsi":"4601234","lac":"1","cell":"1","isSpecifiedLocation":true,"timestamp":1511222771896}

If the input is 4601234,2,1 instead, the record does not satisfy the condition and nothing is output.
Troubleshooting
During debugging, the following error came up:
org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
A search turned up FLINK-6500; following the approach described there, adding the line below to the code resolved the problem:
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)
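In this demo the line sits together with the type and field-name arrays that describe the five-column output row, right before the map that adds the new fields (excerpted from the complete code below):

val types = Array[TypeInformation[_]](
  Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG)
val names = Array("imsi", "lac", "cell", "isSpecifiedLocation", "timestamp")

// The implicit RowTypeInfo is picked up by dsRow.map(...), so the resulting
// DataStream[Row] is no longer typed as GenericTypeInfo<Row> and can be registered as a Table
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)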
Complete Code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.woople</groupId>
    <artifactId>flink-tutorials</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <defaultGoal>package</defaultGoal>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>eclipse-add-source</id>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>attach-scaladocs</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>doc-jar</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                    <recompileMode>incremental</recompileMode>
                    <useZincServer>true</useZincServer>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <shadedArtifactAttached>false</shadedArtifactAttached>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>${project.artifactId}-${project.version}-bundle</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
KafkaCsvTableSource.java
package com.woople.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Properties;

public class KafkaCsvTableSource implements StreamTableSource<Row> {

    /** The Kafka topic to consume. */
    private final String topic;

    /** Properties for the Kafka consumer. */
    private final Properties properties;

    /** Deserialization schema to use for Kafka records. */
    private final DeserializationSchema<Row> deserializationSchema;

    /** Type information describing the result type. */
    private final TypeInformation<Row> typeInfo;

    /**
     * Creates a generic Kafka {@link StreamTableSource}.
     *
     * @param topic                 Kafka topic to consume.
     * @param properties            Properties for the Kafka consumer.
     * @param deserializationSchema Deserialization schema to use for Kafka records.
     * @param typeInfo              Type information describing the result type.
     */
    public KafkaCsvTableSource(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            TypeInformation<Row> typeInfo) {
        this.topic = Preconditions.checkNotNull(topic, "Topic");
        this.properties = Preconditions.checkNotNull(properties, "Properties");
        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
        this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
    }

    /**
     * NOTE: This method is for internal use only for defining a TableSource.
     * Do not use it in Table API programs.
     */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // Version-specific Kafka consumer
        FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
        return env.addSource(kafkaConsumer);
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return typeInfo;
    }

    /**
     * Returns the version-specific Kafka consumer.
     *
     * @param topic                 Kafka topic to consume.
     * @param properties            Properties for the Kafka consumer.
     * @param deserializationSchema Deserialization schema to use for Kafka records.
     * @return The version-specific Kafka consumer
     */
    private FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
        return new FlinkKafkaConsumer010<Row>(topic, deserializationSchema, properties);
    }

    /**
     * Returns the deserialization schema.
     *
     * @return The deserialization schema
     */
    protected DeserializationSchema<Row> getDeserializationSchema() {
        return deserializationSchema;
    }

    @Override
    public String explainSource() {
        return "";
    }
}
CsvRowDeserializationSchema.java
package com.woople.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {

    /** Type information describing the result type. */
    private final TypeInformation<Row> typeInfo;

    /** Field names to parse. Indices match fieldTypes indices. */
    private final String[] fieldNames;

    /** Types to parse fields as. Indices match fieldNames indices. */
    private final TypeInformation<?>[] fieldTypes;

    /** Flag indicating whether to fail on a missing field. */
    private boolean failOnMissingField;

    /**
     * Creates a CSV deserialization schema for the given fields and types.
     *
     * @param typeInfo Type information describing the result type. The field names and
     *                 types are used to parse the CSV records.
     */
    public CsvRowDeserializationSchema(TypeInformation<Row> typeInfo) {
        Preconditions.checkNotNull(typeInfo, "Type information");
        this.typeInfo = typeInfo;

        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            String messages = new String(message);
            String[] messagesArray = messages.split(",");

            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                row.setField(i, messagesArray[i]);
            }

            return row;
        } catch (Throwable t) {
            throw new IOException("Failed to deserialize CSV record.", t);
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo;
    }

    /**
     * Configures the failure behaviour if a field is missing.
     *
     * <p>By default, a missing field is ignored and the field is set to null.
     *
     * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
     */
    public void setFailOnMissingField(boolean failOnMissingField) {
        this.failOnMissingField = failOnMissingField;
    }
}
FlinkKafkaDemo.scala
package com.woople.streaming.scala.examples.kafka

import java.util.Properties

import com.woople.flink.streaming.connectors.kafka.{CsvRowDeserializationSchema, KafkaCsvTableSource}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlinkKafkaDemo {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val typeInfo = Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
    properties.setProperty("group.id", "test")

    // Register a TableSource
    val kafkaTableSource = new KafkaCsvTableSource(
      "foo",
      properties,
      new CsvRowDeserializationSchema(typeInfo),
      typeInfo)

    tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
    val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")

    val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
    val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)

    {
      val types = Array[TypeInformation[_]](
        Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG)
      val names = Array("imsi", "lac", "cell", "isSpecifiedLocation", "timestamp")

      implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)

      val newDsRows = dsRow.map(row => {
        val ret = new Row(row.getArity() + 2)
        for (i <- 0 to row.getArity() - 1) {
          ret.setField(i, row.getField(i))
        }

        val isSpecifiedLocation = if (ret.getField(1).equals(ret.getField(2))) true else false
        ret.setField(row.getArity(), isSpecifiedLocation)
        ret.setField(row.getArity() + 1, System.currentTimeMillis())
        ret
      })

      tableEnv.registerDataStream("newTable", newDsRows)
      val newKafkaCsvTable = tableEnv.scan("newTable")
      val newResult = newKafkaCsvTable
        .filter('isSpecifiedLocation === true)
        .select("imsi,lac,cell,isSpecifiedLocation,timestamp")

      val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
      newResult.writeToSink(sink)

      env.execute("Flink kafka demo")
    }
  }
}
Summary
This article is only a simple example; the code does not take performance or other production concerns into account. Follow-up articles will explore these topics in more depth.