"""
A simple, fast, extensible JSON encoder

JSON (JavaScript Object Notation) is a subset of JavaScript syntax
(ECMA-262 3rd edition) used as a lightweight data interchange format.

This module also contains the matching decoder; the public entry points
are ``dumps`` and ``loads`` at the bottom of the file.
"""
"""
    This is a stripped-down version of simplejson by Bob Ippolito,
    http://undefined.org/python/

    Copyright (c) 2006 Bob Ippolito

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    THE SOFTWARE.
"""

import binascii
import re
import struct
import sys

# --- Python 2 / Python 3 compatibility --------------------------------------
# The encoder and decoder were written for Python 2, where ``str`` is a byte
# string.  The aliases below keep that behaviour on Python 2 and let the same
# code run on Python 3, where only text (``str``) is treated as a JSON string.
if sys.version_info[0] >= 3:
    _text_type = str
    _string_types = (str,)
    # Empty tuple: ``isinstance(x, ())`` is always False, so every
    # "decode this byte string" branch is skipped on Python 3.
    _byte_string_types = ()
    _integer_types = (int,)
    _unichr = chr
else:
    _text_type = unicode  # noqa: F821 (Python 2 builtin)
    _string_types = (basestring,)  # noqa: F821 (Python 2 builtin)
    _byte_string_types = (str,)
    _integer_types = (int, long)  # noqa: F821 (Python 2 builtin)
    _unichr = unichr  # noqa: F821 (Python 2 builtin)


def _iteritems(dct):
    """Return a lazy iterator over ``(key, value)`` pairs of *dct*."""
    try:
        return dct.iteritems()
    except AttributeError:
        return iter(dct.items())


ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')
ESCAPE_ASCII = re.compile(r'([\\"]|[^\ -~])')
HAS_UTF8 = re.compile(r'[\x80-\xff]')
ESCAPE_DCT = {
    '\\': '\\\\',
    '"': '\\"',
    '\b': '\\b',
    '\f': '\\f',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
}
# Every remaining control character gets a generic \uXXXX escape.
for i in range(0x20):
    ESCAPE_DCT.setdefault(chr(i), '\\u%04x' % (i,))

# Assume this produces an infinity on all machines (probably not guaranteed)
INFINITY = float('1e66666')
FLOAT_REPR = repr


def floatstr(o, allow_nan=True):
    """Return the JSON text for the float *o*.

    NaN and the infinities are rendered as ``NaN``, ``Infinity`` and
    ``-Infinity``; every other value is rendered with ``FLOAT_REPR``.

    Raises ValueError for NaN/infinite values when *allow_nan* is false.
    """
    # NaN is the only float that is not equal to itself.
    if o != o:
        text = 'NaN'
    elif o == INFINITY:
        text = 'Infinity'
    elif o == -INFINITY:
        text = '-Infinity'
    else:
        return FLOAT_REPR(o)

    if not allow_nan:
        raise ValueError("Out of range float values are not JSON compliant: %r"
                         % (o,))

    return text


def encode_basestring(s):
    """Return a JSON representation of a Python string.

    Only the backslash, the double quote and control characters are escaped;
    all other characters are passed through unchanged.
    """
    def replace(match):
        return ESCAPE_DCT[match.group(0)]
    return '"' + ESCAPE.sub(replace, s) + '"'


def py_encode_basestring_ascii(s):
    """Return an ASCII-only JSON representation of a Python string.

    Everything outside the printable ASCII range is written as a ``\\uXXXX``
    escape; code points above U+FFFF are written as a surrogate pair.
    """
    # Python 2 only: a byte string holding non-ASCII bytes is taken to be
    # UTF-8 and decoded first.
    if isinstance(s, _byte_string_types) and HAS_UTF8.search(s) is not None:
        s = s.decode('utf-8')

    def replace(match):
        s = match.group(0)
        try:
            return ESCAPE_DCT[s]
        except KeyError:
            n = ord(s)
            if n < 0x10000:
                return '\\u%04x' % (n,)
            else:
                # surrogate pair
                n -= 0x10000
                s1 = 0xd800 | ((n >> 10) & 0x3ff)
                s2 = 0xdc00 | (n & 0x3ff)
                return '\\u%04x\\u%04x' % (s1, s2)
    return '"' + str(ESCAPE_ASCII.sub(replace, s)) + '"'


encode_basestring_ascii = py_encode_basestring_ascii


