[Android] How We Managed to Send Events Sequentially with Rate Limiting And Controlling Each Event?
While working on one of our products, we ran into a strange problem. A feature we were building triggered a lot of events. We needed those events, but we still wanted to limit them somehow. We also saw that the events were sent in random order, and that some events fired multiple times. The repeats were expected, but we still needed to control them. In this article, we will delve into this scenario with code!
Problem statement:
Imagine we have four distinct events: E1, E2, E3, and E4, each of which can trigger several times within a 5-second window. Our goal is to limit the number of times each event is processed to a maximum of 10 within that window.
As shown in the image:
- E1 triggers 6 times, all of which are allowed.
- E2 triggers 15 times, but only 10 occurrences are processed.
- E3 triggers 3 times, all are processed.
- E4 triggers 12 times, but only 10 are allowed.
The task is to rate-limit each event so that no more than 10 occurrences of any event are processed in a 5-second window. Each event has its own 5-second window.
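To make the expected outcome concrete, here is a minimal sketch that replays the four events through a per-event counter. WindowCounter and simulate are hypothetical names introduced only for illustration, and timestamps are passed in by hand instead of being read from a clock. This is not the implementation we ended up with, just the arithmetic of the problem statement.

```kotlin
// Hypothetical helper: counts events per name inside a fixed time window.
class WindowCounter(private val maxPerWindow: Int, private val windowMillis: Long) {
    private class Window(var startMillis: Long, var count: Int)

    private val windows = HashMap<String, Window>()

    // Returns true if the event may be processed at time nowMillis.
    fun tryAcquire(eventName: String, nowMillis: Long): Boolean {
        val window = windows.getOrPut(eventName) { Window(nowMillis, 0) }
        if (nowMillis - window.startMillis >= windowMillis) {
            // The window for this event has elapsed, so start a fresh one.
            window.startMillis = nowMillis
            window.count = 0
        }
        if (window.count >= maxPerWindow) return false
        window.count++
        return true
    }
}

// Replays the scenario: every trigger happens at t = 0 ms, inside one window.
fun simulate(): Map<String, Int> {
    val counter = WindowCounter(maxPerWindow = 10, windowMillis = 5_000)
    val triggers = linkedMapOf("E1" to 6, "E2" to 15, "E3" to 3, "E4" to 12)
    return triggers.mapValues { (name, times) ->
        (1..times).count { counter.tryAcquire(name, nowMillis = 0) }
    }
}

fun main() {
    println(simulate()) // {E1=6, E2=10, E3=3, E4=10}
}
```

Because each event name has its own window, E2 hitting its limit has no effect on E1, E3, or E4.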
What’s an Event?
When working with our Android application, events are essential. Think of them as heartbeat pulses. Just like checking your pulse by placing two fingers on your wrist — if it’s beating, you know something is happening. Is it fast? Is it slow? These rhythms give us important signals.
Similarly, events help us track activities in the app, make decisions, detect irregularities, or identify opportunities for improvement.
Earlier Implementation
In our previous setup, we had an AnalyticUseCase that called the sendEvent method to trigger events. We manually created a coroutine scope and triggered our events.
// Use case class for sending analytics events
class AnalyticUseCase @Inject constructor(
    private val analyticRepo: AnalyticsRepo // Injecting repository to handle analytics operations
) : UseCase<EventParam, Unit>() {
    // Executes the use case, sending the event via the repository
    override suspend fun run(params: EventParam): Result<Failure, Unit> {
        return Result.Success(analyticRepo.sendEvent(params)) // Return success after sending event
    }
}

// Data class representing the event parameters
data class EventParam(
    var eventName: String // Event name, can be extended with more properties
)

// Function that triggers an event
fun sendEvent() {
    // Launches a coroutine on IO dispatcher to send the event asynchronously
    scope.launch(Dispatchers.IO) {
        // Calls the use case to send the event with the given parameters
        analyticUseCase.invoke(
            scope = this, // The current coroutine scope
            params = EventParam(eventName = "Sample_Event") // Example event being sent
        )
    }
}
Our Issue?
Since this use case was called from various parts of the application, multiple threads were created, triggering many events simultaneously. All these events hitting the sendEvent function at once caused overlapping, irregularities, and a lack of rate limiting on how they were sent to the server. This led us to adopt a sequential approach for handling event triggers, ensuring smoother and more predictable behavior.
Thinking
When we think about this, the first thing that comes to mind is a Queue that is filled and polled to trigger events sequentially. For rate limiting, we could maintain a hashmap of events and their counts, or use a token system in which each event gets 10 tokens per 5 seconds and the tokens keep getting refreshed.
We also need to add thread safety somewhere, and then this should work.
But if every event has its own token-based rate limiter, how do we refresh the tokens for each event? Should we run N while(true) loops, where N is the number of unique events?
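The "hashmap of events and counts, plus thread safety" idea can be sketched roughly as follows. EventCounter and countAllowed are hypothetical names, plain threads stand in for the different parts of the app that fire events, and the sketch deliberately leaves out the window reset, which is exactly the open question above.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical sketch of a thread-safe "event name to count" map.
class EventCounter(private val limit: Int) {
    private val counts = ConcurrentHashMap<String, Int>()

    // merge() updates the count atomically and returns the new value,
    // so two threads can never observe the same count.
    fun tryCount(eventName: String): Boolean =
        counts.merge(eventName, 1) { old, increment -> old + increment } <= limit
}

// Fires the same event from several threads and returns how many were allowed.
fun countAllowed(threads: Int, triggersPerThread: Int, limit: Int): Int {
    val counter = EventCounter(limit)
    val allowed = AtomicInteger(0)
    val workers = List(threads) {
        thread {
            repeat(triggersPerThread) {
                if (counter.tryCount("E2")) allowed.incrementAndGet()
            }
        }
    }
    workers.forEach { it.join() } // wait for every thread to finish
    return allowed.get()
}

fun main() {
    // 4 threads x 5 triggers = 20 attempts, of which only 10 pass the limit.
    println(countAllowed(threads = 4, triggersPerThread = 5, limit = 10)) // 10
}
```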
Our Solution: Sequential Event Handling via Flow with Rate Limiting via token method!
To address these issues, we implemented a singleton, single-threaded class, TriggerEventManager, that ensures events are sent sequentially and incorporates a rate limiter to avoid sending too many events too quickly.
To keep processing sequential on top of the single-threaded setup, we decided to use a Mutex instead of synchronized blocks.
Why We Used Mutex
A Mutex (mutual exclusion) is a synchronization primitive that ensures only one coroutine can access a critical code section at a time.
In our implementation, we used a Mutex to ensure safe and sequential processing of events.
The Problem with Concurrency
- Race Conditions: Multiple events accessing shared resources simultaneously cause unexpected behavior.
- Out-of-Order Execution: Events might not be triggered in sequence.
- Resource Contention: Competing threads can degrade performance and cause deadlocks.
How Mutex Helps
- Sequential Processing: By using mutex.withLock, we ensure that only one event is processed at a time, preserving the order of events as they were triggered.
- Preventing Race Conditions: It prevents multiple coroutines from accessing shared resources simultaneously, avoiding race conditions.
- Thread Safety: It provides a safe way to synchronize access to shared resources without using complex and error-prone manual synchronization mechanisms.
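As a small illustration of the points above, the sketch below lets 100 coroutines update a shared counter under mutex.withLock. It assumes the kotlinx.coroutines library is on the classpath, and countWithMutex is a hypothetical function written for this example, not part of our analytics code.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Illustrative only: 100 coroutines each increment a shared counter 100 times.
suspend fun countWithMutex(): Int {
    val mutex = Mutex()
    var counter = 0
    coroutineScope {
        repeat(100) {
            // Dispatchers.Default runs these coroutines on several threads.
            launch(Dispatchers.Default) {
                repeat(100) {
                    // Only one coroutine at a time can run this block.
                    mutex.withLock { counter++ }
                }
            }
        }
    }
    // coroutineScope returns only after every child coroutine has finished.
    return counter
}

fun main() = runBlocking {
    println(countWithMutex()) // 10000
}
```

Without the lock, counter++ from several threads could lose updates and the result could come out below 10000.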
Singleton : TriggerEventManager
The TriggerEventManager class is designed to publish events sequentially using a combination of coroutines, a single-threaded executor, and a Mutex. Instead of a Queue, we went with a Flow approach.
It ensures that events are processed one at a time and adds a rate-limiting mechanism to prevent overloading the system with too many event triggers in a short period. It keeps a ConcurrentHashMap of rate limiters, one per event type, ensuring that no more than 10 events of a type are processed within a 5-second window.
@Singleton
class TriggerEventManager @Inject constructor(
    private val analyticHelper: AnalyticsHelper // Dependency to handle actual event publishing
) {
    // A shared flow to hold events for sequential processing
    private val _eventsFlow = MutableSharedFlow<EventParams>()
    val eventsFlow = _eventsFlow.asSharedFlow()

    // Single-threaded dispatcher to ensure events are processed sequentially
    private val thread = Executors.newSingleThreadExecutor()
    private val dispatcher = thread.asCoroutineDispatcher()
    private val scope = CoroutineScope(dispatcher + SupervisorJob())

    // Mutex to synchronize access and maintain order of event processing
    private val mutex = Mutex()

    // ConcurrentHashMap to store rate limiters for different event types
    private val rateLimitersMap = ConcurrentHashMap<String, RateLimiter>()

    init {
        // Collect events from the flow and process them sequentially using Mutex
        scope.launch {
            eventsFlow.collect { event ->
                mutex.withLock {
                    // Retrieve or create a rate limiter for the event type
                    getRateLimiter(event).execute()
                }
            }
        }
    }

    // Method to send an event for processing
    fun sendEvent(event: EventParams) {
        scope.launch {
            _eventsFlow.emit(event) // Add the event to the flow
        }
    }

    // Retrieves the RateLimiter for an event or creates a new one if not already present
    private fun getRateLimiter(event: EventParams): RateLimiter {
        return rateLimitersMap.computeIfAbsent(event.eventName) {
            RateLimiter(
                scope = scope,
                maxTokens = 10, // Maximum 10 events in the time window
                rateWindowMillis = 5000, // 5 seconds time window
                callback = {
                    // Callback triggered when rate limit allows event execution.
                    // Note: it captures the event that created this limiter.
                    scope.launch {
                        Log.d("RateLimiter", "Event Triggered: ${event.eventName}")
                        analyticHelper.sendEvent(event) // Actual event sending logic
                    }
                },
                onExpire = {
                    // Remove the rate limiter after the time window expires
                    rateLimitersMap.remove(event.eventName)
                }
            )
        }
    }
}
Purpose: To retrieve or create a RateLimiter for a given event type, ensuring that events of the same type are rate-limited effectively.
Functionality:
- Retrieve or Create: It checks if a RateLimiter for the specified event type already exists in the ConcurrentHashMap. If not, it creates a new one.
- Configuration: The RateLimiter is configured to allow a maximum of 10 events every 5 seconds.
- Callback: When the rate limit allows, the RateLimiter triggers the callback, which processes the event by calling analyticHelper.sendEvent(event).
- Cleanup: After the time window expires, the RateLimiter is removed from the map to free up resources.
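The retrieve-or-create and cleanup steps can be tried in isolation with the sketch below. FakeLimiter and LimiterRegistry are hypothetical stand-ins written for this example; only ConcurrentHashMap.computeIfAbsent and remove are the real calls used by the manager.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real RateLimiter; it only records its event name.
class FakeLimiter(val eventName: String)

class LimiterRegistry {
    private val limiters = ConcurrentHashMap<String, FakeLimiter>()
    val created = AtomicInteger(0) // how many limiters were actually constructed

    // Returns the existing limiter for the name, or creates and stores one.
    fun limiterFor(eventName: String): FakeLimiter =
        limiters.computeIfAbsent(eventName) { name ->
            created.incrementAndGet()
            FakeLimiter(name)
        }

    // Mirrors the onExpire cleanup: the next lookup builds a fresh limiter.
    fun expire(eventName: String) {
        limiters.remove(eventName)
    }
}

fun main() {
    val registry = LimiterRegistry()
    val first = registry.limiterFor("E1")
    println(registry.limiterFor("E1") === first) // true
    registry.expire("E1")
    println(registry.limiterFor("E1") === first) // false
    println(registry.created.get())              // 2
}
```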
Internal RateLimiter Class
The RateLimiter class is responsible for limiting the rate at which specific events can be sent. It uses coroutines and flows to manage tokens and refill them periodically, and the limiter itself expires after a set duration.
Thought Process
Initially, we considered refilling tokens continuously in a while(true) loop for each event, but we realized this would be inefficient. Instead, we decided to limit token refills to three cycles and then expire the tokens. To ensure controlled access, we marked the class as internal, restricting its use to our analytics module.
// Internal RateLimiter Class
// Controls how often specific events can be triggered
internal class RateLimiter(
    private val scope: CoroutineScope, // Coroutine scope to manage background tasks
    private val maxTokens: Int, // Maximum number of allowed events (tokens)
    private val rateWindowMillis: Long, // Time window for token refill (in milliseconds)
    private val callback: () -> Unit = {}, // Action to perform when an event is allowed
    private val onExpire: () -> Unit = {}, // Action when rate limiter expires
    private val EXPIRATION_DURATION: Long = 15000L // Time after which limiter expires (e.g., 15 seconds)
) {
    private var refillCollection: Job // Job to handle token collection
    private var refillEmission: Job // Job to handle periodic token refill
    private var tokens: AtomicInteger = AtomicInteger(maxTokens) // Current token count, thread-safe
    private val _refillFlow = MutableSharedFlow<Unit>() // Flow to emit refill events
    val refillFlow = _refillFlow.asSharedFlow() // Expose the refill flow as read-only

    // Initialization block, starts token refill and token collection jobs
    init {
        // Emission job refills tokens at regular intervals, stops after expiration duration
        refillEmission = scope.launch {
            // With the defaults this is ceil(15000 / 5000) = 3 refill cycles.
            // The earlier 0..n range ran one extra cycle (4 cycles, 20 seconds).
            val refillCycles = ceil(EXPIRATION_DURATION.toDouble() / rateWindowMillis).toInt()
            repeat(refillCycles) {
                delay(rateWindowMillis) // Wait for the rate window
                _refillFlow.emit(Unit) // Emit token refill event
            }
            stop() // Stop refilling after expiration
        }
        // Collection job listens for refill events and refills tokens
        refillCollection = scope.launch {
            refillFlow.collect {
                refillTokens() // Refill the tokens when a new event is collected
            }
        }
    }

    // Refills the token to the maximum count
    private fun refillTokens() {
        tokens.set(maxTokens)
    }

    // THIS IS THE METHOD WHICH IS CALLED FROM TriggerEventManager
    // Called to execute an event if tokens are available
    fun execute() {
        if (tokens.getAndDecrement() > 0) {
            callback.invoke() // Trigger event if tokens are available
        } else {
            Log.d("RateLimiter", "Limit exceeded") // Log if the rate limit is exceeded
        }
    }

    // Stops the rate limiter when expired
    fun stop() {
        onExpire.invoke() // Trigger onExpire callback
        refillEmission.cancel() // Cancel the emission job
        refillCollection.cancel() // Cancel the collection job
    }
}
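The token check and the refill-cycle arithmetic can be verified without coroutines. The sketch below is a simplified model under that assumption: TokenBucket and refillCycles are hypothetical names, and refills are triggered by hand instead of by a timed job.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.ceil

// Simplified, coroutine-free model of the token check in execute().
class TokenBucket(private val maxTokens: Int) {
    private val tokens = AtomicInteger(maxTokens)

    // Same check as execute(): take a token, allow only if one was available.
    fun tryConsume(): Boolean = tokens.getAndDecrement() > 0

    // Same as refillTokens(): reset to the maximum, even if the count went negative.
    fun refill() = tokens.set(maxTokens)
}

// Number of refill cycles that fit into the expiration duration.
fun refillCycles(expirationMillis: Long, windowMillis: Long): Int =
    ceil(expirationMillis.toDouble() / windowMillis).toInt()

fun main() {
    val bucket = TokenBucket(maxTokens = 10)
    println((1..12).count { bucket.tryConsume() }) // 10
    bucket.refill()
    println((1..3).count { bucket.tryConsume() })  // 3
    println(refillCycles(15_000, 5_000))           // 3
}
```

Note that getAndDecrement keeps counting below zero once the tokens run out. That is harmless here because the refill uses set(maxTokens) rather than adding to the current value.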
How It Works
- Sequential Execution: The TriggerEventManager uses a single-threaded executor to ensure that events are handled one at a time.
- Mutex for Safety: A Mutex locks the event handling, ensuring that only one event is processed at a time.
- Rate Limiting: The RateLimiter class manages the rate at which events can be sent, preventing any single event type from overwhelming the system.
- Expiration: Rate limiters are automatically removed once their expiration duration (15 seconds by default) has passed, to free up resources.
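The sequential part of this flow can be observed with a small, self-contained sketch. It assumes the kotlinx.coroutines library, and it makes two choices that are not taken from our class: a buffer of 64 events on the shared flow, and an explicit wait until the collector has subscribed. processInOrder is a hypothetical function written for this example.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Illustrative only: pushes events through a shared flow and records the
// order in which a single collector handles them. Expects a non-empty list
// of at most 64 events, because take() rejects a count of zero.
suspend fun processInOrder(events: List<String>): List<String> = coroutineScope {
    // Assumption for this sketch: a buffer of 64 so emit() does not suspend.
    val flow = MutableSharedFlow<String>(extraBufferCapacity = 64)
    val handled = mutableListOf<String>()

    // One collector means events are handled one at a time, in emission order.
    val collector = launch {
        flow.take(events.size).collect { handled += it }
    }

    // With no replay, a SharedFlow drops values emitted while it has no
    // subscribers, so wait until the collector has subscribed.
    flow.subscriptionCount.first { it > 0 }

    events.forEach { flow.emit(it) }
    collector.join() // take() completes the collector after the last event
    handled
}

fun main() = runBlocking {
    println(processInOrder(listOf("E1", "E2", "E1", "E3"))) // [E1, E2, E1, E3]
}
```

The wait on subscriptionCount is worth keeping in mind for any SharedFlow-based design: an event emitted before the collector is active is lost rather than queued.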
Usage Example
Here’s how you can use TriggerEventManager in your application:
fun main() {
    // Assuming you have a way to get an instance of TriggerEventManager via DI
    val manager: TriggerEventManager = TriggerEventManager(AnalyticsHelper())
    // Create events
    val event1 = EventParams("Event1")
    val event2 = EventParams("Event2")
    // Send events
    manager.sendEvent(event1)
    manager.sendEvent(event2)
}
This setup ensures that your events are handled sequentially, in a controlled and efficient way, improving your application's stability and performance.
Conclusion
By implementing a TriggerEventManager with a rate limiter, we managed to control the flow of events in our application, ensuring they were sent sequentially and within specified rate limits. This approach helped us avoid the issues of overlapping and irregular event triggers, improving the reliability and performance of our analytics system.
I thought of writing an article on this. I feel great that I have such amazing teammates with whom we can discuss these and finally implement what we believe.
Do share insights or mention any possible issues/feedback/upgrades or how this could have been improved further.
About the Author
I’m just a passionate Android developer who loves finding creative, elegant solutions to complex problems. Feel free to chat on LinkedIn about Android-related stuff and more. Thank you for reading this!