PySpark's DataFrame API covers a lot of ground. Joins, aggregations, window functions, the works. But every now and then you hit a wall. Maybe you want to fan out hundreds of file uploads across your cluster, or you need a scipy function that has no Spark SQL equivalent. The native API just doesn't have a way to do these.
We ran into both problems. In this post, we will see how @pandas_udf and applyInPandas solved them, and what we learned along the way.
Both rely on Apache Arrow under the hood instead of Python's pickle protocol, which is what makes them fast. A standard @udf serializes and deserializes every single row individually across the JVM-Python boundary. At 100 million rows, that's 100 million pickle/unpickle cycles. Pandas UDFs skip all of that by transferring entire batches (default 10,000 rows) in Arrow's columnar format. For numeric columns without nulls, the conversion is zero-copy. The result is anywhere from 3x to 100x faster than row-at-a-time UDFs, depending on the workload. Those numbers come from the original Two Sigma benchmarks when they introduced the feature in Spark 2.3.
Use Case 1: Distributed Signal Processing with applyInPandas
Here's the problem: we have time-series signal data from multiple sensors, partitioned by sensor ID and recording session. For each sensor, we need to compute a power spectral densiy (PSD) spectrogram, a scipy.signal.spectrogram call that has no equivalent in Spark SQL.
The data lives in a silver layer table with columns like sensor_id, session_id, time, sampling_rate, and signal value columns for different states (Eg., signal_active, signal_idle, signal_combined). Each sensor-session combination needs its own spectrogram computation.
Spark can't do this with built-in functions. But what if we could group the data by sensor and session, hand each group to a regular Python function as a pandas DataFrame, run scipy on it, and collect the results back? That's exactly what applyInPandas does.
The Code
The function itself is plain Python. Nothing Spark-specific about it:
import pandas as pd
import numpy as np
import scipy.signal
def compute_spectrogram(group):
group = group.sort_values(by='time')
fs = group['sampling_rate'].iloc[0]
values = np.array(group['signal_value'])
freqs, _, Sxx = scipy.signal.spectrogram(
values, fs,
nperseg=int(fs),
mode='psd',
noverlap=None, # defaults to nperseg // 8 (~12.5% overlap)
window='hann'
)
return pd.DataFrame([{
'sensor_id': group['sensor_id'].iloc[0],
'session_id': group['session_id'].iloc[0],
'psd': np.nanmean(Sxx, axis=1),
'frequencies': freqs,
}])
scipy.signal.spectrogram computes the short-time Fourier transform under the hood, np.nanmean averages the power across time windows, and we pack the result into a single-row DataFrame.
Applying It at Scale
Let's apply it:
result_schema = "sensor_id string, session_id string, psd array<float>, frequencies array<float>"
result = (
df.groupBy('sensor_id', 'session_id')
.applyInPandas(compute_spectrogram, result_schema)
)
That's it. groupBy().applyInPandas() implements split-apply-combine: Spark shuffles data so all rows sharing the same group key land on the same executor, materializes each group as a pandas.DataFrame, runs our function, and collects the results back. Your output can have a completely different schema and row count from the input, which is exactly what we need here. Thousands of time-series rows go in, one row with array-typed PSD values comes out per group. This was introduced in Spark 3.0, replacing the older GROUPED_MAP pandas UDF type.
In the actual implementation, we ran this for multiple signal states (active, idle, combined) and across both baseline and observation periods. Same pattern, just more columns in the output schema. The function stays the same, you just loop over the states inside it.
Watch Out: Memory and Group Sizes
The entire group gets loaded into memory at once. If one group has 10x more rows than the others, that's your OOM (Out Of Memory) candidate. One thing I found helpful was running a quick check before applying:
from pyspark.sql.functions import desc
df.groupBy("sensor_id", "session_id").count().orderBy(desc("count")).show()
For signal processing workloads, scipy functions like spectrogram also create intermediate arrays roughly 2-3x the size of the input. If your groups are large, set spark.executor.pyspark.memory to give the Python worker dedicated memory outside the JVM heap.
applyInPandas solves the per-group computation problem. Hand each group to Python, run whatever science you need, collect the results. But what if the work isn't computation at all, what if it's I/O?
Use Case 2: Distributed SFTP Uploads with @pandas_udf
This one might raise some eyebrows. Using Spark to upload files via SFTP? Isn't that what a shell script is for?
In our case, a business requirement had us uploading around 500 CSV files totaling to around 140GB to an external SFTP server twice a week. Doing this sequentially from the driver would have taken hours, not to mention connection timeouts. What you really want is to fan out the uploads across all your executor cores, each opening its own SFTP connection and uploading its assigned files in parallel.
A @pandas_udf turns out to be a surprisingly clean way to do this.
The File Manifest
The trick is to create a Spark DataFrame of file paths, just metadata, no file content, and repartition it to control parallelism:
file_data = [{"local_path": f"{source_dir}{f}", "remote_path": f"{remote_dir}{f}"} for f in csv_files]
num_partitions = 100 # ~5 files per partition, see note below
files_df = spark.createDataFrame(file_data)
files_df = files_df.repartition(num_partitions)
The partition count matters, and not just on the Spark side. Each partition opens its own SFTP connection, so the number of partitions directly controls how many concurrent connections hit the server. SFTP servers have connection limits (OpenSSH's MaxStartups defaults to dropping connections beyond 10 unauthenticated and refusing all beyond 100), so blindly matching partitions to files can get you connection refusals. We went with 100 partitions for 500 files, roughly 5 files per partition. On our cluster that meant up to 100 concurrent SFTP connections, which the server handled fine.
Our files were more or less equally sized, so each partition took roughly the same time to finish its batch of 5 uploads. No stragglers. If your file sizes vary widely, fewer files per partition gives Spark finer scheduling granularity since it can assign work to idle cores faster. You could go as far as 1 file per partition, but then cap cluster concurrency with spark.executor.instances or spark.executor.cores to stay within what the SFTP server can handle.
The Upload UDF
@pandas_udf takes a function that operates on pandas.Series and makes it Spark-compatible. The variant we're using is Series to Series: it takes one or more pd.Series and returns a pd.Series of the same length. One file path in, one result row out.
When the return type is a StructType, Spark lets you return a pd.DataFrame instead of a pd.Series. The row-count rule still applies, but each row can carry multiple fields. That's how we get structured results (file name, status, error, duration) out of each upload:
import pandas as pd
from pyspark.sql.functions import pandas_udf, col
result_schema = "file string, status string, error string, size_bytes long, duration_sec double, attempt int"
@pandas_udf(result_schema)
def upload_batch(local_paths: pd.Series, remote_paths: pd.Series) -> pd.DataFrame:
results = []
for local_path, remote_path in zip(local_paths, remote_paths):
# upload_with_retry opens an SFTP connection, uploads the file,
# retries up to 3 times with exponential backoff on failure
results.append(upload_with_retry(local_path, remote_path))
return pd.DataFrame(results)
In the full implementation, upload_with_retry handles the SFTP connection (using a library like paramiko), retries up to 3 times with exponential backoff, and returns a dict with the file name, status, size, duration, and attempt count. Nothing Spark-specific, just plain Python I/O with error handling.
There are other @pandas_udf shapes too. Iterator of Series to Iterator of Series is useful when you have expensive one-time setup (loading a model, opening a connection) that you want to pay once per partition, not once per batch. And Series to Scalar works with groupBy().agg(). But for the SFTP pattern, the basic Series-to-Series variant is all we need.
Running It
results_df = (
files_df
.select(upload_batch(col("local_path"), col("remote_path")).alias("result"))
.select("result.*")
)
display(results_df)
Once the action triggers, Spark fans out the partitions across executor cores. Each partition's batch gets uploaded via its own SFTP connection. You get back a structured DataFrame with per-file status, so checking for failures is straightforward:
failed = results_df.filter(col("status") == "failed")
if failed.count() > 0:
raise Exception(f"Upload failed for {failed.count()} files")
What would have taken hours sequentially from the driver now finishes in minutes.
Watch Out: Closures, Secrets, and Retries
Closure serialization and secrets. Variables like the hostname and credentials get captured as closure variables and pickled to each executor. On Databricks this is actually the recommended approach since dbutils.secrets.get() is a driver-side-only API that can't be called from within a UDF. Fetch secrets on the driver, let closure serialization carry them. Databricks automatically redacts secret values in notebook output and logs. For additional safety, broadcast variables are another option.
Task retries and speculative execution. If an executor dies and Spark retries a task, your UDF runs again. If speculative execution is enabled (spark.speculation=true), Spark may run the same task on two executors concurrently, both attempting the upload. Make sure your uploads are idempotent (overwrite mode) or track completion externally.
Library availability. Libraries used inside a Pandas UDF must be installed on every executor node, not just the driver. On Databricks, numpy, scipy, pandas, and scikit-learn come pre-installed with the Runtime. For paramiko, install it as a cluster library or via %pip install.
When to Use Which
Two patterns, two different problems.
Use applyInPandas when each group needs its own computation and the output can be a completely different shape from the input. Different schema, different row count, scientific computation, per-group model training. The entire group lands in your function as a single pandas DataFrame.
Use @pandas_udf when you need to transform columns while keeping the same row count. It plugs into select() and withColumn() like any other UDF, just faster. But as we saw with the SFTP example, it also doubles as a distributed task runner where each row represents a unit of work rather than data to transform.
It should however be understood that if native Spark functions can do the job, those are the better choice. They're optimized by Catalyst, run on the JVM (or Photon on Databricks), and skip the Python serialization round-trip entirely. Reach for Pandas UDFs when you need the Python ecosystem: scipy, scikit-learn, paramiko, or anything else that doesn't exist in Spark SQL.
Wrapping Up
One of the things I found really useful about Pandas UDFs and applyInPandas is how they turn Spark into a general-purpose distributed computing platform. The entire Python ecosystem, available on every executor.
We covered two patterns here, per-group scientific computation and distributed I/O, but the same approach works for model training, custom statistical tests, image processing, and pretty much anything where you need Python libraries running at cluster scale.
That's a wrap guys!




