This article describes the basic interfaces of the plugins importing open data.
Terms
A job is the sequence of actions for loading a set of open data over the HTTP protocol, including processing the data and loading it into the Geo2Tag service's DB.
Import is the transfer of data from an external DB into the DB of a Geo2Tag service.
Job status is a set of parameters describing the current state of a job: time spent, source of the import, the job completion flag, and other import options.
REST interfaces
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job GET – list of the import jobs and their statuses.
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job POST – add an import job; parameters:
- channelName – the name of the channel into which the data will be imported;
- openDataUrl – the URL from which the data set is downloaded.
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> GET – the status of the job.
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> DELETE – stop the job.
Software interfaces
Classes
Legend:
- @should_be_extended_in_descendents – the method/property should be extended/overridden in the derived class
- @abstract_method – an abstract method which should be defined in the derived class
Job (abstract class)
class Job():
    """Abstract description of one open-data import job.

    Stores the import parameters, tracks start time, completion flag and
    elapsed time, and delegates the actual start/stop work to
    ``internalStart`` / ``internalStop``, which derived classes must define.
    """

    def __init__(self, backgroundFunction, channelName, openDataUrl,
                 importDataDict, serviceName):
        """Remember the import parameters and initialise the job state.

        backgroundFunction -- callable stored on the job (presumably the
            import function run by ``internalStart`` in derived classes;
            it is not called in this class).
        channelName -- name of the target channel.
        openDataUrl -- URL of the open data set.
        importDataDict -- dict with all import parameters.
        serviceName -- name of the Geo2Tag service.
        """
        # Not used in this class; presumably filled in by derived classes.
        self.thread = None
        self._id = self.generateId()
        self.startTime = datetime.now()
        self.done = False
        # Stays None until stop() is called.
        self.timeElapsed = None
        self.backgroundFunction = backgroundFunction
        self.channelName = channelName
        self.openDataUrl = openDataUrl
        self.importDataDict = importDataDict
        self.serviceName = serviceName

    @abstract_method
    def internalStart(self):
        """Start the actual work of the job; must be defined in a subclass."""
        raise NotImplementedError

    @abstract_method
    def internalStop(self):
        """Stop the actual work of the job; must be defined in a subclass."""
        raise NotImplementedError

    def start(self):
        """Reset the start time to now and start the job."""
        self.startTime = datetime.now()
        self.internalStart()

    def stop(self):
        """Stop the job, mark it as done and record the elapsed time."""
        self.internalStop()
        self.done = True
        self.timeElapsed = datetime.now() - self.startTime

    def getTimeStatistics(self):
        """Return the time spent on the job as a ``timedelta``.

        Before ``stop()`` this is the time since the job was started; after
        ``stop()`` it is the elapsed time recorded at that moment.
        """
        if self.timeElapsed is None:
            return datetime.now() - self.startTime
        return self.timeElapsed

    @should_be_extended_in_descendents
    def describe(self):
        """Return a dict with the current status of the job."""
        return {
            '_id': self._id,
            'time': str(self.getTimeStatistics()),
            'done': self.done,
            'channelName': self.channelName,
            'openDataUrl': self.openDataUrl,
            'serviceName': self.serviceName}

    @classmethod
    def generateId(cls):
        """Return a random 12-character id of ASCII letters and digits."""
        # Build the alphabet once instead of on every iteration.
        alphabet = (string.ascii_uppercase + string.ascii_lowercase +
                    string.digits)
        return ''.join(random.choice(alphabet) for _ in range(12))
OpenDataObjectsLoader (abstract class)
class OpenDataObjectsLoader:
    """Abstract loader that fetches raw open data from ``loadUrl``."""

    def __init__(self, loadUrl):
        """Remember the URL the data will be loaded from."""
        self.loadUrl = loadUrl

    @abstract_method
    def load(self):
        """Load and return the raw open data; must be defined in a subclass.

        ``performImportActions`` passes the returned value to the parser
        class constructor.
        """
        raise NotImplementedError
