By building on our asynchronous foundation, we have successfully transformed our AI agent from a manually-triggered tool into a fully autonomous, self-managing application.By building on our asynchronous foundation, we have successfully transformed our AI agent from a manually-triggered tool into a fully autonomous, self-managing application.

Autonomous Agents at Scale: Lessons from Redis + Symfony Scheduler

In the article “Your AI Agent Is Too Slow—Here’s How to Fix It” we significantly sped up our AI Agent by using an asynchronous, message-bus, and CQRS-based approach.

We’re now ready to take the next step: making our application completely autonomous from human intervention for polling incoming information channels. This opens up a wealth of exciting future development opportunities.

Enhancing State Management and Scalability

In our previous guide, “Forget Plug-and-Play AI—Here’s How to Roll Your Own” we polled a mailbox without any state persistence, which could lead to reprocessing emails. That example was intentionally simple. Now, we’re upgrading our application and introducing state management without a database (Doctrine Component). This enhancement will allow us to run multiple parallel instances of our application, each with multi-threaded consumers. This approach not only improves hardware resource utilization but also ensures the seamless, reliable processing of incoming data.

We could, of course, retrieve only UNSEEN emails from a mailbox. However, this approach has several limitations. For example, another bot or human might have accessed the mailbox, or a previous error might have prevented successful processing. Moreover, this method is only suitable for IMAP and not for messages from platforms like messengers or SMS providers.

To overcome these limitations, we’ll start by learning how to store the state of the last successful polling of our incoming message channels. While we could use a full-fledged database for this, we’ll avoid it for now since we don’t need that level of complexity and performance gain just yet. This approach will allow us to maintain state efficiently without the overhead of a traditional database, making our application more flexible and scalable.

We can store our application’s current state in a file on the system. However, if we want to use a stateless approach for deploying application instances, we’ll lose the latest state with every container restart. While this might not be critical, it’s not an ideal solution. Additionally, with a file-based approach, multiple application instances will face significant issues with concurrent access and file locking.

The Redis Solution

So what’s the solution? In the article “Your AI Agent Is Too Slow—Here’s How to Fix It” we already integrated Redis for the transport layer of our asynchronous application. Why not use Redis to store our state as well? It’s a robust, distributed key-value store that’s perfect for this use case, allowing us to maintain a shared state across multiple instances without the file-locking problems. This approach ensures our application remains both scalable and stateless, providing a clean and efficient architecture.

Using Dedicated Cache Pools in Symfony

We can leverage the Symfony Cache Component to store data. However, it’s crucial that this data isn’t deleted when the standard cache:clear command is executed.

This is where unique cache pools come in. They are a core feature of the Symfony Cache Component that allows us to create separate, isolated storage areas. By configuring a dedicated pool for our state data, we ensure that it remains persistent and is not affected by the clearing of the application’s primary cache. This approach provides a clean and reliable way to manage persistent application state.

Installing the Symfony Cache Component

First, let’s install the Symfony Cache Component and its dependencies. We can do this easily using Composer. Run the following command in your terminal:

\

composer require symfony/cache 

\ This command will install the necessary packages and integrate the component into your Symfony application, allowing us to start implementing our persistent cache pools.

We’ll add environment variable to our .env file to connect to the Redis Cache Database.

\

REDIS_DSN_CACHE=redis://localhost:6379/11 

\ /11: This is a crucial part of the DSN. It specifies that the connection should use database number 11. Redis supports 16 databases (numbered 0–15) by default, which allows you to logically separate your data within the same Redis instance.

For this example, we’ll make some changes in config/packages/cache.yaml file:

\

framework:     cache:         app: cache.adapter.redis         default_redis_provider: '%env(REDIS_DSN_CACHE)%'          pools:             inbound.state.cache:                 public: true 

\ We’ve now successfully added a dedicated cache pool to store the state of our data-fetching operations from external sources. This is a crucial step towards building a truly autonomous application.

Why a Separate Cache Pool Matters

By creating a unique cache pool, we’ve achieved two key goals:

Persistence: The data in this pool will not be cleared when you run bin/console cache:clear. This ensures that our application will always remember the state of the last successful operation, even after a standard cache reset or a new deployment.

Isolation: This pool is a self-contained unit, isolated from the rest of the application’s cache. This prevents any other part of the system from accidentally modifying or deleting our state data, guaranteeing its integrity.

