Customizing Connectors to use the Interconnector Service

You can customize connectors to allow the use of the Interconnector service between them.

This page discusses:

Required dependencies

You must first add the following JAR files located in <DATADIR>/javabin/plugin to your project:

  • interconnector-service-java-framework.jar
  • datainteg-java-commons-queue.jar

Master connector sample code

To allow connection between the connectors and the Interconnecter server, you must first check that an Interconnector server has been deployed in the Administration Console (Deployment > Roles). For more details, see Configure the Interconnector Server in Exalead CloudView Connectors Guide.

You must also add two configuration keys to your connector:

  • the Interconnector server instance name
  • the slave connector name
Below is a sample code for your master connector (JDBC here).

//Master connector

//While processing a column containing a path, adds a File System Query (FS Query = a document path) to the message bus

//Instantiation of the Interconnector Service
InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder();
builder.withDestination(config.slaveConnector);  //a configuration key has been added to the connector, to know the
//name of the slave connector
builder.withQuerySerializer(new FileSystemQuerySerializer()); //the file system query serializer (to xml) supplied by 
//the JDBC connector
builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); //a configuration key has been 
//added to the connector, to know the interconnector server instance name the query will be sent to
InterConnectorServiceImpl service = builder.build();    //this is time consuming, the service should be instantiated 
//only once per application (as a Singleton)
//End of the instantiation

//Creation of the File System Query
FileSystemQuery fileSystemQuery = new FileSystemQuery();
fileSystemQuery.setPath(filePath);
//Calls to the service to delete and add a query
service.deleteQuery(docURI.toString());  //clear the query before adding the new one
service.addQuery(fileSystemQuery, false, true, docURI.toString());   //docURI is the URI of the JDBC document 
that is currently processed

//Creation of the parent document in the Consolation Box, with type "aggregated"
PushAPITransformationHelpers.addArcTo(document, "parent", docURI.toString() + "_REL");
PushAPITransformationHelpers.setType(document, "aggregated");

//Don't forget to close the service when all the processing is done
service.closeService();

Slave connector sample code

Below is a sample code for your slave connector (File System here).

//Slave connector

//Enumerates the watched queries
 InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder();
 builder.withReceiverName(key.connector.getConnectorName());  //the receiver is the connector itself
 builder.withQuerySerializer(new FileSystemQuerySerializer());
 builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName);
 InterConnectorServiceImpl service = builder.build();  //this is time consuming, the service should be instantiated only
//once per application (as a Singleton)
 service.pollMessageQueue();
 Iterable<ImmutablePair<String, UserPayloadWithUri<String, String>>> tripletIterable = service.getQueries();
 Iterator<ImmutablePair<String, UserPayloadWithUri<String, String>>> iteratorQueries = tripletIterable.iterator();

  if (iteratorQueries != null && iteratorQueries.hasNext()){
     try {
         final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorQueries.next();
         UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight();
         Query query = service.getSerializer().deserialize(queryAndFlags.getValue());    
         String checkpoint = triplet.getLeft();
         String filepath = query.getUID();
         ...
         FilesystemKey skey = new FilesystemKey(key.connector, filepath, connectorconfig.createFileFromRootPath
(filesystemRootPathConfig), false, true);
         try {
  service.notifyEndOfQueryJob(checkpoint);
         }
         catch (Exception e){
  logger.warn("Error while notifying end of query job to storage");
         }    
         return (FSKey) skey;
     }
     catch (Exception e){
         logger.error("Error while adding a root key ",e);
         return null;
     }
 }


 //Processes a watched query, i.e. a file system path in this connector
 //Adding a "parent_uri" meta to link the indexed file system document to the indexed JDBC document
 try {
     ArrayList<String> listParentURIs = service.getParentURIFromUID(file.getAbsolutePath());
     if (listParentURIs != null && !listParentURIs.isEmpty()){
         for (String parentURI : listParentURIs){
  collect.addMeta("parent_uri", parentURI);
         }
     }
     service.closeService();
 }
 catch (Exception e ){
     logger.debug("Error retrieving parent URI while building PAPI document "+ absolutePath, e);
 }

 //Processes the "parent_uri" metas to create arcs and documents in the consolidation box
  Collection<Meta> parents_meta = doc.getMetaContainer().getMetaValues("parent_uri");
  if (parents_meta != null &&  !parents_meta.isEmpty()){
      Iterator<Meta> iterator = parents_meta.iterator();
      while (iterator.hasNext()){
          String uri = iterator.next().getValue();    
        // creating the "relation" intermediate document in the consolidation box, then link it to the child document
          PushAPITransformationHelpers.createUnmanagedDocument(doc, uri + "_REL",  "relation");
          PushAPITransformationHelpers.addArcFrom(doc, "rel",  uri + "_REL");
      }
      PushAPITransformationHelpers.setType(doc, "child");
  }

//Enumerates and processes the deleted queries
Iterable<ImmutablePair<String, UserPayload<String, String>>> deleteIterable = service.getDeletedQueries();
Iterator<ImmutablePair<String, UserPayload<String, String>>> iteratorDeletedQueries = tripletIterable.iterator();


while (iteratorDeletedQueries != null && iteratorDeletedQueries.hasNext()){
         final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorDeletedQueries.next();
         UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight();
         Query query = service.getSerializer().deserialize(queryAndFlags.getValue());    ;         
         String filepath = query.getUID();
         papi.deleteDocument(filepath);
 }

Interconnector aggregation processor

You must now configure the Interconnector aggregation processor in the Administration Console with the appropriate document types and arcs defined in your code. For more details, see Add an Interconnector Aggregation Processor in Exalead CloudView Connectors Guide.

You can scan your master connector, then your slave connector.