Job Orchestrator

Job Orchestrator for batches
(and bonus: how to call apex from Lightning layout button)

Інколи доводиться працювати з навантаженими організаціями, з мільйонами рекордів та тисячами юзерів. При таких масштабах більшість обробок рекордів реалізується за допомогою батчів або виноситься на кастомний бек-енд через величезний список SF лімітів.

У цій статті я наведу приклад того, як реалізувати чергу батчів, щоб побороти “Maximum number of batch Apex jobs in the Apex flex queue that are in Holding status” ліміт. Що це за ліміт і як саме він буде нам заважати? Покажу на прикладі батча для відправки імейлів.

В нас є наступна модель даних:

  • Marketing Campaign - рекламна кампанія, спрямована на відправку рекламних листів для контактів.
  • Campaign member - junction об’єкт, створений для додавання контактів до кампанії.

Далі скріншоти як це виглядає на прикладі рекордів.

На лайауті Marketing Campaign маємо кастомний екшн для виклику Apex методу, який приймає Campaign Id та викликає батч для відправки імейл темплейта для усіх Campaign Members даної Campaign.

Для цього прикладу я не реалізовував масове додавання Campaign Members та вибір імейл темлейту для відправки. Тому маємо лише декілька контактів та захардкожений Id темплейту.

Реалізація кнопки:

Реалізація Aura компонента:

<aura:component implements="force:lightningQuickActionWithoutHeader,force:hasRecordId"
controller="SendEmailForCampaignButtonCtrl">

<aura:html tag="style">

  .cuf-content {
    padding: 0 0rem !important;
  }

  .slds-p-around--medium {
    padding: 0rem !important;
  }

  .slds-modal__content{
    overflow-y:hidden !important;
    height:unset !important;
    max-height:unset !important;
  }

  .slds-fade-in-open {
    visibility: hidden !important;
  }

  .slds-spinner_container {
    background-color: transparent !important;
  }

  .closeIcon {
    display:none !important;
  }

</aura:html>

  <aura:handler name="init" value="{!this}" action="{!c.doInit}"/>
  <aura:attribute name="recordId" type="String" />
  <lightning:notificationsLibrary aura:id="notify"/>
  <lightning:spinner variant="inverse" size="medium" />

</aura:component>

({

  doInit : function(component, helper) {
    let recordId = component.get('v.recordId');
    var action = component.get("c.sendEmailsForCampaign");
    action.setParams({ 'campaignId' : recordId });

    action.setCallback(this, function(response) {
      $A.get("e.force:closeQuickAction").fire();
      component.find('notify').showToast({
        "variant": "success",
        "title": "Success",
        "message": "Emails will be sent soon!"
      });
    });

    $A.enqueueAction(action);
  }

})

Реалізація Apex контроллера:

public class SendEmailForCampaignButtonCtrl {
  @AuraEnabled
  public static void sendEmailsForCampaign(String campaignId){
    Database.executeBatch(new SendCampaignEmails(campaignId, '00X7Q000002eyEHUAY'), 10);
  }
}

Реалізація батча:

public class SendCampaignEmails implements Database.Batchable<sObject> {

  Id campaignId;
  Id emailTemplateId;

  public SendCampaignEmails(Id campaignId, Id emailTemplateId){
    this.campaignId = campaignId;
    this.emailTemplateId = emailTemplateId;
  }

  public Database.QueryLocator start(Database.BatchableContext bc) {
    String query = 'SELECT Contact__r.Email FROM Campaign_Member__c WHERE Marketing_Campaign__c = \'' + this.campaignId + '\''; //'comment to fix forum styling

    return Database.getQueryLocator(query);
  }

  public void execute(Database.BatchableContext bc, List<Campaign_Member__c> campaignMembers){
    for(Campaign_Member__c campaignMember : campaignMembers){
      // send emails for campaign members using template and contac email
    }
  }

  public void finish(Database.BatchableContext bc){}
}

Тепер уявiмо, що ми маємо сотні кампаній щомісяца, з десятками тисячами контактів, та десятками маркетінг менеджерів. Що відбудеться, якщо 100 юзерів одночасно натиснуть на “Send email” для кампаній, за які вони відповідальні? В один момент часу лише “5 batch Apex jobs can be queued or active concurrently”, усі інші зберігаються “in the Apex flex queue in Holding status”, і ліміт цієї черги є 100 джоб. То маємо просту відповідь — все буде норм :slightly_smiling_face:. Але ми майже вичерпали лімит flex queue.
Якщо згадати, що ми можемо мати справу з навантаженною органцізацією і таких фічей з використанням батчей може буде десятки, не все так радужно і виглядає :slightly_frowning_face:. При спробі додати 101 джобу в flex queue ви отримаєте помилку “You’ve exceeded the limit of 100 jobs in the flex queue”.

