"""Tests for reading and writing GraphML files with networkx."""

import contextlib
import io
import os
import tempfile

import pytest

import networkx as nx
from networkx.readwrite.graphml import GraphMLWriter
from networkx.utils import edges_equal, nodes_equal

# NOTE(review): the triple-quoted GraphML fixtures in this module contain only
# bare text values; presumably the XML markup was stripped at some point.
# Confirm the fixture contents against the upstream test data.


@contextlib.contextmanager
def _temporary_filename():
    """Yield the path of a fresh temporary file and always clean it up.

    The descriptor returned by ``tempfile.mkstemp`` stays open while the
    caller uses the path. It is closed and the file is removed when the
    ``with`` block exits, including when the body raises (for example on a
    failing assertion), so a failing test no longer leaks the file.
    """
    fd, fname = tempfile.mkstemp()
    try:
        yield fname
    finally:
        os.close(fd)
        os.unlink(fname)


class BaseGraphML:
    """Shared fixtures: GraphML strings, byte streams and expected graphs."""

    @classmethod
    def setup_class(cls):
        """Build every fixture as a class attribute.

        For each fixture there is a ``*_data`` string, a ``*_fh`` ``BytesIO``
        holding the UTF-8 encoding of that string, and a hand-built networkx
        graph that the tests compare against.
        """
        cls.simple_directed_data = """ """
        cls.simple_directed_graph = nx.DiGraph()
        cls.simple_directed_graph.add_node("n10")
        cls.simple_directed_graph.add_edge("n0", "n2", id="foo")
        cls.simple_directed_graph.add_edge("n0", "n2")
        cls.simple_directed_graph.add_edges_from(
            [
                ("n1", "n2"),
                ("n2", "n3"),
                ("n3", "n5"),
                ("n3", "n4"),
                ("n4", "n6"),
                ("n6", "n5"),
                ("n5", "n7"),
                ("n6", "n8"),
                ("n8", "n7"),
                ("n8", "n9"),
            ]
        )
        cls.simple_directed_fh = io.BytesIO(cls.simple_directed_data.encode("UTF-8"))

        cls.attribute_data = """ yellow green blue red turquoise 1.0 1.0 2.0 1.1 """
        cls.attribute_graph = nx.DiGraph(id="G")
        cls.attribute_graph.graph["node_default"] = {"color": "yellow"}
        cls.attribute_graph.add_node("n0", color="green")
        cls.attribute_graph.add_node("n2", color="blue")
        cls.attribute_graph.add_node("n3", color="red")
        cls.attribute_graph.add_node("n4")
        cls.attribute_graph.add_node("n5", color="turquoise")
        cls.attribute_graph.add_edge("n0", "n2", id="e0", weight=1.0)
        cls.attribute_graph.add_edge("n0", "n1", id="e1", weight=1.0)
        cls.attribute_graph.add_edge("n1", "n3", id="e2", weight=2.0)
        cls.attribute_graph.add_edge("n3", "n2", id="e3")
        cls.attribute_graph.add_edge("n2", "n4", id="e4")
        cls.attribute_graph.add_edge("n3", "n5", id="e5")
        cls.attribute_graph.add_edge("n5", "n4", id="e6", weight=1.1)
        cls.attribute_fh = io.BytesIO(cls.attribute_data.encode("UTF-8"))

        cls.attribute_named_key_ids_data = """ val1 val2 val_one val2 edge_value """
        cls.attribute_named_key_ids_graph = nx.DiGraph()
        cls.attribute_named_key_ids_graph.add_node("0", prop1="val1", prop2="val2")
        cls.attribute_named_key_ids_graph.add_node("1", prop1="val_one", prop2="val2")
        cls.attribute_named_key_ids_graph.add_edge("0", "1", edge_prop="edge_value")
        fh = io.BytesIO(cls.attribute_named_key_ids_data.encode("UTF-8"))
        cls.attribute_named_key_ids_fh = fh

        cls.attribute_numeric_type_data = """ 1 2.0 1 k 1.0 """
        cls.attribute_numeric_type_graph = nx.DiGraph()
        cls.attribute_numeric_type_graph.add_node("n0", weight=1)
        cls.attribute_numeric_type_graph.add_node("n1", weight=2.0)
        cls.attribute_numeric_type_graph.add_edge("n0", "n1", weight=1)
        cls.attribute_numeric_type_graph.add_edge("n1", "n1", weight=1.0)
        fh = io.BytesIO(cls.attribute_numeric_type_data.encode("UTF-8"))
        cls.attribute_numeric_type_fh = fh

        cls.simple_undirected_data = """ """
        cls.simple_undirected_graph = nx.Graph()
        cls.simple_undirected_graph.add_node("n10")
        cls.simple_undirected_graph.add_edge("n0", "n2", id="foo")
        cls.simple_undirected_graph.add_edges_from([("n1", "n2"), ("n2", "n3")])
        fh = io.BytesIO(cls.simple_undirected_data.encode("UTF-8"))
        cls.simple_undirected_fh = fh

        cls.undirected_multigraph_data = """ """
        cls.undirected_multigraph = nx.MultiGraph()
        cls.undirected_multigraph.add_node("n10")
        cls.undirected_multigraph.add_edge("n0", "n2", id="e0")
        cls.undirected_multigraph.add_edge("n1", "n2", id="e1")
        cls.undirected_multigraph.add_edge("n2", "n1", id="e2")
        fh = io.BytesIO(cls.undirected_multigraph_data.encode("UTF-8"))
        cls.undirected_multigraph_fh = fh

        cls.undirected_multigraph_no_multiedge_data = """ """
        cls.undirected_multigraph_no_multiedge = nx.MultiGraph()
        cls.undirected_multigraph_no_multiedge.add_node("n10")
        cls.undirected_multigraph_no_multiedge.add_edge("n0", "n2", id="e0")
        cls.undirected_multigraph_no_multiedge.add_edge("n1", "n2", id="e1")
        cls.undirected_multigraph_no_multiedge.add_edge("n2", "n3", id="e2")
        fh = io.BytesIO(cls.undirected_multigraph_no_multiedge_data.encode("UTF-8"))
        cls.undirected_multigraph_no_multiedge_fh = fh

        cls.multigraph_only_ids_for_multiedges_data = """ """
        cls.multigraph_only_ids_for_multiedges = nx.MultiGraph()
        cls.multigraph_only_ids_for_multiedges.add_node("n10")
        cls.multigraph_only_ids_for_multiedges.add_edge("n0", "n2")
        cls.multigraph_only_ids_for_multiedges.add_edge("n1", "n2", id="e1")
        cls.multigraph_only_ids_for_multiedges.add_edge("n2", "n1", id="e2")
        fh = io.BytesIO(cls.multigraph_only_ids_for_multiedges_data.encode("UTF-8"))
        cls.multigraph_only_ids_for_multiedges_fh = fh


