Required dependencies
You must first add the following JAR files located in <DATADIR>/javabin/plugin
to your project:
interconnector-service-java-framework.jar
datainteg-java-commons-queue.jar
Required dependencies: You must first add the JAR files listed above, located in <DATADIR>/javabin/plugin, to your project.
Master connector sample code
To allow connection between the connectors and the Interconnector server, you must first check that an Interconnector server has been deployed in the Administration Console (Deployment > Roles). For more details, see "Configure the Interconnector Server" in the Exalead CloudView Connectors Guide. You must also add two configuration keys to your connector: the name of the slave connector, and the name of the Interconnector server instance the queries are sent to.
//Master connector //While processing a column containing a path, adds a File System Query (FS Query = a document path) to the message bus //Instantiation of the Interconnector Service InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder(); builder.withDestination(config.slaveConnector); //a configuration key has been added to the connector, to know the //name of the slave connector builder.withQuerySerializer(new FileSystemQuerySerializer()); //the file system query serializer (to xml) supplied by //the JDBC connector builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); //a configuration key has been //added to the connector, to know the interconnector server instance name the query will be sent to InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated //only once per application (as a Singleton) //End of the instantiation //Creation of the File System Query FileSystemQuery fileSystemQuery = new FileSystemQuery(); fileSystemQuery.setPath(filePath); //Calls to the service to delete and add a query service.deleteQuery(docURI.toString()); //clear the query before adding the new one service.addQuery(fileSystemQuery, false, true, docURI.toString()); //docURI is the URI of the JDBC document that is currently processed //Creation of the parent document in the Consolation Box, with type "aggregated" PushAPITransformationHelpers.addArcTo(document, "parent", docURI.toString() + "_REL"); PushAPITransformationHelpers.setType(document, "aggregated"); //Don't forget to close the service when all the processing is done service.closeService(); Slave connector sample codeBelow is a sample code for your slave connector (File System here). 
//Slave connector //Enumerates the watched queries InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder(); builder.withReceiverName(key.connector.getConnectorName()); //the receiver is the connector itself builder.withQuerySerializer(new FileSystemQuerySerializer()); builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated only //once per application (as a Singleton) service.pollMessageQueue(); Iterable<ImmutablePair<String, UserPayloadWithUri<String, String>>> tripletIterable = service.getQueries(); Iterator<ImmutablePair<String, UserPayloadWithUri<String, String>>> iteratorQueries = tripletIterable.iterator(); if (iteratorQueries != null && iteratorQueries.hasNext()){ try { final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorQueries.next(); UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight(); Query query = service.getSerializer().deserialize(queryAndFlags.getValue()); String checkpoint = triplet.getLeft(); String filepath = query.getUID(); ... FilesystemKey skey = new FilesystemKey(key.connector, filepath, connectorconfig.createFileFromRootPath (filesystemRootPathConfig), false, true); try { service.notifyEndOfQueryJob(checkpoint); } catch (Exception e){ logger.warn("Error while notifying end of query job to storage"); } return (FSKey) skey; } catch (Exception e){ logger.error("Error while adding a root key ",e); return null; } } //Processes a watched query, i.e. 
a file system path in this connector //Adding a "parent_uri" meta to link the indexed file system document to the indexed JDBC document try { ArrayList<String> listParentURIs = service.getParentURIFromUID(file.getAbsolutePath()); if (listParentURIs != null && !listParentURIs.isEmpty()){ for (String parentURI : listParentURIs){ collect.addMeta("parent_uri", parentURI); } } service.closeService(); } catch (Exception e ){ logger.debug("Error retrieving parent URI while building PAPI document "+ absolutePath, e); } //Processes the "parent_uri" metas to create arcs and documents in the consolidation box Collection<Meta> parents_meta = doc.getMetaContainer().getMetaValues("parent_uri"); if (parents_meta != null && !parents_meta.isEmpty()){ Iterator<Meta> iterator = parents_meta.iterator(); while (iterator.hasNext()){ String uri = iterator.next().getValue(); // creating the "relation" intermediate document in the consolidation box, then link it to the child document PushAPITransformationHelpers.createUnmanagedDocument(doc, uri + "_REL", "relation"); PushAPITransformationHelpers.addArcFrom(doc, "rel", uri + "_REL"); } PushAPITransformationHelpers.setType(doc, "child"); } //Enumerates and processes the deleted queries Iterable<ImmutablePair<String, UserPayload<String, String>>> deleteIterable = service.getDeletedQueries(); Iterator<ImmutablePair<String, UserPayload<String, String>>> iteratorDeletedQueries = tripletIterable.iterator(); while (iteratorDeletedQueries != null && iteratorDeletedQueries.hasNext()){ final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorDeletedQueries.next(); UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight(); Query query = service.getSerializer().deserialize(queryAndFlags.getValue()); ; String filepath = query.getUID(); papi.deleteDocument(filepath); } Interconnector aggregation processorYou must now configure the Interconnector aggregation processor in the Administration Console with the appropriate 
document types and arcs defined in your code. For more details, see "Add an Interconnector Aggregation Processor" in the Exalead CloudView Connectors Guide. You can then scan your master connector, followed by your slave connector.