Звичайно, ми можемо перевірити, яка поточна завантаженість flex queue та повідомити юзера, щось на кшталт “спробуй пізніше, зараз зовсім ніяк”. Але є і інший підхід. Ми можемо реалізувату кастомну чергу реквестів на запуск батчів. Будемо мати шедулер, який буде брати пулл цих реквестів та запускати батчи, якщо в нас є місце в черзі. Якщо черга заповнена, шедулер запустить батчі наступного разу. Таким чином ми виключаємо фактор “пощастило/не пощастило” для юзера, який натискає кнопку. Юзеру не потрібно буде через деякий час відкривати знов цю кампанію, натискати кнопку, та сподіватися, що цього разу місце в черзі є. Натомість ми збережемо його реквест на запуск батчу, і цей батч відпрацює “коли буде сприятливий для цього час”.

Таким чином маємо наступні задачі:

  • Створення кастомного об’єкта для додавання реквестів у чергу
  • Адаптація чинного контролера для кнопки
  • Створення шедулеру
  • Адаптація поточного батчу

Наша черга буде являти собою упорядковані рекорди об’єкта Job_request__c. Працюючи з рекордами цього об’єкта будемо сортувати їх за датою створення, починаючи обробку з найстаріших рекордів. Також нам знадобляться наступні поля:

Поле Тип Опис
Job_Parameters__c Long Text Area(131072) Серіалізовані параметри для батча. В нашому випадку це буде ID темплейта та ID кампанії. Якщо для усіх параметрів не вистачає розмірів поля, можна створити більше полів або створити дочірній об’єкт на кшталт Job_Parameters_Part__с та зберігати частини JSON у пов’язаних записах.
Job_Type__c Picklist Шедулер буде використовувати Type для десеріалізації параметрів та виклику потрібного батча.
Job_Status__c Picklist Статус реквеста. Необхідний для коректного опрацьовування реквестів, для репортінгу, та можливого сценарію обробки помилок.
Status_Info__c Long Text Area(32768) Опціональне поле для опису помилки або будь-якою іншої необхідною інформацією для репортінгу.

Тепер до нашого контролера, ми більше не хочемо викликати батч напряму. Перепишемо sendEmailsForCampaign метод:

public class SendEmailForCampaignButtonCtrl {
  @AuraEnabled
  public static void sendEmailsForCampaign(String campaignId){
    SendCampaignEmails.CampaignEmailsJob params = new SendCampaignEmails.CampaignEmailsJob();
    params.campaignId = campaignId;
    params.emailTemplateId = '00X7Q000002eyEHUAY';

    insert new Job_request__c(
      Job_Type__c = JobOrchestrator.MARKETING_EMAIL_JOB,
      Job_Parameters__c = JSON.serialize(params),
      Job_Status__c = 'New'
    );
  }
}

У оновленій версії методу використовуємо новий inner класс CampaignEmailsJob у классі батчу (реалізація буде нижче). Цей класс має два параметри, необхідні для валідною роботи батча, які ми сералізуємо і зберігаємо як JSON у Job_Parameters__c. Сам Job_request збергіаємо як MARKETING_EMAIL_JOB (константа) зі статусом “New”.

Для обробки реквестів необхідно створити новий шедулер:

public class JobOrchestrator implements Schedulable {
  public static final String MARKETING_EMAIL_JOB = 'Marketing Email Sending';
  private static final Integer JOBS_IN_QUEUE_THRESHOLD = 50;
  //put this number in CMTD, CS or Label to easily configure and manage if needed

  public void execute(SchedulableContext context){
    orchestrate();
  }

  public static void orchestrate(){
    List<Job_request__c> jobRequests = [
      SELECT Job_type__c, Job_parameters__c
      FROM Job_request__c
      WHERE Job_Status__c = 'New'
      ORDER BY CreatedDate ASC
      LIMIT :JOBS_IN_QUEUE_THRESHOLD
    ];

    for(Job_request__c jobRequest : jobRequests){
      Integer numberOfJobsInTheQueue = getNumberOfJobsInTheQueue();
      //yes, line above is SOQL in FOR loop
      //but we want to receive up-to-date number each time we run a new batch
      //and we know that jobRequests.size() is limited to 50

      if(numberOfJobsInTheQueue < JOBS_IN_QUEUE_THRESHOLD){
        executeJob(jobRequest);
      }
    }

    update jobRequests;
  }