This approach is highly effective for building stateless, scalable applications. By storing the state in a shared, external system like Redis, multiple instances of our application can access and update the same state information seamlessly. This eliminates the file-locking issues we discussed earlier and enables parallel processing, significantly improving performance and hardware utilization.

Clearing the Cache Pool

Simply run the following command in your terminal:

\

./bin/console cache:pool:clear inbound.state.cache 

\ This command specifically targets and clears the inbound.state.cache pool, leaving all other cache data untouched. It’s a useful tool for debugging or for a full application state reset when necessary, giving you full control over your cache management.

Abstracting the Channel State

Let’s create a DTO (Data Transfer Object) to manage the state of our inbound information channels. To prepare for future expansion and different types of channels, we’ll design a flexible and scalable structure.

First, we’ll create an abstract base class, ChannelState.php. This class will define the common properties that all our channels will share, such as the message ID and the date of creation or reception.

This abstract class will act as a blueprint, ensuring that every new channel we add in the future will have these essential parameters, creating a consistent data structure across our application.

\

namespace App\DTO\State;  abstract class ChannelState {     public function __construct(         protected ?string    $lastMessageKey = null,         protected ?\DateTimeImmutable $lastMessageDate = null     )     {}      public function getLastMessageKey(): ?string     {         return $this->lastMessageKey;     }      public function setLastMessageKey(?string $lastMessageKey): void     {         $this->lastMessageKey = $lastMessageKey;     }      public function getLastMessageDate(): ?\DateTimeImmutable     {         return $this->lastMessageDate;     }      public function setLastMessageDate(?\DateTimeImmutable $lastMessageDate): void     {         $this->lastMessageDate = $lastMessageDate;     } } 

\

Implementing a Specific Channel

Next, we’ll extend this base class to create a concrete DTO for our email channel, MailChannelState.php. This class will inherit the common properties and add specific ones, such as the mailbox name and folder, which are unique to email processing.

\

namespace App\DTO\State\Channel;  use App\DTO\State\ChannelState;  class MailChannelState extends ChannelState {     public function __construct(         protected ?string    $lastMessageKey = null,         protected ?\DateTimeImmutable $lastMessageDate = null,         protected ?string    $email = null,         protected ?string    $folder = null,     )     {         parent::__construct($lastMessageKey, $lastMessageDate);     }      public function getEmail(): ?string     {         return $this->email;     }      public function setEmail(?string $email): void     {         $this->email = $email;     }      public function getFolder(): ?string     {         return $this->folder;     }      public function setFolder(?string $folder): void     {         $this->folder = $folder;     } } 

\ This approach allows us to easily add new channel types in the future (e.g., for SMS or messenger platforms) by simply creating new classes that extend our ChannelState abstract class. This design promotes code reuse and maintains a clean, organized architecture.

Creating the ChannelStateCacheService

We need to create a service to handle all our interactions with the cache pool. This service will be responsible for saving and retrieving state data from the cache, while intelligently handling the specific class type and additional parameters of our ChannelState implementations.

\

namespace App\Service;  use App\DTO\State\Channel\MailChannelState; use App\DTO\State\ChannelState; use Symfony\Contracts\Cache\CacheInterface;  readonly class ChannelStateService{     private const string STATE_CACHE_KEY_PREFIX = 'state_';      public function __construct(private CacheInterface $inboundStateCache){}      protected function getCacheKey(ChannelState $state): string     {         $reflection = new \ReflectionClass($state);          return self::STATE_CACHE_KEY_PREFIX.$reflection->getShortName().'_'.             hash('sha256',                 match(get_class($state)){                     MailChannelState::class => $state->getEmail() . $state->getFolder(),                     default => ''                 });     }      public function setChannelState(ChannelState $state): bool{         $cacheItem = $this->inboundStateCache->getItem($this->getCacheKey($state));          $cacheItem->set($state);         $this->inboundStateCache->save($cacheItem);          return true;     }     public function getChannelState(ChannelState $state): null|ChannelState{         $cacheItem = $this->inboundStateCache->getItem($this->getCacheKey($state));          if ($cacheItem->isHit()) {             return $cacheItem->get();         }         return null;     } } 

\ By centralizing this logic in a single service, we make our code cleaner, more manageable, and easier to test. This also abstracts the underlying cache implementation, so if we ever decide to switch from Redis to something else (like Memcached), we only need to update the configuration and this single service, without changing the rest of our application’s code.

Why Use a Hash?

A hash is a useful tool for generating a checksum from a list of parameters to create a unique cache key.