class TestReadGraphML(BaseGraphML):
    """Tests for ``nx.read_graphml`` and ``nx.parse_graphml``."""

    def test_read_simple_directed_graphml(self):
        """Both readers reproduce the simple directed fixture graph."""
        G = self.simple_directed_graph
        H = nx.read_graphml(self.simple_directed_fh)
        assert sorted(G.nodes()) == sorted(H.nodes())
        assert sorted(G.edges()) == sorted(H.edges())
        assert sorted(G.edges(data=True)) == sorted(H.edges(data=True))
        # Rewind the shared stream so later tests can read it again.
        self.simple_directed_fh.seek(0)

        PG = nx.parse_graphml(self.simple_directed_data)
        assert sorted(G.nodes()) == sorted(PG.nodes())
        assert sorted(G.edges()) == sorted(PG.edges())
        assert sorted(G.edges(data=True)) == sorted(PG.edges(data=True))

    def test_read_simple_undirected_graphml(self):
        """Both readers reproduce the simple undirected fixture graph."""
        G = self.simple_undirected_graph
        H = nx.read_graphml(self.simple_undirected_fh)
        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        self.simple_undirected_fh.seek(0)

        PG = nx.parse_graphml(self.simple_undirected_data)
        assert nodes_equal(G.nodes(), PG.nodes())
        assert edges_equal(G.edges(), PG.edges())

    def test_read_undirected_multigraph_graphml(self):
        """Both readers reproduce the undirected multigraph fixture."""
        G = self.undirected_multigraph
        H = nx.read_graphml(self.undirected_multigraph_fh)
        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        self.undirected_multigraph_fh.seek(0)

        PG = nx.parse_graphml(self.undirected_multigraph_data)
        assert nodes_equal(G.nodes(), PG.nodes())
        assert edges_equal(G.edges(), PG.edges())

    def test_read_undirected_multigraph_no_multiedge_graphml(self):
        """Both readers reproduce the multigraph fixture without multiedges."""
        G = self.undirected_multigraph_no_multiedge
        H = nx.read_graphml(self.undirected_multigraph_no_multiedge_fh)
        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        self.undirected_multigraph_no_multiedge_fh.seek(0)

        PG = nx.parse_graphml(self.undirected_multigraph_no_multiedge_data)
        assert nodes_equal(G.nodes(), PG.nodes())
        assert edges_equal(G.edges(), PG.edges())

    def test_read_undirected_multigraph_only_ids_for_multiedges_graphml(self):
        """Both readers reproduce the fixture with ids only on multiedges."""
        G = self.multigraph_only_ids_for_multiedges
        H = nx.read_graphml(self.multigraph_only_ids_for_multiedges_fh)
        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        self.multigraph_only_ids_for_multiedges_fh.seek(0)

        PG = nx.parse_graphml(self.multigraph_only_ids_for_multiedges_data)
        assert nodes_equal(G.nodes(), PG.nodes())
        assert edges_equal(G.edges(), PG.edges())

    def test_read_attribute_graphml(self):
        """Node and edge attributes survive reading the attribute fixture."""
        G = self.attribute_graph
        H = nx.read_graphml(self.attribute_fh)
        assert nodes_equal(G.nodes(True), sorted(H.nodes(data=True)))
        ge = sorted(G.edges(data=True))
        he = sorted(H.edges(data=True))
        for a, b in zip(ge, he):
            assert a == b
        self.attribute_fh.seek(0)

        PG = nx.parse_graphml(self.attribute_data)
        assert sorted(G.nodes(True)) == sorted(PG.nodes(data=True))
        ge = sorted(G.edges(data=True))
        he = sorted(PG.edges(data=True))
        for a, b in zip(ge, he):
            assert a == b

    def test_directed_edge_in_undirected(self):
        """Both readers are expected to raise ``NetworkXError`` for this input."""
        s = """ """
        fh = io.BytesIO(s.encode("UTF-8"))
        pytest.raises(nx.NetworkXError, nx.read_graphml, fh)
        pytest.raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_undirected_edge_in_directed(self):
        """Both readers are expected to raise ``NetworkXError`` for this input."""
        s = """ """
        fh = io.BytesIO(s.encode("UTF-8"))
        pytest.raises(nx.NetworkXError, nx.read_graphml, fh)
        pytest.raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_key_raise(self):
        """Both readers are expected to raise ``NetworkXError`` for this input."""
        s = """ yellow green blue 1.0 """
        fh = io.BytesIO(s.encode("UTF-8"))
        pytest.raises(nx.NetworkXError, nx.read_graphml, fh)
        pytest.raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_hyperedge_raise(self):
        """Both readers are expected to raise ``NetworkXError`` for this input."""
        s = """ yellow green blue """
        fh = io.BytesIO(s.encode("UTF-8"))
        pytest.raises(nx.NetworkXError, nx.read_graphml, fh)
        pytest.raises(nx.NetworkXError, nx.parse_graphml, s)

    def test_multigraph_keys(self):
        """Reading a multigraph uses the edge id attributes as edge keys."""
        # Test that reading multigraphs uses edge id attributes as keys
        s = """ """
        fh = io.BytesIO(s.encode("UTF-8"))
        G = nx.read_graphml(fh)
        expected = [("n0", "n1", "e0"), ("n0", "n1", "e1")]
        assert sorted(G.edges(keys=True)) == expected
        fh.seek(0)
        H = nx.parse_graphml(s)
        assert sorted(H.edges(keys=True)) == expected

    def test_preserve_multi_edge_data(self):
        """
        Test that data and keys of edges are preserved on consequent
        write and reads
        """
        G = nx.MultiGraph()
        G.add_node(1)
        G.add_node(2)
        G.add_edges_from(
            [
                # edges with no data, no keys:
                (1, 2),
                # edges with only data:
                (1, 2, dict(key="data_key1")),
                (1, 2, dict(id="data_id2")),
                (1, 2, dict(key="data_key3", id="data_id3")),
                # edges with both data and keys:
                (1, 2, 103, dict(key="data_key4")),
                (1, 2, 104, dict(id="data_id5")),
                (1, 2, 105, dict(key="data_key6", id="data_id7")),
            ]
        )
        fh = io.BytesIO()
        nx.write_graphml(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert edges_equal(G.edges(data=True, keys=True), H.edges(data=True, keys=True))
        assert G._adj == H._adj

        # Expected adjacency when nodes and edge keys are read back as strings.
        Gadj = {
            str(node): {
                str(nbr): {str(ekey): dd for ekey, dd in key_dict.items()}
                for nbr, key_dict in nbr_dict.items()
            }
            for node, nbr_dict in G._adj.items()
        }
        fh.seek(0)
        HH = nx.read_graphml(fh, node_type=str, edge_key_type=str)
        assert Gadj == HH._adj

        fh.seek(0)
        string_fh = fh.read()
        HH = nx.parse_graphml(string_fh, node_type=str, edge_key_type=str)
        assert Gadj == HH._adj

    def test_yfiles_extension(self):
        """Labels and edge ids are read with and without ``force_multigraph``."""
        data = """ 1 2 """
        fh = io.BytesIO(data.encode("UTF-8"))
        G = nx.read_graphml(fh, force_multigraph=True)
        assert list(G.edges()) == [("n0", "n1")]
        assert G.has_edge("n0", "n1", key="e0")
        assert G.nodes["n0"]["label"] == "1"
        assert G.nodes["n1"]["label"] == "2"
        fh.seek(0)
        G = nx.read_graphml(fh)
        assert list(G.edges()) == [("n0", "n1")]
        assert G["n0"]["n1"]["id"] == "e0"
        assert G.nodes["n0"]["label"] == "1"
        assert G.nodes["n1"]["label"] == "2"

        H = nx.parse_graphml(data, force_multigraph=True)
        assert list(H.edges()) == [("n0", "n1")]
        assert H.has_edge("n0", "n1", key="e0")
        assert H.nodes["n0"]["label"] == "1"
        assert H.nodes["n1"]["label"] == "2"

        H = nx.parse_graphml(data)
        assert list(H.edges()) == [("n0", "n1")]
        assert H["n0"]["n1"]["id"] == "e0"
        assert H.nodes["n0"]["label"] == "1"
        assert H.nodes["n1"]["label"] == "2"

    def test_bool(self):
        """The ``test`` node attribute is read back with the expected truthiness."""
        s = """ false true false FaLsE True 0 1 """
        fh = io.BytesIO(s.encode("UTF-8"))
        G = nx.read_graphml(fh)
        H = nx.parse_graphml(s)
        for graph in [G, H]:
            assert graph.nodes["n0"]["test"]
            assert not graph.nodes["n2"]["test"]
            assert not graph.nodes["n3"]["test"]
            assert graph.nodes["n4"]["test"]
            assert not graph.nodes["n5"]["test"]
            assert graph.nodes["n6"]["test"]

    def test_graphml_header_line(self):
        """``good`` and ``bad`` inputs parse; ``ugly`` raises ``NetworkXError``."""
        good = """ false true """
        bad = """ false true """
        ugly = """ false true """
        for s in (good, bad):
            fh = io.BytesIO(s.encode("UTF-8"))
            G = nx.read_graphml(fh)
            H = nx.parse_graphml(s)
            for graph in [G, H]:
                assert graph.nodes["n0"]["test"]

        fh = io.BytesIO(ugly.encode("UTF-8"))
        pytest.raises(nx.NetworkXError, nx.read_graphml, fh)
        pytest.raises(nx.NetworkXError, nx.parse_graphml, ugly)

    def test_read_attributes_with_groups(self):
        """Nodes that are part of a group are read with their attributes."""
        # The backslash is a line continuation that suppresses the literal's
        # leading newline; it must be followed directly by the line break.
        data = """\
 2 Group 3 Folder 3 Group 1 Folder 1 1 3 Group 2 Folder 2 5 6 9 """
        # verify that nodes / attributes are correctly read when part of a group
        fh = io.BytesIO(data.encode("UTF-8"))
        G = nx.read_graphml(fh)
        data = [x for _, x in G.nodes(data=True)]
        assert len(data) == 9
        for node_data in data:
            assert node_data["CustomProperty"] != ""

    def test_long_attribute_type(self):
        """A value declared with ``attr.type="long"`` is read as an int."""
        # test that graphs with attr.type="long" (as produced by botch and
        # dose3) can be parsed
        s = """ 4284 """
        fh = io.BytesIO(s.encode("UTF-8"))
        G = nx.read_graphml(fh)
        expected = [("n1", {"cudfversion": 4284})]
        assert sorted(G.nodes(data=True)) == expected
        fh.seek(0)
        H = nx.parse_graphml(s)
        assert sorted(H.nodes(data=True)) == expected


class TestWriteGraphML(BaseGraphML):
    """Write/read round-trip tests, run with the lxml-based writer."""

    # Subclasses override ``writer`` to run the same tests with another writer.
    writer = staticmethod(nx.write_graphml_lxml)

    @classmethod
    def setup_class(cls):
        """Build the shared fixtures, then skip the class if lxml is missing."""
        BaseGraphML.setup_class()
        _ = pytest.importorskip("lxml.etree")

    def test_write_interface(self):
        """``nx.write_graphml`` is the lxml writer when lxml can be imported."""
        try:
            import lxml.etree

            assert nx.write_graphml == nx.write_graphml_lxml
        except ImportError:
            assert nx.write_graphml == nx.write_graphml_xml

    def test_write_read_simple_directed_graphml(self):
        """The simple directed graph survives a write/read round trip."""
        G = self.simple_directed_graph
        G.graph["hi"] = "there"
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert sorted(G.nodes()) == sorted(H.nodes())
        assert sorted(G.edges()) == sorted(H.edges())
        assert sorted(G.edges(data=True)) == sorted(H.edges(data=True))
        self.simple_directed_fh.seek(0)

    def test_GraphMLWriter_add_graphs(self):
        """``GraphMLWriter.add_graphs`` accepts a list of graphs."""
        gmlw = GraphMLWriter()
        G = self.simple_directed_graph
        H = G.copy()
        gmlw.add_graphs([G, H])

    def test_write_read_simple_no_prettyprint(self):
        """The round trip also works with ``prettyprint=False``."""
        G = self.simple_directed_graph
        G.graph["hi"] = "there"
        G.graph["id"] = "1"
        fh = io.BytesIO()
        self.writer(G, fh, prettyprint=False)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert sorted(G.nodes()) == sorted(H.nodes())
        assert sorted(G.edges()) == sorted(H.edges())
        assert sorted(G.edges(data=True)) == sorted(H.edges(data=True))
        self.simple_directed_fh.seek(0)

    def test_write_read_attribute_named_key_ids_graphml(self):
        """With ``named_key_ids=True`` each key id equals its attribute name."""
        from xml.etree.ElementTree import parse

        G = self.attribute_named_key_ids_graph
        fh = io.BytesIO()
        self.writer(G, fh, named_key_ids=True)
        fh.seek(0)
        H = nx.read_graphml(fh)
        fh.seek(0)

        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        assert edges_equal(G.edges(data=True), H.edges(data=True))
        self.attribute_named_key_ids_fh.seek(0)

        xml = parse(fh)
        # Children are the key elements, and the graph element
        children = list(xml.getroot())
        assert len(children) == 4

        keys = [child.items() for child in children[:3]]

        assert len(keys) == 3
        assert ("id", "edge_prop") in keys[0]
        assert ("attr.name", "edge_prop") in keys[0]
        assert ("id", "prop2") in keys[1]
        assert ("attr.name", "prop2") in keys[1]
        assert ("id", "prop1") in keys[2]
        assert ("attr.name", "prop1") in keys[2]

        # Confirm the read graph nodes/edge are identical when compared to
        # default writing behavior.
        default_behavior_fh = io.BytesIO()
        nx.write_graphml(G, default_behavior_fh)
        default_behavior_fh.seek(0)
        H = nx.read_graphml(default_behavior_fh)

        named_key_ids_behavior_fh = io.BytesIO()
        nx.write_graphml(G, named_key_ids_behavior_fh, named_key_ids=True)
        named_key_ids_behavior_fh.seek(0)
        J = nx.read_graphml(named_key_ids_behavior_fh)

        assert all(n1 == n2 for (n1, n2) in zip(H.nodes, J.nodes))
        assert all(e1 == e2 for (e1, e2) in zip(H.edges, J.edges))

    def test_write_read_attribute_numeric_type_graphml(self):
        """With ``infer_numeric_types=True`` both keys are written as double."""
        from xml.etree.ElementTree import parse

        G = self.attribute_numeric_type_graph
        fh = io.BytesIO()
        self.writer(G, fh, infer_numeric_types=True)
        fh.seek(0)
        H = nx.read_graphml(fh)
        fh.seek(0)

        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        assert edges_equal(G.edges(data=True), H.edges(data=True))
        self.attribute_numeric_type_fh.seek(0)

        xml = parse(fh)
        # Children are the key elements, and the graph element
        children = list(xml.getroot())
        assert len(children) == 3

        keys = [child.items() for child in children[:2]]

        assert len(keys) == 2
        assert ("attr.type", "double") in keys[0]
        assert ("attr.type", "double") in keys[1]

    def test_more_multigraph_keys(self):
        """Writing keys as edge id attributes means keys become strings.
        The original keys are stored as data, so read them back in
        if `str(key) == edge_id`
        This allows the adjacency to remain the same.
        """
        G = nx.MultiGraph()
        G.add_edges_from([("a", "b", 2), ("a", "b", 3)])
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname)
            assert H.is_multigraph()
            assert edges_equal(G.edges(keys=True), H.edges(keys=True))
            assert G._adj == H._adj

    def test_default_attribute(self):
        """Graph-level defaults and attributes survive a round trip."""
        G = nx.Graph(name="Fred")
        G.add_node(1, label=1, color="green")
        nx.add_path(G, [0, 1, 2, 3])
        G.add_edge(1, 2, weight=3)
        G.graph["node_default"] = {"color": "yellow"}
        G.graph["edge_default"] = {"weight": 7}
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh, node_type=int)
        assert nodes_equal(G.nodes(), H.nodes())
        assert edges_equal(G.edges(), H.edges())
        assert G.graph == H.graph

    def test_mixed_type_attributes(self):
        """An attribute holding both ``False`` and ``0`` round-trips."""
        G = nx.MultiGraph()
        G.add_node("n0", special=False)
        G.add_node("n1", special=0)
        G.add_edge("n0", "n1", special=False)
        G.add_edge("n0", "n1", special=0)
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert not H.nodes["n0"]["special"]
        assert H.nodes["n1"]["special"] == 0
        assert not H.edges["n0", "n1", 0]["special"]
        assert H.edges["n0", "n1", 1]["special"] == 0

    def test_str_number_mixed_type_attributes(self):
        """An attribute holding both a string and a number round-trips."""
        G = nx.MultiGraph()
        G.add_node("n0", special="hello")
        G.add_node("n1", special=0)
        G.add_edge("n0", "n1", special="hello")
        G.add_edge("n0", "n1", special=0)
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert H.nodes["n0"]["special"] == "hello"
        assert H.nodes["n1"]["special"] == 0
        assert H.edges["n0", "n1", 0]["special"] == "hello"
        assert H.edges["n0", "n1", 1]["special"] == 0

    def test_mixed_int_type_number_attributes(self):
        """An attribute mixing ``np.int64`` and ``int`` values round-trips."""
        np = pytest.importorskip("numpy")
        G = nx.MultiGraph()
        G.add_node("n0", special=np.int64(0))
        G.add_node("n1", special=1)
        G.add_edge("n0", "n1", special=np.int64(2))
        G.add_edge("n0", "n1", special=3)
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert H.nodes["n0"]["special"] == 0
        assert H.nodes["n1"]["special"] == 1
        assert H.edges["n0", "n1", 0]["special"] == 2
        assert H.edges["n0", "n1", 1]["special"] == 3

    def test_numpy_float(self):
        """A NumPy double edge weight round-trips with equal adjacency."""
        np = pytest.importorskip("numpy")
        # ``np.float_`` was an alias of ``np.float64`` and no longer exists in
        # NumPy 2.0, so the concrete type is used directly.
        wt = np.float64(3.4)
        G = nx.Graph([(1, 2, {"weight": wt})])
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname, node_type=int)
            assert G._adj == H._adj

    def test_multigraph_to_graph(self):
        """A multigraph without parallel edges reads back as a simple graph."""
        # test converting multigraph to graph if no parallel edges found
        G = nx.MultiGraph()
        G.add_edges_from([("a", "b", 2), ("b", "c", 3)])  # no multiedges
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname)
            assert not H.is_multigraph()
            H = nx.read_graphml(fname, force_multigraph=True)
            assert H.is_multigraph()

        # add a multiedge
        G.add_edge("a", "b", "e-id")
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname)
            assert H.is_multigraph()
            H = nx.read_graphml(fname, force_multigraph=True)
            assert H.is_multigraph()

    def test_numpy_float64(self):
        """An ``np.float64`` weight is read back as a plain ``float``."""
        np = pytest.importorskip("numpy")
        wt = np.float64(3.4)
        G = nx.Graph([(1, 2, {"weight": wt})])
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname, node_type=int)
            assert G.edges == H.edges
            wtG = G[1][2]["weight"]
            wtH = H[1][2]["weight"]
            assert wtG == pytest.approx(wtH, abs=1e-6)
            assert type(wtG) == np.float64
            assert type(wtH) == float

    def test_numpy_float32(self):
        """An ``np.float32`` weight is read back as a plain ``float``."""
        np = pytest.importorskip("numpy")
        wt = np.float32(3.4)
        G = nx.Graph([(1, 2, {"weight": wt})])
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname, node_type=int)
            assert G.edges == H.edges
            wtG = G[1][2]["weight"]
            wtH = H[1][2]["weight"]
            assert wtG == pytest.approx(wtH, abs=1e-6)
            assert type(wtG) == np.float32
            assert type(wtH) == float

    def test_numpy_float64_inference(self):
        """Numeric type inference copes with an ``np.float64`` edge weight."""
        np = pytest.importorskip("numpy")
        G = self.attribute_numeric_type_graph
        G.edges[("n1", "n1")]["weight"] = np.float64(1.1)
        with _temporary_filename() as fname:
            self.writer(G, fname, infer_numeric_types=True)
            H = nx.read_graphml(fname)
            assert G._adj == H._adj

    def test_unicode_attributes(self):
        """Non-ASCII node names and attribute values round-trip unchanged."""
        G = nx.Graph()
        name1 = chr(2344) + chr(123) + chr(6543)
        name2 = chr(5543) + chr(1543) + chr(324)
        node_type = str
        G.add_edge(name1, "Radiohead", foo=name2)
        with _temporary_filename() as fname:
            self.writer(G, fname)
            H = nx.read_graphml(fname, node_type=node_type)
            assert G._adj == H._adj

    def test_unicode_escape(self):
        """A JSON-escaped string stored as a graph attribute round-trips."""
        # test for handling json escaped stings in python 2 Issue #1880
        import json

        a = dict(a='{"a": "123"}')  # an object with many chars to escape
        sa = json.dumps(a)
        G = nx.Graph()
        G.graph["test"] = sa
        fh = io.BytesIO()
        self.writer(G, fh)
        fh.seek(0)
        H = nx.read_graphml(fh)
        assert G.graph["test"] == H.graph["test"]


class TestXMLGraphML(TestWriteGraphML):
    """Re-run the write tests with the ``xml.etree``-based writer."""

    writer = staticmethod(nx.write_graphml_xml)

    @classmethod
    def setup_class(cls):
        """Reuse the parent class setup (fixtures plus the lxml skip check)."""
        TestWriteGraphML.setup_class()