class JSONEncoder(object):
    """Serialize Python objects to JSON text.

    Handles strings, ``None``, booleans, integers, floats, lists, tuples and
    dicts directly; anything else is passed to ``default``, whose return
    value is encoded in its place.
    """
    item_separator = ', '
    key_separator = ': '

    def __init__(self, skipkeys=False, ensure_ascii=True,
                 check_circular=True, allow_nan=True, sort_keys=False,
                 indent=None, separators=None, encoding='utf-8', default=None):
        """Configure the encoder.

        skipkeys -- silently drop dict keys of unsupported types instead of
            raising TypeError.
        ensure_ascii -- escape every non-ASCII character in the output.
        check_circular -- track containers being encoded and raise ValueError
            when one contains itself.
        allow_nan -- emit ``NaN``/``Infinity``/``-Infinity`` instead of
            raising ValueError.
        sort_keys -- emit dict members in sorted key order.
        indent -- number of spaces per nesting level; ``None`` keeps the
            output on one line.
        separators -- optional ``(item_separator, key_separator)`` pair.
        encoding -- encoding of byte strings (Python 2 ``str``) given to the
            encoder.
        default -- optional callable replacing the ``default`` method.
        """
        self.skipkeys = skipkeys
        self.ensure_ascii = ensure_ascii
        self.check_circular = check_circular
        self.allow_nan = allow_nan
        self.sort_keys = sort_keys
        self.indent = indent
        self.current_indent_level = 0
        if separators is not None:
            self.item_separator, self.key_separator = separators
        if default is not None:
            self.default = default
        self.encoding = encoding

    def _newline_indent(self):
        """Return a newline followed by the indentation of the current level."""
        return '\n' + (' ' * (self.indent * self.current_indent_level))

    def _iterencode_list(self, lst, markers=None):
        """Yield the JSON chunks of the list or tuple *lst*."""
        if not lst:
            yield '[]'
            return
        if markers is not None:
            markerid = id(lst)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = lst
        yield '['
        if self.indent is not None:
            self.current_indent_level += 1
            newline_indent = self._newline_indent()
            separator = self.item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            separator = self.item_separator
        first = True
        for value in lst:
            if first:
                first = False
            else:
                yield separator
            for chunk in self._iterencode(value, markers):
                yield chunk
        if newline_indent is not None:
            self.current_indent_level -= 1
            yield self._newline_indent()
        yield ']'
        if markers is not None:
            del markers[markerid]

    def _iterencode_dict(self, dct, markers=None):
        """Yield the JSON chunks of the dict *dct*.

        Keys may be strings, floats, integers, booleans or ``None``; other
        key types raise TypeError unless ``skipkeys`` is set, in which case
        the member is dropped.
        """
        if not dct:
            yield '{}'
            return
        if markers is not None:
            markerid = id(dct)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = dct
        yield '{'
        key_separator = self.key_separator
        if self.indent is not None:
            self.current_indent_level += 1
            newline_indent = self._newline_indent()
            item_separator = self.item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            item_separator = self.item_separator
        first = True
        if self.ensure_ascii:
            encoder = encode_basestring_ascii
        else:
            encoder = encode_basestring
        allow_nan = self.allow_nan
        if self.sort_keys:
            items = [(k, dct[k]) for k in sorted(dct.keys())]
        else:
            items = _iteritems(dct)
        _encoding = self.encoding
        _do_decode = (_encoding is not None
                      and not (_encoding == 'utf-8'))
        for key, value in items:
            if isinstance(key, _byte_string_types):
                if _do_decode:
                    key = key.decode(_encoding)
            elif isinstance(key, _string_types):
                pass
            # JavaScript is weakly typed for these, so it makes sense to
            # also allow them.  Many encoders seem to do something like this.
            # The boolean checks must precede the integer check: bool is a
            # subclass of int, so True/False would otherwise be written as
            # "True"/"False".
            elif key is True:
                key = 'true'
            elif key is False:
                key = 'false'
            elif key is None:
                key = 'null'
            elif isinstance(key, float):
                key = floatstr(key, allow_nan)
            elif isinstance(key, _integer_types):
                key = str(key)
            elif self.skipkeys:
                continue
            else:
                raise TypeError("key %r is not a string" % (key,))
            if first:
                first = False
            else:
                yield item_separator
            yield encoder(key)
            yield key_separator
            for chunk in self._iterencode(value, markers):
                yield chunk
        if newline_indent is not None:
            self.current_indent_level -= 1
            yield self._newline_indent()
        yield '}'
        if markers is not None:
            del markers[markerid]

    def _iterencode(self, o, markers=None):
        """Yield the JSON chunks of *o*, dispatching on its type."""
        if isinstance(o, _string_types):
            if self.ensure_ascii:
                encoder = encode_basestring_ascii
            else:
                encoder = encode_basestring
            _encoding = self.encoding
            if (_encoding is not None and isinstance(o, _byte_string_types)
                    and not (_encoding == 'utf-8')):
                o = o.decode(_encoding)
            yield encoder(o)
        elif o is None:
            yield 'null'
        elif o is True:
            yield 'true'
        elif o is False:
            yield 'false'
        elif isinstance(o, _integer_types):
            yield str(o)
        elif isinstance(o, float):
            yield floatstr(o, self.allow_nan)
        elif isinstance(o, (list, tuple)):
            for chunk in self._iterencode_list(o, markers):
                yield chunk
        elif isinstance(o, dict):
            for chunk in self._iterencode_dict(o, markers):
                yield chunk
        else:
            # Unknown type: encode whatever ``default`` returns, guarding
            # against a ``default`` that keeps handing back the same object.
            if markers is not None:
                markerid = id(o)
                if markerid in markers:
                    raise ValueError("Circular reference detected")
                markers[markerid] = o
            for chunk in self._iterencode_default(o, markers):
                yield chunk
            if markers is not None:
                del markers[markerid]

    def _iterencode_default(self, o, markers=None):
        """Encode the replacement object returned by ``default(o)``."""
        newobj = self.default(o)
        return self._iterencode(newobj, markers)

    def default(self, o):
        """Return a serializable stand-in for *o*; the base version raises
        TypeError."""
        raise TypeError("%r is not JSON serializable" % (o,))

    def encode(self, o):
        """Return the JSON text for *o* as a single string."""
        # Fast path for a bare string: no generator machinery needed.
        if isinstance(o, _string_types):
            if isinstance(o, _byte_string_types):
                _encoding = self.encoding
                if (_encoding is not None
                        and not (_encoding == 'utf-8')):
                    o = o.decode(_encoding)
            if self.ensure_ascii:
                return encode_basestring_ascii(o)
            else:
                return encode_basestring(o)
        chunks = list(self.iterencode(o))
        return ''.join(chunks)

    def iterencode(self, o):
        """Return an iterator over the JSON chunks of *o*."""
        if self.check_circular:
            markers = {}
        else:
            markers = None
        return self._iterencode(o, markers)