When we have multiple parameters that determine a unique cache key (for example, a mailbox name, folder, and an offset), simply concatenating them can be messy and lead to very long, unwieldy keys. A hash function takes all these parameters as input and produces a fixed-length string of characters, or a digest.

This digest acts as a unique fingerprint for that specific set of parameters. Even a minor change in any of the input values will result in a completely different hash, which is perfect for ensuring cache key uniqueness.

Step 1: Update the ImapMailService

Now we just need to update the ImapMailService to make its properties and methods protected. This will allow us to inherit from it and create a new service, ImapMailStateService, which will handle the logic for saving and retrieving the current state.

First, open ImapMailService.php and change the visibility of the properties and methods you want to be accessible to child classes from private to protected. This makes it possible for the new ImapMailStateService to interact with them directly.

Step 2: Create the ImapMailStateService

Next, create the new service ImapMailStateService.php and make it extend the ImapMailService you just modified. This new class will contain the logic for saving and retrieving the state using our previously created ChannelStateCacheService.

This is an example of the Decorator Pattern, where you extend a class’s functionality without changing its core behavior. Our ImapMailStateService will inherit all the original mail-fetching capabilities and simply “decorate” them with state management logic.

By doing this, we create a clean separation of concerns. The base ImapMailService remains focused on its primary task of interacting with IMAP, while the new service handles the state persistence. This makes the code more modular, easier to maintain, and more flexible for future development.

\

namespace App\Service;  use App\DTO\DataCollection; use App\DTO\MailMessage; use App\DTO\State\Channel\MailChannelState; use App\DTO\State\ChannelState;  class ImapMailStateService extends ImapMailService {     public function __construct(                                 private ChannelStateService $channelStateService,                                 protected string $host,                                 protected string $username,                                 protected string $password,                                 protected string $mailbox = 'INBOX',     )     {         parent::__construct($this->host, $this->username, $this->password, $this->mailbox);     }      public function fetchEmails(int $limit = 10): DataCollection     {         $collection = new DataCollection();         $connection = $this->connect();          /** @var null|MailChannelState $lastState */         $lastState = $this->channelStateService->getChannelState(new MailChannelState());          $criteria = SORTARRIVAL;          if (($lastState instanceof MailChannelState) && ($lastState->getLastMessageDate() instanceof \DateTimeImmutable)) {             $criteria = ' SINCE "' . $lastState->getLastMessageDate()->format('d-M-Y') . '"';         }          $emails = imap_sort($connection, $criteria, 1, SE_UID);          if (!$emails) {             return $collection;         }          $emails = array_slice($emails, 0, $limit);          $stateMailMessage = null;          foreach ($emails as $emailUid) {             $overview = imap_fetch_overview($connection, $emailUid, FT_UID);             $headers = imap_fetchheader($connection, $emailUid, FT_UID);              if (is_null($stateMailMessage)){                 $stateMailMessage = new MailMessage(                     $overview[0]->subject,                     $overview[0]->from,                     $overview[0]->to,                     null,                     $overview[0]->date,                     $emailUid);             }              if (                 $this->isCurrentMailboxState($lastState)                     &&                 (                     $lastState->getLastMessageKey() === $emailUid                     ||                     $lastState->getLastMessageDate()<=$overview[0]->date                 )             ) {                 continue;             }              if ($this->isTextContentType($headers)) {                 $body = imap_body($connection, $emailUid, FT_UID);                  $mailMessage = new MailMessage(                     $overview[0]->subject,                     $overview[0]->from,                     $overview[0]->to,                     $body,                     $overview[0]->date,                     $emailUid                 );                  $collection->add($mailMessage);             }         }          imap_close($connection);          if ($stateMailMessage instanceof MailMessage) {             $this->saveCurrentState($stateMailMessage);         }          return $collection;     }      protected function saveCurrentState(MailMessage $message, ?ChannelState $currentState = null): void{          if (             $this->isCurrentMailboxState($currentState)             &&             ($currentState->getLastMessageKey() <= $message->getId()             ||             $currentState->getLastMessageDate() <= new \DateTime($message->getDate())             )         ) {             return;         }          $channelState = new MailChannelState(             $message->getId(),             new \DateTimeImmutable($message->getDate()),             $this->username,             $this->mailbox         );          $this->channelStateService->setChannelState($channelState);     }      protected function isCurrentMailboxState(?ChannelState $state = null): bool{         return $state instanceof MailChannelState             &&             $state->getEmail()===$this->getUsername()             &&             $state->getFolder()===$this->getMailbox();     } } } 

