"""Compare pairs of MongoDB shell documents that share the same ``_id``.

The input file holds Mongo-shell style documents separated by blank lines.
Documents are normalised into strict JSON, grouped by ``_id`` and, for every
``_id`` that occurs exactly twice, a textual diff of the two documents is
written to a report file.
"""
import json
import os
import re

# Name of the directory that marks the project root inside the working path.
TARGET_FOLDER = 'tendata-warehouse'

# Matches `"key": value<,|newline>` where the value does not start with a
# quote, brace, bracket, comma or whitespace (i.e. an unquoted scalar or a
# Mongo shell constructor call).
_NORMAL_VALUE_RE = re.compile(
    r'([ \t]*)(\"[^\"]*\"\s*:\s*)([^\",\{\}\[\]\s].*?)([,\n])')

# Matches a constructor call that has already been wrapped in quotes by the
# first pass, e.g. `"ObjectId("abc")",`, so the inner quotes can be dropped.
_SPECIAL_TYPE_RE = re.compile(
    r'([ \t]*)(\"(?:ObjectId|ISODate|NumberInt|NumberLong|NumberDecimal|Binary|Boolean|Timestamp|RegExp|DBRef|JavaScript code|Symbol|MinKey|MaxKey)\()\"([^\"\)]*)\"(\).+?)([,\n])')


def ensure_json_values(bson_str):
    """Rewrite Mongo-shell style text so its values are JSON strings.

    Two regex passes are applied:

    1. Every unquoted value that follows a ``"key":`` and is terminated by a
       comma or a newline is wrapped in double quotes (so numbers, booleans
       and constructor calls all become strings).
    2. For the listed constructor names (``ObjectId``, ``ISODate``, ...) the
       quotes around the constructor argument are removed, turning
       ``"ObjectId("abc")"`` into ``"ObjectId(abc)"``.

    Args:
        bson_str: Raw text of one or more documents.

    Returns:
        The rewritten text.
    """
    def quote_plain_value(match):
        indent, key, value, terminator = match.groups()
        return f'{indent}{key}"{value}"{terminator}'

    def replace_special_types(match):
        # Group 3 is `[^\"\)]*`, so it can never contain a quote; the pieces
        # are simply re-joined without the quotes that surrounded it.
        prefix, key, value, suffix, comma_or_newline = match.groups()
        return f'{prefix}{key}{value}{suffix}{comma_or_newline}'

    # Use a regex to quote the values that are not already quoted.
    bson_str = _NORMAL_VALUE_RE.sub(quote_plain_value, bson_str)
    # Use the helper above to rewrite the special-type values.
    bson_str = _SPECIAL_TYPE_RE.sub(replace_special_types, bson_str)
    return bson_str


def _id_key(json_id):
    """Return a hashable grouping key for ``json_id``.

    Hashable ids are returned unchanged. Ids that parsed to a dict or list
    cannot be used as dict keys, so they are replaced by their canonical
    JSON text (keys sorted) instead of raising ``TypeError``.
    """
    try:
        hash(json_id)
    except TypeError:
        return json.dumps(json_id, sort_keys=True, ensure_ascii=False)
    return json_id


