120 lines
3.9 KiB
Python
120 lines
3.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
from urllib import parse as urlparse
|
|
|
|
class ApiTest(object):
    """Build a Postman collection (schema v2.1.0) from Python collection modules.

    The converter reads ``<workDir>/tests/env.json`` for the collection name
    and variables, imports every module found in
    ``<workDir>/tests/collections``, and writes the resulting collection to
    ``<workDir>/tests/<name>.postman.json``.
    """

    def __init__(self, workDir):
        """Remember the tests directory located under *workDir*."""
        self._workDir = workDir + '/tests/'

    def convert(self):
        """Load the environment, convert every collection and write the result.

        Raises whatever ``open``/``json`` raise when ``env.json`` is missing
        or malformed, and ``KeyError`` when it lacks ``name``/``variables``.
        """
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(self._workDir + 'env.json', 'r', encoding='utf-8') as envFile:
            self._env = json.load(envFile)

        self._projObj = {
            'info': {
                '_postman_id': '',
                'name': self._env['name'],
                'schema': 'https://schema.getpostman.com/json/collection/v2.1.0/collection.json'
            },
            'variable': self._env['variables'],
            'item': []
        }

        collectionsDir = self._workDir + 'collections'
        # Avoid piling up duplicate entries when convert() runs repeatedly.
        if collectionsDir not in sys.path:
            sys.path.append(collectionsDir)

        # Sorted so the generated file does not depend on directory order.
        for fileName in sorted(os.listdir(collectionsDir)):
            if fileName[0:1] == '_':
                continue
            # Strip only a trailing '.py'; a blanket replace would also
            # mangle names that merely contain '.py' somewhere else.
            if fileName.endswith('.py'):
                moduleName = fileName[:-len('.py')]
            else:
                moduleName = fileName
            module = __import__(moduleName)
            for attrName in dir(module):
                if attrName[0:1] != '_':
                    # NOTE(review): every public module attribute is called
                    # with no arguments and treated as a collection; modules
                    # must therefore expose nothing public but collections.
                    collection = getattr(module, attrName)()
                    self._convertCollection(collection)

        self._out()

    def _out(self):
        """Serialise the assembled collection to the output file."""
        with open(self.getOutFileName(), 'w', encoding='utf-8') as outFile:
            json.dump(self._projObj, outFile, indent=4)

    def _scriptEvent(self, listen, source):
        """Return a Postman script event for *listen* running *source*."""
        return {
            'listen': listen,
            'script': {
                'exec': [
                    source.strip()
                ],
                'type': 'text/javascript'
            }
        }

    def _convertCollection(self, c):
        """Append one Postman item per entry of ``c.items`` to the project.

        Each entry is a mapping with a mandatory ``url`` and optional
        ``name``, ``method``, ``params`` (sequence of ``(key, value)``
        pairs), ``prerequest`` and ``test`` (script source strings).
        """
        for item in c.items:
            url = item['url']

            events = []
            for listen in ('prerequest', 'test'):
                if listen in item:
                    events.append(self._scriptEvent(listen, item[listen]))

            query = []
            # parse_qs maps each key to a list; keep every value so repeated
            # keys (?a=1&a=2) are not silently dropped.
            for key, values in self._getArgs(url).items():
                for value in values:
                    query.append({
                        'key': key,
                        'value': value,
                    })
            # 'params' is optional, like 'name' and 'method'.
            for param in item.get('params', ()):
                query.append({
                    'key': param[0],
                    'value': param[1],
                })

            self._projObj['item'].append({
                'name': item.get('name', ''),
                'event': events,
                'request': {
                    'method': item.get('method', 'GET'),
                    'header': [],
                    'url': {
                        'raw': self._getRaw(item),
                        'protocol': self._getProtocol(url),
                        'host': self._getHost(url),
                        'path': self._getPath(url),
                        'query': query
                    }
                },
                'response': []
            })

    def _getRaw(self, item):
        """Return the item's URL with its extra ``params`` appended.

        Parameters are joined with ``&`` and a ``?`` is inserted when the
        URL has no query string yet. Values are appended verbatim (no
        percent-encoding).
        """
        raw = item['url']
        for param in item.get('params', ()):
            # str.index() raises ValueError instead of returning -1, so a
            # membership test is required to detect a missing '?'.
            if '?' not in raw:
                raw += '?'
            pair = param[0] + '=' + param[1]
            if raw[-1] in ('&', '?'):
                raw += pair
            else:
                raw += '&' + pair
        return raw

    def _getProtocol(self, url):
        """Return the URL scheme (e.g. ``'https'``)."""
        return urlparse.urlparse(url).scheme

    def _getHost(self, url):
        """Return the network location wrapped in a single-element list."""
        return [urlparse.urlparse(url).netloc]

    def _getPath(self, url):
        """Return the non-empty path segments of the URL as a list."""
        segments = urlparse.urlparse(url).path.split('/')
        return [segment for segment in segments if segment != '']

    def _getArgs(self, url):
        """Return the URL's query string parsed into ``{key: [values]}``."""
        return urlparse.parse_qs(urlparse.urlparse(url).query)

    def getOutFileName(self):
        """Return the output file path; valid only once ``convert`` has loaded the env."""
        return self._workDir + self._env['name'] + '.postman.json'
|