I want to add a TRANSFORM to my pipeline so that I can add new columns to the data coming from the source.
My pipeline code:
CREATE OR REPLACE AGGREGATOR PIPELINE test.test_2
AS LOAD DATA FS '/home/osboxes/Desktop/data1.csv'
WITH TRANSFORM('file://localhost/home/osboxes/Desktop/trans.py','','')
INTO TABLE test_tbl111
FIELDS TERMINATED BY ','
IGNORE 1 LINES;
My transform code:
#!/usr/bin/python
import struct
import sys
import pandas as pd
import csv

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer

def log(message):
    """
    Log an informational message to stderr which will show up in MemSQL in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")

def add_Data(file):
    with open(file) as csv_file:
        df = pd.read_csv(file)
        df["active_date"] = ' 25/08/2020'
        df_output = df.to_csv(df, index=False)
        binary_stdout.write(dfFinal)

log(b"Begin transform")
# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    add_Data(data)
log(b"End transform")
The pipeline gets created successfully without errors.
But when I run TEST PIPELINE, I get the following error:
ERROR 1953 ER_TRANSFORM_NONZERO_EXIT_VALUE: Leaf Error (127.0.0.1:3307): Transform for pipeline exited with nonzero exit code. Stderr: 2020-08-25 02:05:42.606 ERROR: Subprocess exited with failure result (2 : No such file or directory). Check that your transform has a proper shebang and is executable in this environment.
The WITH TRANSFORM clause does look syntactically right. Can you confirm that both /home/osboxes/Desktop/trans.py and /usr/bin/python can be read and executed by the memsql user on the host/container on which the leaf is running?
For testing purposes, you can model exactly what the pipeline will do by running EXTRACT PIPELINE ... INTO OUTFILE (see the SingleStore documentation), adopting the same Linux user as memsqld, and piping the resulting file into the executable named by the TRANSFORM clause.
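To make that manual check a bit more concrete, here is a rough Python sketch of it. It is only an illustration, not anything MemSQL ships: the helper names (shebang_interpreter, check_transform, run_transform) are made up for this post, and it assumes you run it as the same Linux user as memsqld so that the permission checks reflect what the leaf actually sees. It first checks that the transform and the interpreter named on its #! line are usable, then pipes a data file into the transform's stdin.

```python
import os
import subprocess
import sys


def shebang_interpreter(script_path):
    """Return the interpreter path from the script's #! line, or None if absent."""
    with open(script_path, "rb") as script:
        first_line = script.readline().strip()
    if not first_line.startswith(b"#!"):
        return None
    parts = first_line[2:].split()
    return parts[0].decode() if parts else None


def check_transform(script_path):
    """Return a list of problems found for the current user (empty means none)."""
    if not os.access(script_path, os.R_OK):
        return ["cannot read transform: " + script_path]
    problems = []
    if not os.access(script_path, os.X_OK):
        problems.append("transform is not executable: " + script_path)
    interpreter = shebang_interpreter(script_path)
    if interpreter is None:
        problems.append("transform has no #! line")
    elif not os.access(interpreter, os.X_OK):
        problems.append("interpreter missing or not executable: " + interpreter)
    return problems


def run_transform(script_path, input_path):
    """Pipe input_path into the transform; return (exit code, stdout, stderr)."""
    with open(input_path, "rb") as data:
        proc = subprocess.run([script_path], stdin=data,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return proc.returncode, proc.stdout, proc.stderr


# Hypothetical command line: <this script> <transform path> <extracted data file>
if __name__ == "__main__" and len(sys.argv) == 3:
    script, extracted = sys.argv[1], sys.argv[2]
    found = check_transform(script)
    for problem in found:
        print("PROBLEM:", problem)
    if not found:
        code, out, err = run_transform(script, extracted)
        print("exit code:", code)
        sys.stdout.write(out.decode(errors="replace"))
        sys.stderr.write(err.decode(errors="replace"))
```

If check_transform reports a missing interpreter here, that would line up with the "No such file or directory" message in the pipeline error. Note that this checks the copy of the script you point it at, which is not necessarily the copy the pipeline is using.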
Thank you for responding. Yes, the paths /home/osboxes/Desktop/trans.py and /home/osboxes/anaconda3/bin/python (I changed this path) can be read and executed by the memsql user on the host/container on which the leaf is running. I am using the VM version to learn MemSQL, so I have one master node, one child node, and one leaf node on the same host, on ports 3307, 3308, and 3309 respectively.
Waiting for your reply.
When you say ‘changed this path’, do you mean you changed the transform to start with #!/home/osboxes/anaconda3/bin/python? One thing that may be relevant: we copy/download the transform executable to a memsql-managed directory at create pipeline time and don’t attempt to “check for updates”. So if you change it, make sure to drop and recreate (or just create or replace) the pipeline before testing the change.
In the cases I’ve tried locally, I see that error is unique to the case when the file after #! is not found. If the transform itself were running but failing to find one of the files it’s written to open, we’d output the python stack trace. If the transform script were itself not found, you’d get an error at create pipeline time. So indeed it seems /usr/bin/python couldn’t be found by memsql in the VM and I suspect that if the anaconda3 version does exist, recreating the pipeline will do the trick.
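Once the interpreter is found, the transform in the question would likely hit the stack-trace case described above, since it calls an undefined input_stream(), writes an undefined dfFinal, and tries to open each piece of input as a file name. Below is a minimal sketch of a transform that adds the column instead. It rests on assumptions you should verify for your version: that for an FS pipeline the transform receives the raw file contents on stdin and must write CSV rows to stdout, and that IGNORE 1 LINES is applied to the transform's output (so the header row is passed through with the new column name). It uses the standard csv module rather than pandas so it has no third-party dependency; the function name add_active_date is hypothetical, and the interpreter on the #! line should be whichever one exists for the memsql user.

```python
#!/usr/bin/env python3
import csv
import io
import sys

ACTIVE_DATE = "25/08/2020"  # constant value taken from the script in the question


def add_active_date(csv_text, active_date=ACTIVE_DATE):
    """Append an active_date column to every row; the first row is treated as a header."""
    reader = csv.reader(io.StringIO(csv_text, newline=""))
    out = io.StringIO(newline="")
    writer = csv.writer(out, lineterminator="\n")
    for index, row in enumerate(reader):
        if not row:
            continue  # skip blank lines
        row.append("active_date" if index == 0 else active_date)
        writer.writerow(row)
    return out.getvalue()


def main():
    # Assumption: the whole input arrives on stdin and the result goes to stdout.
    raw = sys.stdin.buffer.read()
    result = add_active_date(raw.decode("utf-8"))
    sys.stdout.buffer.write(result.encode("utf-8"))


if __name__ == "__main__":
    main()
```

The target table would need a column for the new value, and after editing the script the pipeline has to be recreated (CREATE OR REPLACE) so the new copy is picked up.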