_default_encoder = JSONEncoder(
    skipkeys=False,
    ensure_ascii=True,
    check_circular=True,
    allow_nan=True,
    indent=None,
    separators=None,
    encoding='utf-8',
    default=None,
)

"""
Implementation of JSONDecoder
"""
# The Scanner below no longer uses the sre_* modules.  The imports are kept
# so the names stay importable from this module, but they are guarded because
# these modules are private implementation details of ``re``.
try:
    import sre_parse
    import sre_compile
    import sre_constants
    from sre_constants import BRANCH, SUBPATTERN
except ImportError:
    pass

FLAGS = (re.VERBOSE | re.MULTILINE | re.DOTALL)


class Scanner(object):
    """Match one of several token patterns and dispatch to its action.

    *lexicon* is a sequence of callables, each carrying a ``pattern``
    attribute (see the ``pattern`` decorator).  The patterns are combined
    into one alternation, tried in lexicon order.
    """

    def __init__(self, lexicon, flags=FLAGS):
        # ``actions`` is indexed by regex group number.  Each token owns one
        # wrapping group; groups that live inside a token's own pattern get a
        # ``None`` placeholder so the numbering stays aligned.
        self.actions = [None]
        alternatives = []
        for token in lexicon:
            phrase = token.pattern
            alternatives.append('(' + phrase + ')')
            self.actions.append(token)
            self.actions.extend([None] * re.compile(phrase, flags).groups)
        self.scanner = re.compile('|'.join(alternatives), flags)

    def iterscan(self, string, idx=0, context=None):
        """
        Yield match, end_idx for each match
        """
        match = self.scanner.scanner(string, idx).match
        actions = self.actions
        lastend = idx
        end = len(string)
        while True:
            m = match()
            if m is None:
                break
            matchbegin, matchend = m.span()
            if lastend == matchend:
                break
            # The wrapping group of the matched alternative closes last, so
            # ``lastindex`` identifies the token.
            action = actions[m.lastindex]
            if action is not None:
                rval, next_pos = action(m, context)
                if next_pos is not None and next_pos != matchend:
                    # "fast forward" the scanner
                    matchend = next_pos
                    match = self.scanner.scanner(string, matchend).match
                yield rval, matchend
            lastend = matchend