def analyze_json_objects(file_path):
    """Parse the documents in ``file_path`` and group them by ``_id``.

    The file is read as UTF-8, normalised with :func:`ensure_json_values`
    and split on blank lines; each chunk is parsed as one JSON document.
    Chunks that fail to parse are reported on stdout and skipped. Parsed
    values that are not dicts, or dicts without a non-null ``_id``, are
    ignored.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A dict mapping each ``_id`` (or its canonical JSON text when the id
        is unhashable) to the list of documents carrying it, in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = ensure_json_values(file.read())

    json_objects = []
    for js in re.split(r'\n\s*\n', content.strip()):
        if not js:
            continue
        try:
            json_objects.append(json.loads(js))
        except json.JSONDecodeError:
            # Best effort: report the bad chunk and keep going.
            print(f'Error decoding JSON from string: {js}')

    id_to_json_map = {}
    for json_obj in json_objects:
        if not isinstance(json_obj, dict):
            continue
        json_id = json_obj.get('_id', None)
        if json_id is None:
            continue
        id_to_json_map.setdefault(_id_key(json_id), []).append(json_obj)
    return id_to_json_map


def _build_report_lines(id_to_json_map):
    """Build the report lines for every ``_id`` that has exactly two documents.

    The first document is labelled as the "new table" and the second as the
    "old table" in the emitted text. Keys are listed in document order so
    the report is identical from run to run.
    """
    output_lines = []
    for json_id, json_list in id_to_json_map.items():
        if len(json_list) != 2:
            continue
        first, second = json_list

        unique_first = [key for key in first if key not in second]
        unique_second = [key for key in second if key not in first]
        shared_keys = [key for key in first if key in second]

        output_lines.append(f"比较的MongoId: {json_id}")
        if unique_first:
            output_lines.append("新表中独有的:")
            output_lines.extend(
                f" {key}: {first.get(key)}" for key in unique_first)
        if unique_second:
            output_lines.append("旧表中独有的:")
            output_lines.extend(
                f" {key}: {second.get(key)}" for key in unique_second)

        differing_values = [
            f" {key}: {first.get(key)} vs {second.get(key)}"
            for key in shared_keys
            if first.get(key) != second.get(key)
        ]
        if differing_values:
            output_lines.append("两表value不同的:")
            output_lines.extend(differing_values)
        # Blank separator after each compared id.
        output_lines.append("")
    return output_lines


def compare_json_objects(id_to_json_map, output_path=None):
    """Write a diff report for every ``_id`` that maps to exactly two documents.

    Args:
        id_to_json_map: Mapping as returned by :func:`analyze_json_objects`.
        output_path: Where to write the report. When omitted, the report goes
            to ``<base>/workspace/output.txt`` where ``<base>`` is the part of
            the current working directory up to ``TARGET_FOLDER``.

    Raises:
        ValueError: If ``output_path`` is omitted and ``TARGET_FOLDER`` is not
            a component of the current working directory.
    """
    output_lines = _build_report_lines(id_to_json_map)

    file_path = output_path
    if file_path is None:
        base_path = get_base_path(os.getcwd(), TARGET_FOLDER)
        # You can specify your own output path here (or pass output_path).
        file_path = os.path.join(base_path, "workspace", "output.txt")

    # The report contains non-ASCII text, so the encoding is pinned rather
    # than left to the platform default.
    with open(file_path, 'w', encoding='utf-8') as output_file:
        output_file.write('\n'.join(output_lines))
    print("可在服务器查看{}".format(file_path))


def get_base_path(current_path, target_folder):
    """Return ``current_path`` truncated after its ``target_folder`` component.

    Args:
        current_path: Path string whose components are separated by ``os.sep``.
        target_folder: Name of the component to cut after (first occurrence).

    Returns:
        The leading part of ``current_path`` up to and including
        ``target_folder``.

    Raises:
        ValueError: If ``target_folder`` is not a component of
            ``current_path``.
    """
    path_parts = current_path.split(os.sep)
    try:
        target_index = path_parts.index(target_folder)
    except ValueError:
        raise ValueError(
            f"folder {target_folder!r} not found in path {current_path!r}"
        ) from None
    return os.sep.join(path_parts[:target_index + 1])


def main():
    """Prompt for a file path relative to the project root and compare it."""
    base_path = get_base_path(os.getcwd(), TARGET_FOLDER)
    relative_path = input("请输入比较文件的相对路径 Path From Content Root: ")
    file_path = os.path.join(base_path, relative_path)
    id_to_json_map = analyze_json_objects(file_path)
    compare_json_objects(id_to_json_map)


if __name__ == "__main__":
    main()