I’m forced to load all the data from the S3 bucket every time I create a pipeline. The set offsets latest feature doesn’t work for S3. Is there any workaround?
It’s intended to work for S3. What does select @@memsql_version return, and what behavior are you seeing?
On our end, I see earliest and latest handling a basic test case just fine:
MySQL [db]> create pipeline p as load data s3 'test' config '{"region": "us-east-1", "endpoint_url": "http://localhost:5553"}' into table j format avro (`a` <- % );
Query OK, 0 rows affected (0.381 sec)
MySQL [db]> select * from information_schema.pipelines_files;
+---------------+---------------+-------------+-----------------------+-----------+------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | FILE_NAME | FILE_SIZE | FILE_STATE |
+---------------+---------------+-------------+-----------------------+-----------+------------+
| db | p | S3 | sanity.avro | 1488949 | Unloaded |
+---------------+---------------+-------------+-----------------------+-----------+------------+
1 row in set (0.002 sec)
MySQL [db]> alter pipeline p set offsets latest;
Query OK, 0 rows affected (0.002 sec)
MySQL [db]> select * from information_schema.pipelines_files;
+---------------+---------------+-------------+-----------------------+-----------+------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | FILE_NAME | FILE_SIZE | FILE_STATE |
+---------------+---------------+-------------+-----------------------+-----------+------------+
| db | p | S3 | sanity.avro | 1488949 | Loaded |
+---------------+---------------+-------------+-----------------------+-----------+------------+
1 row in set (0.002 sec)
MySQL [db]> start pipeline p foreground;
Query OK, 0 rows affected (0.236 sec)
MySQL [db]> select * from j;
Empty set (0.005 sec)
MySQL [db]> alter pipeline p set offsets earliest;
Query OK, 0 rows affected (0.001 sec)
MySQL [db]> start pipeline p foreground;
Query OK, 100000 rows affected (2.216 sec)
MySQL [db]> select count(*) from j;
+----------+
| count(*) |
+----------+
| 100000 |
+----------+
1 row in set (0.287 sec)
That said, if you already have a pipeline which has loaded the data you’d like to skip, show create pipeline ... extended
will output a list of more specific alter pipeline
statements which can be run against a new pipeline to make it continue where the first pipeline left off.
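For example, a minimal sketch of that workflow (the pipeline names here are placeholders, and the exact output format of the command may differ by version):

```sql
-- Inspect the old pipeline, including its per-file offset state.
SHOW CREATE PIPELINE old_pipeline EXTENDED;
-- The extended output includes statements of the form:
--   ALTER PIPELINE old_pipeline SET OFFSETS ...;
-- Edit those to target the new pipeline and run them against it,
-- so the new pipeline resumes where old_pipeline left off.
```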
Thanks for your answer. I have a bucket with many files, and I want to start processing only the new files added to the bucket.
Creating a pipeline and then running alter pipeline p set offsets latest
should cause the pipeline to only load files added after the alter
query. Let us know if those aren’t the semantics you want or if it doesn’t appear to be working as advertised when you try it.
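Concretely, the sequence looks like this (bucket path, table, and format clauses below are placeholders for your own):

```sql
-- Create the pipeline but don't start it yet.
CREATE PIPELINE p AS LOAD DATA S3 'my-bucket/prefix/'
  CONFIG '{"region": "us-east-1"}'
  INTO TABLE t
  FORMAT AVRO (`a` <- %);

-- Mark every file currently in the bucket as Loaded, so it is skipped.
ALTER PIPELINE p SET OFFSETS LATEST;

-- From here on, only files added after the ALTER are ingested.
START PIPELINE p;
```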
@sasha I tried again and it’s still not working. I’m configuring the S3 pipeline like this; am I doing something wrong?
CREATE PIPELINE `test_s3_pipeline`
AS LOAD DATA S3 'test/testfolder/data/'
CONFIG '{"region": "us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXx", "aws_secret_access_key": "XXXXXXXXXXXXXXXXX"}'
BATCH_INTERVAL 60000
INTO TABLE data
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
(
  data.raw_json
);
ALTER PIPELINE test_s3_pipeline SET OFFSETS LATEST;