r"""
A simple, fast, extensible JSON encoder

JSON (JavaScript Object Notation) is a subset of JavaScript syntax
(ECMA-262 3rd edition) used as a lightweight data interchange format.

The module also contains the matching decoder; ``dumps`` and ``loads`` at the
bottom of the file are the public entry points.
"""
# This is a stripped-down version of simplejson
# by Bob Ippolito, http://undefined.org/python/

import re
import sys

# ---------------------------------------------------------------------------
# Python 2 / Python 3 compatibility names used throughout this module.
# On Python 3 only ``str`` counts as a JSON string for the encoder; ``bytes``
# objects fall through to ``JSONEncoder.default``.
# ---------------------------------------------------------------------------
_PY3 = sys.version_info[0] >= 3
if _PY3:
    _text_type = str
    _bytes_type = bytes
    _string_types = (str,)
    _integer_types = (int,)
    _unichr = chr
else:
    _text_type = unicode
    _bytes_type = str
    _string_types = (basestring,)
    _integer_types = (int, long)
    _unichr = unichr

ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')
ESCAPE_ASCII = re.compile(r'([\\"]|[^\ -~])')
# Kept for callers that import it; the encoders below no longer consult it.
HAS_UTF8 = re.compile(r'[\x80-\xff]')
ESCAPE_DCT = {
    '\\': '\\\\',
    '"': '\\"',
    '\b': '\\b',
    '\f': '\\f',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
}
# Every remaining control character gets a generic \u00XX escape.
for i in range(0x20):
    ESCAPE_DCT.setdefault(chr(i), '\\u%04x' % (i,))

# Assume this produces an infinity on all machines (probably not guaranteed)
INFINITY = float('1e66666')
FLOAT_REPR = repr


def floatstr(o, allow_nan=True):
    """Return the JSON text for the float ``o``.

    Finite values are rendered with ``FLOAT_REPR``.  NaN and the two
    infinities are rendered as ``NaN``, ``Infinity`` and ``-Infinity``; if
    ``allow_nan`` is false a ``ValueError`` is raised for them instead.
    """
    if o != o:
        # NaN is the only value that compares unequal to itself.
        text = 'NaN'
    elif o == INFINITY:
        text = 'Infinity'
    elif o == -INFINITY:
        text = '-Infinity'
    else:
        return FLOAT_REPR(o)

    if not allow_nan:
        raise ValueError("Out of range float values are not JSON compliant: %r"
                         % (o,))
    return text


def encode_basestring(s):
    """Return ``s`` as a double-quoted JSON string.

    Only the characters matched by ``ESCAPE`` (control characters, the
    backslash and the double quote) are escaped; everything else is copied
    through unchanged.
    """
    if _PY3 and isinstance(s, _bytes_type):
        # A text pattern cannot be applied to bytes on Python 3.
        s = s.decode('utf-8')

    def replace(match):
        return ESCAPE_DCT[match.group(0)]
    return '"' + ESCAPE.sub(replace, s) + '"'


def py_encode_basestring_ascii(s):
    """Return ``s`` as a double-quoted, ASCII-only JSON string.

    Byte strings are decoded as UTF-8 first.  Characters outside the
    printable ASCII range are written as backslash-u escapes, using a
    surrogate pair for code points at or above 0x10000.
    """
    if isinstance(s, _bytes_type):
        s = s.decode('utf-8')

    def replace(match):
        char = match.group(0)
        try:
            return ESCAPE_DCT[char]
        except KeyError:
            n = ord(char)
            if n < 0x10000:
                return '\\u%04x' % (n,)
            else:
                # surrogate pair
                n -= 0x10000
                s1 = 0xd800 | ((n >> 10) & 0x3ff)
                s2 = 0xdc00 | (n & 0x3ff)
                return '\\u%04x\\u%04x' % (s1, s2)
    return '"' + str(ESCAPE_ASCII.sub(replace, s)) + '"'


encode_basestring_ascii = py_encode_basestring_ascii


