Hi Singlestore,
We are trying to load data from Kafka and transform it using a Java application. We run the Java JAR file from a bash script.
Pipeline
CREATE PIPELINE test_pipeline_transform
AS LOAD DATA KAFKA '<kafka-hostname:port>/<topic-name>'
WITH TRANSFORM('https://xxxx/xxxx/maven-app-transform.tar.gz','executable.sh','')
INTO TABLE test_table;
Bash script passed to the transform (maven-app-transform.tar.gz/executable.sh)
#!/bin/bash
# Resolve the JAR relative to this script's directory; quote the path so spaces do not split it.
java -jar "$(dirname -- "$0")/my-app-1.0-SNAPSHOT.jar" "$@"
Java file
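import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class App
{
    public static void main( String[] args ) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(System.out));
        String input = readInput();
        // substring(1) skips the first character of the input (our workaround for the extra character).
        List<String> records = new ArrayList<>(Arrays.asList(input.substring(1).split("\n")));
        for (String row : records) {
            List<String> cellList = new ArrayList<>(Arrays.asList(row.split(",")));
            String output = "";
            // Emit one output line per cell: <cell>,test-<cell>-123
            for (String cell : cellList) {
                output += cell + ",test-" + cell + "-123\n";
            }
            bufferedWriter.write(output);
        }
        bufferedWriter.flush();
    }

    // Reads all of standard input and returns it as one string, each line terminated by "\n".
    public static String readInput() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        StringBuilder output = new StringBuilder();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            output.append(line);
            output.append("\n");
        }
        bufferedReader.close();
        return output.toString();
    }
}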
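For comparison, the same per-row logic could also be written to process each line as it is read instead of buffering the whole input first. This is only a sketch: the class name StreamingApp and the helper transformRow are hypothetical, and UTF-8 is an assumed encoding. It does not include the substring(1) workaround, so on its own it is not expected to remove the extra character.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

public class StreamingApp {
    // Expands one input row into one output line per comma-separated cell.
    static String transformRow(String row) {
        StringBuilder out = new StringBuilder();
        for (String cell : row.split(",")) {
            out.append(cell).append(",test-").append(cell).append("-123\n");
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        // UTF-8 is an assumption; try-with-resources flushes and closes the writer on exit.
        try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(System.in, StandardCharsets.UTF_8));
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(System.out, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(transformRow(line));
            }
        }
    }
}
```

Keeping the row logic in its own method makes it possible to check the transformation separately from whatever arrives on standard input.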
We published the following messages to the Kafka topic:
test 123
new line value
abc
We are able to transform the text, but the first message inserted into the table contains one extra character. If we skip the first character of the input in the Java code (the substring(1) call), the extra character does not appear.
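To find out what the extra character actually is, one option is to dump the first few raw bytes that the transform receives on standard input before any text decoding happens. The sketch below is a hypothetical diagnostic (the class name StdinHexDump and the 16-byte limit are arbitrary choices, not part of our application). It writes to standard error, and where that output ends up when run inside a pipeline is not something we have verified.

```java
import java.io.IOException;
import java.io.InputStream;

public class StdinHexDump {
    // Formats up to `limit` leading bytes as space-separated two-digit hex values.
    static String toHex(byte[] data, int limit) {
        StringBuilder sb = new StringBuilder();
        int n = Math.min(limit, data.length);
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", data[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        InputStream in = System.in;
        // readNBytes(int) is available from Java 11; it returns fewer bytes if the input ends early.
        byte[] head = in.readNBytes(16);
        System.err.println("first bytes: " + toHex(head, 16));
    }
}
```

If the first byte is not 74 (the letter t of the first message, test 123), the input carries something before the message text, which would explain why skipping one character hides the problem.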
Is the above approach correct or did I miss anything?
If there are any references for writing a pipeline transform in Java, please share them.
Configurations:
- SingleStore version - 8.0.4
- Setup using cluster in a box
- Red Hat OS
- Java version - 11
Thanks in advance