Skip to content

Commit f5e4ce3

Browse files
committed
tests: Add Kinesis Streams test cases for compression
Signed-off-by: Shelby Hagman <[email protected]>
1 parent 936d54d commit f5e4ce3

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

tests/runtime/out_firehose.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,121 @@ void flb_test_firehose_aggregation_with_compression(void)
375375
flb_destroy(ctx);
376376
}
377377

378+
void flb_test_firehose_compression_zstd(void)
379+
{
380+
int ret;
381+
flb_ctx_t *ctx;
382+
int in_ffd;
383+
int out_ffd;
384+
const char *record1 = "[1, {\"message\":\"zstd_test1\"}]";
385+
const char *record2 = "[1, {\"message\":\"zstd_test2\"}]";
386+
387+
setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1);
388+
389+
ctx = flb_create();
390+
391+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
392+
TEST_CHECK(in_ffd >= 0);
393+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
394+
395+
out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL);
396+
TEST_CHECK(out_ffd >= 0);
397+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
398+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
399+
flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL);
400+
flb_output_set(ctx, out_ffd, "compression", "zstd", NULL);
401+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
402+
403+
ret = flb_start(ctx);
404+
TEST_CHECK(ret == 0);
405+
406+
/* Push records with ZSTD compression */
407+
flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1));
408+
flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2));
409+
410+
sleep(2);
411+
flb_stop(ctx);
412+
flb_destroy(ctx);
413+
}
414+
415+
void flb_test_firehose_compression_snappy(void)
416+
{
417+
int ret;
418+
flb_ctx_t *ctx;
419+
int in_ffd;
420+
int out_ffd;
421+
const char *record1 = "[1, {\"message\":\"snappy_test1\"}]";
422+
const char *record2 = "[1, {\"message\":\"snappy_test2\"}]";
423+
424+
setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1);
425+
426+
ctx = flb_create();
427+
428+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
429+
TEST_CHECK(in_ffd >= 0);
430+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
431+
432+
out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL);
433+
TEST_CHECK(out_ffd >= 0);
434+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
435+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
436+
flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL);
437+
flb_output_set(ctx, out_ffd, "compression", "snappy", NULL);
438+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
439+
440+
ret = flb_start(ctx);
441+
TEST_CHECK(ret == 0);
442+
443+
/* Push records with Snappy compression */
444+
flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1));
445+
flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2));
446+
447+
sleep(2);
448+
flb_stop(ctx);
449+
flb_destroy(ctx);
450+
}
451+
452+
void flb_test_firehose_compression_snappy_with_aggregation(void)
453+
{
454+
int ret;
455+
flb_ctx_t *ctx;
456+
int in_ffd;
457+
int out_ffd;
458+
int i;
459+
char record[100];
460+
461+
setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1);
462+
463+
ctx = flb_create();
464+
465+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
466+
TEST_CHECK(in_ffd >= 0);
467+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
468+
469+
out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL);
470+
TEST_CHECK(out_ffd >= 0);
471+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
472+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
473+
flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL);
474+
flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL);
475+
flb_output_set(ctx, out_ffd, "compression", "snappy", NULL);
476+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
477+
478+
ret = flb_start(ctx);
479+
TEST_CHECK(ret == 0);
480+
481+
/* Push many records with Snappy compression and aggregation */
482+
for (i = 0; i < 20; i++) {
483+
ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"snappy_agg\"}]", i);
484+
TEST_CHECK(ret < sizeof(record));
485+
flb_lib_push(ctx, in_ffd, record, strlen(record));
486+
}
487+
488+
sleep(3);
489+
flb_stop(ctx);
490+
flb_destroy(ctx);
491+
}
492+
378493
void flb_test_firehose_aggregation_combined_params(void)
379494
{
380495
int ret;
@@ -536,6 +651,9 @@ TEST_LIST = {
536651
{"aggregation_with_log_key", flb_test_firehose_aggregation_with_log_key },
537652
{"aggregation_many_records", flb_test_firehose_aggregation_many_records },
538653
{"aggregation_with_compression", flb_test_firehose_aggregation_with_compression },
654+
{"compression_zstd", flb_test_firehose_compression_zstd },
655+
{"compression_snappy", flb_test_firehose_compression_snappy },
656+
{"compression_snappy_with_aggregation", flb_test_firehose_compression_snappy_with_aggregation },
539657
{"aggregation_combined_params", flb_test_firehose_aggregation_combined_params },
540658
{"aggregation_empty_records", flb_test_firehose_aggregation_empty_records },
541659
{"aggregation_error_handling", flb_test_firehose_aggregation_error_handling },

0 commit comments

Comments
 (0)