Interfaces of the plugins for OpenData import

This article describes the basic interfaces of the plugins importing open data.

Terms

Job is the the sequence of actions for loading an array of open data through the http protocol, including their processing and loading into Geo2Tag service’s DB. 

Import is transfer data from an external DB into DB of a Geo2Tag service.

Job status is a set of parameters describing current status of a job: time spent, source of import, the flag of completion of the job, other import options.

REST interfaces

  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job GET – list of the jobs of import and its statuses.
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job POST – add a job of import, parameters:
    • channelName, – the name of the channel where importing data will be performed; 
    • openDataUrl,  – link to downloading the set of data;
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> GET – the status of the job.
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> DELETE – stop the job.

Software interfaces

Classes

Legend:

  • @should_be_extended_in_descendents – method/the property should be extended/override in the derived class
  • @abstract_method – an abstract method which should be defined in the derived class

Job (abstract class)

class Job():

    def __init__(
            self,
            backgroundFunction,
            channelName,
            openDataUrl,
            importDataDict,
            serviceName):
        self.thread = None
        self._id = self.generateId()
        self.startTime = datetime.now()
        self.done = False
        self.timeElapsed = None
        self.backgroundFunction = backgroundFunction
        self.channelName = channelName
        self.openDataUrl = openDataUrl
        self.importDataDict = importDataDict
        self.serviceName = serviceName

    @abstract_method
    def internalStart(self):
    @abstract_method
    def internalStop(self):

    def start(self):
        self.startTime = datetime.now()
        self.internalStart()

    def stop(self):
        self.internalStop()
        self.done = True
        self.timeElapsed = datetime.now() - self.startTime

    def getTimeStatistics(self):
        if self.timeElapsed is None:
            return datetime.now() - self.startTime
        return self.timeElapsed

    @should_be_extended_in_descendents
    def describe(self):
        return {
            '_id': self._id,
            'time': str(
                self.getTimeStatistics()),
            'done': self.done,
            'channelName': self.channelName,
            'openDataUrl': self.openDataUrl,
            'serviceName': self.serviceName}

    @classmethod
    def generateId(cls):
        return ''.join(
            random.choice(
                string.ascii_uppercase +
                string.ascii_lowercase +
                string.digits) for x in range(12))

OpenDataObjectsLoader (abstract class)

class OpenDataObjectsLoader:
    def __init__(self, loadUrl):
        self.loadUrl = loadUrl
    @abstract_method
    def load(self):     

OpenDataToPointTranslator

class OpenDataToPointTranslator:
    def __init__(
            self, importDataDict,
            objectRepresentation,
            version,
            importSource,
            channelId):
        self.objectRepresentation = objectRepresentation
        self.version = version
        self.importSource = importSource
        self.channelId = channelId
        self.importDataDict = importDataDict
    @should_be_extended_in_descendents
    def getPointJson(self):
        obj = {}
        obj['version'] = self.version
        obj['import_source'] = self.importSource
        return obj
    @should_be_extended_in_descendents
    def getPoint(self):
        point = {'json': self.getPointJson()}
        point['channel_id'] = self.channelId
        return point

OpenDataObjectsParser (abstract class)

class OpenDataObjectsParser:
    def __init__(self, data):
        self.data = data
    @abstract_method
    def parse(self):

OpenDataToPointsLoader

class OpenKareliaDataToPointsLoader:
    pointsArray = []

    def __init__(self, serviceName, points):
        self.pointsArray = points
        self.serviceName = serviceName
    def loadPoints(self):
        collection = getDbObject(self.serviceName)[POINTS]
        for point in self.pointsArray:
            collection.save(point)

JobManager – class for managing jobs, starts and stops jobs, displays information about their status.

class JobManager:
    jobs = {}
    @classmethod
    def startJob(cls, job):
        jobId = job.describe().get('_id', '')
        job.start()
        cls.jobs[jobId] = job
        return jobId
    @classmethod
    def getJob(cls, jobId):
        return cls.jobs.get(jobId).describe()
    @classmethod
    def stopJob(cls, jobId):
        cls.jobs.get(jobId).stop()
    @classmethod
    def getJobs(cls):
        result = []
        for job in cls.jobs:
            result.append(cls.jobs[job].describe())
        return result

JobResource

class JobResource(Resource):
    @possibleException
    def get(self, serviceName, jobId):
        return JobManager.getJob(jobId)
    @possibleException
    def delete(self, serviceName, jobId):
        return JobManager.stopJob(jobId)

ODImportParser

class ODImportParser():  
    @should_be_extended_in_descendents
     MANDATORY_FIELDS = [CHANNEL_NAME, OPEN_DATA_URL]
    @staticmethod
    def parse(self):
        args = loads(request.get_data())
        return args
    @staticmethod
    def validate(self, args):    
       for key in MANDATORY_FIELDS:
            if key not in args:
                raise BadRequest('{0} parameter is missing'.format(key))
            elif not isinstance(args[key], unicode):
                raise BadRequest('{0} value is not unicode'.format(key)) 
    @staticmethod
    def parsePostParameters():
        args = self.parse()
        self.validate(args)
        return args

JobListResourceFactory – factory method to create JobListResource classes which we need.

def JobListResourceFactory(parserClass, jobClass, importFunction)
    class JobListResource(Resource):
        @possibleException
        def get(self, serviceName):
            getServiceIdByName(serviceName)
            return JobManager.getJobs()
        @possibleException
        def post(self, serviceName):
            importDataDict = parserClass.parsePostParameters()
            channelName = importDataDict.get('channelName')
            getServiceIdByName(serviceName)
            getChannelByName(serviceName, channelName)
            job = jobClass(importFunction, importDataDict.get('channelName'),
                           importDataDict.get('openDataUrl'), importDataDict,
                           serviceName)
        return JobManager.startJob(job)
    return JobListResource

Method performImportActions

performImportActions ( odLoaderClass, odParserClass, odToPointTranslatorClass, odToPointsLoaderClass, serviceName, channelName, openDataUrl, showObjectUrl, showImageUrl)

A function which contains the logic of importing from source of open data.

def performImportActions ( odLoaderClass, odParserClass, \
    odToPointTranslatorClass, odToPointsLoaderClass, \
    serviceName, channelName, openDataUrl, \
    importDataDict):
#showObjectUrl, showImageUrl
    channelId = getChannelIdByName(channelName)
    version = datetime.now()
    loader = odLoaderClass(openDataUrl)
    openData = loader.load()
    parser = odParserClass(openData)
    objects = parser.parse()
    points = [ ]
    for object in objects:
        translator = odToPointTranslatorClass(importDataDict, object, version,openDataUrl, channelId)
        points.append(translator.getPoint())
    pointsLoader = odToPointsLoaderClass(serviceName, points)
    pointsLoader.loadPoints()

 

 

ShareShare on FacebookShare on Google+Tweet about this on TwitterShare on LinkedInShare on VKEmail this to someone