-
Notifications
You must be signed in to change notification settings - Fork 995
Description
I am running into an issue while reading data from Elasticsearch.
Steps to reproduce:
`
# Reproduction script: read an Elasticsearch index into a Spark DataFrame
# through the elasticsearch-hadoop connector, using an explicit schema.
import json
import os

# Spark settings applied to the session; pulls in the ES-Spark connector jar.
SPARK_CONF = {
    "spark.jars.packages": "org.elasticsearch:elasticsearch-spark-30_2.12:8.15.1"
}

# Match every document in the index.
query = {
    'query': {'match_all': {}}
}

# NOTE(review): the reported error "JListWrapper is not a valid external type
# for schema of string" means the connector produced a list for a field that
# this schema declares as StringType. Every multi-valued field must be
# declared ArrayType(...) here AND listed in es.read.field.as.array.include
# below. Which field triggers it is not visible from this snippet — check the
# index documents (e.g. whether tags[].owningOrganizationUnit is multi-valued
# like securityContext.owningOrganizationUnit).
schema = StructType([
    # ------ (other fields elided in the original report)
    StructField("tags", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("owningOrganizationUnit", StringType(), True),
        StructField("parentId", StringType(), True),
        StructField("tagId", StringType(), True),
        StructField("tagName", StringType(), True),
        StructField("tenantId", StringType(), True),
    ]), True), True),
    StructField("securityContext", StructType([
        StructField("tenantId", StringType(), True),
        StructField("owningOrganizationUnit", ArrayType(StringType(), True), True),
    ]), True),
])

es_index_conf = {
    'es.net.ssl': 'true',
    'es.nodes.wan.only': 'true',
    # Fields (comma-separated) the connector must read as arrays.
    'es.read.field.as.array.include': 'securityContext.owningOrganizationUnit',
    # Connection details and credentials were redacted in the report; read
    # them from the environment instead of hard-coding secrets in source.
    'es.resource': os.environ['ES_RESOURCE'],
    'es.nodes': os.environ['ES_NODES'],
    'es.port': os.environ['ES_PORT'],
    'es.net.http.auth.user': os.environ['ES_USER'],
    'es.net.http.auth.pass': os.environ['ES_PASSWORD'],
    'es.query': json.dumps(query),
}

# Build the session from the SPARK_CONF dict defined above (there is no
# `self` at script level).
conf = SparkConf()
for k, v in SPARK_CONF.items():
    conf.set(k, v)
spark = SparkSession.builder.config(conf=conf).appName("Sample").getOrCreate()

# Parenthesize the builder chain so it can legally span multiple lines.
df = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .options(**es_index_conf)
    .schema(schema)
    .load()
)
I am getting the following error and stack trace:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/13 16:23:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkRuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
Caused by: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_58$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_65$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_2_11$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
... 20 more
`