\

services:     ...     App\Service\ImapMailStateService:         arguments:             $host: '%env(IMAP_HOST)%'             $username: '%env(IMAP_USERNAME)%'             $password: '%env(IMAP_PASSWORD)%' 

\ With the core logic for state management and future scalability in place, we can now move on to the next crucial step: scheduling our application to check for new messages. Since we don’t have push notifications or incoming webhooks yet, we’ll need to initiate the checks ourselves.

Introducing the Symfony Scheduler Component

It provides a robust, built-in solution for running tasks at specific intervals. It is far more reliable and versatile than a simple crontab and integrates seamlessly with Symfony’s Messenger and dependency injection systems.

Using the Scheduler, we can:

  • Define our task: We’ll create a task that runs our ImapMailStateService to check for new emails.
  • Set a schedule: We can specify exactly when and how often this task should run (e.g., every minute, every 5 minutes, or on a specific day of the week).
  • Decouple the process: The Scheduler will handle the execution of our task as a background process, ensuring our main application remains responsive.

Let’s install the Symfony Scheduler Component and its dependencies. We can do this easily using Composer. Run the following command in your terminal:

\

composer require symfony/scheduler 

\ By implementing the Scheduler, we’re taking a significant step toward making our application truly autonomous, as it will now be able to poll for new messages without any manual intervention.

Scheduler cycle in symfony

Cron Expression Triggers

We’ll use cron expressions to define our schedule. They provide a powerful and flexible way to specify recurring time intervals. Before we can use them, we need to install the necessary dependency.

\

composer require dragonmantank/cron-expression 

\ This command will install the component, giving us the tools to define complex schedules for our application’s tasks, such as polling for new emails every minute.

Creating the Scheduler Message

Now, let’s create the message that the scheduler will use to trigger our email-fetching process. This is a crucial step in connecting our scheduler to our application’s business logic.

We’ll create a simple class named CheckInboundChannelsMessage. This message class doesn’t need to contain any data; its sole purpose is to serve as a signal for our message bus that a scheduled task needs to be executed.

Here’s the code for the message:

\

namespace App\Message\Schedule;  class CheckInboundChannelsMessage {  } 

\ This simple class acts as the “trigger” for our automated process. In the next steps, we’ll configure the Symfony Scheduler to dispatch this message at a specific interval CheckInboundChannelsProvider.php (e.g., using a cron expression). We will then create a handler CheckInboundChannelsMessageHandler.php that “listens” for this message and executes our ImapMailStateService to fetch new mail, completing the automated workflow.

\

namespace App\Scheduler;  use App\Message\Schedule\CheckInboundChannelsMessage; use Symfony\Component\Scheduler\Attribute\AsSchedule; use Symfony\Component\Scheduler\RecurringMessage; use Symfony\Component\Scheduler\ScheduleProviderInterface; use Symfony\Component\Scheduler\Schedule;  #[AsSchedule('default')] class CheckInboundChannelsProvider implements ScheduleProviderInterface {     public function getSchedule(): Schedule     {         return (new Schedule())             ->add(             // This task will be triggered every day by UTC                 RecurringMessage::cron('@daily', new CheckInboundChannelsMessage(),new \DateTimeZone('UTC'))             );     } } 

\

namespace App\Handler\Schedule;  use App\Message\Command\AIAgentSummarizeMessage; use App\Message\Schedule\CheckInboundChannelsMessage; use App\Service\ImapMailService; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface;  #[AsMessageHandler] readonly class CheckInboundChannelsMessageHandler {     public function __construct(         private ImapMailStateService $imapMailStateService,         private MessageBusInterface $messageBus,     )     {     }      public function __invoke(CheckInboundChannelsMessage $checkInboundChannelsMessage){         $emailCollection = $this->imapMailStateService->fetchEmails(2);          $this->messageBus->dispatch(             new Envelope(                 new AIAgentSummarizeMessage(                     $emailCollection,                     'I have emails. Please summarize them into a concise overview (100-150 words) focusing on key decisions, action items, and deadlines. Use bullet points to organize the summary by email or theme, whichever is clearer. Here’s the email content:'                 )             )         );     } } 

\ Now that all the necessary components are in place, we can launch our Symfony Messenger consumer. This single command will start the engine for our entire automated system.

Running the Consumer

The consumer will listen for messages from all configured transports, including a new one called schedulerdefault. This schedulerdefault transport is created automatically by Symfony thanks to the #[AsSchedule] attribute, which tells the framework to set up a dedicated pipeline for scheduled messages.

