1 year ago

#388289


Antonio Miranda

Spring Cloud Data Flow custom stream: automatic Kafka binding

I'm currently building my own streams in Spring Cloud Data Flow, developing my own processors and sinks, but I'm a bit stuck with the internal Kafka bindings. Here is an example:

I created this stream:

[image of the stream definition not preserved]

The code of the custom processor looks like this:

@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    private CoordDispatcher dispatcher = new CoordDispatcher();

    @Bean
    public Function<String, List<Message>> split() {
        System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
        System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
        System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
        System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
        return string -> {

....

The code and the stream work properly, but I'm not happy with these System.setProperty calls (they are equivalent to writing the same keys in a properties file). They are necessary to bind the Kafka topics.
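
For reference, the two function-binding keys above follow the Spring Cloud Stream naming convention for functional bindings, <functionName>-in-<index> and <functionName>-out-<index>. The sketch below only builds those alias keys as a plain map, using nothing but the JDK; the class name BindingAliases and the helper aliasesFor are hypothetical, and the spring.cloud.function.definition entry is an assumption added for completeness. It assumes that Data Flow supplies the destinations for bindings named input and output at deployment time, which should be verified for the versions in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: derives the binding alias properties for a single
// java.util.function.Function bean with one input and one output.
public class BindingAliases {

    static Map<String, Object> aliasesFor(String functionName) {
        Map<String, Object> props = new LinkedHashMap<>();
        // Tells Spring Cloud Function which bean to bind (assumption: one function per app).
        props.put("spring.cloud.function.definition", functionName);
        // Renames the generated bindings (e.g. split-in-0) to "input" and "output".
        props.put("spring.cloud.stream.function.bindings." + functionName + "-in-0", "input");
        props.put("spring.cloud.stream.function.bindings." + functionName + "-out-0", "output");
        return props;
    }

    public static void main(String[] args) {
        // Prints the keys in the form they would take in application.properties.
        aliasesFor("split").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same three key/value pairs could instead live in application.properties, or the map could be handed to SpringApplication.setDefaultProperties(Map<String, Object>) before the application starts, so that nothing is set from inside the bean method. Whether the two destination properties can then be dropped depends on the deployment supplying them.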

I remember that the older version used annotations (such as @Input and @Output) to bind the topics directly.

Any idea how to do this in the newest version?

Thanks a lot.

Versions:

- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3

Edit: here is some information about the older version.

The code was something like this:

@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    @Splitter(inputChannel = Processor.INPUT, 
              outputChannel = Processor.OUTPUT)
    public List<Message> split(String in) throws ParseException, IOException {

        String uuid=".....";
        Long date = new Date().getTime();
        ...........


- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2

In this version, I did not need any property to bind the Kafka topics. I think the binding was done by the annotations @EnableBinding and @Splitter, but these annotations do not exist in the new Spring Boot version.

java

streaming

spring-cloud-stream

spring-cloud-dataflow

0 Answers
