Wednesday, October 11, 2017

Salesforce Streaming API Summary

I was just working with the Streaming API and am putting my thoughts together on its different variations:


  1. Push Topic
  2. Generic Topic
  3. Platform Event 

Even though they all use the same technology stack under the covers, they provide very different features.


Comparison: Push Topic vs. Generic Topic vs. Platform Events

Overview
  Push Topic      : Used for SOQL-based subscription.
  Generic Topic   : Used to publish and subscribe to arbitrary events.
  Platform Events : Used for structured publish and subscribe. There is a lot more native support for both publish and subscribe, and we get full control over the structure (payload) of the event.

Replay
  When a client disconnects and reconnects, it can replay events from the last 24 hours, starting from the replay ID where it left off.
  Special IDs: -1 delivers only new events, -2 delivers all retained events from the beginning.
  Push Topic      : Supported (version 36.0 +)
  Generic Topic   : Supported (version 36.0 +)
  Platform Events : Supported (version 36.0 +)

Create (Setup)
  Push Topic      : 1) Apex insert statements, or 2) Workbench (it defaults all parameters except the SOQL query)
  Generic Topic   : 1) Streaming Channels tab, or 2) Workbench
  Platform Events : Create an __e object

Support for Trigger Subscription
  Push Topic      : No
  Generic Topic   : No
  Platform Events : Yes

How to Publish
  Push Topic      : Automatic, when a record change matches the SOQL query
  Generic Topic   : REST API
  Platform Events : EventBus.publish; REST API (POST as for any sObject, e.g. /services/data/v41.0/sobjects/Low_Ink__e/); Process Builder; Flow

How to Subscribe
  Push Topic      : Workbench (API version 36.0; later versions were causing problems); Java (cometd lib); JavaScript (cometd lib)
  Generic Topic   : Workbench; Java (cometd lib); JavaScript (cometd lib)
  Platform Events : Java (cometd); JavaScript (cometd); Visualforce (cometd); Process Builder; Flow; Trigger; Workbench

Channel Name
  This is what we need when we subscribe using the cometd library.
  Push Topic      : /topic/<>  e.g. /topic/AccountUpdatePushTopic
  Generic Topic   : /u/<>      e.g. /u/GenericUpdateTopic
  Platform Events : /event/<>  e.g. /event/UpdateObjectEvent__e
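
The channel naming rules can be captured in a small helper. This is only a sketch: the class name StreamingChannels and its method names are made up for illustration and are not part of any Salesforce library. It simply puts the /topic/, /u/ and /event/ prefixes in front of the names used in this post.

```java
/** Builds CometD channel names for the three Streaming API variations (hypothetical helper). */
final class StreamingChannels {

    private StreamingChannels() { }

    /** PushTopic channel, e.g. /topic/AccountUpdatePushTopic. */
    static String pushTopic(String topicName) {
        return "/topic/" + topicName;
    }

    /** Generic streaming channel, e.g. /u/GenericUpdateTopic. */
    static String generic(String channelName) {
        // Tolerate a name that already carries the /u/ prefix.
        return channelName.startsWith("/u/") ? channelName : "/u/" + channelName;
    }

    /** Platform event channel, e.g. /event/UpdateObjectEvent__e. */
    static String platformEvent(String eventApiName) {
        return "/event/" + eventApiName;
    }

    public static void main(String[] args) {
        System.out.println(pushTopic("AccountUpdatePushTopic"));
        System.out.println(generic("GenericUpdateTopic"));
        System.out.println(platformEvent("UpdateObjectEvent__e"));
    }
}
```

The resulting string is what gets passed as the channel when subscribing through cometd.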


Push Topic


Push Topic Setup


Workbench
We can set up a push topic using Workbench, but it doesn't allow granular control over the topic configuration.



APEX

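  • NotifyForFields can be Referenced (fields in the SELECT or WHERE clause; this is the default), Select (fields in the SELECT clause), Where (fields in the WHERE clause), or All (any field change)
  • Once the topic is inserted, we can query and update the PushTopic record to change its configuration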
 PushTopic pushTopic = new PushTopic();  
 pushTopic.Name = 'AccountUpdatePushTopic';  
 pushTopic.Query = 'SELECT Id, Name, AccountNumber from Account';  
 pushTopic.ApiVersion = 40.0;  
 pushTopic.NotifyForOperationCreate = true;  
 pushTopic.NotifyForOperationUpdate = true;  
 pushTopic.NotifyForOperationUndelete = true;  
 pushTopic.NotifyForOperationDelete = true;  
 pushTopic.NotifyForFields = 'Referenced';  
 insert pushTopic;  
   



Push Topic Publish

There is no API for publishing to a push topic. Any data change that matches the topic's configuration (the SOQL query, the NotifyForOperation flags and NotifyForFields) fires an event on the topic.


Push Topic Subscribe

Workbench

  • This is a quick way to test. Also note down the channel name, as we will need it later for the cometd library

Java/JavaScript
covered later


Generic Topic



Generic Topic Setup

  • I set up the generic topic (a StreamingChannel record) through the Salesforce UI
  • I did not find Apex or Workbench support for creating a generic topic



Generic Topic Publish

Rest API (Workbench)

URL : /services/data/v/sobjects/StreamingChannel//push, with the API version after "v" and the streaming channel ID before "/push"

We can find the streaming channel ID with a query (SELECT Name, Id FROM StreamingChannel)

e.g. /services/data/v40.0/sobjects/StreamingChannel/0M61I000000TN1FSAW/push

payload : 

{
  "pushEvents": [
      {
          "payload": "Broadcast message to all subscribers",
          "userIds": []
      }
   ]

}
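
The path and payload above can be assembled in code before handing them to whatever HTTP client is in use. The sketch below is an illustration only: GenericPushRequest, path and body are hypothetical names, the escaping is deliberately minimal, and the actual POST (with the session ID in the Authorization header) is left out.

```java
/** Builds the REST path and JSON body for a generic streaming push (hypothetical helper). */
final class GenericPushRequest {

    private GenericPushRequest() { }

    /** REST path for pushing to a streaming channel, given API version and StreamingChannel ID. */
    static String path(String apiVersion, String channelId) {
        return "/services/data/v" + apiVersion
                + "/sobjects/StreamingChannel/" + channelId + "/push";
    }

    /** JSON body with one push event; an empty userIds list broadcasts to all subscribers. */
    static String body(String message, String... userIds) {
        StringBuilder ids = new StringBuilder();
        for (int i = 0; i < userIds.length; i++) {
            if (i > 0) {
                ids.append(", ");
            }
            ids.append('"').append(escape(userIds[i])).append('"');
        }
        return "{ \"pushEvents\": [ { \"payload\": \"" + escape(message)
                + "\", \"userIds\": [" + ids + "] } ] }";
    }

    // Minimal escaping: backslashes and double quotes only (control characters are not handled).
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(path("40.0", "0M61I000000TN1FSAW"));
        System.out.println(body("Broadcast message to all subscribers"));
    }
}
```

For anything beyond a quick test, a real JSON library is a safer choice than string concatenation.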



Generic Topic Subscribe

WorkBench


Java/JavaScript:
Covered Later


Platform Events


Platform Event Setup


  • Essentially, I created a platform event (UpdateObjectEvent__e) with two custom fields (ObjectName__c and RecordId__c), so that we can publish events carrying those attributes


Platform Event Publish

Apex


  • The publish call doesn't throw an exception on failure, hence we have to go through the returned results
  • The publish call also doesn't participate in the transaction, meaning that if the transaction fails afterwards, the event is still published


 // Build the list of events to publish
 List<UpdateObjectEvent__e> events = new List<UpdateObjectEvent__e>();
 UpdateObjectEvent__e event = new UpdateObjectEvent__e(objectName__c='Account', recordId__c='1234');
 events.add( event );
 // publish() reports failures through SaveResult, so inspect each result
 List<Database.SaveResult> results = EventBus.publish(events);
 if( results != null && results.size() > 0 ) {
   for (Database.SaveResult sr : results) {
     if (sr.isSuccess()) {
       System.debug('Successfully published event. ' + results.size() );
     } else {
       for(Database.Error err : sr.getErrors()) {
         System.debug('Error returned: ' + err.getStatusCode() + ' - ' + err.getMessage());
       }
     }
   }
 } else {
   System.debug(' Nothing is published. ');
 }
   


Rest API

Endpoint  : /services/data/v40.0/sobjects/UpdateObjectEvent__e/
Payload    : { "RecordId__c" : "123455634343", "ObjectName__c" : "Account" }



Soap API
Similar to the REST API, it is just a call to insert the event sObject

Process Builder 
not covered, but straightforward

Visual Flow 
not covered, but straightforward


Platform Event Subscribe

Trigger

  • only after insert is supported

 trigger UpdateObjectEventTrigger on UpdateObjectEvent__e (after insert) {  
   System.debug(' UpdateObjectEventTrigger ' );  
     
   for (UpdateObjectEvent__e event : Trigger.New) {  
     System.debug(' : ' + event.RecordId__c + ' ' + event.ObjectName__c );  
   }  
     
 }  

Process Builder 
not covered, but straightforward

Visual Flow 
not covered, but straightforward

Java/JavaScript 
Covered later


Platform Event Debug

  • Debug statements in the trigger don't show up in the regular debug logs; we need to enable them separately, by adding a trace flag for the Automated Process entity (Setup > Debug Logs)







Generic Java Subscriber Client



We need to download and build the EMP connector from Salesforce:
  • Download: https://github.com/forcedotcom/EMP-Connector
  • Unzip it and run "mvn clean install"
  • Use emp-connector-0.0.1-SNAPSHOT-phat.jar in the new project that we are going to create
  • Create a new project (Java 1.8) and use the code below
  • Note that the channel name can be changed depending on which topic we are subscribing to



 package com.spring.client;  
   
 import com.salesforce.emp.connector.BayeuxParameters;  
 import com.salesforce.emp.connector.EmpConnector;  
 import com.salesforce.emp.connector.TopicSubscription;  
   
 import java.util.Map;  
 import java.util.concurrent.TimeUnit;  
 import java.util.function.Consumer;  
   
 import static com.salesforce.emp.connector.LoginHelper.login;  
   
 public class StreamingClient {  
   
   public static void main(String args[]) throws Exception {  
     long replayFrom = EmpConnector.REPLAY_FROM_EARLIEST;  
     BayeuxParameters params = login("streaming@springsoa.com", "Welcome1");  
     EmpConnector connector = new EmpConnector(params);  
     Consumer<Map<String, Object>> consumer = event -> System.out.println(String.format("Received:\n%s", event));  
     connector.start().get(5, TimeUnit.SECONDS);  
     TopicSubscription subscription = connector.subscribe("/event/UpdateObjectEvent__e", replayFrom, consumer ).get(5, TimeUnit.SECONDS);  
     System.out.println(String.format("Subscribed: %s", subscription));  
     //subscription.cancel();  
     //connector.stop();  
   }  
 }  
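
The client above always replays from the earliest retained event (EmpConnector.REPLAY_FROM_EARLIEST, which is -2). A client that remembers the replay ID of the last event it handled can resume from there instead. The sketch below shows one way to pick the value. The class ReplayIds and the method resumeFrom are hypothetical names, and how the last replay ID gets stored between runs is left open.

```java
/** Chooses the replayFrom value to pass when subscribing (hypothetical helper). */
final class ReplayIds {

    /** Only events published after the subscription starts. */
    static final long NEW_EVENTS_ONLY = -1L;

    /** All events still retained by Salesforce, then new ones. */
    static final long ALL_RETAINED_EVENTS = -2L;

    private ReplayIds() { }

    /**
     * Returns the stored replay ID so the subscription continues after that event,
     * or falls back to all retained events when nothing has been stored yet.
     */
    static long resumeFrom(Long lastSeenReplayId) {
        return lastSeenReplayId != null ? lastSeenReplayId : ALL_RETAINED_EVENTS;
    }

    public static void main(String[] args) {
        System.out.println(resumeFrom(null)); // first run: nothing stored yet
        System.out.println(resumeFrom(42L));  // later run: continue after event 42
    }
}
```

The result would be passed as the replayFrom argument of connector.subscribe. Falling back to NEW_EVENTS_ONLY instead is the better default when old events should not be processed again.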
   



Generic Javascript Subscriber Client


We can use the cometD library to listen to the events. I had to write a custom wrapper (cometdCustom.js) to greatly simplify the Visualforce page. We can also use it in a standalone HTML page, as long as we can get an OAuth session ID.






Source Code

Java code                       :   https://github.com/c-shah/streaming-java-client    
Salesforce and JS code  :   https://github.com/c-shah/salesforce-streaming

2 comments:

Unknown said...

Chintan, when I try this code,it hangs at login.at post.send in loginhelper

Unknown said...

Am running the code behind the firewall