def pattern(pattern, flags=FLAGS):
    """Return a decorator attaching *pattern* and its compiled ``regex``
    to a scanner action."""
    def decorator(fn):
        fn.pattern = pattern
        fn.regex = re.compile(pattern, flags)
        return fn
    return decorator


def _floatconstants():
    """Return ``(nan, inf, -inf)`` built from their IEEE 754 bit patterns."""
    _BYTES = binascii.unhexlify('7FF80000000000007FF0000000000000')
    # '>' tells struct the bytes are big-endian, whatever the host order is.
    nan, inf = struct.unpack('>dd', _BYTES)
    return nan, inf, -inf


NaN, PosInf, NegInf = _floatconstants()


def linecol(doc, pos):
    """Return the ``(line, column)`` of offset *pos* in *doc*; lines count
    from 1."""
    lineno = doc.count('\n', 0, pos) + 1
    if lineno == 1:
        colno = pos
    else:
        colno = pos - doc.rindex('\n', 0, pos)
    return lineno, colno


def errmsg(msg, doc, pos, end=None):
    """Return *msg* annotated with the position (or range) it refers to."""
    lineno, colno = linecol(doc, pos)
    if end is None:
        return '%s: line %d column %d (char %d)' % (msg, lineno, colno, pos)
    endlineno, endcolno = linecol(doc, end)
    return '%s: line %d column %d - line %d column %d (char %d - %d)' % (
        msg, lineno, colno, endlineno, endcolno, pos, end)


_CONSTANTS = {
    '-Infinity': NegInf,
    'Infinity': PosInf,
    'NaN': NaN,
    'true': True,
    'false': False,
    'null': None,
}


def JSONConstant(match, context, c=_CONSTANTS):
    """Scanner action for the literal constants.

    Uses ``context.parse_constant`` when the context provides one, otherwise
    the built-in constant table.
    """
    s = match.group(0)
    fn = getattr(context, 'parse_constant', None)
    if fn is None:
        rval = c[s]
    else:
        rval = fn(s)
    return rval, None
pattern('(-?Infinity|NaN|true|false|null)')(JSONConstant)


def JSONNumber(match, context):
    """Scanner action for numbers.

    Numbers with a fraction or exponent go to ``context.parse_float``
    (default ``float``), the others to ``context.parse_int`` (default
    ``int``).
    """
    # Re-match with the token's own regex to get its integer/fraction/
    # exponent groups.
    match = JSONNumber.regex.match(match.string, *match.span())
    integer, frac, exp = match.groups()
    if frac or exp:
        fn = getattr(context, 'parse_float', None) or float
        res = fn(integer + (frac or '') + (exp or ''))
    else:
        fn = getattr(context, 'parse_int', None) or int
        res = fn(integer)
    return res, None
