Wednesday, October 11, 2017

Salesforce Streaming API Summary

Was just working on streaming API and just putting thoughts together with different variations of streaming API :

  1. Push Topic
  2. Generic Topic
  3. Platform Event 

Even though underneath the cover, they all use same technology stack, they provide very different features.

Push TopicGeneric TopicPlatform Events
OverviewPush topic is used for SOQL based subscriptionGeneric Topic is used to subscribe and publish arbitrary events
Platform Events is used for structured publish and subscribe

There is a lot more native support for both publish and subscribe

Full control over structure (payload) of the event

Replay : 

When client disconnect and reconnect again, they can replay from last 24 hours with id where they left from
special ids : -1 from the beginning, -2 : all new events

Supported (version 36.0 +)Supported (version 36.0 +)Supported (version 36.0 +)
Create (Setup)
1) Using Apex Insert Statements


2) workbench 
- it defaults all the param except SOQL query

Using Streaming Channel Tab


2) Workbench
Create __e object
Support for Trigger SubscriptionNoNoYes
How to Publishupon SOQLUsing Rest API

API : Post like sobject /services/data/v41.0/sobjects/Low_Ink__e/ 

Process Builder

How to Subscribe
workbench (36.0) (later are causing problems)

using Java (cometd lib)

Using Javascript (cometd lib)

using Java (cometd lib)

Using Javascript (cometd lib)
using Java (cometd)

Using Javascript (cometd)

Using Visualforce (cometd)

Process Flow



Channel Name

This is useful when we subscribe using cometd library




Push Topic

Push Topic Setup

we can setup a push topic using workbench, but it doesn't allow granular control over topic configuration.


  • NotifyForFields can be Referenced (part of query - default), Where (in where condition), All (all changes)
  • Once topic is inserted, we can use PushTopic object to make any update
 PushTopic pushTopic = new PushTopic();  
 pushTopic.Name = 'AccountUpdatePushTopic';  
 pushTopic.Query = 'SELECT Id, Name, AccountNumber from Account';  
 pushTopic.ApiVersion = 40.0;  
 pushTopic.NotifyForOperationCreate = true;  
 pushTopic.NotifyForOperationUpdate = true;  
 pushTopic.NotifyForOperationUndelete = true;  
 pushTopic.NotifyForOperationDelete = true;  
 pushTopic.NotifyForFields = 'Referenced';  
 insert pushTopic;  

Push Topic Publish

There is no API support for publish. Any change in the data based on condition (e.g. Notify Operation and fields configured) would fire the event on the topic.

Push Topic Subscribe


  • This is quick way to test and also record the channel, as we will need it later for cometd library

covered later

Generic Topic

Generic Topic Setup

  • We must setup generic toipc using salesforce UI
  • There is no apex or workbench support to create generic topic

Generic Topic Publish

Rest API (Workbench)

URL : /services/data/v/sobjects/StreamingChannel//push

We can find streaming channel id using query (SELECT Name, ID FROM StreamingChannel)

e.g. /services/data/v40.0/sobjects/StreamingChannel/0M61I000000TN1FSAW/push

payload : 

  "pushEvents": [
          "payload": "Broadcast message to all subscribers",
          "userIds": []


Generic Topic Subscribe


Covered Later

Platform Events

Platform Event Setup

  • Essentially created two attributes (ObjectName__c and RecordId__c) so that we can publish the event having those attributes

Platform Event Publish


  • Publish event call doesn't fail, hence we have to go through the results
  • Also publish call doesn't participate in transaction, meaning if transaction has to fail, event will still publish

 List events = new List();  
 UpdateObjectEvent__e event = new UpdateObjectEvent__e(objectName__c='Account', recordId__c='1234');  
 events.add( event );  
 List results = EventBus.publish(events);  
 if( results != null && results.size() > 0 ) {  
   for (Database.SaveResult sr : results) {  
     if (sr.isSuccess()) {  
       System.debug('Successfully published event. ' + results.size() );  
     } else {  
       for(Database.Error err : sr.getErrors()) {  
         System.debug('Error returned: ' + err.getStatusCode() + ' - ' + err.getMessage());  
 } else {  
   System.debug(' Noting is published. ');  

Rest API

Endpoint  : /services/data/v40.0/sobjects/UpdateObjectEvent__e/
Payload    : { "RecordId__c" : "123455634343", "ObjectName__c" : "Account" }

Soap API
Similar to rest api, it is just call to insert into sObject

Process Builder 
not covered, but straightforward

Visual Flow 
not covered, but straightforward

Platform Event Subscribe


  • only after insert is supported

 trigger UpdateObjectEventTrigger on UpdateObjectEvent__e (after insert) {  
   System.debug(' UpdateObjectEventTrigger ' );  
   for (UpdateObjectEvent__e event : Trigger.New) {  
     System.debug(' : ' + event.RecordId__c + ' ' + event.ObjectName__c );  

Process Builder 
not covered, but straightforward

Visual Flow 
not covered, but straightforward

Covered later

Platform Event Debug

  • debug statements in trigger doesn't show up in debug logs, we need to enable them using below

Generic Java Subscriber Client

We need to download and build the EMP connector from salesforce 
  • download :
  • unzip, and run "mvn clean install"
  • Use emp-connector-0.0.1-SNAPSHOT-phat.jar in the new project that we are going to create
  • Create a new project (java 1.8) and use below code. 
  • Please note that channel name can be changed as per which topic we are subscribing

 package com.spring.client;  
 import com.salesforce.emp.connector.BayeuxParameters;  
 import com.salesforce.emp.connector.EmpConnector;  
 import com.salesforce.emp.connector.TopicSubscription;  
 import java.util.Map;  
 import java.util.concurrent.TimeUnit;  
 import java.util.function.Consumer;  
 import static com.salesforce.emp.connector.LoginHelper.login;  
 public class StreamingClient {  
   public static void main(String args[]) throws Exception {  
     long replayFrom = EmpConnector.REPLAY_FROM_EARLIEST;  
     BayeuxParameters params = login("", "Welcome1");  
     EmpConnector connector = new EmpConnector(params);  
     Consumer<Map<String, Object>> consumer = event -> System.out.println(String.format("Received:\n%s", event));  
     connector.start().get(5, TimeUnit.SECONDS);  
     TopicSubscription subscription = connector.subscribe("/event/UpdateObjectEvent__e", replayFrom, consumer ).get(5, TimeUnit.SECONDS);  
     System.out.println(String.format("Subscribed: %s", subscription));  

Generic Javascript Subscriber Client

We can use cometD library to listen to the event, I had to write customer wrapper (cometdCustom.js) to greatly simply the visualforce page. We can also use this in independent HTML page,as long as we can get oauth session id.

Source Code

Java code                       :    
Salesforce and JS code  :


Unknown said...

Chintan, when I try this code,it hangs at post.send in loginhelper

Unknown said...

Am running the code behind the firewall