OpenDataToPointTranslator
class OpenDataToPointTranslator:
    """Translate one parsed open-data object into a point dict."""

    def __init__(self, importDataDict, objectRepresentation, version,
                 importSource, channelId):
        """Store the source object and the metadata put into the point.

        importDataDict -- dict with the import parameters.
        objectRepresentation -- one parsed open-data object.
        version -- value written to the point's ``version`` field.
        importSource -- value written to the point's ``import_source`` field.
        channelId -- value written to the point's ``channel_id`` field.
        """
        self.importDataDict = importDataDict
        self.objectRepresentation = objectRepresentation
        self.version = version
        self.importSource = importSource
        self.channelId = channelId

    @should_be_extended_in_descendents
    def getPointJson(self):
        """Return the base ``json`` payload: version and import source."""
        return {
            'version': self.version,
            'import_source': self.importSource,
        }

    @should_be_extended_in_descendents
    def getPoint(self):
        """Return the point dict: the json payload plus the channel id."""
        return {
            'json': self.getPointJson(),
            'channel_id': self.channelId,
        }
OpenDataObjectsParser (abstract class)
class OpenDataObjectsParser:
    """Abstract parser that turns loaded open data into a set of objects."""

    def __init__(self, data):
        """Remember the raw data to be parsed."""
        self.data = data

    @abstract_method
    def parse(self):
        """Parse ``self.data``; must be defined in a subclass.

        ``performImportActions`` iterates over the returned value and hands
        each item to the translator class.
        """
        raise NotImplementedError
OpenDataToPointsLoader
class OpenKareliaDataToPointsLoader:
    """Write a list of prepared points into the points collection of a service."""

    # Class-level default; every instance replaces it in __init__.
    pointsArray = []

    def __init__(self, serviceName, points):
        """Remember the target service name and the points to store."""
        self.serviceName = serviceName
        self.pointsArray = points

    def loadPoints(self):
        """Save every point into the ``POINTS`` collection of the service DB."""
        # NOTE(review): presumably a MongoDB collection; confirm that the
        # driver version in use still provides save().
        pointsCollection = getDbObject(self.serviceName)[POINTS]
        for pointDocument in self.pointsArray:
            pointsCollection.save(pointDocument)
JobManager – a class for managing jobs: it starts and stops jobs and reports information about their status.
class JobManager:
    """Process-wide registry of import jobs, keyed by job id."""

    # Shared by all callers: job id -> job object.
    jobs = {}

    @classmethod
    def startJob(cls, job):
        """Start ``job``, register it and return its id.

        The id is taken from the ``'_id'`` key of ``job.describe()``
        (empty string when the key is absent). The job is registered only
        after ``job.start()`` has returned.
        """
        jobId = job.describe().get('_id', '')
        job.start()
        cls.jobs[jobId] = job
        return jobId

    @classmethod
    def getJob(cls, jobId):
        """Return ``describe()`` of the job registered under ``jobId``.

        An unknown id makes the lookup yield ``None``, so the call fails
        with ``AttributeError``.
        """
        return cls.jobs.get(jobId).describe()

    @classmethod
    def stopJob(cls, jobId):
        """Stop the job registered under ``jobId``; it stays in the registry.

        An unknown id fails with ``AttributeError``, as in ``getJob``.
        """
        cls.jobs.get(jobId).stop()

    @classmethod
    def getJobs(cls):
        """Return a list with ``describe()`` of every registered job."""
        return [job.describe() for job in cls.jobs.values()]
JobResource
class JobResource(Resource):
    """REST resource for a single import job (``.../job/<job_id>``)."""

    @possibleException
    def get(self, serviceName, jobId):
        """Return the status of the job ``jobId``; ``serviceName`` is unused."""
        jobDescription = JobManager.getJob(jobId)
        return jobDescription

    @possibleException
    def delete(self, serviceName, jobId):
        """Stop the job ``jobId``; ``serviceName`` is unused."""
        stopResult = JobManager.stopJob(jobId)
        return stopResult