class JSONEncoder(object):
    """Serialize Python objects to JSON text.

    Strings, ``None``, booleans, integers, floats, lists/tuples and dicts are
    handled directly; any other object is passed to ``default``.
    """
    item_separator = ', '
    key_separator = ': '

    def __init__(self, skipkeys=False, ensure_ascii=True,
                 check_circular=True, allow_nan=True, sort_keys=False,
                 indent=None, separators=None, encoding='utf-8', default=None):
        """Configure the encoder.

        skipkeys -- skip dict keys of unsupported type instead of raising
            ``TypeError``.
        ensure_ascii -- use ``encode_basestring_ascii`` (true) or
            ``encode_basestring`` (false) for strings.
        check_circular -- track container ids while encoding and raise
            ``ValueError`` when one is re-entered.
        allow_nan -- forwarded to ``floatstr``.
        sort_keys -- emit dict items in sorted key order.
        indent -- if not None, number of spaces per nesting level; each
            item is then placed on its own line.
        separators -- optional ``(item_separator, key_separator)`` pair.
        encoding -- encoding used to decode byte strings when it is neither
            None nor 'utf-8'.
        default -- optional callable that replaces the ``default`` method.
        """
        self.skipkeys = skipkeys
        self.ensure_ascii = ensure_ascii
        self.check_circular = check_circular
        self.allow_nan = allow_nan
        self.sort_keys = sort_keys
        self.indent = indent
        # Mutated while encoding; tracks the current nesting depth.
        self.current_indent_level = 0
        if separators is not None:
            self.item_separator, self.key_separator = separators
        if default is not None:
            self.default = default
        self.encoding = encoding

    def _newline_indent(self):
        """Return a newline followed by the indentation of the current level."""
        return '\n' + (' ' * (self.indent * self.current_indent_level))

    def _iterencode_list(self, lst, markers=None):
        """Yield the JSON chunks for the list or tuple ``lst``."""
        if not lst:
            yield '[]'
            return
        if markers is not None:
            markerid = id(lst)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = lst
        yield '['
        if self.indent is not None:
            self.current_indent_level += 1
            newline_indent = self._newline_indent()
            separator = self.item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            separator = self.item_separator
        first = True
        for value in lst:
            if first:
                first = False
            else:
                yield separator
            for chunk in self._iterencode(value, markers):
                yield chunk
        if newline_indent is not None:
            self.current_indent_level -= 1
            yield self._newline_indent()
        yield ']'
        if markers is not None:
            del markers[markerid]

    def _iterencode_dict(self, dct, markers=None):
        """Yield the JSON chunks for the dict ``dct``.

        String keys are used as they are.  ``True``, ``False``, ``None``,
        floats and integers are converted to their JSON spelling.  Any other
        key is skipped when ``skipkeys`` is set and raises ``TypeError``
        otherwise.
        """
        if not dct:
            yield '{}'
            return
        if markers is not None:
            markerid = id(dct)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = dct
        yield '{'
        key_separator = self.key_separator
        if self.indent is not None:
            self.current_indent_level += 1
            newline_indent = self._newline_indent()
            item_separator = self.item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            item_separator = self.item_separator
        first = True
        if self.ensure_ascii:
            encoder = encode_basestring_ascii
        else:
            encoder = encode_basestring
        allow_nan = self.allow_nan
        if self.sort_keys:
            items = [(k, dct[k]) for k in sorted(dct)]
        else:
            items = dct.items()
        _encoding = self.encoding
        _do_decode = (_encoding is not None
                      and not (_encoding == 'utf-8'))
        for key, value in items:
            if isinstance(key, _string_types):
                if _do_decode and isinstance(key, _bytes_type):
                    key = key.decode(_encoding)
            # JavaScript is weakly typed for these, so it makes sense to
            # also allow them.  Many encoders seem to do something like this.
            # The singletons are tested before the numeric types because
            # bool is a subclass of int: testing int first would turn
            # True/False into "True"/"False".
            elif key is True:
                key = 'true'
            elif key is False:
                key = 'false'
            elif key is None:
                key = 'null'
            elif isinstance(key, float):
                key = floatstr(key, allow_nan)
            elif isinstance(key, _integer_types):
                key = str(key)
            elif self.skipkeys:
                continue
            else:
                raise TypeError("key %r is not a string" % (key,))
            if first:
                first = False
            else:
                yield item_separator
            yield encoder(key)
            yield key_separator
            for chunk in self._iterencode(value, markers):
                yield chunk
        if newline_indent is not None:
            self.current_indent_level -= 1
            yield self._newline_indent()
        yield '}'
        if markers is not None:
            del markers[markerid]

    def _iterencode(self, o, markers=None):
        """Yield the JSON chunks for an arbitrary object ``o``."""
        if isinstance(o, _string_types):
            if self.ensure_ascii:
                encoder = encode_basestring_ascii
            else:
                encoder = encode_basestring
            _encoding = self.encoding
            if (_encoding is not None and isinstance(o, _bytes_type)
                    and not (_encoding == 'utf-8')):
                o = o.decode(_encoding)
            yield encoder(o)
        elif o is None:
            yield 'null'
        elif o is True:
            yield 'true'
        elif o is False:
            yield 'false'
        elif isinstance(o, _integer_types):
            yield str(o)
        elif isinstance(o, float):
            yield floatstr(o, self.allow_nan)
        elif isinstance(o, (list, tuple)):
            for chunk in self._iterencode_list(o, markers):
                yield chunk
        elif isinstance(o, dict):
            for chunk in self._iterencode_dict(o, markers):
                yield chunk
        else:
            # Unknown type: ask default() for a replacement and encode that,
            # guarding against default() returning the same object forever.
            if markers is not None:
                markerid = id(o)
                if markerid in markers:
                    raise ValueError("Circular reference detected")
                markers[markerid] = o
            for chunk in self._iterencode_default(o, markers):
                yield chunk
            if markers is not None:
                del markers[markerid]

    def _iterencode_default(self, o, markers=None):
        """Encode whatever ``default(o)`` returns."""
        newobj = self.default(o)
        return self._iterencode(newobj, markers)

    def default(self, o):
        """Return a serializable replacement for ``o``.

        This base implementation always raises ``TypeError``; override it,
        or pass ``default=`` to the constructor, to support more types.
        """
        raise TypeError("%r is not JSON serializable" % (o,))

    def encode(self, o):
        """Return the JSON text for ``o`` as a single string."""
        # Fast path for the very common case of a bare string.
        if isinstance(o, _string_types):
            if isinstance(o, _bytes_type):
                _encoding = self.encoding
                if (_encoding is not None
                        and not (_encoding == 'utf-8')):
                    o = o.decode(_encoding)
            if self.ensure_ascii:
                return encode_basestring_ascii(o)
            else:
                return encode_basestring(o)
        chunks = list(self.iterencode(o))
        return ''.join(chunks)

    def iterencode(self, o):
        """Return an iterator over the JSON chunks for ``o``."""
        if self.check_circular:
            markers = {}
        else:
            markers = None
        return self._iterencode(o, markers)


