May 8, 2025
The Evolution of Arbitrary Stateful Stream Processing in Spark

Introduction

Stateful processing in Apache Spark™ Structured Streaming has evolved significantly to meet the growing demands of complex streaming applications. Initially, the applyInPandasWithState API allowed developers to perform arbitrary stateful operations on streaming data. However, as the complexity and sophistication of streaming applications increased, the need for a more flexible and feature-rich API became apparent. To address these needs, the Spark community introduced the vastly improved transformWithStateInPandas API, available in Apache Spark™ 4.0, which can now fully replace the existing applyInPandasWithState operator. transformWithStateInPandas provides far greater functionality such as flexible data modeling and composite types for defining state, timers, TTL on state, operator chaining, and schema evolution.

In this blog, we will focus on Python to compare transformWithStateInPandas with the older applyInPandasWithState API and use coding examples to show how transformWithStateInPandas can express everything applyInPandasWithState can and more.

By the end of this blog, you will understand the advantages of using transformWithStateInPandas over applyInPandasWithState, how an applyInPandasWithState pipeline can be rewritten as a transformWithStateInPandas pipeline, and how transformWithStateInPandas can simplify the development of stateful streaming applications in Apache Spark™.

Overview of applyInPandasWithState

applyInPandasWithState is a powerful API in Apache Spark™ Structured Streaming that allows for arbitrary stateful operations on streaming data. This API is particularly useful for applications that require custom state management logic. applyInPandasWithState enables users to manipulate streaming data grouped by a key and apply stateful operations on each group.

Most of the business logic takes place in the func, which has the following type signature.
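
(Sketched here with illustrative argument names: key is the grouping key, pdf_iter yields that key's rows for the micro-batch as pandas DataFrames, and state is the per-key GroupState.)

import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.streaming.state import GroupState

def func(
    key: Tuple,
    pdf_iter: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    ...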

For example, the following function does a running count of the number of values for each key. It’s worth noting that this function breaks the single responsibility principle: it’s responsible for handling when new data arrives, as well as when the state has timed out.
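
(A minimal sketch; the fruit column, key names, and the 30-second processing-time timeout are illustrative.)

import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.streaming.state import GroupState

def count_fruit(
    key: Tuple[str],
    pdf_iter: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # Timeout branch: emit the final count and clean up the state.
        (count,) = state.get
        state.remove()
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})
    else:
        # New-data branch: fold the incoming rows into the running count.
        count = state.get[0] if state.exists else 0
        count += sum(len(pdf) for pdf in pdf_iter)
        state.update((count,))
        state.setTimeoutDuration(30_000)  # re-arm the processing-time timeout
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})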

A full example implementation is as follows:
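
(A sketch; the source DataFrame df and the console sink are illustrative.)

from pyspark.sql.streaming.state import GroupStateTimeout

fruit_counts = (
    df.groupBy("fruit")
    .applyInPandasWithState(
        count_fruit,                  # the stateful function defined above
        "fruit STRING, count LONG",   # output schema
        "count LONG",                 # state schema
        "append",                     # output mode
        GroupStateTimeout.ProcessingTimeTimeout,
    )
)

query = (
    fruit_counts.writeStream
    .format("console")
    .outputMode("append")
    .start()
)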

Overview of transformWithStateInPandas

transformWithStateInPandas is a new custom stateful processing operator introduced in Apache Spark™ 4.0. Compared to applyInPandasWithState, you’ll notice that its API is more object-oriented, flexible, and feature-rich. Its operations are defined using an object that extends StatefulProcessor, as opposed to a function with a type signature. transformWithStateInPandas guides you by giving you a more concrete definition of what needs to be implemented, thereby making the code much easier to reason about.

The class has five key methods:

  • init: This is the setup method where you initialize the state variables and any other resources your transformation needs.
  • handleInitialState: This optional step lets you prepopulate your pipeline with initial state data.
  • handleInputRows: This is the core processing stage, where you process incoming rows of data.
  • handleExpiredTimer: This stage lets you manage timers that have expired. This is crucial for stateful operations that need to track time-based events.
  • close: This stage lets you perform any necessary cleanup tasks before the transformation ends.

With this class, an equivalent fruit-counting operator is shown below.
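
(A minimal sketch; class, state, and column names are illustrative, and import paths follow the Spark 4.0 Python API.)

import pandas as pd
from typing import Iterator
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)

class CountFruitProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # A single ValueState variable holding the running count for the key.
        self._count = handle.getValueState("count", "count LONG")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Fold the incoming rows into the running count and emit the new total.
        existing = self._count.get()
        count = existing[0] if existing is not None else 0
        count += sum(len(pdf) for pdf in rows)
        self._count.update((count,))
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})

    def close(self) -> None:
        pass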

And it can be implemented in a streaming pipeline as follows:
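
(Again a sketch, using the processor above.)

fruit_counts = (
    df.groupBy("fruit")
    .transformWithStateInPandas(
        CountFruitProcessor(),        # the StatefulProcessor defined above
        "fruit STRING, count LONG",   # output schema
        "append",                     # output mode
        "processingtime",             # time mode
    )
)

query = (
    fruit_counts.writeStream
    .format("console")
    .outputMode("append")
    .start()
)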

Working with state

Number and types of state

applyInPandasWithState and transformWithStateInPandas differ in terms of state handling capabilities and flexibility. applyInPandasWithState supports only a single state variable, which is managed as a GroupState. This allows for simple state management but limits the state to a single-valued data structure and type. By contrast, transformWithStateInPandas is more versatile, allowing for multiple state variables of different types. In addition to transformWithStateInPandas's ValueState type (analogous to applyInPandasWithState’s GroupState), it supports ListState and MapState, offering greater flexibility and enabling more complex stateful operations. These additional state types in transformWithStateInPandas also bring performance benefits: ListState and MapState allow for partial updates without requiring the entire state structure to be serialized and deserialized on every read and write operation. This can significantly improve efficiency, especially with large or complex states.

                        | applyInPandasWithState              | transformWithStateInPandas
Number of state objects | 1                                   | Many
Types of state objects  | GroupState (similar to ValueState)  | ValueState, ListState, MapState
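
To illustrate the partial updates mentioned above, here is a rough sketch of a processor that appends a single element to a ListState and upserts a single MapState entry per input row; the state names, schemas, and exact method signatures should be treated as an approximation of the Spark 4.0 Python API rather than a definitive reference.

import pandas as pd
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)

class FruitEventTracker(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # A list of recent events and a map of per-event counts for each key.
        self._recent = handle.getListState("recent", "event STRING")
        self._counts = handle.getMapState("counts", "event STRING", "count LONG")

    def handleInputRows(self, key, rows, timerValues):
        total = 0
        for pdf in rows:
            for event in pdf["event"]:
                # Partial updates: append one element / upsert one map entry
                # instead of rewriting the entire state value.
                self._recent.appendValue((event,))
                old = self._counts.getValue((event,)) if self._counts.containsKey((event,)) else (0,)
                self._counts.updateValue((event,), (old[0] + 1,))
                total += 1
        yield pd.DataFrame({"fruit": [key[0]], "events": [total]})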

CRUD operations

For the sake of comparison, we will only compare applyInPandasWithState’s GroupState to transformWithStateInPandas's ValueState, as ListState and MapState have no equivalents. The biggest difference when working with state is that with applyInPandasWithState, the state is passed into a function; whereas with transformWithStateInPandas, each state variable needs to be declared on the class and instantiated in an init function. This makes creating/setting up the state more verbose, but also more configurable. The other CRUD operations when working with state remain largely unchanged.

create

  GroupState (applyInPandasWithState): creating state is implied; state is passed into the function via the state variable.

    def func(
        key: _,
        pdf_iter: _,
        state: GroupState
    ) -> Iterator[pandas.DataFrame]

  ValueState (transformWithStateInPandas): self._state is an instance variable on the class; it needs to be declared and instantiated.

    class MySP(StatefulProcessor):
        def init(self, handle: StatefulProcessorHandle) -> None:
            self._state = handle.getValueState("state", schema)

read

  GroupState: state.get  # or raise PySparkValueError
              state.getOption  # or return None
  ValueState: self._state.get()  # or return None

update

  GroupState: state.update(v)
  ValueState: self._state.update(v)

delete

  GroupState: state.remove()
  ValueState: self._state.clear()

exists

  GroupState: state.exists
  ValueState: self._state.exists()

Let’s dig a little into some of the features this new API makes possible. It’s now possible to

  • Work with more than a single state object, and
  • Create state objects with a time to live (TTL). This is especially useful for use cases with regulatory requirements.
Work with multiple state objects

  applyInPandasWithState: not possible

  transformWithStateInPandas:

    class MySP(StatefulProcessor):
        def init(self, handle: StatefulProcessorHandle) -> None:
            self._state1 = handle.getValueState("state1", schema1)
            self._state2 = handle.getValueState("state2", schema2)

Create state objects with a TTL

  applyInPandasWithState: not possible

  transformWithStateInPandas:

    class MySP(StatefulProcessor):
        def init(self, handle: StatefulProcessorHandle) -> None:
            self._state = handle.getValueState(
                state_name="state",
                schema="c LONG",
                ttl_duration_ms=30 * 60 * 1000  # 30 min
            )

Reading Internal State

Debugging a stateful operation used to be challenging because it was difficult to inspect a query’s internal state. Both applyInPandasWithState and transformWithStateInPandas make this easy by seamlessly integrating with the state data source reader. This powerful feature makes troubleshooting much simpler by allowing users to query specific state variables, along with a range of other supported options.

Below is an example of how each state type is displayed when queried. Note that every column, except for partition_id, is of type STRUCT. With applyInPandasWithState, the entire state is lumped together as a single row, so it's up to the user to pull the variables apart and explode them to get a useful breakdown. transformWithStateInPandas gives a nicer breakdown of each state variable, and each element is already exploded into its own row for easy data exploration.

applyInPandasWithState, GroupState:

  display(
    spark.read.format("statestore")
    .load("/Volumes/foo/bar/baz")
  )

  [Example output: Group State]

transformWithStateInPandas, ValueState:

  display(
    spark.read.format("statestore")
    .option("stateVarName", "valueState")
    .load("/Volumes/foo/bar/baz")
  )

  [Example output: Value State]

transformWithStateInPandas, ListState:

  display(
    spark.read.format("statestore")
    .option("stateVarName", "listState")
    .load("/Volumes/foo/bar/baz")
  )

  [Example output: List State]

transformWithStateInPandas, MapState:

  display(
    spark.read.format("statestore")
    .option("stateVarName", "mapState")
    .load("/Volumes/foo/bar/baz")
  )

  [Example output: Map State]

Setting up the initial state

applyInPandasWithState doesn’t provide a way of seeding the pipeline with an initial state, which makes pipeline migrations extremely difficult because the new pipeline can’t be backfilled. transformWithStateInPandas, on the other hand, makes this easy: the handleInitialState method lets users customize the initial state setup, and it can also be used to register timers at the same time.

Passing in the initial state

  applyInPandasWithState: not possible

  transformWithStateInPandas:

    .transformWithStateInPandas(
        MySP(),
        "fruit STRING, count LONG",
        "append",
        "processingtime",
        grouped_df
    )

Customizing initial state

  applyInPandasWithState: not possible

  transformWithStateInPandas:

    class MySP(StatefulProcessor):
        def init(self, handle: StatefulProcessorHandle) -> None:
            self._state = handle.getValueState("countState", "count LONG")
            self.handle = handle

        def handleInitialState(
            self,
            key: Tuple[str],
            initialState: pd.DataFrame,
            timerValues: TimerValues
        ) -> None:
            self._state.update((initialState.at[0, "count"],))
            self.handle.registerTimer(
                timerValues.getCurrentProcessingTimeInMs() + 10000
            )

So if you want to migrate an applyInPandasWithState pipeline to transformWithStateInPandas, you can do so by using the state reader to carry the old pipeline’s internal state over into the new one.
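
Sketched roughly, with illustrative checkpoint paths and column names:

# Read the old applyInPandasWithState query's state from its checkpoint.
old_state = (
    spark.read.format("statestore")
    .load("/path/to/old/checkpoint")
    .selectExpr("key.fruit AS fruit", "value.count AS count")
)

# Pass it as the initial state; Spark calls handleInitialState once per key
# with these rows before processing new streaming data.
migrated = (
    df.groupBy("fruit")
    .transformWithStateInPandas(
        CountFruitProcessor(),
        "fruit STRING, count LONG",
        "append",
        "processingtime",
        old_state.groupBy("fruit"),   # initial state, grouped by the same key
    )
)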

Schema Evolution

Schema evolution is a crucial aspect of managing streaming data pipelines, as it allows for the modification of data structures without interrupting data processing.

With applyInPandasWithState, once a query is started, changes to the state schema are not permitted. applyInPandasWithState verifies schema compatibility by checking for equality between the stored schema and the active schema. If a user tries to alter the schema, an exception is thrown, resulting in the query’s failure. Consequently, any changes must be managed manually by the user.

Customers usually resort to one of two workarounds: either they start the query from a new checkpoint directory and reprocess the state, or they wrap the state schema using formats like JSON or Avro and manage the schema explicitly. Neither of these approaches is particularly favored in practice.

On the other hand, transformWithStateInPandas provides more robust support for schema evolution. Users simply need to update their pipelines, and as long as the schema change is compatible, Apache Spark™ will automatically detect and migrate the data to the new schema. Queries can continue to run from the same checkpoint directory, eliminating the need to rebuild the state and reprocess all the data from scratch. The API allows for defining new state variables, removing old ones, and updating existing ones with only a code change.
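
As a rough sketch of what such a code change could look like, the processor below adds a column to an existing value state; the extra column and the processor itself are hypothetical, and the point is only that the state definition changes in code while the query keeps running against the same checkpoint.

import pandas as pd
from datetime import datetime
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)

class CountFruitProcessorV2(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # v1 stored only "count LONG"; v2 widens the value schema with an extra
        # nullable column. Because the change is compatible, the query restarts
        # from the same checkpoint and existing state is migrated automatically.
        self._state = handle.getValueState(
            "countState", "count LONG, last_seen TIMESTAMP"
        )

    def handleInputRows(self, key, rows, timerValues):
        existing = self._state.get()
        count = existing[0] if existing is not None else 0
        count += sum(len(pdf) for pdf in rows)
        self._state.update((count, datetime.now()))
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})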

In summary, transformWithStateInPandas's support for schema evolution significantly simplifies the maintenance of long-running streaming pipelines.

Schema change                              | applyInPandasWithState | transformWithStateInPandas
Add columns (including nested columns)     | Not supported          | Supported
Remove columns (including nested columns)  | Not supported          | Supported
Reorder columns                            | Not supported          | Supported
Type widening (e.g., Int → Long)           | Not supported          | Supported

Working with streaming data

applyInPandasWithState has a single function that is triggered when either new data arrives or a timer fires, and it’s the user’s responsibility to determine the reason for the call. The only way to tell that new streaming data has arrived is to check that the state has not timed out, so it’s a best practice to include a separate code branch for timeouts; otherwise, your code risks handling them incorrectly.

In contrast, transformWithStateInPandas uses different functions for different events:

  • handleInputRows is called when new streaming data arrives, and
  • handleExpiredTimer is called when a timer goes off.

As a result, no additional checks are necessary; the API manages this for you.

Work with new data

  applyInPandasWithState:

    def func(key, rows, state):
        if not state.hasTimedOut:
            ...

  transformWithStateInPandas:

    class MySP(StatefulProcessor):
        def handleInputRows(self, key, rows, timerValues):
            ...

Working with timers

Timers vs. Timeouts

transformWithStateInPandas introduces the concept of timers, which are much easier to configure and reason about than applyInPandasWithState’s timeouts.

Timeouts only trigger if no new data arrives by a certain time. Additionally, each applyInPandasWithState key can only have one timeout, and the timeout is automatically deleted every time the function is executed.

In contrast, timers fire at their designated time unconditionally. You can have multiple timers for each transformWithStateInPandas key, and each timer is automatically removed only once its designated time is reached.

                | Timeouts (applyInPandasWithState) | Timers (transformWithStateInPandas)
Number per key  | 1                                 | Many
Trigger event   | If no new data arrives by time x  | At time x
Delete event    | On every function call            | At time x

These differences might seem subtle, but they make working with time so much simpler. For example, say you wanted to trigger an action at 9 AM and again at 5 PM. With applyInPandasWithState, you would need to create the 9 AM timeout first, save the 5 PM one to state for later, and reset the timeout every time new data arrives. With transformWithState, this is easy: register two timers, and it’s done.
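
A rough sketch of that pattern is below; the hour offsets stand in for the actual 9 AM and 5 PM timestamps, and in a real job you would typically guard against re-registering the same timers on every batch.

import pandas as pd
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)

class TwoTimersProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues):
        # Register both timers up front; a key can hold many timers, and each
        # one fires independently via handleExpiredTimer at its own time.
        now = timerValues.getCurrentProcessingTimeInMs()
        self.handle.registerTimer(now + 1 * 60 * 60 * 1000)  # stand-in for "9 AM"
        self.handle.registerTimer(now + 9 * 60 * 60 * 1000)  # stand-in for "5 PM"
        yield pd.DataFrame({"key": [key[0]]})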

Detecting that a timer went off

In applyInPandasWithState, state and timeouts are unified in the GroupState class, meaning that the two are not treated separately. To determine whether a function invocation is because of a timeout expiring or new input, the user needs to explicitly call the state.hasTimedOut method, and implement if/else logic accordingly.

With transformWithState, these gymnastics are no longer necessary. Timers are decoupled from the state and treated as distinct from each other. When a timer expires, the system triggers a separate method, handleExpiredTimer, dedicated solely to handling timer events. This removes the need to check if state.hasTimedOut or not – the system does it for you.

Did a timer go off?

  applyInPandasWithState:

    def func(key, rows, state):
        if state.hasTimedOut:
            # yes
            ...
        else:
            # no
            ...

  transformWithStateInPandas:

    class MySP(StatefulProcessor):
        def handleExpiredTimer(self, key, expiredTimerInfo, timerValues):
            when = expiredTimerInfo.getExpiryTimeInMs()
            ...

CRUDing with Event Time vs. Processing Time

A peculiarity in the applyInPandasWithState API is the existence of distinct methods for setting timeouts based on processing time and event time. When using GroupStateTimeout.ProcessingTimeTimeout, the user sets a timeout with setTimeoutDuration. In contrast, for EventTimeTimeout, the user calls setTimeoutTimestamp instead. When one method works, the other throws an error, and vice versa. Additionally, for both event time and processing time, the only way to delete a timeout is to also delete its state.

In contrast, transformWithStateInPandas offers a more straightforward approach to timer operations. Its API is consistent for both event time and processing time; and provides methods to create (registerTimer), read (listTimers), and delete (deleteTimer) a timer. With transformWithStateInPandas, it’s possible to create multiple timers for the same key, which greatly simplifies the code needed to emit data at various points in time.

Create one

  applyInPandasWithState: state.setTimeoutTimestamp(tsMilli)
  transformWithStateInPandas: self.handle.registerTimer(tsMilli)

Create many

  applyInPandasWithState: not possible
  transformWithStateInPandas:
    self.handle.registerTimer(tsMilli_1)
    self.handle.registerTimer(tsMilli_2)

read

  applyInPandasWithState: state.oldTimeoutTimestamp
  transformWithStateInPandas: self.handle.listTimers()

update

  applyInPandasWithState:
    state.setTimeoutTimestamp(tsMilli)       # for EventTime
    state.setTimeoutDuration(durationMilli)  # for ProcessingTime
  transformWithStateInPandas:
    self.handle.deleteTimer(oldTsMilli)
    self.handle.registerTimer(newTsMilli)

delete

  applyInPandasWithState: state.remove()  # but this deletes the timeout and the state
  transformWithStateInPandas: self.handle.deleteTimer(oldTsMilli)

Working with Multiple Stateful Operators

Chaining stateful operators in a single pipeline has traditionally posed challenges. The applyInPandasWithState operator does not allow users to specify which output column is associated with the watermark. As a result, stateful operators can’t be placed after an applyInPandasWithState operator. Consequently, users have had to split their stateful computations across multiple pipelines, requiring Kafka or other storage layers as intermediaries. This increases both cost and latency.

In contrast, transformWithStateInPandas can safely be chained with other stateful operators. Users simply need to specify the event time column when adding it to the pipeline, as illustrated below:
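
(A sketch, assuming the processor emits an event_time column in its output; the watermark, window, and column names are illustrative.)

from pyspark.sql.functions import window

counts = (
    df.withWatermark("event_time", "10 minutes")
    .groupBy("fruit")
    .transformWithStateInPandas(
        CountFruitProcessor(),
        "fruit STRING, count LONG, event_time TIMESTAMP",
        "append",
        "eventtime",
        eventTimeColumnName="event_time",  # the output column that carries event time
    )
)

# Because the event time column is declared, the watermark flows through and
# another stateful operator (here a windowed aggregation) can follow directly.
windowed = (
    counts
    .groupBy(window("event_time", "1 hour"), "fruit")
    .count()
)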

This approach lets the watermark information pass through to downstream operators, enabling late record filtering and state eviction without having to set up a new pipeline and intermediate storage.

Conclusion

The new transformWithStateInPandas operator in Apache Spark™ Structured Streaming offers significant advantages over the older applyInPandasWithState operator. It provides greater flexibility, enhanced state management capabilities, and a more user-friendly API. With features such as multiple state objects, state inspection, and customizable timers, transformWithStateInPandas simplifies the development of complex stateful streaming applications.

While applyInPandasWithState may still be familiar to experienced users, transformWithState's improved functionality and versatility make it the better choice for modern streaming workloads. By adopting transformWithStateInPandas, developers can create more efficient and maintainable streaming pipelines. Try it out for yourself in Apache Spark™ 4.0, and Databricks Runtime 16.2 and above.

Feature                                | applyInPandasWithState (State v1)                     | transformWithStateInPandas (State v2)
Supported Languages                    | Scala, Java, and Python                               | Scala, Java, and Python
Processing Model                       | Function-based                                        | Object-oriented
Input Processing                       | Processes input rows per grouping key                 | Processes input rows per grouping key
Output Processing                      | Can generate output optionally                        | Can generate output optionally
Supported Time Modes                   | Processing Time & Event Time                          | Processing Time & Event Time
Fine-Grained State Modeling            | Not supported (only a single state object is passed)  | Supported (users can create any state variables as needed)
Composite Types                        | Not supported                                         | Supported (currently supports Value, List, and Map types)
Timers                                 | Not supported                                         | Supported
State Cleanup                          | Manual                                                | Automated, with support for state TTL
State Initialization                   | Partial support (only available in Scala)             | Supported in all languages
Chaining Operators in Event Time Mode  | Not supported                                         | Supported
State Data Source Reader Support       | Supported                                             | Supported
State Model Evolution                  | Not supported                                         | Supported
State Schema Evolution                 | Not supported                                         | Supported
