ElasticsearchSinkFunction

 class StringData {
    private String data;
 }

 StringData data = new StringData(jsonData);

IndexRequest indexRequest = Requests.indexRequest()
                .index(indices)
                .type("_doc")
                .id(id)
                .version(System.currentTimeMillis())
                .versionType(VersionType.EXTERNAL)
                // .source(data, XContentType.JSON) 直接传入对象,会调用进行嵌套解析导致错误。
                .source(mapper.writeValueAsString(data), XContentType.JSON)
                ;