_default_encoder = JSONEncoder(
    skipkeys=False,
    ensure_ascii=True,
    check_circular=True,
    allow_nan=True,
    indent=None,
    separators=None,
    encoding='utf-8',
    default=None,
)

# ---------------------------------------------------------------------------
# Implementation of JSONDecoder
# ---------------------------------------------------------------------------

# Scanner below is built on the public ``re`` API only.  The private sre_*
# modules are still imported, best-effort, so that names which were importable
# from this module stay importable where the interpreter provides them.
try:
    import sre_parse
    import sre_compile
    import sre_constants
    from sre_constants import BRANCH, SUBPATTERN
except ImportError:
    pass

FLAGS = (re.VERBOSE | re.MULTILINE | re.DOTALL)


class Scanner(object):
    """Dispatch to token callbacks based on which pattern matches.

    ``lexicon`` is a sequence of callables carrying a ``pattern`` attribute
    (see the ``pattern`` decorator).  At each position the patterns are tried
    in lexicon order and the first one that matches wins, which is the same
    choice a single ``a|b|c`` alternation would make.
    """

    def __init__(self, lexicon, flags=FLAGS):
        # actions[0] is a placeholder so that actions[n] is the n-th token,
        # counted from 1.
        self.actions = [None]
        self._rules = []
        for token in lexicon:
            regex = re.compile(token.pattern, flags)
            self._rules.append((regex.match, token))
            self.actions.append(token)

    def iterscan(self, string, idx=0, context=None):
        """
        Yield match, end_idx for each match
        """
        pos = idx
        while True:
            for match_at, action in self._rules:
                m = match_at(string, pos)
                if m is not None:
                    break
            else:
                # No pattern matches here: nothing more to scan.
                return
            matchend = m.end()
            if matchend == pos:
                # An empty match would never advance; stop instead.
                return
            if action is not None:
                rval, next_pos = action(m, context)
                if next_pos is not None and next_pos != matchend:
                    # "fast forward" the scanner
                    matchend = next_pos
                yield rval, matchend
            pos = matchend


def pattern(pattern, flags=FLAGS):
    """Return a decorator that attaches ``pattern`` and a compiled ``regex``
    to the decorated function."""
    def decorator(fn):
        fn.pattern = pattern
        fn.regex = re.compile(pattern, flags)
        return fn
    return decorator


