Key-Value - Input
In this tutorial we use key-value records. A key-value record pairs a key with its corresponding value. This example focuses on taking inputs from a key-value topic and writing them to a non-key-value topic.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install one, please follow the instructions here.
Dataflow
Overview
In this example, we will write a dataflow that takes inputs from a key-value topic and transforms them with a filter-map to select only the students in a certain class.
Define the types
The values in this example are objects rather than primitive types. We define the following objects.
types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32
We have two object types here: student-info, which describes the values in the source topic, and student-in-class, which describes the values in the sink topic.
Topic List
The following is our list of topics.
topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class
Source
The student-list topic is the source. It maps a primitive string key to a student-info value. In our example, the key is the name of the student.
Sink
The profx-students topic is the sink. It holds values of the student-in-class type defined above.
Transform
We apply a filter-map transform that converts each key-value entry (an optional String key and a StudentInfo value) into a StudentInClass and drops students whose teacher entry is not profX.
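transforms:
  - operator: filter-map
    run: |
      fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
        // Drop students who are not taught by profX.
        if info.teacher != "profX" {
          return Ok(None);
        }
        // Use the record key as the name; fall back to an empty string if the key is missing.
        let ret = StudentInClass {
          name: name.unwrap_or("".to_string()),
          grade: info.grade
        };
        Ok(Some(ret))
      }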
In this transform function, the input parameters are the key, name, and the value, info. The key arrives as an Option<String>, so the function falls back to an empty name when a record has no key.
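The same filter-map logic can be exercised outside SDF. The following is a minimal sketch, assuming plain Rust structs that mirror the student-info and student-in-class types. In a real dataflow SDF provides these types and its own Result, so the struct definitions and the String error type used here are stand-ins for illustration only.

```rust
// Stand-in for the type SDF derives from `student-info` (assumption: a plain struct).
#[derive(Debug, Clone, PartialEq)]
pub struct StudentInfo {
    pub age: u32,
    pub teacher: String,
    pub grade: f32,
}

// Stand-in for the type SDF derives from `student-in-class` (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct StudentInClass {
    pub name: String,
    pub grade: f32,
}

// Same logic as the transform above. The error type is a plain String here,
// which is an assumption made so the sketch compiles without SDF.
pub fn student_filter(
    name: Option<String>,
    info: StudentInfo,
) -> Result<Option<StudentInClass>, String> {
    // Returning Ok(None) drops the record from the output.
    if info.teacher != "profX" {
        return Ok(None);
    }
    // Returning Ok(Some(..)) emits the mapped record; a missing key becomes "".
    Ok(Some(StudentInClass {
        name: name.unwrap_or_default(),
        grade: info.grade,
    }))
}
```

Calling student_filter with a key of Some("Jerry".to_string()) and a profX record yields a StudentInClass named Jerry, while any record with a different teacher yields None and is filtered out.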
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
apiVersion: 0.5.0
meta:
  name: key-value-type
  version: 0.1.0
  namespace: example
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End
types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32
topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class
services:
  filter-profx:
    sources:
      - type: topic
        id: student-list
    transforms:
      - operator: filter-map
        run: |
          fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
            // Drop students who are not taught by profX.
            if info.teacher != "profX" {
              return Ok(None);
            }
            // Use the record key as the name; fall back to an empty string if the key is missing.
            let ret = StudentInClass {
              name: name.unwrap_or("".to_string()),
              grade: info.grade
            };
            Ok(Some(ret))
          }
    sinks:
      - type: topic
        id: profx-students
Running SDF
To run the example:
$ sdf run
Produce data
We will produce some data by first writing it into a file named student.txt.
Jerry>{"age":13,"teacher":"profX","grade":90.1}
Tom>{"age":14,"teacher":"profY","grade":99.2}
Jane>{"age":-1,"teacher":"profZ","grade":99.11}
Terry>{"age":13,"teacher":"profX","grade":50}
We can produce the data with:
$ fluvio produce student-list --key-separator ">" -f student.txt
We should see the following output if we consume with the -k flag, which prints each record's key alongside its value:
$ fluvio consume student-list -Bdk
[Jerry] {"age":13,"teacher":"profX","grade":90.1}
[Tom] {"age":14,"teacher":"profY","grade":99.2}
[Jane] {"age":-1,"teacher":"profZ","grade":99.11}
[Terry] {"age":13,"teacher":"profX","grade":50}
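The --key-separator option tells the producer where the key ends and the value begins on each line of student.txt. As a rough illustration of that split, here is a small sketch. It is not Fluvio's actual implementation: split_record and split_records are hypothetical helpers, and the sketch assumes the split happens at the first occurrence of the separator.

```rust
/// Hypothetical helper: splits one input line into (key, value) at the
/// first occurrence of `separator`. Returns None if the separator is absent.
pub fn split_record<'a>(line: &'a str, separator: &str) -> Option<(&'a str, &'a str)> {
    line.split_once(separator)
}

/// Hypothetical helper: applies `split_record` to every line of a file's
/// contents, skipping lines that do not contain the separator.
pub fn split_records<'a>(contents: &'a str, separator: &str) -> Vec<(&'a str, &'a str)> {
    contents
        .lines()
        .filter_map(|line| split_record(line, separator))
        .collect()
}
```

With the separator ">", the first line of student.txt splits into the key Jerry and the JSON object that follows it, which matches the [Jerry] prefix shown in the consumer output above.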
Consume data
To consume the data from the sink topic:
$ fluvio consume profx-students -Bd
{"grade":90.1,"name":"Jerry"}
{"grade":50.0,"name":"Terry"}
Only students whose teacher is profX are sent to the sink.
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
This how-to focused on using key-values as inputs. The following page contains another example of key-value inputs.