  private static void executeJob(Job_request__c jobRequest){
    try{
      if(jobRequest.Job_type__c == MARKETING_EMAIL_JOB){
        SendCampaignEmails.CampaignEmailsJob params = (SendCampaignEmails.CampaignEmailsJob)
        JSON.deserialize(jobRequest.Job_parameters__c, SendCampaignEmails.CampaignEmailsJob.class);
        params.jobRequestId = jobRequest.Id;

        Database.executeBatch(new SendCampaignEmails(params), 10);
      }
      /*
      else if (jobRequest.Job_type__c == another job type you need)){
        deserialize parameter to class with parameter for another batch
        execute another batch with needed parameters
      }
      */

      jobRequest.Job_Status__c = 'Processing';
    } catch (Exception e){
      jobRequest.Job_Status__c = 'Failed to run';
      jobRequest.Status_Info__c = e.getLineNumber() + ':' + e.getStackTraceString();
    }
  }

  private static Integer getNumberOfJobsInTheQueue(){
    return [
      SELECT COUNT()
      FROM AsyncApexJob
      WHERE JobType = 'BatchApex' AND Status IN ('Processing','Preparing','Queued','Holding')
    ];
  }
}

MARKETING_EMAIL_JOB — константа, підтримуваний тип реквеста. Чим більше різних батчів будемо додавати в оркестратор, тим більше таких констант будемо мати.

JOBS_IN_QUEUE_THRESHOLD — константа, максимальна кількість реквестів, якI ми можемо обробити з один запуск шедулера. Можна зберігати як в коді, так і як метадату. Максимальне значення 100 (як раз той самий SF ліміт), але я не рекомендую ставити більше ніж 80. Залиште трошки місця для батчів, які ще не оброблюються оркестратором. Також можемо зберігати цей параметр як поле в Job_request__c рекорді, якщо треба мати різний treshhold для різних типів реквестів.

Метод orchestrate — основний метод цього класу. Він є публічним, щоб мати змогу викликати його окремо, якщо треба: дев консоль, кнопка, обробник помилок тощо. Для останнього можна передавати Job_Status__c як параметр в метод, а в самому шедулері завжди передавати статус “New”. Загалом, метод робить запит на реквести зі статусом “New”, перевіряє чи є змога додати нову джобу у чергу, і якщо так, то викликається метод executeJob.

Метод executeJob — саме тут відбувається перевірка типу реквесту, десеріалізація параметрів та виклик необхідного батчу. З кожним новим батчем метод буде розширюватися. Не забуваємо змінити статус реквеста, щоб він не викликався наступного виклику шедулера.

Сам батч також підлягає змінам:

public class SendCampaignEmails implements Database.Batchable<sObject> {

  CampaignEmailsJob params;

  public SendCampaignEmails(CampaignEmailsJob params){
    this.params = params;
  }

  public Database.QueryLocator start(Database.BatchableContext bc) {
    String query = 'SELECT Contact__r.Email FROM Campaign_Member__c WHERE Marketing_Campaign__c = \'' + this.params.campaignId + '\'';  //'comment to fix forum styling
    return Database.getQueryLocator(query);
  }

  public void execute(Database.BatchableContext bc, List<Campaign_Member__c> campaignMembers){
    String emailTempalteId = this.params.emailTemplateId;
    for(Campaign_Member__c campaignMember : campaignMembers){
      // send emails for campaign members using template and contact's email
    }
  }

  public void finish(Database.BatchableContext bc){
    AsyncApexJob batchJob = [
      SELECT Status
      FROM AsyncApexJob WHERE Id = :bc.getJobId()
    ];

    update new Job_request__c(
      Id = this.params.jobRequestId,
      Job_Status__c = batchJob.Status == 'Failed' ? 'Finished with an error' : 'Finished'
    );
  }

    public class CampaignEmailsJob{
      public String jobRequestId;
      public String campaignId;
      public String emailTemplateId;
  }
}

Відтепер класс приймає CampaignEmailsJob params у конструкторі. Cаме ці параметри використовуються у методах start та execute. Також використовуємо finish метод для встановлення фінального статусу реквеста. Бажано мати репорт для “'Failed” та “Failed to run” реквестів, та аналізувати помилки.

У підсумку маємо рішення, яке допоможе контролювати навантаженість організації у питанні роботи батчів.

P.S. В Salesforce вже є стандартні Campaign і CampaignMember об’єкти та Mass Email функціональність, у реальному проекті спочатку спробуйте стандартний підхід. Ціль статі - показати реалізацію черги батчів, не масову відправку імейлів для контактів.

P.P.S. Весь код та метадату можна знайти тут, велкам.

5 Вподобань