Hi,
I am currently working on Oracle CDC with GoldenGate, which sends CDC events to Kafka.
I have created a pipeline to load data from Kafka into a stored procedure.
Inserts and deletes are fine; however, updates are challenging.
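For context, the setup is roughly along these lines (this is a simplified sketch, not my exact DDL; broker, topic, and all object/column names are placeholders):

```sql
-- Rough sketch of the pipeline feeding the stored procedure.
-- The DEFAULT NULL clauses are there because "before"/"after"
-- are not present on every event type.
CREATE PIPELINE ogg_cdc_pipeline AS
LOAD DATA KAFKA 'kafka-broker:9092/ogg_cdc_topic'
INTO PROCEDURE apply_cdc_event
FORMAT JSON
(
  table_name  <- `table`,
  op_type     <- op_type,
  op_ts       <- op_ts,
  current_ts  <- current_ts,
  before_json <- before DEFAULT NULL,
  after_json  <- after  DEFAULT NULL
);
```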
Here is the list of fields in the Kafka JSON message; I am extracting these as input parameters to the stored procedure. “before” and “after” are JSON fields whose contents I plan to extract within the stored procedure (an example update message is shown after the list):
table - Name of the source table
op_type - Contains operation type: I = Insert, U = Update and D = Delete
op_ts - timestamp when the operation was executed in the source database (I assume)
current_ts - current timestamp
before - JSON type, populated with the primary key of the source table for delete events
after - JSON type; for inserts I see the entire row data (all columns), for updates only the columns that were changed as part of the operation are included, the others aren't part of the JSON string
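To make that concrete, an update event looks roughly like this (table name, column names and values are made up; here only EMAIL changed, and I am assuming the key column also shows up in “after” — the exact contents of “before” for updates are a separate question):

```json
{
  "table": "MYSCHEMA.CUSTOMERS",
  "op_type": "U",
  "op_ts": "2019-05-14 18:02:27.000000",
  "current_ts": "2019-05-14T18:02:33.254000",
  "before": {
    "CUSTOMER_ID": 1001
  },
  "after": {
    "CUSTOMER_ID": 1001,
    "EMAIL": "new.address@example.com"
  }
}
```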
How can I update the MemSQL table given that the set of fields in the “after” JSON is dynamic and contains only the fields that changed in the source table as part of the operation?
I initially thought of constructing the UPDATE query dynamically, but that turned out to be complicated. Another option was to read the current data from the MemSQL table, overlay it with the changed columns from the “after” JSON, and then replace the record in MemSQL by providing all columns, but this means three operations for a single update (select, then replace, which is itself a delete and insert). I was hoping “before” would be populated with the entire row for update operations, but that's not the case either.
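To make that second option concrete, this is roughly the shape of what I have in mind for a single hypothetical table (customers, with made-up columns; it assumes the key is always present in “after”):

```sql
-- Rough sketch of the select-and-replace idea for one hypothetical table.
-- Insert and delete handling omitted; those are working fine.
CREATE OR REPLACE PROCEDURE apply_cdc_event(
  batch QUERY(table_name TEXT, op_type TEXT, op_ts TEXT,
              current_ts TEXT, before_json JSON, after_json JSON)
) AS
BEGIN
  FOR r IN COLLECT(batch) LOOP
    IF r.op_type = 'U' AND r.table_name = 'MYSCHEMA.CUSTOMERS' THEN
      -- Read the current row, overlay the changed columns from "after",
      -- and write the merged row back. IFNULL keeps the existing value
      -- when a column is missing from "after", which is exactly where
      -- the NULL ambiguity described below becomes a problem.
      REPLACE INTO customers (customer_id, name, email)
      SELECT c.customer_id,
             IFNULL(JSON_EXTRACT_STRING(r.after_json, 'NAME'),  c.name),
             IFNULL(JSON_EXTRACT_STRING(r.after_json, 'EMAIL'), c.email)
      FROM customers c
      WHERE c.customer_id = JSON_EXTRACT_BIGINT(r.after_json, 'CUSTOMER_ID');
    END IF;
  END LOOP;
END;
```

Collapsing the select and the replace into a single REPLACE ... SELECT like this still leaves the NULL problem below.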
Also, with the select-and-replace approach I am still running into an issue: when a field is not present in the “after” JSON, extracting it returns NULL, so I can't tell whether NULL is the actual new value of a column that was updated as part of the operation, or whether it is just due to the field being missing from the JSON.
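A minimal illustration of what I mean, with made-up values and assuming JSON_EXTRACT_STRING is what pulls the field out — as far as I can tell, both of these come back as NULL:

```sql
-- Both return SQL NULL, so an explicit null in the changed data is
-- indistinguishable from a column that simply was not part of the update:
SELECT JSON_EXTRACT_STRING('{"CUSTOMER_ID": 1001, "EMAIL": null}', 'EMAIL');  -- EMAIL really set to NULL
SELECT JSON_EXTRACT_STRING('{"CUSTOMER_ID": 1001}', 'EMAIL');                 -- EMAIL not in "after" at all
```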
How can this be solved? Please provide some suggestions.
Has anyone successfully implemented CDC with Oracle GoldenGate, with Kafka as the target and JSON as the message format?