Using Hive Tables in EG Spark Sessions

I reckon I might be lacking a fundamental piece of understanding here.

So far, I have my own PySpark kernels running in k8s (with my local JupyterLab connected to the EG), and inside them I managed to read a parquet file from S3 (i.e. I am using a kernel image with all the necessary jars installed).

However, I thought it would be very useful to have Hive table functionality available.

My first question: is the following a misconception or bad practice? I can see that just creating DataFrames and then saving them to parquet files on S3 (to persist work between sessions and to share it between multiple users) is theoretically possible. However, being able to save data as tables and then call listTables() seems powerful and a lot more convenient. Is that already a misconception and not the intended use?

Assuming the above is fine, what I tried to do is to select a persistent path for spark.sql.warehouse.dir, enable hive support, and just give it a try.

    conf.set("spark.hadoop.fs.s3a.endpoint", s3_endpoint_loc)
    conf.set("spark.hadoop.fs.s3a.access.key", s3_access_key)
    conf.set("spark.hadoop.fs.s3a.secret.key", s3_secret_key )
    conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
    conf.set("spark.hadoop.fs.s3a.connection.maximum", 20)
    conf.set("spark.hadoop.fs.s3a.attempts.maximum", 20)
    conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    
    warehouse_loc = "s3a://<bucket_name>/"
    conf.set("spark.sql.warehouse.dir", warehouse_loc)
    spark = SparkSession.builder.appName(app_name).config(conf=conf).enableHiveSupport().getOrCreate()
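For reference, the round-trip I am testing looks roughly like this (the DataFrame and table name are just examples):

    # Example only: persist a DataFrame as a managed table in the warehouse
    # location configured above, then check what the catalog knows about.
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    df.write.saveAsTable("example_table")   # writes parquet under spark.sql.warehouse.dir
    spark.catalog.listTables()              # should include "example_table"
    spark.table("example_table").show()     # read it back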

What happened is that this setup seemed to work initially, i.e. I was able to create and then read back a table, and the parquet files were created in my S3 bucket. However, if I run the notebook again, or run another notebook, the catalog state is gone.

In particular, spark.catalog.listTables() returns an empty list. However, the data is still there in the S3 bucket, and I even get an exception if I try to create the table again, because the path is already taken.

I assume I am missing a crucial component in my setup. Could you please point me in the right direction and possibly explain how to set that up?

Hi @BBuchhold,

I am by no means a Spark expert but it makes sense that the current Spark session would not reflect previous notebook kernel/Spark sessions since its lifecycle is essentially that of the notebook kernel.

It sounds like you’ve made good progress on the persistence side of things and I’m wondering if you could amend your “startup logic” to first load from s3 any state relative to your kernel session (if it exists) and essentially “hydrate” that into your Spark session?
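Something along these lines is what I have in mind (untested, and the names/paths are just placeholders):

    # Hypothetical "hydration" step at notebook startup: re-register data that an
    # earlier session wrote to S3 so it is queryable again in this session.
    previous_outputs = {
        "sales": "s3a://<bucket_name>/sales/",   # placeholder paths
        "users": "s3a://<bucket_name>/users/",
    }
    for name, path in previous_outputs.items():
        spark.read.parquet(path).createOrReplaceTempView(name)

    spark.sql("SHOW TABLES").show()              # the re-registered views are now visible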

Which kernelspec (and ProcessProxy) are you using? I believe the SparkOperator process proxy may provide more flexibility at the Spark session level.

You might also try opening a discussion in the EG repository (please cross-reference it with this post) to provide more exposure.

Thanks, that sounds interesting. In the meantime I found a post, Running Spark on Kubernetes with Remote Hive Metastore and S3 Warehouse | by Eugene Lopatkin | Towards Dev, which sounded like exactly what I needed.

The idea is to keep the Spark data stored on S3, as I had already accomplished, but to add something persistent (the metastore service, here in-memory, later backed by an RDBMS for prod workflows) to hold the metadata across sessions. However, when I follow the post, I can see the create table commands reach the logs of the metastore pods, but then writing the data fails, because the driver seems to try to write to /opt/hive/data/warehouse/..., which doesn't exist on the driver (and to the best of my understanding also shouldn't exist), even though I passed the S3 path in all the config parameters. I am not sure if I am on the right track there.

W.r.t. the kernelspec:
I have added my own kernelspec that is basically spark_python_kubernetes, but referencing my own image (I upgraded kernel-spark-py to Spark 3.5.0 and JDK 11, based my image off of that, downloaded a few more jars for S3 connectivity, and pip-installed a few packages) with some minor config changes (e.g. "--RemoteProcessProxy.spark-context-initialization-mode", "none" to control the number of executors from the notebook, and imagePullPolicy: Always). For the purpose of my question, I believe it should basically be equivalent to spark_python_kubernetes. I have not looked into the SparkOperator process proxy at all and will do so next. If that doesn't help, I will report back and also crosspost to the EG repository.
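For what it's worth, with the initialization mode set to "none" the SparkSession is built in the notebook, so executor sizing sits alongside the other conf entries (the values here are just examples):

    # Executor sizing chosen per notebook rather than by the kernelspec (example values).
    conf.set("spark.executor.instances", "2")
    conf.set("spark.executor.memory", "2g")
    conf.set("spark.executor.cores", "2")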


Sorry for not reporting back for so long.

  1. I indeed need to make use of the Hive metastore to get a persistent catalog.
  2. My initial experiments with deploying a standalone metastore to my Kubernetes cluster failed, because the configuration for the S3-based warehouse then has to be made on the metastore side. Fortunately, there are nice resources out there on how to set it up, e.g. GitHub - naushadh/hive-metastore: Apache Hive Metastore as a Standalone server in Docker

Deploying a metastore pod + service and then referencing them in my SparkSession did the trick.
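In case it helps anyone else, the relevant part of my session setup now looks roughly like this (the thrift URI is a placeholder for whatever your metastore Service resolves to; 9083 is the default metastore port):

    # Point the session at the external Hive metastore service (names are placeholders).
    conf.set("spark.hadoop.hive.metastore.uris",
             "thrift://hive-metastore.<namespace>.svc.cluster.local:9083")
    # Should match the warehouse location configured on the metastore side.
    conf.set("spark.sql.warehouse.dir", "s3a://<bucket_name>/")

    spark = (
        SparkSession.builder
        .appName(app_name)
        .config(conf=conf)
        .enableHiveSupport()
        .getOrCreate()
    )

    spark.catalog.listTables()   # now persists across kernel sessions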
