Skip to content

Commit fbbe38d

Browse files
committed
tests: Add Kinesis Streams test cases for compression
Signed-off-by: Shelby Hagman <shelbyzh@amazon.com>
1 parent f7b8134 commit fbbe38d

File tree

1 file changed

+156
-0
lines changed

1 file changed

+156
-0
lines changed

tests/runtime/out_kinesis.c

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,158 @@ void flb_test_kinesis_aggregation_many_records(void)
443443
flb_destroy(ctx);
444444
}
445445

446+
void flb_test_kinesis_compression_gzip(void)
447+
{
448+
int ret;
449+
flb_ctx_t *ctx;
450+
int in_ffd;
451+
int out_ffd;
452+
const char *record1 = "[1, {\"message\":\"gzip_test1\"}]";
453+
const char *record2 = "[1, {\"message\":\"gzip_test2\"}]";
454+
455+
setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1);
456+
457+
ctx = flb_create();
458+
459+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
460+
TEST_CHECK(in_ffd >= 0);
461+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
462+
463+
out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL);
464+
TEST_CHECK(out_ffd >= 0);
465+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
466+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
467+
flb_output_set(ctx, out_ffd, "stream", "fluent", NULL);
468+
flb_output_set(ctx, out_ffd, "compression", "gzip", NULL);
469+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
470+
471+
ret = flb_start(ctx);
472+
TEST_CHECK(ret == 0);
473+
474+
/* Push records with GZIP compression */
475+
flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1));
476+
flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2));
477+
478+
sleep(2);
479+
flb_stop(ctx);
480+
flb_destroy(ctx);
481+
}
482+
483+
void flb_test_kinesis_compression_zstd(void)
484+
{
485+
int ret;
486+
flb_ctx_t *ctx;
487+
int in_ffd;
488+
int out_ffd;
489+
const char *record1 = "[1, {\"message\":\"zstd_test1\"}]";
490+
const char *record2 = "[1, {\"message\":\"zstd_test2\"}]";
491+
492+
setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1);
493+
494+
ctx = flb_create();
495+
496+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
497+
TEST_CHECK(in_ffd >= 0);
498+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
499+
500+
out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL);
501+
TEST_CHECK(out_ffd >= 0);
502+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
503+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
504+
flb_output_set(ctx, out_ffd, "stream", "fluent", NULL);
505+
flb_output_set(ctx, out_ffd, "compression", "zstd", NULL);
506+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
507+
508+
ret = flb_start(ctx);
509+
TEST_CHECK(ret == 0);
510+
511+
/* Push records with ZSTD compression */
512+
flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1));
513+
flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2));
514+
515+
sleep(2);
516+
flb_stop(ctx);
517+
flb_destroy(ctx);
518+
}
519+
520+
void flb_test_kinesis_compression_snappy(void)
521+
{
522+
int ret;
523+
flb_ctx_t *ctx;
524+
int in_ffd;
525+
int out_ffd;
526+
const char *record1 = "[1, {\"message\":\"snappy_test1\"}]";
527+
const char *record2 = "[1, {\"message\":\"snappy_test2\"}]";
528+
529+
setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1);
530+
531+
ctx = flb_create();
532+
533+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
534+
TEST_CHECK(in_ffd >= 0);
535+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
536+
537+
out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL);
538+
TEST_CHECK(out_ffd >= 0);
539+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
540+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
541+
flb_output_set(ctx, out_ffd, "stream", "fluent", NULL);
542+
flb_output_set(ctx, out_ffd, "compression", "snappy", NULL);
543+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
544+
545+
ret = flb_start(ctx);
546+
TEST_CHECK(ret == 0);
547+
548+
/* Push records with Snappy compression */
549+
flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1));
550+
flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2));
551+
552+
sleep(2);
553+
flb_stop(ctx);
554+
flb_destroy(ctx);
555+
}
556+
557+
void flb_test_kinesis_compression_snappy_with_aggregation(void)
558+
{
559+
int ret;
560+
flb_ctx_t *ctx;
561+
int in_ffd;
562+
int out_ffd;
563+
int i;
564+
char record[100];
565+
566+
setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1);
567+
568+
ctx = flb_create();
569+
570+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
571+
TEST_CHECK(in_ffd >= 0);
572+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
573+
574+
out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL);
575+
TEST_CHECK(out_ffd >= 0);
576+
flb_output_set(ctx, out_ffd, "match", "*", NULL);
577+
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
578+
flb_output_set(ctx, out_ffd, "stream", "fluent", NULL);
579+
flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL);
580+
flb_output_set(ctx, out_ffd, "compression", "snappy", NULL);
581+
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);
582+
583+
ret = flb_start(ctx);
584+
TEST_CHECK(ret == 0);
585+
586+
/* Push many records with Snappy compression and aggregation */
587+
for (i = 0; i < 20; i++) {
588+
ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"snappy_agg\"}]", i);
589+
TEST_CHECK(ret < sizeof(record));
590+
flb_lib_push(ctx, in_ffd, record, strlen(record));
591+
}
592+
593+
sleep(3);
594+
flb_stop(ctx);
595+
flb_destroy(ctx);
596+
}
597+
446598
/* Test list */
447599
TEST_LIST = {
448600
{"success", flb_test_firehose_success },
@@ -457,5 +609,9 @@ TEST_LIST = {
457609
{"aggregation_with_time_key", flb_test_kinesis_aggregation_with_time_key },
458610
{"aggregation_with_log_key", flb_test_kinesis_aggregation_with_log_key },
459611
{"aggregation_many_records", flb_test_kinesis_aggregation_many_records },
612+
{"compression_gzip", flb_test_kinesis_compression_gzip },
613+
{"compression_zstd", flb_test_kinesis_compression_zstd },
614+
{"compression_snappy", flb_test_kinesis_compression_snappy },
615+
{"compression_snappy_with_aggregation", flb_test_kinesis_compression_snappy_with_aggregation },
460616
{NULL, NULL}
461617
};

0 commit comments

Comments
 (0)