def _floatconstants():
    """Return ``(nan, inf, -inf)`` built from their IEEE 754 bit patterns."""
    import binascii
    import struct
    _BYTES = binascii.unhexlify('7FF80000000000007FF0000000000000')
    # '>' reads the two doubles as big-endian whatever the host byte order.
    nan, inf = struct.unpack('>dd', _BYTES)
    return nan, inf, -inf


NaN, PosInf, NegInf = _floatconstants()


def linecol(doc, pos):
    """Return ``(lineno, colno)`` for character offset ``pos`` in ``doc``.

    Line numbers start at 1.  NOTE: the column equals ``pos`` (0-based) on
    the first line but is counted from the preceding newline (1-based) on
    later lines; this is kept so that error messages stay unchanged.
    """
    lineno = doc.count('\n', 0, pos) + 1
    if lineno == 1:
        colno = pos
    else:
        colno = pos - doc.rindex('\n', 0, pos)
    return lineno, colno


def errmsg(msg, doc, pos, end=None):
    """Return ``msg`` followed by the line/column/offset of ``pos`` (and of
    ``end`` when given) in ``doc``."""
    lineno, colno = linecol(doc, pos)
    if end is None:
        return '%s: line %d column %d (char %d)' % (msg, lineno, colno, pos)
    endlineno, endcolno = linecol(doc, end)
    return '%s: line %d column %d - line %d column %d (char %d - %d)' % (
        msg, lineno, colno, endlineno, endcolno, pos, end)


_CONSTANTS = {
    '-Infinity': NegInf,
    'Infinity': PosInf,
    'NaN': NaN,
    'true': True,
    'false': False,
    'null': None,
}


def JSONConstant(match, context, c=_CONSTANTS):
    """Scanner action for the literal constants.

    Uses ``context.parse_constant`` when it is set, otherwise the
    ``_CONSTANTS`` table.
    """
    s = match.group(0)
    fn = getattr(context, 'parse_constant', None)
    if fn is None:
        rval = c[s]
    else:
        rval = fn(s)
    return rval, None
pattern('(-?Infinity|NaN|true|false|null)')(JSONConstant)


def JSONNumber(match, context):
    """Scanner action for numbers.

    Values with a fraction or exponent go through ``context.parse_float``
    (default ``float``); all others through ``context.parse_int`` (default
    ``int``).
    """
    # Re-match with this token's own regex so that the integer / fraction /
    # exponent groups are available regardless of how the scanner matched.
    match = JSONNumber.regex.match(match.string, *match.span())
    integer, frac, exp = match.groups()
    if frac or exp:
        fn = getattr(context, 'parse_float', None) or float
        res = fn(integer + (frac or '') + (exp or ''))
    else:
        fn = getattr(context, 'parse_int', None) or int
        res = fn(integer)
    return res, None
