Commit e350e84a authored by Johannes Henkel's avatar Johannes Henkel Committed by Commit Bot

[DevTools] Roll inspector_protocol (V8).

New Revision: d48ba2079ffcdaf2d99f4153127aab6dbe32a954

Change-Id: Idde7388b4f92492609c1714fc003ec3234c8bf82
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1686451
Auto-Submit: Johannes Henkel <johannes@chromium.org>
Reviewed-by: 's avatarAlexei Filippov <alph@chromium.org>
Commit-Queue: Alexei Filippov <alph@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62503}
parent 39eab44d
......@@ -15,8 +15,8 @@ https://cs.chromium.org/chromium/src/v8/third_party/inspector_protocol/
See also [Contributing to Chrome Devtools Protocol](https://docs.google.com/document/d/1c-COD2kaK__5iMM5SEx-PzNA7HFmgttcYfOHHX0HaOM/edit).
We're working on enabling standalone builds for parts of this package for
testing and development, please feel free to ignore this for now.
But, if you're familiar with
testing and development.
If you're familiar with
[Chromium's development process](https://www.chromium.org/developers/contributing-code)
and have the depot_tools installed, you may use these commands
to fetch the package (and dependencies) and build and run the tests:
......@@ -24,8 +24,9 @@ to fetch the package (and dependencies) and build and run the tests:
fetch inspector_protocol
cd src
gn gen out/Release
ninja -C out/Release json_parser_test
out/Release/json_parser_test
ninja -C out/Release encoding_test bindings_test
out/Release/encoding_test
out/Release/bindings_test
You'll probably also need to install g++, since Clang uses this to find the
standard C++ headers. E.g.,
......
......@@ -2,7 +2,7 @@ Name: inspector protocol
Short Name: inspector_protocol
URL: https://chromium.googlesource.com/deps/inspector_protocol/
Version: 0
Revision: aec57d43b6a2c41c37fb0a2507108e89a9342177
Revision: 373efb7fe33a7ae84038868ed08b9f1bd328b55d
License: BSD
License File: LICENSE
Security Critical: no
......
......@@ -11,18 +11,18 @@ import functools
import re
import copy
try:
import json
import json
except ImportError:
import simplejson as json
import simplejson as json
import pdl
try:
unicode
unicode
except NameError:
# Define unicode for Py3
def unicode(s, *_):
return s
# Define unicode for Py3
def unicode(s, *_):
return s
# Path handling for libraries and templates
# Paths have to be normalized because Jinja uses the exact template path to
......@@ -35,662 +35,695 @@ except NameError:
module_path, module_filename = os.path.split(os.path.realpath(__file__))
def read_config():
# pylint: disable=W0703
def json_to_object(data, output_base, config_base):
def json_object_hook(object_dict):
items = [(k, os.path.join(config_base, v) if k == "path" else v) for (k, v) in object_dict.items()]
items = [(k, os.path.join(output_base, v) if k == "output" else v) for (k, v) in items]
keys, values = list(zip(*items))
return collections.namedtuple('X', keys)(*values)
return json.loads(data, object_hook=json_object_hook)
def init_defaults(config_tuple, path, defaults):
keys = list(config_tuple._fields) # pylint: disable=E1101
values = [getattr(config_tuple, k) for k in keys]
for i in range(len(keys)):
if hasattr(values[i], "_fields"):
values[i] = init_defaults(values[i], path + "." + keys[i], defaults)
for optional in defaults:
if optional.find(path + ".") != 0:
continue
optional_key = optional[len(path) + 1:]
if optional_key.find(".") == -1 and optional_key not in keys:
keys.append(optional_key)
values.append(defaults[optional])
return collections.namedtuple('X', keys)(*values)
try:
cmdline_parser = argparse.ArgumentParser()
cmdline_parser.add_argument("--output_base", type=unicode, required=True)
cmdline_parser.add_argument("--jinja_dir", type=unicode, required=True)
cmdline_parser.add_argument("--config", type=unicode, required=True)
cmdline_parser.add_argument("--config_value", default=[], action="append")
cmdline_parser.add_argument(
"--inspector_protocol_dir", type=unicode, required=True,
help=("directory with code_generator.py and C++ encoding / binding "
"libraries, relative to the root of the source tree."))
arg_options = cmdline_parser.parse_args()
jinja_dir = arg_options.jinja_dir
output_base = arg_options.output_base
config_file = arg_options.config
config_base = os.path.dirname(config_file)
config_values = arg_options.config_value
inspector_protocol_dir = arg_options.inspector_protocol_dir.lstrip('/')
except Exception:
# Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
exc = sys.exc_info()[1]
sys.stderr.write("Failed to parse command-line arguments: %s\n\n" % exc)
exit(1)
try:
config_json_file = open(config_file, "r")
config_json_string = config_json_file.read()
config_partial = json_to_object(config_json_string, output_base, config_base)
config_json_file.close()
defaults = {
".use_snake_file_names": False,
".use_title_case_methods": False,
".imported": False,
".imported.export_macro": "",
".imported.export_header": False,
".imported.header": False,
".imported.package": False,
".imported.options": False,
".protocol.export_macro": "",
".protocol.export_header": False,
".protocol.options": False,
".protocol.file_name_prefix": "",
".exported": False,
".exported.export_macro": "",
".exported.export_header": False,
".lib": False,
".lib.export_macro": "",
".lib.export_header": False,
# The encoding lib consists of encoding/encoding.h and
# encoding/encoding.cc in its subdirectory, which binaries
# must link / depend on.
".encoding_lib.header": os.path.join(inspector_protocol_dir, "encoding/encoding.h"),
".encoding_lib.namespace": "",
# Ditto for bindings, see bindings/bindings.h.
".bindings_lib.header": os.path.join(inspector_protocol_dir, "bindings/bindings.h"),
".bindings_lib.namespace": ""
}
for key_value in config_values:
parts = key_value.split("=")
if len(parts) == 2:
defaults["." + parts[0]] = parts[1]
return (jinja_dir, config_file, init_defaults(config_partial, "", defaults))
except Exception:
# Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
exc = sys.exc_info()[1]
sys.stderr.write("Failed to parse config file: %s\n\n" % exc)
exit(1)
# pylint: disable=W0703
def json_to_object(data, output_base, config_base):
def json_object_hook(object_dict):
items = [(k, os.path.join(config_base, v) if k == "path" else v)
for (k, v) in object_dict.items()]
items = [(k, os.path.join(output_base, v) if k == "output" else v)
for (k, v) in items]
keys, values = list(zip(*items))
return collections.namedtuple('X', keys)(*values)
return json.loads(data, object_hook=json_object_hook)
def init_defaults(config_tuple, path, defaults):
keys = list(config_tuple._fields) # pylint: disable=E1101
values = [getattr(config_tuple, k) for k in keys]
for i in range(len(keys)):
if hasattr(values[i], "_fields"):
values[i] = init_defaults(values[i], path + "." + keys[i], defaults)
for optional in defaults:
if optional.find(path + ".") != 0:
continue
optional_key = optional[len(path) + 1:]
if optional_key.find(".") == -1 and optional_key not in keys:
keys.append(optional_key)
values.append(defaults[optional])
return collections.namedtuple('X', keys)(*values)
try:
cmdline_parser = argparse.ArgumentParser()
cmdline_parser.add_argument("--output_base", type=unicode, required=True)
cmdline_parser.add_argument("--jinja_dir", type=unicode, required=True)
cmdline_parser.add_argument("--config", type=unicode, required=True)
cmdline_parser.add_argument("--config_value", default=[], action="append")
cmdline_parser.add_argument(
"--inspector_protocol_dir", type=unicode, required=True,
help=("directory with code_generator.py and C++ encoding / binding "
"libraries, relative to the root of the source tree."))
arg_options = cmdline_parser.parse_args()
jinja_dir = arg_options.jinja_dir
output_base = arg_options.output_base
config_file = arg_options.config
config_base = os.path.dirname(config_file)
config_values = arg_options.config_value
inspector_protocol_dir = arg_options.inspector_protocol_dir.lstrip('/')
except Exception:
# Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
exc = sys.exc_info()[1]
sys.stderr.write("Failed to parse command-line arguments: %s\n\n" % exc)
exit(1)
try:
config_json_file = open(config_file, "r")
config_json_string = config_json_file.read()
config_partial = json_to_object(config_json_string, output_base,
config_base)
config_json_file.close()
defaults = {
".use_snake_file_names": False,
".use_title_case_methods": False,
".imported": False,
".imported.export_macro": "",
".imported.export_header": False,
".imported.header": False,
".imported.package": False,
".imported.options": False,
".protocol.export_macro": "",
".protocol.export_header": False,
".protocol.options": False,
".protocol.file_name_prefix": "",
".exported": False,
".exported.export_macro": "",
".exported.export_header": False,
".lib": False,
".lib.export_macro": "",
".lib.export_header": False,
# The encoding lib consists of encoding/encoding.h and
# encoding/encoding.cc in its subdirectory, which binaries
# must link / depend on.
".encoding_lib.header": os.path.join(inspector_protocol_dir,
"encoding/encoding.h"),
".encoding_lib.namespace": "",
# Ditto for bindings, see bindings/bindings.h.
".bindings_lib.header": os.path.join(inspector_protocol_dir,
"bindings/bindings.h"),
".bindings_lib.namespace": ""
}
for key_value in config_values:
parts = key_value.split("=")
if len(parts) == 2:
defaults["." + parts[0]] = parts[1]
return (jinja_dir, config_file, init_defaults(config_partial, "", defaults))
except Exception:
# Work with python 2 and 3 http://docs.python.org/py3k/howto/pyporting.html
exc = sys.exc_info()[1]
sys.stderr.write("Failed to parse config file: %s\n\n" % exc)
exit(1)
# ---- Begin of utilities exposed to generator ----
def to_title_case(name):
return name[:1].upper() + name[1:]
return name[:1].upper() + name[1:]
def dash_to_camelcase(word):
prefix = ""
if word[0] == "-":
prefix = "Negative"
word = word[1:]
return prefix + "".join(to_title_case(x) or "-" for x in word.split("-"))
prefix = ""
if word[0] == "-":
prefix = "Negative"
word = word[1:]
return prefix + "".join(to_title_case(x) or "-" for x in word.split("-"))
def to_snake_case(name):
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name, sys.maxsize).lower()
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name, sys.maxsize).lower()
def to_method_case(config, name):
if config.use_title_case_methods:
return to_title_case(name)
return name
if config.use_title_case_methods:
return to_title_case(name)
return name
def join_arrays(dict, keys):
result = []
for key in keys:
if key in dict:
result += dict[key]
return result
result = []
for key in keys:
if key in dict:
result += dict[key]
return result
def format_include(config, header, file_name=None):
if file_name is not None:
header = header + "/" + file_name + ".h"
header = "\"" + header + "\"" if header[0] not in "<\"" else header
if config.use_snake_file_names:
header = to_snake_case(header)
return header
if file_name is not None:
header = header + "/" + file_name + ".h"
header = "\"" + header + "\"" if header[0] not in "<\"" else header
if config.use_snake_file_names:
header = to_snake_case(header)
return header
def format_domain_include(config, header, file_name):
return format_include(config, header, config.protocol.file_name_prefix + file_name)
return format_include(config, header,
config.protocol.file_name_prefix + file_name)
def to_file_name(config, file_name):
if config.use_snake_file_names:
return to_snake_case(file_name).replace(".cpp", ".cc")
return file_name
if config.use_snake_file_names:
return to_snake_case(file_name).replace(".cpp", ".cc")
return file_name
# ---- End of utilities exposed to generator ----
def initialize_jinja_env(jinja_dir, cache_dir, config):
# pylint: disable=F0401
sys.path.insert(1, os.path.abspath(jinja_dir))
import jinja2
jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(module_path),
# Bytecode cache is not concurrency-safe unless pre-cached:
# if pre-cached this is read-only, but writing creates a race condition.
bytecode_cache=jinja2.FileSystemBytecodeCache(cache_dir),
keep_trailing_newline=True, # newline-terminate generated files
lstrip_blocks=True, # so can indent control flow tags
trim_blocks=True)
jinja_env.filters.update({"to_title_case": to_title_case, "dash_to_camelcase": dash_to_camelcase, "to_method_case": functools.partial(to_method_case, config)})
jinja_env.add_extension("jinja2.ext.loopcontrols")
return jinja_env
# pylint: disable=F0401
sys.path.insert(1, os.path.abspath(jinja_dir))
import jinja2
jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(module_path),
# Bytecode cache is not concurrency-safe unless pre-cached:
# if pre-cached this is read-only, but writing creates a race condition.
bytecode_cache=jinja2.FileSystemBytecodeCache(cache_dir),
keep_trailing_newline=True, # newline-terminate generated files
lstrip_blocks=True, # so can indent control flow tags
trim_blocks=True)
jinja_env.filters.update({
"to_title_case": to_title_case,
"dash_to_camelcase": dash_to_camelcase,
"to_method_case": functools.partial(to_method_case, config)})
jinja_env.add_extension("jinja2.ext.loopcontrols")
return jinja_env
def create_imported_type_definition(domain_name, type, imported_namespace):
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
"pass_type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<%s::%s::API::%s>" % (imported_namespace, domain_name, type["id"]),
"raw_type": "%s::%s::API::%s" % (imported_namespace, domain_name, type["id"]),
"raw_pass_type": "%s::%s::API::%s*" % (imported_namespace, domain_name, type["id"]),
"raw_return_type": "%s::%s::API::%s*" % (imported_namespace, domain_name, type["id"]),
}
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<%s::%s::API::%s>" % (
imported_namespace, domain_name, type["id"]),
"pass_type": "std::unique_ptr<%s::%s::API::%s>" % (
imported_namespace, domain_name, type["id"]),
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<%s::%s::API::%s>" % (
imported_namespace, domain_name, type["id"]),
"raw_type": "%s::%s::API::%s" % (
imported_namespace, domain_name, type["id"]),
"raw_pass_type": "%s::%s::API::%s*" % (
imported_namespace, domain_name, type["id"]),
"raw_return_type": "%s::%s::API::%s*" % (
imported_namespace, domain_name, type["id"]),
}
def create_user_type_definition(domain_name, type):
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
"pass_type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
"raw_type": "protocol::%s::%s" % (domain_name, type["id"]),
"raw_pass_type": "protocol::%s::%s*" % (domain_name, type["id"]),
"raw_return_type": "protocol::%s::%s*" % (domain_name, type["id"]),
}
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::%s::%s>" % (
domain_name, type["id"]),
"pass_type": "std::unique_ptr<protocol::%s::%s>" % (
domain_name, type["id"]),
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::%s::%s>" % (domain_name, type["id"]),
"raw_type": "protocol::%s::%s" % (domain_name, type["id"]),
"raw_pass_type": "protocol::%s::%s*" % (domain_name, type["id"]),
"raw_return_type": "protocol::%s::%s*" % (domain_name, type["id"]),
}
def create_object_type_definition():
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::DictionaryValue>",
"pass_type": "std::unique_ptr<protocol::DictionaryValue>",
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::DictionaryValue>",
"raw_type": "protocol::DictionaryValue",
"raw_pass_type": "protocol::DictionaryValue*",
"raw_return_type": "protocol::DictionaryValue*",
}
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::DictionaryValue>",
"pass_type": "std::unique_ptr<protocol::DictionaryValue>",
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::DictionaryValue>",
"raw_type": "protocol::DictionaryValue",
"raw_pass_type": "protocol::DictionaryValue*",
"raw_return_type": "protocol::DictionaryValue*",
}
def create_any_type_definition():
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::Value>",
"pass_type": "std::unique_ptr<protocol::Value>",
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::Value>",
"raw_type": "protocol::Value",
"raw_pass_type": "protocol::Value*",
"raw_return_type": "protocol::Value*",
}
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::Value>",
"pass_type": "std::unique_ptr<protocol::Value>",
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::Value>",
"raw_type": "protocol::Value",
"raw_pass_type": "protocol::Value*",
"raw_return_type": "protocol::Value*",
}
def create_string_type_definition():
# pylint: disable=W0622
return {
"return_type": "String",
"pass_type": "const String&",
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": "String",
"raw_type": "String",
"raw_pass_type": "const String&",
"raw_return_type": "String",
}
# pylint: disable=W0622
return {
"return_type": "String",
"pass_type": "const String&",
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": "String",
"raw_type": "String",
"raw_pass_type": "const String&",
"raw_return_type": "String",
}
def create_binary_type_definition():
# pylint: disable=W0622
return {
"return_type": "Binary",
"pass_type": "const Binary&",
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": "Binary",
"raw_type": "Binary",
"raw_pass_type": "const Binary&",
"raw_return_type": "Binary",
}
# pylint: disable=W0622
return {
"return_type": "Binary",
"pass_type": "const Binary&",
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": "Binary",
"raw_type": "Binary",
"raw_pass_type": "const Binary&",
"raw_return_type": "Binary",
}
def create_primitive_type_definition(type):
# pylint: disable=W0622
typedefs = {
"number": "double",
"integer": "int",
"boolean": "bool"
}
defaults = {
"number": "0",
"integer": "0",
"boolean": "false"
}
jsontypes = {
"number": "TypeDouble",
"integer": "TypeInteger",
"boolean": "TypeBoolean",
}
return {
"return_type": typedefs[type],
"pass_type": typedefs[type],
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": typedefs[type],
"raw_type": typedefs[type],
"raw_pass_type": typedefs[type],
"raw_return_type": typedefs[type],
"default_value": defaults[type]
}
# pylint: disable=W0622
typedefs = {
"number": "double",
"integer": "int",
"boolean": "bool"
}
defaults = {
"number": "0",
"integer": "0",
"boolean": "false"
}
jsontypes = {
"number": "TypeDouble",
"integer": "TypeInteger",
"boolean": "TypeBoolean",
}
return {
"return_type": typedefs[type],
"pass_type": typedefs[type],
"to_pass_type": "%s",
"to_raw_type": "%s",
"to_rvalue": "%s",
"type": typedefs[type],
"raw_type": typedefs[type],
"raw_pass_type": typedefs[type],
"raw_return_type": typedefs[type],
"default_value": defaults[type]
}
def wrap_array_definition(type):
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"pass_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"raw_type": "protocol::Array<%s>" % type["raw_type"],
"raw_pass_type": "protocol::Array<%s>*" % type["raw_type"],
"raw_return_type": "protocol::Array<%s>*" % type["raw_type"],
"out_type": "protocol::Array<%s>&" % type["raw_type"],
}
# pylint: disable=W0622
return {
"return_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"pass_type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"to_raw_type": "%s.get()",
"to_pass_type": "std::move(%s)",
"to_rvalue": "std::move(%s)",
"type": "std::unique_ptr<protocol::Array<%s>>" % type["raw_type"],
"raw_type": "protocol::Array<%s>" % type["raw_type"],
"raw_pass_type": "protocol::Array<%s>*" % type["raw_type"],
"raw_return_type": "protocol::Array<%s>*" % type["raw_type"],
"out_type": "protocol::Array<%s>&" % type["raw_type"],
}
class Protocol(object):
def __init__(self, config):
self.config = config
self.json_api = {"domains": []}
self.imported_domains = []
self.exported_domains = []
self.generate_domains = self.read_protocol_file(config.protocol.path)
if config.protocol.options:
self.generate_domains = [rule.domain for rule in config.protocol.options]
self.exported_domains = [rule.domain for rule in config.protocol.options if hasattr(rule, "exported")]
if config.imported:
self.imported_domains = self.read_protocol_file(config.imported.path)
if config.imported.options:
self.imported_domains = [rule.domain for rule in config.imported.options]
self.patch_full_qualified_refs()
self.create_notification_types()
self.create_type_definitions()
self.generate_used_types()
def read_protocol_file(self, file_name):
input_file = open(file_name, "r")
parsed_json = pdl.loads(input_file.read(), file_name)
input_file.close()
version = parsed_json["version"]["major"] + "." + parsed_json["version"]["minor"]
domains = []
for domain in parsed_json["domains"]:
domains.append(domain["domain"])
domain["version"] = version
self.json_api["domains"] += parsed_json["domains"]
return domains
def patch_full_qualified_refs(self):
def patch_full_qualified_refs_in_domain(json, domain_name):
if isinstance(json, list):
for item in json:
patch_full_qualified_refs_in_domain(item, domain_name)
if not isinstance(json, dict):
return
for key in json:
if key == "type" and json[key] == "string":
json[key] = domain_name + ".string"
if key != "$ref":
patch_full_qualified_refs_in_domain(json[key], domain_name)
continue
if json["$ref"].find(".") == -1:
json["$ref"] = domain_name + "." + json["$ref"]
return
for domain in self.json_api["domains"]:
patch_full_qualified_refs_in_domain(domain, domain["domain"])
def all_references(self, json):
refs = set()
if isinstance(json, list):
for item in json:
refs |= self.all_references(item)
if not isinstance(json, dict):
return refs
for key in json:
if key != "$ref":
refs |= self.all_references(json[key])
else:
refs.add(json["$ref"])
return refs
def generate_used_types(self):
all_refs = set()
for domain in self.json_api["domains"]:
domain_name = domain["domain"]
if "commands" in domain:
for command in domain["commands"]:
if self.generate_command(domain_name, command["name"]):
all_refs |= self.all_references(command)
if "events" in domain:
for event in domain["events"]:
if self.generate_event(domain_name, event["name"]):
all_refs |= self.all_references(event)
all_refs.add(domain_name + "." + to_title_case(event["name"]) + "Notification")
dependencies = self.generate_type_dependencies()
queue = set(all_refs)
while len(queue):
ref = queue.pop()
if ref in dependencies:
queue |= dependencies[ref] - all_refs
all_refs |= dependencies[ref]
self.used_types = all_refs
def generate_type_dependencies(self):
dependencies = dict()
domains_with_types = (x for x in self.json_api["domains"] if "types" in x)
for domain in domains_with_types:
domain_name = domain["domain"]
for type in domain["types"]:
related_types = self.all_references(type)
if len(related_types):
dependencies[domain_name + "." + type["id"]] = related_types
return dependencies
def create_notification_types(self):
for domain in self.json_api["domains"]:
if "events" in domain:
for event in domain["events"]:
event_type = dict()
event_type["description"] = "Wrapper for notification params"
event_type["type"] = "object"
event_type["id"] = to_title_case(event["name"]) + "Notification"
if "parameters" in event:
event_type["properties"] = copy.deepcopy(event["parameters"])
if "types" not in domain:
domain["types"] = list()
domain["types"].append(event_type)
def create_type_definitions(self):
imported_namespace = "::".join(self.config.imported.namespace) if self.config.imported else ""
self.type_definitions = {}
self.type_definitions["number"] = create_primitive_type_definition("number")
self.type_definitions["integer"] = create_primitive_type_definition("integer")
self.type_definitions["boolean"] = create_primitive_type_definition("boolean")
self.type_definitions["object"] = create_object_type_definition()
self.type_definitions["any"] = create_any_type_definition()
self.type_definitions["binary"] = create_binary_type_definition()
for domain in self.json_api["domains"]:
self.type_definitions[domain["domain"] + ".string"] = create_string_type_definition()
self.type_definitions[domain["domain"] + ".binary"] = create_binary_type_definition()
if not ("types" in domain):
continue
for type in domain["types"]:
type_name = domain["domain"] + "." + type["id"]
if type["type"] == "object" and domain["domain"] in self.imported_domains:
self.type_definitions[type_name] = create_imported_type_definition(domain["domain"], type, imported_namespace)
elif type["type"] == "object":
self.type_definitions[type_name] = create_user_type_definition(domain["domain"], type)
elif type["type"] == "array":
self.type_definitions[type_name] = self.resolve_type(type)
elif type["type"] == domain["domain"] + ".string":
self.type_definitions[type_name] = create_string_type_definition()
elif type["type"] == domain["domain"] + ".binary":
self.type_definitions[type_name] = create_binary_type_definition()
else:
self.type_definitions[type_name] = create_primitive_type_definition(type["type"])
def check_options(self, options, domain, name, include_attr, exclude_attr, default):
for rule in options:
if rule.domain != domain:
continue
if include_attr and hasattr(rule, include_attr):
return name in getattr(rule, include_attr)
if exclude_attr and hasattr(rule, exclude_attr):
return name not in getattr(rule, exclude_attr)
return default
return False
# ---- Begin of methods exposed to generator
def type_definition(self, name):
return self.type_definitions[name]
def resolve_type(self, prop):
if "$ref" in prop:
return self.type_definitions[prop["$ref"]]
if prop["type"] == "array":
return wrap_array_definition(self.resolve_type(prop["items"]))
return self.type_definitions[prop["type"]]
def __init__(self, config):
self.config = config
self.json_api = {"domains": []}
self.imported_domains = []
self.exported_domains = []
self.generate_domains = self.read_protocol_file(config.protocol.path)
def generate_command(self, domain, command):
if not self.config.protocol.options:
return domain in self.generate_domains
return self.check_options(self.config.protocol.options, domain, command, "include", "exclude", True)
if config.protocol.options:
self.generate_domains = [rule.domain for rule in config.protocol.options]
self.exported_domains = [rule.domain for rule in config.protocol.options
if hasattr(rule, "exported")]
def generate_event(self, domain, event):
if not self.config.protocol.options:
return domain in self.generate_domains
return self.check_options(self.config.protocol.options, domain, event, "include_events", "exclude_events", True)
def generate_type(self, domain, typename):
return domain + "." + typename in self.used_types
def is_async_command(self, domain, command):
if not self.config.protocol.options:
return False
return self.check_options(self.config.protocol.options, domain, command, "async", None, False)
def is_exported(self, domain, name):
if not self.config.protocol.options:
return False
return self.check_options(self.config.protocol.options, domain, name, "exported", None, False)
def is_imported(self, domain, name):
if not self.config.imported:
return False
if not self.config.imported.options:
return domain in self.imported_domains
return self.check_options(self.config.imported.options, domain, name, "imported", None, False)
def is_exported_domain(self, domain):
return domain in self.exported_domains
def generate_disable(self, domain):
if "commands" not in domain:
return True
if config.imported:
self.imported_domains = self.read_protocol_file(config.imported.path)
if config.imported.options:
self.imported_domains = [rule.domain
for rule in config.imported.options]
self.patch_full_qualified_refs()
self.create_notification_types()
self.create_type_definitions()
self.generate_used_types()
def read_protocol_file(self, file_name):
input_file = open(file_name, "r")
parsed_json = pdl.loads(input_file.read(), file_name)
input_file.close()
version = '%s.%s' % (parsed_json["version"]["major"],
parsed_json["version"]["minor"])
domains = []
for domain in parsed_json["domains"]:
domains.append(domain["domain"])
domain["version"] = version
self.json_api["domains"] += parsed_json["domains"]
return domains
def patch_full_qualified_refs(self):
def patch_full_qualified_refs_in_domain(json, domain_name):
if isinstance(json, list):
for item in json:
patch_full_qualified_refs_in_domain(item, domain_name)
if not isinstance(json, dict):
return
for key in json:
if key == "type" and json[key] == "string":
json[key] = domain_name + ".string"
if key != "$ref":
patch_full_qualified_refs_in_domain(json[key], domain_name)
continue
if json["$ref"].find(".") == -1:
json["$ref"] = domain_name + "." + json["$ref"]
return
for domain in self.json_api["domains"]:
patch_full_qualified_refs_in_domain(domain, domain["domain"])
def all_references(self, json):
refs = set()
if isinstance(json, list):
for item in json:
refs |= self.all_references(item)
if not isinstance(json, dict):
return refs
for key in json:
if key != "$ref":
refs |= self.all_references(json[key])
else:
refs.add(json["$ref"])
return refs
def generate_used_types(self):
all_refs = set()
for domain in self.json_api["domains"]:
domain_name = domain["domain"]
if "commands" in domain:
for command in domain["commands"]:
if command["name"] == "disable" and self.generate_command(domain["domain"], "disable"):
return False
return True
if self.generate_command(domain_name, command["name"]):
all_refs |= self.all_references(command)
if "events" in domain:
for event in domain["events"]:
if self.generate_event(domain_name, event["name"]):
all_refs |= self.all_references(event)
all_refs.add('%s.%sNotification' % (domain_name,
to_title_case(event["name"])))
dependencies = self.generate_type_dependencies()
queue = set(all_refs)
while len(queue):
ref = queue.pop()
if ref in dependencies:
queue |= dependencies[ref] - all_refs
all_refs |= dependencies[ref]
self.used_types = all_refs
def generate_type_dependencies(self):
dependencies = dict()
domains_with_types = (x for x in self.json_api["domains"] if "types" in x)
for domain in domains_with_types:
domain_name = domain["domain"]
for type in domain["types"]:
related_types = self.all_references(type)
if len(related_types):
dependencies[domain_name + "." + type["id"]] = related_types
return dependencies
def create_notification_types(self):
for domain in self.json_api["domains"]:
if "events" in domain:
for event in domain["events"]:
event_type = dict()
event_type["description"] = "Wrapper for notification params"
event_type["type"] = "object"
event_type["id"] = to_title_case(event["name"]) + "Notification"
if "parameters" in event:
event_type["properties"] = copy.deepcopy(event["parameters"])
if "types" not in domain:
domain["types"] = list()
domain["types"].append(event_type)
def create_type_definitions(self):
imported_namespace = ""
if self.config.imported:
imported_namespace = "::".join(self.config.imported.namespace)
self.type_definitions = {}
self.type_definitions["number"] = create_primitive_type_definition("number")
self.type_definitions["integer"] = create_primitive_type_definition("integer")
self.type_definitions["boolean"] = create_primitive_type_definition("boolean")
self.type_definitions["object"] = create_object_type_definition()
self.type_definitions["any"] = create_any_type_definition()
self.type_definitions["binary"] = create_binary_type_definition()
for domain in self.json_api["domains"]:
self.type_definitions[domain["domain"] + ".string"] = (
create_string_type_definition())
self.type_definitions[domain["domain"] + ".binary"] = (
create_binary_type_definition())
if not ("types" in domain):
continue
for type in domain["types"]:
type_name = domain["domain"] + "." + type["id"]
if type["type"] == "object" and domain["domain"] in self.imported_domains:
self.type_definitions[type_name] = create_imported_type_definition(
domain["domain"], type, imported_namespace)
elif type["type"] == "object":
self.type_definitions[type_name] = create_user_type_definition(
domain["domain"], type)
elif type["type"] == "array":
self.type_definitions[type_name] = self.resolve_type(type)
elif type["type"] == domain["domain"] + ".string":
self.type_definitions[type_name] = create_string_type_definition()
elif type["type"] == domain["domain"] + ".binary":
self.type_definitions[type_name] = create_binary_type_definition()
else:
self.type_definitions[type_name] = create_primitive_type_definition(
type["type"])
def check_options(self, options, domain, name, include_attr, exclude_attr,
default):
for rule in options:
if rule.domain != domain:
continue
if include_attr and hasattr(rule, include_attr):
return name in getattr(rule, include_attr)
if exclude_attr and hasattr(rule, exclude_attr):
return name not in getattr(rule, exclude_attr)
return default
return False
# ---- Begin of methods exposed to generator
def type_definition(self, name):
return self.type_definitions[name]
def resolve_type(self, prop):
if "$ref" in prop:
return self.type_definitions[prop["$ref"]]
if prop["type"] == "array":
return wrap_array_definition(self.resolve_type(prop["items"]))
return self.type_definitions[prop["type"]]
def generate_command(self, domain, command):
if not self.config.protocol.options:
return domain in self.generate_domains
return self.check_options(self.config.protocol.options, domain, command,
"include", "exclude", True)
def generate_event(self, domain, event):
if not self.config.protocol.options:
return domain in self.generate_domains
return self.check_options(self.config.protocol.options, domain, event,
"include_events", "exclude_events", True)
def generate_type(self, domain, typename):
return domain + "." + typename in self.used_types
def is_async_command(self, domain, command):
if not self.config.protocol.options:
return False
return self.check_options(self.config.protocol.options, domain, command,
"async", None, False)
def is_exported(self, domain, name):
if not self.config.protocol.options:
return False
return self.check_options(self.config.protocol.options, domain, name,
"exported", None, False)
def is_imported(self, domain, name):
if not self.config.imported:
return False
if not self.config.imported.options:
return domain in self.imported_domains
return self.check_options(self.config.imported.options, domain, name,
"imported", None, False)
def is_exported_domain(self, domain):
return domain in self.exported_domains
def generate_disable(self, domain):
if "commands" not in domain:
return True
for command in domain["commands"]:
if command["name"] == "disable" and self.generate_command(
domain["domain"], "disable"):
return False
return True
def is_imported_dependency(self, domain):
return domain in self.generate_domains or domain in self.imported_domains
def is_imported_dependency(self, domain):
return domain in self.generate_domains or domain in self.imported_domains
def main():
jinja_dir, config_file, config = read_config()
protocol = Protocol(config)
if not config.exported and len(protocol.exported_domains):
sys.stderr.write("Domains [%s] are exported, but config is missing export entry\n\n" % ", ".join(protocol.exported_domains))
exit(1)
jinja_dir, config_file, config = read_config()
protocol = Protocol(config)
if not config.exported and len(protocol.exported_domains):
sys.stderr.write(("Domains [%s] are exported, but config is missing export "
"entry\n\n") % ", ".join(protocol.exported_domains))
exit(1)
if not os.path.exists(config.protocol.output):
os.mkdir(config.protocol.output)
if len(protocol.exported_domains) and not os.path.exists(
config.exported.output):
os.mkdir(config.exported.output)
jinja_env = initialize_jinja_env(jinja_dir, config.protocol.output, config)
inputs = []
inputs.append(__file__)
inputs.append(config_file)
inputs.append(config.protocol.path)
if config.imported:
inputs.append(config.imported.path)
templates_dir = os.path.join(module_path, "templates")
inputs.append(os.path.join(templates_dir, "TypeBuilder_h.template"))
inputs.append(os.path.join(templates_dir, "TypeBuilder_cpp.template"))
inputs.append(os.path.join(templates_dir, "Exported_h.template"))
inputs.append(os.path.join(templates_dir, "Imported_h.template"))
h_template = jinja_env.get_template("templates/TypeBuilder_h.template")
cpp_template = jinja_env.get_template("templates/TypeBuilder_cpp.template")
exported_template = jinja_env.get_template("templates/Exported_h.template")
imported_template = jinja_env.get_template("templates/Imported_h.template")
outputs = dict()
for domain in protocol.json_api["domains"]:
class_name = domain["domain"]
file_name = config.protocol.file_name_prefix + class_name
template_context = {
"protocol": protocol,
"config": config,
"domain": domain,
"join_arrays": join_arrays,
"format_include": functools.partial(format_include, config),
"format_domain_include": functools.partial(format_domain_include, config),
}
if not os.path.exists(config.protocol.output):
os.mkdir(config.protocol.output)
if len(protocol.exported_domains) and not os.path.exists(config.exported.output):
os.mkdir(config.exported.output)
jinja_env = initialize_jinja_env(jinja_dir, config.protocol.output, config)
if domain["domain"] in protocol.generate_domains:
outputs[os.path.join(config.protocol.output, to_file_name(
config, file_name + ".h"))] = h_template.render(template_context)
outputs[os.path.join(config.protocol.output, to_file_name(
config, file_name + ".cpp"))] = cpp_template.render(template_context)
if domain["domain"] in protocol.exported_domains:
outputs[os.path.join(config.exported.output, to_file_name(
config, file_name + ".h"))] = exported_template.render(
template_context)
if domain["domain"] in protocol.imported_domains:
outputs[os.path.join(config.protocol.output, to_file_name(
config, file_name + ".h"))] = imported_template.render(
template_context)
if config.lib:
template_context = {
"config": config,
"format_include": functools.partial(format_include, config),
}
inputs = []
inputs.append(__file__)
inputs.append(config_file)
inputs.append(config.protocol.path)
if config.imported:
inputs.append(config.imported.path)
templates_dir = os.path.join(module_path, "templates")
inputs.append(os.path.join(templates_dir, "TypeBuilder_h.template"))
inputs.append(os.path.join(templates_dir, "TypeBuilder_cpp.template"))
inputs.append(os.path.join(templates_dir, "Exported_h.template"))
inputs.append(os.path.join(templates_dir, "Imported_h.template"))
h_template = jinja_env.get_template("templates/TypeBuilder_h.template")
cpp_template = jinja_env.get_template("templates/TypeBuilder_cpp.template")
exported_template = jinja_env.get_template("templates/Exported_h.template")
imported_template = jinja_env.get_template("templates/Imported_h.template")
outputs = dict()
for domain in protocol.json_api["domains"]:
class_name = domain["domain"]
file_name = config.protocol.file_name_prefix + class_name
template_context = {
"protocol": protocol,
"config": config,
"domain": domain,
"join_arrays": join_arrays,
"format_include": functools.partial(format_include, config),
"format_domain_include": functools.partial(format_domain_include, config),
}
if domain["domain"] in protocol.generate_domains:
outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".h"))] = h_template.render(template_context)
outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".cpp"))] = cpp_template.render(template_context)
if domain["domain"] in protocol.exported_domains:
outputs[os.path.join(config.exported.output, to_file_name(config, file_name + ".h"))] = exported_template.render(template_context)
if domain["domain"] in protocol.imported_domains:
outputs[os.path.join(config.protocol.output, to_file_name(config, file_name + ".h"))] = imported_template.render(template_context)
if config.lib:
template_context = {
"config": config,
"format_include": functools.partial(format_include, config),
}
lib_templates_dir = os.path.join(module_path, "lib")
# Note these should be sorted in the right order.
# TODO(dgozman): sort them programmatically based on commented includes.
protocol_h_templates = [
"ErrorSupport_h.template",
"Values_h.template",
"Object_h.template",
"ValueConversions_h.template",
"DispatcherBase_h.template",
"Parser_h.template",
]
protocol_cpp_templates = [
"Protocol_cpp.template",
"ErrorSupport_cpp.template",
"Values_cpp.template",
"Object_cpp.template",
"DispatcherBase_cpp.template",
"Parser_cpp.template",
]
forward_h_templates = [
"Forward_h.template",
"FrontendChannel_h.template",
]
base_string_adapter_h_templates = [
"base_string_adapter_h.template",
]
base_string_adapter_cc_templates = [
"base_string_adapter_cc.template",
]
def generate_lib_file(file_name, template_files):
parts = []
for template_file in template_files:
inputs.append(os.path.join(lib_templates_dir, template_file))
template = jinja_env.get_template("lib/" + template_file)
parts.append(template.render(template_context))
outputs[file_name] = "\n\n".join(parts)
generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Forward.h")), forward_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Protocol.h")), protocol_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "Protocol.cpp")), protocol_cpp_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "base_string_adapter.h")), base_string_adapter_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(config, "base_string_adapter.cc")), base_string_adapter_cc_templates)
# Make gyp / make generatos happy, otherwise make rebuilds world.
inputs_ts = max(map(os.path.getmtime, inputs))
up_to_date = True
for output_file in outputs.keys():
if not os.path.exists(output_file) or os.path.getmtime(output_file) < inputs_ts:
up_to_date = False
break
if up_to_date:
sys.exit()
for file_name, content in outputs.items():
out_file = open(file_name, "w")
out_file.write(content)
out_file.close()
main()
lib_templates_dir = os.path.join(module_path, "lib")
# Note these should be sorted in the right order.
# TODO(dgozman): sort them programmatically based on commented includes.
protocol_h_templates = [
"ErrorSupport_h.template",
"Values_h.template",
"Object_h.template",
"ValueConversions_h.template",
"DispatcherBase_h.template",
"Parser_h.template",
]
protocol_cpp_templates = [
"Protocol_cpp.template",
"ErrorSupport_cpp.template",
"Values_cpp.template",
"Object_cpp.template",
"DispatcherBase_cpp.template",
"Parser_cpp.template",
]
forward_h_templates = [
"Forward_h.template",
"FrontendChannel_h.template",
]
base_string_adapter_h_templates = [
"base_string_adapter_h.template",
]
base_string_adapter_cc_templates = [
"base_string_adapter_cc.template",
]
def generate_lib_file(file_name, template_files):
parts = []
for template_file in template_files:
inputs.append(os.path.join(lib_templates_dir, template_file))
template = jinja_env.get_template("lib/" + template_file)
parts.append(template.render(template_context))
outputs[file_name] = "\n\n".join(parts)
generate_lib_file(os.path.join(config.lib.output, to_file_name(
config, "Forward.h")), forward_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(
config, "Protocol.h")), protocol_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(
config, "Protocol.cpp")), protocol_cpp_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(
config, "base_string_adapter.h")), base_string_adapter_h_templates)
generate_lib_file(os.path.join(config.lib.output, to_file_name(
config, "base_string_adapter.cc")), base_string_adapter_cc_templates)
# Make gyp / make generatos happy, otherwise make rebuilds world.
inputs_ts = max(map(os.path.getmtime, inputs))
up_to_date = True
for output_file in outputs.keys():
if (not os.path.exists(output_file)
or os.path.getmtime(output_file) < inputs_ts):
up_to_date = False
break
if up_to_date:
sys.exit()
for file_name, content in outputs.items():
out_file = open(file_name, "w")
out_file.write(content)
out_file.close()
if __name__ == "__main__":
main()
......@@ -7,33 +7,36 @@ import os.path
import sys
try:
import json
import json
except ImportError:
import simplejson as json
import simplejson as json
import pdl
def main(argv):
if len(argv) < 1:
sys.stderr.write("Usage: %s <protocol-1> [<protocol-2> [, <protocol-3>...]] <output-file>\n" % sys.argv[0])
return 1
domains = []
version = None
for protocol in argv[:-1]:
file_name = os.path.normpath(protocol)
if not os.path.isfile(file_name):
sys.stderr.write("Cannot find %s\n" % file_name)
return 1
input_file = open(file_name, "r")
parsed_json = pdl.loads(input_file.read(), file_name)
domains += parsed_json["domains"]
version = parsed_json["version"]
output_file = open(argv[-1], "w")
json.dump({"version": version, "domains": domains}, output_file, indent=4, sort_keys=False, separators=(',', ': '))
output_file.close()
if len(argv) < 1:
sys.stderr.write(
"Usage: %s <protocol-1> [<protocol-2> [, <protocol-3>...]] "
"<output-file>\n" % sys.argv[0])
return 1
domains = []
version = None
for protocol in argv[:-1]:
file_name = os.path.normpath(protocol)
if not os.path.isfile(file_name):
sys.stderr.write("Cannot find %s\n" % file_name)
return 1
input_file = open(file_name, "r")
parsed_json = pdl.loads(input_file.read(), file_name)
domains += parsed_json["domains"]
version = parsed_json["version"]
output_file = open(argv[-1], "w")
json.dump({"version": version, "domains": domains}, output_file,
indent=4, sort_keys=False, separators=(',', ': '))
output_file.close()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
sys.exit(main(sys.argv[1:]))
......@@ -1370,7 +1370,7 @@ class JSONEncoder : public StreamingParserHandler {
// If we have enough bytes in our input, decode the remaining ones
// belonging to this Unicode character into |codepoint|.
if (ii + num_bytes_left > chars.size())
if (ii + num_bytes_left >= chars.size())
continue;
while (num_bytes_left > 0) {
c = chars[++ii];
......
......@@ -1366,6 +1366,32 @@ TEST(JsonEncoder, OverlongEncodings) {
EXPECT_EQ("\"\"", out); // Empty string means that 0x7f was rejected (good).
}
TEST(JsonEncoder, IncompleteUtf8Sequence) {
std::string out;
Status status;
std::unique_ptr<StreamingParserHandler> writer =
NewJSONEncoder(&GetTestPlatform(), &out, &status);
writer->HandleArrayBegin(); // This emits [, which starts an array.
{ // 🌎 takes four bytes to encode in UTF-8. We test with the first three;
// This means we're trying to emit a string that consists solely of an
// incomplete UTF-8 sequence. So the string in the JSON output is emtpy.
std::string world_utf8 = "🌎";
ASSERT_EQ(4u, world_utf8.size());
std::vector<uint8_t> chars(world_utf8.begin(), world_utf8.begin() + 3);
writer->HandleString8(SpanFrom(chars));
EXPECT_EQ("[\"\"", out); // Incomplete sequence rejected: empty string.
}
{ // This time, the incomplete sequence is at the end of the string.
std::string msg = "Hello, \xF0\x9F\x8C";
std::vector<uint8_t> chars(msg.begin(), msg.end());
writer->HandleString8(SpanFrom(chars));
EXPECT_EQ("[\"\",\"Hello, \"", out); // Incomplete sequence dropped at end.
}
}
TEST(JsonStdStringWriterTest, HelloWorld) {
std::string out;
Status status;
......
......@@ -12,160 +12,167 @@ import sys
description = ''
primitiveTypes = ['integer', 'number', 'boolean', 'string', 'object', 'any', 'array', 'binary']
primitiveTypes = ['integer', 'number', 'boolean', 'string', 'object',
'any', 'array', 'binary']
def assignType(item, type, is_array=False, map_binary_to_string=False):
if is_array:
item['type'] = 'array'
item['items'] = collections.OrderedDict()
assignType(item['items'], type, False, map_binary_to_string)
return
if type == 'enum':
type = 'string'
if map_binary_to_string and type == 'binary':
type = 'string'
if type in primitiveTypes:
item['type'] = type
else:
item['$ref'] = type
if is_array:
item['type'] = 'array'
item['items'] = collections.OrderedDict()
assignType(item['items'], type, False, map_binary_to_string)
return
if type == 'enum':
type = 'string'
if map_binary_to_string and type == 'binary':
type = 'string'
if type in primitiveTypes:
item['type'] = type
else:
item['$ref'] = type
def createItem(d, experimental, deprecated, name=None):
result = collections.OrderedDict(d)
if name:
result['name'] = name
global description
if description:
result['description'] = description.strip()
if experimental:
result['experimental'] = True
if deprecated:
result['deprecated'] = True
return result
result = collections.OrderedDict(d)
if name:
result['name'] = name
global description
if description:
result['description'] = description.strip()
if experimental:
result['experimental'] = True
if deprecated:
result['deprecated'] = True
return result
def parse(data, file_name, map_binary_to_string=False):
protocol = collections.OrderedDict()
protocol['version'] = collections.OrderedDict()
protocol['domains'] = []
domain = None
item = None
subitems = None
nukeDescription = False
global description
lines = data.split('\n')
for i in range(0, len(lines)):
if nukeDescription:
description = ''
nukeDescription = False
line = lines[i]
trimLine = line.strip()
if trimLine.startswith('#'):
if len(description):
description += '\n'
description += trimLine[2:]
continue
protocol = collections.OrderedDict()
protocol['version'] = collections.OrderedDict()
protocol['domains'] = []
domain = None
item = None
subitems = None
nukeDescription = False
global description
lines = data.split('\n')
for i in range(0, len(lines)):
if nukeDescription:
description = ''
nukeDescription = False
line = lines[i]
trimLine = line.strip()
if trimLine.startswith('#'):
if len(description):
description += '\n'
description += trimLine[2:]
continue
else:
nukeDescription = True
if len(trimLine) == 0:
continue
match = re.compile(
r'^(experimental )?(deprecated )?domain (.*)').match(line)
if match:
domain = createItem({'domain' : match.group(3)}, match.group(1),
match.group(2))
protocol['domains'].append(domain)
continue
match = re.compile(r'^ depends on ([^\s]+)').match(line)
if match:
if 'dependencies' not in domain:
domain['dependencies'] = []
domain['dependencies'].append(match.group(1))
continue
match = re.compile(r'^ (experimental )?(deprecated )?type (.*) '
r'extends (array of )?([^\s]+)').match(line)
if match:
if 'types' not in domain:
domain['types'] = []
item = createItem({'id': match.group(3)}, match.group(1), match.group(2))
assignType(item, match.group(5), match.group(4), map_binary_to_string)
domain['types'].append(item)
continue
match = re.compile(
r'^ (experimental )?(deprecated )?(command|event) (.*)').match(line)
if match:
list = []
if match.group(3) == 'command':
if 'commands' in domain:
list = domain['commands']
else:
list = domain['commands'] = []
else:
if 'events' in domain:
list = domain['events']
else:
nukeDescription = True
if len(trimLine) == 0:
continue
match = re.compile(r'^(experimental )?(deprecated )?domain (.*)').match(line)
if match:
domain = createItem({'domain' : match.group(3)}, match.group(1), match.group(2))
protocol['domains'].append(domain)
continue
match = re.compile(r'^ depends on ([^\s]+)').match(line)
if match:
if 'dependencies' not in domain:
domain['dependencies'] = []
domain['dependencies'].append(match.group(1))
continue
match = re.compile(r'^ (experimental )?(deprecated )?type (.*) extends (array of )?([^\s]+)').match(line)
if match:
if 'types' not in domain:
domain['types'] = []
item = createItem({'id': match.group(3)}, match.group(1), match.group(2))
assignType(item, match.group(5), match.group(4), map_binary_to_string)
domain['types'].append(item)
continue
match = re.compile(r'^ (experimental )?(deprecated )?(command|event) (.*)').match(line)
if match:
list = []
if match.group(3) == 'command':
if 'commands' in domain:
list = domain['commands']
else:
list = domain['commands'] = []
else:
if 'events' in domain:
list = domain['events']
else:
list = domain['events'] = []
item = createItem({}, match.group(1), match.group(2), match.group(4))
list.append(item)
continue
match = re.compile(r'^ (experimental )?(deprecated )?(optional )?(array of )?([^\s]+) ([^\s]+)').match(line)
if match:
param = createItem({}, match.group(1), match.group(2), match.group(6))
if match.group(3):
param['optional'] = True
assignType(param, match.group(5), match.group(4), map_binary_to_string)
if match.group(5) == 'enum':
enumliterals = param['enum'] = []
subitems.append(param)
continue
match = re.compile(r'^ (parameters|returns|properties)').match(line)
if match:
subitems = item[match.group(1)] = []
continue
match = re.compile(r'^ enum').match(line)
if match:
enumliterals = item['enum'] = []
continue
match = re.compile(r'^version').match(line)
if match:
continue
match = re.compile(r'^ major (\d+)').match(line)
if match:
protocol['version']['major'] = match.group(1)
continue
match = re.compile(r'^ minor (\d+)').match(line)
if match:
protocol['version']['minor'] = match.group(1)
continue
match = re.compile(r'^ redirect ([^\s]+)').match(line)
if match:
item['redirect'] = match.group(1)
continue
match = re.compile(r'^ ( )?[^\s]+$').match(line)
if match:
# enum literal
enumliterals.append(trimLine)
continue
print('Error in %s:%s, illegal token: \t%s' % (file_name, i, line))
sys.exit(1)
return protocol
list = domain['events'] = []
item = createItem({}, match.group(1), match.group(2), match.group(4))
list.append(item)
continue
match = re.compile(
r'^ (experimental )?(deprecated )?(optional )?'
r'(array of )?([^\s]+) ([^\s]+)').match(line)
if match:
param = createItem({}, match.group(1), match.group(2), match.group(6))
if match.group(3):
param['optional'] = True
assignType(param, match.group(5), match.group(4), map_binary_to_string)
if match.group(5) == 'enum':
enumliterals = param['enum'] = []
subitems.append(param)
continue
match = re.compile(r'^ (parameters|returns|properties)').match(line)
if match:
subitems = item[match.group(1)] = []
continue
match = re.compile(r'^ enum').match(line)
if match:
enumliterals = item['enum'] = []
continue
match = re.compile(r'^version').match(line)
if match:
continue
match = re.compile(r'^ major (\d+)').match(line)
if match:
protocol['version']['major'] = match.group(1)
continue
match = re.compile(r'^ minor (\d+)').match(line)
if match:
protocol['version']['minor'] = match.group(1)
continue
match = re.compile(r'^ redirect ([^\s]+)').match(line)
if match:
item['redirect'] = match.group(1)
continue
match = re.compile(r'^ ( )?[^\s]+$').match(line)
if match:
# enum literal
enumliterals.append(trimLine)
continue
print('Error in %s:%s, illegal token: \t%s' % (file_name, i, line))
sys.exit(1)
return protocol
def loads(data, file_name, map_binary_to_string=False):
if file_name.endswith(".pdl"):
return parse(data, file_name, map_binary_to_string)
return json.loads(data)
if file_name.endswith(".pdl"):
return parse(data, file_name, map_binary_to_string)
return json.loads(data)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment