Hi,
I'm trying to create a pipeline that does the following (a rough sketch of the CREATE PIPELINE statement I use is included after the list):
- Read an HDFS folder containing multiple JSON files
- Continuously pick up new files as they arrive
- Run a Python transform script as part of the pipeline (it loads the JSON into pandas, applies some transformations, and keeps only the necessary columns)
- Load the result into a MemSQL columnstore table
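For reference, this is roughly how I create the pipeline (a minimal sketch; the pipeline name, HDFS path, transform URI and table name are placeholders rather than my real ones):

CREATE PIPELINE json_pipeline AS
  LOAD DATA HDFS 'hdfs://namenode:8020/path/to/json/folder'
  WITH TRANSFORM ('file://localhost/path/to/transform.py', '', '')
  INTO TABLE my_columnstore_table
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n';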
Currently the pipeline works fine without the transform. This is the Python transform script I made:
#!/usr/bin/python
import struct
import sys
import json
import pandas as pd
from pandas.io.json import json_normalize
binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer
def input_stream():
    """
    Consume STDIN and yield each record that is received from MemSQL
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return
def log(message):
    """
    Log an informational message to stderr which will show up in MemSQL in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")
def transformToCSV(record):
    # The record yielded by input_stream() is the raw JSON content itself
    # (not a file path), so parse it directly instead of open()-ing it.
    data = json.loads(record)
    dataNormal = json_normalize(data, sep='_')
    df = pd.DataFrame(dataNormal)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['date'] = df['timestamp'].dt.date
    df = df[["metric_id", "timestamp", "date", "service", "type",
             "scene", "name", "observation", "observation"]]
    # to_csv() with no target returns the CSV as a string; encode it to bytes
    # and drop the header so column names are not emitted as data rows.
    csvData = df.to_csv(index=False, header=False)
    binary_stdout.write(csvData.encode("utf-8"))
log(b"Begin transform")
# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
# Since this is an identity transform we just emit what we receive.
transformToCSV(data)
log(b"End transform")
But when I test the pipeline I get the following error:
ERROR: ImportedOS::ExecuteSubprocessAsync()
(The pipeline without the transform works just fine.)
What am I missing? I couldn't find any example of using a pipeline transform with HDFS data.
Thanks,