pattern(r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?')(JSONNumber)


STRINGCHUNK = re.compile(r'(.*?)(["\\\x00-\x1f])', FLAGS)
BACKSLASH = {
    '"': u'"', '\\': u'\\', '/': u'/',
    'b': u'\b', 'f': u'\f', 'n': u'\n', 'r': u'\r', 't': u'\t',
}

DEFAULT_ENCODING = "utf-8"

_HEX4 = re.compile(r'[0-9A-Fa-f]{4}')


def _parse_hex4(digits):
    """Return the integer value of exactly four hexadecimal digits.

    Raises ``ValueError`` for anything else.  ``int(x, 16)`` alone is too
    lenient here: it also accepts signs, surrounding whitespace and a
    leading ``0x``.
    """
    if len(digits) != 4 or _HEX4.match(digits) is None:
        raise ValueError("expected four hexadecimal digits")
    return int(digits, 16)


def py_scanstring(s, end, encoding=None, strict=True, _b=BACKSLASH,
                  _m=STRINGCHUNK.match):
    r"""Scan the JSON string in ``s`` whose opening quote is at ``end - 1``.

    Returns ``(text, end)`` where ``text`` is the decoded string and ``end``
    is the index just past the closing quote.  Byte-string content is
    decoded with ``encoding`` (``DEFAULT_ENCODING`` when None).  With
    ``strict`` true a raw control character inside the string raises
    ``ValueError``; otherwise it is kept.  Malformed escapes and
    unterminated strings raise ``ValueError``.
    """
    if encoding is None:
        encoding = DEFAULT_ENCODING
    chunks = []
    _append = chunks.append
    begin = end - 1
    while 1:
        chunk = _m(s, end)
        if chunk is None:
            raise ValueError(
                errmsg("Unterminated string starting at", s, begin))
        end = chunk.end()
        content, terminator = chunk.groups()
        if content:
            if not isinstance(content, _text_type):
                content = _text_type(content, encoding)
            _append(content)
        if terminator == '"':
            break
        elif terminator != '\\':
            if strict:
                msg = "Invalid control character %r at" % (terminator,)
                raise ValueError(errmsg(msg, s, end))
            else:
                _append(terminator)
                continue
        try:
            esc = s[end]
        except IndexError:
            raise ValueError(
                errmsg("Unterminated string starting at", s, begin))
        if esc != 'u':
            try:
                m = _b[esc]
            except KeyError:
                raise ValueError(
                    errmsg("Invalid \\escape: %r" % (esc,), s, end))
            end += 1
        else:
            next_end = end + 5
            msg = "Invalid \\uXXXX escape"
            try:
                uni = _parse_hex4(s[end + 1:end + 5])
                if 0xd800 <= uni <= 0xdbff and sys.maxunicode > 65535:
                    # High surrogate: a low surrogate escape must follow.
                    msg = "Invalid \\uXXXX\\uXXXX surrogate pair"
                    if not s[end + 5:end + 7] == '\\u':
                        raise ValueError
                    uni2 = _parse_hex4(s[end + 7:end + 11])
                    if not 0xdc00 <= uni2 <= 0xdfff:
                        raise ValueError
                    uni = 0x10000 + (((uni - 0xd800) << 10) | (uni2 - 0xdc00))
                    next_end += 6
                m = _unichr(uni)
            except ValueError:
                raise ValueError(errmsg(msg, s, end))
            end = next_end
        _append(m)
    return u''.join(chunks), end


scanstring = py_scanstring


def JSONString(match, context):
    """Scanner action for strings; delegates to ``scanstring``."""
    encoding = getattr(context, 'encoding', None)
    strict = getattr(context, 'strict', True)
    return scanstring(match.string, match.end(), encoding, strict)
pattern(r'"')(JSONString)


WHITESPACE = re.compile(r'\s*', FLAGS)


def _scan_one(s, idx, context):
    """Return the first ``(value, end)`` pair ``JSONScanner`` produces at
    ``idx``, or None when no token matches there."""
    for result in JSONScanner.iterscan(s, idx=idx, context=context):
        return result
    return None


def JSONObject(match, context, _w=WHITESPACE.match):
    """Scanner action for objects.

    Returns ``(pairs, end)``; ``pairs`` is a dict, passed through
    ``context.object_hook`` when that is set.  Syntax errors raise
    ``ValueError``.
    """
    pairs = {}
    s = match.string
    end = _w(s, match.end()).end()
    nextchar = s[end:end + 1]
    # Trivial empty object
    if nextchar == '}':
        return pairs, end + 1
    if nextchar != '"':
        raise ValueError(errmsg("Expecting property name", s, end))
    end += 1
    encoding = getattr(context, 'encoding', None)
    strict = getattr(context, 'strict', True)
    while True:
        key, end = scanstring(s, end, encoding, strict)
        end = _w(s, end).end()
        if s[end:end + 1] != ':':
            raise ValueError(errmsg("Expecting : delimiter", s, end))
        end = _w(s, end + 1).end()
        result = _scan_one(s, end, context)
        if result is None:
            raise ValueError(errmsg("Expecting object", s, end))
        value, end = result
        pairs[key] = value
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar == '}':
            break
        if nextchar != ',':
            raise ValueError(errmsg("Expecting , delimiter", s, end - 1))
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar != '"':
            raise ValueError(errmsg("Expecting property name", s, end - 1))
    object_hook = getattr(context, 'object_hook', None)
    if object_hook is not None:
        pairs = object_hook(pairs)
    return pairs, end
pattern(r'{')(JSONObject)


def JSONArray(match, context, _w=WHITESPACE.match):
    """Scanner action for arrays.

    Returns ``(values, end)`` with ``values`` a list.  Syntax errors raise
    ``ValueError``.
    """
    values = []
    s = match.string
    end = _w(s, match.end()).end()
    # Look-ahead for trivial empty array
    nextchar = s[end:end + 1]
    if nextchar == ']':
        return values, end + 1
    while True:
        result = _scan_one(s, end, context)
        if result is None:
            raise ValueError(errmsg("Expecting object", s, end))
        value, end = result
        values.append(value)
        end = _w(s, end).end()
        nextchar = s[end:end + 1]
        end += 1
        if nextchar == ']':
            break
        if nextchar != ',':
            # end was already advanced past the offending character.
            raise ValueError(errmsg("Expecting , delimiter", s, end - 1))
        end = _w(s, end).end()
    return values, end
pattern(r'\[')(JSONArray)


ANYTHING = [
    JSONObject,
    JSONArray,
    JSONString,
    JSONConstant,
    JSONNumber,
]

JSONScanner = Scanner(ANYTHING)


class JSONDecoder(object):
    """Parse JSON text into Python objects."""

    _scanner = Scanner(ANYTHING)
    __all__ = ['__init__', 'decode', 'raw_decode']

    def __init__(self, encoding=None, object_hook=None, parse_float=None,
                 parse_int=None, parse_constant=None, strict=True):
        """Configure the decoder.

        encoding -- encoding of byte-string input (``DEFAULT_ENCODING`` when
            None).
        object_hook -- optional callable applied to every decoded object
            (dict); its return value is used instead.
        parse_float, parse_int -- optional callables used instead of
            ``float`` / ``int`` for number text.
        parse_constant -- optional callable used for ``-Infinity``,
            ``Infinity``, ``NaN``, ``true``, ``false`` and ``null``.
        strict -- reject raw control characters inside strings.
        """
        self.encoding = encoding
        self.object_hook = object_hook
        self.parse_float = parse_float
        self.parse_int = parse_int
        self.parse_constant = parse_constant
        self.strict = strict

    def decode(self, s, _w=WHITESPACE.match):
        """Return the Python value for the JSON document ``s``.

        Leading and trailing whitespace is allowed; anything else after the
        value raises ``ValueError``.
        """
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
        end = _w(s, end).end()
        if end != len(s):
            raise ValueError(errmsg("Extra data", s, end, len(s)))
        return obj

    def raw_decode(self, s, **kw):
        """Decode one JSON value from ``s`` and return ``(value, end)``.

        Keyword arguments are forwarded to ``Scanner.iterscan`` (``idx``
        selects the start offset); ``context`` defaults to this decoder.
        Raises ``ValueError`` when no value can be decoded.
        """
        kw.setdefault('context', self)
        for obj, end in self._scanner.iterscan(s, **kw):
            return obj, end
        raise ValueError("No JSON object could be decoded")


_default_decoder = JSONDecoder(encoding=None, object_hook=None)


# public functions

def dumps(obj, skipkeys=False, ensure_ascii=True, check_circular=True,
          allow_nan=True, cls=None, indent=None, separators=None,
          encoding='utf-8', default=None, **kw):
    """Serialize ``obj`` to a JSON formatted string.

    The arguments are those of ``JSONEncoder.__init__``; ``cls`` selects an
    alternative encoder class and extra keyword arguments (for example
    ``sort_keys``) are passed to its constructor.
    """
    # cached encoder
    if (skipkeys is False and ensure_ascii is True and
            check_circular is True and allow_nan is True and
            cls is None and indent is None and separators is None and
            encoding == 'utf-8' and default is None and not kw):
        return _default_encoder.encode(obj)
    if cls is None:
        cls = JSONEncoder
    return cls(
        skipkeys=skipkeys, ensure_ascii=ensure_ascii,
        check_circular=check_circular, allow_nan=allow_nan, indent=indent,
        separators=separators, encoding=encoding, default=default,
        **kw).encode(obj)


def loads(s, encoding=None, cls=None, object_hook=None, parse_float=None,
          parse_int=None, parse_constant=None, **kw):
    """Deserialize the JSON document ``s`` to a Python object.

    The arguments are those of ``JSONDecoder.__init__``; ``cls`` selects an
    alternative decoder class and extra keyword arguments are passed to its
    constructor.
    """
    # cached decoder
    if (cls is None and encoding is None and object_hook is None and
            parse_int is None and parse_float is None and
            parse_constant is None and not kw):
        return _default_decoder.decode(s)
    if cls is None:
        cls = JSONDecoder
    if object_hook is not None:
        kw['object_hook'] = object_hook
    if parse_float is not None:
        kw['parse_float'] = parse_float
    if parse_int is not None:
        kw['parse_int'] = parse_int
    if parse_constant is not None:
        kw['parse_constant'] = parse_constant
    return cls(encoding=encoding, **kw).decode(s)