To start the consumer and have it listen to our scheduled tasks, simply run the following command in your terminal:

\

./bin/console messenger:consume --all -vv 

\ Once this consumer is running, it will automatically receive and process the CheckInboundChannelsMessage every time the scheduler dispatches it according to your cron expression.

Your application is now fully autonomous, capable of checking for new messages on its own without any manual intervention.

Conclusion

By building on our asynchronous foundation, we have successfully transformed our AI agent from a manually-triggered tool into a fully autonomous, self-managing application.

By using Redis and a dedicated Symfony cache pool, we created a reliable system for storing the last successful operation state. This method ensures that our application can be deployed in a stateless manner, and that multiple instances can run in parallel without the risk of data loss or file-locking conflicts.

By leveraging Symfony Scheduler and Symfony Messenger components, we were able to detach the message-gathering process from manual triggers.

Our application now operates completely on its own, maximizing hardware utilization and ensuring that no incoming message is ever missed. This robust, event-driven architecture is a powerful blueprint for building intelligent, scalable, and fully autonomous applications.

All that’s left is to containerize our application into a Docker image and launch as many instances as we need, but we’ll cover that in future articles.

Stay tuned — and let’s keep the conversation going.

Market Opportunity
null Logo
null Price(null)
--
----
USD
null (null) Live Price Chart
Disclaimer: The articles reposted on this site are sourced from public platforms and are provided for informational purposes only. They do not necessarily reflect the views of MEXC. All rights remain with the original authors. If you believe any content infringes on third-party rights, please contact service@support.mexc.com for removal. MEXC makes no guarantees regarding the accuracy, completeness, or timeliness of the content and is not responsible for any actions taken based on the information provided. The content does not constitute financial, legal, or other professional advice, nor should it be considered a recommendation or endorsement by MEXC.

You May Also Like

Why The Green Bay Packers Must Take The Cleveland Browns Seriously — As Hard As That Might Be

Why The Green Bay Packers Must Take The Cleveland Browns Seriously — As Hard As That Might Be

The post Why The Green Bay Packers Must Take The Cleveland Browns Seriously — As Hard As That Might Be appeared on BitcoinEthereumNews.com. Jordan Love and the Green Bay Packers are off to a 2-0 start. Getty Images The Green Bay Packers are, once again, one of the NFL’s better teams. The Cleveland Browns are, once again, one of the league’s doormats. It’s why unbeaten Green Bay (2-0) is a 8-point favorite at winless Cleveland (0-2) Sunday according to betmgm.com. The money line is also Green Bay -500. Most expect this to be a Packers’ rout, and it very well could be. But Green Bay knows taking anyone in this league for granted can prove costly. “I think if you look at their roster, the paper, who they have on that team, what they can do, they got a lot of talent and things can turn around quickly for them,” Packers safety Xavier McKinney said. “We just got to kind of keep that in mind and know we not just walking into something and they just going to lay down. That’s not what they going to do.” The Browns certainly haven’t laid down on defense. Far from. Cleveland is allowing an NFL-best 191.5 yards per game. The Browns gave up 141 yards to Cincinnati in Week 1, including just seven in the second half, but still lost, 17-16. Cleveland has given up an NFL-best 45.5 rushing yards per game and just 2.1 rushing yards per attempt. “The biggest thing is our defensive line is much, much improved over last year and I think we’ve got back to our personality,” defensive coordinator Jim Schwartz said recently. “When we play our best, our D-line leads us there as our engine.” The Browns rank third in the league in passing defense, allowing just 146.0 yards per game. Cleveland has also gone 30 straight games without allowing a 300-yard passer, the longest active streak in the NFL.…
Share
BitcoinEthereumNews2025/09/18 00:41
Academic Publishing and Fairness: A Game-Theoretic Model of Peer-Review Bias

Academic Publishing and Fairness: A Game-Theoretic Model of Peer-Review Bias

Exploring how biases in the peer-review system impact researchers' choices, showing how principles of fairness relate to the production of scientific knowledge based on topic importance and hardness.
Share
Hackernoon2025/09/17 23:15
The Role of Reference Points in Achieving Equilibrium Efficiency in Fair and Socially Just Economies

The Role of Reference Points in Achieving Equilibrium Efficiency in Fair and Socially Just Economies

This article explores how a simple change in the reference point can achieve a Pareto-efficient equilibrium in both free and fair economies and those with social justice.
Share
Hackernoon2025/09/17 22:30