pattern(r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?')(JSONNumber)


STRINGCHUNK = re.compile(r'(.*?)(["\\\x00-\x1f])', FLAGS)
BACKSLASH = {
    '"': u'"', '\\': u'\\', '/': u'/',
    'b': u'\b', 'f': u'\f', 'n': u'\n', 'r': u'\r', 't': u'\t',
}

DEFAULT_ENCODING = "utf-8"

_HEX4 = re.compile(r'[0-9A-Fa-f]{4}\Z')


def _parse_hex4(digits):
    """Return the integer value of exactly four hex digits, else raise
    ValueError."""
    # int(x, 16) alone is too lenient: it also accepts forms like '0x1f'.
    if _HEX4.match(digits) is None:
        raise ValueError(digits)
    return int(digits, 16)


def py_scanstring(s, end, encoding=None, strict=True, _b=BACKSLASH,
                  _m=STRINGCHUNK.match):
    """Scan the JSON string whose opening quote is at ``s[end - 1]``.

    Returns ``(text, index)`` where *index* is the offset just past the
    closing quote.  *encoding* is used to decode byte-string content
    (default ``DEFAULT_ENCODING``).  With *strict* true, unescaped control
    characters inside the string raise ValueError.

    Raises ValueError for unterminated strings and invalid escapes.
    """
    if encoding is None:
        encoding = DEFAULT_ENCODING
    chunks = []
    _append = chunks.append
    begin = end - 1
    while 1:
        chunk = _m(s, end)
        if chunk is None:
            raise ValueError(
                errmsg("Unterminated string starting at", s, begin))
        end = chunk.end()
        content, terminator = chunk.groups()
        if content:
            if not isinstance(content, _text_type):
                content = _text_type(content, encoding)
            _append(content)
        if terminator == '"':
            break
        elif terminator != '\\':
            if strict:
                raise ValueError(errmsg(
                    "Invalid control character %r at" % (terminator,),
                    s, end))
            else:
                _append(terminator)
                continue
        try:
            esc = s[end]
        except IndexError:
            raise ValueError(
                errmsg("Unterminated string starting at", s, begin))
        if esc != 'u':
            try:
                m = _b[esc]
            except KeyError:
                raise ValueError(
                    errmsg("Invalid \\escape: %r" % (esc,), s, end))
            end += 1
        else:
            esc = s[end + 1:end + 5]
            next_end = end + 5
            msg = "Invalid \\uXXXX escape"
            try:
                uni = _parse_hex4(esc)
                # On builds with code points above U+FFFF, a high surrogate
                # must be followed by a low surrogate escape; the two are
                # combined into one character.
                if 0xd800 <= uni <= 0xdbff and sys.maxunicode > 65535:
                    msg = "Invalid \\uXXXX\\uXXXX surrogate pair"
                    if not s[end + 5:end + 7] == '\\u':
                        raise ValueError
                    uni2 = _parse_hex4(s[end + 7:end + 11])
                    if not 0xdc00 <= uni2 <= 0xdfff:
                        raise ValueError
                    uni = 0x10000 + (((uni - 0xd800) << 10) | (uni2 - 0xdc00))
                    next_end += 6
                m = _unichr(uni)
            except ValueError:
                raise ValueError(errmsg(msg, s, end))
            end = next_end
        _append(m)
    return u''.join(chunks), end


scanstring = py_scanstring


def JSONString(match, context):
    """Scanner action for strings; *match* covers the opening quote."""
    encoding = getattr(context, 'encoding', None)
    strict = getattr(context, 'strict', True)
    return scanstring(match.string, match.end(), encoding, strict)
pattern(r'"')(JSONString)


WHITESPACE = re.compile(r'\s*', FLAGS)


def JSONObject(match, context, _w=WHITESPACE.match):
    """Scanner action for objects; *match* covers the opening brace.

    Returns ``(result, end)`` where *result* is the dict of members, or the
    return value of ``context.object_hook`` applied to it when a hook is
    set, and *end* is the offset just past the closing brace.
    """
    pairs = {}
    s = match.string
    end = _w(s, match.end()).end()
    nextchar = s[end:end + 1]
    object_hook = getattr(context, 'object_hook', None)
    # Trivial empty object
    if nextchar == '}':
        # The hook applies to empty objects as well.
        if object_hook is not None:
            pairs = object_hook(pairs)
        return pairs, end + 1
    if nextchar != '"':
        raise ValueError(errmsg("Expecting property name", s, end))
    end += 1
    encoding = getattr(context, 'encoding', None)
    strict = getattr(context, 'strict', True)
    iterscan = JSONScanner.iterscan
    while True:
        key, end = scanstring(s, end, encoding, strict)
        end = _w(s, end).end()
        if s[end:end + 1] != ':':
            raise ValueError(errmsg("Expecting : delimiter", s, end))
        end = _w(s, end + 1).end()
        try:
            value, end = next(iterscan(s, idx=end, context=context))
        except StopIteration:
            raise ValueError(errmsg("Expecting object", s, end))
        pairs[key] = value
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar == '}':
            break
        if nextchar != ',':
            raise ValueError(errmsg("Expecting , delimiter", s, end - 1))
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar != '"':
            raise ValueError(errmsg("Expecting property name", s, end - 1))
    if object_hook is not None:
        pairs = object_hook(pairs)
    return pairs, end
pattern(r'{')(JSONObject)


def JSONArray(match, context, _w=WHITESPACE.match):
    """Scanner action for arrays; *match* covers the opening bracket.

    Returns ``(values, end)`` where *values* is a list and *end* is the
    offset just past the closing bracket.
    """
    values = []
    s = match.string
    end = _w(s, match.end()).end()
    # Look-ahead for trivial empty array
    nextchar = s[end:end + 1]
    if nextchar == ']':
        return values, end + 1
    iterscan = JSONScanner.iterscan
    while True:
        try:
            value, end = next(iterscan(s, idx=end, context=context))
        except StopIteration:
            raise ValueError(errmsg("Expecting object", s, end))
        values.append(value)
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar == ']':
            break
        if nextchar != ',':
            # ``end`` already points past the offending character.
            raise ValueError(errmsg("Expecting , delimiter", s, end - 1))
        end = _w(s, end).end()
    return values, end
pattern(r'\[')(JSONArray)


ANYTHING = [
    JSONObject,
    JSONArray,
    JSONString,
    JSONConstant,
    JSONNumber,
]

JSONScanner = Scanner(ANYTHING)


class JSONDecoder(object):
    """Parse JSON text into Python objects.

    The instance doubles as the scanner context: the scanner actions read
    ``encoding``, ``strict``, ``object_hook``, ``parse_float``,
    ``parse_int`` and ``parse_constant`` from it.
    """

    _scanner = Scanner(ANYTHING)
    __all__ = ['__init__', 'decode', 'raw_decode']

    def __init__(self, encoding=None, object_hook=None, parse_float=None,
                 parse_int=None, parse_constant=None, strict=True):
        """Configure the decoder.

        encoding -- encoding of byte-string input (``None`` selects
            ``DEFAULT_ENCODING`` when a string is scanned).
        object_hook -- optional callable applied to every decoded object.
        parse_float / parse_int / parse_constant -- optional callables that
            receive the matched text of numbers and constants.
        strict -- reject unescaped control characters inside strings.
        """
        self.encoding = encoding
        self.object_hook = object_hook
        self.parse_float = parse_float
        self.parse_int = parse_int
        self.parse_constant = parse_constant
        self.strict = strict

    def decode(self, s, _w=WHITESPACE.match):
        """Return the Python object for the JSON document *s*.

        Leading and trailing whitespace is allowed; anything else after the
        value raises ValueError.
        """
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
        end = _w(s, end).end()
        if end != len(s):
            raise ValueError(errmsg("Extra data", s, end, len(s)))
        return obj

    def raw_decode(self, s, **kw):
        """Decode one JSON value from *s* and return ``(obj, end)``.

        *end* is the offset in *s* where the value stopped.  Keyword
        arguments are passed to the scanner; ``context`` defaults to this
        decoder.

        Raises ValueError when no value can be decoded.
        """
        kw.setdefault('context', self)
        try:
            obj, end = next(self._scanner.iterscan(s, **kw))
        except StopIteration:
            raise ValueError("No JSON object could be decoded")
        return obj, end


_default_decoder = JSONDecoder(encoding=None, object_hook=None)


# public functions

def dumps(obj, skipkeys=False, ensure_ascii=True, check_circular=True,
          allow_nan=True, cls=None, indent=None, separators=None,
          encoding='utf-8', default=None, **kw):
    """Serialize *obj* to a JSON formatted string.

    The keyword arguments are those of ``JSONEncoder``; *cls* selects an
    alternative encoder class and extra keyword arguments are passed to its
    constructor.
    """
    # With every option at its default, reuse the shared encoder instance.
    if (skipkeys is False and ensure_ascii is True and
            check_circular is True and allow_nan is True and
            cls is None and indent is None and separators is None and
            encoding == 'utf-8' and default is None and not kw):
        return _default_encoder.encode(obj)
    if cls is None:
        cls = JSONEncoder
    return cls(
        skipkeys=skipkeys, ensure_ascii=ensure_ascii,
        check_circular=check_circular, allow_nan=allow_nan, indent=indent,
        separators=separators, encoding=encoding, default=default,
        **kw).encode(obj)


def loads(s, encoding=None, cls=None, object_hook=None, parse_float=None,
          parse_int=None, parse_constant=None, **kw):
    """Deserialize the JSON document *s* to a Python object.

    The keyword arguments are those of ``JSONDecoder``; *cls* selects an
    alternative decoder class and extra keyword arguments are passed to its
    constructor.
    """
    # With every option at its default, reuse the shared decoder instance.
    if (cls is None and encoding is None and object_hook is None and
            parse_int is None and parse_float is None and
            parse_constant is None and not kw):
        return _default_decoder.decode(s)
    if cls is None:
        cls = JSONDecoder
    if object_hook is not None:
        kw['object_hook'] = object_hook
    if parse_float is not None:
        kw['parse_float'] = parse_float
    if parse_int is not None:
        kw['parse_int'] = parse_int
    if parse_constant is not None:
        kw['parse_constant'] = parse_constant
    return cls(encoding=encoding, **kw).decode(s)