The Pulsar Flink connector provides elastic data processing with Apache Pulsar and Apache Flink.
- Java 8 or later
- Flink 1.9.0 or later
- Pulsar 2.4.0 or later
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
groupId = io.streamnative.connectors
artifactId = pulsar-flink-connector_{{SCALA_BINARY_VERSION}}
version = {{PULSAR_FLINK_VERSION}}
Currently, the artifact is available in the Bintray Maven repository of StreamNative.
For a Maven project, you can add the repository to your pom.xml as follows:
<repositories>
  <repository>
    <id>central</id>
    <layout>default</layout>
    <url>https://repo1.maven.org/maven2</url>
  </repository>
  <repository>
    <id>bintray-streamnative-maven</id>
    <name>bintray</name>
    <url>https://dl.bintray.com/streamnative/maven</url>
  </repository>
</repositories>
To build an application JAR that contains all dependencies required by your libraries and the Pulsar Flink connector, you can use the following shade plugin definition template:
<plugin>
  <!-- Shade all the dependencies to avoid conflicts -->
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
        <minimizeJar>false</minimizeJar>
        <artifactSet>
          <includes>
            <include>io.streamnative.connectors:*</include>
            <!-- more libs to include here -->
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
          <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
As with any Flink application, ./bin/flink run is used to compile and launch your application.
If you have already built a fat jar using the shade Maven plugin above, you can add your jar to flink run using --classpath.
The path must include a protocol (for example, file://) and must be accessible on all nodes.
Example
$ ./bin/flink run \
  -c com.example.entry.point.ClassName file://path/to/jars/your_fat_jar.jar \
  ...
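Since a path without a protocol is easy to pass by mistake, a small pre-flight check can catch it before submission. The following is a minimal sketch using only the JDK; `ClasspathEntryCheck` and `hasProtocol` are hypothetical names for illustration and are not part of Flink or the connector.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for illustration; not part of Flink or the connector. */
final class ClasspathEntryCheck {
    private ClasspathEntryCheck() {}

    /** Returns true when the entry carries an explicit protocol such as file://. */
    static boolean hasProtocol(String entry) {
        try {
            // getScheme() is null for plain paths such as /opt/jars/app.jar
            return new URI(entry).getScheme() != null;
        } catch (URISyntaxException e) {
            // The entry cannot be parsed as a URI at all
            return false;
        }
    }
}
```

For example, `ClasspathEntryCheck.hasProtocol("/opt/jars/your_fat_jar.jar")` returns false, which signals that the `file://` prefix is missing. The check does not verify that the path is reachable from every node.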
To experiment in the interactive Scala shell (bin/start-scala-shell.sh), you can use --addclasspath to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.
Example
$ ./bin/start-scala-shell.sh remote <hostname> <portnumber> \
  --addclasspath pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
For more information about submitting applications with CLI, see Command-Line Interface.
To try the SQL Client (Beta) and write SQL queries that manipulate data in Pulsar, you can use --jar to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.
Example
$ ./bin/sql-client.sh embedded --jar pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar
By default, the SQL Client reads its configuration from the environment file ./conf/sql-client-defaults.yaml. To use the Pulsar catalog in the SQL Client and have it registered automatically at startup, add the Pulsar catalog to the catalogs section of this YAML file:
catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service.url: "pulsar://localhost:6650"
    admin.url: "http://localhost:8080"
The following examples are in Scala.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// FlinkPulsarSource is ResultTypeQueryable, so addSource does not need explicit type information
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()
The following examples are in Scala.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
tEnv
.connect(new Pulsar().properties(props))
.inAppendMode()
.registerTableSource("pulsar-test-table")
The following options must be set for the Pulsar source.
Option | Value | Description |
---|---|---|
`topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
`admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
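The rules in the table above (both URLs required, exactly one topic option) can be checked before the job is built. The following is a minimal sketch using only the JDK; `SourceOptionCheck` is a hypothetical helper for illustration, and the connector performs its own validation, which may differ in detail.

```java
import java.util.Properties;

/** Hypothetical pre-flight check for illustration; not the connector's own validation. */
final class SourceOptionCheck {
    private static final String[] REQUIRED_KEYS = {"service.url", "admin.url"};
    private static final String[] TOPIC_KEYS = {"topic", "topics", "topicsPattern"};

    private SourceOptionCheck() {}

    /** Throws IllegalArgumentException if required options are missing or conflicting. */
    static void validate(Properties props) {
        for (String key : REQUIRED_KEYS) {
            if (isBlank(props.getProperty(key))) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        // Count how many of the mutually exclusive topic options are present
        int topicOptions = 0;
        for (String key : TOPIC_KEYS) {
            if (!isBlank(props.getProperty(key))) {
                topicOptions++;
            }
        }
        if (topicOptions != 1) {
            throw new IllegalArgumentException(
                "Exactly one of topic, topics, topicsPattern must be set, found " + topicOptions);
        }
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

Calling `SourceOptionCheck.validate(props)` right before constructing the source surfaces a misconfiguration at submission time rather than at runtime.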
The following configurations are optional.
Option | Value | Default | Description |
---|---|---|---|
`startingOffsets` | A starting position, for example "earliest" or "latest" | "latest" | |
`partitionDiscoveryIntervalMillis` | A long value or a string which can be converted to long | -1 | |
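Because `partitionDiscoveryIntervalMillis` accepts either a long or a string convertible to long, reading it from `Properties` involves a parse step with a fallback to the documented default of -1. The following is a minimal sketch; `DiscoveryIntervalOption` is a hypothetical helper for illustration and does not reflect how the connector reads the option internally.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not the connector's own option parsing. */
final class DiscoveryIntervalOption {
    static final String KEY = "partitionDiscoveryIntervalMillis";
    static final long DEFAULT_MILLIS = -1L; // default listed in the options table

    private DiscoveryIntervalOption() {}

    /** Returns the configured interval, or the default when the option is absent. */
    static long read(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null) {
            return DEFAULT_MILLIS;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                KEY + " must be convertible to long, got: " + raw, e);
        }
    }
}
```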
- For topics without schema or with primitive schema in Pulsar, the message payload is loaded into a value column whose type corresponds to the Pulsar schema.
- For topics with Avro or JSON schema, their field names and field types are kept in the result rows.
In addition, each row in the source has the following metadata fields.
Column | Type |
---|---|
`__key` | Bytes |
`__topic` | String |
`__messageId` | Bytes |
`__publishTime` | Timestamp |
`__eventTime` | Timestamp |
Example
The Pulsar topic of AVRO schema s (example 1) converted to a Flink table has the following schema (example 2).
Example 1
case class Foo(@BeanProperty i: Int, @BeanProperty f: Float, @BeanProperty bar: Bar)
case class Bar(@BeanProperty b: Boolean, @BeanProperty s: String)
val s = Schema.AVRO(classOf[Foo])
Example 2
root
|-- i: INT
|-- f: FLOAT
|-- bar: ROW<`b` BOOLEAN, `s` STRING>
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)
The following is the schema of a Pulsar topic with Schema.DOUBLE:
root
|-- value: DOUBLE
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)
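The two schema listings above follow one pattern: the payload columns come first (a single value column for schemaless or primitive topics, the schema's own fields otherwise), followed by the five metadata columns. The following is a minimal sketch of that column layout in plain Java; `SourceRowColumns` is a hypothetical helper for illustration, not connector code, and it models column names only, not types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of the result row layout; not connector code. */
final class SourceRowColumns {
    static final List<String> METADATA = Collections.unmodifiableList(Arrays.asList(
        "__key", "__topic", "__messageId", "__publishTime", "__eventTime"));

    private SourceRowColumns() {}

    /**
     * Builds the column names of a result row.
     * Pass an empty list for topics without schema or with primitive schema.
     */
    static List<String> columnsFor(List<String> schemaFields) {
        List<String> columns = new ArrayList<>();
        if (schemaFields.isEmpty()) {
            columns.add("value"); // payload goes into a single value column
        } else {
            columns.addAll(schemaFields); // Avro/JSON field names are kept
        }
        columns.addAll(METADATA);
        return columns;
    }
}
```

With the fields of `Foo` from example 1, `columnsFor(Arrays.asList("i", "f", "bar"))` yields the same column order as example 2.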
The DataStream written to Pulsar can have an arbitrary type.
For DataStream[Row], the __topic field identifies the topic the message is sent to, __key is encoded as metadata of the Pulsar message, and all other fields are grouped, encoded using AVRO, and put in value():
producer.newMessage().key(__key).value(avro_encoded_fields)
For DataStream[T] where T is a POJO type, each record in the data stream is encoded using AVRO and put in the value() of the Pulsar message. Optionally, you can provide an extra topicKeyExtractor that identifies the topic and key for each record.
The following examples are in Scala.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()
The following examples are in Scala.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")
tEnv
.connect(new Pulsar().properties(prop))
.inAppendMode()
.registerTableSink("sink-table")
val sql = "INSERT INTO `sink-table` ....."
tEnv.sqlUpdate(sql)
env.execute()
The following options must be set for a Pulsar sink.
Option | Value | Description |
---|---|---|
`service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
`admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
The following configurations are optional.
Option | Value | Default | Description |
---|---|---|---|
`topic` | A topic name string | None | The topic to write to. If this option is not set, DataStreams or tables written to Pulsar must contain a TopicKeyExtractor that returns non-null topics or a `__topic` field. |
`flushOnCheckpoint` | Whether to flush all records written before a checkpoint and wait for confirmations. | true | At-least-once semantics are achieved when `flushOnCheckpoint` is set to `true`. |
`failOnWrite` | Whether to fail the sink when sending records to Pulsar fails. | false | None |
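The `topic` row above implies a fallback: when no topic is configured, each record must carry its own topic, through a TopicKeyExtractor or a `__topic` field. The following is a minimal sketch of that decision in plain Java. `SinkTopicResolver` is a hypothetical name for illustration, and the precedence (a configured `topic` wins over a per-record topic) is an assumption that the text above does not state.

```java
/** Hypothetical illustration of topic selection; not the connector's implementation. */
final class SinkTopicResolver {
    private SinkTopicResolver() {}

    /**
     * @param configuredTopic value of the topic option, or null when it is not set
     * @param recordTopic topic taken from an extractor or a __topic field, or null
     * @return the topic the record would be written to
     */
    static String resolve(String configuredTopic, String recordTopic) {
        if (configuredTopic != null) {
            return configuredTopic; // assumption: the configured topic takes precedence
        }
        if (recordTopic != null) {
            return recordTopic;
        }
        throw new IllegalStateException(
            "No topic: set the topic option or provide a non-null topic per record");
    }
}
```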
Currently, the sink provides at-least-once semantics when flushOnCheckpoint is set to true. Consequently, when writing streams to Pulsar, some records may be duplicated.
Exactly-once sink semantics will be provided once Pulsar supports transactions.
Client, producer, and consumer configurations of Pulsar can be set in properties with the pulsar.client., pulsar.producer., and pulsar.consumer. prefixes.
Example
prop.setProperty("pulsar.consumer.ackTimeoutMillis", "10000")
For possible Pulsar parameters, see Pulsar client libraries.
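To make the prefix convention concrete, the following sketch separates prefixed entries from a `Properties` object and strips the prefix, so that `pulsar.consumer.ackTimeoutMillis` becomes `ackTimeoutMillis`. `PrefixedConfig` is a hypothetical helper for illustration; how the connector itself routes these settings is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical illustration of the prefix convention; not connector code. */
final class PrefixedConfig {
    private PrefixedConfig() {}

    /** Returns all entries whose key starts with the prefix, with the prefix removed. */
    static Map<String, String> withPrefix(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            // Skip keys that equal the bare prefix and would leave an empty name
            if (name.startsWith(prefix) && name.length() > prefix.length()) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }
}
```

With the property from the example above, `PrefixedConfig.withPrefix(prop, "pulsar.consumer.")` contains the single entry `ackTimeoutMillis` mapped to `10000`, while unprefixed options such as `service.url` are left out.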
Flink always searches for tables, views, and UDFs in the current catalog and database. To use the Pulsar catalog and treat topics in Pulsar as tables in Flink, use the pulsarcatalog that has been defined in ./conf/sql-client-defaults.yaml.
tableEnv.useCatalog("pulsarcatalog")
tableEnv.useDatabase("public/default")
tableEnv.scan("topic0")
Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> select * from topic0;
The following configurations are optional in the environment file and can be overridden in a SQL Client session using the SET command.
Option | Value | Default | Description |
---|---|---|---|
`default-database` | The default database name. | public/default | A topic in Pulsar is treated as a table in Flink when using the Pulsar catalog; therefore, `database` is another name for `tenant/namespace`. The database is the base path for table lookup or creation. |
`startingOffsets` | A starting position, for example "earliest" or "latest" | "latest" | The `startingOffsets` option controls where a table reads data from. |
`table.partitions` | The default number of partitions when a table is created in Table API. | 5 | A table in the Pulsar catalog is a topic in Pulsar. When a table is created in the Pulsar catalog, `table.partitions` controls the number of partitions of the created topic. |
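Since a database is another name for `tenant/namespace` and a table is a topic, a catalog path can be translated into a Pulsar topic name. The following is a minimal sketch; `CatalogTopicName` is a hypothetical helper for illustration, and the `persistent://` domain is an assumption (Pulsar also has non-persistent topics, and the connector's actual mapping is not described here).

```java
/** Hypothetical illustration of the database/table to topic mapping; not connector code. */
final class CatalogTopicName {
    private CatalogTopicName() {}

    /**
     * @param database a database name in tenant/namespace form, such as public/default
     * @param table a table name, which corresponds to a topic
     */
    static String of(String database, String table) {
        String[] parts = database.split("/");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Database must have the form tenant/namespace, got: " + database);
        }
        // Assumption: the topic lives in the persistent domain
        return "persistent://" + parts[0] + "/" + parts[1] + "/" + table;
    }
}
```

For the session shown earlier, `CatalogTopicName.of("public/default", "topic0")` gives `persistent://public/default/topic0`.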
If you want to build a Pulsar Flink connector reading data from Pulsar and writing results to Pulsar, follow the steps below.
- Check out the source code.
  $ git clone https://github.com/streamnative/pulsar-flink.git
  $ cd pulsar-flink
- Install Docker.
  The Pulsar Flink connector uses Testcontainers for integration tests. To run the integration tests, make sure you have installed Docker.
- Set a Scala version.
  Change scala.version and scala.binary.version in pom.xml. The Scala version should be consistent with the Scala version of the Flink you use.
- Build the project.
  $ mvn clean install -DskipTests
- Run the tests.
  $ mvn clean install
Once the installation is finished, a fat jar is generated under both the local Maven repository and the target directory.