ODImportParser
class ODImportParser():
    """Parse and validate the JSON body of an "add import job" POST request."""

    # @should_be_extended_in_descendents
    # Keys that must be present in the request body.
    MANDATORY_FIELDS = [CHANNEL_NAME, OPEN_DATA_URL]

    @classmethod
    def parse(cls):
        """Return the current request body decoded with ``loads``."""
        return loads(request.get_data())

    @classmethod
    def validate(cls, args):
        """Check that every mandatory field is present and is a text value.

        Raises ``BadRequest`` for the first field that is missing or whose
        value is not ``unicode``.
        """
        # Read the list through cls so a derived class that extends
        # MANDATORY_FIELDS is validated against its own list.
        for key in cls.MANDATORY_FIELDS:
            if key not in args:
                raise BadRequest('{0} parameter is missing'.format(key))
            # `unicode` is the Python 2 text type.
            if not isinstance(args[key], unicode):
                raise BadRequest('{0} value is not unicode'.format(key))

    @classmethod
    def parsePostParameters(cls):
        """Parse the request body, validate it and return the parsed dict."""
        args = cls.parse()
        cls.validate(args)
        return args
JobListResourceFactory – a factory function that creates the JobListResource class for the given parser class, job class and import function.
def JobListResourceFactory(parserClass, jobClass, importFunction):
    """Build and return a ``JobListResource`` class bound to a plugin.

    parserClass -- class whose ``parsePostParameters()`` returns the dict
        of POST parameters.
    jobClass -- job class instantiated for every POST request.
    importFunction -- passed to ``jobClass`` as its first argument.
    """

    class JobListResource(Resource):
        """REST resource for the list of import jobs (``.../job``)."""

        @possibleException
        def get(self, serviceName):
            """Return the descriptions of all registered jobs."""
            # Result is discarded; presumably this call raises when the
            # service does not exist -- TODO confirm.
            getServiceIdByName(serviceName)
            return JobManager.getJobs()

        @possibleException
        def post(self, serviceName):
            """Create a job from the POST parameters, start it, return its id."""
            importDataDict = parserClass.parsePostParameters()
            channelName = importDataDict.get('channelName')
            # Results are discarded; presumably these calls raise when the
            # service or the channel does not exist -- TODO confirm.
            getServiceIdByName(serviceName)
            getChannelByName(serviceName, channelName)
            job = jobClass(
                importFunction,
                channelName,
                importDataDict.get('openDataUrl'),
                importDataDict,
                serviceName)
            return JobManager.startJob(job)

    return JobListResource
Method performImportActions
performImportActions(odLoaderClass, odParserClass, odToPointTranslatorClass, odToPointsLoaderClass, serviceName, channelName, openDataUrl, importDataDict)
A function that contains the logic for importing from an open data source.
def performImportActions(odLoaderClass, odParserClass,
                         odToPointTranslatorClass, odToPointsLoaderClass,
                         serviceName, channelName, openDataUrl,
                         importDataDict):
    """Run one complete import: load, parse, translate and store the points.

    odLoaderClass -- built with ``openDataUrl``; its ``load()`` returns the
        raw open data.
    odParserClass -- built with the raw data; its ``parse()`` returns an
        iterable of objects.
    odToPointTranslatorClass -- built once per object; its ``getPoint()``
        returns the point to store.
    odToPointsLoaderClass -- built with ``serviceName`` and the list of
        points; its ``loadPoints()`` stores them.
    serviceName, channelName, openDataUrl -- target service, target channel
        and URL of the data set.
    importDataDict -- dict of import parameters handed to every translator.
    """
    channelId = getChannelIdByName(channelName)
    # One timestamp is shared by all points of this import run.
    version = datetime.now()
    openData = odLoaderClass(openDataUrl).load()
    objects = odParserClass(openData).parse()
    # openDataUrl is passed in the translator's importSource position.
    points = [
        odToPointTranslatorClass(
            importDataDict, obj, version, openDataUrl, channelId).getPoint()
        for obj in objects
    ]
    odToPointsLoaderClass(serviceName, points).loadPoints()